五分钟带你玩转rocketMQ(八)提升消息稳定性——重试

举报
小鲍侃java 发表于 2021/09/10 00:42:00 2021/09/10
【摘要】 消费端消息重试实现   生产端消息重试 重试两次  消费端消息重试 重试16次 然后加入死信 消费端模拟重试代码 @Component public class MQConsumeMsgListenerProcessor implements MessageListenerConcurrently { ...

消费端消息重试实现

 

生产端消息重试

重试两次 

消费端消息重试

重试16次 然后加入死信

消费端模拟重试代码

@Component
public class MQConsumeMsgListenerProcessor implements MessageListenerConcurrently {
    private static final Logger logger = LoggerFactory.getLogger(MQConsumeMsgListenerProcessor.class);
    /**
     *  默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息<br/>
     *  不要抛异常,如果没有return CONSUME_SUCCESS ,consumer会重新消费该消息,直到return CONSUME_SUCCESS
     */
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        if(CollectionUtils.isEmpty(msgs)){
            logger.info("接受到的消息为空,不处理,直接返回成功");
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        MessageExt messageExt = msgs.get(0);
        logger.info("接受到的消息为:"+messageExt.toString());
        if(messageExt.getTopic().equals("NewMessage")){
            if(messageExt.getTags().equals("TagA")){
                //TODO 判断该消息是否重复消费(RocketMQ不保证消息不重复,如果你的业务需要保证严格的不重复消息,需要你自己在业务端去重)

                SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                System.out.printf(df.format(new Date()) + ", %s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                int reconsume = messageExt.getReconsumeTimes();
                System.out.println("重试的次数为"+reconsume);
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;

                //TODO 获取该消息重试次数
                //int reconsume = messageExt.getReconsumeTimes();
                //if(reconsume ==3){//消息已经重试了3次,如果不需要再次消费,则返回成功
                //    //记录日志
                //    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                //}
                //TODO 处理对应的业务逻辑


            }
        }
        // 如果没有return success ,consumer会重新消费该消息,直到return success
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }


}

用以上代码模拟消费端的重试 ,每一次重试的时间与延时队列时间等级一样(16次时间还是比较长的)

重试16次之后投入死信

然后通过业务进行判断 如流水号等

文章来源: baocl.blog.csdn.net,作者:小黄鸡1992,版权归原作者所有,如需转载,请联系作者。

原文链接:baocl.blog.csdn.net/article/details/104408288

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。