【愚公系列】2022年03月 .NET架构班 026-分布式中间件 Kafka场景使用问题分析

举报
愚公搬代码 发表于 2022/03/21 00:48:08 2022/03/21
【摘要】 @TOC 一、微服务场景下Kafka的使用 1.Kafka的安装Kafka下载网址:https://archive.apache.org/dist/kafka/Kafka的安装可以参考本人其他文章Docker下安装:https://blog.csdn.net/aa2528877987/article/details/123589123?spm=1001.2014.3001.5501 2.Ka...

@TOC


一、微服务场景下Kafka的使用

1.Kafka的安装

Kafka下载网址:https://archive.apache.org/dist/kafka/

Kafka的安装可以参考本人其他文章

Docker下安装:https://blog.csdn.net/aa2528877987/article/details/123589123?spm=1001.2014.3001.5501

2.Kafka的使用

nuget引入 Confluent.Kafka

2.1 基本使用

生产者

/// <summary>
/// 创建订单
/// </summary>
/// <param name="orderCreateDto"></param>
/// <returns></returns>
[HttpPost]
public IEnumerable<OrderCreateDto> CreateOrder(OrderCreateDto orderCreateDto)
{
	#region 1、生产者 Producer
    {
        var producerConfig = new ProducerConfig
        {
            BootstrapServers = "127.0.0.1:9092",
            MessageTimeoutMs = 50000,
            EnableIdempotence = true
        };

        var builder = new ProducerBuilder<string, string>(producerConfig);
        builder.SetDefaultPartitioner(RoundRobinPartitioner);
        using (var producer = builder.Build())
        {
            try
            {
                var dr = producer.ProduceAsync("create-order", new Message<string, string> { Key = "order-1", Value = OrderJson }).GetAwaiter().GetResult();
                _logger.LogInformation("发送事件 {0} 到 {1} 成功", dr.Value, dr.TopicPartitionOffset);
            }
            catch (ProduceException<string, string> ex)
            {
                _logger.LogError(ex, "发送事件到 {0} 失败,原因 {1} ", "order", ex.Error.Reason);
            }
        }
    }
    #endregion
}  

消费者

// <summary>
/// 创建订单
/// </summary>
/// <returns></returns>
[HttpGet]
public async Task<Order> OrderCreate()
{
    #region 1、工作队列(单消费者) Consumer
    {
        new Task(() =>
        {
            var consumerConfig = new ConsumerConfig
            {
                BootstrapServers = "127.0.0.1:9092",
                AutoOffsetReset = AutoOffsetReset.Earliest,
                GroupId = "order",
                EnableAutoCommit = false
            };
            var builder = new ConsumerBuilder<string, string>(consumerConfig);

            using (var consumer = builder.Build())
            {
                // 1、订阅
                consumer.Subscribe("create-order");
                while (true)
                {
                    try
                    {
                        // 2、消费(自动确认)
                        var result = consumer.Consume();

                        // 3、业务逻辑:业务逻辑---->执行失败--->消息丢失
                        string key = result.Key;
                        string value = result.Value;

                        _logger.LogInformation($"创建订单:Key:{key}");
                        _logger.LogInformation($"创建订单:Order:{value}");
                    }
                    catch (Exception e)
                    {
                        _logger.LogInformation($"异常:Order:{e}");
                    }
                }
            }
        }).Start();
    }
    #endregion
}

2.1.1 消费端挂了(自动确认)

问题:Kafka给微服务发消息期间,微服务宕机。导致消息丢失

解决方案:EnableAutoCommit 自动确认机制修改为true

// <summary>
/// 创建订单
/// </summary>
/// <returns></returns>
[HttpGet]
public async Task<Order> OrderCreate()
{
    #region 1、工作队列(单消费者) Consumer
    {
        new Task(() =>
        {
            var consumerConfig = new ConsumerConfig
            {
                BootstrapServers = "127.0.0.1:9092",
                AutoOffsetReset = AutoOffsetReset.Earliest,
                GroupId = "order",
                EnableAutoCommit = true
            };
            var builder = new ConsumerBuilder<string, string>(consumerConfig);

            using (var consumer = builder.Build())
            {
                // 1、订阅
                consumer.Subscribe("create-order");
                while (true)
                {
                    try
                    {
                        // 2、消费(自动确认)
                        var result = consumer.Consume();

                        // 3、业务逻辑:业务逻辑---->执行失败--->消息丢失
                        string key = result.Key;
                        string value = result.Value;

                        _logger.LogInformation($"创建订单:Key:{key}");
                        _logger.LogInformation($"创建订单:Order:{value}");
                    }
                    catch (Exception e)
                    {
                        _logger.LogInformation($"异常:Order:{e}");
                    }
                }
            }
        }).Start();
    }
    #endregion
}

