如何确保多级缓存的一致性?

举报
一颗小谷粒 发表于 2024/11/27 15:34:23 2024/11/27
【摘要】 思考对于这种一致性问题,可以使用通用的方案,也就是当修改数据库中的数据后,将对应的缓存清空,Redis的缓存好办,可以直接删除掉但是本地缓存就会有个问题,如果存在多实例,那么要怎么处理?就拿节目服务来说,假设线上部署了5个实例节点,经过一段时间运行后,每个实例都有了自己的本地缓存,那么如果进行了数据的修改操作后,就要将这5个实例节点的数据都清空那么要如何通知这5个节点呢?可以有这几种方式:定...

思考

对于这种一致性问题,可以使用通用的方案,也就是当修改数据库中的数据后,将对应的缓存清空,Redis的缓存好办,可以直接删除掉

但是本地缓存就会有个问题,如果存在多实例,那么要怎么处理?

就拿节目服务来说,假设线上部署了5个实例节点,经过一段时间运行后,每个实例都有了自己的本地缓存,那么如果进行了数据的修改操作后,就要将这5个实例节点的数据都清空

那么要如何通知这5个节点呢?可以有这几种方式:

  1. 定时任务查询 定时从库中扫描失效的数据,对于已经失效的数据就在缓存中删除。这种只能应对简单而且数据量小的业务,而且不好估算定时任务的执行时间,频率高了对数据库的压力很大,频率低了缓存又不及时被清除,而且假如某段时间没有修改数据或者主动要失效的操作,那么就白执行了。而且这种多实例的情况,就只能每个实例都要查询一遍数据库,属实没有必要
  2. 使用MQ消息中间件(Kafka、RocketMQ、RabbitMQ) 这种方式确实是可以,但是要思考如果之前没有引入MQ的话难道要为了这个功能特意引入额外的中间件吗?另外一种情况就是如果确实在使用MQ,MQ上又都是比较重要的业务在使用,是否有必要在这个轻量级的功能上使用MQ,说白了 就是一个取舍问题,因为这个功能是比较轻量级的,就算通知有延迟也没关系,顶多就是缓存中没有清掉呗,还有个过期时间来兜底呢
  3. Redis的PUB/SUB,订阅/发布模式 这种发布订阅模式有个致命的问题就是没有办法进行持久化的,如果出现网络断开、Redis宕机的话,消息就会丢失,这种也不是很推荐
  4. Redis的Stream 可以理解成是Redis对消息队列MQ的完善实现,支持分组消费和广播消费,并且可以将消息进行持久化


