Rocket重试机制
环境介绍 |
|
技术栈 |
springboot+mybatis-plus+mysql+rocketmq |
软件 |
版本 |
mysql |
8 |
IDEA |
IntelliJ IDEA 2022.2.1 |
JDK |
17 |
Spring Boot |
3.1.7 |
dynamic-datasource |
3.6.1 |
mybatis-plus |
3.5.3.2 |
rocketmq |
4.9.4 |
加入依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<exclusions><!-- 排除logback依赖 -->
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<!--Log4j2场景启动器 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.5.3</version>
<exclusions>
<exclusion>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-generator</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.1.14</version>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>dynamic-datasource-spring-boot-starter</artifactId>
<version>3.6.1</version>
</dependency>
<dependency>
<groupId>p6spy</groupId>
<artifactId>p6spy</artifactId>
<version>3.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.2</version>
</dependency>
</dependencies>
Broker:经纪人(经理人)
Topic主题:消息区分,分类,虚拟结构
Queue:消息队列
Apache RocketMQ 是一款低延迟、高并发、高可用、高可靠的分布式消息中间件。消息队列 RocketMQ 可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。
Rocket重试机制
RocketMQ的重试机制是指:当消费者消费消息失败时,RocketMQ会在一定时间后重新将消息发送给消费者进行消费,以确保消息的可靠消费。
自动重试:Consumer在消费失败后,会在一定重试策略下定期重试消费失败的消息,直到成功或达到最大重试次数。
消息重发:如果Consumer在最大重试次数内仍然消费失败,Broker会定期扫描被标记为消费失败的消息,并将其重发给其他Consumer。
灵活的重试策略:RocketMQ提供多种重试策略来控制重试时机和频率,主要有:
生产者重试
生产者设置消息失败后重试次数
//同步
producer.setRetryTimesWhenSendFailed(3);
//异步
producer.setRetryTimesWhenSendAsyncFailed(2);
Int 重试的次数
//重试
//同步
producer.setRetryTimesWhenSendFailed(3);
//异步
producer.setRetryTimesWhenSendAsyncFailed(2);
@Test
void retryProducerTest()throws Exception{
//创建生产者
DefaultMQProducer producer = new DefaultMQProducer("retryGroup");
//连接namesrv
producer.setNamesrvAddr("192.168.68.133:9876");
//启动
producer.start();
producer.setDefaultTopicQueueNums(1);
//自身业务key唯一
String Key = UUID.randomUUID().toString();
System.out.println(Key);
//重试
//同步
producer.setRetryTimesWhenSendFailed(3);
//异步
//producer.setRetryTimesWhenSendAsyncFailed(2);
//创建消息
Message message = new Message("retry", null,Key, "重试测试内存内容".getBytes());
//发送消息
producer.send(message);
System.out.println("发送成功");
//关闭生产者
producer.shutdown();
}
消费者重试
设置自定义最大重试
consumer.setMaxReconsumeTimes(6);
死信消息(超过重试次数,并未处理的消息),放在死信主题中,%DLQ% retry
@Test
void retryConsumerTest() throws Exception {
//创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retryConsumerTest");
//连接namesrv
consumer.setNamesrvAddr("192.168.68.133:9876");
//订阅主题 *表示该主题的所有消息
consumer.subscribe("retry","*");
//设置监听器(一直,异步回调方式) MessageListenerConcurrently并发模式
consumer.registerMessageListener(new MessageListenerConcurrently() {
//消费方法
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//业务处理
//获取key
for (MessageExt messageExt : msgs) {
System.out.println(new Date());
System.out.println("消息内容"+new String(messageExt.getBody()));
}
//CONSUME_SUCCESS成功 RECONSUME_LATER失败
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
//启动
consumer.start();
//挂起当前jvm
System.in.read();
//关闭 consumer.shutdown();
}
死信处理方案
死信处理方案1、单独订阅死信主题2、监听死信主题(业务流程控制)
通过存入单独数据库表,业务发送短信等方式通知人工处理
1、单独订阅死信主题
@Test
void retryDeadMonitorConsumerTest() throws Exception {
//创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retryDeadMonitorConsumerTest");
//连接namesrv
consumer.setNamesrvAddr("192.168.68.133:9876");
//订阅死信主题 *表示该主题的所有消息
consumer.subscribe("%DLQ%retryConsumerTest","*");
//设置监听器(一直,异步回调方式) MessageListenerConcurrently并发模式
consumer.registerMessageListener(new MessageListenerConcurrently() {
//消费方法
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//业务处理
//获取key
for (MessageExt messageExt : msgs) {
System.out.println(new Date());
System.out.println("将死信消息单独存入未处理消息表中"+new String(messageExt.getBody()));
}
//CONSUME_SUCCESS成功 RECONSUME_LATER失败
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动
consumer.start();
//挂起当前jvm
System.in.read();
//关闭 consumer.shutdown();
}
2、监听死信主题(业务流程控制)
//死信处理方案二、监听死信主题
@Test
void retryDeadMonitorConsumerTest() throws Exception {
//创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retryDeadMonitorConsumerTest");
//连接namesrv
consumer.setNamesrvAddr("192.168.68.133:9876");
//设置每次重试次数
consumer.setMaxReconsumeTimes(3);
// 订阅需要的多个主题列表
List<String> topics = Arrays.asList("retry", "TopicA", "TopicB");
// 订阅主题列表中的所有主题
for (String topic : topics) {
consumer.subscribe(topic, "*"); // 这里的tag是用来过滤消息的,"*" 表示接收这个主题下的所有消息
}
//设置监听器(一直,异步回调方式) MessageListenerConcurrently并发模式
consumer.registerMessageListener(new MessageListenerConcurrently() {
//消费方法
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//业务处理
//获取key
for (MessageExt messageExt : msgs) {
try{
//业务代码
System.out.println("业务代码");
System.out.println("消息内容"+new String(messageExt.getBody()));
int i =1/0;//模拟代码出错
}catch (Exception e){
//获取重试次数
int reconsumeTimes = messageExt.getReconsumeTimes();
String key = messageExt.getKeys();
if (reconsumeTimes > 2){
OrderLog orderLog = new OrderLog();
orderLog.setType(2);
orderLog.setOrderid(key);
orderLog.setUsername("重试超过2次,死信消息");
orderMapper.insert(orderLog);
System.out.println("将死信消息单独存入未处理消息表中"+new String(messageExt.getBody()));
//发送短信通知人工处理
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}else {
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
}
//CONSUME_SUCCESS成功 RECONSUME_LATER失败
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
//启动
consumer.start();
//挂起当前jvm
System.in.read();
//关闭 consumer.shutdown();
}
- 点赞
- 收藏
- 关注作者
评论(0)