【愚公系列】2022年03月 .NET架构班 027-分布式中间件 Kafka多场景使用情况分析
【摘要】 一、创建主题和分区/// <summary>/// 创建分区(更新分区)/// </summary>/// <param name="topic"></param>/// <param name="Partitions"></param>/// <returns></returns>[HttpGet("PartitionUpdate")]public async Task Partitio...
一、创建主题和分区
/// <summary>
/// 创建分区(更新分区)
/// </summary>
/// <param name="topic"></param>
/// <param name="Partitions"></param>
/// <returns></returns>
[HttpGet("PartitionUpdate")]
public async Task PartitionCreate(string topic,int PartitionCount)
{
AdminClientConfig adminClientConfig = new AdminClientConfig
{
BootstrapServers = "127.0.0.1:9092",
};
var bu = new AdminClientBuilder(adminClientConfig).Build();
bu.CreatePartitionsAsync(new PartitionsSpecification[] {
new PartitionsSpecification { Topic = topic, IncreaseTo=PartitionCount}
}).Wait();
await Task.CompletedTask;
}
/// <summary>
/// 创建主题
/// </summary>
/// <param name="topic"></param>
/// <param name="Partitions"></param>
/// <returns></returns>
[HttpGet("TopicCreate")]
public async Task TopicCreate(string topic)
{
AdminClientConfig adminClientConfig = new AdminClientConfig
{
BootstrapServers = "127.0.0.1:9092",
};
var bu = new AdminClientBuilder(adminClientConfig).Build();
bu.CreateTopicsAsync(new TopicSpecification[] {
new TopicSpecification { Name = topic}
}).Wait();
await Task.CompletedTask;
}
/// <summary>
/// 创建主题和分区
/// </summary>
/// <param name="topic"></param>
/// <param name="Partitions"></param>
/// <returns></returns>
[HttpGet("TopicPartitionCreate")]
public async Task TopicPartitionCreate(string topic,int PartitionCount)
{
AdminClientConfig adminClientConfig = new AdminClientConfig
{
BootstrapServers = "127.0.0.1:9092",
};
var bu = new AdminClientBuilder(adminClientConfig).Build();
bu.CreateTopicsAsync(new TopicSpecification[] {
new TopicSpecification { Name = topic,NumPartitions =PartitionCount}
}).Wait();
await Task.CompletedTask;
}
二、Kafka多场景使用情况分析
1.订阅发布(广播消费)
生产者
/// <summary>
/// 创建订单
/// </summary>
/// <param name="orderCreateDto"></param>
/// <returns></returns>
[HttpPost]
public IEnumerable<OrderCreateDto> CreateOrder(OrderCreateDto orderCreateDto)
{
#region 1、生产者 Producer
{
var producerConfig = new ProducerConfig
{
BootstrapServers = "127.0.0.1:9092",
MessageTimeoutMs = 50000,
EnableIdempotence = true
};
var builder = new ProducerBuilder<string, string>(producerConfig);
builder.SetDefaultPartitioner(RoundRobinPartitioner);
using (var producer = builder.Build())
{
try
{
var dr = producer.ProduceAsync("create-order", new Message<string, string> { Key = "order-1", Value = OrderJson }).GetAwaiter().GetResult();
_logger.LogInformation("发送事件 {0} 到 {1} 成功", dr.Value, dr.TopicPartitionOffset);
}
catch (ProduceException<string, string> ex)
{
_logger.LogError(ex, "发送事件到 {0} 失败,原因 {1} ", "order", ex.Error.Reason);
}
}
}
#endregion
}
消费者
// <summary>
/// 创建订单
/// </summary>
/// <param name="OrderCreateDto"></param>
/// <returns></returns>
[HttpPost]
public IEnumerable<Order> CreateOrder(OrderCreateDto OrderCreateDto)
{
#region 5、订阅发布(广播消费)1、创建订单----2、发送短信-GroupId
{
new Task(() =>
{
var consumerConfig = new ConsumerConfig
{
BootstrapServers = "127.0.0.1:9092",
AutoOffsetReset = AutoOffsetReset.Earliest,
GroupId = "order",
EnableAutoCommit = true,
};
var builder = new ConsumerBuilder<string, string>(consumerConfig);
var consumer = builder.Build();
// 1、订阅
consumer.Subscribe("create-order");
// 2、获取偏移量
string offset = distributedCache.GetString("create-order");
if (string.IsNullOrEmpty(offset))
{
offset = "0";
}
// 3、重置偏移量
consumer.Assign(new TopicPartitionOffset(new TopicPartition("create-order", 0), int.Parse(offset)));
while (true)
{
// 2、消费
var result = consumer.Consume();
// 2.1、获取偏移量
_logger.LogInformation($"订单消息偏移量:Offset:{result.Offset}");
// 3、业务处理
string key = result.Key;
string value = result.Value;
_logger.LogInformation($"创建商品:Key:{key}");
_logger.LogInformation($"创建商品:Order:{value}");
// 2.2、把kafka队列中偏移量存起来。redis mysql
// 2.3、重置kafka队列的偏移量
distributedCache.SetString("create-order", result.Offset.Value.ToString());
// 3、手动提交
//consumer.Commit(result);
}
}).Start();
}
#endregion
}
/// <summary>
/// 发送短信
/// </summary>
/// <returns></returns>
[HttpGet]
public IEnumerable<WeatherForecast> Get()
{
new Task(() =>
{
var consumerConfig = new ConsumerConfig
{
BootstrapServers = "127.0.0.1:9092",
AutoOffsetReset = AutoOffsetReset.Earliest,
GroupId = "sms",
EnableAutoCommit = false,
};
var builder = new ConsumerBuilder<string, string>(consumerConfig);
var consumer = builder.Build();
// 1、订阅
consumer.Subscribe("create-order");
while (true)
{
// 2、消费
var result = consumer.Consume();
// 2.1、获取偏移量
_logger.LogInformation($"订单消息偏移量:Offset:{result.Offset}");
// 3、业务处理
string key = result.Key;
string value = result.Value;
_logger.LogInformation($"创建商品:Key:{key}");
_logger.LogInformation($"创建商品:Order:{value}");
// 3、手动提交
consumer.Commit(result);
}
}).Start();
}
2.算法分区的使用
生产者
/// <summary>
/// 创建订单
/// </summary>
/// <param name="orderCreateDto"></param>
/// <returns></returns>
[HttpPost]
public IEnumerable<OrderCreateDto> CreateOrder(OrderCreateDto orderCreateDto)
{
#region 1、生产者 Producer
{
var producerConfig = new ProducerConfig
{
BootstrapServers = "127.0.0.1:9092",
MessageTimeoutMs = 50000,
EnableIdempotence = true
};
var builder = new ProducerBuilder<string, string>(producerConfig);
builder.SetDefaultPartitioner(RoundRobinPartitioner);//默认hash一致性算法,这边用轮询替换
using (var producer = builder.Build())
{
try
{
var dr = producer.ProduceAsync("create-order", new Message<string, string> { Key = "order-1", Value = OrderJson }).GetAwaiter().GetResult();
_logger.LogInformation("发送事件 {0} 到 {1} 成功", dr.Value, dr.TopicPartitionOffset);
}
catch (ProduceException<string, string> ex)
{
_logger.LogError(ex, "发送事件到 {0} 失败,原因 {1} ", "order", ex.Error.Reason);
}
}
}
#endregion
}
/// <summary>
/// 分区随机算法
/// </summary>
/// <param name="topic"></param>
/// <param name="partitionCount"></param>
/// <param name="keyData"></param>
/// <param name="keyIsNull"></param>
/// <returns></returns>
private Partition RandomPartitioner(string topic, int partitionCount, ReadOnlySpan<byte> keyData, bool keyIsNull)
{
Random random = new Random();
int partition = random.Next(partitionCount-1);
return new Partition(partition);
}
/// <summary>
/// 分区轮询算法。两个分区得到消息是一致的
/// </summary>
/// <param name="topic"></param>
/// <param name="partitionCount"></param>
/// <param name="keyData"></param>
/// <param name="keyIsNull"></param>
/// <returns></returns>
static int requestCount = 0;
private Partition RoundRobinPartitioner(string topic, int partitionCount, ReadOnlySpan<byte> keyData, bool keyIsNull)
{
int partition = requestCount % partitionCount;
requestCount++;
return new Partition(partition);
}
消费者
#region 创建订单----1、创建订单集群或发送短信集群--消息分区
{
new Task(() =>
{
var consumerConfig = new ConsumerConfig
{
BootstrapServers = "127.0.0.1:9092",
AutoOffsetReset = AutoOffsetReset.Earliest,
GroupId = "order",
EnableAutoCommit = false,
};
var builder = new ConsumerBuilder<string, string>(consumerConfig);
var consumer = builder.Build();
// 1、订阅
consumer.Subscribe("create-order");
// 2、获取偏移量
*//*string offset = distributedCache.GetString("create-order");
if (string.IsNullOrEmpty(offset))
{
offset = "0";
}*//*
// 3、重置偏移量
//consumer.Assign(new TopicPartitionOffset(new TopicPartition("create-order", 1), int.Parse(offset)));
while (true)
{
// 2、消费
var result = consumer.Consume();
// 2.1、获取偏移量
_logger.LogInformation($"订单消息偏移量:Offset:{result.Offset}");
// 3、业务处理
string key = result.Key;
string value = result.Value;
_logger.LogInformation($"创建商品:Key:{key}");
_logger.LogInformation($"创建商品:Order:{value}");
// 2.2、把kafka队列中偏移量存起来。redis mysql
// 2.3、重置kafka队列的偏移量
//distributedCache.SetString("create-order", result.Offset.Value.ToString());
// 3、手动提交
consumer.Commit(result);
}
}).Start();
}
#endregion
在这里插入代码片
3.固定分区的使用
生产者
#region 4、生产者-固定分区发送
{
var producerConfig = new ProducerConfig
{
BootstrapServers = "127.0.0.1:9092",
MessageTimeoutMs = 50000
};
var builder = new ProducerBuilder<string, string>(producerConfig);
using (var producer = builder.Build())
{
try
{
var OrderJson = JsonConvert.SerializeObject(orderCreateDto);
//TopicPartition topicPartition = new TopicPartition("order-create", new Partition(0));
var dr = producer.ProduceAsync("order-create", new Message<string, string> { Key = "order", Value = OrderJson }).GetAwaiter().GetResult();
_logger.LogInformation("发送事件 {0} 到 {1} 成功", dr.Value, dr.TopicPartitionOffset);
}
catch (ProduceException<string, string> ex)
{
_logger.LogError(ex, "发送事件到 {0} 失败,原因 {1} ", "order", ex.Error.Reason);
}
}
}
#endregion
消费者
#region 创建订单----1、创建订单集群或发送短信集群--消息分区-固定分区
{
new Task(() =>
{
var consumerConfig = new ConsumerConfig
{
BootstrapServers = "127.0.0.1:9092",
AutoOffsetReset = AutoOffsetReset.Earliest,
GroupId = "order",
EnableAutoCommit = false,
};
var builder = new ConsumerBuilder<string, string>(consumerConfig);
var consumer = builder.Build();
// 1、订阅
consumer.Subscribe("create-order");
// 1.1、获取偏移量
string offset = distributedCache.GetString("create-order");
if (string.IsNullOrEmpty(offset))
{
offset = "0";
}
// 1.2、重置偏移量
consumer.Assign(new TopicPartitionOffset(new TopicPartition("create-order", 0), int.Parse(offset)));
while (true)
{
// 2、消费
var result = consumer.Consume();
// 2.1、获取偏移量
_logger.LogInformation($"订单消息偏移量:Offset:{result.Offset}");
_logger.LogInformation($"订单消息分区序号:Partition:{result.Partition}");
// 3、业务处理
string key = result.Key;
string value = result.Value;
_logger.LogInformation($"创建商品:Key:{key}");
_logger.LogInformation($"创建商品:Order:{value}");
// 2.2、把kafka队列中偏移量存起来。redis mysql
// 2.3、重置kafka队列的偏移量
distributedCache.SetString("create-order", result.Offset.Value.ToString());
// 3、手动提交
consumer.Commit(result);
}
}).Start();
}
#endregion
4.消息延迟处理
消费者
#region 创建订单----1、订单消息延迟处理
{
new Task(() =>
{
var consumerConfig = new ConsumerConfig
{
BootstrapServers = "127.0.0.1:9092",
AutoOffsetReset = AutoOffsetReset.Earliest,
GroupId = "order",
EnableAutoCommit = false,
FetchMinBytes=170,
FetchMaxBytes=3060
};
var builder = new ConsumerBuilder<string, string>(consumerConfig);
using (var consumer = builder.Build())
{
// 1、订阅
consumer.Subscribe("create-order-1");
// 2、偏移量恢复
string offset = distributedCache.GetString("create-order-1");
if (string.IsNullOrEmpty(offset))
{
offset = "0";
}
consumer.Assign(new TopicPartitionOffset(new TopicPartition("create-order-1", 0), int.Parse(offset)));
while (true)
{
// 1、恢复消息
new Timer((s) =>
{
consumer.Resume(new List<TopicPartition> { new TopicPartition("create-order-1", 0) });
}, null, Timeout.Infinite, Timeout.Infinite).Change(5000, 5000);
// 1.1、消费暂停
consumer.Pause(new List<TopicPartition> { new TopicPartition("create-order-1", 0) });
// 2、消费消息
var result = consumer.Consume(); //批量获取消息,根据-----》字节数
try
{
// 2.1、获取偏移量
_logger.LogInformation($"订单消息偏移量:Offset:{result.Offset}");
// 3、业务处理
string key = result.Key;
string value = result.Value;
_logger.LogInformation($"创建商品:Key:{key}");
_logger.LogInformation($"创建商品:Order:{value}");
// 2.2、把kafka队列中偏移量存起来。redis mysql
// 2.3、重置kafka队列的偏移量
distributedCache.SetString("create-order-1", result.Offset.Value.ToString());
// 3、手动提交
consumer.Commit(result);
}
catch (Exception)
{
throw;
} finally
{
consumer.Pause(new List<TopicPartition> { new TopicPartition("create-order-1", 0) });
Console.WriteLine($"暂停消费");
}
}
}
}).Start();
}
#endregion
#region 创建订单----1、订单消息延迟处理-批量处理消息
{
new Task(() =>
{
var consumerConfig = new ConsumerConfig
{
BootstrapServers = "127.0.0.1:9092",
AutoOffsetReset = AutoOffsetReset.Earliest,
GroupId = "order",
EnableAutoCommit = false,
};
var builder = new ConsumerBuilder<string, string>(consumerConfig);
using (var consumer = builder.Build())
{
// 1、订阅
consumer.Subscribe("create-order");
// 2、偏移量恢复
string offset = distributedCache.GetString("create-order");
if (string.IsNullOrEmpty(offset))
{
offset = "0";
}
consumer.Assign(new TopicPartitionOffset(new TopicPartition("create-order", 0), int.Parse(offset)));
while (true)
{
// 1、恢复消息
new Timer((s) =>
{
consumer.Resume(new List<TopicPartition> { new TopicPartition("create-order", 0) });
}, null, Timeout.Infinite, Timeout.Infinite).Change(5000, 5000);
// 1.1、消费暂停
consumer.Pause(new List<TopicPartition> { new TopicPartition("create-order", 0) });
// 2、消费
var result = consumer.Consume();
// 2.1、获取偏移量
_logger.LogInformation($"订单消息偏移量:Offset:{result.Offset}");
// 3、业务处理
string key = result.Key;
string value = result.Value;
_logger.LogInformation($"创建商品:Key:{key}");
_logger.LogInformation($"创建商品:Order:{value}");
// 2.2、把kafka队列中偏移量存起来。redis mysql
// 2.3、重置kafka队列的偏移量
distributedCache.SetString("create-order", result.Offset.Value.ToString());
// 3、手动提交
consumer.Commit(result);
}
}
}).Start();
}
#endregion
5.生产者失败重试和事务
失败重试
#region 2、生产者-失败重试
{
var producerConfig = new ProducerConfig
{
BootstrapServers = "127.0.0.1:9092",
MessageTimeoutMs = 50000,
EnableIdempotence = true // 保证消息:不重复发送,失败重试
};
var builder = new ProducerBuilder<string, string>(producerConfig);
using (var producer = builder.Build())
{
try
{
var OrderJson = JsonConvert.SerializeObject(orderCreateDto);
// TopicPartition topicPartition = new TopicPartition("order-create-5",new Partition(0));
var dr = producer.ProduceAsync("order-create-5", new Message<string, string> { Key = "order", Value = OrderJson }).GetAwaiter().GetResult();
_logger.LogInformation("发送事件 {0} 到 {1} 成功", dr.Value, dr.TopicPartitionOffset);
}
catch (ProduceException<string, string> ex)
{
_logger.LogError(ex, "发送事件到 {0} 失败,原因 {1} ", "order", ex.Error.Reason);
}
}
}
#endregion
事务
#region 3、生产者-失败重试-多消息发送
{
var producerConfig = new ProducerConfig
{
BootstrapServers = "127.0.0.1:9092",
MessageTimeoutMs = 50000,
EnableIdempotence = true,
TransactionalId = Guid.NewGuid().ToString()
};
var builder = new ProducerBuilder<string, string>(producerConfig);
using (var producer = builder.Build())
{
// 1、初始化事务
producer.InitTransactions(TimeSpan.FromSeconds(60));
try
{
var OrderJson = JsonConvert.SerializeObject(orderCreateDto);
// 2、开发事务
producer.BeginTransaction();
for (int i = 0; i < 100; i++)
{
var dr = producer.ProduceAsync("order-create-5", new Message<string, string> { Key = "order", Value = OrderJson }).GetAwaiter().GetResult();
_logger.LogInformation("发送事件 {0} 到 {1} 成功", dr.Value, dr.TopicPartitionOffset);
}
// 3、提交事务
producer.CommitTransaction();
}
catch (ProduceException<string, string> ex)
{
_logger.LogError(ex, "发送事件到 {0} 失败,原因 {1} ", "order", ex.Error.Reason);
// 4、关闭事务
producer.AbortTransaction();
}
}
}
#endregion
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)