RabbitMQ 第7章 RabbitMQ RPC

举报
许小强 发表于 2022/03/17 18:45:14 2022/03/17
【摘要】 RabbitMQ RPC

(使用Java客户端)

一、概述

在Work Queue的章节中我们学习了如何使用Work Queue分配耗时的任务给多个工作者,但是如果我们需要运行一个函数在远程计算机上,这是一个完全不同的情景,这种模式通常被称之为RPC。

在本章节的学习中,我们将使用RabbitMQ来构建一个RPC系统:一个远程客户端和一个可扩展的RPC服务器,我们没有任何费时的任务进行分配,我们将创建一个虚拟的RPC服务返回Fibonacci数。

1.1、客户端接口(Client Interface)

为了说明一个RPC服务可以使用,我们将创建一个简单的客户端类,这将通过方法名的调用发送一个RPC请求和接收块得到答复:

FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient();   
String result = fibonacciRpc.call("4");
System.out.println( "fib(4) is " + result);

 注意:
尽管RPC是计算机领域中非常普遍的模式,它经常受到批评,当程序不知道是否这是一个缓慢的RPC调用函数,像在不可预知接口的系统进行调试,增加了不必要的复杂性,而不是简化软件,滥用会导致不可修复的代码,如果要使用它记住考虑以下建议:

1、能明确区分被调用的函数是局部的还是远程的。

2、您的文件系统、组件之间的依赖关系是很清晰的。

3、处理问题?客户应该知道当RPC服务器挂掉的时候该如何做。

1.2、回调队列(Callback Queue)

总的说来使用RabbitMQ来实现RPC是比较简单的,当客户端发送请求消息和服务器响应消息的答复,为了接收到响应我们需要发送一个callback队列地址在请求中,我们可以使用默认的队列,让我们试试:

callbackQueueName = channel.queueDeclare().getQueue();
 
BasicProperties props = new BasicProperties
                            .Builder()
                            .replyTo(callbackQueueName)
                            .build();
 
channel.basicPublish("", "rpc_queue", props, message.getBytes());
 
// ... then code to read a response message from the callback_queue ...

 说明:
AMQP协议对一个消息预设了一个14个属性的集合,大部分属性很少被使用,有以下例外:

1、deliveryMode:标志一个消息的持久化(值为2)或者状态(其他任何值)。

2、contentType:用于描述编码的MIME类型,比如,要经常使用JSON编码大的一个好的设置方法为application/json

3、replyTo:常用的回调队列名称。

4、correlationId:被用于一个RPC相应请求相关。

同时我们需要一个新的类:

import com.rabbitmq.client.AMQP.BasicProperties;

1.3、相关ID (correlation Id)

在上述方法我们为每个RPC请求创建一个回调队列,那是很低效的但是幸运的是有一个更好的方式,让我们创建一个单一回调队列供每个客户端调用。

这出现了一个新的问题,在队列中接收到一个不清楚这个请求属于哪个响应时的响应,我们要将它设置为每个请求的一个特有的值,然后从一个回调队列中接收一个消息时就要查看这个属性值,在此基础上,我们将能匹配一个请求的响应,如果我们看到一个未知的correlationId值,我们可以安全地将这些消息丢弃因为它不属于我们的要求。

你也许会问,我们为什么要丢弃回调队列中未知的消息呢?而不是一个错误引起的失败呢?这是由于一个可能在服务器的竞争引起的,虽然不太可能,但是它还是有可能发生的,RPC服务器在给我们大答复之后将挂掉,但是发送确认消息的请求,如果这种情况发生,将再次重启RPC服务器处理请求,这就是为什么在客户端必须处理重复的响应。


二、实现

2.1、结构如下图所示:

从上图可知,我们RPC工作流程如下:

1、当客户端启动时,它创建一个匿名的独立的回调队列。

2、一个RPC请求中,客户端发送一个消息具有两个特性:replyTo它包含将要到达的回调队列和correlation_id,这是每个请求的一个固有的值。

3、请求发送到一个rpc_queue队列。

