RabbitMQ学习笔记 03、交换机模式(4种)

举报
长路 发表于 2022/11/28 20:17:51 2022/11/28
【摘要】 文章目录前言交换机工作模式(四种)一、fanout模式1.1、基本概念1.2、代码实操二、direct模式2.1、基本概念2.2、代码实操三、topic模式3.1、基本概念3.2、实战精炼总结 前言 本篇博客是RabbitMQ的四种交换机模式,若文章中出现相关问题,请指出! 所有博客文件目录索引:博客目录索引(持续更新) 博客案例代码可见:Gitee-demoexer/RabbitMQ、Githu

@[toc]

前言

本篇博客是RabbitMQ的四种交换机模式,若文章中出现相关问题,请指出!

所有博客文件目录索引:博客目录索引(持续更新)

博客案例代码可见:Gitee-demoexer/RabbitMQGithub-javademos/RabbitMQ

交换机工作模式(四种)

fanout:广播,这种模式只需要将队列绑定到交换机上即可,是不需要设置路由键的。(广播,直接绑定队列)

  • 我们之前的哪里就是fanout形式。

direct:根据RoutingKey匹配消息路由到指定的队列。(绝对匹配)

topic:生产者指定RoutingKey消息根据消费端指定的队列通过模糊匹配方式进行相应转发。(模糊匹配)

headers:根据发送消息内容中的headers属性来匹配。

有了交换机之后就有了更强大的能力,可以根据交换机的模式来完成更加复杂的功能。



一、fanout模式

1.1、基本概念

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-JBGMtaQg-1651541063735)(C:\Users\93997\AppData\Roaming\Typora\typora-user-images\image-20210925070020024.png)]

应用场景:例如我们要通过rabbitmq来进行日志记录,一般会以两种形式,控制台以及存储到磁盘方式进行落库存盘。无论哪种方式,其内容本质是相同的,此时我们可以使用fanout方式。

特点

  1. 使用fanout的特点就是只有当消费者连接之后生产者发送的消息才会生效,在此之前生产者产生的会默认直接丢弃,这也就解决了消息积压的问题。(对于之前的一些日志记录丢弃掉也没有关系)
  2. 若是运行多个服务,那么每一个服务都能够收到同一份消息,而不像之前案例一样均匀分配任务了!!!


1.2、代码实操

目的(实现效果):发送的多个日志信息,每个服务都能够收到相同数量且同样内容的信息。

生产者

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.Optional;

/**
 * @ClassName EmitLog
 * @Author ChangLu
 * @Date 2021/9/25 7:10
 * @Description TODO
 */
public class EmitLog {

    //核心:指定交换机名称
    private static String EXCHANGE_NAME = "LOGS";

    public static void main(String[] args) {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.118.128");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("password");
        Connection connection = null;
        Channel channel = null;
        try {
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            //绑定一个交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
            //向指定的交换机发送消息
            String msg = "this is a log info!";
            //第二个参数是routingkey,第三个参数是基本属性
            channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes(StandardCharsets.UTF_8));
            System.out.println("消息发送成功!");
        }catch (Exception e){
            System.out.println("消息发送失败,失败原因:"+e.getMessage());
        }
        Optional.ofNullable(channel).ifPresent(EmitLog::close);
        Optional.ofNullable(connection).ifPresent(EmitLog::close);

    }

    //反射进行资源关闭
    public static <T> void close(T t){
        Class<?> aClass = t.getClass();
        Optional.ofNullable(t).ifPresent(c-> {
            try {
                Method closeMethod = aClass.getDeclaredMethod("close");
                closeMethod.setAccessible(true);
                closeMethod.invoke(t, null);
            } catch (Exception e){
                e.printStackTrace();
            }
        });
    }
}

消费者

import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/**
 * @ClassName ReceiveLog
 * @Author ChangLu
 * @Date 2021/9/25 7:23
 * @Description TODO
 */
public class ReceiveLog {

    private static String EXCHANGE_NAME = "LOGS";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.118.128");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("password");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        //绑定指定的交换机类型
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
        //生成一个临时的随机的queue,并绑定交换机与队列
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName,EXCHANGE_NAME,"");//第三个参数为routingkey
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, StandardCharsets.UTF_8);
                System.out.println("接收到消息:"+msg);
            }
        };
        //进行消费,需要指定队列名称
        channel.basicConsume(queueName,true,consumer);
    }
}

演示:生产者连发两条消息,启动的两个消费者服务都能够接收到相同的两份信息

GIF



二、direct模式

2.1、基本概念

概念图

可以看到下图,相同的队列可以指定相同的routingkeys,这是不受影响的。

image-20210925094024822

image-20210925094034026


说明

场景:有些情况下,对于日志而言我们进行升级,可能不需要把所有的日志都存储到磁盘上,只需要在磁盘中存储错误日志,例如error类型的,日志分为不同等级嘛。

  • 在磁盘中只存储error,而在控制台里把所有消息都打印出来。涉及到不同的消费者接收消息不一致的情况此时非常建议使用direct模式。

