ZMQ
一、我们为什么需要ZMQ
目前的应用程序很多都会包含跨网络的组件,无论是局域网还是因特网。这些程序的开发者都会用到某种消息通信机制。有些人会使用某种消息队列产品,而大多数人则会自己手工来做这些事,使用TCP或UDP协议。这些协议使用起来并不困难,但是,简单地将消息从A发给B,和在任何情况下都能进行可靠的消息传输,这两种情况显然是不同的。
首先看看在使用纯TCP协议进行消息传输时会遇到的一些典型问题。任何可复用的消息传输层肯定或多或少地会要解决以下问题:
ZMQ就是这样一种软件:它高效,提供了嵌入式的类库,使应用程序能够很好地在网络中扩展,成本低廉。
ZMQ的主要特点有:
其实ZMQ可以做的还不止这些,它会颠覆人们编写网络应用程序的模式。虽然从表面上看,它不过是提供了一套处理套接字的API,能够用zmq_recv()和zmq_send()进行消息的收发,但是,消息处理将成为应用程序的核心部分,很快你的程序就会变成一个个消息处理模块,这既美观又自然。它的扩展性还很强,每项任务由一个节点(节点是一个线程)、同一台机器上的两个节点(节点是一个进程)、同一网络上的两台机器(节点是一台机器)来处理,而不需要改动应用程序。
二、ZeroMQ 背景介绍
官方:“ZMQ(以下ZeroMQ简称ZMQ)是一个简单好用的传输层,像框架一样的一个socket library,他使得Socket编程更加简单、简洁和性能更高。是一个消息处理队列库,可在多个线程、内核和主机盒之间弹性伸缩。
ZMQ的明确目标是“成为标准网络协议栈的一部分,之后进入Linux内核”。现在还未看到它们的成功。但是,它无疑是极具前景的、并且是人们更加需要的“传统”BSD套接字之上的一层封装。ZMQ让编写高性能网络应用程序极为简单和有趣。”
与其他消息中间件相比,ZMQ并不像是一个传统意义上的消息队列服务器,事实上,它也根本不是一个服务器,它更像是一个底层的网络通讯库,在Socket API之上做了一层封装,将网络通讯、进程通讯和线程通讯抽象为统一的API接口。
三、ZMQ是什么
阅读了ZMQ的Guide文档后,这是个类似于Socket的一系列接口,他跟Socket的区别是:普通的socket是端到端的(1:1的关系),而ZMQ却是可以N:M 的关系,人们对BSD套接字的了解较多的是点对点的连接,点对点连接需要显式地建立连接、销毁连接、选择协议(TCP/UDP)和处理错误等,而ZMQ屏蔽了这些细节,让你的网络编程更为简单。ZMQ用于node与node间的通信,node可以是主机或者是进程。
四、三种模型
4.1 应答模式
使用REQ-REP套接字发送和接受消息是需要遵循一定规律的。客户端首先使用zmq_send()发送消息,再用zmq_recv()接收,如此循环。如果打乱了这个顺序(如连续发送两次)则会报错。类似地,服务端必须先进行接收,后进行发送。
4.2 订阅发布模式
PUB-SUB套接字组合是异步的。客户端在一个循环体中使用recv ()接收消息,如果向SUB套接字发送消息则会报错;类似地,服务端可以不断地使用send ()发送消息,但不能在PUB套接字上使用recv ()。
关于PUB-SUB套接字,还有一点需要注意:你无法得知SUB是何时开始接收消息的。就算你先打开了SUB套接字,后打开PUB发送消息,这时SUB还是会丢失一些消息的,因为建立连接是需要一些时间的。很少,但并不是零。解决此问题需要在PUB端加入sleep。
4.3 基于分布式处理(管道模式)
下面贴出PUB_SUB(应答模式)模式下的代码:
发布端:
package cn.edu.ujn.pub_sub;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;
/**
* Pubsub envelope publisher
*/
public class psenvpub {
public static void main (String[] args) throws Exception {
// Prepare our context and publisher
Context context = ZMQ.context(1);
Socket publisher = context.socket(ZMQ.PUB);
publisher.bind(“tcp://*:5563”);
while (!Thread.currentThread ().isInterrupted ()) {
// Write two messages, each with an envelope and content
publisher.sendMore (“A”);
publisher.send (“We don’t want to see this”);
publisher.sendMore (“B”);
publisher.send(“We would like to see this”);
}
publisher.close ();
context.term ();
}
}
发布端需要通过context.socket(ZMQ.PUB)表示为发布端,通过bind方法来创建发布端连接,等待订阅者连接。
之后通过send方法将数据发送到出去。
之后来写订阅端代码
package cn.edu.ujn.pub_sub;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;
/**
* Pubsub envelope subscriber
*/
public class psenvsub {
public static void main (String[] args) {
// Prepare our context and subscriber
Context context = ZMQ.context(1);
Socket subscriber = context.socket(ZMQ.SUB);
subscriber.connect(“tcp://localhost:5563”);
subscriber.subscribe(“B”.getBytes());
while (!Thread.currentThread ().isInterrupted ()) {
// Read envelope with address
String address = subscriber.recvStr ();
// Read message contents
String contents = subscriber.recvStr ();
System.out.println(address + " : " + contents);
}
subscriber.close ();
context.term ();
}
}
客户端通过connect进行连接,之后通过recv来进行数据接收。
下面贴出REQ_REP(订阅发布模式)模式下的代码:
发送端:
package cn.edu.ujn.req_rep;
//
// Hello World server in Java
// Binds REP socket to tcp://*:5555
// Expects “Hello” from client, replies with “World”
//
import org.zeromq.ZMQ;
public class hwserver {
private static int i = 0;
public static void main(String[] args) throws Exception {
ZMQ.Context context = ZMQ.context(1);
// Socket to talk to clients
ZMQ.Socket responder = context.socket(ZMQ.REP);
responder.bind(“tcp://*:5555”);
while (!Thread.currentThread().isInterrupted()) {
// Wait for next request from the client
byte[] request = responder.recv(0);
System.out.println("Received " + new String(request) + i++);
// Do some ‘work’
Thread.sleep(1000);
// Send reply back to client
String reply = “World”;
responder.send(reply.getBytes(), 0);
}
responder.close();
context.term();
}
}
接收端:
package cn.edu.ujn.req_rep;
// Hello World client in Java
// Connects REQ socket to tcp://localhost:5555
// Sends “Hello” to server, expects “World” back
import org.zeromq.ZMQ;
public class hwclient {
public static void main(String[] args) {
ZMQ.Context context = ZMQ.context(1);
// Socket to talk to server
System.out.println(“Connecting to hello world server…”);
ZMQ.Socket requester = context.socket(ZMQ.REQ);
requester.connect(“tcp://localhost:5555”);
for (int requestNbr = 0; requestNbr != 10; requestNbr++) {
String request = “Hello”;
System.out.println("Sending Hello " + requestNbr);
requester.send(request.getBytes(), 0);
byte[] reply = requester.recv(0);
System.out.println("Received " + new String(reply) + " " + requestNbr);
}
requester.close();
context.term();
}
}
下面贴出Para_Pipe(基于分布式处理(管道模式))模式下的代码:
发送端:
package cn.edu.ujn.para_pipe;
import java.util.Random;
import org.zeromq.ZMQ;
// Task ventilator in Java
// Binds PUSH socket to tcp://localhost:5557
// Sends batch of tasks to workers via that socket
public class taskvent {
public static void main (String[] args) throws Exception {
ZMQ.Context context = ZMQ.context(1);
// Socket to send messages on
ZMQ.Socket sender = context.socket(ZMQ.PUSH);
sender.bind(“tcp://*:5557”);
// Socket to send messages on
ZMQ.Socket sink = context.socket(ZMQ.PUSH);
sink.connect(“tcp://localhost:5558”);
System.out.println("Press Enter when the workers are ready: ");
System.in.read();
System.out.println(“Sending tasks to workers\n”);
// The first message is “0” and signals start of batch
sink.send(“0”, 0);
// Initialize random number generator
Random srandom = new Random(System.currentTimeMillis());
// Send 100 tasks
int task_nbr;
int total_msec = 0; // Total expected cost in msecs
for (task_nbr = 0; task_nbr < 100; task_nbr++) {
int workload;
// Random workload from 1 to 100msecs
workload = srandom.nextInt(100) + 1;
total_msec += workload;
System.out.print(workload + “.”);
String string = String.format("%d", workload);
sender.send(string, 0);
}
System.out.println(“Total expected cost: " + total_msec + " msec”);
Thread.sleep(1000); // Give 0MQ time to deliver
sink.close();
sender.close();
context.term();
}
}
中介端:
package cn.edu.ujn.para_pipe;
import org.zeromq.ZMQ;
// Task worker in Java
// Connects PULL socket to tcp://localhost:5557
// Collects workloads from ventilator via that socket
// Connects PUSH socket to tcp://localhost:5558
// Sends results to sink via that socket
public class taskwork {
public static void main (String[] args) throws Exception {
ZMQ.Context context = ZMQ.context(1);
// Socket to receive messages on
ZMQ.Socket receiver = context.socket(ZMQ.PULL);
receiver.connect(“tcp://localhost:5557”);
// Socket to send messages to
ZMQ.Socket sender = context.socket(ZMQ.PUSH);
sender.connect(“tcp://localhost:5558”);
// Process tasks forever
while (!Thread.currentThread ().isInterrupted ()) {
String string = new String(receiver.recv(0)).trim();
long msec = Long.parseLong(string);
// Simple progress indicator for the viewer
System.out.flush();
System.out.print(string + ‘.’);
// Do the work
Thread.sleep(msec);
// Send results to sink
sender.send("".getBytes(), 0);
}
sender.close();
receiver.close();
context.term();
}
}
接收端:
package cn.edu.ujn.para_pipe;
import org.zeromq.ZMQ;
// Task sink in Java
// Binds PULL socket to tcp://localhost:5558
// Collects results from workers via that socket
public class tasksink {
public static void main (String[] args) throws Exception {
// Prepare our context and socket
ZMQ.Context context = ZMQ.context(1);
ZMQ.Socket receiver = context.socket(ZMQ.PULL);
receiver.bind(“tcp://*:5558”);
// Wait for start of batch
String string = new String(receiver.recv(0));
// Start our clock now
long tstart = System.currentTimeMillis();
// Process 100 confirmations
int task_nbr;
int total_msec = 0; // Total calculated cost in msecs
for (task_nbr = 0; task_nbr < 100; task_nbr++) {
string = new String(receiver.recv(0)).trim();
if ((task_nbr / 10) * 10 == task_nbr) {
System.out.print(":");
} else {
System.out.print(".");
}
}
// Calculate and report duration of batch
long tend = System.currentTimeMillis();
System.out.println("\nTotal elapsed time: " + (tend - tstart) + " msec");
receiver.close();
context.term();
}
}
五、注意事项
5.1 正确地使用上下文
ZMQ应用程序的一开始总是会先创建一个上下文,并用它来创建套接字。在C语言中,创建上下文的函数是zmq_init()。一个进程中只应该创建一个上下文。从技术的角度来说,上下文是一个容器,包含了该进程下所有的套接字,并为inproc协议提供实现,用以高速连接进程内不同的线程。
如果一个进程中创建了两个上下文,那就相当于启动了两个ZMQ实例。如果这正是你需要的,那没有问题,但一般情况下:在一个进程中使用zmq_init()函数创建一个上下文,并在结束时使用zmq_term()函数关闭它。
如果你使用了fork()系统调用,那每个进程需要自己的上下文对象。如果在调用fork()之前调用了zmq_init()函数,那每个子进程都会有自己的上下文对象。通常情况下,你会需要在子进程中做些有趣的事,而让父进程来管理它们。
5.2 正确地退出和清理
程序员的一个良好习惯是:总是在结束时进行清理工作。当你使用像Python那样的语言编写ZMQ应用程序时,系统会自动帮你完成清理。但如果使用的是C语言,那就需要小心地处理了,否则可能发生内存泄露、应用程序不稳定等问题。
内存泄露只是问题之一,其实ZMQ是很在意程序的退出方式的。个中原因比较复杂,但简单的来说,如果仍有套接字处于打开状态,调用zmq_term()时会导致程序挂起;就算关闭了所有的套接字,如果仍有消息处于待发送状态,zmq_term()也会造成程序的等待。只有当套接字的LINGER选项设为0时才能避免。
我们需要关注的ZMQ对象包括:消息、套接字、上下文。好在内容并不多,至少在一般的应用程序中是这样:
5.3 你的想法可能会被颠覆
传统网络编程的一个规则是套接字只能和一个节点建立连接。虽然也有广播的协议,但毕竟是第三方的。当我们认定“一个套接字 = 一个连接”的时候,我们会用一些特定的方式来扩展应用程序架构:我们为每一块逻辑创建线程,该线程独立地维护一个套接字。
但在ZMQ的世界里,套接字是智能的、多线程的,能够自动地维护一组完整的连接。你无法看到它们,甚至不能直接操纵这些连接。当你进行消息的收发、轮询等操作时,只能和ZMQ套接字打交道,而不是连接本身。所以说,ZMQ世界里的连接是私有的,不对外部开放,这也是ZMQ易于扩展的原因之一。
由于你的代码只会和某个套接字进行通信,这样就可以处理任意多个连接,使用任意一种网络协议。而ZMQ的消息模式又可以进行更为廉价和便捷的扩展。
这样一来,传统的思维就无法在ZMQ的世界里应用了。在你阅读示例程序代码的时候,也许你脑子里会想方设法地将这些代码和传统的网络编程相关联:当你读到“套接字”的时候,会认为它就表示与另一个节点的连接——这种想法是错误的;当你读到“线程”时,会认为它是与另一个节点的连接——这也是错误的。
如果你是第一次阅读本指南,使用ZMQ进行了一两天的开发(或者更长),可能会觉得疑惑,ZMQ怎么会让事情便得如此简单。你再次尝试用以往的思维去理解ZMQ,但又无功而返。最后,你会被ZMQ的理念所折服,拨云见雾,开始享受ZMQ带来的乐趣。
- 点赞
- 收藏
- 关注作者
评论(0)