【愚公系列】2022年03月 .NET架构班 024-分布式中间件 RabbitMQ多场景使用情况分析

举报
愚公搬代码 发表于 2022/03/14 23:59:40 2022/03/14
【摘要】 一、RabbitMQ多场景使用情况分析 1.一个生产者/多个消费者(全部消费) 1.1 生产者// <summary>/// 创建商品/// </summary>/// <param name="productCreateDto"></param>/// <returns></returns>[HttpPost]public IEnumerable<Product> CreateProdu...

一、RabbitMQ多场景使用情况分析

1.一个生产者/多个消费者(全部消费)

1.1 生产者

// <summary>
/// 创建商品
/// </summary>
/// <param name="productCreateDto"></param>
/// <returns></returns>
[HttpPost]
public IEnumerable<Product> CreateProduct(ProductCreateDto productCreateDto)
{
	 #region 2、扇形交换机
        {
            var factory = new ConnectionFactory()
            {
                HostName = "localhost",
                Port = 5672,
                Password = "guest",
                UserName = "guest",
                VirtualHost = "/"
            };
            using (var connection = factory.CreateConnection())
            {
                var channel = connection.CreateModel();
                // 2、定义交换机
                channel.ExchangeDeclare(exchange: "product_fanout", type: "fanout");

                string productJson = JsonConvert.SerializeObject(productCreateDto);
                // string message = "Hello World!";
                var body = Encoding.UTF8.GetBytes(productJson);

                // 3、发送消息
                var properties = channel.CreateBasicProperties();
                properties.Persistent = true; // 设置消息持久化
                channel.BasicPublish(exchange: "product_fanout",
                                     routingKey: "",
                                     basicProperties: properties,
                                     body: body);
            }
            _logger.LogInformation("成功创建商品");
        }
        #endregion
}

1.2 消费者

// <summary>
/// 创建商品
/// </summary>
/// <param name="productCreateDto"></param>
/// <returns></returns>
[HttpPost]
public IEnumerable<Product> CreateProduct(ProductCreateDto productCreateDto)
{
	 // 1、创建连接
        var factory = new ConnectionFactory()
        {
            HostName = "localhost",
            Port = 5672,
            Password = "guest",
            UserName = "guest",
            VirtualHost = "/"
        };
        var connection = factory.CreateConnection();
       
        #region 6、订阅发布(广播消费)1、创建商品----2、发送短信-扇形交换机
        {
            var channel = connection.CreateModel();

            // 1、定义交换机
            channel.ExchangeDeclare(exchange: "product_fanout", type: "fanout");

            // 2、定义随机队
            var queueName = channel.QueueDeclare().QueueName;

            // 3、队列要和交换机绑定起来
            channel.QueueBind(queueName,
                                 "product_fanout",
                                 routingKey: "");

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                Console.WriteLine($"model:{model}");
                var body = ea.Body;
                // 1、业务逻辑
                var message = Encoding.UTF8.GetString(body.ToArray());
                Console.WriteLine(" [x] 创建商品 {0}", message);

                // 自动确认机制缺陷:
                // 1、消息是否正常添加到数据库当中,所以需要使用手工确认
                channel.BasicAck(ea.DeliveryTag, true);
            };
            // 3、消费消息
            channel.BasicQos(0, 1, false); // Qos(防止多个消费者,能力不一致,导致的系统质量问题。
                                           // 每一次一个消费者只成功消费一个)
            channel.BasicConsume(queue: queueName,
                                 autoAck: false, // 消息确认(防止消息消费失败)
                                 consumer: consumer);
        }
        #endregion
}
/// <summary>
/// 发送短信
/// </summary>
/// <returns></returns>
[HttpGet]
public IEnumerable<WeatherForecast> Get()
{
    // 1、创建连接
    var factory = new ConnectionFactory()
    {
        HostName = "localhost",
        Port = 5672,
        Password = "guest",
        UserName = "guest",
        VirtualHost = "/"
    };
    var connection = factory.CreateConnection();
    var channel = connection.CreateModel();

    // 1、定义交换机
    channel.ExchangeDeclare(exchange: "product_fanout", type: ExchangeType.Fanout);

    // 2、定义随机队列
    var queueName = channel.QueueDeclare().QueueName;

    // 3、队列要和交换机绑定起来
    channel.QueueBind(queueName,
                          "product_fanout",
                          routingKey: "");

    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
        Console.WriteLine($"model:{model}");
        var body = ea.Body;
        // 1、业务逻辑
        var message = Encoding.UTF8.GetString(body.ToArray());
        Console.WriteLine(" [x] 发送短信 {0}", message);

        // 自动确认机制缺陷:
        // 1、消息是否正常添加到数据库当中,所以需要使用手工确认
        channel.BasicAck(ea.DeliveryTag, true);
    };
    // 3、消费消息
    channel.BasicQos(0, 1, false); // Qos(防止多个消费者,能力不一致,导致的系统质量问题。
                                   // 每一次一个消费者只成功消费一个)
    channel.BasicConsume(queue: queueName,
                         autoAck: false, // 消息确认(防止消息消费失败)
                         consumer: consumer);
}

