五分钟带你玩转rabbitmq(五)死信队列

举报
小鲍侃java 发表于 2021/09/11 00:37:53 2021/09/11
3.1k+ 0 0
【摘要】 文件目录如下 业务背景: 如果有有错误消息 如果手动nack同时将消息放回到队列中 那么这条消息会反复消费 留在队列中  如果nack后将消息丢弃 那么如果碰到网络抖动 消息也会丢失 。 所以 建立死信队列避免消息丢失。 原理 : 当消息进入进入业务队列后 如果收到nack那么就将这条消息放入另一条队列中 。 ...

文件目录如下

业务背景:

如果有有错误消息 如果手动nack同时将消息放回到队列中 那么这条消息会反复消费 留在队列中 

如果nack后将消息丢弃 那么如果碰到网络抖动 消息也会丢失 。 所以 建立死信队列避免消息丢失。

原理 :

当消息进入进入业务队列后 如果收到nack那么就将这条消息放入另一条队列中 。

1.pom文件


             <dependency>
                 <groupId>org.springframework.boot</groupId>
                 <artifactId>spring-boot-starter-amqp</artifactId>
             </dependency>
  
 

2.配置文件


      server:
       port: 8088
      spring:
       rabbitmq:
         host: 192.168.*.*
         port: 5672
         username: root
         password: root
         virtual-host: /
         listener:
           simple:
             acknowledge-mode: manual  #手动应答
             prefetch: 1 # 每次只处理一个信息
         publisher-confirms: true #开启消息确认机制
         publisher-returns: true #支持消息发送失败返回队列
  
 

3.rabbitmq的配置


      @Configuration
      public class RabbitMqConfig {
         /**
       * 连接工厂
       */
         @Autowired
         private ConnectionFactory connectionFactory;
         /**
       * 定制化amqp模版
       *
       * ConfirmCallback接口用于实现消息发送到RabbitMQ交换器后接收ack回调 即消息发送到exchange ack
       * ReturnCallback接口用于实现消息发送到RabbitMQ 交换器,但无相应队列与交换器绑定时的回调 即消息发送不到任何一个队列中 ack
       */
         @Bean
         public RabbitTemplate rabbitTemplate() {
              Logger logger = LoggerFactory.getLogger(RabbitTemplate.class);
              RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
              // 消息发送失败返回到队列中, yml需要配置 publisher-returns: true
              rabbitTemplate.setMandatory(true);
             // 发送消息确认, yml需要配置 publisher-confirms: true
              rabbitTemplate.setConfirmCallback(msgSendConfirmCallBack());
             // 消息返回, yml需要配置 publisher-returns: true
              rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
                  String correlationId = message.getMessageProperties().getCorrelationId().toString();
                  logger.debug("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {} 路由键: {}", correlationId, replyCode, replyText, exchange,
                      routingKey);
              });
             return rabbitTemplate;
          }
         /**
       * 确认发送消息是否成功(调用util方法)
       *
       * @return
       */
         @Bean
         public MsgSendConfirmCallBack msgSendConfirmCallBack() {
             return new MsgSendConfirmCallBack();
          }
      }
  
 

util发送回调方法


      public class MsgSendConfirmCallBack implements RabbitTemplate.ConfirmCallback {
         /**
       * 回调方法
       * @param correlationData
       * @param ack
       * @param cause
       */
          @Override
         public void confirm(CorrelationData correlationData, boolean ack, String cause) {
              System.out.println("MsgSendConfirmCallBack , 回调id:" + correlationData);
             if (ack) {
                  System.out.println("消息发送成功");
              } else {
                 //可以将消息写入本地,使用定时任务重新发送
                  System.out.println("消息发送失败:" + cause + "\n重新发送");
              }
          }
      }
  
 

这里有一个点 如果想做实现消息失败重新发送 在注释处可以实现

需要将消息写入本地 如果失败从本地读取 然后发送 如果成功删除本地信息

4.业务队列(如:订单业务)

这里声明了一个业务队列 

