颠覆认知!高并发场景下订单也能异步处理?揭秘新策略

举报
幼儿园老大* 发表于 2024/11/27 15:36:27 2024/11/27
【摘要】 瓶颈其实是在调用订单服务生成订单这个步骤中,项目之间使用的是 Feign 的框架进行调用,Feign的底层其实是http调用,虽然可以替换连接池,比如 feign-okhttp,来提高性能,但是本质还是基于http的方式,性能就会受到影响那么既然是高并发场景,在同一时间会有大量的订单请求,那么就要将生成订单的步骤变成异步,那么如何才能异步呢?这时我们就要借助mq中间件,现在我们直接开门见山,...

瓶颈其实是在调用订单服务生成订单这个步骤中,项目之间使用的是 Feign 的框架进行调用,Feign的底层其实是http调用,虽然可以替换连接池,比如 feign-okhttp,来提高性能,但是本质还是基于http的方式,性能就会受到影响

那么既然是高并发场景,在同一时间会有大量的订单请求,那么就要将生成订单的步骤变成异步,那么如何才能异步呢?这时我们就要借助mq中间件,现在我们直接开门见山,介绍整个流程到底是怎样的?

当用户选择演唱会后,通过前端服务,向后端服务发送进行扣减的操作


在redis通过lua判断余票扣减后数量是否充足,如果不充足,则直接通知用户无余票。如果充足,进行相应的扣除后,就可以构建订单信息直接交给mq了,然后把订单编号直接返回给前端服务。


这时,订单服务会一直消费mq的消息,收到后进行订单入库和订单入redis的操作


但这里还没有完,虽然第2步返回了订单编号,但由于异步场景,不一定真正的入库了,所以前端要有个定时任务,不断的去轮训订单是否真的存在,如果查询到了,才说明订单是真正的创建成功


由于是高并发场景,会有大量的订单请求,所以前端的定时任务也不能一直的去轮训,要设置个超时时间,在项目中设置的是在5s内,每隔200ms去轮训一次订单是否存在,如果存在,则停止定时任务,订单生成成功,跳转支付页面,如果超过了5s还没有轮训到,那么直接通知用户生成订单失败


到这里还有个问题,那就是前端轮训超过了5s没有查询到订单,提示了订单失败,但其实后端ma中间件消费延迟了,超过了5s才消费到,这时我们就不要再入库了,直接把这个超过5s的订单丢弃!

代码

com.damai.controller.ProgramOrderController#createV4

@ApiOperation(value = "购票V4")
@PostMapping(value = "/create/v4")
public ApiResponse<String> createV4(@Valid @RequestBody ProgramOrderCreateDto programOrderCreateDto) {
    return ApiResponse.ok(programOrderLock.createV4(programOrderCreateDto));
}
@RepeatExecuteLimit(
        name = RepeatExecuteLimitConstants.CREATE_PROGRAM_ORDER,
        keys = {"#programOrderCreateDto.userId","#programOrderCreateDto.programId"})
public String createV4(ProgramOrderCreateDto programOrderCreateDto) {
    compositeContainer.execute(CompositeCheckType.PROGRAM_ORDER_CREATE_CHECK.getValue(),programOrderCreateDto);
    return localLockCreateOrder(PROGRAM_ORDER_CREATE_V4,programOrderCreateDto,
            () -> programOrderService.createNewAsync(programOrderCreateDto));
}
public String createNewAsync(ProgramOrderCreateDto programOrderCreateDto) {
    //通过reids+lua进行余票数量的判断,进行扣减,以及座位状态的锁定
    List<SeatVo> purchaseSeatList = createOrderOperateProgramCacheResolution(programOrderCreateDto);
    return doCreateV2(programOrderCreateDto,purchaseSeatList);
}

createOrderOperateProgramCacheResolution 中的流程是通过reids+lua进行余票数量的判断,进行扣减,以及座位状态的锁定,关于此流程的详细介绍在生成订单v3版本中详细的进行了介绍,这里就不再赘述,

构建订单发送给消息队列

构建订单

当成功扣减后,接下来就是进行购机订单信息,然后发送给kafka的操作了

private String doCreateV2(ProgramOrderCreateDto programOrderCreateDto,List<SeatVo> purchaseSeatList){
    //构建主订单和购票人订单信息
    OrderCreateDto orderCreateDto = buildCreateOrderParam(programOrderCreateDto, purchaseSeatList);
    //发送给kafka
    String orderNumber = createOrderByMq(orderCreateDto,purchaseSeatList);
    //延迟队列关闭订单发送
    DelayOrderCancelDto delayOrderCancelDto = new DelayOrderCancelDto();
    delayOrderCancelDto.setOrderNumber(orderCreateDto.getOrderNumber());
    delayOrderCancelSend.sendMessage(JSON.toJSONString(delayOrderCancelDto));
    
    return orderNumber;
}


buildCreateOrderParam 中的流程是构建主订单和购票人订单信息,关于此步骤的详细介绍在生成订单v1版本中详细的进行了介绍,这里就不再赘述

发送kafka

createOrderByMq 中的流程是发送kafka

