业界消息总线技术分析-RabbitMQ
1 概述
RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源实现,最初由RabbitMQ Technologies Ltd开发并且提供商业支持的。该公司在2010年4月被SpringSource(VMWare的一个部门)收购。在2013年5月被并入Pivotal。
1.1 官网介绍
为应用提供健壮的消息投递,易于使用,能运行于多个主流的操作系统,多种类型的开发平台。
1.2 亮点
可靠性:RabbitMQ提供多种功能让你能够在性能与可靠性之间做出权衡,包括持久化、消息投递确认、发布者确认和高用性。
灵活的路由:消息在到达queues之前,通过exchanges进行路由。RabbitMQ为典型的路由逻辑內建了多种exchange类型。你能够将多种exchanges绑定在一起,也能够以插件的形式,自己实现一个exchange。
集群:在同一个本地网络中的多个RabbitMQ server能够组成集群,形成一个统一的逻辑broker。
联合:如果多个server只需要一个松散的、不可靠的连接,则可以通过RabbitMQ的联合模式实现。
高可用的queues:Queues能够在集群内多台机器之间镜像,确保在某一台机器故障时的消息安全。
多协议:RabbitMQ支持多种消息协议。核心协议是AMQP 0-9-1,另外还能够以插件的形式,支持STOMP、MQTT、AMQP1.0等。
多客户端:你能想到的任何语言几乎都有RabbitMQ的客户端。
管理界面:提供容易使用的管理界面来监控broker。
消息跟踪:RabbitMQ提供消息跟踪功能。
支持插件:支持以插件的形式,扩展功能。
1.3 RabbitMQ解决什么问题?
1)信息的发送者和接收者如何维持连接,如果一方的连接中断,这期间的数据如何防止丢失?
2)如何降低发送者和接收者的耦合度?
3)如何让Priority高的接收者先接到数据?
4)如何做到load balance?有效均衡接收者的负载?
5)如何有效的将数据发送到相关的接收者?也就是说接收者subscribe 不同的数据,如何做有效的filter。
6)如何做到可扩展,甚至将这个通信模块发到cluster上?
7)如何保证接收者接收到了完整,正确的数据?
AMQP协议解决了以上的问题,而RabbitMQ实现了AMQP。
2 性能吞吐量
2.1 与redis对比
测试环境
个人笔记本,Server与Client使用同一台机器。
Processor : Inter(R) Core(TM)2 Duo CPU P8400 @2.26GHz RAM : 4.00GB Operating System : 32-bit Ubuntu 10.10。
软件环境
RabbitMQ和Redis的Server启动都是用默认配置,客户端使用Java,JVM启动使用默认参数:
RabbitMQ Server 2.2.0
RabbitMQ Java Client 2.2.0
Redis 2.0 Stable
Jedis
入队性能
出队性能
测试结论
对于入队操作,当数据比较小时Redis的性能要高于RabbitMQ,而如果数据大小超过了10K,Redis慢的无法忍受。
对于出队操作,无论数据大小,Redis都表现出非常好的性能,而RabbitMQ的出队性能则远低于Redis。
2.2 与kafka对比
测试环境
ubuntu 15.10 64位
cpu:inter core i7-4790 3.60GHZ * 8
内存:16GB
硬盘:ssd 120GB
软件环境:rabbmitmq 3.6.0 kafka0.8.1 (均为单机本机运行)
测试结果均为单操作测试,即生产的时候没有消费操作
测试结果
kafka :消费速度: 37,586 /s 生产速度: 448,753 /s
rabbitmq: 消费速度: 20,807 /s 生产速度 16,413 /s
测试结论
很明显的看出kafka的性能远超rabbitmq。
3 RabbitMQ的架构
架构图如下:
从架构图可以看出,Procuder Publish的Message进入了Exchange。接着通过“routing keys”, RabbitMQ会找到应该把这个Message放到哪个queue里。
有三种类型的Exchanges:direct, fanout,topic。每个实现了不同的路由算法(routing algorithm)。
Direct exchange: 如果routing key 匹配, 那么Message就会被传递到相应的queue中。
Fanout exchange: 会向相应的queue广播。
Topic exchange: 对key进行模式匹配,比如ab*可以传递到所有ab*的queue。
4 分布式模型
4.1 三种模式
集群模式(Clustering):
连接多台机器,作为一个逻辑的broker。集群节点之间通过Erlang的消息进行通信,同一个集群内的每一个节点,都必须有相同的Erlang cookie。集群内的所有节点,都必须有相同版本的RabbitMR与Erlang。
联合模式(Federation):
联合模式允许一个broker上的exchange或queue接收消息,然后发布到另外一个broker上的exchange或queue。这里的broker可以是独立的机器,也可以是一个集群。联合模式broker之间通过AMQP进行通信。
铲子模式(The Shovel):
使用铲子连接brokers在概念上与联合模式类似,不一样的是,铲子模式工作在更低的level。铲子只是简单地从一个broker的queue消费消息,然后转发到另外一个broker的exchange上。
对照
4.2 集群模式
RabbitMQ逻辑视图
RabbitMQ物理视图
Ø 分为磁盘节点与内存节点。
Ø 队列数据只能够存储在磁盘节点上。
Ø 状态数据存储在内存节点与磁盘节点上,内存节点只把数据保存在内存上,因此能够提高速度,但仅仅影响到资源管理。
Ø 节点之间通过Erlang Cookie进行鉴权,也就是说,互相通信的两个节点,一定要有相同的Erlang Cookie。
Ø 集群可以随时增加或减少节点,但需要注意的是,如果整个集群都宕掉,则恢复集群时最后宕掉的节点需要最先启动。
4.3 普通集群
Ø queue在集群中对于contents只存储一份,其他节点只存储meta信息。
Ø 对于publish,客户端任意连接集群的一个节点,转发给创建queue的节点存储消息的所有信息;
Ø 对于consumer,客户端任意连接集群中的一个节点,如果数据不在该节点中,则从存储该消息data的节点拉取。
Ø 可见当存储有queue内容的节点失效后,只要等待该节点恢复后,queue中存在的消息才可以获取消费的到。
Ø 显然增加集群的节点,可以提高整个集群的吞吐量,但是在高可用方面要稍微差一些。
4.4 集群镜像队列
Ø mirror queue是为RabbitMQ高可用的一种方案,相对于普通的集群方案来讲,queue中的消息每个节点都会存在一份或多份副本,具体有多少个副本,由策略进行配置。
Ø 在配置了镜像队列之后,单个节点失效的情况下,整个集群仍旧可以提供服务。但是由于数据需要在多个节点复制,在增加可用性的同时,系统的吞吐量会有所下降。
Ø 在实现机制上,mirror queue内部实现了一套选举算法,有一个master和多个slave,queue中的消息以master为主。
Ø 对于publish,可以选择任意一个节点进行连接,若该节点不是master,则RabbitMQ将消息转发给master,然后由master向其他slave节点发送该消息。
Ø 对于consumer,可以选择任意一个节点进行连接,消费的请求会转发给master。为保证消息的可靠性,consumer需要进行ack确认,master收到ack后,才会删除消息,ack消息会同步(默认异步方式)到其他各个节点,slave节点收到从master同步过来的ack,也会将对应的消息删除。
Ø 若master节点失效,则mirror queue会自动选举出一个节点(slave中消息队列最长者)作为master。
Ø 若slave节点失效,mirror queue集群中其他节点的状态无需改变。
5 网络模型
Ø 支持IPv4与IPv6双协议栈
Ø 底层使用TCP
Ø 支持TLS
Ø 官网介绍了较多针对传输层的调优措施,包括TCP Buffer Size、 OS Level Tuning、 TCP Socket Options等,具体可以参考 http://www.rabbitmq.com/networking.html
6 持久化
6.1 消息什么时候需要持久化
消息本身在publish的时候就要求消息写入磁盘。
内存紧张,需要将部分内存中的消息转移到磁盘。
6.2 消息什么时候会刷到磁盘
写入文件前会有一个Buffer,大小为1M(1048576),数据在写入文件时,首先会写入到这个Buffer,如果Buffer已满,则会将Buffer写入到文件(未必刷到磁盘);
有个固定的刷盘时间:25ms,也就是不管Buffer满不满,每隔25ms,Buffer里的数据及未刷新到磁盘的文件内容必定会刷到磁盘;
每次消息写入后,如果没有后续写入请求,则会直接将已写入的消息刷到磁盘:使用Erlang的receive x after 0来实现,只要进程的信箱里没有消息,则产生一个timeout消息,而timeout会触发刷盘操作。
6.3 消息在磁盘文件中的格式
消息保存于$MNESIA/msg_store_persistent/x.rdq文件中,其中x为数字编号,从1开始,每个文件最大为16M(16777216),超过这个大小会生成新的文件,文件编号加1。消息以以下格式存在于文件中:
<<Size:64, MsgId:16/binary, MsgBody>>
MsgId为RabbitMQ通过rabbit_guid:gen()每一个消息生成的GUID,MsgBody会包含消息对应的exchange,routing_keys,消息的内容,消息对应的协议版本,消息内容格式(二进制还是其它)等等。
6.4 文件何时删除?
当所有文件中的垃圾消息(已经被删除的消息)比例大于阈值(GARBAGE_FRACTION = 0.5)时,会触发文件合并操作(至少有三个文件存在的情况下),以提高磁盘利用率。
publish消息时写入内容,ack消息时删除内容(更新该文件的有用数据大小),当一个文件的有用数据等于0时,删除该文件。
7 消息QoS
可靠性
支持高可靠场景,一个master加上多个slave来保证消息的可靠。
安全
支持使用SASL进行认证
支持使用SSL进行通道加密
支持ACL
消息传递的优先级
AMQP协议本身支持优先级队列,具体的实现还得再分析。
8 总结
Broker部署较为复杂
基于Erlang语言实现,每一个节点都需要安装Erlang语言运行环境。
配置非常灵活
支持多种分布式模式,集群模式又分为普通集群与集群镜像队列,提供多种路由策略。同时通过插件方式,可以很方便地扩展RabbitMQ的功能。
无中心节点,可以支撑大集群
组件集群是直接使用Erlang语言的特性,不像kafka需要一个zookeeper。
RabbitMQ节点也没有
支持多种类型的协议
不仅仅支持RabbitMQ,还通过插件形式,支持MQTT、STOMP等。
- 点赞
- 收藏
- 关注作者
评论(0)