2.1.2 消费端挂了(手动确认)

问题:Kafka给微服务发了消息,微服务收到消息,微服务发送确认消息给Kafka期间,执行业务逻辑失败了。

解决方案:手动确认

// <summary>
/// 创建订单
/// </summary>
/// <returns></returns>
[HttpGet]
public async Task<Order> OrderCreate()
{
    #region 1、工作队列(单消费者) Consumer
    {
        new Task(() =>
        {
            var consumerConfig = new ConsumerConfig
            {
                BootstrapServers = "127.0.0.1:9092",
                AutoOffsetReset = AutoOffsetReset.Earliest,
                GroupId = "order",
                EnableAutoCommit = false,
            };
            var builder = new ConsumerBuilder<string, string>(consumerConfig);
            var consumer = builder.Build();
            // 1、订阅
            consumer.Subscribe("create-order");
            while (true)
            {
                // 2、消费
                var result = consumer.Consume();

                // 3、业务逻辑
                string key = result.Key;
                string value = result.Value;

                _logger.LogInformation($"创建商品:Key:{key}");
                _logger.LogInformation($"创建商品:Order:{value}");

                // 3、手动提交(向kafka确认消息)----偏移量---消息的序号
                consumer.Commit(result);//缓冲-kafka-返回
                //consumer.StoreOffset(result)//缓冲直接返回,导致重新消费
            }
        }).Start();
    }
    #endregion
}

2.1.3 Kafka挂了

问题:Kafka给微服务发了消息,微服务收到消息,微服务发送确认消息给Kafka期间。kafka宕机,导致:消息重复消费

解决方案:重置偏移量

nuget:Microsoft.Extensions.Caching.Redis

public void ConfigureServices(IServiceCollection services)
{
        ....
        services.AddDistributedRedisCache(options =>
        {
            options.Configuration = "localhost:6379";
        });
        ...
}
#region 4、工作队列(单消费者)-手动确认消息-偏移量(重复消费)-存储偏移量
{
    new Task(() =>
             {
                 var consumerConfig = new ConsumerConfig
                 {
                     BootstrapServers = "127.0.0.1:9092",
                     AutoOffsetReset = AutoOffsetReset.Earliest,
                     GroupId = "order",
                     EnableAutoCommit = true,
                 };
                 var builder = new ConsumerBuilder<string, string>(consumerConfig);
                 using (var consumer = builder.Build())
                 {
                     // 1、订阅
                     consumer.Subscribe("create-order");

                     // 1.2、获取偏移量
                     string offset = distributedCache.GetString("create-order");
                     if (string.IsNullOrEmpty(offset))
                     {
                         offset = "0";
                     }

                     // 1.3、重置偏移量
                     consumer.Assign(new TopicPartitionOffset(new TopicPartition("create-order", 0), int.Parse(offset) + 1));
                     while (true)
                     {
                         // 2、消费
                         var result = consumer.Consume();

                         // 2.1、获取偏移量
                         _logger.LogInformation($"订单消息偏移量:Offset:{result.Offset}");
                         // 2.2、把kafka队列中偏移量存起来。redis mysql
                         // 2.3、重置kafka队列的偏移量
                         distributedCache.SetString("create-order", result.Offset.Value.ToString());

                         // 3、业务处理
                         string key = result.Key;
                         string value = result.Value;
                         _logger.LogInformation($"创建订单:Key:{key}");
                         _logger.LogInformation($"创建订单:Order:{value}");

                         // redis缺陷:无法保证偏移和业务同时成功。
                         // 方案:使用数据库来存储偏移量
                         //       核心:通过数据库事务来保证
                         // 3、手动提交
                         // consumer.Commit(result);
                     }
                 }
             }).Start();
}
#endregion
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。