【愚公系列】2022年03月 .NET架构班 023-分布式中间件 RabbitMQ多场景使用问题分析
一、微服务场景下的RabbitMQ
RabbitMQ主要用在分布式系统中,主要是应用在微服务系统中。在微服务系统中,微服务之间通信,主要是通过Http或者gRPC通信。由于http/gRPC通信方式是同步通信,如果遇到了高并发,同步通信就会导致微服务系统性能瓶颈,所以,为了解决微服务性能瓶颈问题。需要将同步通信换成异步通信方式。因此选用消息队列就是个很好的选择。
RabbitMQ是消息队列中的一种。
二、微服务场景下RabbitMQ的使用
1.RabbitMQ的安装
RabbitMQ的安装可以参考本人其他文章
window11下安装:https://blog.csdn.net/aa2528877987/article/details/107831477
Docker下安装:https://blog.csdn.net/aa2528877987/article/details/122802713
2.RabbitMQ的使用
nuget引入 RabbitMQ.Client
2.1 基本使用
生产者
/// <summary>
/// 创建商品
/// </summary>
/// <param name="productCreateDto"></param>
/// <returns></returns>
[HttpPost]
public IEnumerable<Product> CreateProduct(ProductCreateDto productCreateDto)
{
#region 1、生产者
{
// 1、创建连接工厂
var factory = new ConnectionFactory()
{
HostName = "localhost",
Port = 5672,
Password = "guest",
UserName = "guest",
VirtualHost = "/"
};
using (var connection = factory.CreateConnection())
{
var channel = connection.CreateModel();
// 2、定义队列
channel.QueueDeclare(queue: "product-create",
durable: false,// 消息持久化(防止rabbitmq宕机导致队列丢失风险)
exclusive: false,
autoDelete: false,
arguments: null);
string productJson = JsonConvert.SerializeObject(productCreateDto);
// string message = "Hello World!";
var body = Encoding.UTF8.GetBytes(productJson);
// 3、发送消息
var properties = channel.CreateBasicProperties();
properties.Persistent = true; // 设置消息持久化(个性化控制)
channel.BasicPublish(exchange: "",
routingKey: "product-create",
basicProperties: properties,
body: body);
}
_logger.LogInformation("成功创建商品");
}
#endregion
}
消费者
[HttpPost]
public IEnumerable<Product> CreateProdcuts()
{
// 1、创建连接
var factory = new ConnectionFactory()
{
HostName = "localhost",
Port = 5672,
Password = "guest",
UserName = "guest",
VirtualHost = "/"
};
var connection = factory.CreateConnection();
#region 1、工作队列(单消费者)
{
var channel = connection.CreateModel();
// 2、定义队列
channel.QueueDeclare(queue: "product-create",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
Console.WriteLine($"model:{model}");
var body = ea.Body;
// 1、逻辑代码,添加商品到数据库
var message = Encoding.UTF8.GetString(body.ToArray());
Console.WriteLine(" [x] 创建商品 {0}", message);
};
channel.BasicConsume(queue: "product-create",
autoAck: false,
consumer: consumer);
}
#endregion
}
2.1.1 消费端挂了(自动确认)
问题:RabbitMQ给微服务发消息期间,微服务宕机。导致消息丢失
解决方案:autoAck自动确认机制修改为true
[HttpPost]
public IEnumerable<Product> CreateProdcuts()
{
// 1、创建连接
var factory = new ConnectionFactory()
{
HostName = "localhost",
Port = 5672,
Password = "guest",
UserName = "guest",
VirtualHost = "/"
};
var connection = factory.CreateConnection();
#region 1、工作队列(单消费者)
{
var channel = connection.CreateModel();
// 2、定义队列
channel.QueueDeclare(queue: "product-create",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
Console.WriteLine($"model:{model}");
var body = ea.Body;
// 1、逻辑代码,添加商品到数据库
var message = Encoding.UTF8.GetString(body.ToArray());
Console.WriteLine(" [x] 创建商品 {0}", message);
};
channel.BasicConsume(queue: "product-create",
autoAck: true, // 消息自动确认机制
consumer: consumer);
}
#endregion
}
2.1.2 消费端挂了(手动确认)
问题:rabbitmq给微服务发了消息,微服务收到消息,微服务发送确认消息给rabbitmq期间,执行业务逻辑失败了。
解决方案:手动确认
[HttpPost]
public IEnumerable<Product> CreateProdcuts()
{
// 1、创建连接
var factory = new ConnectionFactory()
{
HostName = "localhost",
Port = 5672,
Password = "guest",
UserName = "guest",
VirtualHost = "/"
};
var connection = factory.CreateConnection();
#region 1、工作队列(单消费者)
{
var channel = _connection.CreateModel();
// 2、定义队列
channel.QueueDeclare(queue: "product-create",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
Console.WriteLine($"model:{model}");
var body = ea.Body;
// 1、逻辑代码
var message = Encoding.UTF8.GetString(body.ToArray());
Console.WriteLine(" [x] 创建商品 {0}", message);
// 自动确认机制缺陷:
// 1、消息是否正常添加到数据库当中,所以需要使用手工确认
channel.BasicAck(ea.DeliveryTag, true);
};
channel.BasicConsume(queue: "product-create",
autoAck: false, // 消息确认(防止消息重新消费)
consumer: consumer);
}
#endregion
}
2.1.3 队列消息堆积(多个消费端)
问题:网站发送高并发消息,导致商品微服务来不及处理,导致消息堆积
解决方案:微服务集群消费
2.1.4 队列消息堆积(消费端堆积)
问题:微服务集群缺陷:无法控制集群实例的强弱。如果5007比较强,5006弱,就会导致消息大部分堆积在5006
解决方案:使用qos
[HttpPost]
public IEnumerable<Product> CreateProdcuts()
{
// 1、创建连接
var factory = new ConnectionFactory()
{
HostName = "localhost",
Port = 5672,
Password = "guest",
UserName = "guest",
VirtualHost = "/"
};
var connection = factory.CreateConnection();
#region 1、工作队列(单消费者)
{
var channel = connection.CreateModel();
// 2、定义队列
channel.QueueDeclare(queue: "product-create",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
Console.WriteLine($"model:{model}");
var body = ea.Body;
var message = Encoding.UTF8.GetString(body.ToArray());
Console.WriteLine(" [x] 创建商品 {0}", message);
// 自动确认机制缺陷:
// 1、消息是否正常添加到数据库当中,所以需要使用手工确认
channel.BasicAck(ea.DeliveryTag, true);
};
// 3、消费消息
channel.BasicQos(0, 1, false); // Qos(防止多个消费者,能力不一致,导致的系统质量问题。
// 每一次一个消费者只成功消费一个)
channel.BasicConsume(queue: "product-create",
autoAck: false, // 消息确认(防止消息消费失败)
consumer: consumer);
}
#endregion
}
2.1.5 RabbitMQ挂了
问题:网站给RabbitMQ发送消息成功后,如果RabbitMQ宕机了,会导致RabbitMQ中消息丢失!如何解决消息丢失问题
解决方案:消息持久化机制
#region 1、生产者
{
// 1、创建连接工厂
var factory = new ConnectionFactory()
{
HostName = "localhost",
Port = 5672,
Password = "guest",
UserName = "guest",
VirtualHost = "/"
};
using (var connection = factory.CreateConnection())
{
var channel = connection.CreateModel();
// 2、定义队列
channel.QueueDeclare(queue: "product-create",
durable: true,// 队列持久化
exclusive: false,
autoDelete: false,
arguments: null);
string productJson = JsonConvert.SerializeObject(productCreateDto);
// string message = "Hello World!";
var body = Encoding.UTF8.GetBytes(productJson);
// 3、发送消息
var properties = channel.CreateBasicProperties();
properties.Persistent = true; // 设置消息持久化(个性化控制)
channel.BasicPublish(exchange: "",
routingKey: "product-create",
basicProperties: properties,
body: body);
}
_logger.LogInformation("成功创建商品");
}
#endregion
2.2 挂载主机上使用
public class RabbitmqHostService : IHostedService
{
public Task StartAsync(CancellationToken cancellationToken)
{
// 1、创建连接
var factory = new ConnectionFactory()
{
HostName = "localhost",
Port = 5672,
Password = "guest",
UserName = "guest",
VirtualHost = "/"
};
var connection = factory.CreateConnection();
#region 1、工作队列(单消费者)
{
var channel = connection.CreateModel();
// 2、定义队列
channel.QueueDeclare(queue: "product-create",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
Console.WriteLine($"model:{model}");
var body = ea.Body;
// 1、逻辑代码,添加商品到数据库
var message = Encoding.UTF8.GetString(body.ToArray());
Console.WriteLine(" [x] 创建商品 {0}", message);
};
channel.BasicConsume(queue: "product-create",
autoAck: true, // 消息确认(防止消息重新消费)
consumer: consumer);
}
#endregion
Console.WriteLine("rabbitmq开始监听......");
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
{
// 1、关闭rabbitmq的连接
throw new NotImplementedException();
}
}
services.AddHostedService<RabbitmqHostService>();
- 点赞
- 收藏
- 关注作者
评论(0)