Kafka使用 kafka-consumer-groups.sh --list查询不到消费者组
问题描述:
FusionInsight C80版本,通过Flink提交消费Kafka的任务,消费Kafka的消费者组信息通过kafka-consumer-groups.sh --list命令查询不到,但是通过kafka-consumer-groups.sh --describe可以查询到信息。
问题分析:
1、C80版本kafka-consumer-groups.sh命令使用list查询groupid源码分析如下:
a. kafka-consumer-groups.sh命令的入口是执行ConsumerGroupCommand中的main函数
b. ConsumerGroupCommand中的main函数执行时根据使用的是--zookeeper还是--bootstrap-server判断consumerGroupService是ZkConsumerGroupService还是KafkaConsumerGroupService。
如果命令中有--list,就执行listGroups().foreach(println(_))
c. KafkaConsumerGroupService中listGroups()的实现是通过adminClient获取所有的consumerGroup,源码如下:
d. AdminClient中获取listAllConsumerGroupsFlattened的实现是获取所有的group组并过滤出所有协议是consumer的group组,源码如下:
2. 提交一个flink作业,groupid设置为“testgroup”,使用Kafka中的AdminClient来获取listAllGroupsFlattened的信息,执行结果如下:
如上图所示:执行结果中GroupOverview中testgroup对应的协议为空,而不是consumer,所以,使用list查询不到结果。
3. Flink作业消费kafka时,GroupOverview中groupid对应的协议为空的原因是:flink不是直接调用的kafka consumer client端的消费接口,而是通过自己的逻辑去消费、只用kafka中的__consumer_offsets保存数据。
4. 此问题在651版本已经解决,651版本中增加的协议为空的判断逻辑,源码如下所示:
- 点赞
- 收藏
- 关注作者
评论(0)