【愚公系列】2022年03月 .NET架构班 027-分布式中间件 Kafka多场景使用情况分析

举报
愚公搬代码 发表于 2022/03/24 01:23:47 2022/03/24
【摘要】 一、创建主题和分区/// <summary>/// 创建分区(更新分区)/// </summary>/// <param name="topic"></param>/// <param name="Partitions"></param>/// <returns></returns>[HttpGet("PartitionUpdate")]public async Task Partitio...

一、创建主题和分区

/// <summary>
/// 创建分区(更新分区)
/// </summary>
/// <param name="topic"></param>
/// <param name="Partitions"></param>
/// <returns></returns>
[HttpGet("PartitionUpdate")]
public async Task PartitionCreate(string topic,int PartitionCount)
{
    AdminClientConfig adminClientConfig = new AdminClientConfig
    {
        BootstrapServers = "127.0.0.1:9092",
    };

    var bu = new AdminClientBuilder(adminClientConfig).Build();
    bu.CreatePartitionsAsync(new PartitionsSpecification[] {
            new PartitionsSpecification { Topic = topic, IncreaseTo=PartitionCount}
        }).Wait();

    await Task.CompletedTask;
}

/// <summary>
/// 创建主题
/// </summary>
/// <param name="topic"></param>
/// <param name="Partitions"></param>
/// <returns></returns>
[HttpGet("TopicCreate")]
public async Task TopicCreate(string topic)
{
    AdminClientConfig adminClientConfig = new AdminClientConfig
    {
        BootstrapServers = "127.0.0.1:9092",
    };

    var bu = new AdminClientBuilder(adminClientConfig).Build();
    bu.CreateTopicsAsync(new TopicSpecification[] {
            new TopicSpecification { Name = topic}
        }).Wait();

    await Task.CompletedTask;
}

/// <summary>
/// 创建主题和分区
/// </summary>
/// <param name="topic"></param>
/// <param name="Partitions"></param>
/// <returns></returns>
[HttpGet("TopicPartitionCreate")]
public async Task TopicPartitionCreate(string topic,int PartitionCount)
{
    AdminClientConfig adminClientConfig = new AdminClientConfig
    {
        BootstrapServers = "127.0.0.1:9092",
    };

    var bu = new AdminClientBuilder(adminClientConfig).Build();
    bu.CreateTopicsAsync(new TopicSpecification[] {
            new TopicSpecification { Name = topic,NumPartitions =PartitionCount}
        }).Wait();

    await Task.CompletedTask;
}

二、Kafka多场景使用情况分析

1.订阅发布(广播消费)

生产者

/// <summary>
/// 创建订单
/// </summary>
/// <param name="orderCreateDto"></param>
/// <returns></returns>
[HttpPost]
public IEnumerable<OrderCreateDto> CreateOrder(OrderCreateDto orderCreateDto)
{
	#region 1、生产者 Producer
    {
        var producerConfig = new ProducerConfig
        {
            BootstrapServers = "127.0.0.1:9092",
            MessageTimeoutMs = 50000,
            EnableIdempotence = true
        };

        var builder = new ProducerBuilder<string, string>(producerConfig);
        builder.SetDefaultPartitioner(RoundRobinPartitioner);
        using (var producer = builder.Build())
        {
            try
            {
                var dr = producer.ProduceAsync("create-order", new Message<string, string> { Key = "order-1", Value = OrderJson }).GetAwaiter().GetResult();
                _logger.LogInformation("发送事件 {0} 到 {1} 成功", dr.Value, dr.TopicPartitionOffset);
            }
            catch (ProduceException<string, string> ex)
            {
                _logger.LogError(ex, "发送事件到 {0} 失败,原因 {1} ", "order", ex.Error.Reason);
            }
        }
    }
    #endregion
}  

消费者

