高性能RabbitMQ消息队列介绍 及 SpringBoot整合
一、高性能RabbitMQ
RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源实现。AMQP 的出现其实也是应了广大人民群众的需求,虽然在同步消息通讯的世界里有很多公开标准(如 COBAR的 IIOP ,或者是 SOAP 等),但是在异步消息处理中却不是这样,只有大企业有一些商业实现(如微软的 MSMQ ,IBM 的 Websphere MQ 等),因此,在 2006 年的 6 月,Cisco 、Redhat、iMatix 等联合制定了 AMQP 的公开标准。
RabbitMQ的官网
http://www.rabbitmq.com
注意点:
RabbitMQ是采用erlang语言开发的,所以必须有erlang环境才可以运行。
二、RabbitMQ关键名词
- AMQP(高级消息队列协议)是一个异步消息传递所使用应用层协议规范,为面向消息中间件设计,基于此协议的客户端与消息中间件可以无视消息来源传递消息,不受客户端、消息中间件、不同的开发语言环境等条件的限制;
涉及概念解释: - Server(Broker):接收客户端连接,实现AMQP协议的消息队列和路由功能的进程;
- Virtual Host:虚拟主机的概念,类似权限控制组,一个Virtual Host里可以有多个Exchange和Queue。
- Exchange:交换机,接收生产者发送的消息,并根据Routing Key将消息路由到服务器中的队列Queue。
- ExchangeType:交换机类型决定了路由消息行为,RabbitMQ中有三种类型Exchange,分别是fanout、direct、topic;
- Message Queue:消息队列,用于存储还未被消费者消费的消息;
Message:由Header和body组成,Header是由生产者添加的各种属性的集合,包括Message是否被持久化、优先级是多少、由哪个Message Queue接收等;body是真正需要发送的数据内容; - BindingKey:绑定关键字,将一个特定的Exchange和一个特定的Queue绑定起来。
三、RabbitMQ交换机的作用及类型
生产者发送消息不会向传统方式直接将消息投递到队列中,而是先将消息投递到交换机中,在由交换机转发到具体的队列,队列在将消息以推送或者拉取方式给消费者进行消费,这和我们之前学习Nginx有点类似。
交换机的作用根据具体的路由策略分发到不同的队列中,交换机有四种类型。
-
Direct exchange(直连交换机)是根据消息携带的路由键(routing key)将消息投递给对应队列的
-
Fanout exchange(扇型交换机)将消息路由给绑定到它身上的所有队列
-
Topic exchange(主题交换机)队列通过路由键绑定到交换机上,然后,交换机根据消息里的路由值,将消息路由给一个或多个绑定队列
-
Headers exchange(头交换机)类似主题交换机,但是头交换机使用多个消息属性来代替路由键建立路由规则。通过判断消息头的值能否与指定的绑定相匹配来确立路由规则。
四、使用java操作RabbitMQ
后面会讲到springboot怎么去整合RabbitMQ;springboot的整合非常简单,这里先看原生的操作方式,可以更好的去学习它。
首先在pom下引入RabbitMQ的依赖:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.5</version>
</dependency>
- 1
- 2
- 3
- 4
- 5
Rabbmit工具类,后面讲的生产者和消费者都会用到这个工具类:
public class RabbitMQConnection {
public static Connection getConnection(){
// 定义连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置服务器地址
factory.setHost("127.0.0.1");
// 设置协议端口号
factory.setPort(5672);
// 设置vhost
factory.setVirtualHost("/admin_test");
// 设置用户名称
factory.setUsername("admin");
// 设置用户密码
factory.setPassword("123456");
// 创建新的连接
Connection newConnection = null;
try {
newConnection = factory.newConnection();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
return newConnection;
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
1、简单队列-点对点
是根据消息携带的路由键(routing key)将消息投递给对应队列的。
消息提供者:
private static void MQProviderSimple(){
//队列名称
String QUEUE_NAME = "test_queue";
// 1.获取连接
Connection newConnection = RabbitMQConnection.getConnection();
// 2.创建通道
Channel channel = null;
try {
channel = newConnection.createChannel();
// 3.创建队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String msg = "哈哈哈";
System.out.println("发送消息:" + msg);
// 4.发送消息
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
} catch (IOException e) {
e.printStackTrace();
}finally {
try {
channel.close();
newConnection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
消息接收者:
private static void MQConsumerSimple(){
//队列名称
String QUEUE_NAME = "test_queue";
Connection newConnection = RabbitMQConnection.getConnection();
// 2.获取通道
Channel channel = null;
try {
channel = newConnection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String msgString = new String(body, "UTF-8");
System.out.println("消费者获取消息:" + msgString);
}
};
// 3.监听队列
System.out.println("消费者开始监听队列:="+QUEUE_NAME);
channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
} catch (IOException e) {
e.printStackTrace();
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
其中需要注意的便是提供者和消费者的队列名称一致才能完成消费,
其中channel.queueDeclare()方法中的参数分别意思为:
- 队列的名称
- 设置是否持久化, true 为持久化数据,在 服务器重启的时候可以保证不丢失相关信息。
- 设置是否排他。 true 则设置队列为排他的。如果一个队列被声明为排 他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。
- 设置是否自动删除。为 true 则设置队列为自动删除。自动删除的前提是: 至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会 自动删除。
- 设置队列的其他一些参数,如 x-rnessage-ttl 、x-expires 等
2、工作队列
消费者监听消息默认是自动签收的,并且如果消费者是集群的话,RabbitMQ会以轮训的形式分发给消费者,这样如果集群服务器与服务器之间配置不同的话,配置差的服务器也要处理那么多消息,显然是不合理的,应该让配置高的多承担,配置第的少承担。因此需要,消费者收到消息还没处理完,RabbitMQ就不想它再推送消息,待处理完再推送。
为了确保消息不会丢失,RabbitMQ支持消息应答。消费者发送一个消息应答,告诉RabbitMQ这个消息已经接收并且处理完毕了。RabbitMQ就可以删除它了。
如果一个消费者挂掉却没有发送应答,RabbitMQ会理解为这个消息没有处理完全,然后交给另一个消费者去重新处理。这样,你就可以确认即使消费者偶尔挂掉也不会丢失任何消息了。
消息提供者:
private static void MQProviderWork(){
//队列名称
String QUEUE_NAME = "test_queue";
// 1.获取连接
Connection newConnection = RabbitMQConnection.getConnection();
// 2.创建通道
Channel channel = null;
try {
channel = newConnection.createChannel();
// 3.创建队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicQos(1);// 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息
String msg = "哈哈哈";
System.out.println("发送消息:" + msg);
// 4.发送消息
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
} catch (IOException e) {
e.printStackTrace();
}finally {
try {
channel.close();
newConnection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
消息接收者:
private static void MQConsumerWork(){
//队列名称
String QUEUE_NAME = "test_queue";
Connection newConnection = RabbitMQConnection.getConnection();
try {
// 2.获取通道
Channel channel = newConnection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicQos(1);// 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String msgString = new String(body, "UTF-8");
System.out.println("消费者获取消息:" + msgString);
// 手动回执消息
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 3.监听队列
System.out.println("消费者开始监听队列:="+QUEUE_NAME);
channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
} catch (IOException e) {
e.printStackTrace();
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
其中相比简单队列增加了:channel.basicQos(1),表示每次分发不得超过一条消息。
消费者开启监听channel.basicConsume的第二个参数,便代表着应答模式,true的话为自动应答,我们需要手动应答所以为false。
channel.basicAck(envelope.getDeliveryTag(), false)便是告知服务器,我的消息已处理完成,可以接收消息了。
3.发布订阅
一个生产者发送消息,多个消费者获取消息(同样的消息),包括一个生产者,一个交换机,多个队列,多个消费者。
消息提供者:
private static void MQProviderTopic(){
//交换机的名称
String EXCHANGE_NAME = "exchange";
// 1.获取连接
Connection newConnection = RabbitMQConnection.getConnection();
// 2.创建通道
Channel channel = null;
try {
channel = newConnection.createChannel();
// 3.绑定的交换机 参数1交互机名称 参数2 exchange类型
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String msg = "fanout_exchange_msg";
// 4.发送消息
channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
} catch (IOException e) {
e.printStackTrace();
}finally {
try {
channel.close();
newConnection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
消息消费者:
private static void MQConsumerTopic(){
//队列名称
String QUEUE_NAME = "test_queue";
//交换机的名称
String EXCHANGE_NAME = "exchange";
Connection newConnection = RabbitMQConnection.getConnection();
try {
// 2.创建通道
Channel channel = newConnection.createChannel();
// 3.newConnection
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 4.消费者绑定交换机 参数1 队列 参数2交换机 参数3 routingKey
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String msgString = new String(body, "UTF-8");
System.out.println("消费者获取消息:" + msgString);
}
};
// 5.消费者监听队列消息
channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
} catch (IOException e) {
e.printStackTrace();
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
相比队列形式,发布订阅,多了 channel.queueBind(),其中参数为:
- 队列名
- 交换机的名称
- 路由策略
4.路由模式
路由模式,和上面发布订阅相差不大,差别多了个routingKey 。
消息提供者:
private static void MQProviderRouter(){
//交换机的名称
String EXCHANGE_NAME = "exchange";
// 1.获取连接
Connection newConnection = RabbitMQConnection.getConnection();
try {
// 2.创建通道
Channel channel = newConnection.createChannel();
// 3.绑定的交换机 参数1交互机名称 参数2 exchange类型
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String routingKey = "info";
String msg = "direct_exchange_msg" + routingKey;
// 4.发送消息
channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());
System.out.println("生产者发送msg:" + msg);
} catch (IOException e) {
e.printStackTrace();
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
消费接收者:
private static void MQConsumerRouter(){
//队列名称
String QUEUE_NAME = "test_queue";
//交换机的名称
String EXCHANGE_NAME = "exchange";
// 1.获取连接
Connection newConnection = RabbitMQConnection.getConnection();
try{
Channel channel = newConnection.createChannel();
// 3.消费者关联队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 4.消费者绑定交换机 参数1 队列 参数2交换机 参数3 routingKey
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String msgString = new String(body, "UTF-8");
System.out.println("消费者获取消息:" + msgString);
}
};
// 5.消费者监听队列消息
channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
}catch (Exception e){
e.printStackTrace();
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
5.通配符模式Topics
此模式是在路由key模式的基础上,使用了通配符来管理消费者接收消息。生产者P发送消息到交换机X,type=topic,交换机根据绑定队列的routing key的值进行通配符匹配;
消息提供者:
private static void MQProviderTop(){
// //交换机的名称
String EXCHANGE_NAME = "exchange";
// 1.获取连接
Connection newConnection = RabbitMQConnection.getConnection();
try {
Channel channel = newConnection.createChannel();
// 3.绑定的交换机 参数1交互机名称 参数2 exchange类型
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String routingKey = "log.info.error";
String msg = "topic_exchange_msg" + routingKey;
// 4.发送消息
channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());
System.out.println("生产者发送msg:" + msg);
} catch (IOException e) {
e.printStackTrace();
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
消息消费者:
private static void MQConsumerTop(){
//队列名称
String QUEUE_NAME = "test_queue";
//交换机的名称
String EXCHANGE_NAME = "exchange";
// 1.获取连接
Connection newConnection = RabbitMQConnection.getConnection();
try{
// 2.创建通道
Channel channel = newConnection.createChannel();
// 3.消费者关联队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "log.#");
// 4.消费者绑定交换机 参数1 队列 参数2交换机 参数3 routingKey
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String msgString = new String(body, "UTF-8");
System.out.println("消费者获取消息:" + msgString);
}
};
// 5.消费者监听队列消息
channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
}catch (Exception e){
e.printStackTrace();
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
注意点:
符号#:匹配一个或者多个词lazy.# 可以匹配lazy.irs或者lazy.irs.cor
符号*:只能匹配一个词lazy.* 可以匹配lazy.irs或者lazy.cor
- 1
- 2
6.AMQP 事务机制
生产者发送消息出去之后,不知道到底有没有发送到RabbitMQ服务器, 默认是不知道的。有时如果方法中出现错误,发出去的消息需要撤回时,遍需要有事物进行处理;
事务模式:
txSelect 将当前channel设置为transaction模式
txCommit 提交当前事务
txRollback 事务回滚
private static void MQProviderSelect() throws IOException {
// //交换机的名称
String EXCHANGE_NAME = "exchange";
// 1.获取连接
Connection newConnection = RabbitMQConnection.getConnection();
Channel channel = newConnection.createChannel();
try {
// 3.绑定的交换机 参数1交互机名称 参数2 exchange类型
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String routingKey = "log.info.error";
String msg = "topic_exchange_msg" + routingKey;
// 4.发送消息
// 将当前管道设置为 txSelect 将当前channel设置为transaction模式 开启事务
channel.txSelect();
channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());
System.out.println("生产者发送msg:" + msg);
channel.txCommit();// 提交事务
} catch (IOException e) {
e.printStackTrace();
channel.txRollback();// 回滚事务
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
五、SpringBoot集成RabbitMQ
前面学习了在java中怎么使用原生api操作消息队列,下面看下SpringBoot中如何集成:
消息提供者
1.POM
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- 1
- 2
- 3
- 4
2.application.properties
server.port=8081
####连接地址
spring.rabbitmq.host=127.0.0.1
####端口号
spring.rabbitmq.port=5672
####账号
spring.rabbitmq.username=admin
####密码
spring.rabbitmq.password=123456
### 地址
spring.rabbitmq.virtual-host=/admin_test
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
3.将队列与交换机绑定
这里以Fanout模式演示的
@Component
public class FanoutConfig {
// 邮件队列
private String FANOUT_EMAIL_QUEUE = "fanout_eamil_queue";
// 短信队列
private String FANOUT_SMS_QUEUE = "fanout_sms_queue";
// 交换机名称
private String EXCHANGE_NAME = "fanoutExchange";
// 1.定义队列邮件
@Bean
public Queue fanOutEamilQueue() {
return new Queue(FANOUT_EMAIL_QUEUE);
}
@Bean
public Queue fanOutSmsQueue() {
return new Queue(FANOUT_SMS_QUEUE);
}
// 2.定义交换机
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange(EXCHANGE_NAME);
}
// 3.队列与交换机绑定邮件队列
// 参数名称要和定义队列和交换机的名称一致
@Bean
Binding bindingExchangeEamil(Queue fanOutEamilQueue, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanOutEamilQueue).to(fanoutExchange);
}
// 4.队列与交换机绑定短信队列
@Bean
Binding bindingExchangeSms(Queue fanOutSmsQueue, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanOutSmsQueue).to(fanoutExchange);
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
4.发送
@Autowired
private AmqpTemplate amqpTemplate;
@GetMapping("/Send/{queueName}")
public void Send(@PathVariable("queueName") String queueName) {
String msg = "my_fanout_msg:" + new Date();
System.out.println(msg + ":" + msg);
amqpTemplate.convertAndSend(queueName, msg);
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
消息接受者
配制和上面相同。
监听为:
@Component
@RabbitListener(queues = "fanout_eamil_queue")
public class FanoutEamilConsumer {
@RabbitHandler
public void process(String msg) throws Exception {
System.out.println("邮件消费者获取生产者消息msg:" + msg);
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
参考资料来源:蚂蚁课堂资料。
文章来源: blog.csdn.net,作者:小毕超,版权归原作者所有,如需转载,请联系作者。
原文链接:blog.csdn.net/qq_43692950/article/details/107597026
- 点赞
- 收藏
- 关注作者
评论(0)