【愚公系列】2022年03月 .NET架构班 023-分布式中间件 RabbitMQ多场景使用问题分析

举报
愚公搬代码 发表于 2022/03/14 00:46:19 2022/03/14
【摘要】 一、微服务场景下的RabbitMQRabbitMQ主要用在分布式系统中,主要是应用在微服务系统中。在微服务系统中,微服务之间通信,主要是通过Http或者gRPC通信。由于http/gRPC通信方式是同步通信,如果遇到了高并发,同步通信就会导致微服务系统性能瓶颈,所以,为了解决微服务性能瓶颈问题。需要将同步通信换成异步通信方式。因此选用消息队列就是个很好的选择。RabbitMQ是消息队列中的...

一、微服务场景下的RabbitMQ

RabbitMQ主要用在分布式系统中,主要是应用在微服务系统中。在微服务系统中,微服务之间通信,主要是通过Http或者gRPC通信。由于http/gRPC通信方式是同步通信,如果遇到了高并发,同步通信就会导致微服务系统性能瓶颈,所以,为了解决微服务性能瓶颈问题。需要将同步通信换成异步通信方式。因此选用消息队列就是个很好的选择。

RabbitMQ是消息队列中的一种。

二、微服务场景下RabbitMQ的使用

1.RabbitMQ的安装

RabbitMQ的安装可以参考本人其他文章

window11下安装:https://blog.csdn.net/aa2528877987/article/details/107831477

Docker下安装:https://blog.csdn.net/aa2528877987/article/details/122802713

2.RabbitMQ的使用

nuget引入 RabbitMQ.Client

2.1 基本使用

生产者

/// <summary>
/// 创建商品
/// </summary>
/// <param name="productCreateDto"></param>
/// <returns></returns>
[HttpPost]
public IEnumerable<Product> CreateProduct(ProductCreateDto productCreateDto)
{
    #region 1、生产者
    {
        // 1、创建连接工厂
        var factory = new ConnectionFactory()
        {
            HostName = "localhost",
            Port = 5672,
            Password = "guest",
            UserName = "guest",
            VirtualHost = "/"
        };
        using (var connection = factory.CreateConnection())
        {
            var channel = connection.CreateModel();
            // 2、定义队列
            channel.QueueDeclare(queue: "product-create",
                                 durable: false,// 消息持久化(防止rabbitmq宕机导致队列丢失风险)
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);

            string productJson = JsonConvert.SerializeObject(productCreateDto);
            // string message = "Hello World!";
            var body = Encoding.UTF8.GetBytes(productJson);

            // 3、发送消息
            var properties = channel.CreateBasicProperties();
            properties.Persistent = true; // 设置消息持久化(个性化控制)
            channel.BasicPublish(exchange: "",
                                 routingKey: "product-create",
                                 basicProperties: properties,
                                 body: body);
        }
        _logger.LogInformation("成功创建商品");
    }
    #endregion
}  

消费者

[HttpPost]
public IEnumerable<Product> CreateProdcuts()
{
    // 1、创建连接
    var factory = new ConnectionFactory()
    {
        HostName = "localhost",
        Port = 5672,
        Password = "guest",
        UserName = "guest",
        VirtualHost = "/"
    };
    var connection = factory.CreateConnection();
    #region 1、工作队列(单消费者)
    {
        var channel = connection.CreateModel();

        // 2、定义队列
        channel.QueueDeclare(queue: "product-create",
                             durable: true,
                             exclusive: false,
                             autoDelete: false,
                             arguments: null);

        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        {

            Console.WriteLine($"model:{model}");
            var body = ea.Body;
            // 1、逻辑代码,添加商品到数据库
            var message = Encoding.UTF8.GetString(body.ToArray());
            Console.WriteLine(" [x] 创建商品 {0}", message);
        };

        channel.BasicConsume(queue: "product-create",
                             autoAck: false, 
                             consumer: consumer);
    }
    #endregion
}

2.1.1 消费端挂了(自动确认)

问题:RabbitMQ给微服务发消息期间,微服务宕机。导致消息丢失

解决方案:autoAck自动确认机制修改为true

[HttpPost]
public IEnumerable<Product> CreateProdcuts()
{
    // 1、创建连接
    var factory = new ConnectionFactory()
    {
        HostName = "localhost",
        Port = 5672,
        Password = "guest",
        UserName = "guest",
        VirtualHost = "/"
    };
    var connection = factory.CreateConnection();
    #region 1、工作队列(单消费者)
    {
        var channel = connection.CreateModel();
   // 2、定义队列
    channel.QueueDeclare(queue: "product-create",
                         durable: true,
                         exclusive: false,
                         autoDelete: false,
                         arguments: null);

    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {

        Console.WriteLine($"model:{model}");
        var body = ea.Body;
        // 1、逻辑代码,添加商品到数据库
        var message = Encoding.UTF8.GetString(body.ToArray());
        Console.WriteLine(" [x] 创建商品 {0}", message);
    };

    channel.BasicConsume(queue: "product-create",
                         autoAck: true, // 消息自动确认机制
                         consumer: consumer);
}
#endregion
}

2.1.2 消费端挂了(手动确认)

问题:rabbitmq给微服务发了消息,微服务收到消息,微服务发送确认消息给rabbitmq期间,执行业务逻辑失败了。

