关于kafka的Producer的粘性分区的坑

举报
Bigdata Go! 发表于 2022/11/28 10:40:26 2022/11/28
【摘要】 kafka的粘性分区特性是在kafka的2.0版本以后提出的,旨在提升生产者发送数据的调度性能,笔者在生产环境使用过程中发现了一个致命的问题并在此处分享

问题背景:

     某集群集群做了kafka集群切换zk服务变更。在变更期间,停止多个kafka业务(主要是Flink流作业)30分钟左右。变更结束后,恢复kafka业务,其中一个Flink作业写Kafka不稳定,运行几分钟后其中一个partition写入超时,导致作业失败。

               cke_805.png

问题分析:

  1. 从报错堆栈上来看,数据向topic所在节点发送数据过程中出现了超时,也就是producer端与kafka端连接超时。触发报错的场景一般有两种原因:

    (1)网络问题:发送数据的客户端到服务端之间存在一定网络延迟,导致发送失败。通过检测网络质量,这个场景能够排除。

    (2)Kafka服务端异常。分区所在的kafka节点本身存在异常,导致数据发送超时。例如:磁盘、CPU等硬件资源使用过载会出现处理能力下降等。

   在登录这个异常的broker节点后,通过磁盘io命令查看磁盘使用率,发现磁盘io长时间处于了90%以上。

          cke_2212.png

     2. 磁盘io长时间处于100%,与节点的数据流量异常有关。对比异常的broker节点和其它的broker节点,发现数据流量较之前增长明显。并且这个节点上的分区大小相比较于其它分区大了将近10

             cke_9347.png

              cke_15564.png

    3. 通常出现这种情况时,往往有如下几种场景:

     (1)数据带了key而导致的数据倾斜。例如如下写法:

                     cke_30551.png

               通过排查业务侧的代码。数据中并没有带有key值,因此该假设不成立。

       (2)分区倾斜:业务的数据量大,但是分区数量少。Topic的分区数均衡200分区,每个分区数量不一致。如果分区倾斜每个分区中的数据量应该是一致的。不会出现这种现象。故不成立。

      4. 通过观查,出现问题的节点只有一个。也就是说,只要topic的分区在异常的broker节点上,这些分区的数据量就会异常。如果停止了这个异常节点,流量会迁移到另外一个节点上

原因分析

      Producer的粘性分区特性

        先来了解一下2.4版本以后引进的粘性特性:https://cwiki.apache.org/confluence/display/KAFKA/KIP-480:+Sticky+Partitioner

关于粘性的介绍可以看下这篇文章:https://developer.huawei.com/consumer/cn/forum/topic/0203860215257330221?fid=0101592429757310384

     划重点:

             在6.5.1版本(1.1.0版本)之前,如果数据没有key默认的分区散列算法如下:

                             cke_63035.png

                                                                图一:原始的分区散列算法

  1. 每条数据会随机选择一个分区
  2. 数据进入分区所在的Deque队列,deque队列中以batch为单位进行数据缓存,每个batch大小默认为16384bytes(由生产者参数batch.size决定)
  3. 当满足batch大小满足条件或者超过ling.ms设定时间时,触发数据发送。

    651版本(kafka为2.4版本)以后,数据不带key的默认发送场景为粘性发送。

                 ​​​​​​​cke_87921.png

                                                        图二:粘性分区散列算法

    (1)随即挑选一个可用分区(如果leader不为-1或者none均为可用分区,被选择过的分区在下次选择时候不再作为候选分区)。见代码:

                   cke_111114.png

    (2) 当至少将分区填满或者达到linger.ms上限后,发送整个分区的数据。

             根据上述说明,当设置了linger.ms就意味着要等到到达linger.ms设置的限定时间或者batch.size后才能发送数据。 

        1. 使用原始的发送方法。数据均匀散列到各个分区,batch.size很难填满,此时就必须要等待到达linger.ms设定的时间限制。在到达时间后,topic的所有分区同时发送请求,例如图一中的topic有三个分区,等待时间超过linger.ms后才会发送请求。

        2.  使用粘性分区发送。数据会集中发送到一个分区,这个分区会写满一个batch才会选择另外一个分区。如果在linger.ms设定的时间内写满,那么就会体现发送这个batch的数据,并且在同一时间只产生一个请求。

        通过比对,粘性分区从吞吐率和资源使用上都有一定程度的优化。但是粘性设计仍然存在一定的缺陷。见粘性优化方案:KAFKA-10888

    二,粘性分区的问题

    ​​​​​​​     回到问题中,为什么粘性会带来数据倾斜。上文提到如果数据的发送依赖于linger.ms和batch.size两个参数。在默认情况下linger.ms会配置为0,也就是立即发送。这样每个分区中的数据难达到batch.size的大小就会立即发送。  

         生产者中有一个参数能够限制生产者最大的请求数量:max.in.flight.requests.per.connection 该参数能够限制生产者与一个broker的链接上最大的请求数量,也就是说当生产者与broker建立一个常链接后,这个链接上能够持有的最多未通过acks确认的发送请求最大数。默认值为5。假设5个链接全部被占用,那么生产者中的数据将的缓存起来,当有可用的链接时。缓存中的数据将以batch的形式发出去。

          在有可用发送线程的情况,如下图:

                           cke_307902.png

          Producerbatch能够及时发送到kafka的broker节点,并且由于linger.ms设置为0,batch.Size不会写满就会发送。

    如果kafkabroker节点出现性能问题,例如CPU、磁盘IO、网络等问题导致节点响应慢,就会出现大批量的batch挤压,多数batch都会被填满。如下图:

                              cke_335355.png

    此时,这样就会产生这样的现象:

    1.无异常的节点batch无法写满,发送的量少,分区中的数据量少。

    2. 异常节点由于响应慢,请求池被占用完,大量的数据挤压,每个batch的数据全部写满。分区中的数据会越来越多。最后所达到的现象就是。每个分区的数据量差异变大。


                          ​​​​​​​​​​​​​​cke_378672.png

    更严重的是,如果这个现象一旦出现,性能差的节点会成为短板节点,很难自行恢复,并且性可能会越来越差。

    解决方案:

    通过修改分区散列算法能够规避这个问题。

    (1)Kafka生产者原生API:将散列算法修改为RoundRobin 随机算法。如下配置

                   cke_424661.png

              初始化properties时加入配置:"partitioner.class",并且修改value"org.apache.kafka.clients.producer.RoundRobinPartitioner"

    (2)如果使用的是Flink作为生产者。如果配置了下图中的内容将使用粘性分区。

                  cke_452243.png

    可以将上图的红框内容替换为Optional.of(new FlinkFixedPartitioner<>())

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。