RabbitMQ图文快速入门

举报
LoneWalker、 发表于 2023/08/27 16:33:49 2023/08/27
【摘要】 RabbitMQ图文快速入门

什么是RabbitMQ?

RabbitMQ是一个消息代理 , 一个消息系统的媒介。它可以为你的应用提供一个通用的消息发送和接收平台,并且保证消息在传输过程中的安全。

RabbitMQ架构图

image.gifimage.gif20210408193412801.png

基本概念

Connection

Connection是物理TCP连接。Connection将应用与RabbitMQ连接在一起。Connection会执行认证、IP解析、路由等底层网络任务。应用与RabbitMQ版完成Connection建立大约需要15个TCP报文交互,因而会消耗大量的网络资源和RabbitMQ资源。

image.gif01.png

Channel

Channel是物理TCP连接中的虚拟连接。当应用通过Connection与RabbitMQ建立连接后,所有的AMQP协议操作(例如创建队列、发送消息、接收消息等)都会通过Connection中的Channel完成。Channel可以复用Connection,即一个Connection下可以建立多个Channel。Channel不能脱离Connection独立存在,而必须存活在Connection中。当某个Connection断开时,该Connection下的所有Channel都会断开。当大量应用需要与消息队列RabbitMQ版建立多个连接时,建议使用Channel来复用Connection,从而减少网络资源和RabbitMQ资源消耗。

image.gif02.png

Queue