实现效果:生产者服务在消费者服务启动前发送的消息,不会放入到队列中存储起来,只有在消费者服务启动后发送的消息才会被进行分发处理。

实现方式:设置交换机为direct,并且需要额外设置routingkey,可以将key理解为对指定信息感兴趣。



2.2、代码实操

目的(实现效果):不同类型的日志交由不同的服务处理,例如info、debug、warning交由消费者服务1处理;error交由消费者服务2处理。

image-20210925100827436

生产者:

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;

/**
 * @ClassName EmitLogs
 * @Author ChangLu
 * @Date 2021/9/25 9:44
 * @Description 生产者:绑定交换机direct_LOGS,发送四个消息,每个消息匹配一个routingkeys(info、debug、error、warning)
 */
public class EmitLogs {

    //核心:指定交换机名称,需要跟之前的不一样
    private static String EXCHANGE_NAME = "direct_LOGS";

    public static void main(String[] args) throws Exception{
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.118.128");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("password");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        //绑定一个Direct类型交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        String msg = "sended msg";
        //发送三条消息,每个消息绑定对应的routingkey,发送给指定的一个交换机
        String INFO_MESSAGE = "info-"+msg;
        channel.basicPublish(EXCHANGE_NAME,"info",null,INFO_MESSAGE.getBytes(StandardCharsets.UTF_8));
        System.out.println("成功发送消息:"+INFO_MESSAGE);
        String DEBUG_MESSAGE = "debug-"+msg;
        channel.basicPublish(EXCHANGE_NAME,"debug",null,DEBUG_MESSAGE.getBytes(StandardCharsets.UTF_8));
        System.out.println("成功发送消息:"+DEBUG_MESSAGE);
        String WARNING_MESSAGE = "warning-"+msg;
        channel.basicPublish(EXCHANGE_NAME,"warning",null,WARNING_MESSAGE.getBytes(StandardCharsets.UTF_8));
        System.out.println("成功发送消息:"+WARNING_MESSAGE);
        String ERROR_MESSAGE = "warning-"+msg;
        channel.basicPublish(EXCHANGE_NAME,"error",null,ERROR_MESSAGE.getBytes(StandardCharsets.UTF_8));
        System.out.println("成功发送消息:"+ERROR_MESSAGE);

        channel.close();
        connection.close();

    }
}

消费者1:

import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/**
 * @ClassName ReceiveLogs
 * @Author ChangLu
 * @Date 2021/9/25 9:50
 * @Description 消费者:绑定交换机direct_LOGS,并且设置三个routingkeys:info、debug、warning
 */
public class ReceiveLogs1 {

    private static String EXCHANGE_NAME = "direct_LOGS";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.118.128");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("password");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        //绑定指定的交换机类型——direct
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        //生成一个随机的临时的queue
        String queueName = channel.queueDeclare().getQueue();
        //一个交换机同时绑定三个routingkeys
        channel.queueBind(queueName,EXCHANGE_NAME,"info");
        channel.queueBind(queueName,EXCHANGE_NAME,"debug");
        channel.queueBind(queueName,EXCHANGE_NAME,"warning");

        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, StandardCharsets.UTF_8);
                System.out.println("接收到消息:"+msg);
            }
        };
        //进行消费,需要指定队列名称
        channel.basicConsume(queueName,true,consumer);
    }
}

消费者2:大部分内容与消费者1相同,只不过就指定的routingkeys不一样而已

//绑定direct类型交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String queueName = channel.queueDeclare().getQueue();
//绑定指定routingkeys:error
channel.queueBind(queueName,EXCHANGE_NAME,"error");

测试一下:生产者发送了四个不同routingkeys的消息,两个消费者接受对应routingkeys信息

GIF



三、topic模式

3.1、基本概念

我们可对routingkey进行模糊匹配,如使用*或#来进行匹配!

*可以代替一个单词

#可以替代零个或多个单词

image-20210925105311253

我们可以对routingkeys进行.分割的不同类型区分,上面对应着消费者服务来对生产者产生的信息进行过滤,每个队列能够拿到对应交换机可模糊匹配的执行信息,来达到区分的效果!



3.2、实战

目的:有10条记录其中包含了不同类型的内容,生产者发送消息的同时每条记录都附带对应详细的routingkey;消费者则会定义指定的routingkey模糊表达式,用于进行模糊匹配拿到生产者发送过来的信息记录条数。

生产者

这里对方法进行了抽离,实际上基本配置与之前都相同。这里的话准备了一个routingkey数组用于分别描述各组信息与实体信息传递出去:

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/**
 * @ClassName TopicProduce
 * @Author ChangLu
 * @Date 2021/9/25 10:57
 * @Description 生产者:交换机类型为topic
 */
public class TopicProduce {

