【案例分享】Kafka异常分区修复典型场景
【问题描述】
Flink对接消费Kafka数据时,任务超时失败,Flink的JobManager日志上报“org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition trafficgo_realtime_traffic-0 could be determined”,如下所示:
【问题分析】
1、使用kafka-console-consumer.sh消费业务topic可以消费到数据,但是使用Flink2Kafka代码(代码中指定了消费组ID)无法消费到数据。命令如下:
kafka-console-consumer.sh --topic xx --bootstrap-server x.x.x.x:21007 --consumer.config /opt/client/Kafka/kafka/config/consumer.properties
2、由以上现象判断,可能是由于Flink代码中指定的消费组存在问题。因为使用客户端命令kafka-console-consumer.sh消费时,加载的是客户端目录下consumer.properties配置文件(默认消费组是example-group1)。对比之下,两种消费方式在消费同一个topic,排除网络、配置无误的情况下,只有groupid不同。
3、describe查看代码指定的消费组,发现报“coordinator is not available”异常,如下图所示。
kafka-consumer-groups.sh --bootstrap-server x.x.x.x:21007 --describe --group <groupid> --command-config /opt/client/Kafka/kafka/config/consumer.properties
4、同时describe查看其它正常可以消费的groupid,结果正常,如下所示:
5、由3、4现象判断,现场使用的groupid所在__consumer_offsets分区丢失了leader(group coordinator节点就是消费组经过计算后的partitaion分区leader),查看__consumer_offsets情况如下:
6、由现场__consumer_offsets情况看,有两个问题:问题一:部分分区leader为none,且为none的分区Replicas里面没有当前集群当前存在的brokerid(集群实际brokerid为2、3、4);问题二:所有分区的Replicas里面都包含了除实际集群存在的brokerid外的其他id(怀疑集群做过缩容或者升级)。集群实际brokerid如下所示:
7、咨询现场后,确定在集群部署完成后做过缩容操作,broker节点从5个缩容至3个。现场缩容操作步骤是直接在Manager界面上删除Kafka的broker实例,方案未经过评审,导致系统自带的__consumer_offsets被删除节点的副本分片未迁移至现有broker节点,导致消费异常。
8、正确的缩容操作为:参考《XX版本容量调整指导书》章节中“Kafka减容操作指导”进行操作,生产环境变更前必须走RFC变更流程。
9、再回到现场无法消费的group组来分析,根据kafka消费组计算实际消费时所在__consumer_offsets的分区及coordinator节点,kafka官方内核的计算公式为:Math.abs(("groupid".hashCode() % 50)),Ps现场:Math.abs(("FloatingCarIndexAnalysisTest".hashCode() % 50))=34
10、根据第9步公式计算出现场使用的groupid所在的__consumer_offsets分区时34号,但是因为34号分区leader为none,zookeeper上查看34分区state为-1。所以该消费组的coordinator不存在。无法消费到数据。其他分区leader为none现象一样。
总结:由于现场未按照正常缩容指导流程进行缩容,导致__consumer_offsets、__default_metrics等默认系统topic分区丢失,需重新恢复分区。
【临时规避方案】
重新定义groupid,根据公式计算后若分区leader正常即可消费到数据,应急使用。
【解决方案】
1、查询需要修复的异常分区
2、登录zk客户端查询异常topic的元数据
source /opt/client/bigdata_env
kinit XXX 密码:XXXX
sh zkCli.sh -server x.x.x.x:24002/kafka --x.x.x.x为zk节点业务ip
执行如下命令:
addauth krbgroup
get /brokers/topics/test --test为异常topic名称
3、将查出来的粘出来,然后备份
4、修改查出来的副本信息至目的实例brokerid,然后set
set /brokers/topics/__consumer_offsets
{"version":2,"partitions":{"4":[2,3,4],"5":[3,2,4],"1":[4,3,2],"0":[3,4,2],......"2":[2,4,3],"3":[4,2,3]}}
原则:第一个副本为优先副本会被选举为leader,例如:"4":[2,3,4],"5":[3,2,4],其中2和3为优先副本,建议在set时尽量均衡各个节点的leader,避免系统热点压力。
备注:此操作为高危操作,执行下务必反复检查后执行;现网生产环境务必走RFC变更流程。
5、查看修改后的副本信息
get /brokers/topics/test 查看是否修改成功
6、重启或滚动重启kafka服务。
- 点赞
- 收藏
- 关注作者
评论(0)