队列(Queue)存储着即将被应用消费掉的消息。队列跟交换机共享某些属性,但是队列也有一些另外的属性。

    • Name
    • Durable(消息代理重启后,队列依旧存在)
    • Exclusive(只被一个连接(connection)使用,而且当连接关闭后队列即被删除)
    • Auto-delete(当最后一个消费者退订后即被删除)
    • Arguments(一些消息代理用他来完成类似与TTL的某些额外功能)

    队列在声明(declare)后才能被使用。如果一个队列尚不存在,声明一个队列会创建它。如果声明的队列已经存在,并且属性完全相同,那么此次声明不会对原有队列产生任何影响。如果声明中的属性与已存在队列的属性有差异,那么一个错误代码为406的通道级异常就会被抛出。

    Exchange

    Exchange是消息队列RabbitMQ的消息路由代理。生产者向RabbitMQ发送消息时,不会直接将消息发送到Queue,而是先将消息发送到Exchange,由Exchange将消息路由到一个或多个Queue。Exchange根据Binding Key、Routing Key以及Headers属性路由消息。

    交换机是用来发送消息的AMQP实体。交换机拿到一个消息之后将它路由给一个或零个队列。它使用哪种路由算法是由交换机类型和被称作绑定(bindings)的规则所决定的。AMQP 0-9-1的代理提供了四种交换机

    Name(交换机类型) Default pre-declared names(预声明的默认名称)
    Direct exchange(直连交换机) (Empty string) and amq.direct
    Fanout exchange(扇型交换机) amq.fanout
    Topic exchange(主题交换机) amq.topic
    Headers exchange(头交换机) amq.match (and amq.headers in RabbitMQ)

    除交换机类型外,在声明交换机时还可以附带许多其他的属性,其中最重要的几个分别是:

    • Name
    • Durability (消息代理重启后,交换机是否还存在)
    • Auto-delete (当所有与之绑定的消息队列都完成了对此交换机的使用后,删掉它)
    • Arguments(依赖代理本身)

    交换机可以有两个状态:持久(durable)、暂存(transient)。持久化的交换机会在消息代理(broker)重启后依旧存在,而暂存的交换机则不会(它们需要在代理再次上线后重新被声明)。然而并不是所有的应用场景都需要持久化的交换机。

    交换机类型

    Direct Exchange

    直连型交换机(direct exchange)是根据消息携带的路由键(routing key)将消息投递给对应队列的。

      • 路由规则 Direct Exchange根据Binding Key和Routing Key完全匹配的规则路由消息。
      • 使用场景 Direct Exchange适用于通过简单字符标识符区分消息的场景。Direct Exchange常用于单播路由。

      image.gif03.png

      Fanout Exchange

      扇型交换机(funout exchange)将消息路由给绑定到它身上的所有队列,而不理会绑定的路由键。如果N个队列绑定到某个扇型交换机上,当有消息发送给此扇型交换机时,交换机会将消息的拷贝分别发送给这所有的N个队列。

        • 路由规则 Fanout Exchange忽略Routing Key和Binding Key的匹配规则将消息路由到所有绑定的Queue。
        • 使用场景 Fanout Exchange适用于广播消息的场景。例如,分发系统使用Fanout Exchange来广播各种状态和配置更新。

        image.gif04.png

        Topic Exchange

        主题交换机(topic exchanges)通过对消息的路由键和队列到交换机的绑定模式之间的匹配,将消息路由给一个或多个队列。主题交换机经常用来实现各种分发/订阅模式及其变种。主题交换机通常用来实现消息的多播路由(multicast routing)。

          • 路由规则 Topic Exchange根据Binding Key和Routing Key通配符匹配的规则路由消息。Topic Exchange支持的通配符包括星号(*)和井号(#)。星号(*)代表一个英文单词(例如cn)。井号(#)代表零个、一个或多个英文单词,英文单词间通过英文句号(.)分隔,例如cn.zj.hz。
          • 使用场景 Topic Exchange适用于通过通配符区分消息的场景。Topic Exchange常用于多播路由。例如,使用Topic Exchange分发有关于特定地理位置的数据。

          Headers Exchange

          头交换机使用多个消息属性来代替路由键建立路由规则。通过判断消息头的值能否与指定的绑定相匹配来确立路由规则。

            • 路由规则 Headers Exchange可以被视为Direct Exchange的另一种表现形式。Headers Exchange可以像Direct Exchange一样工作,不同之处在于Headers Exchange使用Headers属性代替Routing Key进行路由匹配。您在绑定Headers Exchange和Queue时,可以设置绑定属性的键值对。然后,在向Headers Exchange发送消息时,设置消息的Headers属性键值对。Headers Exchange将根据消息Headers属性键值对和绑定属性键值对的匹配情况路由消息。匹配算法由一个特殊的绑定属性键值对控制。该属性为x-match,只有以下两种取值:
                • all:所有除x-match以外的绑定属性键值对必须和消息Headers属性键值对匹配才会路由消息。
                • any:只要有一组除x-match以外的绑定属性键值对和消息Headers属性键值对匹配就会路由消息。
                • 以下两种情况下,认为消息Headers属性键值对和绑定属性键值对匹配:
                    • 消息Headers属性的键和值与绑定属性的键和值完全相同。
                    • 消息Headers属性的键和绑定属性的键完全相同,但绑定属性的值为空。
                    • 使用场景 Headers Exchange适用于通过多组Headers属性区分消息的场景。Headers Exchange常用于多播路由。例如,涉及到分类或者标签的新闻更新。

                    RabbitMQ的通讯方式

                    1、"Hello World!"

                    image.gif05.png

                    2、Work Queues

                    image.gif06.png

                    3、Publish / Subscribe

                    image.gif07.png

                    4、Routing

                    image.gif08.png

                    5、Topics

                    image.gif09.png

                    6、RPC

                    image.gif010.png

                    AMQP协议相关知识

                    AMQP协议

                    AMQP(高级消息队列协议)是一个网络协议。它支持符合要求的客户端应用(application)和消息中间件代理(messaging middleware broker)之间进行通信。

                    消息代理(message brokers)从发布者(publishers)亦称生产者(producers)那儿接收消息,并根据既定的路由规则把接收到的消息发送给处理消息的消费者(consumers)。由于AMQP是一个网络协议,所以这个过程中的发布者,消费者,消息代理 可以存在于不同的设备上。

                    AMQP模型

                    消息(message)被发布者(publisher)发送给交换机(exchange),交换机常常被比喻成邮局或者邮箱。然后交换机将收到的消息根据路由规则分发给绑定的队列(queue)。最后AMQP代理会将消息投递给订阅了此队列的消费者,或者消费者按照需求自行获取。

                    image.gif011.png

                    从安全角度考虑,网络是不可靠的,接收消息的应用也有可能在处理消息的时候失败。基于此原因,AMQP模块包含了一个消息确认(message acknowledgements)的概念:当一个消息从队列中投递给消费者后(consumer),消费者会通知一下消息代理(broker),这个可以是自动的也可以由处理消息的应用的开发者执行。当“消息确认”被启用的时候,消息代理不会完全将消息从队列中删除,直到它收到来自消费者的确认回执(acknowledgement)。

                    通过JAVA连接RabbitMQ

                    新增Vhost

                    image.gif012.png

                    先在图形化界面中新增一个用户,角色填administrator    退出切换到test用户登录

                    image.gif013.png

                    点击test用户名,进入页面后选择刚刚创建的vhost

                    image.gif014.png

                    pom

                    <dependency>
                                <groupId>com.rabbitmq</groupId>
                                <artifactId>amqp-client</artifactId>
                                <version>5.9.0</version>
                            </dependency>

                    image.gif

                    RabbitMQClient

                    public class RabbitMQClient {
                        public static Connection GetConnection(){
                            ConnectionFactory factory = new ConnectionFactory();
                            //指定IP和端口号
                            factory.setHost("192.168.0.108");
                            factory.setPort(5672);
                            //用户名和密码
                            factory.setUsername("test");
                            factory.setPassword("test");
                            //指定vHost
                            factory.setVirtualHost("/test");
                            Connection connection = null;
                            try {
                                connection = factory.newConnection();
                            } catch (IOException e) {
                                e.printStackTrace();
                            } catch (TimeoutException e) {
                                e.printStackTrace();
                            }
                            return connection;
                        }
                    }

                    image.gif

                    接下来debug测试一下

                    image.gif016.png

                    图形化界面也显示已经连接了

                    image.gif017.png

                    入门就到这里,其他操作请关注后续博文

                    【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
                    • 点赞
                    • 收藏
                    • 关注作者

                    评论(0

                    0/1000
                    抱歉,系统识别当前为高风险访问,暂不支持该操作

                    全部回复

                    上滑加载中

                    设置昵称

                    在此一键设置昵称,即可参与社区互动!

                    *长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

                    *长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。