Flink对接Kafka常见问题与解决方案

举报
FI小粉丝 发表于 2021/08/28 16:26:13 2021/08/28
【摘要】 1.Flink对接kafka出现数据倾斜问题现象使用FlinkKafkaProducer进行数据生产过程中数据只写到了kafka的部分分区中,其它的分区没有数据写入。可能原因1:Flink写kafka使用的机制与原生接口的写入方式是有差别的,在默认情况下,Flink使用了并行度编号和分区数量做取模运算计算出来。那么会有以下两种场景:      1. 并行度%分区数量=0,表示并行度是kafk...

1.Flink对接kafka出现数据倾斜

  • 问题现象

使用FlinkKafkaProducer进行数据生产过程中数据只写到了kafka的部分分区中,其它的分区没有数据写入。

可能原因1Flink写kafka使用的机制与原生接口的写入方式是有差别的,在默认情况下,Flink使用了并行度编号和分区数量做取模运算计算出来。那么会有以下两种场景:

      1. 并行度%分区数量=0,表示并行度是kafkatopic分区数的多倍,数据的写入每个分区数据量是均衡的。

      2. 并行度%分区数量≠0,那么数据量势必会在每个分区上的数据量产生倾斜。

  • 解决方案
  1. 调整kafka的分区数跟flink的并行度保持一致,这种配置要求kafka的分区数跟flinkkafkasink并行度保持强一致性,好处在于每个并行度仅需要跟每个kafka分区所在的broker保持一个常链接即可。能够节省线程与分区之间调度的时间和并且节省内存空间
  2. 将生产侧的分区策略写成随机写入模式,如下图:

这样数据会随即写入kafka的所有分区中,但是会有一部分时间损耗在线程向寻址分区这个过程中。建议使用解决方案1

可能原因2

在部分算子中使用了keyby()方法,由于现网中的数据流的key值不一致(就是说某些key的数据量会非常大,有些又非常小),导致每个并行度中输出的数据流量不一致。

  • 解决方案

调整key的选择,建议用户在选择key时保证每个key的数据量是均衡散列的。例如:可以在key前加入一个范围数据段的随即数字,在每个并行度中将数据打散。

2.使用仅一次语义进行生产

  • 问题现象

Kafka开启仅一次语义后出现报错:

  • 可能原因

仅一次语义使用了两段提交的方式,在提交过程中会生成一个事务ID,如果上一个事务ID还没有提交成功,新的ID再提交就会导致重合

  • 解决方案
    1. 仅一次语义对于数据量小的业务比较适用,如果数据量大容易出现以上问题,因此如果没有特殊需求不要开启producer仅一次语义
    2. 根据Flink官网的解释,如果开启仅一次语义,需要做以下的优化
    3. 上述内容中提到的poolsize是以下内容

           

3.Flink消费kafka任务一段时间后重启,任务无法启动

  • 问题现象

  Flink对接kafka运行一段时间手动重启任务后,任务无法重启报Ask timed out on xxxx after [30000 ms]

  • 可能原因
  • akka相关的超时时间太短,网络状态不太好。
  • 内存配置太小,kafka消费的数据量太大导致任务失败。
  • 解决方案
  • 原因一解决方案:调整以下超时参数,增加akka的超时时间

         akka.ask.timeout 60s
         akka.watch.heartbeat.pause 120s
         akka.tcp.timeout 60s

  • 原因二解决方案1. 查看后台checkpoint的历史记录,是否有长时间的checkpoint超时失败,如果有长时间的超时失败说明,在消费kafka数据的过程中,消费偏移量没有正确提交,下次消费会从头开始消费。

       

       2. 如果数据从头开始消费,消费者初始的积压量可能会变大,通过命令./kafka-consumer-groups.sh查看积压量

       

       如果积压量比较大,kafka一次消费的数据量会非常大,如果后端对接的组件性能不足(例如:ESGUASSDB)可能会产生背压,如果taskmanager的内存不足可能会导致taskmanagerFullGC而失败。

需要在启动命令中调整taskmanager的启动内存,例如,如下命令中修改的-ytm参数

./flink run -m yarn-cluster  -p 2 -yjm 1024 -ytm 2048 -c ……

也可以通过kafka-consumer-group.sh 命令调整消费偏移量(注意:该操作会导致一部分数据消费不到需要征求客户同意)

命令如下:

./kafka-consumer-groups.sh --reset-offsets --to-earliest --group test  --bootstrap-server  kafka业务:21007 --command-config ../config/consumer.properties --all-topics --execute

其中

--reset-offsets 为重置offset操作

--to-earliest 所有分区重置到分区的第一条信息

--group 需要重置的groupid

--all-topics 表示要重置组内的所有topic,如果重置一个topic 这个地方可以写 –-topic topicName

--execute 表示立即执行。

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。