RabbitMQ灵活运用,怎么理解五种消息模型

举报
战斧 发表于 2023/09/21 19:17:33 2023/09/21
【摘要】 上次我们介绍了,为什么rabbitMQ会被很多人中意选型,从而成为火热的MQ组件,今天就先来说一说MQ的基础使用 ———— 其五种消息模型一、AMQP协议我们都知道,RabbitMQ是一个使用Erlang语言,基于AMQP协议的MQ组件,那什么是AMQP协议呢,我们就从这开始今天的学习。AMQP全称为 Advanced Message Queuing Protocol(高级消息队列协议),是...


上次我们介绍了,为什么rabbitMQ会被很多人中意选型,从而成为火热的MQ组件,今天就先来说一说MQ的基础使用 ———— 其五种消息模型


一、AMQP协议

我们都知道,RabbitMQ是一个使用Erlang语言,基于AMQP协议的MQ组件,那什么是AMQP协议呢,我们就从这开始今天的学习。

AMQP全称为 Advanced Message Queuing Protocol(高级消息队列协议),是一个面向消息的中间件传输协议,用于在应用程序之间进行异步消息通信。

AMQP协议定义了多种角色和服务,包括生产者、消费者、交换器、队列等。其中生产者负责生成消息,消费者负责接收和处理消息,交换器则负责将消息路由到队列中。如下图
在这里插入图片描述

AMQP协议的消息传输基于消息队列,支持消息的确认、持久化、事务等功能,同时支持不同的消息传输模式,如点对点(point-to-point)和发布-订阅(publish-subscribe)模式。

我们今天要介绍的RabbitMQ的五种模型,其实都是基于AMQP的基础模型或其演变而来

二、交换机类型与默认交换机

1. 交换机的四种类型

在进一步讲解之前,我们需要知道RabbitMQ,交换机有四种类型,分别是:Direct、Fanout、Topic和Headers。

  • direct:
    按照消息的路由键(routing key)完全匹配来投递消息。直接匹配模式,将消息发送到与路由键完全匹配的队列中。direct模式可以使用RabbitMQ自带的默认交换机,所以不需要将交换机进行任何绑定操作

  • topic:
    使用通配符进行模糊匹配,消息会带有一个路由键(routing key),而队列绑定到交换机上时,也可以指定主题,而且还能使用一定的正则形式,只要主题匹配上消息的路由键,该消息就会发送至该队列
    符号“#”匹配一个或多个词 eg:" log.# "能够匹配到‘ log.info.oa ’
    符号“ * ” 匹配不多不少一个词 eg:“ log.* ”只能匹配到“log.erro”

  • headers:
    按照消息头(header)匹配来投递消息。根据消息头的键值对匹配,当消息头与某个绑定的headers完全匹配时,才会将消息发送到该队列中。

  • fanout :
    将接收到的所有消息广播到它知道的所有队列中,如果routing_key 有指定也不会生效

不难发现,这四种类型本质上是告诉交换机应该把消息发送给哪些那些队列的,四种类别对应着四种判断角度。fanout —— 相当于广播,不作任何选择,发送给所有连接的队列;direct —— 消息发送时都附带一个字段叫routing_key,direct 模式的交换机就会直接把该字段理解成队列名,找到对应的队列并发送;topic —— 交换机会把routing_key理解成一个主题,恰好,队列绑定交换机时也可以以缩略形式指定主题,所以找到匹配主题的队列就发送;headers —— 这种模式的交换机不再以消息的routing_key作为判断依据,而是在队列绑定交换机时,每个队列需提供一个Map,当消息发送给交换机时,交换机会解析消息头,看看有没有能和各队列Map吻合的属性,有则发给该队列。

2. 默认交换机

RabbitMQ有一个自带的交换机,也被称为AMQP default exchange。当消息发送到RabbitMQ时,如果没有指定交换机,就会被发送到默认交换机。默认交换机的类型为direct类型,路由键与队列名相同

如果消息的路由键和某个队列的名称一致,那么消息就会被发送到这个队列中。如果消息的路由键和任何一个队列的名称都不一致,那么消息会被丢弃。默认交换机可以通过设置routing_key来指定消息的目的地,例如:

//  将消息发送到名称为test_queue的队列中,空字符串代表默认交换机
channel.basic_publish(exchange="", routing_key="test_queue", body="Hello, RabbitMQ!")

但是,建议应用程序在发送消息时显式地指定交换机,以避免不必要的麻烦或错误。默认交换机只是一个简单的机制,不应被用于复杂的应用程序。

三、五种模式速览

1. 一对一简单模式

在这里插入图片描述

  • 概念
    生产者将消息发送到“ hello”队列。使用者从该队列接收消息。

  • 图解
    “ P”是我们的生产者
    “ C”是我们的消费者
    中间的框是一个队列-RabbitMQ代表使用者保留的消息缓冲区。需要注意的是,虽然看起来生产者直接连接了队列,但是实际上,它连接的是rabbitMQ的默认交换机

在这里插入图片描述


2. work模式(轮询)

