Rocket重试机制

举报
QGS 发表于 2024/03/16 22:39:25 2024/03/16
【摘要】 Rocket重试机制及死信的处理

环境介绍

技术栈

springboot+mybatis-plus+mysql+rocketmq

软件

版本

mysql

8

IDEA

IntelliJ IDEA 2022.2.1

JDK

17

Spring Boot

3.1.7

dynamic-datasource

3.6.1

mybatis-plus

3.5.3.2

rocketmq

4.9.4

加入依赖

<dependencies>
    <dependency>
        <groupId>
org.springframework.boot</groupId>
        <artifactId>
spring-boot-starter-web</artifactId>
        <exclusions>
<!-- 排除logback依赖 -->
           
<exclusion>
                <groupId>
org.springframework.boot</groupId>
                <artifactId>
spring-boot-starter-logging</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

   
<!--Log4j2场景启动器 -->
   
<dependency>
        <groupId>
org.springframework.boot</groupId>
        <artifactId>
spring-boot-starter-log4j2</artifactId>
    </dependency>

    <dependency>
        <groupId>
com.baomidou</groupId>
        <artifactId>
mybatis-plus-boot-starter</artifactId>
        <version>
3.5.3</version>
        <exclusions>
            <exclusion>
                <groupId>
com.baomidou</groupId>
                <artifactId>
mybatis-plus-generator</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    <dependency>
        <groupId>
com.mysql</groupId>
        <artifactId>
mysql-connector-j</artifactId>
        <scope>
runtime</scope>
    </dependency>
    <dependency>
        <groupId>
org.projectlombok</groupId>
        <artifactId>
lombok</artifactId>
        <optional>
true</optional>
    </dependency>
    <dependency>
        <groupId>
org.springframework.boot</groupId>
        <artifactId>
spring-boot-starter-test</artifactId>
        <scope>
test</scope>
    </dependency>

    <dependency>
        <groupId>
com.alibaba</groupId>
        <artifactId>
druid-spring-boot-starter</artifactId>
        <version>
1.1.14</version>
    </dependency>
    <dependency>
        <groupId>
com.baomidou</groupId>
        <artifactId>
dynamic-datasource-spring-boot-starter</artifactId>
        <version>
3.6.1</version>
    </dependency>
    <dependency>
        <groupId>
p6spy</groupId>
        <artifactId>
p6spy</artifactId>
        <version>
3.9.1</version>
    </dependency>

    <dependency>
        <groupId>
org.apache.rocketmq</groupId>
        <artifactId>
rocketmq-client</artifactId>
        <version>
4.9.2</version>
    </dependency>

</dependencies>


Broker:经纪人(经理人)

Topic主题:消息区分,分类,虚拟结构

Queue:消息队列

Apache RocketMQ 是一款低延迟、高并发、高可用、高可靠的分布式消息中间件。消息队列 RocketMQ 可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。



Rocket重试机制

RocketMQ的重试机制是指:当消费者消费消息失败时,RocketMQ会在一定时间后重新将消息发送给消费者进行消费,以确保消息的可靠消费。


自动重试:Consumer在消费失败后,会在一定重试策略下定期重试消费失败的消息,直到成功或达到最大重试次数。

消息重发:如果Consumer在最大重试次数内仍然消费失败,Broker会定期扫描被标记为消费失败的消息,并将其重发给其他Consumer。

灵活的重试策略:RocketMQ提供多种重试策略来控制重试时机和频率,主要有:


生产者重试

生产者设置消息失败后重试次数

//同步
producer.setRetryTimesWhenSendFailed(3);
//异步
producer.setRetryTimesWhenSendAsyncFailed(2);


Int 重试的次数


//重试
//同步
producer.setRetryTimesWhenSendFailed(3);
//异步
producer.setRetryTimesWhenSendAsyncFailed(2);


@Test
void retryProducerTest()throws Exception{
//创建生产者
DefaultMQProducer producer = new DefaultMQProducer("retryGroup");
//连接namesrv
producer.setNamesrvAddr("192.168.68.133:9876");
//启动
producer.start();
producer.setDefaultTopicQueueNums(1);
//自身业务key唯一
String Key = UUID.randomUUID().toString();
System.out.println(Key);
//重试
//同步
producer.setRetryTimesWhenSendFailed(3);
//异步
//producer.setRetryTimesWhenSendAsyncFailed(2);

//创建消息
Message message = new Message("retry", null,Key, "重试测试内存内容".getBytes());
//发送消息
producer.send(message);
System.out.println("发送成功");
//关闭生产者
producer.shutdown();
}