// <summary>
/// 创建订单
/// </summary>
/// <param name="OrderCreateDto"></param>
/// <returns></returns>
[HttpPost]
public IEnumerable<Order> CreateOrder(OrderCreateDto OrderCreateDto)
{
	 #region 5、订阅发布(广播消费)1、创建订单----2、发送短信-GroupId
        {
            new Task(() =>
            {
                var consumerConfig = new ConsumerConfig
                {
                    BootstrapServers = "127.0.0.1:9092",
                    AutoOffsetReset = AutoOffsetReset.Earliest,
                    GroupId = "order",
                    EnableAutoCommit = true,
                };
                var builder = new ConsumerBuilder<string, string>(consumerConfig);
                var consumer = builder.Build();

                // 1、订阅
                consumer.Subscribe("create-order");
                // 2、获取偏移量
                string offset = distributedCache.GetString("create-order");
                if (string.IsNullOrEmpty(offset))
                {
                    offset = "0";
                }
                // 3、重置偏移量
                consumer.Assign(new TopicPartitionOffset(new TopicPartition("create-order", 0), int.Parse(offset)));
                while (true)
                {
                    // 2、消费
                    var result = consumer.Consume();
                    // 2.1、获取偏移量
                    _logger.LogInformation($"订单消息偏移量:Offset:{result.Offset}");

                    // 3、业务处理
                    string key = result.Key;
                    string value = result.Value;
                    _logger.LogInformation($"创建商品:Key:{key}");
                    _logger.LogInformation($"创建商品:Order:{value}");

                    // 2.2、把kafka队列中偏移量存起来。redis mysql
                    // 2.3、重置kafka队列的偏移量
                    distributedCache.SetString("create-order", result.Offset.Value.ToString());

                    // 3、手动提交
                    //consumer.Commit(result);
                }
            }).Start();
        }
        #endregion
}
/// <summary>
/// 发送短信
/// </summary>
/// <returns></returns>
[HttpGet]
public IEnumerable<WeatherForecast> Get()
{
    new Task(() =>
    {
        var consumerConfig = new ConsumerConfig
        {
            BootstrapServers = "127.0.0.1:9092",
            AutoOffsetReset = AutoOffsetReset.Earliest,
            GroupId = "sms",
            EnableAutoCommit = false,
        };
        var builder = new ConsumerBuilder<string, string>(consumerConfig);
        var consumer = builder.Build();

        // 1、订阅
        consumer.Subscribe("create-order");
        while (true)
        {
            // 2、消费
            var result = consumer.Consume();
            // 2.1、获取偏移量
            _logger.LogInformation($"订单消息偏移量:Offset:{result.Offset}");

            // 3、业务处理
            string key = result.Key;
            string value = result.Value;
            _logger.LogInformation($"创建商品:Key:{key}");
            _logger.LogInformation($"创建商品:Order:{value}");

            // 3、手动提交
            consumer.Commit(result);
        }
    }).Start();
}

2.算法分区的使用

生产者

/// <summary>
/// 创建订单
/// </summary>
/// <param name="orderCreateDto"></param>
/// <returns></returns>
[HttpPost]
public IEnumerable<OrderCreateDto> CreateOrder(OrderCreateDto orderCreateDto)
{
	#region 1、生产者 Producer
    {
        var producerConfig = new ProducerConfig
        {
            BootstrapServers = "127.0.0.1:9092",
            MessageTimeoutMs = 50000,
            EnableIdempotence = true
        };

        var builder = new ProducerBuilder<string, string>(producerConfig);
        builder.SetDefaultPartitioner(RoundRobinPartitioner);//默认hash一致性算法,这边用轮询替换
        using (var producer = builder.Build())
        {
            try
            {
                var dr = producer.ProduceAsync("create-order", new Message<string, string> { Key = "order-1", Value = OrderJson }).GetAwaiter().GetResult();
                _logger.LogInformation("发送事件 {0} 到 {1} 成功", dr.Value, dr.TopicPartitionOffset);
            }
            catch (ProduceException<string, string> ex)
            {
                _logger.LogError(ex, "发送事件到 {0} 失败,原因 {1} ", "order", ex.Error.Reason);
            }
        }
    }
    #endregion
}  
/// <summary>
/// 分区随机算法
/// </summary>
/// <param name="topic"></param>
/// <param name="partitionCount"></param>
/// <param name="keyData"></param>
/// <param name="keyIsNull"></param>
/// <returns></returns>
private Partition RandomPartitioner(string topic, int partitionCount, ReadOnlySpan<byte> keyData, bool keyIsNull)
{
    Random random = new Random();
    int partition = random.Next(partitionCount-1);
    return new Partition(partition);
}