4、RPC服务器正在等待队列的请求,当一个请求到达时,它的工作久是发送一个消息结果返回给客户端,使用了replyTo队列。

5、客户端等待回调replyTo队里的数据,当消息出现时,它检查correlationId值是否和请求返回给应用程序响应的值匹配。

2.2、代码实现

Fibonacci 函数:

private static int fib(int n) throws Exception {
    if (n == 0) return 0;
    if (n == 1) return 1;
    return fib(n-1) + fib(n-2);
}

我们声明的Fibonacci函数,它假定只有有效的整整输入(别指望一个大的数字,它可能是最慢的递归实现),我们RPC服务器RPCServer.java代码如下:

private static final String RPC_QUEUE_NAME = "rpc_queue";
 
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
 
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
 
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
 
channel.basicQos(1);
 
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
 
System.out.println(" [x] Awaiting RPC requests");
 
while (true) {
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
 
    BasicProperties props = delivery.getProperties();
    BasicProperties replyProps = new BasicProperties
                                     .Builder()
                                     .correlationId(props.getCorrelationId())
                                     .build();
 
    String message = new String(delivery.getBody());
    int n = Integer.parseInt(message);
 
    System.out.println(" [.] fib(" + message + ")");
    String response = "" + fib(n);
 
    channel.basicPublish( "", props.getReplyTo(), replyProps, response.getBytes());
 
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}

 说明:
1、像之前所有实例一样,开始建立连接、通道和队列

2、可能要运行多个服务器进程,为了传播同样的负载在多个服务器上,我们需要通过channel.basicQos来设置prefetchcount值。

3、通过basicConsume访问队列,然后进入循环,等待请求消息、处理消息和发送响应。

RPCClient.java代码如下:

private Connection connection;
private Channel channel;
private String requestQueueName = "rpc_queue";
private String replyQueueName;
private QueueingConsumer consumer;
 
public RPCClient() throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    connection = factory.newConnection();
    channel = connection.createChannel();
 
    replyQueueName = channel.queueDeclare().getQueue(); 
    consumer = new QueueingConsumer(channel);
    channel.basicConsume(replyQueueName, true, consumer);
}
 
public String call(String message) throws Exception {     
    String response = null;
    String corrId = java.util.UUID.randomUUID().toString();
 
    BasicProperties props = new BasicProperties
                                .Builder()
                                .correlationId(corrId)
                                .replyTo(replyQueueName)
                                .build();
 
    channel.basicPublish("", requestQueueName, props, message.getBytes());
 
    while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        if (delivery.getProperties().getCorrelationId().equals(corrId)) {
            response = new String(delivery.getBody());
            break;
        }
    }
 
    return response; 
}
 
public void close() throws Exception {
    connection.close();
}

 说明:
1、建立一个连接通道和声明一个专属的回调队列的回复。

2、订阅回调队列,这样可以接受RPC响应

3、调用方法发起实际的RPC请求。 

4、生成一个唯一的correlationId并且保存它,while循环将使用这个值来匹配相对应的响应。

5、发送请求消息,它有两个属性值relpTo和correlationId。

6、等待相匹配的响应。

7、while循环做简单的工作,为每个响应检查是否correlationId就是我们需要的,如果是,保存该响应。

8、返回响应给客户端。

客户端请求代码:

RPCClient fibonacciRpc = new RPCClient();
System.out.println(" [x] Requesting fib(30)");   
String response = fibonacciRpc.call("30");
System.out.println(" [.] Got '" + response + "'");
fibonacciRpc.close();

2.3、完整的代码清单

RPCClient.java

package com.xuz.rpc;
 
import java.util.UUID;
 
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.AMQP.BasicProperties;
 
public class RPCClient {
	private Connection connection;
	private Channel channel;
	private String requestQueueName = "rpc_queue";
	private String replyQueueName;
	private QueueingConsumer consumer;
 