1.3 原理分析

扇形交换机fanout实现的就是订阅发布,生产者把消息发给给RabbitMQ---->RabbitMQ再把消息发送给交换机----->然后再发送给所有队列----->发送给消费者。

2.一个生产者/多个消费者(选择消费)

2.1 生产者

// <summary>
/// 创建商品
/// </summary>
/// <param name="productCreateDto"></param>
/// <returns></returns>
[HttpPost]
public IEnumerable<Product> CreateProduct(ProductCreateDto productCreateDto)
{
        #region 3、直连交换机
        {
            var factory = new ConnectionFactory()
            {
                HostName = "localhost",
                Port = 5672,
                Password = "guest",
                UserName = "guest",
                VirtualHost = "/"
            };
            using (var connection = factory.CreateConnection())
            {
                var channel = connection.CreateModel();
                // 2、定义交换机
                channel.ExchangeDeclare(exchange: "product_direct", type: "direct");

                string productJson = JsonConvert.SerializeObject(productCreateDto);
                // string message = "Hello World!";
                var body = Encoding.UTF8.GetBytes(productJson);

                // 3、发送消息
                var properties = channel.CreateBasicProperties();
                properties.Persistent = true; // 设置消息持久化
                channel.BasicPublish(exchange: "product_direct",
                                     routingKey: "product-eamil",
                                     basicProperties: properties,
                                     body: body);
            }
            _logger.LogInformation("成功创建商品");
        }
        #endregion
}

2.2 消费者