/// <summary>
/// 分区轮询算法。两个分区得到消息是一致的
/// </summary>
/// <param name="topic"></param>
/// <param name="partitionCount"></param>
/// <param name="keyData"></param>
/// <param name="keyIsNull"></param>
/// <returns></returns>
static int requestCount = 0;
private Partition RoundRobinPartitioner(string topic, int partitionCount, ReadOnlySpan<byte> keyData, bool keyIsNull)
{
    int partition = requestCount % partitionCount;
    requestCount++;
    return new Partition(partition);
}

消费者

#region 创建订单----1、创建订单集群或发送短信集群--消息分区
{
    new Task(() =>
    {
        var consumerConfig = new ConsumerConfig
        {
            BootstrapServers = "127.0.0.1:9092",
            AutoOffsetReset = AutoOffsetReset.Earliest,
            GroupId = "order",
            EnableAutoCommit = false,
        };
        var builder = new ConsumerBuilder<string, string>(consumerConfig);
        var consumer = builder.Build();

        // 1、订阅
        consumer.Subscribe("create-order");
        // 2、获取偏移量
        *//*string offset = distributedCache.GetString("create-order");
        if (string.IsNullOrEmpty(offset))
        {
            offset = "0";
        }*//*
        // 3、重置偏移量
        //consumer.Assign(new TopicPartitionOffset(new TopicPartition("create-order", 1), int.Parse(offset)));
        while (true)
        {
            // 2、消费
            var result = consumer.Consume();
            // 2.1、获取偏移量
            _logger.LogInformation($"订单消息偏移量:Offset:{result.Offset}");
            // 3、业务处理
            string key = result.Key;
            string value = result.Value;
            _logger.LogInformation($"创建商品:Key:{key}");
            _logger.LogInformation($"创建商品:Order:{value}");

            // 2.2、把kafka队列中偏移量存起来。redis mysql
            // 2.3、重置kafka队列的偏移量
            //distributedCache.SetString("create-order", result.Offset.Value.ToString());

            // 3、手动提交
            consumer.Commit(result);
        }
    }).Start();
}
#endregion
在这里插入代码片

3.固定分区的使用

生产者

#region 4、生产者-固定分区发送
{
    var producerConfig = new ProducerConfig
    {
        BootstrapServers = "127.0.0.1:9092",
        MessageTimeoutMs = 50000
    };
    var builder = new ProducerBuilder<string, string>(producerConfig);
    using (var producer = builder.Build())
    {
        try
        {
            var OrderJson = JsonConvert.SerializeObject(orderCreateDto);
            //TopicPartition topicPartition = new TopicPartition("order-create", new Partition(0));
            var dr = producer.ProduceAsync("order-create", new Message<string, string> { Key = "order", Value = OrderJson }).GetAwaiter().GetResult();
            _logger.LogInformation("发送事件 {0} 到 {1} 成功", dr.Value, dr.TopicPartitionOffset);
        }
        catch (ProduceException<string, string> ex)
        {
            _logger.LogError(ex, "发送事件到 {0} 失败,原因 {1} ", "order", ex.Error.Reason);
        }
    }
}
#endregion

消费者

