RabbitMQ之Federation Exchange、Federation Queue、Shovel

举报
别团等shy哥发育 发表于 2023/01/10 21:53:11 2023/01/10
【摘要】 @[toc] 1、Federation Exchange(联邦交换机) 1.1 为什么使用联邦交换机  (broker 北京),(broker 深圳)彼此之间相距甚远,网络延迟是一个不得不面对的问题。有一个在北京 的业务(Client 北京) 需要连接(broker 北京),向其中的交换器 exchangeA 发送消息,此时的网络延迟很小, (Client 北京)可以迅速将消息发送至 exc...

@[toc]

1、Federation Exchange(联邦交换机)

1.1 为什么使用联邦交换机

  (broker 北京),(broker 深圳)彼此之间相距甚远,网络延迟是一个不得不面对的问题。有一个在北京 的业务(Client 北京) 需要连接(broker 北京),向其中的交换器 exchangeA 发送消息,此时的网络延迟很小, (Client 北京)可以迅速将消息发送至 exchangeA 中,就算在开启了 publisherconfirm 机制或者事务机制的 情况下,也可以迅速收到确认信息。此时又有个在深圳的业务(Client 深圳)需要向 exchangeA 发送消息, 那么(Client 深圳) (broker 北京)之间有很大的网络延迟,(Client 深圳) 将发送消息至 exchangeA 会经历一 定的延迟,尤其是在开启了 publisherconfirm 机制或者事务机制的情况下,(Client 深圳) 会等待很长的延 迟时间来接收(broker 北京)的确认信息,进而必然造成这条发送线程的性能降低,甚至造成一定程度上的 阻塞。

  将业务(Client 深圳)部署到北京的机房可以解决这个问题,但是如果(Client 深圳)调用的另些服务都部 署在深圳,那么又会引发新的时延问题,总不见得将所有业务全部部署在一个机房,那么容灾又何以实现? 这里使用 Federation 插件就可以很好地解决这个问题.

image-20211228104716030

1.2 搭建步骤

1.2.1 需要保证每台节点单独运行

1.2.2 在每台机器上开启federation相关插件

rabbitmq-plugins enable rabbitmq_federation
rabbitmq-plugins enable rabbitmq_federation_managemen

装完之后看下管理界面

image-20211228104904358

1.2.3 原理图(先运行consumer在node2创建fed_exchange)

消费者代码:

public class Consumer {

    //队列的名称
    public static final String QUEUE_NAME="mirrior_hello";
    //交换机的名称
    public static final String FED_EXCHANGE="fed_exchange";
    //接收消息
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接工厂
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("192.168.159.34");
        factory.setUsername("admin");
        factory.setPassword("123");
        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();

        channel.exchangeDeclare(FED_EXCHANGE,BuiltinExchangeType.DIRECT);
        channel.queueDeclare("node2_queue",true,false,false,null);
        channel.queueBind("node2_queue",FED_EXCHANGE,"routeKey");

        //声明 接收消息
        DeliverCallback deliverCallback=(consumerTag, message)->{
            System.out.println(new String(message.getBody()));
        };
        //取消消息时的回调
        CancelCallback cancelCallback=consumerTag -> {
            System.out.println("消息消费被中断");
        };


        /**
         * 消费者消费消息
         * 1.消费哪个队列
         * 2.消费成功之后是否要自动应答 true代表自动应答 false手动应答
         * 3.消费者成功消费的回调
         * 4.消费者取消消费的回调
         */
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);

    }
}

1.2.4 在downstream(node2)配置upstream(node1)

image-20211228105308071

1.2.5 添加policy

image-20211228105345952

查看下是否搭建成功,点击Federation Status

image-20211228105426346

2、Federation Queue(联邦队列)

2.1 为什么使用联邦队列

  联邦队列可以在多个 Broker 节点(或者集群)之间为单个队列提供均衡负载的功能。一个联邦队列可以 连接一个或者多个上游队列(upstream queue),并从这些上游队列中获取消息以满足本地消费者消费消息 的需求。

2.2 搭建步骤

2.2.1 原理图

image-20211228105514723

2.2.2 添加upstream

这一步和1.2.4一致

image-20211228105308071

2.2.3 添加policy

image-20211228105700743

3、Shovel

3.1 为什么使用Shovel

  Federation 具备的数据转发功能类似,Shovel 够可靠、持续地从一个 Broker 中的队列(作为源端,即 source)拉取数据并转发至另一个 Broker 中的交换器(作为目的端,即 destination)。作为源端的队列和作 为目的端的交换器可以同时位于同一个 Broker,也可以位于不同的 Broker 上。Shovel 可以翻译为"铲子", 是一种比较形象的比喻,这个"铲子"可以将消息从一方"铲子"另一方。Shovel 行为就像优秀的客户端应用 程序能够负责连接源和目的地、负责消息的读写及负责连接失败问题的处理。

3.2 搭建步骤

3.2.1 开启的插件(需要的机器都开启)

rabbitmq-plugins enable rabbitmq_shovel

image-20211228105852548

rabbitmq-plugins enable rabbitmq_shovel_management

image-20211228105911013

3.2.2 原理图(在源头发送的消息直接会进入到目的地队列)

image-20211228105953134

3.2.3 添加shovel源和目的地

image-20211228110016947

检查下是否搭建成功

image-20211228110049591

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。