在这里插入图片描述

  • 概念
    work模式是一个生产者,一个队列,多个消费者,每个消费者获取到的消息唯一,多个消费者只有一个队列,C1\C2是竞争关系,一个消息被C1消费,C2将没有消息

  • 优点及作用
    一个生产者一个队列多个消费者模式能够并行化工作,解决了消息积压问题

  • 工作原理
    默认情况下,RabbitMQ将每个消息按顺序发送给下一个使用者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为循环。

  • 注意
    轮询分发采用平均,导致机器性能的浪费,可以将消费者信道设置如下,代表不公平分发

int preFetchCount = 1;
// 该参数的意思是消费者在接收到队列里的消息但没有返回确认结果之前,队列不会将新的消息分发给该消费者,该设置需要和手动ACK配合使用
channel.basicQos(preFetchCount )


3. 发布/订阅模式

前两种发布模式,在图中都没有画出 Exchange交换机,属于是一种模型的简化,实际上上两种模式,都将消息发布给了rabbitMQz自带的默认交换机,然后默认交换机再将消息转发给消息队列
(PS:每个新建队列(queue)都会自动绑定到默认交换机上,绑定的路由键(routing key)名称与队列名称相同)
在这里插入图片描述

  • 概念
    和模式二不同的是,模式三是没有路由规则,多个队列,多个消费者生产者将消息不是直接发送到队列,而是发送到X交换机,然后由交换机发送给两个队列,两个消费者各自监听一个队列,因此一个消息可以被多个消费者消费

4. 路由模式(自称direct模式)

在这里插入图片描述

  • 概念
    交换机连接接队列发送消息需要指定routing_key,所以模式四就是生产者发送消息到交换机并且要指定routing_key,消费者将队列绑定到交换机时需要指定路由key
  • 算法
    背后的路由算法很简单:消息自带一个routing_key(亦称路由键),交换机会把该消息路由给与其routing_key完全匹配的队列 

在这里插入图片描述
在direct模式中,第一个队列以 orange为 key 绑定至交换机 x,第二个队列以black和green绑定。注意:如果有消息没有使用上述key,比如某个消息的key 是 red ,那么该消息将会被丢弃

另外,用相同的路由键可以绑定多个队列
在这里插入图片描述


5. Topic模式

在这里插入图片描述

  • 概念
    topic模式的routing_key使用通配符组成进行模糊匹配

符号“#”匹配一个或多个词 eg:" log.# "能够匹配到‘ log.info.oa ’
符号“ * ” 只匹配一个词 eg:“ log.* ”只能匹配到“log.erro”

举例说明,当使用下列 routing_key 发送消息时:

  1. “quick.orange.rabbit”的消息将传递到两个队列;
  2. “lazy.orange.elephant ”也将发送给他们两个;
  3. “quick.orange.fox ”只会进入Q1,而“ lazy.brown.fox ”只会进入Q2;
  4. “lazy.pink.rabbit ”将被传递到Q2只有一次,即使两个绑定都匹配;
  5. “quick.brown.fox ”与任何绑定都不匹配,因此将被丢弃;
  6. “orange“ 和“”quick.orange.male.rabbit“,因为单词个数对不上,则不会被匹配
  7. “lazy.orange.male.rabbit ”即使有四个单词,也将匹配最后一个绑定,并将其传送到Q2队列。

四、实例

我们以最复杂的Topic模式为例,实际写一段java代码

1. 生产者代码

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class TopicProducer {

    private static final String EXCHANGE_NAME = "topic_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("guest");
        factory.setPassword("guest");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 创建Topic类型的交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

        // 待发送的消息
        String routingKey = "topic.key1";
        String message = "hello rabbitmq, this is topic message";

        // 发送消息
        channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
        System.out.println("Sent message: " + message + ", routingKey: " + routingKey);

        channel.close();
        connection.close();
    }
}

2. 消费者代码

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class TopicConsumer {

    private static final String EXCHANGE_NAME = "topic_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("guest");
        factory.setPassword("guest");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 创建Topic类型的交换机,由于发送者已创建,此步其实可省略,一般由发送方建立交换机
        // channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        // 创建一个非持久、独占、自动删除的队列
        String queueName = channel.queueDeclare().getQueue();

        // 绑定路由键为 "topic.#" 的消息
        channel.queueBind(queueName, EXCHANGE_NAME, "topic.#");

        System.out.println("Waiting for messages. To exit press CTRL+C");

        // 消费消息
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String routingKey = envelope.getRoutingKey();
                String message = new String(body, "UTF-8");
                System.out.println("Received message: " + message + ", routingKey: " + routingKey);
            }
        };

        channel.basicConsume(queueName, true, consumer);
    }
}


五、总结

rabbitMQ的五种模式就介绍完了,其实后三种模式非常相像,我们可以这么理解Topic模式

当队列用‘#’绑定时它将接收所有消息,与routing_key无关,此时就像fanout模式一样
当队列绑定中不使用‘*’和‘#’时,主题交换就像direct模式一样。

而至于前两种模式,由于模型中不带有交换机,所以生产者和消费者都是直接连接的Queue,则就是直肠子的模式

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。