RabbitMQ 核心部分之简单模式和工作模式

举报
哥的时代 发表于 2023/12/25 22:59:53 2023/12/25
【摘要】 前言1.消息属性RabbitMQ是基于AMQP消息传输协议来实现的消息中间件;类似HTTP有header和body两部分数据,Message是RabbitMQ中的消息体概念。Message由Properties和Body组成,前者是一些元信息,如消息的优先级、持久化、传输格式(如JSON)、延迟等高级特性,Body则是传递的消息数据实体2.消息投递Exchange、Queue与Routin...

前言

1.消息属性

RabbitMQ是基于AMQP消息传输协议来实现的消息中间件;类似HTTP有header和body两部分数据,Message是RabbitMQ中的消息体概念。

Message由Properties和Body组成,前者是一些元信息,如消息的优先级、持久化、传输格式(如JSON)、延迟等高级特性,Body则是传递的消息数据实体

2.消息投递

Exchange、Queue与Routing Key三个概念是理解RabbitMQ消息投递的关键。RabbitMQ中一个核心的原则是,消息不能直接投递到Queue中。

Producer只能将自己的消息投递到Exchange中,由Exchange按照routing_key投递到对应的Queue中。

3.消息可靠性

不同于HTTP的同步访问,RabbitMQ中,Producer并不知道消息是否被可靠地投递到了Consumer中处理。那么,RabbitMQ是如何保证消息的可靠投递?

主要是两点:第一,消息确认机制。Consumer处理完消息后,需要发送确认消息给Broker Server,可以选择“确认接收”、“丢弃”、“重新投递”三种方式。如果Consumer在Broker Server收到确认消息之前挂了,Broker Server便会重新投递该消息。

第二,可以选择数据持久化,这样即使RabbitMQ重启,也不会丢失消息

一、Hello World(简单)模式

在下图中,“ P”是我们的生产者,“ C”是我们的消费者。中间的框是一个队列-RabbitMQ 代
表使用者保留的消息缓冲区
image.png

博主这里使用JAVA实现。

1.导入依赖

<!--指定 jdk 编译版本-->
<build>
 	<plugins>
 		<plugin>
 			<groupId>org.apache.maven.plugins</groupId>
 			<artifactId>maven-compiler-plugin</artifactId>
 			<configuration>
 				<source>8</source>
 				<target>8</target>
 			</configuration>
 		</plugin>
 	</plugins>
</build>
<dependencies>
 	<!--rabbitmq 依赖客户端-->
 	<dependency>
 		<groupId>com.rabbitmq</groupId>
 		<artifactId>amqp-client</artifactId>
		<version>5.8.0</version>
 	</dependency>
 	<!--操作文件流的一个依赖-->
 	<dependency>
 		<groupId>commons-io</groupId>
 		<artifactId>commons-io</artifactId>
 		<version>2.6</version>
 	</dependency>
</dependencies>

2.消息生产者

public class Producer {
 	private final static String QUEUE_NAME = "hello";
	public static void main(String[] args) throws Exception {
 		//创建一个连接工厂
 		ConnectionFactory factory = new ConnectionFactory();
 		factory.setHost("192.168.10.130");
 		factory.setUsername("guest");
 		factory.setPassword("guest");
 		//channel 实现了自动 close 接口 自动关闭 不需要显示关闭
 		try(Connection connection = factory.newConnection();Channel channel = 
connection.createChannel()) {
 			/**
 			* 生成一个队列
 			* 1.队列名称
 			* 2.队列里面的消息是否持久化 默认消息存储在内存中
 			* 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费
 			* 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除
 			* 5.其他参数
 			*/
 			channel.queueDeclare(QUEUE_NAME,false,false,false,null);
 			String message="hello world";
 			/**
 			* 发送一个消息
 			* 1.发送到那个交换机
 			* 2.路由的 key 是哪个
 			* 3.其他的参数信息
 			* 4.发送消息的消息体
 			*/
 			channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
 			System.out.println("消息发送完毕");
 		}
 	}
}

3.消息消费者

public class Consumer {
 	private final static String QUEUE_NAME = "hello";
	public static void main(String[] args) throws Exception {
 		ConnectionFactory factory = new ConnectionFactory();
 		factory.setHost("192.168.10.130");
 		factory.setUsername("guest");
 		factory.setPassword("guest");
 		Connection connection = factory.newConnection();
 		Channel channel = connection.createChannel();
 		System.out.println("等待接收消息....");
 		//推送的消息如何进行消费的接口回调
 		DeliverCallback deliverCallback=(consumerTag,delivery)->{
 			String message= new String(delivery.getBody());
 			System.out.println(message);
 		};
 		//取消消费的一个回调接口 如在消费的时候队列被删除掉了
 		CancelCallback cancelCallback=(consumerTag)->{
 		System.out.println("消息消费被中断");
 		};
 		/**
 		* 消费者消费消息
 		* 1.消费哪个队列
 		* 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答
 		* 3.消费者未成功消费的回调
 		*/
 		channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
 	}
 }

二、Work Queues(工作)模式

工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。
相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进
程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。

1.抽取工具类

public class RabbitMqUtils {
 	//得到一个连接的 channel
 	public static Channel getChannel() throws Exception{
 		//创建一个连接工厂
 		ConnectionFactory factory = new ConnectionFactory();
 		factory.setHost("192.168.10.130");
 		factory.setUsername("guest");
 		factory.setPassword("guest");
 		Connection connection = factory.newConnection();
 		Channel channel = connection.createChannel();
 		return channel;
 	}
}

2.启动两个工作线程

public class Worker01 {
 	private static final String QUEUE_NAME="hello";
 	public static void main(String[] args) throws Exception {
 		Channel channel = RabbitMqUtils.getChannel();
 		DeliverCallback deliverCallback=(consumerTag,delivery)->{
 			String receivedMessage = new String(delivery.getBody());
 			System.out.println("接收到消息:"+receivedMessage);
 		};
 		CancelCallback cancelCallback=(consumerTag)->{
 			System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
		};
 		System.out.println("C2 消费者启动等待消费......");
 		channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
 	}
}

3.启动一个发送线程

public class Task01 {
 	private static final String QUEUE_NAME="hello";
 	public static void main(String[] args) throws Exception {
 		try(Channel channel=RabbitMqUtils.getChannel();) {
 			channel.queueDeclare(QUEUE_NAME,false,false,false,null);
 			//从控制台当中接受信息
 			Scanner scanner = new Scanner(System.in);
 			while (scanner.hasNext()){
 				String message = scanner.next();
 				channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
 				System.out.println("发送消息完成:"+message);
 			}
 		}
 	}
}

4.结果

通过程序执行发现生产者总共发送 4 个消息,消费者 1 和消费者 2 分别分得两个消息,并且
是按照有序的一个接收一次消息
image.png


总结

以上就是RabbitMQ 核心部分之简单模式和工作模式的相关知识,希望对你有所帮助。
积跬步以至千里,积怠惰以至深渊。时代在这跟着你一起努力哦!

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。