如何保障节目数据在缓存与数据库间的一致性

举报
幼儿园老大* 发表于 2024/11/27 15:48:17 2024/11/27
【摘要】 户购票生成订单过程 把缓存中的余票数量扣除了,缓存中的座位从未售卖修改为锁定中 支付回调过程把缓存中的座位从锁定中修改为已售卖 之所以把这些数据存到缓存中,是因为缓存的效率执行起来比数据库要快的多,但有没有注意到,从始至终都是操作的缓存,到现在都没有关于数据库的余票数量和座位的操作啊而本文就是要介绍数据库的状态是什么时候更新的,缓存和数据库的一致性要如何保证更新数据库中的余票数量和座位状态...
  • 户购票生成订单过程 把缓存中的余票数量扣除了,缓存中的座位从未售卖修改为锁定中
  • 支付回调过程把缓存中的座位从锁定中修改为已售卖


之所以把这些数据存到缓存中,是因为缓存的效率执行起来比数据库要快的多,但有没有注意到,从始至终都是操作的缓存,到现在都没有关于数据库的余票数量和座位的操作啊


而本文就是要介绍数据库的状态是什么时候更新的,缓存和数据库的一致性要如何保证


更新数据库中的余票数量和座位状态


让我们再去看一下支付回调执行的到的 updateProgramRelatedDataResolution 方法


com.damai.service.OrderService#updateProgramRelatedDataResolution


public void updateProgramRelatedDataResolution(Long programId,List<String> seatIdList,OrderStatus orderStatus){
    
    //订单状态修改成了已支付 缓存中的余票扣除了,座位修改成已售卖了...
    
    if (Objects.equals(orderStatus.getCode(), OrderStatus.PAY.getCode())) {
        ProgramOperateDataDto programOperateDataDto = new ProgramOperateDataDto();
        programOperateDataDto.setProgramId(programId);
        //要将锁定修改已售卖的座位id集合
        programOperateDataDto.setSeatIdList(unLockSeatIdList);
        //票档数量
        programOperateDataDto.setTicketCategoryCountDtoList(ticketCategoryCountDtoList);
        //修改为已售卖状态
        programOperateDataDto.setSellStatus(SellStatus.SOLD.getCode());
        //放到延迟队列中
        delayOperateProgramDataSend.sendMessage(JSON.toJSONString(programOperateDataDto));
    }
}


com.damai.dto.ProgramOperateDataDto


@Data
@ApiModel(value="ProgramOperateDataDto", description ="节目数据操作")
public class ProgramOperateDataDto {
    
    @ApiModelProperty(name ="programId", dataType ="Long", value ="节目id",required = true)
    @NotNull
    private Long programId;
    
    @ApiModelProperty(name ="ticketCategoryCountMap", dataType ="List<TicketCategoryCountDto>",required = true)
    @NotNull
    private List<TicketCategoryCountDto> ticketCategoryCountDtoList;
    
    @ApiModelProperty(name ="seatIdList", dataType ="List<Long>", value ="座位id集合",required = true)
    @NotNull
    private List<Long> seatIdList;
    
    @ApiModelProperty(name ="sellStatus", dataType ="Long", value ="座位状态",required = true)
    @NotNull
    private Integer sellStatus;
}


com.damai.service.delaysend.DelayOperateProgramDataSend


@Slf4j
@Component
public class DelayOperateProgramDataSend {
    
    @Autowired
    private DelayQueueContext delayQueueContext;
    
    public void sendMessage(String message){
        try {
            delayQueueContext.sendMessage(SpringUtil.getPrefixDistinctionName() + "-" + DELAY_OPERATE_PROGRAM_DATA_TOPIC,
                    message, DELAY_OPERATE_PROGRAM_DATA_TIME, DELAY_OPERATE_PROGRAM_DATA_TIME_UNIT);
        }catch (Exception e) {
            log.error("send message error message : {}",message,e);
        }
        
    }
}


到这里就明确了,其实当支付回调执行把订单状态和缓存的数据都成功执行后,发送更新节目和座位的数据消息到延迟队列中,由节目服务来消费消息进行数据库中的更新,有小伙伴可能会想了,这使用延迟队列,缓存和数据库不就不能保证一致性了吗?这里先卖个关子,先继续介绍,下文中会有答案


我们再去节目服务查看,是如何消费消息的


节目服务消费消息更新数据库


消息监听器


com.damai.service.delayconsumer.DelayOperateProgramDataConsumer


@Slf4j
@Component
public class DelayOperateProgramDataConsumer implements ConsumerTask {
    
    @Autowired
    private ProgramService programService;
    
