五分钟带你玩转rabbitmq(五)死信队列

举报
小鲍侃java 发表于 2021/09/11 00:37:53 2021/09/11
【摘要】 文件目录如下 业务背景: 如果有有错误消息 如果手动nack同时将消息放回到队列中 那么这条消息会反复消费 留在队列中  如果nack后将消息丢弃 那么如果碰到网络抖动 消息也会丢失 。 所以 建立死信队列避免消息丢失。 原理 : 当消息进入进入业务队列后 如果收到nack那么就将这条消息放入另一条队列中 。 ...

文件目录如下

业务背景:

如果有有错误消息 如果手动nack同时将消息放回到队列中 那么这条消息会反复消费 留在队列中 

如果nack后将消息丢弃 那么如果碰到网络抖动 消息也会丢失 。 所以 建立死信队列避免消息丢失。

原理 :

当消息进入进入业务队列后 如果收到nack那么就将这条消息放入另一条队列中 。

1.pom文件


  
  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>

2.配置文件


  
  1. server:
  2. port: 8088
  3. spring:
  4. rabbitmq:
  5. host: 192.168.*.*
  6. port: 5672
  7. username: root
  8. password: root
  9. virtual-host: /
  10. listener:
  11. simple:
  12. acknowledge-mode: manual #手动应答
  13. prefetch: 1 # 每次只处理一个信息
  14. publisher-confirms: true #开启消息确认机制
  15. publisher-returns: true #支持消息发送失败返回队列

3.rabbitmq的配置


  
  1. @Configuration
  2. public class RabbitMqConfig {
  3. /**
  4. * 连接工厂
  5. */
  6. @Autowired
  7. private ConnectionFactory connectionFactory;
  8. /**
  9. * 定制化amqp模版
  10. *
  11. * ConfirmCallback接口用于实现消息发送到RabbitMQ交换器后接收ack回调 即消息发送到exchange ack
  12. * ReturnCallback接口用于实现消息发送到RabbitMQ 交换器,但无相应队列与交换器绑定时的回调 即消息发送不到任何一个队列中 ack
  13. */
  14. @Bean
  15. public RabbitTemplate rabbitTemplate() {
  16. Logger logger = LoggerFactory.getLogger(RabbitTemplate.class);
  17. RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
  18. // 消息发送失败返回到队列中, yml需要配置 publisher-returns: true
  19. rabbitTemplate.setMandatory(true);
  20. // 发送消息确认, yml需要配置 publisher-confirms: true
  21. rabbitTemplate.setConfirmCallback(msgSendConfirmCallBack());
  22. // 消息返回, yml需要配置 publisher-returns: true
  23. rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
  24. String correlationId = message.getMessageProperties().getCorrelationId().toString();
  25. logger.debug("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {} 路由键: {}", correlationId, replyCode, replyText, exchange,
  26. routingKey);
  27. });
  28. return rabbitTemplate;
  29. }
  30. /**
  31. * 确认发送消息是否成功(调用util方法)
  32. *
  33. * @return
  34. */
  35. @Bean
  36. public MsgSendConfirmCallBack msgSendConfirmCallBack() {
  37. return new MsgSendConfirmCallBack();
  38. }
  39. }

util发送回调方法


  
  1. public class MsgSendConfirmCallBack implements RabbitTemplate.ConfirmCallback {
  2. /**
  3. * 回调方法
  4. * @param correlationData
  5. * @param ack
  6. * @param cause
  7. */
  8. @Override
  9. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  10. System.out.println("MsgSendConfirmCallBack , 回调id:" + correlationData);
  11. if (ack) {
  12. System.out.println("消息发送成功");
  13. } else {
  14. //可以将消息写入本地,使用定时任务重新发送
  15. System.out.println("消息发送失败:" + cause + "\n重新发送");
  16. }
  17. }
  18. }

这里有一个点 如果想做实现消息失败重新发送 在注释处可以实现

需要将消息写入本地 如果失败从本地读取 然后发送 如果成功删除本地信息

4.业务队列(如:订单业务)

这里声明了一个业务队列 