    private static String EXCHANGE_NAME = "topic_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        Util.doBefore();
        Channel channel = Util.channel;
        //指定交换机为topic
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        //定义关键字,不同的消息在发送时都指定routingKeys
        String[] routingKeys = new String[]{
            "quick.orange.rabbit","quick.orange.fox","quick.brown","quick.orange.male.rabbit",
            "lazy.orange.elephant","lazy.brown.fox","lazy.pink.rabbit","lazy.orange.male.rabbit",
            "orange"
        };
        for (String routingKey : routingKeys) {
            String msg = "信息内容:"+routingKey;
            //发送
            channel.basicPublish(EXCHANGE_NAME,routingKey,null,msg.getBytes(StandardCharsets.UTF_8));
            System.out.println("已发送"+msg);
        }
        Util.close();
    }


    static class Util{
        private static Connection connection = null;
        public static Channel channel = null;

        public static void  doBefore() throws IOException, TimeoutException {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("192.168.118.128");
            connectionFactory.setUsername("admin");
            connectionFactory.setPassword("password");
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
        }

        public static void close() throws IOException, TimeoutException {
            if(channel!=null){
                channel.close();
            }
            if(connection!=null){
                connection.close();
            }
        }

    }
}

消费者

消费者1:基本配置都相同,不再重复展示,匹配的routingkey为"*.orange.*"

private static String EXCHANGE_NAME = "topic_exchange";
...
//指定交换机为topic
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String queueName = channel.queueDeclare().getQueue();
//指定routingkeys,可搭配*或#进行匹配
String bindingKey = "*.orange.*";
channel.queueBind(queueName,EXCHANGE_NAME,bindingKey);//绑定队列、交换机以及routingkey
channel.basicConsume(queueName,true,new DefaultConsumer(channel){
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String msg = new String(body, StandardCharsets.UTF_8);
        System.out.println("收到消息:"+msg);
    }
});
...

消费者2:匹配的routingkey为"quick.*""lazy.orange.#""quick.orange.fox"

private static String EXCHANGE_NAME = "topic_exchange";

//指定交换机为topic
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String queueName = channel.queueDeclare().getQueue();
//指定routingkeys,可搭配*或#进行匹配
String bindingKey = "quick.*";
channel.queueBind(queueName,EXCHANGE_NAME,bindingKey);//绑定队列、交换机以及routingkey
String bindingkey2 = "lazy.orange.#";
channel.queueBind(queueName,EXCHANGE_NAME,bindingkey2);
String bindingkey3 = "quick.orange.fox";
channel.queueBind(queueName,EXCHANGE_NAME,bindingkey3);
channel.basicConsume(queueName,true,new DefaultConsumer(channel){
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String msg = new String(body, StandardCharsets.UTF_8);
        System.out.println("收到消息:"+msg);
    }
});

...

测试:可以看到对应的消费者服务都能够进行精确匹配拿到对应的信息

GIF



精炼总结

直接指定队列形式:消费者服务没有启动前发送的消息,在消费者服务启动后仍然能够收到,因为此前发送的消息被存储到队列中。

  • 就是一、二章节中的案例。

交换机类型:第三章节案例

  • fanout:①交换机发送的消息都一致的发送给现有的队列中。(每个队列都能够收到相同的消息)②在使用fanout类型交换机的消费者服务不会接收服务启动前生产者发送的信息,也就是说启动前发送的消息都会被丢弃掉
    • 应用场景:相同数量及内容多个消费者都能够收到,分别对所有内容进行不同的处理。
  • direct:①交换机在发送的时候带上routingkey,消息队列在接收时也指定routingkey即可拿到对应交换机想要发送给某个队列的信息,来达到直接准确的发送!②消费者服务启动前,生产者发送给指定交换机的信息会被丢弃
    • 应用场景:不同类型的日志记录进行不同处理,如窗口执行、磁盘存储。
  • topic模式(与direct是兄弟关系,或升级关系):①根据严重性,源头去考虑。消费者服务可以进行模糊匹配(*号代替一个单词,#替代零个或多个单词)。交换机可以设置多个routingkey,对应的接收队列可以进行模糊匹配。②②消费者服务启动前,生产者发送给指定交换机的信息不会被丢弃
  • headers:根据发送消息内容中的headers属性来匹配。(实际基本不会使用)


整理者:长路 时间:2021.9.25
类型交换机的消费者服务不会接收服务启动前生产者发送的信息,也就是说启动前发送的消息都会被丢弃掉

  • 应用场景:相同数量及内容多个消费者都能够收到,分别对所有内容进行不同的处理。
  • direct:①交换机在发送的时候带上routingkey,消息队列在接收时也指定routingkey即可拿到对应交换机想要发送给某个队列的信息,来达到直接准确的发送!②消费者服务启动前,生产者发送给指定交换机的信息会被丢弃
    • 应用场景:不同类型的日志记录进行不同处理,如窗口执行、磁盘存储。
  • topic模式(与direct是兄弟关系,或升级关系):①根据严重性,源头去考虑。消费者服务可以进行模糊匹配(*号代替一个单词,#替代零个或多个单词)。交换机可以设置多个routingkey,对应的接收队列可以进行模糊匹配。②②消费者服务启动前,生产者发送给指定交换机的信息不会被丢弃
  • headers:根据发送消息内容中的headers属性来匹配。(实际基本不会使用)
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。