业界消息总线技术分析-ZeroMQ
1. ZeroMQ的设计理念
引用官方的说法: “ZMQ (以下 ZeroMQ 简称 ZMQ)是一个简单好用的传输层,像框架一样的一个 socket library,他使得 Socket 编程更加简单、简洁和性能更高。是一个消息处理队列库,可在多个线程、内核和主机盒之间弹性伸缩。ZMQ 的明确目标是“成为标准网络协议栈的一部分,之后进入 Linux 内核”。现在还未看到它们的成功。但是,它无疑是极具前景的、并且是人们更加需要的“传统”BSD 套接字之上的一层封装。ZMQ 让编写高性能网络应用程序极为简单和有趣。”
ZeroMQ从设计开始就打算设计成没有消息代理,其中Zero表示零 broker,同时也表示接近零时延、零管理、零代价、零浪费。所以ZeroMQ采用极简的方式实现消息队列的功能,追求性能的极致,用户基于zeromq可以很快构建消息队列的基本能力,其他的一些能力也可以用户自定义实现。其开源协议是I-GPL协议,需要采用动态链接的方式才能避免使用后也被开源的风险。
其架构可以更容易支持低时延与高吞吐。
2. 为什么需要ZeroMQ
目前很多应用程序由跨越某种网络的组件组成,不是局域网就是互联网。那么多的程序员最终都在从事某种消息传递。一些开发者使用消息队列产品,但大多是用TCP或UDP来自己开发。这些协议不难使用,但是从A到B发送少量字节和任何可靠方式的消息传递之间是有非常大的区别的。
让我们看看当我们开始用原始TCP来连接时面临的典型问题。任何可重用的消息层都需要解决全部或大部分以下问题:
我们如何处理I/O?你的程序阻塞吗,还是在后台处理I/O?这是设计上的一个关键。阻塞I/O创建的架构不能很好扩展。但是后台I/O要做好是非常困难的。
我们如何处理动态组件,就是会暂时离开的部分?我们是否要在形式上将组件划分为“客户端”和“服务器”,并要求服务器不能消失?那如果我们想要服务器连接服务器怎么办?我们是否要每隔几秒就尝试重新连接?
我们如何表述线上的消息?我们如何将数据组帧才能让它易写易读,缓冲溢出也很安全,对小型消息也很高效,又能够胜任关于戴着狂欢帽的跳舞猫的超大视频?
无法立刻投递的消息我们又如何处理?尤其是正当我们等着一个组件回到在线状态?我们是放弃消息,扔到数据库里,还是放到一个内存队列中?
我们把消息队列存到哪儿去?如果组件从队列读取的速度很慢导致队列堆积是什么原因?这种情况下我们的对策又是什么?
丢失的消息我们如何处理?我们是等待新数据,请求重发,还是建造某种可靠性层来保证消息无法丢失?那如果这个层自身崩溃了又怎么办?
假设我们要使用不同的网络传输会如何。比如说,用多播来替代TCP单播?或者IPv6?我们需要重写程序吗,或者传输已经抽象到某个层了吗?
我们如何路由消息?我们能将同一消息发送到多个对等点吗?
我们如何写一个用于其它语言的API?我们是重新实现一个线路级协议还是重新打包一个库?如果是前者,如何保证效率和稳定堆叠?如果是后者,如何保证互用性?
我们如何表述数据让它能在不同架构间读取?我们要对数据类型强制一种特定编码吗?这是消息系统的工作吗,难道不该是更高层的事吗?
如何处理网络错误?我们是等待重试,悄然忽略,还是中断?
看一个典型的开源项目如HadoopZookeeper,参见src/c/src/zookeeper.c里的C API。当此文写作时,2010年,已有3200行神秘代码,里面有个未公开的客户端服务器网络通信协议。我明白它很有效率因为使用了poll()而不是select()。但实际上,Zookeeper应该使用一个通用的消息层和显式公开的线路级协议。对于团队来说要一遍一遍的建造这个独特的轮子真是个惊人的浪费。
但是如何制作可重用消息层?为何当那么多项目需要这项技术,人们还是在用困难的办法,通过在代码中驱使TCP套接字,并解决着那个长长列表中的难题,一遍一遍?
图1 – 开始时的消息传递Messaging as it Starts
事实证明建造可重用消息传递系统真的很难,这就是为何只有少数FOSS(自由开源软件)尝试过,而商业的消息传递产品为何复杂、昂贵、僵化、脆弱。2006年iMatix公司设计了AMQP(高级消息队列协议),它开始给予FOSS开发者或许是第一个消息传递系统的可重用处方。AMQP比很多其它设计都工作的更好,但还是相对复杂、昂贵、脆弱。需要花数周时间来学习使用,数月时间才能创造出当事情变得复杂时不至于崩溃的稳定架构。
大部分消息传递项目,例如AMQP,在尝试以可重用方式解决这个长列表中的难题时,是通过发明一个新概念,“中介”,来做寻址、路由、和队列。这导致了一个客户端服务器协议或者一组API构建在一些未公开协议之上,来让程序与中介交谈。中介在减少大型网络复杂度方面是非常出色的。但是将基于中介的消息传递添加到产品例如Zookeeper将使它更糟,而不是更好。这将意味着添加一个额外的大型机,和一个新的单一故障点。中介迅速的成为一个瓶颈和一个新的管理风险。如果软件支持,我们能添加第二、第三、第四个中介,还能做一些容错方案。人们这么做着。创建了更多的移动部件、更多复杂度、更多故障。
并且以中介为中心的模式需要它自己的操作团队。你真的需要日夜观察着中介,当它行为不当时用棍子抽打。你需要机子,还需要备份的机子,还有管理这些机子的人。只有在做有很多移动部件、多个团队人员建造的、跨越多年的大型程序时才值得这么做。
所以中小型程序开发者被困住了。要么他们避免网络编程,去做无需缩放的整体应用。要么他们跳入网络编程去做脆弱、难以维护的复杂程序。要么他们下赌注在一个消息传递产品上,最终可扩展性程序基于昂贵、易碎的技术。真没有什么好的选择,这也许是为何消息传递在上世纪死死卡住并激起强烈情绪。对于用户来说是负面的,对于依靠支持和授权来盈利的人则是兴奋而愉悦的。
图 2 – 消息传递变成了Messaging as it Becomes
我们需要的是能做消息传递但方式上如此简单而廉价,它能够以接近零的成本,工作在任何程序中。它应该是一个只需要链接的函数库,无需任何其它依赖。没有附加的移动部件,所以也没有附加的风险。应当能运行在任何操作系统和任何编程语言。
而这就是ØMQ:一个高效的、可嵌入库,解决了让一个程序变得富有弹性的跨过网络的大部分难题,成本不高。
特别是:
它在后台线程异步的处理I/O。这些后台线程使用无锁数据结构与程序线程交流,所以并发ØMQ程序不需要锁、信号量、或其它等待状态。
组件可以动态的来来去去,而ØMQ会自动重连。这意味着你可以按任意顺序启动组件。你可以创建“面向服务架构”(SOAs),服务可以随时加入和离开网络。
当需要时它自动将消息排入队列。以智能的方式,消息排入队列前推送消息到尽可能靠近接收者。
它有几种办法处理满溢队列(称为“高水位线”)。当队列填满时,ØMQ自动阻塞发送者,或丢弃消息,取决于你用的消息传递方式(所谓的“模式”)。
它让你的程序用任意传输方式来相互交谈:TCP、多播、进程内、进程间。更改传输方式时无需更改代码。
安全处理低速/阻塞的读者,使用的是取决于消息传递模式的不同策略。
它让你路由消息使用各种模式如请求-应答和发布-订阅。这些模式是你创建拓扑、网络结构的方式。
它让你用一个调用就能创建代理来做队列、转发、或捕获消息。代理可以降低网络的互联复杂度。
它使用简单的线上组帧,转发整个消息并精确重现其发送时的样子。如果你写入一个10K的消息,就能接收一个10K的消息。
它不在消息上强加任何格式。消息就是零到千兆大小的二进制大对象。想要描述数据时你可以在其上选择一些其它产品,例如谷歌的协议缓冲(protocol buffers)、外部数据表示法(XDR)、或其它。
它智能的处理网络错误。有时它会重试,有时它告知你一个操作失败了。
它减少你的碳排放。用更低的CPU消耗做更多事意味着你的机子使用了更少的能源,并且可以让你的旧机器使用的更久。阿尔·戈尔会很喜欢ØMQ。
事实上ØMQ做的比这更多。对于如何开发支持网络的程序方面具有颠覆性效果。表面上它是一个受到套接字启发的API,你通过它做zmq_msg_recv()和zmq_msg_send()。但消息处理很快变成了中心循环,而你的程序马上分解成一组消息处理任务。优雅而自然。并扩展着:每个任务映射到一个节点,节点们通过任意传输方式相互交谈。进程内的两个节点(节点是线程),机子上的两个节点(节点是进程),或网络上的两台机子(节点是机子)——都是一样的,程序代码没有变化。
3. 应用场景
ZeroMQ 是一个非常轻量级的消息系统,专门为高吞吐量/低延迟的场景开发,在金融界的应用中经常可以发现它。与RabbitMQ相比,ZeroMQ支持许多高级消 息场景,但是你必须实现ZeroMQ框架中的各个块(比如Socket或Device等)。
也可以很轻松应用到其他需要消息通讯的场景例如:应用ZeroMQ的Push-Pull模型实现联众游戏服务器的“热插拔”、负载均衡和消息派发。按照如图3部署服务器,Push端充当Gateway,作为一组游戏服务器集群最上层的一个Proxy,起负载均衡的作用,所有Gameserver作为Pull端。当一个请求到达Push端(Gateway)时,Push端根据一定的分配策略将任务派发到Pull端(Gameserver)。以联众某款游戏A为例,游戏A刚上线时,预计最大同时在线人数是10W,单台Gameserver并发处理能力为1W,需要10台Gameserver,由于游戏A可玩性非常好,半个月后最大同时在线人数暴增到50W,那么不需要在某天的凌晨将Gateway和Gameserver停机,只需要随时在机房新添加40台Gameserver,启动并连接到Gateway即可。
ZeroMQ中对Client和Server的启动顺序没有要求,Gameserver之间如果需要通信的话,Gameserver的应用层不需要管理这些细节,ZeroMQ已经做了重连处理。
因为没有Broker,ZeroMQ不太适合削峰填谷、消息堆积等场景
4. 总体架构图
zeromq几乎所有I/O操作都是异步的,每个zmq i/o 线程(与实际线程不同)都有与之绑定的Poller,Poller采用经典的Reactor模式实现,Poller根据不同操作系统平台使用不同的网络I/O模型(select、poll、epoll、devpoll、kequeue等)。在zeromq中,zmq_socket也被看成是一个zmq io线程。每个线程内含一个信箱,用于线程与线程间传递命令(后面会详细讲),在创建zmq io线程时,会把信箱句柄加到Poller中,用于监听是否有命令到达。当client端开始发起连接或者server端开始监听时,会在主线程创建zmq_connector或者zmq_listener,主线程使用zmq_socket的mailbox发送命令给io线程,将其绑定到io线程中,io线程会把zmq_connector或者zmq_listener含有的句柄加入Poller中,以侦听读写事件。Client端与Server端都是通过Session来管理连接和通信,一个session代表一次会话,每个Session都会关联到相应的读/写管道, 主线程收发消息只是分别从管道中读/写数据。Session并不实际跟kernel交换I/O数据,而是通过plugin到Session中的Engine来与kernel交换I/O数据。
图4.2 基本流程图
5. 线性模型
zeromq的线程分为一个主线程(用户线程),一个回收线程,以及若干io线程。每一条线程都拥有一个mailbox_t用于接收命令。
从操作系统来看,ZMQ中只有两种线程,应用线程和I/O线程。应用线程在ZMQ外部创建,访问ZMQ的API。I / O线程在ZMQ内部创建,用于在后台发送和接收消息。thread_t是系统级线程的抽象,可以以OS无关的方式创建线程。
而从ZMQ的观点来看,线程只是一个拥有邮箱(mailbox_t)的对象。邮箱存储发送给居住在当前线程上所有对象的信件(命令command_t),所有这些对象公用线程上的邮箱。线程从邮箱中按序获取命令并交给其上的对象进行处理。
目前ZMQ内部使用两种不同类型的线程(拥有邮箱的对象):I/O线程(io_thread_t)和socket(socket_base_t)。
I / O线程很容易理解,每个 I/O 线程与一个系统级线程一一对应。I / O线程运行在自己的系统线程上,并且拥有独立的获取命令的邮箱。
socket在某种程度上显得复杂一些。每个ZMQ socket拥有自己的接收命令的邮箱,因此socket可被ZMQ视为分离线程。而实际上,一个应用程序线程可以创建多个套接字,也就是说多个ZMQ socket被映射到同一个系统线程。更加复杂的是,ZMQ socket可以在系统线程之间迁移。例如,Java语言绑定可以在单线程中使用ZMQ socket,而当线程结束时,ZMQ socket会传递给垃圾回收线程,并在垃圾回收线程上销毁。
5.1 I/O线程
I / O线程(io_thread_t)是ZMQ异步处理网络IO的后台线程。它的实现非常简洁。io_thread_t实现继承object_t ,并实现 i_poll_events 接口,其内部包含一个邮箱(mailbox_t)和一个poller对象(poller_t)。
继承object_t使得io_thread_t能够发送和接收command(如 stop 命令,当收到该命令时,I / O线程将被终止)。
i_poll_events 接口定义了文件描述符和计时器事件就绪时的回调处理函数(in_event/out_event/timer_event)。io_thread_t 实现此接口(in_event)来处理来自mailbox的事件。当mailbox_t事件触发时,io线程从mailbox中获取命令,并让命令的接收者进行处理。
mailbox_t 用来存储发送给任何居住在io_thread_t 上的object_t 的命令,每个io_thread_t 上有多个对象,这些对象公用同一个邮箱,邮箱的收件人就是对象。mailbox_t本质是一个具有就绪通知功能的存储命令的队列。就绪通知机制由signaler_t提供的文件描述符实现。队列是由ypipe_t实现的无锁无溢出队列。
poller_t 是从不同操作系统提供的事件通知机制中抽象出来的概念,用来通知描述符和计时器事件,poller_t 通过typedef定义为操作系统首选的通知机制(select_t/poll_t/epoll_t 等)。所有运行在 io_thread_t上的对象都继承自辅助类 io_object_t,该类实现了向io_thread_t注册/删除文件描述符 (add_fd/rm_fd)和计时器(add_timer/cancel_timer)事件的功能,同时io_object_t 还继承了 i_poll_events 接口来实现事件回调功能。
图3 – io_thread_t示意图
5.2 回收线程
上一小节描述了与销毁相关的机制。而销毁任何一个指定的对象(包括socket)消耗的时间是不确定的。然而,我们希望close有类似于POSIX的行为:当关闭TCP套接字时,即使在后台还有没有完全发出的数据,调用也会立即返回。
所以,应用程序线程调用 close 时,ZMQ应该关闭对应的套接字,但是,我们不能依赖应用线程来完成socket子对象的销毁(销毁可能需要多次命令交互)。同时应用线程调用zmq_close以后不会在继续使用该socket,甚至可能永远不会再调用 ZMQ库函数。因此,ZMQ socket应该从应用线程迁移到一个工作线程来处理销毁的逻辑。一个可能的解决方案是将socket迁移到某个后台的I/O线程上去,然而ZMQ可以初始化为具有零个I / O线程(适用于只在进程间通信的情况),因此,我们需要一个专门的回收线程来执行销毁任务。
回收线程由类reaper_t实现。socket通过(send_reap)向回收线程发送回收命令,回收线程收到命令后会将socket从应用线程迁移到回收线程上,这样socket就可以在回收线程上处理命令(term/term_ack),直到socket的所有子对象都成功销毁时,socket就会在回收线程上销毁。实际上回收线程只是待回收对象驻留的线程,对象的处理逻辑仍然由对象自身处理。
6. 特性:
6.1 消息分配
对于小型的消息,拷贝操作比内存分配要经济的多。只要有需要,完全不分配新的内存块而直接把消息拷贝到预分配好的内存块上,这么做是有道理的。另一方面,对于大型的消息,拷贝操作比内存分配的开销又要昂贵的多。为消息体分配一次内存,然后传递指向分配块的指针,而不是拷贝整个数据。这种方式被称为“零拷贝”。
ØMQ以透明的方式同时处理这两种情况。一条ØMQ消息由一个不透明的句柄来表示。对于非常短小的消息,其内容被直接编码到句柄中。因此,对句柄的拷贝实际上就是对消息数据的拷贝。当遇到较大的消息时,它被分配到一个单独的缓冲区内,而句柄只包含一个指向缓冲区的指针。对句柄的拷贝并不会造成对消息数据的拷贝,当消息有数兆字节长时,这么处理是很有道理的(图6-1)。需要提醒的是,后一种情况里缓冲区是按引用计数的,因此可以做到被多个句柄引用而不必拷贝数据。
图6-1– 针对不同消息不同处理方式
6.2 批量处理
前面已经提到过,在消息通信系统中,系统调用的数量太多的话会导致出现性能瓶颈。实际上,这个问题绝非一般。当需要遍历调用栈时会有不小的性能损失,因此,明智的做法是,当创建高性能的应用时应该尽可能多的去避免遍历调用栈。
参见图6.2,为了发送4条消息,你不得不遍历整个网络协议栈4次(也就是,ØMQ、glibc、用户/内核空间边界、TCP实现、IP实现、以太网链路层、网卡本身,然后反过来再来一次)。
图6.2 发送4条消息
但是,如果你决定将这些消息集合到一起成为一个单独的批次,那么就只需要遍历一次调用栈了(图6.2.2)。这种处理方式对消息吞吐量的影响是巨大的:可大至2个数量级,尤其是如果消息都比较短小,数百个这样的短消息才能包装成一个批次。
图6.2.2 批量处理消息
另一方面,批量处理会对时延带来负面影响。我们来分析一下,比如,TCP实现中著名的Nagle算法。它为待发出的消息延迟一定的时间,然后将所有的数据合并成一个单独的数据包。显然,数据包中的第一条消息,其端到端的时延要比最后一条消息严重的多。因此,如果应用程序需要持续的低时延的话,常见做法是将Nagle算法关闭。更常见的是取消整个调用栈层次上的批量处理(比如,网卡的中断汇聚功能)。
但同样,不做批量处理就意味着需要大量穿越整个调用栈,这会导致消息吞吐量降低。似乎我们被困在吞吐量和时延的两难境地中了。
ØMQ尝试采用以下策略来提供一致性的低时延和高吞吐量。当消息流比较稀疏,不超过网络协议栈的带宽时,ØMQ关闭所有的批量处理以改善时延。这里的权衡是CPU的使用率会变得略高——我们仍然需要经常穿越整个调用栈。但是在大多数情况下,这并不是个问题。
当消息的速率超过网络协议栈的带宽时,消息就必须进行排队处理了——保存在内存中直到协议栈准备好接收它们。排队处理就意味着时延的上升。如果消息在队列中要花费1秒时间,端到端的时延就至少会达到1秒。更糟糕的是,随着队列长度的增长,时延会显著提升。如果队列的长度没有限制的话,时延就会超过任何限定值。
据观察,即使调整网络协议栈以追求最低的时延(关闭Nagle算法,关闭网卡中断汇聚功能,等等),由于受前文所述的队列的影响,时延仍然会比较高。在这种情况下,积极的采取批量化处理是有意义的。反正时延已经比较高了,也没什么好顾虑的了。另一方面,积极的采用批量处理能够提高吞吐量,而且可以清空队列中等待的消息——这反过来又意味着时延将逐步降低,因为正是排队才造成了时延的上升。一旦队列中没有未发送的消息了,就可以关闭批量处理,进一步的改善时延。
我们观察到批量处理只应该在最高层进行,这是需要额外注意的一点。如果消息在最高层汇聚为批次,在低层次上就没什么可做批量处理的了,而且所有低层次的批量处理算法除了会增加总体时延外什么都没做。 我们从中学到了:在一个异步系统中,要获得最佳的吞吐量和响应时间,需要在调用栈的底层关闭批量处理算法,而在高层开启。仅在新数据到达的速率快于它们被处理的速率时才做批量处理。
6.3 并发模型
ØMQ需要充分利用多核的优势,换句话说就是随着CPU核心数的增长能够线性的扩展吞吐量。以我们之前对消息通信系统的经验表明,采用经典的多线程方式(临界区、信号量等等)并不会使性能得到较大提升。事实上,就算是在多核环境下,一个多线程版的消息通信系统可能会比一个单线程的版本还要慢。有太多时间都花在等待其他线程上了,同时,引入了大量的上下文切换拖慢了整个系统。
针对这些问题,我们决定采用一种不同的模型。目标是完全避免锁机制,并让每个线程能够全速运行。线程间的通信是通过在线程间传递异步消息(事件)来实现的。内行人都应该知道,这就是经典的actor模式。
我们的想法是在每一个CPU核心上运行一个工作者线程——让两个线程共享同一个核心只会意味着大量的上下文切换而没有得到任何别的优势。每一个ØMQ的内部对象,比如说TCP engine,将会紧密地关联到一个特定的工作者线程上。反过来,这意味着我们不再需要临界区、互斥锁、信号量等等这些东西了。此外,这些ØMQ对象不会在CPU核之间迁移,从而可以避免由于缓存被污染而引起性能上的下降。
图7 多个工作者线程
这个设计让很多传统多线程编程中出现的顽疾都消失了。然而,我们还需要在许多对象间共享工作者线程,这反过来又意味着必须要有某种多任务间的合作机制。这表示我们需要一个调度器,对象必须是事件驱动的,而不是在整个事件循环中来控制。我们必须考虑任意序列的事件,甚至非常罕见的情况也要考虑到。我们必须确保不会有哪个对象持有CPU的时间过长等等。
简单来说,整个系统必须是全异步的。任何对象都无法承受阻塞式的操作,因为这不仅会阻塞其自身,而且所有共享同一个工作者线程的其他对象也都会被阻塞。所有的对象都必须或显式或隐式的成为一种状态机。随着有数百或数千的状态机在并行运转着,你必须处理这些状态机之间的所有可能发生的交互,而其中最重要的就是——关闭过程。
事实证明,要以一种清晰的方式关闭一个全异步的系统是一个相当复杂的任务。试图关闭一个有着上千个运转着的部分的系统,其中有的正在工作中,有的处于空闲状态,有的正在初始化过程中,有的已经自行关闭了,此时极易出现各种竞态条件、资源泄露等诸如此类的情况。ØMQ中最为复杂的部分肯定就是这个关闭子系统了。快速检查一下bug跟踪系统的记录显示,约30%到50%的bug都同关闭有某种联系。
我们从中学到的是:当要追求极端的性能和可扩展性时,考虑采用actor模型。在这种情况下这几乎是你唯一的选择。不过,如果不使用像Erlang或者ØMQ这种专门的系统,你将不得不手工编写并调试大量的基础组件。此外,从一开始就要好好思考关于系统关闭的步骤。这将是代码中最为复杂的部分,而如果你没有清晰的思路该如何实现它,你可能应该重新考虑在一开始就使用actor模型。
6.4 无锁队列
最近比较流行使用无锁算法。它们是用于线程间通信的一种简单机制,同时并不会依赖于操作系统内核提供的同步原语,如互斥锁和信号量。相反,它们通过使用CPU原子操作来实现同步,比如原子化的CAS指令(比较并交换)。我们应该理解清楚的是它们并不是字面意义上的无锁——相反,锁机制是在硬件层面实现的。
ØMQ在pipe对象中采用无锁队列来在用户线程和ØMQ的工作者线程之间传递消息。关于ØMQ是如何使用无锁队列的,这里有两个有趣的地方。
首先,每个队列只有一个写线程,也只有一个读线程。如果有1对多的通信需求,那么就创建多个队列(图8)。鉴于采用这种方式时队列不需要考虑对写线程和读线程的同步(只有一个写线程,也只有一个读线程),因此可以以非常高效的方式来实现。
图8 队列
其次,尽管我们意识到无锁算法要比传统的基于互斥锁的算法更加高效,CPU的原子操作开销仍然非常高昂(尤其是当CPU核心之间有竞争时),对每条消息的读或者写都采用原子操作的话,效率将低于我们所能接受的水平。
提高速度的方法——再次采用批量处理。假设你有10条消息要写入到队列。比如,可能会出现当你收到一个网络数据包时里面包含有10条小型的消息的情况。由于接收数据包是一个原子事件,你不能只接收一半,因此这个原子事件导致需要写10条消息到无锁队列中。那么对每条消息都采用一次原子操作就显得没什么道理了。相反,你可以让写线程拥有一块自己独占的“预写”区域,让它先把消息都写到这里,然后再用一次单独的原子操作,整体刷入队列。
同样的方法也适用于从队列中读取消息。假设上面提到的10条消息已经刷新到队列中了。读线程可以对每条消息采用一个原子操作来读取,但是,这种做法过于重量级了。相反,读线程可以将所有待读取的消息用一个单独的原子操作移动到队列的“预读取”部分。之后就可以从“预读”缓存中一条一条的读取消息了。“预读取”部分只能由读线程单独访问,因此这里没有什么所谓的同步需求。
图9中左边的箭头展示了如何通过简单地修改一个指针来将预写入缓存刷新到队列中的。右边的箭头展示了队列的整个内容是如何通过修改另一个指针来移动到预读缓存中的。
图9 无锁队列
我们从中学到的是:发明新的无锁算法是很困难的,而且实现起来很麻烦,几乎不可能对其调试。如果可能的话,可以使用现有的成熟算法而不是自己来发明轮子。当需要追求极度的性能时,不要只依靠无锁算法。虽然它们的速度很快,但可以在其之上通过智能化的批量处理来显著提高性能
6.5 多种通讯模式
支持多种的socket类型,通过这些socket类型可以组合成多种通讯模式,其中的socket类型有:
Socket类型 | 说明 |
REQ | 请求类型socket,只允许send/recv交替使用 |
REP | 响应类型socket, 只允许recv/send交替使用 |
PUB | 发布类型socket,只能发布消息无法接收消息 |
SUB | 订阅类型socket,只能订阅消息无法发送消息 |
DEALER | 扩展请求/响应模式的高级模式。每个消息按照round-robin方式发送,按照fair-queue方式接收 |
ROUTER | 扩展请求/响应模式的高级模式。按照fair-queue方式接收数据,根据消息目的地进行路由。 |
PUSH | 推送类型socket,单向发送消息 |
PULL | 拉取类型socket,单向消费消息,可以支持并发消费。 |
6.5.1 Request-reply 模式
Request-reply 是 ZeroMQ 提供的最常用的消息传递模式之一。在这种模式下,客户端进程发起请求,服务器端进程接受请求并返回响应给客户端。客户端和服务器端进程都可以有多个。
图10: 请求响应模式
清单 1 和清单 2 实现了一个简单的“请求-应答”应用的服务器端和客户端。在 HelloWorldServer.py 中,我们首先创建了一个 socket 对象,将它绑定到一个特定的地址。一旦接受到客户端的请求,就发送内容为”World”的回复。
从表面上看这种风格与传统的 socket 十分相似,但实际上它们有重大的差别。首先,ZeroMQ 的 socket 是面向消息的,我们从 socket 里直接获得消息字符串,而非字节流,发送亦然。其次,开发者无需关心负责底层通讯的连接的管理,这种连接可能是传统的 socket 连接,也可能基于其他协议。这些底层连接的创建,销毁,重连以及它如何确保消息被有效的发送,都由 ZeroMQ 负责管理。最后,ZeroMQ 的 socket 之间的连接不受任何限制,而传统的 socket 之间往往无法建立多对多的连接。因此,ZeroMQ 的 socket 可以被看作一个功能完善的消息队列。
REQ 类型的 socket 通常被用来发送请求,并且只有在收到第一个请求的回复之后,才能发送第二个请求。在 HelloWorldClient.py 中该 REQ 类型的 socket 只连接到了一个地址,但它也可以连接多个地址。在这种情况下,ZeroMQ 将确保消息被均匀的发送给每个地址,但每次只有一个地址会受到请求。REP 类型的 socket 用于接受请求。它必须在发送第一个请求的回复之后才能接受第二个请求。尚未来得及处理的请求按顺序被置于队列中。
REQ 和 REP 类型的 socket 在消息发送和接受的操作序列上存在严格限制,为了应对更复杂的情况,ZeroMQ 也提供了更为灵活的 socket 类型,这就是 DEALER 和 ROUTER。
DEALER 和 REQ 的区别在于,它可以按照任意的次序执行发送消息和接受消息的操作,而不必等待上一个请求的回复。同样,ROUTER 也不必等待发送上一次请求的响应完成就能接受第二个请求。此外,ROUTER 会为请求加上标识以记录最初请求者的身份。这样一来它可以将该请求发送给其他进程处理,得到返回结果后,仍可以根据消息中的身份标识将该请求准确的返回给最初请求者。因此 ROUTER 和 DEALER 可以被用来实现类似于传统消息队列架构中的消息服务器的进程。
6.5.2 Publish-subscribe 模式
Publish-subscribe 是用于广播消息的模式,在这种模式下发布的消息将同时发送给多个节点。它包含 PUB 和 SUB 两种 socket 类型。与 Request-reply 不同,PUB 和 SUB 都只能进行单向的消息传递。PUB 只能发送消息,而 SUB 只能接受消息。
图11: 发布订阅模式
清单 3,清单 4 是一个简单的 Publish-subscribe 模式的实现。从中我们可以看到,作为消息订阅者的 syncsub.py,将一个 SUB 类型 socket 绑定到‘tcp://localhost:5561’,这代表了一个单一的地址。而作为消息发布者的 syncpub.py,将一个 PUB 类型的 socket 绑定到‘tcp://*:5561’,这实际上匹配了多个地址。也就是说,凡是绑定到符合格式‘tcp://*:5561’地址的任何 SUB 类型的 socket 都可以接收到该 PUB 进程发布的消息。
在这一实现中,我们还使用 REQ 和 REP 类型的 socket 对 SUB 和 PUB 进程进行了同步,仅当出现 10 个 SUB 进程时,PUB 进程才会开始发送消息。
6.5.3 Pipeline 模式
Pipeline 模式通常用于实现工作流的概念,每个进程负责整个处理流程中的一个步骤。每个步骤接受上一步的处理结果,并将自己的处理结果传递给下一步。每一步可以有多个备选进程。它包含 PUSH 和 PULL 两种 socket 类型。与 PUB 和 SUB 类型的 socket 类似,这两种 socket 都只能做单向消息传递,PUSH 只能发送消息,PULL 只能接受消息。因此通常一个进程需要同时包含这两种类型的 socket。此外,与 PUB 不同的是,PUSH 只会将消息发送给单个 PULL 节点。如下图:
图12: 并行PipeLine模式
6.5.4 Exclusive pair 模式
这种模式仅适用于两个特定节点之间互相传递消息的场合。不要和正常的socket对混淆。
7. 性能分析
目前,市面上类似的产品不少,主要有4种:MSMQ(微软产品)、ActiveMQ(Java)、RabbitMQ(Erlang)、ZeroMQ(C++)。除ZeroMQ外,其它3款产品都是一个单独服务或者进程,需要单独安装和运行,且对环境有一定依赖。其中,MSMQ在非Windows平台下安装非常复杂,ActiveMQ需要目标机器上已经安装了Java,RabbitMQ需要Erlang环境。而ZeroMQ是以库的形式存在,由应用程序加载、运行即可。但是ZeroMQ仅提供非持久性的消息队列。
图7是来自于Internet的性能测试数据。显示的是每秒钟发送和接受的消息数。整个过程共产生1百万条1K的消息,测试环境为Windows Vista。从测试数据可以看出,ZeroMQ的性能远远高于其它3个MQ。
但是测试数据仅供参考,因为缺少必须的环境参数和性能指标,比如:CPU参数、内存参数、消息模型、通信协议、极限时消耗CPU百分比、极限时消耗内存百分比等。
8. 横向对比
ActiveMQ | RabbitMQ | RocketMq | ZeroMQ | Kafka | |
关注度 | 高 | 高 | 中 | 中 | 高 |
成熟度 | 成熟 | 成熟 | 比较成熟 | 成熟 | 比较成熟 |
所属社区/公司 | Apache | Mozilla | Alibaba | iMatix |
|
社区活跃度 | 高 | 高 | 中 | 中 | 高 |
文档 | 多 | 多 | 中 | 中 | 多 |
特点 | 功能齐全,被大量开源项目使用 | 由于Erlang 语言的并发能力,性能很好 | 各个环节分布式扩展设计,主从 HA;支持上万个队列;多种消费模式;性能很好 | 低延时,高性能,最高 43万条消息每秒 | 1、快速持久化,O(1)系统开销。 2、高吞吐,高性能。 3、完全的分布式系统。
|
授权方式 | 开源 | 开源 | 开源 | IGPL | Apache License |
开发语言 | Java | Erlang | Java | C++ | Scala |
支持的协议 | OpenWire、 | AMQP | 自己定义的一 | TCP、UDP、IPC、广播 |
TCP |
客户端支持语言 | Java、C、 | Java、C、 | Java | python、 java、 php、.net 等 |
Java、Scala |
持久化 | 内存、文件、数据库 | 内存、文件 | 磁盘文件 | 在消息发送端保存 | 磁盘文件 |
事务 | 支持 | 不支持 | 支持 | 不支持 | 不支持 |
集群 | 支持 | 支持 | 支持 | 不支持 | 支持 |
负载均衡 | 支持 | 支持 | 支持 | 不支持 | 不支持 |
9. 总结
1) 简单
1、仅仅提供24个API接口,风格类似于BSD Socket。
2、处理了网络异常,包括连接异常中断、重连等。
3、改变TCP基于字节流收发数据的方式,处理了粘包、半包等问题,以msg为单位收发数据,结合Protocol Buffers,可以对应用层彻底屏蔽网络通信层。
4、对大数据通过SENDMORE/RECVMORE提供分包收发机制。
5、通过线程间数据流动来保证同一时刻任何数据都只会被一个线程持有,以此实现多线程的“去锁化”。
6、通过高水位HWM来控制流量,用交换SWAP来转储内存数据,弥补HWM丢失数据的缺陷。
7、服务器端和客户端的启动没有先后顺序。
2) 灵活
1、支持多种通信协议,可以灵活地适应多种通信环境,包括进程内、进程间、机器间、广播。
2、支持多种消息模型,消息模型之间可以相互组合,形成特定的解决方案。
3) 跨平台
支持Linux、Windows、OS X等。
4) 多语言
可以绑定C、C++、Java、.NET、Python等30多种开发语言。
5) 高性能
相对同类产品,性能卓越。
10. 参考资料
ZeroMQ官网:
http://zguide.zeromq.org/page:all
ZeroMQ:
http://blog.codingnow.com/2011/02/zeromq_message_patterns.html
http://www.cnblogs.com/zengzy/archive/2016/01/13/5122634.html
From Kafka to ZeroMQ for real-time log aggregation:
http://www.tuicool.com/articles/aiuIja6
- 点赞
- 收藏
- 关注作者
评论(0)