关键点在于x-dead-letter-exchange,x-dead-letter-routing-key 两个参数


  
  1. @Configuration
  2. public class BusinessConfig {
  3. /**
  4. * 业务1模块direct交换机的名字
  5. */
  6. public static final String YEWU1_EXCHANGE = "yewu1_direct_exchange";
  7. /**
  8. * 业务1 demo业务的队列名称
  9. */
  10. public static final String YEWU1_DEMO_QUEUE = "yewu1_demo_queue";
  11. /**
  12. * 业务1 demo业务的routekey
  13. */
  14. public static final String YEWU1_DEMO_ROUTINGKEY = "yewu1_demo_key";
  15. @Bean
  16. public Queue yewu1DemoDeadQueue() {
  17. // 将普通队列绑定到死信队列交换机上
  18. Map<String, Object> args = new HashMap<>(2);
  19. args.put(RetryConfig.RETRY_LETTER_QUEUE_KEY, DeadConfig.FAIL_EXCHANGE_NAME);
  20. args.put(RetryConfig.RETRY_LETTER_ROUTING_KEY, DeadConfig.FAIL_ROUTING_KEY);
  21. return new Queue("yewu1_demo_dead_queue", true, false, false, args);
  22. }
  23. /**
  24. * 将消息队列和交换机进行绑定
  25. */
  26. @Bean
  27. public Binding binding_one() {
  28. return BindingBuilder.bind(yewu1DemoDeadQueue()).to(yewu1Exchange())
  29. .with("yewu1_demo_dead_key");
  30. }
  31. }

这里有一个点如果想持久化消息到磁盘 需要新建队列时 new Queue将第二个参数输入为true 但是面对大并发时效率会变低 

5.死信队列


  
  1. @Configuration
  2. public class DeadConfig {
  3. /**
  4. * 死信队列
  5. */
  6. public final static String FAIL_QUEUE_NAME = "fail_queue";
  7. /**
  8. * 死信交换机
  9. */
  10. public final static String FAIL_EXCHANGE_NAME = "fail_exchange";
  11. /**
  12. * 死信routing
  13. */
  14. public final static String FAIL_ROUTING_KEY = "fail_routing";
  15. /**
  16. * 创建配置死信队列
  17. *
  18. */
  19. @Bean
  20. public Queue deadQueue() {
  21. return new Queue(FAIL_QUEUE_NAME, true, false, false);
  22. }
  23. /**
  24. * 死信交换机
  25. *
  26. * @return
  27. */
  28. @Bean
  29. public DirectExchange deadExchange() {
  30. DirectExchange directExchange = new DirectExchange(FAIL_EXCHANGE_NAME, true, false);
  31. return directExchange;
  32. }
  33. /**
  34. * 绑定关系
  35. *
  36. * @return
  37. */
  38. @Bean
  39. public Binding failBinding() {
  40. return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(FAIL_ROUTING_KEY);
  41. }
  42. }

6.生产者消费者


  
  1. public enum RabbitEnum {
  2. /**
  3. * 处理成功
  4. */
  5. ACCEPT,
  6. /**
  7. * 可以重试的错误
  8. */
  9. RETRY,
  10. /**
  11. * 无需重试的错误
  12. */
  13. REJECT

  
  1. @RequestMapping("/sendDirectDead")
  2. String sendDirectDead(@RequestBody String message) throws Exception {
  3. System.out.println("开始生产");
  4. CorrelationData data = new CorrelationData(UUID.randomUUID().toString());
  5. rabbitTemplate.convertAndSend(BusinessConfig.YEWU1_EXCHANGE, "yewu1_demo_dead_key",
  6. message, data);
  7. System.out.println("结束生产");
  8. System.out.println("发送id:" + data);
  9. return "OK,sendDirect:" + message;
  10. }

  
  1. @RabbitListener(queues = "yewu1_demo_dead_queue")
  2. protected void consumerDead(Message message, Channel channel) throws Exception {
  3. RabbitEnum ackSign = RabbitEnum.RETRY;
  4. try {
  5. int i = 10 / 0;
  6. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  7. } catch (Exception e) {
  8. ackSign = RabbitEnum.RETRY;
  9. throw e;
  10. } finally {
  11. // 通过finally块来保证Ack/Nack会且只会执行一次
  12. if (ackSign == RabbitEnum.ACCEPT) {
  13. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  14. } else if (ackSign == RabbitEnum.RETRY) {
  15. channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
  16. }
  17. }
  18. }

7.实验

当发送yewu1_demo_dead_queue队列时 如果抛出异常 会放入死信队列中。

文章来源: baocl.blog.csdn.net,作者:小黄鸡1992,版权归原作者所有,如需转载,请联系作者。

原文链接:baocl.blog.csdn.net/article/details/106156621

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。