RabbitMQ 第7章 RabbitMQ RPC
(使用Java客户端)
一、概述
在Work Queue的章节中我们学习了如何使用Work Queue分配耗时的任务给多个工作者,但是如果我们需要运行一个函数在远程计算机上,这是一个完全不同的情景,这种模式通常被称之为RPC。
在本章节的学习中,我们将使用RabbitMQ来构建一个RPC系统:一个远程客户端和一个可扩展的RPC服务器,我们没有任何费时的任务进行分配,我们将创建一个虚拟的RPC服务返回Fibonacci数。
1.1、客户端接口(Client Interface)
为了说明一个RPC服务可以使用,我们将创建一个简单的客户端类,这将通过方法名的调用发送一个RPC请求和接收块得到答复:
FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient();
String result = fibonacciRpc.call("4");
System.out.println( "fib(4) is " + result);
注意:
尽管RPC是计算机领域中非常普遍的模式,它经常受到批评,当程序不知道是否这是一个缓慢的RPC调用函数,像在不可预知接口的系统进行调试,增加了不必要的复杂性,而不是简化软件,滥用会导致不可修复的代码,如果要使用它记住考虑以下建议:
1、能明确区分被调用的函数是局部的还是远程的。
2、您的文件系统、组件之间的依赖关系是很清晰的。
3、处理问题?客户应该知道当RPC服务器挂掉的时候该如何做。
1.2、回调队列(Callback Queue)
总的说来使用RabbitMQ来实现RPC是比较简单的,当客户端发送请求消息和服务器响应消息的答复,为了接收到响应我们需要发送一个callback队列地址在请求中,我们可以使用默认的队列,让我们试试:
callbackQueueName = channel.queueDeclare().getQueue();
BasicProperties props = new BasicProperties
.Builder()
.replyTo(callbackQueueName)
.build();
channel.basicPublish("", "rpc_queue", props, message.getBytes());
// ... then code to read a response message from the callback_queue ...
说明:
AMQP协议对一个消息预设了一个14个属性的集合,大部分属性很少被使用,有以下例外:
1、deliveryMode:标志一个消息的持久化(值为2)或者状态(其他任何值)。
2、contentType:用于描述编码的MIME类型,比如,要经常使用JSON编码大的一个好的设置方法为application/json
3、replyTo:常用的回调队列名称。
4、correlationId:被用于一个RPC相应请求相关。
同时我们需要一个新的类:
import com.rabbitmq.client.AMQP.BasicProperties;
1.3、相关ID (correlation Id)
在上述方法我们为每个RPC请求创建一个回调队列,那是很低效的但是幸运的是有一个更好的方式,让我们创建一个单一回调队列供每个客户端调用。
这出现了一个新的问题,在队列中接收到一个不清楚这个请求属于哪个响应时的响应,我们要将它设置为每个请求的一个特有的值,然后从一个回调队列中接收一个消息时就要查看这个属性值,在此基础上,我们将能匹配一个请求的响应,如果我们看到一个未知的correlationId值,我们可以安全地将这些消息丢弃因为它不属于我们的要求。
你也许会问,我们为什么要丢弃回调队列中未知的消息呢?而不是一个错误引起的失败呢?这是由于一个可能在服务器的竞争引起的,虽然不太可能,但是它还是有可能发生的,RPC服务器在给我们大答复之后将挂掉,但是发送确认消息的请求,如果这种情况发生,将再次重启RPC服务器处理请求,这就是为什么在客户端必须处理重复的响应。
二、实现
2.1、结构如下图所示:
从上图可知,我们RPC工作流程如下:
1、当客户端启动时,它创建一个匿名的独立的回调队列。
2、一个RPC请求中,客户端发送一个消息具有两个特性:replyTo它包含将要到达的回调队列和correlation_id,这是每个请求的一个固有的值。
3、请求发送到一个rpc_queue队列。
4、RPC服务器正在等待队列的请求,当一个请求到达时,它的工作久是发送一个消息结果返回给客户端,使用了replyTo队列。
5、客户端等待回调replyTo队里的数据,当消息出现时,它检查correlationId值是否和请求返回给应用程序响应的值匹配。
2.2、代码实现
Fibonacci 函数:
private static int fib(int n) throws Exception {
if (n == 0) return 0;
if (n == 1) return 1;
return fib(n-1) + fib(n-2);
}
我们声明的Fibonacci函数,它假定只有有效的整整输入(别指望一个大的数字,它可能是最慢的递归实现),我们RPC服务器RPCServer.java代码如下:
private static final String RPC_QUEUE_NAME = "rpc_queue";
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
System.out.println(" [x] Awaiting RPC requests");
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
BasicProperties props = delivery.getProperties();
BasicProperties replyProps = new BasicProperties
.Builder()
.correlationId(props.getCorrelationId())
.build();
String message = new String(delivery.getBody());
int n = Integer.parseInt(message);
System.out.println(" [.] fib(" + message + ")");
String response = "" + fib(n);
channel.basicPublish( "", props.getReplyTo(), replyProps, response.getBytes());
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
说明:
1、像之前所有实例一样,开始建立连接、通道和队列
2、可能要运行多个服务器进程,为了传播同样的负载在多个服务器上,我们需要通过channel.basicQos来设置prefetchcount值。
3、通过basicConsume访问队列,然后进入循环,等待请求消息、处理消息和发送响应。
RPCClient.java代码如下:
private Connection connection;
private Channel channel;
private String requestQueueName = "rpc_queue";
private String replyQueueName;
private QueueingConsumer consumer;
public RPCClient() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
replyQueueName = channel.queueDeclare().getQueue();
consumer = new QueueingConsumer(channel);
channel.basicConsume(replyQueueName, true, consumer);
}
public String call(String message) throws Exception {
String response = null;
String corrId = java.util.UUID.randomUUID().toString();
BasicProperties props = new BasicProperties
.Builder()
.correlationId(corrId)
.replyTo(replyQueueName)
.build();
channel.basicPublish("", requestQueueName, props, message.getBytes());
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
if (delivery.getProperties().getCorrelationId().equals(corrId)) {
response = new String(delivery.getBody());
break;
}
}
return response;
}
public void close() throws Exception {
connection.close();
}
说明:
1、建立一个连接通道和声明一个专属的回调队列的回复。
2、订阅回调队列,这样可以接受RPC响应
3、调用方法发起实际的RPC请求。
4、生成一个唯一的correlationId并且保存它,while循环将使用这个值来匹配相对应的响应。
5、发送请求消息,它有两个属性值relpTo和correlationId。
6、等待相匹配的响应。
7、while循环做简单的工作,为每个响应检查是否correlationId就是我们需要的,如果是,保存该响应。
8、返回响应给客户端。
客户端请求代码:
RPCClient fibonacciRpc = new RPCClient();
System.out.println(" [x] Requesting fib(30)");
String response = fibonacciRpc.call("30");
System.out.println(" [.] Got '" + response + "'");
fibonacciRpc.close();
2.3、完整的代码清单
RPCClient.java
package com.xuz.rpc;
import java.util.UUID;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.AMQP.BasicProperties;
public class RPCClient {
private Connection connection;
private Channel channel;
private String requestQueueName = "rpc_queue";
private String replyQueueName;
private QueueingConsumer consumer;
public RPCClient() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
connection = factory.newConnection();
channel = connection.createChannel();
//响应队列名,服务端会把返回的信息发送到这个队列中。
replyQueueName = channel.queueDeclare().getQueue();
consumer = new QueueingConsumer(channel);
channel.basicConsume(replyQueueName, true, consumer);
}
public String call(String message) throws Exception {
String response = null;
//每个请求生成一个唯一的correlationId
String corrId = UUID.randomUUID().toString();
//设置请求响应基本参数:correlationId(UUID)和rpc_queue
BasicProperties props = new BasicProperties.Builder().correlationId(
corrId).replyTo(replyQueueName).build();
System.out.println("客户端响应队列的属性:["+props.getCorrelationId()+","
+props.getReplyTo()+"]");
channel.basicPublish("", requestQueueName, props, message.getBytes());
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
//如果获得响应队列中的getCorrelationId和当前corrId相等,则保存响应并返回
if (delivery.getProperties().getCorrelationId().equals(corrId)) {
response = new String(delivery.getBody(), "UTF-8");
break;
}
}
return response;
}
/**
* 关闭连接
* @throws Exception
*/
public void close() throws Exception {
connection.close();
}
public static void main(String[] argv) {
RPCClient rpcClient = null;
String response = null;
try {
rpcClient = new RPCClient();
//调用Call方法传入请求消息:测试RPC
response = rpcClient.call("测试RPC");
System.out.println(" 响应消息:[" + response + "]");
} catch (Exception e) {
e.printStackTrace();
} finally {
if (rpcClient != null) {
try {
rpcClient.close();
} catch (Exception ignore) {
}
}
}
}
}
RPCServer.java:
package com.xuz.rpc;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.AMQP.BasicProperties;
public class RPCServer {
private static final String RPC_QUEUE_NAME = "rpc_queue";
/**
* 定义函数
* @param n 输入的正整数
* @return
*/
private static int fib(int n) {
if (n == 0)
return 0;
if (n == 1)
return 1;
return fib(n - 1) + fib(n - 2);
}
public static void main(String[] argv) {
Connection connection = null;
Channel channel = null;
try {
//获取连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设定主机
factory.setHost("127.0.0.1");
//创建连接
connection = factory.newConnection();
//创建通道
channel = connection.createChannel();
//声明RPC队列
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
//设置公平调度
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
System.out.println("[等待RPC远程请求!]");
while (true) {
String response = null;
System.out.println("[服务端等待接收消息!]");
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
System.out.println("[服务端成功接收消息!]");
BasicProperties props = delivery.getProperties();
//从响应队列获取reply参数
BasicProperties replyProps = new BasicProperties.Builder()
.correlationId(props.getCorrelationId()).build();
System.out.println("服务端响应队列的属性:["+replyProps.getCorrelationId()+"]");
try {
String message = new String(delivery.getBody(), "UTF-8");
response = "服务端已经处理了消息:[" + message+"]";
} catch (Exception e) {
System.out.println(" [.] " + e.toString());
response = "";
} finally {
//将结果返回给客户端
channel.basicPublish("", props.getReplyTo(), replyProps,
response.getBytes("UTF-8"));
//设置确认消息
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),
false);
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (connection != null) {
try {
connection.close();
} catch (Exception ignore) {
}
}
}
}
}
- 点赞
- 收藏
- 关注作者
评论(0)