// <summary>
/// 创建商品
/// </summary>
/// <param name="productCreateDto"></param>
/// <returns></returns>
[HttpPost]
public IEnumerable<Product> CreateProduct(ProductCreateDto productCreateDto)
{
	 // 1、创建连接
        var factory = new ConnectionFactory()
        {
            HostName = "localhost",
            Port = 5672,
            Password = "guest",
            UserName = "guest",
            VirtualHost = "/"
        };
        var connection = factory.CreateConnection();
       
        #region 7、创建商品----2、发送短信或者发送邮件--直连交换机
        {
            // 工具:直连交换机 type:direct
            var channel = connection.CreateModel();
            // 1、定义交换机
            channel.ExchangeDeclare(exchange: "product_direct",
                                type: "direct");
           *//* // 2、定义随机队列(用完之后立马删除)
            var queueName = channel.QueueDeclare().QueueName;

            // 3、队列要和交换机绑定起来
            channel.QueueBind(queueName,
                                 "product_direct",
                                 routingKey: "product-sms");*//*

            // 2、定义随机队列(用完之后立马删除)
            var queueName1 = channel.QueueDeclare().QueueName;

            // 3、队列要和交换机绑定起来
            channel.QueueBind(queueName1,
                                 "product_direct",
                                 routingKey: "product-eamil");


            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                Console.WriteLine($"model:{model}");
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body.ToArray());
                // Thread.Sleep(1000);
                Console.WriteLine(" [x] 创建商品 {0}", message);
            };
            // 3、消费消息
            channel.BasicQos(0, 1, false); // Qos(防止多个消费者,能力不一致,导致的系统质量问题。
                                           // 每一次一个消费者只成功消费一个)
            channel.BasicConsume(queue: queueName1,
                                 autoAck: true, // 消息确认(防止消息消费失败)
                                 consumer: consumer);
        }
        #endregion
}
/// <summary>
/// 发送短信
/// </summary>
/// <returns></returns>
[HttpGet]
public IEnumerable<WeatherForecast> Get()
{
    // 1、创建连接
    var factory = new ConnectionFactory()
    {
        HostName = "localhost",
        Port = 5672,
        Password = "guest",
        UserName = "guest",
        VirtualHost = "/"
    };
    var connection = factory.CreateConnection();
    var channel = connection.CreateModel();

    #region 1、直连交换机
    {
       // 1、创建连接
        var factory = new ConnectionFactory()
        {
            HostName = "localhost",
            Port = 5672,
            Password = "guest",
            UserName = "guest",
            VirtualHost = "/"
        };
        var connection = factory.CreateConnection();
        var channel = connection.CreateModel();

        // 1、定义交换机
        channel.ExchangeDeclare(exchange: "product_direct", type: ExchangeType.Direct);

        // 2、定义随机队列
        var queueName = channel.QueueDeclare().QueueName;

        // 3、队列要和交换机绑定起来
        channel.QueueBind(queueName,
                              "product_direct",
                              routingKey: "product-sms");

        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        {
            Console.WriteLine($"model:{model}");
            var body = ea.Body;
            // 1、业务逻辑
            var message = Encoding.UTF8.GetString(body.ToArray());
            Console.WriteLine(" [x] 发送短信 {0}", message);

            // 自动确认机制缺陷:
            // 1、消息是否正常添加到数据库当中,所以需要使用手工确认
            channel.BasicAck(ea.DeliveryTag, true);
        };
        // 3、消费消息
        channel.BasicQos(0, 1, false); // Qos(防止多个消费者,能力不一致,导致的系统质量问题。
                                       // 每一次一个消费者只成功消费一个)
        channel.BasicConsume(queue: queueName,
                             autoAck: false, // 消息确认(防止消息消费失败)
                             consumer: consumer);
    }
    #endregion
}

2.3 原理分析

直连交换机direct实现的就是指定订阅发布,生产者把消息发送给RabbitMQ---->RabbitMQ再把消息发送给交换机----->然后再发送给指定队列----->发送给消费者

3.多个生产者/一个消费者(选择消费)

3.1 生产者

// <summary>
/// 创建商品
/// </summary>
/// <param name="productCreateDto"></param>
/// <returns></returns>
[HttpPost]
public IEnumerable<Product> CreateProduct(ProductCreateDto productCreateDto)
{
        #region 4、主题交换机
        {
           var factory = new ConnectionFactory()
            {
                HostName = "localhost",
                Port = 5672,
                Password = "guest",
                UserName = "guest",
                VirtualHost = "/"
            };
            using (var connection = factory.CreateConnection())
            {
                var channel = connection.CreateModel();
                // 2、定义交换机
                channel.ExchangeDeclare(exchange: "sms_topic", type: "topic");

                string productJson = JsonConvert.SerializeObject(productCreateDto);
                // string message = "Hello World!";
                var body = Encoding.UTF8.GetBytes(productJson);

                // 3、发送消息
                var properties = channel.CreateBasicProperties();
                properties.Persistent = true; // 设置消息持久化
                channel.BasicPublish(exchange: "sms_topic",
                                     routingKey: "sms.product.update",
                                     basicProperties: properties,
                                     body: body);
            }
            _logger.LogInformation("成功创建商品");
        }
        #endregion
}

3.2 消费者

