RabbitMQ之死信队列

举报
别团等shy哥发育 发表于 2023/01/10 21:55:46 2023/01/10
【摘要】 @toc 1、死信队列的概念  先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。...

@toc

1、死信队列的概念

  先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。
  应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中.还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效

2、死信的来源

  • 消息 TTL 过期
  • 队列达到最大长度(队列满了,无法再添加数据到 mq 中)
  • 消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false.

3、死信实战

3.1 代码架构图

在这里插入图片描述
一个生产者,两个消费者,当消息满足死信条件的时候被送入dead-queue这个死信队列。
zhangsan是normal_exchange普通交换机和normal-queue普通队列绑定的routingKey
lisi是dead_exhange死信交换机和dead-queue死信队列绑定的routingKey

3.2 模拟消息TTL过期

生产者代码

public class Producer {
    //普通交换机的名称
    public static final String NORMAL_EXCHANGE="normal_exchange";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        //死信消息  设置TTL时间  time to live  单位是ms  10000ms=10s
        AMQP.BasicProperties properties=
                new AMQP.BasicProperties()
                .builder().expiration("10000")
                .build();

        for (int i = 1; i <11; i++) {
            String message = "info" + i;    //info1.....info10
           
         channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes());
        }

消费者01:

public class Consumer01 {
    //普通交换机的名称
    public static final String NORMAL_EXCHANGE="normal_exchange";
    //死信交换机的名称
    public static final String DEAD_EXCHANGE="dead_exchange";
    //普通队列的名称
    public static final String NORMAL_QUEUE="normal_queue";
    //死信队列的名称
    public static final String DEAD_QUEUE="dead_queue";

    public static void main(String[] args) throws Exception {
        //获取信道
        Channel channel = RabbitMqUtils.getChannel();
        //声明死信和普通交换机  类型为direct
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

        //声明普通队列
        Map<String,Object> arguments=new HashMap<>();
        //过期时间   10s=10000ms
//        arguments.put("x-message-ttl",10000);
        //正常队列设置死信交换机
        arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        //设置死信RoutingKey
        arguments.put("x-dead-letter-routing-key","lisi");
        //设置正常队列的长度的限制
//        arguments.put("x-max-length",6);

        channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);
        ////////////////////////////////////////////////////////////////////////////
        //声明死信队列
        channel.queueDeclare(DEAD_QUEUE,false,false,false,null);

        //绑定普通的交换机与普通的队列
        channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
        //绑定死信的交换机与死信的队列
        channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
        System.out.println("等待接收消息......");

        DeliverCallback deliverCallback=(consumerTag, message) -> {
           String msg= new String(message.getBody(),"UTF-8");
               System.out.println("Consumer01接收的消息是:"+msg);
               channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
        };
        //开启手动应答
        channel.basicConsume(NORMAL_QUEUE,false,deliverCallback,consumerTag -> {});
    }
}

消费者02:这个消费者代码最简单了,只负责接收死信队列中的消息即可。

public class Consumer02 {
    //死信队列的名称
    public static final String DEAD_QUEUE="dead_queue";

    public static void main(String[] args) throws Exception {
        //获取信道
        Channel channel = RabbitMqUtils.getChannel();

        System.out.println("等待接收消息......");

        DeliverCallback deliverCallback=(consumerTag, message) -> {
            System.out.println("Consumer02接收的消息是:"+new String(message.getBody(),"UTF-8"));
        };

        channel.basicConsume(DEAD_QUEUE,true,deliverCallback,consumerTag -> {});
        
    }
}

先启动消费者C1

  先建立交换机与队列的绑定关系(这些代码我们写在了消费者c1里面),然后将消费者C1关闭,这时如果启动生产者发送消息的话,由于消费者C1线程已经关闭,所以消息会全部进入死信队列中。
  我们这时启动生产者线程发送10条消息,由于消费者C1已经关闭,我们在配置中指定的是若消息被拒绝则会进入死信队列dead-queue,所以我们在启动生产者之后,消息会全部进入死信队列,我们可以通过rabbitmq的控制台查看

通过上面的图片可以看出,normal_queue队列中的10条消息已经全部进入了死信队列dead_queue

  由架构图可知,死信队列中的消息会被消费者C2消费,那我们现在启动消费者C2线程,按理说启动之后,死信队列中的10条消息都会被C2消费掉。
