RabbitMQ Routing(路由)工作模式

举报
兰舟千帆 发表于 2022/07/19 22:56:13 2022/07/19
【摘要】 RabbitMQ Routing(路由)工作模式 路由的这种模式和发布订阅的模式相比多了一个路由的环节。要求队列在绑定在绑定到交换机的时候指定到对应的路由。路由的作用是什么呢?作为一种分发的规...

RabbitMQ Routing(路由)工作模式

路由的这种模式和发布订阅的模式相比多了一个路由的环节。要求队列在绑定在绑定到交换机的时候指定到对应的路由。路由的作用是什么呢?作为一种分发的规则。可以按照相应的条件指定和分发。

官方还是很形象的给出了我们的模型图。

在这里插入图片描述
这里对日志的级别采用了不同的处理方式。分别是error,info,warninng。其实日志移动有五种这里给出了三种日志举例的处理方式。

DEBUG:详细的信息,通常只出现在诊断问题上
INFO:确认一切按预期运行
WARNING:一个迹象表明,一些意想不到的事情发生了,或表明一些问题在不久的将来(例如。磁盘空间低”)。这个软件还能按预期工作。
ERROR:更严重的问题,软件没能执行一些功能
CRITICAL:一个严重的错误,这表明程序本身可能无法继续运行

运行出现的日志info,error,warning会路由到一个队列,然后单独的error也会路由到一个队列。

其实还是比较简单的,我们来看具体的代码实现。

那我们还需要一个生产者首先,来看代码。

package com.jgdabc.producer;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

//完成发送消息
public class Producer_Routing {
    public static void main(String[] args) throws IOException, TimeoutException {

//       1 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //       2 设置连接参数
        connectionFactory.setHost("123.56.4.32"); //服务器主机ip公网 默认localhost
        connectionFactory.setPort(5672); //消息端口 默认5672
        connectionFactory.setVirtualHost("/shabi");//虚拟机 默认/
        connectionFactory.setUsername("jgdabc");//用户名 默认guest
        connectionFactory.setPassword("123456");//密码 默认guest

//        3创建连接connection
        Connection connection = connectionFactory.newConnection();
//       4 创建Channel
        Channel channel = connection.createChannel();
//        5:创建交换机
//        exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
//      1:exchange : 交换机名称
//       2: type:交换机的类型 BuiltinExchangeType代表枚举类型
//        Direct("direct"):定向
//        FANOUT("fanout") : 扇形广播,发送消息到每一个与之绑定的队列
//        TOPIC("topic") 通配符的方式
//        HEADERS("headers")
//        3:durable是否持久化
//        4:autoDelete:是否自动删除
//        5:internal 内部使用
//        6Larguments:参数列表
        String exchangeName = "test_direct";

        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);
//        6:创建队列
        String queueName = "test_direct_queue";
        String queueName01 = "test_direct_queue01";
        channel.queueDeclare(queueName,true,false,false,null);
        channel.queueDeclare(queueName01,true,false,false,null);

//          queueBind(String queue, String exchange, String routingKey)
//        7 绑定队列和交换机
//        queue:绑定的队列名称
//        exchange:交换机名称
//        routingKey:路由键,绑定规则
//        如果倦鸟还击的类型为fanout,routingKey设置为“”
         channel.queueBind(queueName,exchangeName,"error");
         channel.queueBind(queueName,exchangeName,"info");
         channel.queueBind(queueName,exchangeName,"warning");

         channel.queueBind(queueName01,exchangeName,"error");
//        8:发送消息
        String body = "日志信息。张三调用了findAll方法,日志级别info....";
        channel.basicPublish(exchangeName,"info",null,body.getBytes());

//        9:释放资源
        channel.close();
        connection.close();


    }
}



  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72

说明一次处理逻辑,我们这回的交换机需要指定为DIRECT,也就是一种定向的模式,很明显我们是定向分配的,如果你要是广播的模式的话,那么路由就不能做成定向的。
这是一点。

在这里插入图片描述
抽出来说明一些,下面这段是创建交换机以及创建出队列

        String exchangeName = "test_direct";

        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);
//        6:创建队列
        String queueName = "test_direct_queue";
        String queueName01 = "test_direct_queue01";
        channel.queueDeclare(queueName,true,false,false,null);
        channel.queueDeclare(queueName01,true,false,false,null);

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

下面这段是进行一个路由的绑定

 channel.queueBind(queueName,exchangeName,"error");
 channel.queueBind(queueName,exchangeName,"info");
 channel.queueBind(queueName,exchangeName,"warning");
 channel.queueBind(queueName01,exchangeName,"error");


  
 
  • 1
  • 2
  • 3
  • 4
  • 5

下面是我们发送消息,body是要发送的消息。这里指定了交换机和路由,这是一个info级别的日志,所以不会发送到单独绑定error的队列。当然你在这里可以更改。只是举个例子。

   
 String body = "日志信息。张三调用了findAll方法,日志级别info....";
 channel.basicPublish(exchangeName,"info",null,body.getBytes());


  
 
  • 1
  • 2
  • 3
  • 4

然后呢,然后来看消费者的代码,我们只说明一个,另一个是一样的道理。

消费者一:

package com.jgdabc.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer_Routing {
    public static void main(String[] args) throws IOException, TimeoutException {

//       1 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //       2 设置连接参数
        connectionFactory.setHost("123.56.4.32"); //服务器主机ip公网 默认localhost
        connectionFactory.setPort(5672); //消息端口 默认5672
        connectionFactory.setVirtualHost("/shabi");//虚拟机 默认/
        connectionFactory.setUsername("jgdabc");//用户名 默认guest
        connectionFactory.setPassword("123456");//密码 默认guest

//        3创建连接connection
        Connection connection = connectionFactory.newConnection();
//       4 创建Channel
        Channel channel = connection.createChannel();
        String queueName = "test_direct_queue";
        String queueName01 = "test_direct_queue01";

        channel.queueDeclare(queueName,true,false,false,null);
        channel.queueDeclare(queueName01,true,false,false,null);


//        发送消息

//5创建队列
//        queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<Stri
//参数说明: queue :队列名称
//        durable : 是否持久化 :当mq 重启数据还在
//        exclusive : 是否独占,只能有一个消费者监听这队列
//                      当connection关闭时候,是否删除队列
//        autoDelete:是否自动删除,当没有Consumer时候,是否自动删除

//         basicConsume(String queue, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback)
//        参数说明
//        queue: 队列名称
//        autoAck : 是否自动确认
//        callback: 回调函数
        DefaultConsumer consumer = new DefaultConsumer(channel) {
//            回调方法,当收到消息后会自动执行该方法
//            consumerTag:消息表示
//            ebvelop:获取一些信息,交换机的信息,路由等等
//            properties:配置信息
//            body:数据
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                super.handleDelivery(consumerTag, envelope, properties, body);
//                System.out.println("consumerTag:"+consumerTag);
//                System.out.println("Exchange:"+envelope.getExchange());
//                System.out.println("RoutingKey:"+envelope.getRoutingKey());
//                System.out.println("properties:"+properties);
                  System.out.println("body:"+new String(body));
                  System.out.println("将日志信息打印到控制台。。。");
            }
        };
        channel.basicConsume(queueName,true,consumer);
//
    }
}


  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67

消费者二:

package com.jgdabc.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer_Routing1 {
    public static void main(String[] args) throws IOException, TimeoutException {

//       1 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //       2 设置连接参数
        connectionFactory.setHost("123.56.4.32"); //服务器主机ip公网 默认localhost
        connectionFactory.setPort(5672); //消息端口 默认5672
        connectionFactory.setVirtualHost("/shabi");//虚拟机 默认/
        connectionFactory.setUsername("jgdabc");//用户名 默认guest
        connectionFactory.setPassword("123456");//密码 默认guest

//        3创建连接connection
        Connection connection = connectionFactory.newConnection();
//       4 创建Channel
        Channel channel = connection.createChannel();
        String queueName = "test_direct_queue";
        String queueName01 = "test_direct_queue01";

//        发送消息

//5创建队列
//        queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<Stri
//参数说明: queue :队列名称
//        durable : 是否持久化 :当mq 重启数据还在
//        exclusive : 是否独占,只能有一个消费者监听这队列
//                      当connection关闭时候,是否删除队列
//        autoDelete:是否自动删除,当没有Consumer时候,是否自动删除

        channel.queueDeclare(queueName,true,false,false,null);
        channel.queueDeclare(queueName01,true,false,false,null);
//        接收消息
//         basicConsume(String queue, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback)
//        参数说明
//        queue: 队列名称
//        autoAck : 是否自动确认
//        callback: 回调函数
        DefaultConsumer consumer = new DefaultConsumer(channel) {
//            回调方法,当收到消息后会自动执行该方法
//            consumerTag:消息表示
//            ebvelop:获取一些信息,交换机的信息,路由等等
//            properties:配置信息
//            body:数据
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                super.handleDelivery(consumerTag, envelope, properties, body);
//                System.out.println("consumerTag:"+consumerTag);
//                System.out.println("Exchange:"+envelope.getExchange());
//                System.out.println("RoutingKey:"+envelope.getRoutingKey());
//                System.out.println("properties:"+properties);
                  System.out.println("body:"+new String(body));
                  System.out.println("将信息存储到数据库。。。");
            }
        };
        channel.basicConsume(queueName01,true,consumer);
//
    }
}


  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66

取来一个说明。这里面有一些多余的代码啊!不知道你有没有发现。
生产者做过的事情消费者没必要做了。队列都创建过了。这里这段在消费者里的代码可以删除掉,不过并不影响,


        channel.queueDeclare(queueName,true,false,false,null);
        channel.queueDeclare(queueName01,true,false,false,null);

  
 
  • 1
  • 2
  • 3

这里是一个处理逻辑
在这里插入图片描述
这里代表监听的队列。

 channel.basicConsume(queueName,true,consumer);

  
 
  • 1

测试
在这里插入图片描述
在这里插入图片描述
你看这里只有一个 消费者获取到了消息在这里插入图片描述
获取到消息的做出响应的逻辑处理,没获取到消息的则不会进行处理。

这就是路由。

文章来源: daodaozi.blog.csdn.net,作者:兰舟千帆,版权归原作者所有,如需转载,请联系作者。

原文链接:daodaozi.blog.csdn.net/article/details/125859102

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。