解决方案:手动确认

  [HttpPost]
    public IEnumerable<Product> CreateProdcuts()
    {
        // 1、创建连接
        var factory = new ConnectionFactory()
        {
            HostName = "localhost",
            Port = 5672,
            Password = "guest",
            UserName = "guest",
            VirtualHost = "/"
        };
        var connection = factory.CreateConnection();
        #region 1、工作队列(单消费者)
        {
            var channel = _connection.CreateModel();

                // 2、定义队列
                channel.QueueDeclare(queue: "product-create",
                                     durable: false,
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {

                    Console.WriteLine($"model:{model}");
                    var body = ea.Body;
                    // 1、逻辑代码
                    var message = Encoding.UTF8.GetString(body.ToArray());
                    Console.WriteLine(" [x] 创建商品 {0}", message);

                    // 自动确认机制缺陷:
                    // 1、消息是否正常添加到数据库当中,所以需要使用手工确认
                    channel.BasicAck(ea.DeliveryTag, true);
                };
                channel.BasicConsume(queue: "product-create",
                                     autoAck: false, // 消息确认(防止消息重新消费)
                                     consumer: consumer);
    }
    #endregion
}

2.1.3 队列消息堆积(多个消费端)

问题:网站发送高并发消息,导致商品微服务来不及处理,导致消息堆积

解决方案:微服务集群消费
在这里插入图片描述
在这里插入图片描述

2.1.4 队列消息堆积(消费端堆积)

问题:微服务集群缺陷:无法控制集群实例的强弱。如果5007比较强,5006弱,就会导致消息大部分堆积在5006

解决方案:使用qos

[HttpPost]
public IEnumerable<Product> CreateProdcuts()
{
    // 1、创建连接
    var factory = new ConnectionFactory()
    {
        HostName = "localhost",
        Port = 5672,
        Password = "guest",
        UserName = "guest",
        VirtualHost = "/"
    };
    var connection = factory.CreateConnection();
    #region 1、工作队列(单消费者)
    {
        var channel = connection.CreateModel();

            // 2、定义队列
            channel.QueueDeclare(queue: "product-create",
                                 durable: false,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                Console.WriteLine($"model:{model}");
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body.ToArray());
                Console.WriteLine(" [x] 创建商品 {0}", message);

                // 自动确认机制缺陷:
                // 1、消息是否正常添加到数据库当中,所以需要使用手工确认
                channel.BasicAck(ea.DeliveryTag, true);
            };
            // 3、消费消息
            channel.BasicQos(0, 1, false); // Qos(防止多个消费者,能力不一致,导致的系统质量问题。
                                           // 每一次一个消费者只成功消费一个)
            channel.BasicConsume(queue: "product-create",
                                 autoAck: false, // 消息确认(防止消息消费失败)
                                 consumer: consumer);
	}
	#endregion
}

2.1.5 RabbitMQ挂了

问题:网站给RabbitMQ发送消息成功后,如果RabbitMQ宕机了,会导致RabbitMQ中消息丢失!如何解决消息丢失问题

解决方案:消息持久化机制

    #region 1、生产者
    {
        // 1、创建连接工厂
        var factory = new ConnectionFactory()
        {
            HostName = "localhost",
            Port = 5672,
            Password = "guest",
            UserName = "guest",
            VirtualHost = "/"
        };
        using (var connection = factory.CreateConnection())
        {
            var channel = connection.CreateModel();
            // 2、定义队列
            channel.QueueDeclare(queue: "product-create",
                                 durable: true,// 队列持久化
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);
       string productJson = JsonConvert.SerializeObject(productCreateDto);
        // string message = "Hello World!";
        var body = Encoding.UTF8.GetBytes(productJson);

        // 3、发送消息
        var properties = channel.CreateBasicProperties();
        properties.Persistent = true;  // 设置消息持久化(个性化控制)
        channel.BasicPublish(exchange: "",
                             routingKey: "product-create",
                             basicProperties: properties,
                             body: body);
    }
    _logger.LogInformation("成功创建商品");
}
#endregion

2.2 挂载主机上使用

public class RabbitmqHostService : IHostedService
{
    public Task StartAsync(CancellationToken cancellationToken)
    {
        // 1、创建连接
        var factory = new ConnectionFactory()
        {
            HostName = "localhost",
            Port = 5672,
            Password = "guest",
            UserName = "guest",
            VirtualHost = "/"
        };
        var connection = factory.CreateConnection();
        #region 1、工作队列(单消费者)
        {
            var channel = connection.CreateModel();

            // 2、定义队列
            channel.QueueDeclare(queue: "product-create",
                                 durable: false,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {

                Console.WriteLine($"model:{model}");
                var body = ea.Body;
                // 1、逻辑代码,添加商品到数据库
                var message = Encoding.UTF8.GetString(body.ToArray());
                Console.WriteLine(" [x] 创建商品 {0}", message);
            };

            channel.BasicConsume(queue: "product-create",
                                 autoAck: true, // 消息确认(防止消息重新消费)
                                 consumer: consumer);
        }
        #endregion

        Console.WriteLine("rabbitmq开始监听......");
        return Task.CompletedTask;
    }

    public Task StopAsync(CancellationToken cancellationToken)
    {
        // 1、关闭rabbitmq的连接
        throw new NotImplementedException();
    }
}
services.AddHostedService<RabbitmqHostService>();
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。