private String createOrderByMq(OrderCreateDto orderCreateDto,List<SeatVo> purchaseSeatList){
    CreateOrderMqDomain createOrderMqDomain = new CreateOrderMqDomain();
    CountDownLatch latch = new CountDownLatch(1);
    //发送kafka
    createOrderSend.sendMessage(JSON.toJSONString(orderCreateDto),sendResult -> {
        //发送成功
        createOrderMqDomain.orderNumber = String.valueOf(orderCreateDto.getOrderNumber());
        assert sendResult != null;
        log.info("创建订单kafka发送消息成功 topic : {}",sendResult.getRecordMetadata().topic());
        latch.countDown();
    },ex -> {
        //发送失败
        log.error("创建订单kafka发送消息失败 error",ex);
        log.error("创建订单失败 需人工处理 orderCreateDto : {}",JSON.toJSONString(orderCreateDto));
        updateProgramCacheDataResolution(orderCreateDto.getProgramId(),purchaseSeatList,OrderStatus.CANCEL);
        createOrderMqDomain.daMaiFrameException = new DaMaiFrameException(ex);
        latch.countDown();
    });
    try {
        //使用CountDownLatch等待发送结果
        latch.await();
    } catch (InterruptedException e) {
        log.error("createOrderByMq InterruptedException",e);
        throw new DaMaiFrameException(e);
    }
    //如果发送失败,则直接抛出异常
    if (Objects.nonNull(createOrderMqDomain.daMaiFrameException)) {
        throw createOrderMqDomain.daMaiFrameException;
    }
    return createOrderMqDomain.orderNumber;
}

在调用发送kafka的api时,使用了带有回调方式,当发送成功或者发送失败后,可以进行回调,但有一点要注意,此回调是异步执行的!所以我们如果想要知道发送后到底是成功还是失败,需要进行等待


那么如果才能等待呢?这里我们使用了jdk中并发包下的工具 CountDownLatch,俗称门栓,当发送给kafka后,会直接到 latch.await(),由于没有门栓没有释放,所以这里会阻塞


当发送后进行了回调后,无论是成功还是失败,都会执行 latch.countDown(),此方法是进行门栓的释放,当释放后,上一步的阻塞就可以被唤醒,然后继续执行流程


执行到这里后,就可以借助 CreateOrderMqDomain对象,判断到底是发送成功了还是失败了,如果存在了异常,说明确实发送失败了,则直接抛出异常。如果没有异常说明发送成功了,则把订单编号返回


然后还是通过延迟队列发送订单关闭的消息,此流程在之前文章已经详细的讲解过,这里就不再赘述


消费消息队列的订单消息

当订单成功发送给kafka后,就会开始执行消息的消费流程

com.damai.service.kafka.CreateOrderConsumer

@Slf4j
@AllArgsConstructor
@Component
public class CreateOrderConsumer {
    
    @Autowired
    private OrderService orderService;
    
    public static Long MESSAGE_DELAY_TIME = 5000L;
    
    @KafkaListener(topics = {SPRING_INJECT_PREFIX_DISTINCTION_NAME+"-"+"${spring.kafka.topic:create_order}"})
    public void consumerOrderMessage(ConsumerRecord<String,String> consumerRecord){
        try {
            Optional.ofNullable(consumerRecord.value()).map(String::valueOf).ifPresent(value -> {
                
                OrderCreateDto orderCreateDto = JSON.parseObject(value, OrderCreateDto.class);
                
                long createOrderTimeTimestamp = orderCreateDto.getCreateOrderTime().getTime();
                
                long currentTimeTimestamp = System.currentTimeMillis();
                
                long delayTime = currentTimeTimestamp - createOrderTimeTimestamp;
                
                log.info("消费到kafka的创建订单消息 消息体: {} 延迟时间 : {} 毫秒",value,delayTime);
                //如果消费到消息时,延迟时间超过了5s,那么此订单丢弃,将库存回滚回去
                if (currentTimeTimestamp - createOrderTimeTimestamp > MESSAGE_DELAY_TIME) {
                    log.info("消费到kafka的创建订单消息延迟时间大于了 {} 毫秒 此订单消息被丢弃 订单号 : {}",
                            delayTime,orderCreateDto.getOrderNumber());
                    Map<Long, List<OrderTicketUserCreateDto>> orderTicketUserSeatList =
                            orderCreateDto.getOrderTicketUserCreateDtoList().stream().collect(Collectors.groupingBy(OrderTicketUserCreateDto::getTicketCategoryId));
                    Map<Long,List<Long>> seatMap = new HashMap<>(orderTicketUserSeatList.size());
                    orderTicketUserSeatList.forEach((k,v) -> {
                        seatMap.put(k,v.stream().map(OrderTicketUserCreateDto::getSeatId).collect(Collectors.toList()));
                    });
                    //数据恢复
                    orderService.updateProgramRelatedDataMq(orderCreateDto.getProgramId(),seatMap, OrderStatus.CANCEL);
                }else {
                    String orderNumber = orderService.createMq(orderCreateDto);
                    log.info("消费到kafka的创建订单消息 创建订单成功 订单号 : {}",orderNumber);
                }
            });
        }catch (Exception e) {
            log.error("处理消费到kafka的创建订单消息失败 error",e);
        }
    }
}

当消息到消息时,要判断延时时间是否大于了5s,如果大于了,此订单就要被丢弃,丢弃此订单也要将数据进行回滚,因为之前在节目服务中通过redis+lua进行了扣减余票数量,座位的状态锁定操作,所以要把这些数据要恢复回去,


【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。