RabbitMQ学习笔记 02、生产者与消费者、多消费者平均压力

举报
长路 发表于 2022/11/28 20:17:20 2022/11/28
【摘要】 文章目录前言一、第一个生产者与消费者前提准备条件①生产者(发送信息)②消费者(接收信息)二、对消息内容进行处理(一中扩展)三、多个消费者平均压力(三步骤)3.1、前者一、二示例可能会出现问题3.2、实战 前言 本篇博客是RabbitMQ的生产者、消费者案例,若文章中出现相关问题,请指出! 所有博客文件目录索引:博客目录索引(持续更新) 博客案例代码可见:Gitee-demoexer/RabbitM

@[toc]

前言

本篇博客是RabbitMQ的生产者、消费者案例,若文章中出现相关问题,请指出!

所有博客文件目录索引:博客目录索引(持续更新)

博客案例代码可见:Gitee-demoexer/RabbitMQGithub-javademos/RabbitMQ

一、第一个生产者与消费者

前提准备条件

首先我们连接使用xshell连接服务器,紧接着启动mq以及管理后台:

systemctl start rabbitmq-server  # 启动
rabbitmqctl status  # 查看状态
rabbitmq-plugins enable rabbitmq_management  # 启动管理后台 http://192.168.118.128:15672/

image-20210924083522902

若是没有分配虚拟主机的最好进行配置一下!

下面是本次示例的目录

image-20210924083002411

引入依赖:rabbitmq的客户端以及日志实现

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.8.0</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-nop</artifactId>
    <version>1.7.27</version>
</dependency>


①生产者(发送信息)

下面的功能:连接远程rabbitmq,指定消息队列并发送一条数据。

说明:发送给rabbitmq的消息会暂时存储在队列中,直到对应消费者连接即可进行消费,也就说发送的记录能够被暂存!!!

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @ClassName Send
 * @Author ChangLu
 * @Date 2021/9/23 21:15
 * @Description 发送给RabbitMQ一条消息
 */
public class Send {

    //队列名称
    private static String QUEUE_NAME = "Hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        //1、创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //2、设置rabbitmq地址
        connectionFactory.setHost("192.168.118.128");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("password");
        //3、建立连接
        Connection connection = connectionFactory.newConnection();
        //4、获取信道
        Channel channel = connection.createChannel();
        //5、声明队列
        //①队列名称;
        //②是否需要持久,例如服务器重启该队列是否需要存在,测试环境一般不需要所以设置为false,生产环境根据需求来定
        //③是否独有的意思,表明该队列是否只能给我这个连接使用。
        //④是否需要自动删除,在该队列没有使用的情况下自动删除
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //6、发布消息(发送出去!)
        String message = "Hello,world!";
        //①设置交换机,这里设置为空表示不以交换机形式进行匹配,而是直接匹配队列的形式
        //②routingkey:就是我们队列名称
        //③props:除了消息体之外还有该属性作为配置,这两者组成消息
        //④消息体:是字节数组形式的
        channel.basicPublish("",QUEUE_NAME,null,message.getBytes("UTF-8"));
        System.out.println("发送了消息:"+message);
        //7、关闭连接
        channel.close();
        connection.close();
    }

}

我们运行该程序,若是连接没有问题并且成功发送,程序是不会有报错提示:

image-20210924083812852

给出连接失败的报错提示:

image-20210924083740000



②消费者(接收信息)

其中1、2、3、4、5部分与生产者配置几乎一致,仅仅第6步是调用了接收方法,配置了三个参数,最后一个参数是回调函数是用来拿到指定队列的消息可用来进行消费的!!!

注意:接收消息后不要直接就进行资源的关闭了,因为该接收消息方法是异步的,一旦你进行连接关闭那么就收不到指定队列的消息了!!!

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @ClassName Recv
 * @Author ChangLu
 * @Date 2021/9/23 21:27
 * @Description 接收消息,并打印,持续运行(持续消费)   除了第6部分是接收,其他操作都是相同的
 */
public class Recv {

    //队列名称
    private static String QUEUE_NAME = "Hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        //1、创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //2、设置rabbitmq地址
        connectionFactory.setHost("192.168.118.128");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("password");
        //3、建立连接
        Connection connection = connectionFactory.newConnection();
        //4、获取信道
        Channel channel = connection.createChannel();
        //5、声明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //6、核心:接收消息。额外说明:由于发送方发送的消息没有消费者接受就会一直存储在队列里,一旦有消费方就会进行消费对应队列中的内容!
        //①队列的名称:指定队列
        //②是否是自动消息确认,意思就是我拿到这个消息之后告诉你发送的人我收到了,有签收的机制
        //③回调函数:获取到这个消息之后进行消费
        channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //获取到消息,body就是对应传输过来的消息体
                String message = new String(body,"UTF-8");
                System.out.println("收到消息:"+message);
            }
        });
        //注意:这里就不进行关闭了,因为这里直接关闭连接,上面的回调函数很有可能在进行回调前就结束程序,导致还没有消费程序就over了。
        //上面的回调函数是异步的!!!
    }

}