#region 创建订单----1、创建订单集群或发送短信集群--消息分区-固定分区
{
    new Task(() =>
    {
        var consumerConfig = new ConsumerConfig
        {
            BootstrapServers = "127.0.0.1:9092",
            AutoOffsetReset = AutoOffsetReset.Earliest,
            GroupId = "order",
            EnableAutoCommit = false,
        };
        var builder = new ConsumerBuilder<string, string>(consumerConfig);
        var consumer = builder.Build();

        // 1、订阅
        consumer.Subscribe("create-order");
        // 1.1、获取偏移量
        string offset = distributedCache.GetString("create-order");
        if (string.IsNullOrEmpty(offset))
        {
            offset = "0";
        }
        // 1.2、重置偏移量
        consumer.Assign(new TopicPartitionOffset(new TopicPartition("create-order", 0), int.Parse(offset)));
        while (true)
        {
            // 2、消费
            var result = consumer.Consume();
            // 2.1、获取偏移量
            _logger.LogInformation($"订单消息偏移量:Offset:{result.Offset}");
            _logger.LogInformation($"订单消息分区序号:Partition:{result.Partition}");

            // 3、业务处理
            string key = result.Key;
            string value = result.Value;
            _logger.LogInformation($"创建商品:Key:{key}");
            _logger.LogInformation($"创建商品:Order:{value}");

            // 2.2、把kafka队列中偏移量存起来。redis mysql
            // 2.3、重置kafka队列的偏移量
            distributedCache.SetString("create-order", result.Offset.Value.ToString());

            // 3、手动提交
            consumer.Commit(result);
        }
    }).Start();
}
#endregion

4.消息延迟处理

消费者

#region 创建订单----1、订单消息延迟处理
{
    new Task(() =>
    {
        var consumerConfig = new ConsumerConfig
        {
            BootstrapServers = "127.0.0.1:9092",
            AutoOffsetReset = AutoOffsetReset.Earliest,
            GroupId = "order",
            EnableAutoCommit = false,
            FetchMinBytes=170,
            FetchMaxBytes=3060
        };
        var builder = new ConsumerBuilder<string, string>(consumerConfig);
        using (var consumer = builder.Build())
        {
            // 1、订阅
            consumer.Subscribe("create-order-1");
            // 2、偏移量恢复
            string offset = distributedCache.GetString("create-order-1");
            if (string.IsNullOrEmpty(offset))
            {
                offset = "0";
            }
            consumer.Assign(new TopicPartitionOffset(new TopicPartition("create-order-1", 0), int.Parse(offset)));
            while (true)
            {
                // 1、恢复消息
                new Timer((s) =>
                {
                    consumer.Resume(new List<TopicPartition> { new TopicPartition("create-order-1", 0) });
                }, null, Timeout.Infinite, Timeout.Infinite).Change(5000, 5000);

                // 1.1、消费暂停
                consumer.Pause(new List<TopicPartition> { new TopicPartition("create-order-1", 0) });

                // 2、消费消息
                var result = consumer.Consume(); //批量获取消息,根据-----》字节数
                try
                {
                    // 2.1、获取偏移量
                    _logger.LogInformation($"订单消息偏移量:Offset:{result.Offset}");

                    // 3、业务处理
                    string key = result.Key;
                    string value = result.Value;
                    _logger.LogInformation($"创建商品:Key:{key}");
                    _logger.LogInformation($"创建商品:Order:{value}");

                    // 2.2、把kafka队列中偏移量存起来。redis mysql
                    // 2.3、重置kafka队列的偏移量
                    distributedCache.SetString("create-order-1", result.Offset.Value.ToString());

                    // 3、手动提交
                    consumer.Commit(result);
                }
                catch (Exception)
                {

                    throw;
                } finally
                {
                    consumer.Pause(new List<TopicPartition> { new TopicPartition("create-order-1", 0) });
                    Console.WriteLine($"暂停消费");
                }
            }
        }

    }).Start();
}
#endregion
#region 创建订单----1、订单消息延迟处理-批量处理消息
{
    new Task(() =>
    {
        var consumerConfig = new ConsumerConfig
        {
            BootstrapServers = "127.0.0.1:9092",
            AutoOffsetReset = AutoOffsetReset.Earliest,
            GroupId = "order",
            EnableAutoCommit = false,
        };
        var builder = new ConsumerBuilder<string, string>(consumerConfig);
        using (var consumer = builder.Build())
        {

            // 1、订阅
            consumer.Subscribe("create-order");
            // 2、偏移量恢复
            string offset = distributedCache.GetString("create-order");
            if (string.IsNullOrEmpty(offset))
            {
                offset = "0";
            }
            consumer.Assign(new TopicPartitionOffset(new TopicPartition("create-order", 0), int.Parse(offset)));
            while (true)
            {
                // 1、恢复消息
                new Timer((s) =>
                {
                    consumer.Resume(new List<TopicPartition> { new TopicPartition("create-order", 0) });
                }, null, Timeout.Infinite, Timeout.Infinite).Change(5000, 5000);

                // 1.1、消费暂停
                consumer.Pause(new List<TopicPartition> { new TopicPartition("create-order", 0) });

                // 2、消费
                var result = consumer.Consume();
                // 2.1、获取偏移量
                _logger.LogInformation($"订单消息偏移量:Offset:{result.Offset}");

                // 3、业务处理
                string key = result.Key;
                string value = result.Value;
                _logger.LogInformation($"创建商品:Key:{key}");
                _logger.LogInformation($"创建商品:Order:{value}");

                // 2.2、把kafka队列中偏移量存起来。redis mysql
                // 2.3、重置kafka队列的偏移量
                distributedCache.SetString("create-order", result.Offset.Value.ToString());

                // 3、手动提交
                consumer.Commit(result);
            }
        }
    }).Start();
}
#endregion

