业界消息总线技术分析-RocketMQ
一、概述
官方简介:
u RocketMQ是一款分布式、队列模型的消息中间件,具有以下特点:
u 能够保证严格的消息顺序
u 提供丰富的消息拉取模式
u 高效的订阅者水平扩展能力
u 实时的消息订阅机制
u 亿级消息堆积能力
二、性能吞吐量
单节点,消息大小 10个字节。
Kafka: 80万条/秒
RocketMQ:12万条/秒
Kafka性能高的原因:producer端多个小消息合并,批量发送到Broker。
可能存在的问题:如果producer宕机,则会导致消息丢失,业务出错。
三、分布式模型
3.1 网络部署模型
部署模型:Name server 轻量级的名字服务,存储cluster、broker、topic、queue之间的关系—即路由信息。
Name Server 是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。每个name server都是全量的路由信息。
Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义,BrokerId=0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与Name Server集群中的所有节点建立长连接,定时注册Topic信息到所有Name Server。
Producer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic 服务的Master建立长连接,且定时向Master发送心跳。Producer 完全无状态,可集群部署。
Consumer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic 路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。 Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。
3.2 生产端模型
PUSH(推消息)
单个生产者和该生产者关联的所有broker保持长连接。
3.3 消费者模型
轮询消息(PULL模式)
消息拉取线程每隔多久拉取一次?间隔时间由DefaultMQPushConsumer的pullInterval属性控制,默认为0,可手动设置。
消费完offset存储到broker上。该时间由DefaultMQPushConsumer的persistConsumerOffsetInterval属性控制,默认为5秒,可手动设置
四、网络模型
序号 | 名称 | 描述 | 优缺点 |
1 | 单个 Master | 一个集群只有一台Broker | 一旦Broker 重启或者宕机时,会导致整个服务不可用 |
2 | 多 Master 模式 | 一个集群无 Slave,全是 Master,例如 2 个 Master 或者 3 个 Master | 优点:配置简单,单个Master 宕机或重启维护对应用无影响。 缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到受到影响。 |
3 | 多 Master 多 Slave 模式,异步复制 | 每个 Master 配置一个 Slave,有多对Master-Slave,HA 采用异步复制方式,主备有短暂消息延迟,毫秒级。 | 优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,因为 Master 宕机后,消费者仍然可以从 Slave 消费,此过程对应用透明。不需要人工干预。性能同多 Master 模式几乎一样。
缺点:Master 宕机,磁盘损坏情况,会丢失少量消息。 |
4 | 多 Master 多 Slave 模式,同步双写 | 每个 Master 配置一个 Slave,有多对Master-Slave,HA 采用同步双写方式,主备都写成功,向应用返回成功。 | 优点:数据与服务都无单点,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高
缺点:性能比异步复制模式略低,大约低 10%左右,发送单个消息的 RT 会略高。目前主宕机后,备机不能自动切换为主机,后续会支持自动切换功能。 |
五、消息模型
队列模型
Topic 可以有N个队列组成。队列可以在一台Broker上,也可以在不同Broker上。相当于kafka的分区。
Tag:消息的二级分类。可以通过此进行过滤消费。
消息模型:
序号 | 名称 | 说明 |
1 | 顺序消息 | 局部顺序,即一类消息为满足顺序性,必须Producer单线程顺序发送,且发送到同一个队列,这样Consumer就可以按照Producer发送的顺序去消费消息。 |
2 | 普通顺序消息 | 正常情况下可以保证完全的顺序消息,但是一旦发生通信异常,Broker重启,由于队列总数发生变化,哈希取模后定位的队列会变化,产生短暂的消息顺序不一致。 如果业务能容忍在集群异常情况(如某个Broker宕机或者重启)下,消息短暂的乱序,使用普通顺序方式比较合适。 |
3 | 严格顺序消息 | 顺序消息的一种,无论正常异常情况都能保证顺序,但是牺牲了分布式Failover特性,即Broker集群中只要有一台机器不可用,则整个集群都不可用,服务可用性大大降低。 目前已知的应用只有数据库binlog同步强依赖严格顺序消息,其他应用绝大部分都可以容忍短暂乱序,推荐使用普通的顺序消息。 |
4 | 事务消息 | 阿里云MQ支持分布式事务消息,未来开源版本的RocketMQ也有计划支持分布式事务消息。
|
消费模型:
序号 | 名称 | 说明 |
1 | 广播消费 | 一条消息被多个Consumer消费,即使这些Consumer属于同一个Consumer Group,消息也会被Consumer Group中的每个Consumer都消费一次,广播消费中的Consumer Group概念可以认为在消息划分方面无意义。 |
2 | 集群消费 | 一个Consumer Group中的Consumer实例平均分摊消费消息。例如某个Topic有9条消息,其中一个Consumer Group有3个实例(可能是3个进程,或者3台机器),那么每个实例只消费其中的3条消息。 |
3 | 并发消费 | 消费者个数比队列个数多时,并发消费吞吐量大。注意:该情况下,不保序。(这个就相当于我们后面做的并发消费。) |
六、消息转发实时性
(1). Producer 发送消息,消息从socket 进入java 堆。
(2). Producer 发送消息,消息从java 堆转入PAGACACHE,物理内存。
(3). Producer 发送消息,由异步线程刷盘,消息从PAGECACHE 刷入磁盘。
(4). Consumer 拉消息(正常消费),消息直接从PAGECACHE(数据在物理内存)转入socket,到达consumer,不经过java 堆。这种消费场景最多,线上96G 物理内存,按照1K 消息算,可以在物理内存缓存1 亿条消息。
(5). Consumer 拉消息(异常消费),消息直接从PAGECACHE(数据在虚拟内存)转入socket。
(6). Consumer 拉消息(异常消费),由于Socket 访问了虚拟内存,产生缺页中断,此时会产生磁盘IO,从磁盘Load 消息到PAGECACHE,然后直接从socket 发出去。
(7). 同5 一致。
(8). 同6 一致。
实时消息消费:大部分都是可以实时消息PageCache的,实时性高。
堆积消息的消费:“读写分离设计”,如果Broker有Slave,消费时,当Master发现,消费者消费的是磁盘上的数据,会把该消费者重定向到Slave节点进行读取。
七、持久化
刷盘:同步/异步刷盘
Replication:同步/异步,参加第四章节。
所有的Topic的队列写入同一个CommitLog,每个CommitLog文件默认大小1G,超过1G自动生成新的。
u 永远一个文件在写,其他文件在读
u 顺序写,随机读
u 消费时,使用mmap + write方式
消息清理
扫描间隔
默认10秒,由broker配置参数cleanResourceInterval决定
空间阈值
物理文件不能无限制的一直存储在磁盘,当磁盘空间达到阈值时,不再接受消息,broker打印出日志,消息发送失败,阈值为固定值85%
清理时机
默认每天凌晨4点,由broker配置参数deleteWhen决定;或者磁盘空间达到阈值
文件保留时长
默认72小时,由broker配置参数fileReservedTime决定
八、消息QoS机制
u 可靠性
Master slave模式时,数据可靠。可以配置1个master,多slave即多副本。
u 安全
通信基于Netty的,可以配置SSL通道。
数据存储安全,开源版本没有看到。
u 时间约束
?
u 消息传递的优先级
没有。优先级会严重影响队列性能。在内存的数据,排序还好点,已经落盘的排序,有点不可能了。
九、扩展性
扩容:
Broker可以自由的扩容。这样整个集群能力增强,但是对于已经存在的topic的队列,不能自动rebalance到新增的上去。需要专门工具,开源版本没有。
Name server 的扩容,Broker怎么动态发现最新的Name server?客户端(Producer/Comsumer)怎么发现添加的Name server?
Ø 环境变量指定NameServer地址·
export NAMESRV_ADDR=192.168.8.106:9876
Ø http静态服务器寻址
客户端启动后,会定时访问一个静态的HTTP服务器,地址如下:
http://examle.com:8080/rocketmq/msaddr
这个URL的返回内容如下:
192.168.8.106:9876
客户端默认每隔2分钟访问一次这个HTTP服务器,并更新本地的NameServer地址。URL已经在代码中写死,可通过修改/etc/hosts文件来改变要访问的服务器,例如在/etc/hosts增加如下配置:
10.232.22.67 examle.com
缩容:开源版本没有更多介绍。
升级:v3.2.6新版本向前向后兼容,客户端与服务器不同版本可互相兼容。
消息丰富的过滤机制。可扩展。
Ø 简单过滤:
订阅消息时,可以根据tag过滤。
consumer.subscribe("TopicTest1", "TagA || TagC || TagD");
Ø 高级过滤:
可以在Broker上开启 Filter Server 进程进行过滤。Filter Server上运行的代码,可以自定义。使用CPU 资源来换取网卡流量资源(因为CPU通常不高,如果客户端过滤,则浪费带宽。但是过滤会带来时延)。
九、总结
Broker部署复杂:Broker本身分为Master/Slave角色
Topic Replica只能按照上面Broker的Master/Slave配置,即如果topic创建在1个master/1个slave上,则具有1个副本。如果创建3副本的topic,则需要专门放在1个master/2 slave的Broker上。不灵活。
但是这种结构比kafka可能更可以支撑大集群。因为
1、 没有统一的controller节点。
2、 Broker之间的通信也是固定的,master只和自己的slave通信,进行数据的副本复制。而Kafka的Broker需要和不定数目的Broker进行通信。(topic A 副本可能分布在Broker 1,Broker2,Broker 3上,Topic B的副本可能分布在Broker 1 ,Broker 10,Broker20上)
3、 RocketMQ 2.0 是zookeeper,RocketMQ是自研的name server。可以深入思考一下,为什么?name server代码行数不到1000行,就是个内存K/V数据库。非常轻量级,没有watch机制,都是通过心跳检测。Name server之间不通信,不同步。多台name server相当于多台热备。重新添加一台name server,每30s Broker更新路由信息,可以全量获取到所有的信息。
4、RocketMQ支持丰富的消费过滤,定时发送,失败重传,事务消息,可能是和电商或者说阿里本身的场景有关,即他们需要这些丰富的功能。我们的核心场景是什么?要有所取舍。
- 点赞
- 收藏
- 关注作者
评论(0)