    @Override
    public void execute(String content) {
        log.info("延迟操作节目数据消息进行消费 content : {}", content);
        if (StringUtil.isEmpty(content)) {
            log.error("延迟队列消息不存在");
            return;
        }
        ProgramOperateDataDto programOperateDataDto = JSON.parseObject(content, ProgramOperateDataDto.class);
        programService.operateProgramData(programOperateDataDto);
    }
    
    @Override
    public String topic() {
        return SpringUtil.getPrefixDistinctionName() + "-" + DELAY_OPERATE_PROGRAM_DATA_TOPIC;
    }
}


com.damai.service.ProgramService#operateProgramData


@RepeatExecuteLimit(name = CANCEL_PROGRAM_ORDER,keys = {"#programOperateDataDto.programId","#programOperateDataDto.seatIdList"})
@Transactional(rollbackFor = Exception.class)
public void operateProgramData(ProgramOperateDataDto programOperateDataDto){
    List<TicketCategoryCountDto> ticketCategoryCountDtoList = programOperateDataDto.getTicketCategoryCountDtoList();
    //从库中查询座位集合
    List<Long> seatIdList = programOperateDataDto.getSeatIdList();
    //根据节目id和座位id查询座位集合	
    LambdaQueryWrapper<Seat> seatLambdaQueryWrapper = 
                Wrappers.lambdaQuery(Seat.class)
                        .eq(Seat::getProgramId,programOperateDataDto.getProgramId())
                        .in(Seat::getId, seatIdList);
    List<Seat> seatList = seatMapper.selectList(seatLambdaQueryWrapper);
    //如果库中的座位集合为空,则抛出异常
    if (CollectionUtil.isEmpty(seatList)) {
            throw new DaMaiFrameException(BaseCode.SEAT_NOT_EXIST);
    }
    //如果库中的座位集合数量和传入的座位数量不相同,则抛出异常
    if (seatList.size() != seatIdList.size()) {
        throw new DaMaiFrameException(BaseCode.SEAT_UPDATE_REL_COUNT_NOT_EQUAL_PRESET_COUNT);
    }
    for (Seat seat : seatList) {
        //如果库中的座位有一个已经是已售卖的状态,则抛出异常
        if (Objects.equals(seat.getSellStatus(), SellStatus.SOLD.getCode())) {
            throw new DaMaiFrameException(BaseCode.SEAT_SOLD);
        }
    }
    //将库中的座位集合批量更新为售卖状态
    LambdaUpdateWrapper<Seat> seatLambdaUpdateWrapper = 
                Wrappers.lambdaUpdate(Seat.class)
                        .eq(Seat::getProgramId,programOperateDataDto.getProgramId())
                        .in(Seat::getId, seatIdList);
    Seat updateSeat = new Seat();
    updateSeat.setSellStatus(SellStatus.SOLD.getCode());
    seatMapper.update(updateSeat,seatLambdaUpdateWrapper);

    //将库中的对应票档进行更新库存
    int updateRemainNumberCount = 
            ticketCategoryMapper.batchUpdateRemainNumber(ticketCategoryCountDtoList,programOperateDataDto.getProgramId());
    if (updateRemainNumberCount != ticketCategoryCountDtoList.size()) {
        throw new DaMaiFrameException(BaseCode.UPDATE_TICKET_CATEGORY_COUNT_NOT_CORRECT);
    }
}


这里在执行前通过节目id+座位id集合来验证是否幂等。

里面的流程比较简单,先验证状态,然后更新座位状态,再更新票档数量。这里的票档数量是使用的自定义sql来更新


int batchUpdateRemainNumber(@Param("ticketCategoryCountDtoList") 
                                List<TicketCategoryCountDto> ticketCategoryCountDtoList,
                                @Param("programId")
                                Long programId);


<update id="batchUpdateRemainNumber">
    <foreach collection="ticketCategoryCountDtoList" item="ticketCategoryCountDto" index="index">
        update
            d_ticket_category
        set remain_number = remain_number - #{ticketCategoryCountDto.count,jdbcType=BIGINT}
        where id = #{ticketCategoryCountDto.ticketCategoryId,jdbcType=BIGINT}
        and program_id = #{programId,jdbcType=BIGINT}
    </foreach>
</update>


注意

查询更新座位的条件

LambdaQueryWrapper<Seat> seatLambdaQueryWrapper = 
                Wrappers.lambdaQuery(Seat.class)
                        .eq(Seat::getProgramId,programOperateDataDto.getProgramId())
                        .in(Seat::getId, seatIdList);

更新座位的条件

LambdaUpdateWrapper<Seat> seatLambdaUpdateWrapper = 
                Wrappers.lambdaUpdate(Seat.class)
                        .eq(Seat::getProgramId,programOperateDataDto.getProgramId())
                        .in(Seat::getId, seatIdList);

这里都用上了 节目id和座位id,因为座位表是使用 节目id 作为分片键的,任何操作都要带有分片键,否则会发生读扩散的问题!

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。