	public RPCClient() throws Exception {
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("127.0.0.1");
		connection = factory.newConnection();
		channel = connection.createChannel();
		//响应队列名,服务端会把返回的信息发送到这个队列中。
		replyQueueName = channel.queueDeclare().getQueue();
		consumer = new QueueingConsumer(channel);
		channel.basicConsume(replyQueueName, true, consumer);
	}
 
	public String call(String message) throws Exception {
		String response = null;
		//每个请求生成一个唯一的correlationId
		String corrId = UUID.randomUUID().toString();
		//设置请求响应基本参数:correlationId(UUID)和rpc_queue
		BasicProperties props = new BasicProperties.Builder().correlationId(
				corrId).replyTo(replyQueueName).build();
		System.out.println("客户端响应队列的属性:["+props.getCorrelationId()+","
				+props.getReplyTo()+"]");
		channel.basicPublish("", requestQueueName, props, message.getBytes());
		while (true) {
			QueueingConsumer.Delivery delivery = consumer.nextDelivery();
			//如果获得响应队列中的getCorrelationId和当前corrId相等,则保存响应并返回
			if (delivery.getProperties().getCorrelationId().equals(corrId)) {
				response = new String(delivery.getBody(), "UTF-8");
				break;
			}
		}
		return response;
	}
	/**
	 * 关闭连接
	 * @throws Exception
	 */
	public void close() throws Exception {
		connection.close();
	}
 
	public static void main(String[] argv) {
		RPCClient rpcClient = null;
		String response = null;
		try {
			rpcClient = new RPCClient();
			//调用Call方法传入请求消息:测试RPC
			response = rpcClient.call("测试RPC");
			System.out.println(" 响应消息:[" + response + "]");
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			if (rpcClient != null) {
				try {
					rpcClient.close();
				} catch (Exception ignore) {
				}
			}
		}
	}
}

RPCServer.java:

package com.xuz.rpc;
 
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.AMQP.BasicProperties;
 
public class RPCServer {
	private static final String RPC_QUEUE_NAME = "rpc_queue";
	/**
	 * 定义函数
	 * @param n 输入的正整数
	 * @return
	 */
	private static int fib(int n) {
		if (n == 0)
			return 0;
		if (n == 1)
			return 1;
		return fib(n - 1) + fib(n - 2);
	}
	
	public static void main(String[] argv) {
		Connection connection = null;
		Channel channel = null;
		try {
			//获取连接工厂
			ConnectionFactory factory = new ConnectionFactory();
			//设定主机
			factory.setHost("127.0.0.1");
			//创建连接
			connection = factory.newConnection();
			//创建通道
			channel = connection.createChannel();
			//声明RPC队列
			channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
			//设置公平调度
			channel.basicQos(1);
			QueueingConsumer consumer = new QueueingConsumer(channel);
			channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
			System.out.println("[等待RPC远程请求!]");
			while (true) {
				String response = null;
				System.out.println("[服务端等待接收消息!]");
				QueueingConsumer.Delivery delivery = consumer.nextDelivery();
				System.out.println("[服务端成功接收消息!]");
				BasicProperties props = delivery.getProperties();
				//从响应队列获取reply参数
				BasicProperties replyProps = new BasicProperties.Builder()
						.correlationId(props.getCorrelationId()).build();
				System.out.println("服务端响应队列的属性:["+replyProps.getCorrelationId()+"]");
				try {
					String message = new String(delivery.getBody(), "UTF-8");
					response = "服务端已经处理了消息:[" + message+"]";
				} catch (Exception e) {
					System.out.println(" [.] " + e.toString());
					response = "";
				} finally {
					//将结果返回给客户端
					channel.basicPublish("", props.getReplyTo(), replyProps,
							response.getBytes("UTF-8"));
					//设置确认消息
					channel.basicAck(delivery.getEnvelope().getDeliveryTag(),
							false);
				}
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			if (connection != null) {
				try {
					connection.close();
				} catch (Exception ignore) {
				}
			}
		}
	}
}

【版权声明】本文为华为云社区用户原创内容,未经允许不得转载,如需转载请自行联系原作者进行授权。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。