如何保障节目数据在缓存与数据库间的一致性
- 户购票生成订单过程 把缓存中的余票数量扣除了,缓存中的座位从未售卖修改为锁定中
- 支付回调过程把缓存中的座位从锁定中修改为已售卖
之所以把这些数据存到缓存中,是因为缓存的效率执行起来比数据库要快的多,但有没有注意到,从始至终都是操作的缓存,到现在都没有关于数据库的余票数量和座位的操作啊
而本文就是要介绍数据库的状态是什么时候更新的,缓存和数据库的一致性要如何保证
更新数据库中的余票数量和座位状态
让我们再去看一下支付回调执行的到的 updateProgramRelatedDataResolution 方法
com.damai.service.OrderService#updateProgramRelatedDataResolution
public void updateProgramRelatedDataResolution(Long programId,List<String> seatIdList,OrderStatus orderStatus){
//订单状态修改成了已支付 缓存中的余票扣除了,座位修改成已售卖了...
if (Objects.equals(orderStatus.getCode(), OrderStatus.PAY.getCode())) {
ProgramOperateDataDto programOperateDataDto = new ProgramOperateDataDto();
programOperateDataDto.setProgramId(programId);
//要将锁定修改已售卖的座位id集合
programOperateDataDto.setSeatIdList(unLockSeatIdList);
//票档数量
programOperateDataDto.setTicketCategoryCountDtoList(ticketCategoryCountDtoList);
//修改为已售卖状态
programOperateDataDto.setSellStatus(SellStatus.SOLD.getCode());
//放到延迟队列中
delayOperateProgramDataSend.sendMessage(JSON.toJSONString(programOperateDataDto));
}
}
com.damai.dto.ProgramOperateDataDto
@Data
@ApiModel(value="ProgramOperateDataDto", description ="节目数据操作")
public class ProgramOperateDataDto {
@ApiModelProperty(name ="programId", dataType ="Long", value ="节目id",required = true)
@NotNull
private Long programId;
@ApiModelProperty(name ="ticketCategoryCountMap", dataType ="List<TicketCategoryCountDto>",required = true)
@NotNull
private List<TicketCategoryCountDto> ticketCategoryCountDtoList;
@ApiModelProperty(name ="seatIdList", dataType ="List<Long>", value ="座位id集合",required = true)
@NotNull
private List<Long> seatIdList;
@ApiModelProperty(name ="sellStatus", dataType ="Long", value ="座位状态",required = true)
@NotNull
private Integer sellStatus;
}
com.damai.service.delaysend.DelayOperateProgramDataSend
@Slf4j
@Component
public class DelayOperateProgramDataSend {
@Autowired
private DelayQueueContext delayQueueContext;
public void sendMessage(String message){
try {
delayQueueContext.sendMessage(SpringUtil.getPrefixDistinctionName() + "-" + DELAY_OPERATE_PROGRAM_DATA_TOPIC,
message, DELAY_OPERATE_PROGRAM_DATA_TIME, DELAY_OPERATE_PROGRAM_DATA_TIME_UNIT);
}catch (Exception e) {
log.error("send message error message : {}",message,e);
}
}
}
到这里就明确了,其实当支付回调执行把订单状态和缓存的数据都成功执行后,发送更新节目和座位的数据消息到延迟队列中,由节目服务来消费消息进行数据库中的更新,有小伙伴可能会想了,这使用延迟队列,缓存和数据库不就不能保证一致性了吗?这里先卖个关子,先继续介绍,下文中会有答案
我们再去节目服务查看,是如何消费消息的
节目服务消费消息更新数据库
消息监听器
com.damai.service.delayconsumer.DelayOperateProgramDataConsumer
@Slf4j
@Component
public class DelayOperateProgramDataConsumer implements ConsumerTask {
@Autowired
private ProgramService programService;
@Override
public void execute(String content) {
log.info("延迟操作节目数据消息进行消费 content : {}", content);
if (StringUtil.isEmpty(content)) {
log.error("延迟队列消息不存在");
return;
}
ProgramOperateDataDto programOperateDataDto = JSON.parseObject(content, ProgramOperateDataDto.class);
programService.operateProgramData(programOperateDataDto);
}
@Override
public String topic() {
return SpringUtil.getPrefixDistinctionName() + "-" + DELAY_OPERATE_PROGRAM_DATA_TOPIC;
}
}
com.damai.service.ProgramService#operateProgramData
@RepeatExecuteLimit(name = CANCEL_PROGRAM_ORDER,keys = {"#programOperateDataDto.programId","#programOperateDataDto.seatIdList"})
@Transactional(rollbackFor = Exception.class)
public void operateProgramData(ProgramOperateDataDto programOperateDataDto){
List<TicketCategoryCountDto> ticketCategoryCountDtoList = programOperateDataDto.getTicketCategoryCountDtoList();
//从库中查询座位集合
List<Long> seatIdList = programOperateDataDto.getSeatIdList();
//根据节目id和座位id查询座位集合
LambdaQueryWrapper<Seat> seatLambdaQueryWrapper =
Wrappers.lambdaQuery(Seat.class)
.eq(Seat::getProgramId,programOperateDataDto.getProgramId())
.in(Seat::getId, seatIdList);
List<Seat> seatList = seatMapper.selectList(seatLambdaQueryWrapper);
//如果库中的座位集合为空,则抛出异常
if (CollectionUtil.isEmpty(seatList)) {
throw new DaMaiFrameException(BaseCode.SEAT_NOT_EXIST);
}
//如果库中的座位集合数量和传入的座位数量不相同,则抛出异常
if (seatList.size() != seatIdList.size()) {
throw new DaMaiFrameException(BaseCode.SEAT_UPDATE_REL_COUNT_NOT_EQUAL_PRESET_COUNT);
}
for (Seat seat : seatList) {
//如果库中的座位有一个已经是已售卖的状态,则抛出异常
if (Objects.equals(seat.getSellStatus(), SellStatus.SOLD.getCode())) {
throw new DaMaiFrameException(BaseCode.SEAT_SOLD);
}
}
//将库中的座位集合批量更新为售卖状态
LambdaUpdateWrapper<Seat> seatLambdaUpdateWrapper =
Wrappers.lambdaUpdate(Seat.class)
.eq(Seat::getProgramId,programOperateDataDto.getProgramId())
.in(Seat::getId, seatIdList);
Seat updateSeat = new Seat();
updateSeat.setSellStatus(SellStatus.SOLD.getCode());
seatMapper.update(updateSeat,seatLambdaUpdateWrapper);
//将库中的对应票档进行更新库存
int updateRemainNumberCount =
ticketCategoryMapper.batchUpdateRemainNumber(ticketCategoryCountDtoList,programOperateDataDto.getProgramId());
if (updateRemainNumberCount != ticketCategoryCountDtoList.size()) {
throw new DaMaiFrameException(BaseCode.UPDATE_TICKET_CATEGORY_COUNT_NOT_CORRECT);
}
}
这里在执行前通过节目id+座位id集合来验证是否幂等。
里面的流程比较简单,先验证状态,然后更新座位状态,再更新票档数量。这里的票档数量是使用的自定义sql来更新
int batchUpdateRemainNumber(@Param("ticketCategoryCountDtoList")
List<TicketCategoryCountDto> ticketCategoryCountDtoList,
@Param("programId")
Long programId);
<update id="batchUpdateRemainNumber">
<foreach collection="ticketCategoryCountDtoList" item="ticketCategoryCountDto" index="index">
update
d_ticket_category
set remain_number = remain_number - #{ticketCategoryCountDto.count,jdbcType=BIGINT}
where id = #{ticketCategoryCountDto.ticketCategoryId,jdbcType=BIGINT}
and program_id = #{programId,jdbcType=BIGINT}
</foreach>
</update>
注意
查询更新座位的条件
LambdaQueryWrapper<Seat> seatLambdaQueryWrapper =
Wrappers.lambdaQuery(Seat.class)
.eq(Seat::getProgramId,programOperateDataDto.getProgramId())
.in(Seat::getId, seatIdList);
更新座位的条件
LambdaUpdateWrapper<Seat> seatLambdaUpdateWrapper =
Wrappers.lambdaUpdate(Seat.class)
.eq(Seat::getProgramId,programOperateDataDto.getProgramId())
.in(Seat::getId, seatIdList);
这里都用上了 节目id和座位id,因为座位表是使用 节目id 作为分片键的,任何操作都要带有分片键,否则会发生读扩散的问题!
- 点赞
- 收藏
- 关注作者
评论(0)