五分钟带你玩转rabbitmq(五)死信队列
【摘要】
文件目录如下
业务背景:
如果有有错误消息 如果手动nack同时将消息放回到队列中 那么这条消息会反复消费 留在队列中
如果nack后将消息丢弃 那么如果碰到网络抖动 消息也会丢失 。 所以 建立死信队列避免消息丢失。
原理 :
当消息进入进入业务队列后 如果收到nack那么就将这条消息放入另一条队列中 。
...
文件目录如下
业务背景:
如果有有错误消息 如果手动nack同时将消息放回到队列中 那么这条消息会反复消费 留在队列中
如果nack后将消息丢弃 那么如果碰到网络抖动 消息也会丢失 。 所以 建立死信队列避免消息丢失。
原理 :
当消息进入进入业务队列后 如果收到nack那么就将这条消息放入另一条队列中 。
1.pom文件
-
<dependency>
-
<groupId>org.springframework.boot</groupId>
-
<artifactId>spring-boot-starter-amqp</artifactId>
-
</dependency>
2.配置文件
-
server:
-
port: 8088
-
spring:
-
rabbitmq:
-
host: 192.168.*.*
-
port: 5672
-
username: root
-
password: root
-
virtual-host: /
-
listener:
-
simple:
-
acknowledge-mode: manual #手动应答
-
prefetch: 1 # 每次只处理一个信息
-
publisher-confirms: true #开启消息确认机制
-
publisher-returns: true #支持消息发送失败返回队列
3.rabbitmq的配置
-
@Configuration
-
public class RabbitMqConfig {
-
-
/**
-
* 连接工厂
-
*/
-
@Autowired
-
private ConnectionFactory connectionFactory;
-
-
/**
-
* 定制化amqp模版
-
*
-
* ConfirmCallback接口用于实现消息发送到RabbitMQ交换器后接收ack回调 即消息发送到exchange ack
-
* ReturnCallback接口用于实现消息发送到RabbitMQ 交换器,但无相应队列与交换器绑定时的回调 即消息发送不到任何一个队列中 ack
-
*/
-
@Bean
-
public RabbitTemplate rabbitTemplate() {
-
Logger logger = LoggerFactory.getLogger(RabbitTemplate.class);
-
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
-
// 消息发送失败返回到队列中, yml需要配置 publisher-returns: true
-
rabbitTemplate.setMandatory(true);
-
// 发送消息确认, yml需要配置 publisher-confirms: true
-
rabbitTemplate.setConfirmCallback(msgSendConfirmCallBack());
-
// 消息返回, yml需要配置 publisher-returns: true
-
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
-
String correlationId = message.getMessageProperties().getCorrelationId().toString();
-
logger.debug("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {} 路由键: {}", correlationId, replyCode, replyText, exchange,
-
routingKey);
-
});
-
return rabbitTemplate;
-
}
-
-
/**
-
* 确认发送消息是否成功(调用util方法)
-
*
-
* @return
-
*/
-
@Bean
-
public MsgSendConfirmCallBack msgSendConfirmCallBack() {
-
return new MsgSendConfirmCallBack();
-
}
-
}
util发送回调方法
-
public class MsgSendConfirmCallBack implements RabbitTemplate.ConfirmCallback {
-
-
/**
-
* 回调方法
-
* @param correlationData
-
* @param ack
-
* @param cause
-
*/
-
@Override
-
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
-
System.out.println("MsgSendConfirmCallBack , 回调id:" + correlationData);
-
if (ack) {
-
System.out.println("消息发送成功");
-
} else {
-
//可以将消息写入本地,使用定时任务重新发送
-
System.out.println("消息发送失败:" + cause + "\n重新发送");
-
}
-
}
-
-
}
这里有一个点 如果想做实现消息失败重新发送 在注释处可以实现
需要将消息写入本地 如果失败从本地读取 然后发送 如果成功删除本地信息
4.业务队列(如:订单业务)
这里声明了一个业务队列
关键点在于x-dead-letter-exchange,x-dead-letter-routing-key 两个参数
-
@Configuration
-
public class BusinessConfig {
-
-
/**
-
* 业务1模块direct交换机的名字
-
*/
-
public static final String YEWU1_EXCHANGE = "yewu1_direct_exchange";
-
-
/**
-
* 业务1 demo业务的队列名称
-
*/
-
public static final String YEWU1_DEMO_QUEUE = "yewu1_demo_queue";
-
-
/**
-
* 业务1 demo业务的routekey
-
*/
-
public static final String YEWU1_DEMO_ROUTINGKEY = "yewu1_demo_key";
-
-
-
@Bean
-
public Queue yewu1DemoDeadQueue() {
-
// 将普通队列绑定到死信队列交换机上
-
Map<String, Object> args = new HashMap<>(2);
-
args.put(RetryConfig.RETRY_LETTER_QUEUE_KEY, DeadConfig.FAIL_EXCHANGE_NAME);
-
args.put(RetryConfig.RETRY_LETTER_ROUTING_KEY, DeadConfig.FAIL_ROUTING_KEY);
-
return new Queue("yewu1_demo_dead_queue", true, false, false, args);
-
}
-
-
/**
-
* 将消息队列和交换机进行绑定
-
*/
-
@Bean
-
public Binding binding_one() {
-
return BindingBuilder.bind(yewu1DemoDeadQueue()).to(yewu1Exchange())
-
.with("yewu1_demo_dead_key");
-
}
-
}
这里有一个点如果想持久化消息到磁盘 需要新建队列时 new Queue将第二个参数输入为true 但是面对大并发时效率会变低
5.死信队列
-
@Configuration
-
public class DeadConfig {
-
-
/**
-
* 死信队列
-
*/
-
public final static String FAIL_QUEUE_NAME = "fail_queue";
-
-
/**
-
* 死信交换机
-
*/
-
public final static String FAIL_EXCHANGE_NAME = "fail_exchange";
-
-
/**
-
* 死信routing
-
*/
-
public final static String FAIL_ROUTING_KEY = "fail_routing";
-
-
/**
-
* 创建配置死信队列
-
*
-
*/
-
@Bean
-
public Queue deadQueue() {
-
return new Queue(FAIL_QUEUE_NAME, true, false, false);
-
}
-
-
/**
-
* 死信交换机
-
*
-
* @return
-
*/
-
@Bean
-
public DirectExchange deadExchange() {
-
DirectExchange directExchange = new DirectExchange(FAIL_EXCHANGE_NAME, true, false);
-
return directExchange;
-
}
-
-
/**
-
* 绑定关系
-
*
-
* @return
-
*/
-
@Bean
-
public Binding failBinding() {
-
return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(FAIL_ROUTING_KEY);
-
}
-
-
}
6.生产者消费者
-
public enum RabbitEnum {
-
-
/**
-
* 处理成功
-
*/
-
ACCEPT,
-
-
/**
-
* 可以重试的错误
-
*/
-
RETRY,
-
-
/**
-
* 无需重试的错误
-
*/
-
REJECT
-
@RequestMapping("/sendDirectDead")
-
String sendDirectDead(@RequestBody String message) throws Exception {
-
System.out.println("开始生产");
-
CorrelationData data = new CorrelationData(UUID.randomUUID().toString());
-
rabbitTemplate.convertAndSend(BusinessConfig.YEWU1_EXCHANGE, "yewu1_demo_dead_key",
-
message, data);
-
System.out.println("结束生产");
-
System.out.println("发送id:" + data);
-
return "OK,sendDirect:" + message;
-
}
-
@RabbitListener(queues = "yewu1_demo_dead_queue")
-
protected void consumerDead(Message message, Channel channel) throws Exception {
-
RabbitEnum ackSign = RabbitEnum.RETRY;
-
try {
-
int i = 10 / 0;
-
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
-
} catch (Exception e) {
-
ackSign = RabbitEnum.RETRY;
-
throw e;
-
} finally {
-
// 通过finally块来保证Ack/Nack会且只会执行一次
-
if (ackSign == RabbitEnum.ACCEPT) {
-
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
-
} else if (ackSign == RabbitEnum.RETRY) {
-
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
-
}
-
}
-
}
7.实验
当发送yewu1_demo_dead_queue队列时 如果抛出异常 会放入死信队列中。
文章来源: baocl.blog.csdn.net,作者:小黄鸡1992,版权归原作者所有,如需转载,请联系作者。
原文链接:baocl.blog.csdn.net/article/details/106156621
【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)