运行一下:

image-20210924084054078



二、对消息内容进行处理(一中扩展)

其实就是在一中进行改进测试,我们下面贴出生产者与消费者的核心代码:

生产者:发布了10条消息出去

//6、发布消息(发送出去!),指定队列task
for (int i = 0; i < 10; i++) {
    String message = "消息任务" + i + "!";
    channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
    System.out.println("已发送信息:" + message);
}

消费者:不断的接收指定队列task的信息

//6、接收消息,指定队列task
channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel){
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        //获取到消息,body就是对应传输过来的消息体
        String message = new String(body,"UTF-8");
        execTask(message);
        System.out.println("消息:"+message+"处理完毕!");
    }
});

//做任务处理的
public static void execTask(String msg){
    char[] chars = msg.toCharArray();
    System.out.println("开始处理消息:"+msg);
    for (char aChar : chars) {
        //由于发送方发送的信息结尾都有!我们就可以对其来进行作为判断来进行睡眠操作
        if(aChar == '!'){
            try {
                //睡眠一秒,表示用来执行业务
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

GIF

我们可以对发送到指定队列的对应信息来进行相应的处理操作!!!



三、多个消费者平均压力(三步骤)

3.1、前者一、二示例可能会出现问题

之前直接进行处理消息的问题

示例

//生产者  省略核心代码
for (int i = 0; i < 15; i++) {
    String message = "消息任务"+i;
    if(i%2==0){
        message+= "!";
    }
    channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
    System.out.println("已发送信息:" + message);
}

//消费者  根据!情况来进行睡眠模拟实际执行业务时间
channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel){
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        //获取到消息,body就是对应传输过来的消息体
        String message = new String(body,"UTF-8");
        execTask(message);
        System.out.println("消息:"+message+"处理完毕!");
    }
});

//做任务处理的
public static void execTask(String msg){
    char[] chars = msg.toCharArray();
    System.out.println("开始处理消息:"+msg);
    for (char aChar : chars) {
        //由于发送方发送的信息结尾都有!我们就可以对其来进行作为判断来进行睡眠操作
        if(aChar == '!'){
            try {
                //睡眠一秒,表示用来执行业务
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

根据示例处理方式实际上都是按照消息的数量分配,这就造成了可能一个服务早早的接收了对应数量的任务,并且这些任务执行的很快就全部处理好了;另一个服务也接收到相对等数量的任务,但是由于任务度复杂,耗费时间很长。这就形成了服务1早早结束不再接收消息,服务2一直在很繁忙的处理,服务1也不会进行帮其分担,因为它已经完成了自己的所有任务就不会再接收!

目标方案

  • 实现公平派遣,不按照消息的数量分配,而是按照压力进行分配
  • 目的:若是有多个消费者,默认是按照消息的数量进行分配,而我们要实现循环调度与公平派遣!!!
  • 需要加入消息确认的机制。

采用合理分配效果

使用了合理分配之后,其实流程为如下:

  1. 接收到任务,执行完成任务后进行确认。
  2. 确认完成后再向rabbitmq申请任务,这样优先执行完的服务能够继续得到新的任务,此时就达到了分担压力的效果!

对一、二中代码进行更改:

①basicConsume(前设置basicQos(1)
②basicConsume()方法第二个参数设置false,将自动接收关闭掉,需要手动确认
③在回调任务结尾调用channel.basicAck(envelope.getDeliveryTag(),false);,第二个表示是不是同时的把多个一起进行确认不需要。


3.2、实战

其实生产者不用动,还是原来的代码,我们只需要对消费者进行改动,其核心就是说让每个消费者消费完消息后进行手动确认再获取消息,这样就能够达到按需分配,减轻服务器压力,更高效的解决问题,性能提升!

//就是在以前第6步上做文章,分为三个部分更改操作
//1、设置公平
channel.basicQos(1);
//2、设置第二个参数为false,表示要进行手动消息签收
channel.basicConsume(QUEUE_NAME,false,new DefaultConsumer(channel){
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body,"UTF-8");
        execTask(message);
        System.out.println("消息:"+message+"处理完毕!");
        //3、执行签收确认操作,此时再向rabbitmq请求消息。(第二个参数表示是否帮多个进行同时确认)
        channel.basicAck(envelope.getDeliveryTag(),false);
    }
});

此时我们程序的性能就能够得到显著的提升,按照压力大小来进行分配!

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。