关键点在于x-dead-letter-exchange,x-dead-letter-routing-key 两个参数


      @Configuration
      public class BusinessConfig {
         /**
       * 业务1模块direct交换机的名字
       */
         public static final String YEWU1_EXCHANGE = "yewu1_direct_exchange";
         /**
       * 业务1 demo业务的队列名称
       */
         public static final String YEWU1_DEMO_QUEUE = "yewu1_demo_queue";
         /**
       * 业务1 demo业务的routekey
       */
         public static final String YEWU1_DEMO_ROUTINGKEY = "yewu1_demo_key";
          @Bean
         public Queue yewu1DemoDeadQueue() {
             // 将普通队列绑定到死信队列交换机上
              Map<String, Object> args = new HashMap<>(2);
              args.put(RetryConfig.RETRY_LETTER_QUEUE_KEY, DeadConfig.FAIL_EXCHANGE_NAME);
              args.put(RetryConfig.RETRY_LETTER_ROUTING_KEY, DeadConfig.FAIL_ROUTING_KEY);
             return new Queue("yewu1_demo_dead_queue", true, false, false, args);
          }
         /**
       * 将消息队列和交换机进行绑定
       */
          @Bean
         public Binding binding_one() {
             return BindingBuilder.bind(yewu1DemoDeadQueue()).to(yewu1Exchange())
                  .with("yewu1_demo_dead_key");
          }
      }
  
 

这里有一个点如果想持久化消息到磁盘 需要新建队列时 new Queue将第二个参数输入为true 但是面对大并发时效率会变低 

5.死信队列


      @Configuration
      public class DeadConfig {
         /**
       * 死信队列
       */
         public final static String FAIL_QUEUE_NAME = "fail_queue";
         /**
       * 死信交换机
       */
         public final static String FAIL_EXCHANGE_NAME = "fail_exchange";
         /**
       * 死信routing
       */
         public final static String FAIL_ROUTING_KEY = "fail_routing";
         /**
       * 创建配置死信队列
       *
       */
          @Bean
         public Queue deadQueue() {
             return new Queue(FAIL_QUEUE_NAME, true, false, false);
          }
         /**
       * 死信交换机
       *
       * @return
       */
          @Bean
         public DirectExchange deadExchange() {
              DirectExchange directExchange = new DirectExchange(FAIL_EXCHANGE_NAME, true, false);
             return directExchange;
          }
         /**
       * 绑定关系
       *
       * @return
       */
          @Bean
         public Binding failBinding() {
             return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(FAIL_ROUTING_KEY);
          }
      }
  
 

6.生产者消费者


      public enum RabbitEnum {
         /**
       * 处理成功
       */
          ACCEPT,
         /**
       * 可以重试的错误
       */
          RETRY,
         /**
       * 无需重试的错误
       */
          REJECT
  
 

      @RequestMapping("/sendDirectDead")
              String sendDirectDead(@RequestBody String message) throws Exception {
              System.out.println("开始生产");
              CorrelationData data = new CorrelationData(UUID.randomUUID().toString());
              rabbitTemplate.convertAndSend(BusinessConfig.YEWU1_EXCHANGE, "yewu1_demo_dead_key",
                      message, data);
              System.out.println("结束生产");
              System.out.println("发送id:" + data);
             return "OK,sendDirect:" + message;
          }
  
 

         @RabbitListener(queues = "yewu1_demo_dead_queue")
         protected void consumerDead(Message message, Channel channel) throws Exception {
              RabbitEnum ackSign = RabbitEnum.RETRY;
             try {
                 int i = 10 / 0;
                  channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
              } catch (Exception e) {
                  ackSign = RabbitEnum.RETRY;
                 throw e;
              } finally {
                 // 通过finally块来保证Ack/Nack会且只会执行一次
                 if (ackSign == RabbitEnum.ACCEPT) {
                      channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                  } else if (ackSign == RabbitEnum.RETRY) {
                      channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
                  }
              }
          }
  
 

7.实验

当发送yewu1_demo_dead_queue队列时 如果抛出异常 会放入死信队列中。

文章来源: baocl.blog.csdn.net,作者:小黄鸡1992,版权归原作者所有,如需转载,请联系作者。

原文链接:baocl.blog.csdn.net/article/details/106156621

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

作者其他文章

评论(0

抱歉,系统识别当前为高风险访问,暂不支持该操作

    全部回复

    上滑加载中

    设置昵称

    在此一键设置昵称,即可参与社区互动!

    *长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

    *长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。