RabbitMQ Pub/Sub(发布订阅)模式
这个模式和之前的相比还是有一点的变化,我们还是去官网去看这个模型图。首先是必要的生产者和消费者,中间的X代表交换机,这里我们会用到交换机,然后这里有两个队列,然后在分发给两个消费者。
这里的人生产者不会直接发消息给队列,而是给到了一个交换机。交换机一方面等待接收生产者的发送的消息,一方面可以进行一个指定的处理,比如是广播的模式还是适配或者是指定。红色的消息队列接收消息,并且在消费者没有消费之前做一个暂时的存储以及缓存消息的作用。消费者等待消息。
这里提到交换机,我们介绍三种常见的模式,
➢ Fanout:广播,将消息交给所有绑定到交换机的队列
➢ Direct:定向,把消息交给符合指定routing key 的队列
➢ Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
另外交换机啊,它是不存储消息的,交换交换就是只有转发逻辑的功能,并不会存储消息。
因为这种模式引入了交换机,所以必然存在一个交换机和队列绑定的逻辑,不然交换机怎么知道有哪些队列。知道要联系的队列,然后做出一个转发的处理逻辑。
然后就编写代码实现一下,实现一下这个过程。
创建一个生产者
package com.jgdabc.producer;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
//完成发送消息
public class Producer_PubSub {
public static void main(String[] args) throws IOException, TimeoutException {
// 1 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2 设置连接参数
connectionFactory.setHost("123.56.4.32"); //服务器主机ip公网 默认localhost
connectionFactory.setPort(5672); //消息端口 默认5672
connectionFactory.setVirtualHost("/shabi");//虚拟机 默认/
connectionFactory.setUsername("jgdabc");//用户名 默认guest
connectionFactory.setPassword("123456");//密码 默认guest
// 3创建连接connection
Connection connection = connectionFactory.newConnection();
// 4 创建Channel
Channel channel = connection.createChannel();
// 5:创建交换机
// exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
// 1:exchange : 交换机名称
// 2: type:交换机的类型 BuiltinExchangeType代表枚举类型
// Direct("direct"):定向
// FANOUT("fanout") : 扇形广播,发送消息到每一个与之绑定的队列
// TOPIC("topic") 通配符的方式
// HEADERS("headers")
// 3:durable是否持久化
// 4:autoDelete:是否自动删除
// 5:internal 内部使用
// 6Larguments:参数列表
String exchangeName = "test_fanout";
//BuiltinExchangeType.FANOUT 代表广播,扇形交换机
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);
// 6:创建队列
String queueName = "test_fanout_queue";
String queueName01 = "test_faout_queue01";
channel.queueDeclare(queueName,true,false,false,null);
channel.queueDeclare(queueName01,true,false,false,null);
// queueBind(String queue, String exchange, String routingKey)
// 7 绑定队列和交换机
// queue:绑定的队列名称
// exchange:交换机名称
// routingKey:路由键,绑定规则
// 如果倦鸟还击的类型为fanout,routingKey设置为“”
channel.queueBind(queueName,exchangeName,"");
channel.queueBind(queueName01,exchangeName,"");
// 8:发送消息
String body = "日志信息。张三调用了findAll方法,日志级别info....";
channel.basicPublish(exchangeName,"",null,body.getBytes());
// 9:释放资源
channel.close();
connection.close();
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
这里我们用到交换机的方法注释里面都有说明,说明会的参数顺序和方法中的顺序一样。
我们声明了队列,并设置了参数。这里如果不明白参数的含义请参考上一篇代码中的注释。
队列参数说明
queue :队列名称
// durable : 是否持久化 :当mq 重启数据还在
// exclusive : 是否独占,只能有一个消费者监听这队列
// 当connection关闭时候,是否删除队列
// autoDelete:是否自动删除,当没有Consumer时候,是否自动删除
声明完交换机后,我们声明了两个队列。然后交换机绑定了这两个队列。
我们注释里也说明了路由的方式是广播的模式,广播是什么呢?就是交换机会将消息转发到与之绑定的所有队列。都会通知到,然后这些队列可以做一个处理其实,怎么处理是它们的事情了。给到不同的消费者,消费者的处理也是一个逻辑。可以模拟的是一个队列进行了日志打印,一个进行了数据的数据库存储。
交换机会有一个路由的参数,因为交换机是广播的模式,所以路由直接就指定为空字符串就好。
然后呢,生产者就到这里,然后我们来看消费者,注意,这里的消费者不会竞争同一个队列,是不同队列的。
消费者一
package com.jgdabc.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer_PubSub1 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2 设置连接参数
connectionFactory.setHost("123.56.4.32"); //服务器主机ip公网 默认localhost
connectionFactory.setPort(5672); //消息端口 默认5672
connectionFactory.setVirtualHost("/shabi");//虚拟机 默认/
connectionFactory.setUsername("jgdabc");//用户名 默认guest
connectionFactory.setPassword("123456");//密码 默认guest
// 3创建连接connection
Connection connection = connectionFactory.newConnection();
// 4 创建Channel
Channel channel = connection.createChannel();
String queueName = "test_fanout_queue";
String queueName01 = "test_faout_queue01";
// 发送消息
//5创建队列
// queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<Stri
//参数说明: queue :队列名称
// durable : 是否持久化 :当mq 重启数据还在
// exclusive : 是否独占,只能有一个消费者监听这队列
// 当connection关闭时候,是否删除队列
// autoDelete:是否自动删除,当没有Consumer时候,是否自动删除
// 如果没有一个helloword的队列,则会创建该队列
channel.queueDeclare("work_queues",true,false,false,null);
// 接收消息
// basicConsume(String queue, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback)
// 参数说明
// queue: 队列名称
// autoAck : 是否自动确认
// callback: 回调函数
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 回调方法,当收到消息后会自动执行该方法
// consumerTag:消息表示
// ebvelop:获取一些信息,交换机的信息,路由等等
// properties:配置信息
// body:数据
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
// System.out.println("consumerTag:"+consumerTag);
// System.out.println("Exchange:"+envelope.getExchange());
// System.out.println("RoutingKey:"+envelope.getRoutingKey());
// System.out.println("properties:"+properties);
System.out.println("body:"+new String(body));
System.out.println("将日志信息打印到控制台。。。");
}
};
channel.basicConsume(queueName,true,consumer);
//
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
消费者二
package com.jgdabc.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer_PubSub2 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2 设置连接参数
connectionFactory.setHost("123.56.4.32"); //服务器主机ip公网 默认localhost
connectionFactory.setPort(5672); //消息端口 默认5672
connectionFactory.setVirtualHost("/shabi");//虚拟机 默认/
connectionFactory.setUsername("jgdabc");//用户名 默认guest
connectionFactory.setPassword("123456");//密码 默认guest
// 3创建连接connection
Connection connection = connectionFactory.newConnection();
// 4 创建Channel
Channel channel = connection.createChannel();
String queueName = "test_fanout_queue";
String queueName01 = "test_faout_queue01";
// 发送消息
//5创建队列
// queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<Stri
//参数说明: queue :队列名称
// durable : 是否持久化 :当mq 重启数据还在
// exclusive : 是否独占,只能有一个消费者监听这队列
// 当connection关闭时候,是否删除队列
// autoDelete:是否自动删除,当没有Consumer时候,是否自动删除
// 如果没有一个helloword的队列,则会创建该队列
channel.queueDeclare("work_queues",true,false,false,null);
// 接收消息
// basicConsume(String queue, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback)
// 参数说明
// queue: 队列名称
// autoAck : 是否自动确认
// callback: 回调函数
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 回调方法,当收到消息后会自动执行该方法
// consumerTag:消息表示
// ebvelop:获取一些信息,交换机的信息,路由等等
// properties:配置信息
// body:数据
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
// System.out.println("consumerTag:"+consumerTag);
// System.out.println("Exchange:"+envelope.getExchange());
// System.out.println("RoutingKey:"+envelope.getRoutingKey());
// System.out.println("properties:"+properties);
System.out.println("body:"+new String(body));
System.out.println("将日志信息保存数据库。。。");
}
};
channel.basicConsume(queueName01,true,consumer);
//
}
}
```
测试
![在这里插入图片描述](https://img-blog.csdnimg.cn/70ddb2aa89f04cc9ac07437e7cbbd2c2.png)
![在这里插入图片描述](https://img-blog.csdnimg.cn/18f01a6e415b4e4e910f5ae7bc0b9f25.png)
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
文章来源: daodaozi.blog.csdn.net,作者:兰舟千帆,版权归原作者所有,如需转载,请联系作者。
原文链接:daodaozi.blog.csdn.net/article/details/125856056
- 点赞
- 收藏
- 关注作者
评论(0)