【愚公系列】2022年03月 .NET架构班 024-分布式中间件 RabbitMQ多场景使用情况分析
一、RabbitMQ多场景使用情况分析
1.一个生产者/多个消费者(全部消费)
1.1 生产者
// <summary>
/// 创建商品
/// </summary>
/// <param name="productCreateDto"></param>
/// <returns></returns>
[HttpPost]
public IEnumerable<Product> CreateProduct(ProductCreateDto productCreateDto)
{
#region 2、扇形交换机
{
var factory = new ConnectionFactory()
{
HostName = "localhost",
Port = 5672,
Password = "guest",
UserName = "guest",
VirtualHost = "/"
};
using (var connection = factory.CreateConnection())
{
var channel = connection.CreateModel();
// 2、定义交换机
channel.ExchangeDeclare(exchange: "product_fanout", type: "fanout");
string productJson = JsonConvert.SerializeObject(productCreateDto);
// string message = "Hello World!";
var body = Encoding.UTF8.GetBytes(productJson);
// 3、发送消息
var properties = channel.CreateBasicProperties();
properties.Persistent = true; // 设置消息持久化
channel.BasicPublish(exchange: "product_fanout",
routingKey: "",
basicProperties: properties,
body: body);
}
_logger.LogInformation("成功创建商品");
}
#endregion
}
1.2 消费者
// <summary>
/// 创建商品
/// </summary>
/// <param name="productCreateDto"></param>
/// <returns></returns>
[HttpPost]
public IEnumerable<Product> CreateProduct(ProductCreateDto productCreateDto)
{
// 1、创建连接
var factory = new ConnectionFactory()
{
HostName = "localhost",
Port = 5672,
Password = "guest",
UserName = "guest",
VirtualHost = "/"
};
var connection = factory.CreateConnection();
#region 6、订阅发布(广播消费)1、创建商品----2、发送短信-扇形交换机
{
var channel = connection.CreateModel();
// 1、定义交换机
channel.ExchangeDeclare(exchange: "product_fanout", type: "fanout");
// 2、定义随机队
var queueName = channel.QueueDeclare().QueueName;
// 3、队列要和交换机绑定起来
channel.QueueBind(queueName,
"product_fanout",
routingKey: "");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
Console.WriteLine($"model:{model}");
var body = ea.Body;
// 1、业务逻辑
var message = Encoding.UTF8.GetString(body.ToArray());
Console.WriteLine(" [x] 创建商品 {0}", message);
// 自动确认机制缺陷:
// 1、消息是否正常添加到数据库当中,所以需要使用手工确认
channel.BasicAck(ea.DeliveryTag, true);
};
// 3、消费消息
channel.BasicQos(0, 1, false); // Qos(防止多个消费者,能力不一致,导致的系统质量问题。
// 每一次一个消费者只成功消费一个)
channel.BasicConsume(queue: queueName,
autoAck: false, // 消息确认(防止消息消费失败)
consumer: consumer);
}
#endregion
}
/// <summary>
/// 发送短信
/// </summary>
/// <returns></returns>
[HttpGet]
public IEnumerable<WeatherForecast> Get()
{
// 1、创建连接
var factory = new ConnectionFactory()
{
HostName = "localhost",
Port = 5672,
Password = "guest",
UserName = "guest",
VirtualHost = "/"
};
var connection = factory.CreateConnection();
var channel = connection.CreateModel();
// 1、定义交换机
channel.ExchangeDeclare(exchange: "product_fanout", type: ExchangeType.Fanout);
// 2、定义随机队列
var queueName = channel.QueueDeclare().QueueName;
// 3、队列要和交换机绑定起来
channel.QueueBind(queueName,
"product_fanout",
routingKey: "");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
Console.WriteLine($"model:{model}");
var body = ea.Body;
// 1、业务逻辑
var message = Encoding.UTF8.GetString(body.ToArray());
Console.WriteLine(" [x] 发送短信 {0}", message);
// 自动确认机制缺陷:
// 1、消息是否正常添加到数据库当中,所以需要使用手工确认
channel.BasicAck(ea.DeliveryTag, true);
};
// 3、消费消息
channel.BasicQos(0, 1, false); // Qos(防止多个消费者,能力不一致,导致的系统质量问题。
// 每一次一个消费者只成功消费一个)
channel.BasicConsume(queue: queueName,
autoAck: false, // 消息确认(防止消息消费失败)
consumer: consumer);
}
1.3 原理分析
扇形交换机fanout实现的就是订阅发布,生产者把消息发给给RabbitMQ---->RabbitMQ再把消息发送给交换机----->然后再发送给所有队列----->发送给消费者。
2.一个生产者/多个消费者(选择消费)
2.1 生产者
// <summary>
/// 创建商品
/// </summary>
/// <param name="productCreateDto"></param>
/// <returns></returns>
[HttpPost]
public IEnumerable<Product> CreateProduct(ProductCreateDto productCreateDto)
{
#region 3、直连交换机
{
var factory = new ConnectionFactory()
{
HostName = "localhost",
Port = 5672,
Password = "guest",
UserName = "guest",
VirtualHost = "/"
};
using (var connection = factory.CreateConnection())
{
var channel = connection.CreateModel();
// 2、定义交换机
channel.ExchangeDeclare(exchange: "product_direct", type: "direct");
string productJson = JsonConvert.SerializeObject(productCreateDto);
// string message = "Hello World!";
var body = Encoding.UTF8.GetBytes(productJson);
// 3、发送消息
var properties = channel.CreateBasicProperties();
properties.Persistent = true; // 设置消息持久化
channel.BasicPublish(exchange: "product_direct",
routingKey: "product-eamil",
basicProperties: properties,
body: body);
}
_logger.LogInformation("成功创建商品");
}
#endregion
}
2.2 消费者
// <summary>
/// 创建商品
/// </summary>
/// <param name="productCreateDto"></param>
/// <returns></returns>
[HttpPost]
public IEnumerable<Product> CreateProduct(ProductCreateDto productCreateDto)
{
// 1、创建连接
var factory = new ConnectionFactory()
{
HostName = "localhost",
Port = 5672,
Password = "guest",
UserName = "guest",
VirtualHost = "/"
};
var connection = factory.CreateConnection();
#region 7、创建商品----2、发送短信或者发送邮件--直连交换机
{
// 工具:直连交换机 type:direct
var channel = connection.CreateModel();
// 1、定义交换机
channel.ExchangeDeclare(exchange: "product_direct",
type: "direct");
*//* // 2、定义随机队列(用完之后立马删除)
var queueName = channel.QueueDeclare().QueueName;
// 3、队列要和交换机绑定起来
channel.QueueBind(queueName,
"product_direct",
routingKey: "product-sms");*//*
// 2、定义随机队列(用完之后立马删除)
var queueName1 = channel.QueueDeclare().QueueName;
// 3、队列要和交换机绑定起来
channel.QueueBind(queueName1,
"product_direct",
routingKey: "product-eamil");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
Console.WriteLine($"model:{model}");
var body = ea.Body;
var message = Encoding.UTF8.GetString(body.ToArray());
// Thread.Sleep(1000);
Console.WriteLine(" [x] 创建商品 {0}", message);
};
// 3、消费消息
channel.BasicQos(0, 1, false); // Qos(防止多个消费者,能力不一致,导致的系统质量问题。
// 每一次一个消费者只成功消费一个)
channel.BasicConsume(queue: queueName1,
autoAck: true, // 消息确认(防止消息消费失败)
consumer: consumer);
}
#endregion
}
/// <summary>
/// 发送短信
/// </summary>
/// <returns></returns>
[HttpGet]
public IEnumerable<WeatherForecast> Get()
{
// 1、创建连接
var factory = new ConnectionFactory()
{
HostName = "localhost",
Port = 5672,
Password = "guest",
UserName = "guest",
VirtualHost = "/"
};
var connection = factory.CreateConnection();
var channel = connection.CreateModel();
#region 1、直连交换机
{
// 1、创建连接
var factory = new ConnectionFactory()
{
HostName = "localhost",
Port = 5672,
Password = "guest",
UserName = "guest",
VirtualHost = "/"
};
var connection = factory.CreateConnection();
var channel = connection.CreateModel();
// 1、定义交换机
channel.ExchangeDeclare(exchange: "product_direct", type: ExchangeType.Direct);
// 2、定义随机队列
var queueName = channel.QueueDeclare().QueueName;
// 3、队列要和交换机绑定起来
channel.QueueBind(queueName,
"product_direct",
routingKey: "product-sms");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
Console.WriteLine($"model:{model}");
var body = ea.Body;
// 1、业务逻辑
var message = Encoding.UTF8.GetString(body.ToArray());
Console.WriteLine(" [x] 发送短信 {0}", message);
// 自动确认机制缺陷:
// 1、消息是否正常添加到数据库当中,所以需要使用手工确认
channel.BasicAck(ea.DeliveryTag, true);
};
// 3、消费消息
channel.BasicQos(0, 1, false); // Qos(防止多个消费者,能力不一致,导致的系统质量问题。
// 每一次一个消费者只成功消费一个)
channel.BasicConsume(queue: queueName,
autoAck: false, // 消息确认(防止消息消费失败)
consumer: consumer);
}
#endregion
}
2.3 原理分析
直连交换机direct实现的就是指定订阅发布,生产者把消息发送给RabbitMQ---->RabbitMQ再把消息发送给交换机----->然后再发送给指定队列----->发送给消费者
3.多个生产者/一个消费者(选择消费)
3.1 生产者
// <summary>
/// 创建商品
/// </summary>
/// <param name="productCreateDto"></param>
/// <returns></returns>
[HttpPost]
public IEnumerable<Product> CreateProduct(ProductCreateDto productCreateDto)
{
#region 4、主题交换机
{
var factory = new ConnectionFactory()
{
HostName = "localhost",
Port = 5672,
Password = "guest",
UserName = "guest",
VirtualHost = "/"
};
using (var connection = factory.CreateConnection())
{
var channel = connection.CreateModel();
// 2、定义交换机
channel.ExchangeDeclare(exchange: "sms_topic", type: "topic");
string productJson = JsonConvert.SerializeObject(productCreateDto);
// string message = "Hello World!";
var body = Encoding.UTF8.GetBytes(productJson);
// 3、发送消息
var properties = channel.CreateBasicProperties();
properties.Persistent = true; // 设置消息持久化
channel.BasicPublish(exchange: "sms_topic",
routingKey: "sms.product.update",
basicProperties: properties,
body: body);
}
_logger.LogInformation("成功创建商品");
}
#endregion
}
3.2 消费者
// <summary>
/// 创建商品
/// </summary>
/// <param name="productCreateDto"></param>
/// <returns></returns>
[HttpPost]
public IEnumerable<Product> CreateProduct(ProductCreateDto productCreateDto)
{
#region 4、主题交换机
{
var factory = new ConnectionFactory()
{
HostName = "localhost",
Port = 5672,
Password = "guest",
UserName = "guest",
VirtualHost = "/"
};
using (var connection = factory.CreateConnection())
{
var channel = connection.CreateModel();
// 2、定义交换机
channel.ExchangeDeclare(exchange: "sms_topic", type: "topic");
string orderJson = JsonConvert.SerializeObject(orderCreateDto);
// string message = "Hello World!";
var body = Encoding.UTF8.GetBytes(orderJson);
// 3、发送消息
var properties = channel.CreateBasicProperties();
properties.Persistent = true; // 设置消息持久化
channel.BasicPublish(exchange: "sms_topic",
routingKey: "sms.order",
basicProperties: properties,
body: body);
}
_logger.LogInformation("成功创建订单");
}
#endregion
}
/// <summary>
/// 发送短信
/// </summary>
/// <returns></returns>
[HttpGet]
public IEnumerable<WeatherForecast> Get()
{
// 1、创建连接
var factory = new ConnectionFactory()
{
HostName = "localhost",
Port = 5672,
Password = "guest",
UserName = "guest",
VirtualHost = "/"
};
var connection = factory.CreateConnection();
var channel = connection.CreateModel();
#region 2、主题交换机
{
// 1、创建连接
var factory = new ConnectionFactory()
{
HostName = "localhost",
Port = 5672,
Password = "guest",
UserName = "guest",
VirtualHost = "/"
};
var connection = factory.CreateConnection();
var channel = connection.CreateModel();
// 1、定义交换机
channel.ExchangeDeclare(exchange: "sms_topic", type: ExchangeType.Topic);
// 2、定义随机队列
var queueName = channel.QueueDeclare().QueueName;
// 3、队列要和交换机绑定起来。多对1
// * 号的缺陷:只能匹配一级
// # 能够匹配一级及多级以上
// 真实项目当中,使用主题交换机。因为:可以满足所有场景
channel.QueueBind(queueName,
"sms_topic",
routingKey: "sms.#");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
Console.WriteLine($"model:{model}");
var body = ea.Body;
// 1、业务逻辑
var message = Encoding.UTF8.GetString(body.ToArray());
Console.WriteLine(" [x] 发送短信 {0}", message);
// 自动确认机制缺陷:
// 1、消息是否正常添加到数据库当中,所以需要使用手工确认
channel.BasicAck(ea.DeliveryTag, true);
};
// 3、消费消息
channel.BasicQos(0, 1, false); // Qos(防止多个消费者,能力不一致,导致的系统质量问题。
// 每一次一个消费者只成功消费一个)
channel.BasicConsume(queue: queueName,
autoAck: false, // 消息确认(防止消息消费失败)
consumer: consumer);
}
#endregion
}
3.3 原理分析
主题交换机实现的就是不同生产产匹配到相同消费者,生产者把消息发送给RabbitMQ---->RabbitMQ再把消息发送给交换机----->然后再发送给指定队列----->发送给消费者
4.一个生产者/一个消费者(回调消费)
4.1 生产者
// <summary>
/// 创建商品
/// </summary>
/// <param name="productCreateDto"></param>
/// <returns></returns>
[HttpPost]
public IEnumerable<Product> CreateProduct(ProductCreateDto productCreateDto)
{
#region 3、RPC回调来实现
{
var factory = new ConnectionFactory()
{
HostName = "localhost",
Port = 5672,
Password = "guest",
UserName = "guest",
VirtualHost = "/"
};
var connection = factory.CreateConnection();
var channel = connection.CreateModel();
// 2、定义队列
string replyQueueName = channel.QueueDeclare().QueueName;
var properties = channel.CreateBasicProperties();
var correlationId = Guid.NewGuid().ToString();
properties.CorrelationId = correlationId;
properties.ReplyTo = replyQueueName;
// 3、发送消息
string productJson = JsonConvert.SerializeObject(productCreateDto);
// string message = "Hello World!";
var body = Encoding.UTF8.GetBytes(productJson);
properties.Persistent = true; // 设置消息持久化
channel.BasicPublish(exchange: "",
routingKey: "product_create2",
basicProperties: properties,
body: body);
// 4、消息回调
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
Console.WriteLine($"model:{model}");
var body = ea.Body;
// 1、业务逻辑处理
var message = Encoding.UTF8.GetString(body.ToArray());
if (ea.BasicProperties.CorrelationId == correlationId)
{
Console.WriteLine(" [x] 回调成功 {0}", message);
}
};
// 3、消费消息
// channel.BasicQos(0, 1, false); // Qos(防止多个消费者,能力不一致,导致的系统质量问题。
// 每一次一个消费者只成功消费一个)
channel.BasicConsume(queue: replyQueueName,
autoAck: true, // 消息确认(防止消息消费失败)
consumer: consumer);
_logger.LogInformation("成功创建商品");
}
#endregion
}
4.2 消费者
/// <summary>
/// 创建商品
/// </summary>
/// <returns></returns>
[HttpGet]
public IEnumerable<Product> CreateProdcuts()
{
// 1、创建连接
var factory = new ConnectionFactory()
{
HostName = "localhost",
Port = 5672,
Password = "guest",
UserName = "guest",
VirtualHost = "/"
};
var connection = factory.CreateConnection();
var channel = connection.CreateModel();
#region 9、创建商品-----回调-RPC
{
// 工具:直连交换机 type:direct
var channel = connection.CreateModel();
// 1、定义随机队列(用完之后立马删除)
var queueName = channel.QueueDeclare(queue: "product_create2",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
Console.WriteLine($"model:{model}");
var body = ea.Body;
var props = ea.BasicProperties;
var replyProps = channel.CreateBasicProperties();
replyProps.CorrelationId = props.CorrelationId;
try
{
// 1、执行业务
var message = Encoding.UTF8.GetString(body.ToArray());
Console.WriteLine(" [x] 创建商品 {0}", message);
}
catch (Exception e)
{
Console.WriteLine(" [.] " + e.Message);
}
finally
{
Console.WriteLine("发送回调消息");
var responseBytes = Encoding.UTF8.GetBytes("商品回调成功");
channel.BasicPublish(exchange: "",
routingKey: props.ReplyTo,
basicProperties: replyProps,
body: responseBytes);
/*channel.BasicAck(deliveryTag: ea.DeliveryTag,
multiple: false);*/
}
};
// 3、消费消息
// channel.BasicQos(0, 1, false); // Qos(防止多个消费者,能力不一致,导致的系统质量问题。
// 每一次一个消费者只成功消费一个)
channel.BasicConsume(queue: "product_create2",
autoAck: true, // 消息确认(防止消息消费失败)
consumer: consumer);
}
#endregion
}
4.3 原理分析
RabbitMQ中实现RPC的机制是:
客户端发送请求(消息)时,在消息的属性(MessageProperties,在AMQP协议中定义了14中properties,这些属性会随着消息一起发送)中设置两个值replyTo(一个Queue名称,用于告诉服务器处理完成后将通知我的消息发送到这个Queue中)和correlationId(此次请求的标识号,服务器处理完成后需要将此属性返还,客户端将根据这个id了解哪条请求被成功执行了或执行失败);
服务器端收到消息并处理;
服务器端处理完消息后0,0将生成一条应答消息到replyTo指定的Queue,同时带上correlationId属性;
客户端之前已订阅replyTo指 定的Queue,从中收到服务器的应答消息后,根据其中的correlationId属性分析哪条请求被执行了,根据执行结果进行后续业务处理。
- 点赞
- 收藏
- 关注作者
评论(0)