// <summary>
/// 创建商品
/// </summary>
/// <param name="productCreateDto"></param>
/// <returns></returns>
[HttpPost]
public IEnumerable<Product> CreateProduct(ProductCreateDto productCreateDto)
{
	 #region 4、主题交换机
        {
            var factory = new ConnectionFactory()
            {
                HostName = "localhost",
                Port = 5672,
                Password = "guest",
                UserName = "guest",
                VirtualHost = "/"
            };
            using (var connection = factory.CreateConnection())
            {
                var channel = connection.CreateModel();
                // 2、定义交换机
                channel.ExchangeDeclare(exchange: "sms_topic", type: "topic");

                string orderJson = JsonConvert.SerializeObject(orderCreateDto);
                // string message = "Hello World!";
                var body = Encoding.UTF8.GetBytes(orderJson);

                // 3、发送消息
                var properties = channel.CreateBasicProperties();
                properties.Persistent = true; // 设置消息持久化
                channel.BasicPublish(exchange: "sms_topic",
                                     routingKey: "sms.order",
                                     basicProperties: properties,
                                     body: body);
            }
            _logger.LogInformation("成功创建订单");
        }
        #endregion
}
/// <summary>
/// 发送短信
/// </summary>
/// <returns></returns>
[HttpGet]
public IEnumerable<WeatherForecast> Get()
{
    // 1、创建连接
    var factory = new ConnectionFactory()
    {
        HostName = "localhost",
        Port = 5672,
        Password = "guest",
        UserName = "guest",
        VirtualHost = "/"
    };
    var connection = factory.CreateConnection();
    var channel = connection.CreateModel();

     #region 2、主题交换机
    {
        // 1、创建连接
        var factory = new ConnectionFactory()
        {
            HostName = "localhost",
            Port = 5672,
            Password = "guest",
            UserName = "guest",
            VirtualHost = "/"
        };
        var connection = factory.CreateConnection();
        var channel = connection.CreateModel();

        // 1、定义交换机
        channel.ExchangeDeclare(exchange: "sms_topic", type: ExchangeType.Topic);

        // 2、定义随机队列
        var queueName = channel.QueueDeclare().QueueName;

        // 3、队列要和交换机绑定起来。多对1
        // * 号的缺陷:只能匹配一级
        // # 能够匹配一级及多级以上
        // 真实项目当中,使用主题交换机。因为:可以满足所有场景
        channel.QueueBind(queueName,
                              "sms_topic",
                              routingKey: "sms.#");

        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        {
            Console.WriteLine($"model:{model}");
            var body = ea.Body;
            // 1、业务逻辑
            var message = Encoding.UTF8.GetString(body.ToArray());
            Console.WriteLine(" [x] 发送短信 {0}", message);

            // 自动确认机制缺陷:
            // 1、消息是否正常添加到数据库当中,所以需要使用手工确认
            channel.BasicAck(ea.DeliveryTag, true);
        };
        // 3、消费消息
        channel.BasicQos(0, 1, false); // Qos(防止多个消费者,能力不一致,导致的系统质量问题。
                                       // 每一次一个消费者只成功消费一个)
        channel.BasicConsume(queue: queueName,
                             autoAck: false, // 消息确认(防止消息消费失败)
                             consumer: consumer);
    }
    #endregion
}

3.3 原理分析

主题交换机实现的就是不同生产产匹配到相同消费者,生产者把消息发送给RabbitMQ---->RabbitMQ再把消息发送给交换机----->然后再发送给指定队列----->发送给消费者

4.一个生产者/一个消费者(回调消费)

4.1 生产者

