RabbitMQ消息应答

举报
别团等shy哥发育 发表于 2023/01/10 21:57:50 2023/01/10
【摘要】 @toc 1、概念  消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并且只完成了部分它挂掉了,会发生什么情况。RabbitMQ一旦向消费者传递了一条消息,便立即将该消息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续发送给该对象的消息,因为它无法接收到。  为了保证消息在发送过程中不丢失,RabbitMQ引入了消息应答机制,消息应答就...

@toc

1、概念

  消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并且只完成了部分它挂掉了,会发生什么情况。RabbitMQ一旦向消费者传递了一条消息,便立即将该消息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续发送给该对象的消息,因为它无法接收到。
  为了保证消息在发送过程中不丢失,RabbitMQ引入了消息应答机制,消息应答就是:消费者在接收到消息并且处理该消息之后,告诉RabbitMQ它已经处理了,RabbitMQ可以把该消息删除了

2、自动应答

  消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者channel关闭,那么消息就丢失了,当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制,使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效以某种速率能够处理这小消息的情况下使用

3、消息应答的方法

Channel.basicAck(用于肯定确认)
RabbitMQ已经知道该消息并且成功的处理消息,可以将其丢弃了。
Channel.basicNack(用于否定确认)
Channel.basicReject(用于否定确认)Channel.basicNack相比少一个参数,不处理该消息了直接拒绝,可以将其丢弃了。

4、Multiple的解释

手动应答的好处是可以批量应答并且减少网络拥堵

channel.basicAck(deliveryTag,true)

multipletruefalse代表不同意思
  true代表批量应答channel上未应答的消息,比如说channel上有传送tag的消息 5,6,7,8 当前tag是8 那么此时5-8的这些还未应答的消息都会被确认收到消息应答。
  false同上面相比,只会应答tag=8的消息,5,6,7这三个消息依然不会被确认收到消息应答
在这里插入图片描述

5、消息自动重新入队

  如果消费者由于某些原因失去连接(其通道已经关闭,连接已关闭或TCP连接丢失),导致消息未发送ACK确认,RabbitMQ将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。
在这里插入图片描述

如上图所示,若消费者C1的连接突然断了,那么它就没有发送ACK确认,那么RabbitMQ会将该消息重新入队,如果此时消费者C2可以处理,那就将该消息交给C2

6、消息手动应答代码

默认消息采用的是自动应答,所以我们要想实现消息消费过程中不丢失,需要把自动应答改为手动应答。

6.1 启动RabbitMQ

在虚拟机中执行命令systemctl start rabbitmq-server
启动之后查看下状态systemctl status rabbitmq-server
在这里插入图片描述

6.2 消息生产者

/**
 * 消息在手动应答时是不丢失的、放回队列中重新消费
 */
public class Task2 {

    //队列名称
    public static final String TASK_QUEUE_NAME="ack_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();

        //声明队列
        boolean durable=true;   //需要让QUEUE进行持久化
        channel.queueDeclare(TASK_QUEUE_NAME,durable,false,false,null);
//        channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);
        //从控制台中输入信息
        Scanner scanner=new Scanner(System.in);
        while(scanner.hasNext()){
            String message = scanner.next();
            channel.basicPublish("",TASK_QUEUE_NAME,null,message.getBytes("UTF-8"));
            System.out.println("生产者发出消息:"+message);
        }
    }
}

6.3 消费者01

package com.atguigu.rabbitmq.three;

import com.atguigu.rabbitmq.utils.RabbitMqUtils;
import com.atguigu.rabbitmq.utils.SleepUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

/**
 * 消息在手动应答时是不丢失的、放回队列中重新消费
 */
public class Work03 {

    public static final String TASK_QUEUE_NAME="ack_queue";

    //接收消息
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("C1等待接收消息处理时间较短");

        DeliverCallback deliverCallback=(consumerTag, message) -> {
            //沉睡1秒
            SleepUtils.sleep(1);

            System.out.println("接收到的消息:"+new String(message.getBody(),"UTF-8"));
            //手动应答
            /**
             * 参数1:消息的标记 tag
             * 参数2:是否批量应答 false:不批量应答信道中的消息 true:批量应答
             */
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);

        };

        //采用手动应答
        boolean autoAck=false;
        channel.basicConsume(TASK_QUEUE_NAME,autoAck,deliverCallback,(consumerTag -> {
            System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
        }));
    }
}

6.4 消费者02

package com.atguigu.rabbitmq.three;

import com.atguigu.rabbitmq.utils.RabbitMqUtils;
import com.atguigu.rabbitmq.utils.SleepUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

/**
 * 消息在手动应答时是不丢失的、放回队列中重新消费
 */
public class Work04 {

    public static final String TASK_QUEUE_NAME="ack_queue";

    //接收消息
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("C2等待接收消息处理时间较长");

        DeliverCallback deliverCallback=(consumerTag, message) -> {
            //沉睡1秒
            SleepUtils.sleep(30);

            System.out.println("接收到的消息:"+new String(message.getBody(),"UTF-8"));
            //手动应答
            /**
             * 参数1:消息的标记 tag
             * 参数2:是否批量应答 false:不批量应答信道中的消息 true:批量应答
             */
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);

        };
        //采用手动应答
        boolean autoAck=false;
        channel.basicConsume(TASK_QUEUE_NAME,autoAck,deliverCallback,(consumerTag -> {
            System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
        }));
    }
}

6.5 手动应答效果演示

我们在生产者中发送四条消息,观察消费者C1和C2的处理过程,其中
我们设置消费者C1每处理一条消息休眠1秒
消费者C2每处理一条消息休眠30秒
生产者先发送四条消息
在这里插入图片描述
观察消费者C1和C2(默认使用的是轮询分发)
消费者C1处理了两条
在这里插入图片描述
消费者C2,由于消费者C2每30秒才能接收一条消息,所以这里还看不到它处理完成的响应。
在这里插入图片描述
这个时候观察队列中的消息,从下图可看出还有两条消息没有被接收,这是由于C2接收消息的速度太慢(我们故意设置的慢)
在这里插入图片描述

这里我们假设消费者C2的连接中断了,也就是我们关闭消费者C2的线程(模仿消费者C2连接失败)。
我们查看下消费者C1,可以看到,消费者C2没有处理完成的消息被重新入队,最后由RabbitMQ交给了消费者C1处理。
在这里插入图片描述

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。