消费者重试

设置自定义最大重试

consumer.setMaxReconsumeTimes(6);


死信消息(超过重试次数,并未处理的消息),放在死信主题中,%DLQ% retry




@Test
void retryConsumerTest() throws Exception {
//创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retryConsumerTest");
//连接namesrv
consumer.setNamesrvAddr("192.168.68.133:9876");
//订阅主题 *表示该主题的所有消息
consumer.subscribe("retry","*");

//设置监听器(一直,异步回调方式) MessageListenerConcurrently并发模式
consumer.registerMessageListener(new MessageListenerConcurrently() {
//消费方法
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//业务处理
//获取key
for (MessageExt messageExt : msgs) {
System.out.println(new Date());
System.out.println("消息内容"+new String(messageExt.getBody()));
}
//CONSUME_SUCCESS成功 RECONSUME_LATER失败
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
//启动
consumer.start();
//挂起当前jvm
System.in.read();
//关闭 consumer.shutdown();
}



死信处理方案

死信处理方案1、单独订阅死信主题2、监听死信主题(业务流程控制)

通过存入单独数据库表,业务发送短信等方式通知人工处理


1、单独订阅死信主题

@Test
void retryDeadMonitorConsumerTest() throws Exception {
//创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retryDeadMonitorConsumerTest");
//连接namesrv
consumer.setNamesrvAddr("192.168.68.133:9876");
//订阅死信主题 *表示该主题的所有消息
consumer.subscribe("%DLQ%retryConsumerTest","*");

//设置监听器(一直,异步回调方式) MessageListenerConcurrently并发模式
consumer.registerMessageListener(new MessageListenerConcurrently() {
//消费方法
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//业务处理
//获取key
for (MessageExt messageExt : msgs) {
System.out.println(new Date());
System.out.println("将死信消息单独存入未处理消息表中"+new String(messageExt.getBody()));
}
//CONSUME_SUCCESS成功 RECONSUME_LATER失败
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动
consumer.start();
//挂起当前jvm
System.in.read();
//关闭 consumer.shutdown();
}


2、监听死信主题(业务流程控制)

//死信处理方案二、监听死信主题
@Test
void retryDeadMonitorConsumerTest() throws Exception {
//创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retryDeadMonitorConsumerTest");
//连接namesrv
consumer.setNamesrvAddr("192.168.68.133:9876");
//设置每次重试次数
consumer.setMaxReconsumeTimes(3);
// 订阅需要的多个主题列表
List<String> topics = Arrays.asList("retry", "TopicA", "TopicB");
// 订阅主题列表中的所有主题
for (String topic : topics) {
consumer.subscribe(topic, "*"); // 这里的tag是用来过滤消息的,"*" 表示接收这个主题下的所有消息
}

//设置监听器(一直,异步回调方式) MessageListenerConcurrently并发模式
consumer.registerMessageListener(new MessageListenerConcurrently() {
//消费方法
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//业务处理
//获取key
for (MessageExt messageExt : msgs) {
try{
//业务代码
System.out.println("业务代码");
System.out.println("消息内容"+new String(messageExt.getBody()));
int i =1/0;//模拟代码出错
}catch (Exception e){
//获取重试次数
int reconsumeTimes = messageExt.getReconsumeTimes();
String key = messageExt.getKeys();
if (reconsumeTimes > 2){
OrderLog orderLog = new OrderLog();
orderLog.setType(2);
orderLog.setOrderid(key);
orderLog.setUsername("重试超过2次,死信消息");
orderMapper.insert(orderLog);
System.out.println("将死信消息单独存入未处理消息表中"+new String(messageExt.getBody()));
//发送短信通知人工处理
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}else {
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
}
//CONSUME_SUCCESS成功 RECONSUME_LATER失败
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
//启动
consumer.start();
//挂起当前jvm
System.in.read();
//关闭 consumer.shutdown();
}




【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。