RabbitMQ学习笔记 03、交换机模式(4种)
@[toc]
前言
本篇博客是RabbitMQ的四种交换机模式,若文章中出现相关问题,请指出!
所有博客文件目录索引:博客目录索引(持续更新)
博客案例代码可见:Gitee-demoexer/RabbitMQ、Github-javademos/RabbitMQ
交换机工作模式(四种)
fanout
:广播,这种模式只需要将队列绑定到交换机上即可,是不需要设置路由键的。(广播,直接绑定队列)
- 我们之前的哪里就是fanout形式。
direct
:根据RoutingKey匹配消息路由到指定的队列。(绝对匹配)
topic
:生产者指定RoutingKey消息根据消费端指定的队列通过模糊匹配方式进行相应转发。(模糊匹配)
headers
:根据发送消息内容中的headers属性来匹配。
有了交换机之后就有了更强大的能力,可以根据交换机的模式来完成更加复杂的功能。
一、fanout模式
1.1、基本概念
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-JBGMtaQg-1651541063735)(C:\Users\93997\AppData\Roaming\Typora\typora-user-images\image-20210925070020024.png)]
应用场景:例如我们要通过rabbitmq来进行日志记录,一般会以两种形式,控制台以及存储到磁盘方式进行落库存盘。无论哪种方式,其内容本质是相同的,此时我们可以使用fanout方式。
特点:
- 使用fanout的特点就是只有当消费者连接之后生产者发送的消息才会生效,在此之前生产者产生的会默认直接丢弃,这也就解决了消息积压的问题。(对于之前的一些日志记录丢弃掉也没有关系)
- 若是运行多个服务,那么每一个服务都能够收到同一份消息,而不像之前案例一样均匀分配任务了!!!
1.2、代码实操
目的(实现效果):发送的多个日志信息,每个服务都能够收到相同数量且同样内容的信息。
生产者
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
/**
* @ClassName EmitLog
* @Author ChangLu
* @Date 2021/9/25 7:10
* @Description TODO
*/
public class EmitLog {
//核心:指定交换机名称
private static String EXCHANGE_NAME = "LOGS";
public static void main(String[] args) {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.118.128");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("password");
Connection connection = null;
Channel channel = null;
try {
connection = connectionFactory.newConnection();
channel = connection.createChannel();
//绑定一个交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
//向指定的交换机发送消息
String msg = "this is a log info!";
//第二个参数是routingkey,第三个参数是基本属性
channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes(StandardCharsets.UTF_8));
System.out.println("消息发送成功!");
}catch (Exception e){
System.out.println("消息发送失败,失败原因:"+e.getMessage());
}
Optional.ofNullable(channel).ifPresent(EmitLog::close);
Optional.ofNullable(connection).ifPresent(EmitLog::close);
}
//反射进行资源关闭
public static <T> void close(T t){
Class<?> aClass = t.getClass();
Optional.ofNullable(t).ifPresent(c-> {
try {
Method closeMethod = aClass.getDeclaredMethod("close");
closeMethod.setAccessible(true);
closeMethod.invoke(t, null);
} catch (Exception e){
e.printStackTrace();
}
});
}
}
消费者
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/**
* @ClassName ReceiveLog
* @Author ChangLu
* @Date 2021/9/25 7:23
* @Description TODO
*/
public class ReceiveLog {
private static String EXCHANGE_NAME = "LOGS";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.118.128");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("password");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//绑定指定的交换机类型
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
//生成一个临时的随机的queue,并绑定交换机与队列
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName,EXCHANGE_NAME,"");//第三个参数为routingkey
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, StandardCharsets.UTF_8);
System.out.println("接收到消息:"+msg);
}
};
//进行消费,需要指定队列名称
channel.basicConsume(queueName,true,consumer);
}
}
演示:生产者连发两条消息,启动的两个消费者服务都能够接收到相同的两份信息
二、direct模式
2.1、基本概念
概念图
可以看到下图,相同的队列可以指定相同的routingkeys,这是不受影响的。
说明
场景:有些情况下,对于日志而言我们进行升级,可能不需要把所有的日志都存储到磁盘上,只需要在磁盘中存储错误日志,例如error类型的,日志分为不同等级嘛。
- 在磁盘中只存储error,而在控制台里把所有消息都打印出来。涉及到不同的消费者接收消息不一致的情况此时非常建议使用direct模式。
实现效果:生产者服务在消费者服务启动前发送的消息,不会放入到队列中存储起来,只有在消费者服务启动后发送的消息才会被进行分发处理。
实现方式:设置交换机为direct,并且需要额外设置routingkey,可以将key理解为对指定信息感兴趣。
2.2、代码实操
目的(实现效果):不同类型的日志交由不同的服务处理,例如info、debug、warning交由消费者服务1处理;error交由消费者服务2处理。
生产者:
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
/**
* @ClassName EmitLogs
* @Author ChangLu
* @Date 2021/9/25 9:44
* @Description 生产者:绑定交换机direct_LOGS,发送四个消息,每个消息匹配一个routingkeys(info、debug、error、warning)
*/
public class EmitLogs {
//核心:指定交换机名称,需要跟之前的不一样
private static String EXCHANGE_NAME = "direct_LOGS";
public static void main(String[] args) throws Exception{
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.118.128");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("password");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//绑定一个Direct类型交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String msg = "sended msg";
//发送三条消息,每个消息绑定对应的routingkey,发送给指定的一个交换机
String INFO_MESSAGE = "info-"+msg;
channel.basicPublish(EXCHANGE_NAME,"info",null,INFO_MESSAGE.getBytes(StandardCharsets.UTF_8));
System.out.println("成功发送消息:"+INFO_MESSAGE);
String DEBUG_MESSAGE = "debug-"+msg;
channel.basicPublish(EXCHANGE_NAME,"debug",null,DEBUG_MESSAGE.getBytes(StandardCharsets.UTF_8));
System.out.println("成功发送消息:"+DEBUG_MESSAGE);
String WARNING_MESSAGE = "warning-"+msg;
channel.basicPublish(EXCHANGE_NAME,"warning",null,WARNING_MESSAGE.getBytes(StandardCharsets.UTF_8));
System.out.println("成功发送消息:"+WARNING_MESSAGE);
String ERROR_MESSAGE = "warning-"+msg;
channel.basicPublish(EXCHANGE_NAME,"error",null,ERROR_MESSAGE.getBytes(StandardCharsets.UTF_8));
System.out.println("成功发送消息:"+ERROR_MESSAGE);
channel.close();
connection.close();
}
}
消费者1:
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/**
* @ClassName ReceiveLogs
* @Author ChangLu
* @Date 2021/9/25 9:50
* @Description 消费者:绑定交换机direct_LOGS,并且设置三个routingkeys:info、debug、warning
*/
public class ReceiveLogs1 {
private static String EXCHANGE_NAME = "direct_LOGS";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.118.128");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("password");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//绑定指定的交换机类型——direct
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//生成一个随机的临时的queue
String queueName = channel.queueDeclare().getQueue();
//一个交换机同时绑定三个routingkeys
channel.queueBind(queueName,EXCHANGE_NAME,"info");
channel.queueBind(queueName,EXCHANGE_NAME,"debug");
channel.queueBind(queueName,EXCHANGE_NAME,"warning");
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, StandardCharsets.UTF_8);
System.out.println("接收到消息:"+msg);
}
};
//进行消费,需要指定队列名称
channel.basicConsume(queueName,true,consumer);
}
}
消费者2:大部分内容与消费者1相同,只不过就指定的routingkeys不一样而已
//绑定direct类型交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String queueName = channel.queueDeclare().getQueue();
//绑定指定routingkeys:error
channel.queueBind(queueName,EXCHANGE_NAME,"error");
测试一下:生产者发送了四个不同routingkeys的消息,两个消费者接受对应routingkeys信息
三、topic模式
3.1、基本概念
我们可对routingkey进行模糊匹配,如使用*或#来进行匹配!
*
可以代替一个单词
#
可以替代零个或多个单词
我们可以对routingkeys进行.分割的不同类型区分,上面对应着消费者服务来对生产者产生的信息进行过滤,每个队列能够拿到对应交换机可模糊匹配的执行信息,来达到区分的效果!
3.2、实战
目的:有10条记录其中包含了不同类型的内容,生产者发送消息的同时每条记录都附带对应详细的routingkey;消费者则会定义指定的routingkey模糊表达式,用于进行模糊匹配拿到生产者发送过来的信息记录条数。
生产者
这里对方法进行了抽离,实际上基本配置与之前都相同。这里的话准备了一个routingkey数组用于分别描述各组信息与实体信息传递出去:
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/**
* @ClassName TopicProduce
* @Author ChangLu
* @Date 2021/9/25 10:57
* @Description 生产者:交换机类型为topic
*/
public class TopicProduce {
private static String EXCHANGE_NAME = "topic_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
Util.doBefore();
Channel channel = Util.channel;
//指定交换机为topic
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
//定义关键字,不同的消息在发送时都指定routingKeys
String[] routingKeys = new String[]{
"quick.orange.rabbit","quick.orange.fox","quick.brown","quick.orange.male.rabbit",
"lazy.orange.elephant","lazy.brown.fox","lazy.pink.rabbit","lazy.orange.male.rabbit",
"orange"
};
for (String routingKey : routingKeys) {
String msg = "信息内容:"+routingKey;
//发送
channel.basicPublish(EXCHANGE_NAME,routingKey,null,msg.getBytes(StandardCharsets.UTF_8));
System.out.println("已发送"+msg);
}
Util.close();
}
static class Util{
private static Connection connection = null;
public static Channel channel = null;
public static void doBefore() throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.118.128");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("password");
connection = connectionFactory.newConnection();
channel = connection.createChannel();
}
public static void close() throws IOException, TimeoutException {
if(channel!=null){
channel.close();
}
if(connection!=null){
connection.close();
}
}
}
}
消费者
消费者1:基本配置都相同,不再重复展示,匹配的routingkey为"*.orange.*"
private static String EXCHANGE_NAME = "topic_exchange";
...
//指定交换机为topic
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String queueName = channel.queueDeclare().getQueue();
//指定routingkeys,可搭配*或#进行匹配
String bindingKey = "*.orange.*";
channel.queueBind(queueName,EXCHANGE_NAME,bindingKey);//绑定队列、交换机以及routingkey
channel.basicConsume(queueName,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, StandardCharsets.UTF_8);
System.out.println("收到消息:"+msg);
}
});
...
消费者2:匹配的routingkey为"quick.*"
、"lazy.orange.#"
、"quick.orange.fox"
private static String EXCHANGE_NAME = "topic_exchange";
//指定交换机为topic
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String queueName = channel.queueDeclare().getQueue();
//指定routingkeys,可搭配*或#进行匹配
String bindingKey = "quick.*";
channel.queueBind(queueName,EXCHANGE_NAME,bindingKey);//绑定队列、交换机以及routingkey
String bindingkey2 = "lazy.orange.#";
channel.queueBind(queueName,EXCHANGE_NAME,bindingkey2);
String bindingkey3 = "quick.orange.fox";
channel.queueBind(queueName,EXCHANGE_NAME,bindingkey3);
channel.basicConsume(queueName,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, StandardCharsets.UTF_8);
System.out.println("收到消息:"+msg);
}
});
...
测试:可以看到对应的消费者服务都能够进行精确匹配拿到对应的信息
精炼总结
直接指定队列形式:消费者服务没有启动前发送的消息,在消费者服务启动后仍然能够收到,因为此前发送的消息被存储到队列中。
- 就是一、二章节中的案例。
交换机类型:第三章节案例
- fanout:①交换机发送的消息都一致的发送给现有的队列中。(每个队列都能够收到相同的消息)②在使用fanout类型交换机的消费者服务不会接收服务启动前生产者发送的信息,也就是说启动前发送的消息都会被丢弃掉!
- 应用场景:相同数量及内容多个消费者都能够收到,分别对所有内容进行不同的处理。
- direct:①交换机在发送的时候带上routingkey,消息队列在接收时也指定routingkey即可拿到对应交换机想要发送给某个队列的信息,来达到直接准确的发送!②消费者服务启动前,生产者发送给指定交换机的信息会被丢弃。
- 应用场景:不同类型的日志记录进行不同处理,如窗口执行、磁盘存储。
- topic模式(与direct是兄弟关系,或升级关系):①根据严重性,源头去考虑。消费者服务可以进行模糊匹配(*号代替一个单词,#替代零个或多个单词)。交换机可以设置多个routingkey,对应的接收队列可以进行模糊匹配。②②消费者服务启动前,生产者发送给指定交换机的信息不会被丢弃。
- headers:根据发送消息内容中的headers属性来匹配。(实际基本不会使用)
整理者:长路 时间:2021.9.25
类型交换机的消费者服务不会接收服务启动前生产者发送的信息,也就是说启动前发送的消息都会被丢弃掉!
- 应用场景:相同数量及内容多个消费者都能够收到,分别对所有内容进行不同的处理。
- direct:①交换机在发送的时候带上routingkey,消息队列在接收时也指定routingkey即可拿到对应交换机想要发送给某个队列的信息,来达到直接准确的发送!②消费者服务启动前,生产者发送给指定交换机的信息会被丢弃。
- 应用场景:不同类型的日志记录进行不同处理,如窗口执行、磁盘存储。
- topic模式(与direct是兄弟关系,或升级关系):①根据严重性,源头去考虑。消费者服务可以进行模糊匹配(*号代替一个单词,#替代零个或多个单词)。交换机可以设置多个routingkey,对应的接收队列可以进行模糊匹配。②②消费者服务启动前,生产者发送给指定交换机的信息不会被丢弃。
- headers:根据发送消息内容中的headers属性来匹配。(实际基本不会使用)
- 点赞
- 收藏
- 关注作者
评论(0)