Kafka使用 kafka-consumer-groups.sh --list查询不到消费者组

举报
Summer_ 发表于 2022/04/28 14:30:20 2022/04/28
【摘要】 问题描述:FusionInsight C80版本,通过Flink提交消费Kafka的任务,消费Kafka的消费者组信息通过kafka-consumer-groups.sh --list命令查询不到,但是通过kafka-consumer-groups.sh --describe可以查询到信息。问题分析:1、C80版本kafka-consumer-groups.sh命令使用list查询group...

问题描述:

FusionInsight C80版本,通过Flink提交消费Kafka的任务,消费Kafka的消费者组信息通过kafka-consumer-groups.sh --list命令查询不到,但是通过kafka-consumer-groups.sh --describe可以查询到信息。

问题分析:

1、C80版本kafka-consumer-groups.sh命令使用list查询groupid源码分析如下:

a. kafka-consumer-groups.sh命令的入口是执行ConsumerGroupCommand中的main函数

005.png

b. ConsumerGroupCommand中的main函数执行时根据使用的是--zookeeper还是--bootstrap-server判断consumerGroupServiceZkConsumerGroupService还是KafkaConsumerGroupService

如果命令中有--list,就执行listGroups().foreach(println(_))


006.png

c. KafkaConsumerGroupServicelistGroups()的实现是通过adminClient获取所有的consumerGroup,源码如下:

007.png

d. AdminClient中获取listAllConsumerGroupsFlattened的实现是获取所有的group组并过滤出所有协议是consumergroup组,源码如下:

008.png

2. 提交一个flink作业,groupid设置为“testgroup”,使用Kafka中的AdminClient来获取listAllGroupsFlattened的信息,执行结果如下:

009.png

如上图所示:执行结果中GroupOverviewtestgroup对应的协议为空,而不是consumer,所以,使用list查询不到结果。


3. Flink作业消费kafka时,GroupOverviewgroupid对应的协议为空的原因是:flink不是直接调用的kafka consumer client端的消费接口,而是通过自己的逻辑去消费、只用kafka中的__consumer_offsets保存数据。

4. 此问题在651版本已经解决,651版本中增加的协议为空的判断逻辑,源码如下所示:

010.png

【版权声明】本文为华为云社区用户原创内容,未经允许不得转载,如需转载请自行联系原作者进行授权。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。