消息中间件之RabbitMQ学习笔记
写在前面
-
嗯,陆续的整理一些中间件的笔记 -
今天和小伙伴们分享 RabbitMQ
相关的笔记 -
博文内容涉及: -
《分布式消息中间件实践》
RabbitMQ
部分读书笔记 -
RabbitMQ
的简单介绍 -
AMQP
协议标准介绍 -
RabbitMQ
部分场景API Demo
-
「 傍晚时分,你坐在屋檐下,看着天慢慢地黑下去,心里寂寞而凄凉,感到自己的生命被剥夺了。当时我是个年轻人,但我害怕这样生活下去,衰老下去。在我看来,这是比死亡更可怕的事。--------王小波」
RabbitMQ
RabbitMQ简介
RabbitMQ
是一个由Erlang
语言开发的基于AMOP标准
的开源
消息中间件。RabbitMQ
最初起源于金融系统
,用于在分布式系统
中存储转发消息,在易用性
、扩展性
、高可用性
等方面表现不俗。其具体特点
包括:
-
保证可靠性( Reliability), RabbitMQ
使用一些机制来保证可靠性,如持久化、传输确认、发布确认
等。 -
具有灵活的 路由(Flexible Routing)功能
。在消息进入队列之前,是通过Exchange (交换器)
来路由消息
的。对于典型的路由功能,RabbitMQ
已经提供了一些内置的Exchange
来实现。针对更复杂的路由功能,可以将多个Exchange
绑定在一起,也可以通过插件机制来实现自己的Exchange
. -
支持消息集群(Clustering),多台 RabbiMQ
服务器可以组成一个集群,形成一个逻辑Broker
. -
具有高可用性(Highly Available),队列可以在 集群中
的机器上进行镜像
,使得在部分节点出现问题的情况下队列仍然可用。 -
支持多种协议(Multi-protocol), RabbitMQ
除支持AMQP
协议之外,还通过插件的方式支持其他消息队列协议,比如STOMP, MQTT
等。 -
支持多语言客户端(Many Client), RabbitMQ
几乎支持所有常用的语言,比如Java
..NET
,Ruby
等 -
提供管理界面(Management UI), RabbitMQ
提供了一个易用的用户界面,使得用户可以监控和管理消息Broker的许多方面 -
提供跟踪机制(Tracing), RabbitMQ
提供了消息跟踪机制,如果消息异常,使用者可以查出发生了什么情况。 -
提供插件机制(Plugin System), RabbitMQ
提供了许多插件,从多方面进行扩展,也可以编写自己的插件.
上面讲到了AMOP协议
,这里我们简单了解下
AMQP标准
在2004年,摩根大通
和iMatrix
开始着手Advanced Message Queuing Protocol (AMQP)
开放标准的开发。2006
年,发布了AMQP规范
。目前AMQP
协议的版本为1.0
。
「一般来说,将AMQP
协议的内容分为三部分:基本概念
、功能命令
和传输层协议
」 。
基本概念:指AMQP内部定义的各组件及组件的功能说明
。
功能命令:指该协议所定义的一系列命令,应用程序可以基于这些命令来实现相应的功能。
传输层协议(TCP/UDP):是一个网络级协议,它定义了数据的传输格式,消息队列的客户端可以基于这个协议与消息代理和AMQP的相关模型进行交互通信,该协议的内容包括数据帧处理、信道复用、内容编码、心跳检测、数据表示和错误处理
等。
「主要概念」
Message (消息)
:消息服务器所处理数据的原子单元
。消息可以携带内容
,从格式上看,消息包括一个内容头
、一组属性
和一个内容体
。
❝这里所说的消息可以对应到许多不同
❞应用程序的实体
,比如一个应用程序级消息
、一个传输文件
、一个数据流帧
等。消息可以被保存到磁盘上,这样即使发生严重的网络故障、服务器崩溃也可确保投递消息可以有优先级,高优先级的消息会在等待同一个消息队列时在低优先级的消息之前发送,当消息必须被丢弃以确保消息服务器的服务质量时,服务器将会优先丢弃低优先级的消息。消息服务器不能修改所接收到的并将传递给消费者应用程序的消息内容体。消息服务器可以在内容头中添加额外信息,但不能删除或修改现有信息。
Publisher (消息生产者)
:也是一个向交换器发布消息
的客户端
应用程序。
Exchange (交换器)
:用来接收消息
生产者所发送的消息并将这些消息路由
给服务器中的队列
。
Binding (绑定)
:用于消息队列和交换器之间的关联
。一个绑定就是基于路由键
将交换器和消息队列连接起来的路由规则
❝所以可以将交换器理解成一个由绑定构成的
❞路由表
(路由控制表,IP寻址)。
Virtual Host (虚拟主机)
:它是消息队列以及相关对象
的集合,是共享同一个身份验证和加密环境的独立服务器域。每个虚拟主机本质上都是一个mini版的消息服务器
,拥有自己的队列、交换器、绑定和权限机制。
Broker (消息代理)
:表示消息队列服务器
,接受客户端连接,实现AMQP消息队列和路由功能的过程
。
Routing Key (路由规则)
:虚拟机可用它来确定如何路由一个特定消息。
Queue (消息队列)
:用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可被投入一个或多个队列中。消息一直在队列里面,等待消费者连接到这个队列将其取走。
Connection (连接)
:可以理解成客户端和消息队列服务器之间的一个TCP连接
。
Channel (信道)
:仅仅当创建了连接后,若客户端还是不能发送消息,则需要为连接创建一个信道
。信道
是一条独立的双向数据流通道
,它是建立在真实的TCP连接内的虚拟连接
。
❝❞
AMQP命令
都是通过信道
发出去的,不管是发布消息、订阅队列还是接收消息
,它们都通过信道完成。一个连接
可以包含多个信道
,之所以需要信道
,是因为TCP连接
的建立
和释放
都是十分昂贵的,如果客户端的每一个线程都需要与消息服务器交互,如果每一个线程都建立了一个TCP连接,则暂且不考虑TCP连接是否浪费,就算操作系统也无法承受每秒建立如此多的TCP连接。
Consumer (消息消费者)
:表示一个从消息队列中取得消息的客户端应用程序。
「核心组件的生命周期」
「消息的生命周期,一条消息的流转过程通常是这样的」:
-
Publisher(消息生产者)
产生一条数据,发送到Broker(消息代理)
,Broker
中的Exchange(交换器)
可以被理解为一个规则表(Routing Key和Queue的映射关系-Binding),Broker
收到消息后根据Routing Key
查询投递的目标Queue
. -
Consumer
向Broker
发送订阅消息时会指定自己监听哪个Queue
,当有数据到达Queue
时Broker
会推送数据到Consumer
.
「交换器的生命周期」
每台AMQP服务器
都预先创建了许多交换器实例,它们在服务器启动时就存在并且不能被销毁。如果你的应用程序有特殊要求,则可以选择自己创建交换器,并在完成工作后进行销毁。
「队列的生命周期」
这里主要有两种消息队列的生命周期,即持久化消息队列
和临时消息队列
。持久化消息队列可被多个消费者共享
,不管是否有消费者接收,它们都可以独立存在。临时消息队列对某个消费者是私有
的,只能绑定到此消费者,当消费者断开连接时,该消息队列将被删除。
「功能命令」
AMQP协议文本
是分层描述的,在不同主版本中划分的层次是有一定区别的。
「0-9」 版本共分两层: Functional Layer (功能层)
和Transport Layer (传输层)
-
功能层
定义了一系列命令
,这些命令按功能逻辑组合成不同的类(Class),客户端应用可以利用它们来实现自己的业务功能。 -
传输层
将功能层所接收的消息传递给服务器经过相应处理后再返回,处理的事情包括信道复用、帧同步、内容编码、心跳检测、数据表示和错误处理
等.
「0-10」 版本则分为三层: Model Layer (模型层)
、Session Layer (会话层)
和Transport Layer(传输层)
。
-
模型层
定义了一套命令,客户端应用利用这些命令来实现业务功能。 -
会话层
负责将命令从客户端应用传递给服务器,再将服务器的响应返回给客户端应用,会话层为这个传递过程提供了可靠性、同步机制和错误处理。 -
传输层
负责提供帧处理、信道复用、错误检测和数据表示
「 消息数据格式」
所有的消息必须有特定的格式来支持
,这部分就是在传输层中定义
的。AMQP是二进制协议
,协议的不同版本在该部分的描述有所不同。0-9-1版本
为例,看一下该版本中的消息格式
所有的消息数据
都被组织成各种类型的帧(Frame)
,帧可以携带协议方法
和其他信息
,所有帧
都有同样的格式,都由一个帧头(header, 7个字节)
、任意大小的负载(payload)
和一个检测错误的结束帧(frame-end)字节
组成。其中:
-
帧头
包括一个type字段
、一个channel
字段和一个size
字段; -
帧负载
的格式依赖帧类型(type)
要读取一个帧
需要三步。
-
①读取帧头,检查 帧类型
和通道(channel)
. -
②根据 帧类型
读取帧负载
并进行处理。 -
③读取结束帧字节。
AMQP
定义了如下帧类型。
-
type=1, "METHOD":方法帧; -
type=2, "HEADER":内容头帧; -
type=3,"BODY":内容体帧; -
type=4, "HEARTBEAT":心跳帧通道
编号为0的代表全局连接中的所有帧
, 1-65535代表特定通道的帧
。size
字段是指帧负载
的大小,它的数值不包括结束帧字节
。AMQP
使用结束帧
来检测
错误客户端和服务器实现引起的错误。
RabbitMQ基本概念
如图是RabbitMQ的整体架构图。
Message (消息)
:消息是不具名的,它由消息头
和消息体
组成。消息体是不透明的,而消息头则由一系列可选属性组成,这些属性包括 routing-key (路由键)
,priority (相对于其他消息的优先级)
、 delivery-mode (指出该消息可能需要持久化存储)等
。
Publisher (消息生产者)
:一个向交换器发布消息的客户端应用程序。
Exchange (交换器)
:用来接收生产者
发送的消息,并将这些消息路由给服务器
中的队列
。.
RabbitMQ
是AMQP
协议的一个开源实现
,所以其基本概念
也就是AMQPt
中的基本概念。关于其他的概念小伙伴可以看上面。
(1) AMQP中的消息路由
在AMQP
中增加了Exchange
和Binding
的角色。生产者需要把消息发布到Exchange
上,消息最终到达队列并被消费者接收,而Binding
决定交换器上
的消息应该被发送到哪个队列
中。
(2)交换器类型
不同类型的交换器分发消息
的策略
也不同,目前交换器有4种类型: Direct
, Fanout
, Topic
,Headers
。其中Headers
交换器匹配AMQP
消息的Header
而不是路由键
。此外, Headers
交换器和Direct
交换器完全一致,但性能相差很多,目前几乎不用了,所以下面我们看另外三种类型。
Direct交换器
「如果消息中的路由键(routing key)和Binding中的绑定键(binding key)一致,交换器就将消息发送到对应的队列中」.
路由键与队列名称要完全匹配
,如果将一个队列绑定到交换机要求路由键为“dog",则只转发routing key标记为"dog"的消息,不会转发"dog.puppy"消息,也不会转发"dog.guard "消息等。Direct交换器是完全匹配、单播的模式
。
Fanout交换器
「Fanout交换器不处理路由键,只是简单地将队列绑定到交换器」 发送到交换器的每条消息都会被转发到与该交换器绑定的所有队列中,这很像子网广播
,子网内的每个主机都获得了一份复制的消息。通过Fanout交换器转发消息是最快的。
Topic交换器
「Topic交换器通过模式匹配分配消息的路由键属性,将路由键和某种模式进行匹配,此时队列需要绑定一种模式。」
Topic交换器将路由键和绑定键的字符串切分成单词
,这些单词之间用点"."隔开,该交换器会识别两个通配符: “#”和“*”,其中“#”匹配0个或多个单词, “*”匹配不多不少一个单词。
RabbitMQ Demo
RabbitMQ官网:https://www.rabbitmq.com/
RabbitMQ服务安装
基于Docker的安装:
RabbitMQ镜像 :https://registry.hub.docker.com/_/rabbitmq?tab=description&page=2&ordering=last_updated
# 启动docker服务
[root@liruilong ~]# systemctl restart docker
# 查看镜像
[root@liruilong ~]# docker images
#指定版本,该版本包含了web控制页面
[root@liruilong ~]# docker pull rabbitmq:management
运行容器:
方式一:默认guest 用户,密码也是 guest
[root@liruilong ~]# docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:management
方式二:设置用户名和密码
[root@liruilong ~]# docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password -p 15672:15672 -p 5672:5672 rabbitmq:management
发布服务,将端口映射到15672,5672
[root@liruilong ~]# docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:management
2189f2fa53f1e76306a2ad422e0fa33bca1ae0f3ee77514573d71aca9ce24801
[root@liruilong ~]#
「这里需要注意的是端口绑定,需要把访问
端口和管理
端口同时绑定。如果是ESC的话,需要配置安全组」
访问路径:http://localhost:15672/ 登录
Hello World!
「Java客户端访问RabbitMQ实例」
RabbitMQ
支持多种语言访问。使用java需要添加的maven
依赖,下面我们看一个简单的Demo
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.1.0</version>
</dependency>
「消息生产者」
package msg_queue.rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author Liruilong
* @Description TODO 消息生产者
* @date 2022/4/20 20:46
**/
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
//设置 RabbitMQ 地址
factory.setHost("localhost");
//默认访问5672端口 factory.setPort(5672);
factory.setVirtualHost("/");
//建立到代理服务器到连接
try (Connection conn = factory.newConnection();
//创建信道
Channel channel = conn.createChannel()) {
//声明交换器
String exchangeName = "hello-exchange";
channel.exchangeDeclare(exchangeName, "direct", true);
// 定义 路由键
String routingKey = "testRoutingKey";
//发布消息
byte[] messageBodyBytes = "学习Rabbitmq".getBytes();
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
}
}
}
首先创建一个连接工厂,再根据连接工厂创建连接,之后从连接中创建信道,接着声明一个交换器和指定路由键,然后才发布消息,最后将所创建的信道、连接等资源关闭。代码中的ConnectionFactory, Connection、 Channel都是RabbitMQ提供的API中最基本的类。
-
ConnectionFactory
是Connection的制造工厂 -
Connection
代表RabbitMQ的Socket连接,它封装了Socket操作的相关逻辑。 -
Channel
是与RabbitMQ打交道的最重要的接口,大部分业务操作都是在Channel中完成的,比如定义队列、定义交换器、队列与交换器的绑定、发布消息等。
「消息消费者」
package msg_queue.rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author Liruilong
* @Description TODO 消息消费者
* @date 2022/4/20 20:48
**/
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
factory.setHost("127.0.0.1");
factory.setVirtualHost("/");
//建立到代理服务器到连接
try (Connection conn = factory.newConnection();
//创建信道
final Channel channel = conn.createChannel()) {
//声明交换器
String exchangeName = "hello-exchange";
// true 设置是否持久化
channel.exchangeDeclare(exchangeName, "direct", true);
//声明队列
String queueName = channel.queueDeclare().getQueue();
String routingKey = "testRoutingKey";
//绑定队列,通过键 testRoutingKey 将队列和交换器绑定起来
channel.queueBind(queueName, exchangeName, routingKey);
//消费消息
while (true) {
// 设置是否自动确认,当消费者接收到消息后要告诉 mq 消息已接收,如果将此参数设置为 true 表示会自动回复 mq,如果设置为 false,要通过编程实现回复
boolean autoAck = false;
channel.basicConsume(queueName, autoAck
// 设置消费者获取消息成功的回调函数
, (consumerTag, delivery) -> {
System.out.printf("消费的消息体内容:%s\n", new String(delivery.getBody(), "UTF-8"));
System.out.println("消费的路由键:" + delivery.getEnvelope().getRoutingKey());
System.out.println("消费的内容类型:" + delivery.getProperties().getContentType());
System.out.println("consumerTag:"+consumerTag);
//确认消息
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
// 设置消费者获取消息失败的回调函数
}, consumerTag -> {
System.out.println("consumerTag:"+consumerTag);
});
}
}
}
}
消费的消息体内容:学习Rabbitmq
消费的路由键:testRoutingKey
消费的内容类型:null
consumerTag:amq.ctag-rC_49IlY-Awwj7G_hXIR_Q
通道
消息客户端和消息服务器之间的通信是双向
的,不管是对客户端还是服务器来说,保持它们之间的网络连接是很耗费资源的。为了在不占用大量TCP/P连接的情况下也能有大量的逻辑连接, AMQP增加了通道(Channel)的概念..
RabbitMQ支持并鼓励在一个连接中创建多个通道,因为相对来说创建和销毁通道的代价会小很多。需要提醒的是,作为经验法则,应该尽量避免在线程之间共享通道,你的应用应该使用每个线程单独的通道,而不是在多个线程上共享同一个通道,因为大多数客户端不会让通道线程安全(因为这将对性能产生严重的负面影响)。
总结
个人认为, RabbitMQ
最大的优势
在于提供了比较灵活的消息路由策略
、高可用性
、可靠性
,以及丰富的插件
、多种平台支持和完善的文档
。不过,由于AMQP协议
本身导致它的实现比较重量,从而使得与其他MQ (比如Kafka)对比其吞吐量处于下风
。在选择MQ时关键还是看需求-是更看重消息的吞吐量
、消息堆积能力
还是消息路由的灵活性
、高可用性
、可靠性等方面,先确定场景,再对不同产品进行有针对性的测试和分析,最终得到的结论才能作为技术选型的依据
- 点赞
- 收藏
- 关注作者
评论(0)