5.生产者失败重试和事务

失败重试

#region 2、生产者-失败重试
{
    var producerConfig = new ProducerConfig
    {
        BootstrapServers = "127.0.0.1:9092",
        MessageTimeoutMs = 50000,
        EnableIdempotence = true // 保证消息:不重复发送,失败重试
    };

    var builder = new ProducerBuilder<string, string>(producerConfig);
    using (var producer = builder.Build())
    {
        try
        {
            var OrderJson = JsonConvert.SerializeObject(orderCreateDto);
            // TopicPartition topicPartition = new TopicPartition("order-create-5",new Partition(0));
            var dr = producer.ProduceAsync("order-create-5", new Message<string, string> { Key = "order", Value = OrderJson }).GetAwaiter().GetResult();
            _logger.LogInformation("发送事件 {0} 到 {1} 成功", dr.Value, dr.TopicPartitionOffset);
        }
        catch (ProduceException<string, string> ex)
        {
            _logger.LogError(ex, "发送事件到 {0} 失败,原因 {1} ", "order", ex.Error.Reason);
        }
    }
}
#endregion

事务

#region 3、生产者-失败重试-多消息发送
{
    var producerConfig = new ProducerConfig
    {
        BootstrapServers = "127.0.0.1:9092",
        MessageTimeoutMs = 50000,
        EnableIdempotence = true,
        TransactionalId = Guid.NewGuid().ToString()
    };

    var builder = new ProducerBuilder<string, string>(producerConfig);
    using (var producer = builder.Build())
    {
        // 1、初始化事务
        producer.InitTransactions(TimeSpan.FromSeconds(60));
        try
        {
            var OrderJson = JsonConvert.SerializeObject(orderCreateDto);
            // 2、开发事务
            producer.BeginTransaction();
            for (int i = 0; i < 100; i++)
            {
                var dr = producer.ProduceAsync("order-create-5", new Message<string, string> { Key = "order", Value = OrderJson }).GetAwaiter().GetResult();
                _logger.LogInformation("发送事件 {0} 到 {1} 成功", dr.Value, dr.TopicPartitionOffset);
            }
            // 3、提交事务
            producer.CommitTransaction();
        }
        catch (ProduceException<string, string> ex)
        {
            _logger.LogError(ex, "发送事件到 {0} 失败,原因 {1} ", "order", ex.Error.Reason);
            // 4、关闭事务
            producer.AbortTransaction();
        }
    }
}
#endregion
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。