RabbitMQ 第3章 RabbitMQ Work Queues(工作队列)

举报
许小强 发表于 2022/03/17 17:26:42 2022/03/17
【摘要】 RabbitMQ 队列

一、概述

工作队列(Work queues)

(使用Java客户端)

在前面的专题学习中,我们使用Java语言实现了一个简单的从名为"hello"的队列中发送和接收消息的程序,在这部内容中我们将创建一个工作队列被用来分配定时消息任务,而且通过多个接收者(工作者)实现。

工作队列(又名任务队列),主要的思想是为了避免立即做一个资源密集型的任务(多消息同时密集发送),不必等待它完成,当运行许多工作者的让任务都在她们之间共享。

它在web应用中是非常有用的,因为在很短的时间内http请求窗口处理一个复杂的任务是不可能实现的,它的结构如下图-1所示:

二、实现步骤

2.1、准备

在本部分内容之前,已经实现了发送单条”Hello World!“的消息,现在将发送一些复杂的字符串任务,由于没有一个真实的生产环境来模拟,我们可以通过使用Thread.sleep()函数来假设任务通过描述字符串hello...将要花费三秒钟的时间。

从之前的实例中,我们只需要稍微修改Sender01.java代码,允许任意的消息通过命令行发送,这一计划将安排到工作队列中的任务,让我们将

String message = getMessage(argv);
channel.basicPublish("", "hello", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");

消息处理:

private static String getMessage(String[] strings){
    if (strings.length < 1)
        return "Hello World!";
    return joinStrings(strings, " ");
}
private static String joinStrings(String[] strings, String delimiter) {
    int length = strings.length;
    if (length == 0) return "";
    StringBuilder words = new StringBuilder(strings[0]);
    for (int i = 1; i < length; i++) {
        words.append(delimiter).append(strings[i]);
    }
    return words.toString();
}

之前的旧的Recv01.java也需要做一些改变。

while (true) {
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    String message = new String(delivery.getBody());
    System.out.println(" [x] Received '" + message + "'");        
    doWork(message);
    System.out.println(" [x] Done");
}

模拟假任务执行时间:

private static void doWork(String task) throws InterruptedException {
    for (char ch: task.toCharArray()) {
        if (ch == '.') Thread.sleep(1000);
    }
}

2.2、轮询调度

使用消息队列的优点之一就是能够方便地并行工作,如果我们建立了大量的工作任务,我们就可以添加更多的worker,这样大规模应用就显的比较容易,首先让我们尝试同时运行两个Worker.java脚本,他们都将获得队列中的消息,但是具体情况如下,我们需要打开三个控制台,两个将运行Worker.java的脚本,这些控制台将显示两个消费者C1和C2,第三个控制台将发布一个新的任务,一旦启动了消费者,你将可以通过第三个控制台发布一些消息,在默认情况下RabbitMQ将发送每条消息给下一个消费者,在队列里每个消费者将获得同样数量的消息,这种方式被称之轮询调度。

2.3、消息确认

执行一个任务要花费几秒钟,你可能想知道在此过程中会发生什么,如果一个消费者启动一项长期任务,它只是部分完成了,在我们当前的代码中,一旦RabbitMQ提供了一个消息给客户立即就将其从内存中删除,在这种情况下,如果一个工作者down掉,我们就将失去消息,尽管它只是处理,但是也将失去这部分消息,但是我们必须确保不丢失任何消息,如果一个工作者down了,我们希望这部分任务被传递到另外一个工作者的任务当中。

为了确保每条消息不丢失,RabbitMQ支持消息的确认(Acknowledgments),一个确认被送回消费者告诉RabbitMQ的一个特定的消息一经被接收和处理,RabbitMQ此时才可以将该特定的消息删除。

如果消费者进程被杀掉而没有发送一个确认给RabbitMQ服务器,RabbitMQ会明白这个消息是没有被正常完成处理。

目前还有任何的消息超市,只有当工作者连接中断,RabbitMQ将重新传送消息,即使重新处理消息需要一个很长的时间。

消息确认默认是被打开的,在之前的实例中,我们可以通过AUTOACK=true来打开这个标记,当我们完成了任务,就应该移除这个标志,并且发送一个确认给工作者(Worker).

QueueingConsumer consumer = new QueueingConsumer(channel);
boolean autoAck = false;
channel.basicConsume("hello", autoAck, consumer);
 
while (true) {
  QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  //...      
  channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}

2.4、消息持久化

前面已经给出了如何确保消费者down了,任务也不会丢失,但是我们的任务在RabbitMQ服务器停止时还是可能会被丢失,当RabbitMQ服务器宕机或者奔溃时,它会丢失所有的队列和消息,除非告诉它不要这么做,需要做两件事情确保消息不会被丢失,我们需要标记队列和消息持久化,首先我们需要确保RabbitMQ永远不会丢失队列,为了做到这点,我们需要将队列声明为持久化:

boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);