// <summary>
/// 创建商品
/// </summary>
/// <param name="productCreateDto"></param>
/// <returns></returns>
[HttpPost]
public IEnumerable<Product> CreateProduct(ProductCreateDto productCreateDto)
{
        #region 3RPC回调来实现
        {
            var factory = new ConnectionFactory()
            {
                HostName = "localhost",
                Port = 5672,
                Password = "guest",
                UserName = "guest",
                VirtualHost = "/"
            };
            var connection = factory.CreateConnection();
            
                var channel = connection.CreateModel();
                // 2、定义队列
                string replyQueueName = channel.QueueDeclare().QueueName;

                var properties = channel.CreateBasicProperties();
                var correlationId = Guid.NewGuid().ToString();
                properties.CorrelationId = correlationId;
                properties.ReplyTo = replyQueueName;

                // 3、发送消息
                string productJson = JsonConvert.SerializeObject(productCreateDto);
                // string message = "Hello World!";
                var body = Encoding.UTF8.GetBytes(productJson);
                properties.Persistent = true; // 设置消息持久化
                channel.BasicPublish(exchange: "",
                                     routingKey: "product_create2",
                                     basicProperties: properties,
                                     body: body);

                // 4、消息回调
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    Console.WriteLine($"model:{model}");
                    var body = ea.Body;
                    // 1、业务逻辑处理
                    var message = Encoding.UTF8.GetString(body.ToArray());
                    if (ea.BasicProperties.CorrelationId == correlationId)
                    {
                        Console.WriteLine(" [x] 回调成功 {0}", message);
                    }

                };
                // 3、消费消息
                // channel.BasicQos(0, 1, false); // Qos(防止多个消费者,能力不一致,导致的系统质量问题。
                // 每一次一个消费者只成功消费一个)
                channel.BasicConsume(queue: replyQueueName,
                                     autoAck: true, // 消息确认(防止消息消费失败)
                                     consumer: consumer);

            _logger.LogInformation("成功创建商品");
        }
        #endregion
}

4.2 消费者

/// <summary>
/// 创建商品
/// </summary>
/// <returns></returns>
[HttpGet]
public IEnumerable<Product> CreateProdcuts()
{
    // 1、创建连接
    var factory = new ConnectionFactory()
    {
        HostName = "localhost",
        Port = 5672,
        Password = "guest",
        UserName = "guest",
        VirtualHost = "/"
    };
    var connection = factory.CreateConnection();
    var channel = connection.CreateModel();

    #region 9、创建商品-----回调-RPC
    {
        // 工具:直连交换机 type:direct
        var channel = connection.CreateModel();

        // 1、定义随机队列(用完之后立马删除)
        var queueName = channel.QueueDeclare(queue: "product_create2",
                                             durable: false,
                                             exclusive: false,
                                             autoDelete: false,
                                             arguments: null);

        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        {
            Console.WriteLine($"model:{model}");
            var body = ea.Body;

            var props = ea.BasicProperties;
            var replyProps = channel.CreateBasicProperties();
            replyProps.CorrelationId = props.CorrelationId;

            try
            {
                // 1、执行业务
                var message = Encoding.UTF8.GetString(body.ToArray());
                Console.WriteLine(" [x] 创建商品 {0}", message);
            }
            catch (Exception e)
            {
                Console.WriteLine(" [.] " + e.Message);
            }
            finally
            {
                Console.WriteLine("发送回调消息");
                var responseBytes = Encoding.UTF8.GetBytes("商品回调成功");
                channel.BasicPublish(exchange: "",
                                    routingKey: props.ReplyTo,
                                    basicProperties: replyProps,
                                    body: responseBytes);
                /*channel.BasicAck(deliveryTag: ea.DeliveryTag,
                  multiple: false);*/
            }
        };
        // 3、消费消息
       // channel.BasicQos(0, 1, false); // Qos(防止多个消费者,能力不一致,导致的系统质量问题。
                                       // 每一次一个消费者只成功消费一个)
        channel.BasicConsume(queue: "product_create2",
                             autoAck: true, // 消息确认(防止消息消费失败)
                             consumer: consumer);
    }
    #endregion
}

4.3 原理分析

RabbitMQ中实现RPC的机制是:

客户端发送请求(消息)时,在消息的属性(MessageProperties,在AMQP协议中定义了14中properties,这些属性会随着消息一起发送)中设置两个值replyTo(一个Queue名称,用于告诉服务器处理完成后将通知我的消息发送到这个Queue中)和correlationId(此次请求的标识号,服务器处理完成后需要将此属性返还,客户端将根据这个id了解哪条请求被成功执行了或执行失败);

服务器端收到消息并处理;

服务器端处理完消息后0,0将生成一条应答消息到replyTo指定的Queue,同时带上correlationId属性;

客户端之前已订阅replyTo指 定的Queue,从中收到服务器的应答消息后,根据其中的correlationId属性分析哪条请求被执行了,根据执行结果进行后续业务处理。

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。