【愚公系列】2022年03月 .NET架构班 026-分布式中间件 Kafka场景使用问题分析
【摘要】 @TOC 一、微服务场景下Kafka的使用 1.Kafka的安装Kafka下载网址:https://archive.apache.org/dist/kafka/Kafka的安装可以参考本人其他文章Docker下安装:https://blog.csdn.net/aa2528877987/article/details/123589123?spm=1001.2014.3001.5501 2.Ka...
@TOC
一、微服务场景下Kafka的使用
1.Kafka的安装
Kafka下载网址:https://archive.apache.org/dist/kafka/
Kafka的安装可以参考本人其他文章
Docker下安装:https://blog.csdn.net/aa2528877987/article/details/123589123?spm=1001.2014.3001.5501
2.Kafka的使用
nuget引入 Confluent.Kafka
2.1 基本使用
生产者
/// <summary>
/// 创建订单
/// </summary>
/// <param name="orderCreateDto"></param>
/// <returns></returns>
[HttpPost]
public IEnumerable<OrderCreateDto> CreateOrder(OrderCreateDto orderCreateDto)
{
#region 1、生产者 Producer
{
var producerConfig = new ProducerConfig
{
BootstrapServers = "127.0.0.1:9092",
MessageTimeoutMs = 50000,
EnableIdempotence = true
};
var builder = new ProducerBuilder<string, string>(producerConfig);
builder.SetDefaultPartitioner(RoundRobinPartitioner);
using (var producer = builder.Build())
{
try
{
var dr = producer.ProduceAsync("create-order", new Message<string, string> { Key = "order-1", Value = OrderJson }).GetAwaiter().GetResult();
_logger.LogInformation("发送事件 {0} 到 {1} 成功", dr.Value, dr.TopicPartitionOffset);
}
catch (ProduceException<string, string> ex)
{
_logger.LogError(ex, "发送事件到 {0} 失败,原因 {1} ", "order", ex.Error.Reason);
}
}
}
#endregion
}
消费者
// <summary>
/// 创建订单
/// </summary>
/// <returns></returns>
[HttpGet]
public async Task<Order> OrderCreate()
{
#region 1、工作队列(单消费者) Consumer
{
new Task(() =>
{
var consumerConfig = new ConsumerConfig
{
BootstrapServers = "127.0.0.1:9092",
AutoOffsetReset = AutoOffsetReset.Earliest,
GroupId = "order",
EnableAutoCommit = false
};
var builder = new ConsumerBuilder<string, string>(consumerConfig);
using (var consumer = builder.Build())
{
// 1、订阅
consumer.Subscribe("create-order");
while (true)
{
try
{
// 2、消费(自动确认)
var result = consumer.Consume();
// 3、业务逻辑:业务逻辑---->执行失败--->消息丢失
string key = result.Key;
string value = result.Value;
_logger.LogInformation($"创建订单:Key:{key}");
_logger.LogInformation($"创建订单:Order:{value}");
}
catch (Exception e)
{
_logger.LogInformation($"异常:Order:{e}");
}
}
}
}).Start();
}
#endregion
}
2.1.1 消费端挂了(自动确认)
问题:Kafka给微服务发消息期间,微服务宕机。导致消息丢失
解决方案:EnableAutoCommit 自动确认机制修改为true
// <summary>
/// 创建订单
/// </summary>
/// <returns></returns>
[HttpGet]
public async Task<Order> OrderCreate()
{
#region 1、工作队列(单消费者) Consumer
{
new Task(() =>
{
var consumerConfig = new ConsumerConfig
{
BootstrapServers = "127.0.0.1:9092",
AutoOffsetReset = AutoOffsetReset.Earliest,
GroupId = "order",
EnableAutoCommit = true
};
var builder = new ConsumerBuilder<string, string>(consumerConfig);
using (var consumer = builder.Build())
{
// 1、订阅
consumer.Subscribe("create-order");
while (true)
{
try
{
// 2、消费(自动确认)
var result = consumer.Consume();
// 3、业务逻辑:业务逻辑---->执行失败--->消息丢失
string key = result.Key;
string value = result.Value;
_logger.LogInformation($"创建订单:Key:{key}");
_logger.LogInformation($"创建订单:Order:{value}");
}
catch (Exception e)
{
_logger.LogInformation($"异常:Order:{e}");
}
}
}
}).Start();
}
#endregion
}
2.1.2 消费端挂了(手动确认)
问题:Kafka给微服务发了消息,微服务收到消息,微服务发送确认消息给Kafka期间,执行业务逻辑失败了。
解决方案:手动确认
// <summary>
/// 创建订单
/// </summary>
/// <returns></returns>
[HttpGet]
public async Task<Order> OrderCreate()
{
#region 1、工作队列(单消费者) Consumer
{
new Task(() =>
{
var consumerConfig = new ConsumerConfig
{
BootstrapServers = "127.0.0.1:9092",
AutoOffsetReset = AutoOffsetReset.Earliest,
GroupId = "order",
EnableAutoCommit = false,
};
var builder = new ConsumerBuilder<string, string>(consumerConfig);
var consumer = builder.Build();
// 1、订阅
consumer.Subscribe("create-order");
while (true)
{
// 2、消费
var result = consumer.Consume();
// 3、业务逻辑
string key = result.Key;
string value = result.Value;
_logger.LogInformation($"创建商品:Key:{key}");
_logger.LogInformation($"创建商品:Order:{value}");
// 3、手动提交(向kafka确认消息)----偏移量---消息的序号
consumer.Commit(result);//缓冲-kafka-返回
//consumer.StoreOffset(result)//缓冲直接返回,导致重新消费
}
}).Start();
}
#endregion
}
2.1.3 Kafka挂了
问题:Kafka给微服务发了消息,微服务收到消息,微服务发送确认消息给Kafka期间。kafka宕机,导致:消息重复消费
解决方案:重置偏移量
nuget:Microsoft.Extensions.Caching.Redis
public void ConfigureServices(IServiceCollection services)
{
....
services.AddDistributedRedisCache(options =>
{
options.Configuration = "localhost:6379";
});
...
}
#region 4、工作队列(单消费者)-手动确认消息-偏移量(重复消费)-存储偏移量
{
new Task(() =>
{
var consumerConfig = new ConsumerConfig
{
BootstrapServers = "127.0.0.1:9092",
AutoOffsetReset = AutoOffsetReset.Earliest,
GroupId = "order",
EnableAutoCommit = true,
};
var builder = new ConsumerBuilder<string, string>(consumerConfig);
using (var consumer = builder.Build())
{
// 1、订阅
consumer.Subscribe("create-order");
// 1.2、获取偏移量
string offset = distributedCache.GetString("create-order");
if (string.IsNullOrEmpty(offset))
{
offset = "0";
}
// 1.3、重置偏移量
consumer.Assign(new TopicPartitionOffset(new TopicPartition("create-order", 0), int.Parse(offset) + 1));
while (true)
{
// 2、消费
var result = consumer.Consume();
// 2.1、获取偏移量
_logger.LogInformation($"订单消息偏移量:Offset:{result.Offset}");
// 2.2、把kafka队列中偏移量存起来。redis mysql
// 2.3、重置kafka队列的偏移量
distributedCache.SetString("create-order", result.Offset.Value.ToString());
// 3、业务处理
string key = result.Key;
string value = result.Value;
_logger.LogInformation($"创建订单:Key:{key}");
_logger.LogInformation($"创建订单:Order:{value}");
// redis缺陷:无法保证偏移和业务同时成功。
// 方案:使用数据库来存储偏移量
// 核心:通过数据库事务来保证
// 3、手动提交
// consumer.Commit(result);
}
}
}).Start();
}
#endregion
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)