而在项目中对于这种多实例清除本地缓存的业务,使用Redis的Stream是比较适合的,主要原因有以下几点:

  1. 这种通知清除缓存的功能是比较轻量级的,不是很频繁的操作,不像MQ那样是专门为了解决高并发下的问题,所以使用RedisStream就完全足够
  2. 使用RedisStream只需要连接Redis即可,而基本每个项目都需要Redis,这样就不需要再额外引入中间件,没有额外的部署成本
  3. RedisStream能够将数据保存到磁盘,以确保数据不会丢失
  4. 另外也是为了大家学习RedisStream,为了方便以后工作需要的话可以直接使用
  5. RedisStream相关配置

    spring:
      data:
        # redis相关配置
        redis:
          database: 0
          host: 127.0.0.1
          port: 6379
          timeout: 3000
          # redisStream相关配置
          stream:
            # stream的key
            streamName: invalid_program
            # 消费类型:广播
            consumerType: broadcast

    配置广播类型的消息消费,可以实现多个实例节点都能收到消息

    接口

    com.damai.controller.ProgramController#invalid

    @ApiOperation(value = "节目失效(根据id)")
    @PostMapping(value = "/invalid")
    public ApiResponse<Boolean> invalid(@Valid @RequestBody ProgramInvalidDto programInvalidDto) {
        return ApiResponse.ok(programService.invalid(programInvalidDto));
    }
    public Boolean invalid(final ProgramInvalidDto programInvalidDto) {
        Program program = new Program();
        program.setId(programInvalidDto.getId());
        //修改数据库中的节目状态为下线状态
        program.setProgramStatus(BusinessStatus.NO.getCode());
        int result = programMapper.updateById(program);
        if (result > 0) {
            //删除Redis的缓存
            delRedisData(programInvalidDto.getId());
            //向RedisStream发送消息
            redisStreamPushHandler.push(String.valueOf(programInvalidDto.getId()));
            //删除elasticsearch中的数据
            programEs.deleteByProgramId(programInvalidDto.getId());
            return true;
        }else {
            return false;
        }
    }

    删除Redis的缓存

    public void delRedisData(Long programId){
        Program program = Optional.ofNullable(programMapper.selectById(programId))
                .orElseThrow(() -> new DaMaiFrameException(BaseCode.PROGRAM_NOT_EXIST));
        redisCache.del(RedisKeyBuild.createRedisKey(RedisKeyManage.PROGRAM,programId));
        redisCache.del(RedisKeyBuild.createRedisKey(RedisKeyManage.PROGRAM_GROUP,program.getProgramGroupId()));
        redisCache.del(RedisKeyBuild.createRedisKey(RedisKeyManage.PROGRAM_SHOW_TIME,programId));
        redisCache.del(RedisKeyBuild.createRedisKey(RedisKeyManage.PROGRAM_SEAT_NO_SOLD_HASH, programId));
        redisCache.del(RedisKeyBuild.createRedisKey(RedisKeyManage.PROGRAM_SEAT_LOCK_HASH, programId));
        redisCache.del(RedisKeyBuild.createRedisKey(RedisKeyManage.PROGRAM_SEAT_SOLD_HASH, programId));
        redisCache.del(RedisKeyBuild.createRedisKey(RedisKeyManage.PROGRAM_TICKET_CATEGORY_LIST, programId));
        redisCache.del(RedisKeyBuild.createRedisKey(RedisKeyManage.PROGRAM_TICKET_REMAIN_NUMBER_HASH, programId));
    }

    删除Redis的缓存数据,包括:节目、节目分组、节目演出时间、节目座位(未售卖、锁定中、已售卖)、票档、余票数量

    向RedisStream发送消息

    redisStreamPushHandler.push(String.valueOf(programInvalidDto.getId()));


    监听RedisStream消息

    @Slf4j
    @Component
    public class ProgramRedisStreamConsumer implements MessageConsumer {
        
        @Autowired
        private ProgramService programService;
        
        @Override
        public void accept(ObjectRecord<String, String> message) {
            Long programId = Long.parseLong(message.getValue());
            programService.delLocalCache(programId);
        }
    }
    public void delLocalCache(Long programId){
        log.info("删除本地缓存 programId : {}",programId);
        localCacheProgram.del(RedisKeyBuild.createRedisKey(RedisKeyManage.PROGRAM, programId).getRelKey());
        localCacheProgramGroup.del(RedisKeyBuild.createRedisKey(RedisKeyManage.PROGRAM_GROUP, programId).getRelKey());
        localCacheProgramShowTime.del(RedisKeyBuild.createRedisKey(RedisKeyManage.PROGRAM_SHOW_TIME, programId).getRelKey());
        localCacheTicketCategory.del(programId);
    }

    处理逻辑也是比较简单,监听到消息后,直接从本地缓存删除


    删除elasticsearch中的数据

    public void deleteByProgramId(Long programId){
        try {
            //通过节目id查询elasticsearch中的数据
            List<EsDataQueryDto> esDataQueryDtoList = new ArrayList<>();
            EsDataQueryDto programIdDto = new EsDataQueryDto();
            programIdDto.setParamName(ProgramDocumentParamName.ID);
            programIdDto.setParamValue(programId);
            esDataQueryDtoList.add(programIdDto);
            
            List<ProgramListVo> programListVos = 
                    businessEsHandle.query(
                            SpringUtil.getPrefixDistinctionName() + "-" + ProgramDocumentParamName.INDEX_NAME, 
                            ProgramDocumentParamName.INDEX_TYPE, 
                            esDataQueryDtoList, 
                            ProgramListVo.class);
            //如果数据存在,则通过文档id删除                
            if (CollectionUtil.isNotEmpty(programListVos)) {
                for (ProgramListVo programListVo : programListVos) {
                    businessEsHandle.deleteByDocumentId(
                            SpringUtil.getPrefixDistinctionName() + "-" + ProgramDocumentParamName.INDEX_NAME,
                            programListVo.getEsId());
                }
            }
        }catch (Exception e) {
            log.error("deleteByProgramId error",e);
        }
    }

    流程是先通过节目id查询elasticsearch中的数据,在此方法中,ProgramListVo对象设置了字段 esId

    @Data
    @ApiModel(value="ProgramListVo", description ="节目列表")
    public class ProgramListVo {
        
        /**
         * es中的文档id
         * */
        private String esId;
    }
    

    使用封装的组件 businessEsHandle.query 的方法查询后,会自动的将elasticsearch中的文档id映射到ProgramListVo中的esId字段上


    当获取到数据后,就可以通过文档id将数据删除了,这里同样使用封装的组件 businessEsHandle.deleteByDocumentId 方法



【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。