此时queueDeclare变化需要被兼容到生产者和消费者的代码中,在这一点上,我们肯定不会丢失队列,即使task_queue队列所属的RabbitMQ服务器重新启动,现在我们需要将消息标记为持久性,通过设置MessageProperties值为 PERSISTENT_TEXT_PLAN来实现。

import com.rabbitmq.client.MessageProperties;
channel.basicPublish("", "task_queue", 
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes());

消息持久化需要注意的事项:

标记为持久化的消息并不能完全保证消息不会被丢失,虽然它告诉RabbitMQ的消息保存到磁盘,仍然有一个很短的时间内,RabbitMQ的消息一经接收了,但是并没有成功保存到磁盘,而是保存在缓存汇总,持久化的保证能力不足,但它是我们简单任务队列已经足以满足需求了。

2.5、公平调度

RabbitMQ当消息进入队列后,仅仅只是分发消息,它没有为消费者查找未确定的消息,它只是一味的分发n个消息给n个消费者,假如我需要两个工作者来分别处理消息,一个工作者处理所有奇数的消息,另外一个工作者处理偶数的消息,此时就需要打破RabbitMQ广播式的分发消息的规则,我们可以使用basicQos方法来实现,将其设置为1.

int prefetchCount = 1;
channel.basicQos(prefetchCount);

它的结构图如下图-2所示:

2.6、完整的代码清单:

package com.xuz.task;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
public class NewTask {
	private static final String TASK_QUEUE_NAME = "task_queue";
	public static void main(String[] args) throws IOException {
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("127.0.0.1");
		Connection conn = factory.newConnection();
		Channel channel = conn.createChannel();
		/**
		 * true:消息持久化设置
		 * 这样设置之后,服务器收到消息后就会立刻将消息写入到硬盘,就可以防止突然服务器挂掉,而引起的数据丢失了。
		 * 但是服务器如果刚收到消息,还没来得及写入到硬盘,就挂掉了,这样还是无法避免消息的丢失。
		 */
		channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
		//封装发送消息
//		String[] msg={"xuzheng","test01","rabbitMQ"};
//		String message = getMessage(msg);
		String message = null;
		for (int i = 0; i < 10; i++) {
			message = "测试公平调度"+i;
			//消息持久化设置MessageProperties.PERSISTENT_TEXT_PLAIN 当rabbitMQ暂时down掉,下次重启之后,工作者还是能接受目前发送的消息。
			channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,
					message.getBytes());
			System.out.println("send:["+message+"]");
		}
		channel.close();
		conn.close();
	}
 
	private static String getMessage(String[] args) {
		if(args.length<1){
			return "Hello World!";
		}else{
			return joinStrings(args,"");
		}
	}
 
	private static String joinStrings(String[] args, String string) {
		int len = args.length;
		if(len == 0){
			return "";
		}
		StringBuilder words = new StringBuilder(args[0]);
		for (int i = 0; i < len; i++) {
			words.append(string).append(args[i]);
		}
		return words.toString();
	}
}	
package com.xuz.work;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;
public class Worker {
	private static final String TASK_QUEUE_NAME = "task_queue";
	public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("127.0.0.1");
		Connection conn = factory.newConnection();
		Channel channel = conn.createChannel();
		/**
		 * true:
		 * 这样设置之后,服务器收到消息后就会立刻将消息写入到硬盘,就可以防止突然服务器挂掉,而引起数据丢失了,但是服务器如果刚收到消息,还没有来得写入硬盘,就挂掉了,这样
		 * 无法避免消息得丢失。
		 */
		channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
		System.out.println("waiting for message.To exit press CTRL+C");
		channel.basicQos(1);
		QueueingConsumer consumer = new QueueingConsumer(channel);
		/**
		 * false:设置确认消息,true表示接收到消息之后,将返回给服务端确定消息
		 */
		channel.basicConsume(TASK_QUEUE_NAME, false,consumer);
		while(true){
			QueueingConsumer.Delivery delivery = consumer.nextDelivery();
			String message = new String(delivery.getBody());
			System.out.println("Received:["+message+"] from Task");
			doWork(message);
			System.out.println("Done!");
			//设置消息确认机制,如将如下代码注释掉,则
			//一旦将autoAck关闭之后,一定要记得处理完消息之后,向服务器确认消息。否则服务器将会一直转发该消息
			channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
		}
	}
	private static void doWork(String message) throws InterruptedException {
		for (char ch : message.toCharArray()) {
			if(ch == '.')Thread.sleep(1000);
		} 
	}
}
【版权声明】本文为华为云社区用户原创内容,未经允许不得转载,如需转载请自行联系原作者进行授权。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。