C2的代码:

/**
 * 死信队列 实战
 *
 * 消费者2
 */
public class Consumer02 {
    //死信队列的名称
    public static final String DEAD_QUEUE="dead_queue";

    public static void main(String[] args) throws Exception {
        //获取信道
        Channel channel = RabbitMqUtils.getChannel();

        System.out.println("等待接收消息......");

        DeliverCallback deliverCallback=(consumerTag, message) -> {
            System.out.println("Consumer02接收的消息是:"+new String(message.getBody(),"UTF-8"));
        };

        channel.basicConsume(DEAD_QUEUE,true,deliverCallback,consumerTag -> {});

    }
}

通过下图可以看到,死信队列中的消息成功被C2消费掉,测试成功。

3.3 模拟队列达到最大长度

  消费者C2代码不变,我们现在设置普通队列normal_queue最多只能接收6条消息,生产者发送10条消息,那么最后的四条消息会被送入死信队列(这里先别让消费者C1消费,要不然消费的太快,看不到效果)。
生产者代码修改:(注释掉TTL)

public class Producer {
    //普通交换机的名称
    public static final String NORMAL_EXCHANGE="normal_exchange";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        //死信消息  设置TTL时间  time to live  单位是ms  10000ms=10s
//        AMQP.BasicProperties properties=
//                new AMQP.BasicProperties()
//                .builder().expiration("10000")
//                .build();

        for (int i = 1; i <11; i++) {
            String message = "info" + i;    //info1.....info10
            channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",null,message.getBytes());
//            channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes());
        }
        
    }
}

C1代码修改
在normal_queue队列的声明处加入以下配置

然后启动C1线程,启动之后关闭

生产者发送10条消息,启动生产者线程

  从上图中的死信队列和普通队列中的消息条数可以发现,符合实验预期,由于我们关闭了消费者C1这个线程,所以消息在进入normal_queue的时候不会被消费,但由于我们设置了它最多只能接受6条消息,所以剩下的4条信息会进入死信队列。

3.4 模拟消息被拒绝

消费者C1:(启动之后关闭,模拟其接收不到消息)

public class Consumer01 {
    //普通交换机的名称
    public static final String NORMAL_EXCHANGE="normal_exchange";
    //死信交换机的名称
    public static final String DEAD_EXCHANGE="dead_exchange";
    //普通队列的名称
    public static final String NORMAL_QUEUE="normal_queue";
    //死信队列的名称
    public static final String DEAD_QUEUE="dead_queue";

    public static void main(String[] args) throws Exception {
        //获取信道
        Channel channel = RabbitMqUtils.getChannel();
        //声明死信和普通交换机  类型为direct
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

        //声明普通队列
        Map<String,Object> arguments=new HashMap<>();
        //过期时间   10s=10000ms
//        arguments.put("x-message-ttl",10000);
        //正常队列设置死信交换机
        arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        //设置死信RoutingKey
        arguments.put("x-dead-letter-routing-key","lisi");
        //设置正常队列的长度的限制
//        arguments.put("x-max-length",6);

        channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);
        ////////////////////////////////////////////////////////////////////////////
        //声明死信队列
        channel.queueDeclare(DEAD_QUEUE,false,false,false,null);

        //绑定普通的交换机与普通的队列
        channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
        //绑定死信的交换机与死信的队列
        channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
        System.out.println("等待接收消息......");

        DeliverCallback deliverCallback=(consumerTag, message) -> {
           String msg= new String(message.getBody(),"UTF-8");
           if(msg.equals("info5")){ //拒绝info5 让其成为死信
               System.out.println("Consumer01接收的消息是:"+msg+":此消息是被C1拒绝的");
               //false不塞回普通队列  让其成为死信
               channel.basicReject(message.getEnvelope().getDeliveryTag(),false);
           }else{
               System.out.println("Consumer01接收的消息是:"+msg);
               channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
           }
        };
        //开启手动应答
        channel.basicConsume(NORMAL_QUEUE,false,deliverCallback,consumerTag -> {});

    }
}

启动生产者(代码和上面的一样)
此时的消息队列

  我们这时候先启动消费者C1再启动消费者C2,我们设置的是消费者C1会拒绝info5这条消息 让其进入死信队列,那么我们启动消费者C2之后,这条被拒绝的消息就会被C2消费。
C1:

C2:

测试成功

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。