RabbitMQ 第3章 RabbitMQ Work Queues(工作队列)
一、概述
工作队列(Work queues)
(使用Java客户端)
在前面的专题学习中,我们使用Java语言实现了一个简单的从名为"hello"的队列中发送和接收消息的程序,在这部内容中我们将创建一个工作队列被用来分配定时消息任务,而且通过多个接收者(工作者)实现。
工作队列(又名任务队列),主要的思想是为了避免立即做一个资源密集型的任务(多消息同时密集发送),不必等待它完成,当运行许多工作者的让任务都在她们之间共享。
它在web应用中是非常有用的,因为在很短的时间内http请求窗口处理一个复杂的任务是不可能实现的,它的结构如下图-1所示:
二、实现步骤
2.1、准备
在本部分内容之前,已经实现了发送单条”Hello World!“的消息,现在将发送一些复杂的字符串任务,由于没有一个真实的生产环境来模拟,我们可以通过使用Thread.sleep()函数来假设任务通过描述字符串hello...将要花费三秒钟的时间。
从之前的实例中,我们只需要稍微修改Sender01.java代码,允许任意的消息通过命令行发送,这一计划将安排到工作队列中的任务,让我们将
String message = getMessage(argv);
channel.basicPublish("", "hello", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
消息处理:
private static String getMessage(String[] strings){
if (strings.length < 1)
return "Hello World!";
return joinStrings(strings, " ");
}
private static String joinStrings(String[] strings, String delimiter) {
int length = strings.length;
if (length == 0) return "";
StringBuilder words = new StringBuilder(strings[0]);
for (int i = 1; i < length; i++) {
words.append(delimiter).append(strings[i]);
}
return words.toString();
}
之前的旧的Recv01.java也需要做一些改变。
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
doWork(message);
System.out.println(" [x] Done");
}
模拟假任务执行时间:
private static void doWork(String task) throws InterruptedException {
for (char ch: task.toCharArray()) {
if (ch == '.') Thread.sleep(1000);
}
}
2.2、轮询调度
使用消息队列的优点之一就是能够方便地并行工作,如果我们建立了大量的工作任务,我们就可以添加更多的worker,这样大规模应用就显的比较容易,首先让我们尝试同时运行两个Worker.java脚本,他们都将获得队列中的消息,但是具体情况如下,我们需要打开三个控制台,两个将运行Worker.java的脚本,这些控制台将显示两个消费者C1和C2,第三个控制台将发布一个新的任务,一旦启动了消费者,你将可以通过第三个控制台发布一些消息,在默认情况下RabbitMQ将发送每条消息给下一个消费者,在队列里每个消费者将获得同样数量的消息,这种方式被称之轮询调度。
2.3、消息确认
执行一个任务要花费几秒钟,你可能想知道在此过程中会发生什么,如果一个消费者启动一项长期任务,它只是部分完成了,在我们当前的代码中,一旦RabbitMQ提供了一个消息给客户立即就将其从内存中删除,在这种情况下,如果一个工作者down掉,我们就将失去消息,尽管它只是处理,但是也将失去这部分消息,但是我们必须确保不丢失任何消息,如果一个工作者down了,我们希望这部分任务被传递到另外一个工作者的任务当中。
为了确保每条消息不丢失,RabbitMQ支持消息的确认(Acknowledgments),一个确认被送回消费者告诉RabbitMQ的一个特定的消息一经被接收和处理,RabbitMQ此时才可以将该特定的消息删除。
如果消费者进程被杀掉而没有发送一个确认给RabbitMQ服务器,RabbitMQ会明白这个消息是没有被正常完成处理。
目前还有任何的消息超市,只有当工作者连接中断,RabbitMQ将重新传送消息,即使重新处理消息需要一个很长的时间。
消息确认默认是被打开的,在之前的实例中,我们可以通过AUTOACK=true来打开这个标记,当我们完成了任务,就应该移除这个标志,并且发送一个确认给工作者(Worker).
QueueingConsumer consumer = new QueueingConsumer(channel);
boolean autoAck = false;
channel.basicConsume("hello", autoAck, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
//...
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
2.4、消息持久化
前面已经给出了如何确保消费者down了,任务也不会丢失,但是我们的任务在RabbitMQ服务器停止时还是可能会被丢失,当RabbitMQ服务器宕机或者奔溃时,它会丢失所有的队列和消息,除非告诉它不要这么做,需要做两件事情确保消息不会被丢失,我们需要标记队列和消息持久化,首先我们需要确保RabbitMQ永远不会丢失队列,为了做到这点,我们需要将队列声明为持久化:
boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);
此时queueDeclare变化需要被兼容到生产者和消费者的代码中,在这一点上,我们肯定不会丢失队列,即使task_queue队列所属的RabbitMQ服务器重新启动,现在我们需要将消息标记为持久性,通过设置MessageProperties值为 PERSISTENT_TEXT_PLAN来实现。
import com.rabbitmq.client.MessageProperties;
channel.basicPublish("", "task_queue",
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
消息持久化需要注意的事项:
标记为持久化的消息并不能完全保证消息不会被丢失,虽然它告诉RabbitMQ的消息保存到磁盘,仍然有一个很短的时间内,RabbitMQ的消息一经接收了,但是并没有成功保存到磁盘,而是保存在缓存汇总,持久化的保证能力不足,但它是我们简单任务队列已经足以满足需求了。
2.5、公平调度
RabbitMQ当消息进入队列后,仅仅只是分发消息,它没有为消费者查找未确定的消息,它只是一味的分发n个消息给n个消费者,假如我需要两个工作者来分别处理消息,一个工作者处理所有奇数的消息,另外一个工作者处理偶数的消息,此时就需要打破RabbitMQ广播式的分发消息的规则,我们可以使用basicQos方法来实现,将其设置为1.
int prefetchCount = 1;
channel.basicQos(prefetchCount);
它的结构图如下图-2所示:
2.6、完整的代码清单:
package com.xuz.task;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
public class NewTask {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] args) throws IOException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
/**
* true:消息持久化设置
* 这样设置之后,服务器收到消息后就会立刻将消息写入到硬盘,就可以防止突然服务器挂掉,而引起的数据丢失了。
* 但是服务器如果刚收到消息,还没来得及写入到硬盘,就挂掉了,这样还是无法避免消息的丢失。
*/
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
//封装发送消息
// String[] msg={"xuzheng","test01","rabbitMQ"};
// String message = getMessage(msg);
String message = null;
for (int i = 0; i < 10; i++) {
message = "测试公平调度"+i;
//消息持久化设置MessageProperties.PERSISTENT_TEXT_PLAIN 当rabbitMQ暂时down掉,下次重启之后,工作者还是能接受目前发送的消息。
channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
System.out.println("send:["+message+"]");
}
channel.close();
conn.close();
}
private static String getMessage(String[] args) {
if(args.length<1){
return "Hello World!";
}else{
return joinStrings(args,"");
}
}
private static String joinStrings(String[] args, String string) {
int len = args.length;
if(len == 0){
return "";
}
StringBuilder words = new StringBuilder(args[0]);
for (int i = 0; i < len; i++) {
words.append(string).append(args[i]);
}
return words.toString();
}
}
package com.xuz.work;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;
public class Worker {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
/**
* true:
* 这样设置之后,服务器收到消息后就会立刻将消息写入到硬盘,就可以防止突然服务器挂掉,而引起数据丢失了,但是服务器如果刚收到消息,还没有来得写入硬盘,就挂掉了,这样
* 无法避免消息得丢失。
*/
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
System.out.println("waiting for message.To exit press CTRL+C");
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
/**
* false:设置确认消息,true表示接收到消息之后,将返回给服务端确定消息
*/
channel.basicConsume(TASK_QUEUE_NAME, false,consumer);
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("Received:["+message+"] from Task");
doWork(message);
System.out.println("Done!");
//设置消息确认机制,如将如下代码注释掉,则
//一旦将autoAck关闭之后,一定要记得处理完消息之后,向服务器确认消息。否则服务器将会一直转发该消息
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
private static void doWork(String message) throws InterruptedException {
for (char ch : message.toCharArray()) {
if(ch == '.')Thread.sleep(1000);
}
}
}
- 点赞
- 收藏
- 关注作者
评论(0)