如何确保多级缓存的一致性?
【摘要】 思考对于这种一致性问题,可以使用通用的方案,也就是当修改数据库中的数据后,将对应的缓存清空,Redis的缓存好办,可以直接删除掉但是本地缓存就会有个问题,如果存在多实例,那么要怎么处理?就拿节目服务来说,假设线上部署了5个实例节点,经过一段时间运行后,每个实例都有了自己的本地缓存,那么如果进行了数据的修改操作后,就要将这5个实例节点的数据都清空那么要如何通知这5个节点呢?可以有这几种方式:定...
思考
对于这种一致性问题,可以使用通用的方案,也就是当修改数据库中的数据后,将对应的缓存清空,Redis的缓存好办,可以直接删除掉
但是本地缓存就会有个问题,如果存在多实例,那么要怎么处理?
就拿节目服务来说,假设线上部署了5个实例节点,经过一段时间运行后,每个实例都有了自己的本地缓存,那么如果进行了数据的修改操作后,就要将这5个实例节点的数据都清空
那么要如何通知这5个节点呢?可以有这几种方式:
- 定时任务查询 定时从库中扫描失效的数据,对于已经失效的数据就在缓存中删除。这种只能应对简单而且数据量小的业务,而且不好估算定时任务的执行时间,频率高了对数据库的压力很大,频率低了缓存又不及时被清除,而且假如某段时间没有修改数据或者主动要失效的操作,那么就白执行了。而且这种多实例的情况,就只能每个实例都要查询一遍数据库,属实没有必要
- 使用MQ消息中间件(Kafka、RocketMQ、RabbitMQ) 这种方式确实是可以,但是要思考如果之前没有引入MQ的话难道要为了这个功能特意引入额外的中间件吗?另外一种情况就是如果确实在使用MQ,MQ上又都是比较重要的业务在使用,是否有必要在这个轻量级的功能上使用MQ,说白了 就是一个取舍问题,因为这个功能是比较轻量级的,就算通知有延迟也没关系,顶多就是缓存中没有清掉呗,还有个过期时间来兜底呢
- Redis的PUB/SUB,订阅/发布模式 这种发布订阅模式有个致命的问题就是没有办法进行持久化的,如果出现网络断开、Redis宕机的话,消息就会丢失,这种也不是很推荐
- Redis的Stream 可以理解成是Redis对消息队列MQ的完善实现,支持分组消费和广播消费,并且可以将消息进行持久化
而在项目中对于这种多实例清除本地缓存的业务,使用Redis的Stream是比较适合的,主要原因有以下几点:
- 这种通知清除缓存的功能是比较轻量级的,不是很频繁的操作,不像MQ那样是专门为了解决高并发下的问题,所以使用RedisStream就完全足够
- 使用RedisStream只需要连接Redis即可,而基本每个项目都需要Redis,这样就不需要再额外引入中间件,没有额外的部署成本
- RedisStream能够将数据保存到磁盘,以确保数据不会丢失
- 另外也是为了大家学习RedisStream,为了方便以后工作需要的话可以直接使用
-
RedisStream相关配置
spring: data: # redis相关配置 redis: database: 0 host: 127.0.0.1 port: 6379 timeout: 3000 # redisStream相关配置 stream: # stream的key streamName: invalid_program # 消费类型:广播 consumerType: broadcast
配置广播类型的消息消费,可以实现多个实例节点都能收到消息
接口
com.damai.controller.ProgramController#invalid
@ApiOperation(value = "节目失效(根据id)") @PostMapping(value = "/invalid") public ApiResponse<Boolean> invalid(@Valid @RequestBody ProgramInvalidDto programInvalidDto) { return ApiResponse.ok(programService.invalid(programInvalidDto)); }
public Boolean invalid(final ProgramInvalidDto programInvalidDto) { Program program = new Program(); program.setId(programInvalidDto.getId()); //修改数据库中的节目状态为下线状态 program.setProgramStatus(BusinessStatus.NO.getCode()); int result = programMapper.updateById(program); if (result > 0) { //删除Redis的缓存 delRedisData(programInvalidDto.getId()); //向RedisStream发送消息 redisStreamPushHandler.push(String.valueOf(programInvalidDto.getId())); //删除elasticsearch中的数据 programEs.deleteByProgramId(programInvalidDto.getId()); return true; }else { return false; } }
删除Redis的缓存
public void delRedisData(Long programId){ Program program = Optional.ofNullable(programMapper.selectById(programId)) .orElseThrow(() -> new DaMaiFrameException(BaseCode.PROGRAM_NOT_EXIST)); redisCache.del(RedisKeyBuild.createRedisKey(RedisKeyManage.PROGRAM,programId)); redisCache.del(RedisKeyBuild.createRedisKey(RedisKeyManage.PROGRAM_GROUP,program.getProgramGroupId())); redisCache.del(RedisKeyBuild.createRedisKey(RedisKeyManage.PROGRAM_SHOW_TIME,programId)); redisCache.del(RedisKeyBuild.createRedisKey(RedisKeyManage.PROGRAM_SEAT_NO_SOLD_HASH, programId)); redisCache.del(RedisKeyBuild.createRedisKey(RedisKeyManage.PROGRAM_SEAT_LOCK_HASH, programId)); redisCache.del(RedisKeyBuild.createRedisKey(RedisKeyManage.PROGRAM_SEAT_SOLD_HASH, programId)); redisCache.del(RedisKeyBuild.createRedisKey(RedisKeyManage.PROGRAM_TICKET_CATEGORY_LIST, programId)); redisCache.del(RedisKeyBuild.createRedisKey(RedisKeyManage.PROGRAM_TICKET_REMAIN_NUMBER_HASH, programId)); }
删除Redis的缓存数据,包括:节目、节目分组、节目演出时间、节目座位(未售卖、锁定中、已售卖)、票档、余票数量
向RedisStream发送消息
redisStreamPushHandler.push(String.valueOf(programInvalidDto.getId()));
监听RedisStream消息
@Slf4j @Component public class ProgramRedisStreamConsumer implements MessageConsumer { @Autowired private ProgramService programService; @Override public void accept(ObjectRecord<String, String> message) { Long programId = Long.parseLong(message.getValue()); programService.delLocalCache(programId); } }
public void delLocalCache(Long programId){ log.info("删除本地缓存 programId : {}",programId); localCacheProgram.del(RedisKeyBuild.createRedisKey(RedisKeyManage.PROGRAM, programId).getRelKey()); localCacheProgramGroup.del(RedisKeyBuild.createRedisKey(RedisKeyManage.PROGRAM_GROUP, programId).getRelKey()); localCacheProgramShowTime.del(RedisKeyBuild.createRedisKey(RedisKeyManage.PROGRAM_SHOW_TIME, programId).getRelKey()); localCacheTicketCategory.del(programId); }
处理逻辑也是比较简单,监听到消息后,直接从本地缓存删除
删除elasticsearch中的数据
public void deleteByProgramId(Long programId){ try { //通过节目id查询elasticsearch中的数据 List<EsDataQueryDto> esDataQueryDtoList = new ArrayList<>(); EsDataQueryDto programIdDto = new EsDataQueryDto(); programIdDto.setParamName(ProgramDocumentParamName.ID); programIdDto.setParamValue(programId); esDataQueryDtoList.add(programIdDto); List<ProgramListVo> programListVos = businessEsHandle.query( SpringUtil.getPrefixDistinctionName() + "-" + ProgramDocumentParamName.INDEX_NAME, ProgramDocumentParamName.INDEX_TYPE, esDataQueryDtoList, ProgramListVo.class); //如果数据存在,则通过文档id删除 if (CollectionUtil.isNotEmpty(programListVos)) { for (ProgramListVo programListVo : programListVos) { businessEsHandle.deleteByDocumentId( SpringUtil.getPrefixDistinctionName() + "-" + ProgramDocumentParamName.INDEX_NAME, programListVo.getEsId()); } } }catch (Exception e) { log.error("deleteByProgramId error",e); } }
流程是先通过节目id查询elasticsearch中的数据,在此方法中,ProgramListVo对象设置了字段 esId
@Data @ApiModel(value="ProgramListVo", description ="节目列表") public class ProgramListVo { /** * es中的文档id * */ private String esId; }
使用封装的组件 businessEsHandle.query 的方法查询后,会自动的将elasticsearch中的文档id映射到ProgramListVo中的esId字段上
当获取到数据后,就可以通过文档id将数据删除了,这里同样使用封装的组件 businessEsHandle.deleteByDocumentId 方法
【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)