阿里四面:kafka何时、如何删除Topic?

举报
JavaEdge 发表于 2022/01/01 22:29:35 2022/01/01
【摘要】 Kafka有很多状态机和管理器,如Controller通道管理器ControllerChannelManager、处理Controller事件的ControllerEventManager等。这些管理器和状态机,大多与各自“宿主”联系密切。就如Controller这俩管理器,必须与Controller组件紧耦合,才能实现各自功能。Kafka还有一些状态机和管理器,具有相对独立的功能框架,不严...

Kafka有很多状态机和管理器,如Controller通道管理器ControllerChannelManager、处理Controller事件的ControllerEventManager等。这些管理器和状态机,大多与各自“宿主”联系密切。就如Controller这俩管理器,必须与Controller组件紧耦合,才能实现各自功能。

Kafka还有一些状态机和管理器,具有相对独立的功能框架,不严重依赖使用方,如:

  • TopicDeletionManager(主题删除管理器)

    负责对指定Kafka主题执行删除操作,清除待删除主题在集群上的各类“痕迹”。

  • ReplicaStateMachine(副本状态机)

    负责定义Kafka副本状态、合法的状态转换,以及管理状态之间的转换。

  • PartitionStateMachine(分区状态机)

    负责定义Kafka分区状态、合法的状态转换,以及管理状态之间的转换。

本文看看Kafka是如何删除一个主题的。

前言

以为成功执行kafka-topics.sh --delete命令后,主题就会被删除。这种不正确的认知会导致经常发现主题没被删干净。于是,网传终极“武林秘籍”:手动删除磁盘上的日志文件,手动删除ZooKeeper下关于主题的各节点。但我不推荐这么干:

  • 并不完整

    除非你重启Broker,否则,这套“秘籍”无法清理Controller端和各个Broker上元数据缓存中的待删除主题的相关条目

  • 并没有被官方所认证,后果自负

与其琢磨删除主题失败之后怎么自救,还是研究Kafka到底如何执行该操作。TopicDeletionManager.scala包括:

  • DeletionClient接口:负责实现删除主题以及后续的动作

    如更新元数据

  • ControllerDeletionClient类:实现DeletionClient接口的类,分别实现了刚刚说到的那4个方法。

  • TopicDeletionManager类:主题删除管理器类

    定义方法维护主题删除前后集群状态的正确性。如,何时删除主题、何时主题不能被删除、主题删除过程中要规避哪些操作等

DeletionClient接口及实现

删除主题,并将删除主题的事件同步给其他Broker。

DeletionClient接口目前只有一个实现类ControllerDeletionClient,构造器的两个字段:

  • KafkaController实例

    Controller组件对象

  • KafkaZkClient实例

    Kafka与ZooKeeper交互的客户端对象

API

deleteTopic

删除主题在zk上的所有“痕迹”。分别调用KafkaZkClient的3个方法删除ZooKeeper下/brokers/topics/节点、/config/topics/节点和/admin/delete_topics/节点。

deleteTopicDeletions

删除zk下待删除主题的标记节点。调用KafkaZkClient#deleteTopicDeletions,批量删除一组主题在/admin/delete_topics下的子节点。注意,deleteTopicDeletions这个方法名结尾的Deletions,表示/admin/delete_topics下的子节点。所以:

  • deleteTopic是删除主题
  • deleteTopicDeletions是删除/admin/delete_topics下的对应子节点

这两个方法里都有epochZkVersion字段,代表期望的Controller Epoch版本号。若使用一个旧Epoch版本号执行这些方法,zk会拒绝,因为和它自己保存的版本号不匹配。若一个Controller的Epoch<ZooKeeper中保存的,则该Controller很可能是已过期的Controller。这就是Zombie Controller。epochZkVersion字段的作用,就是隔离Zombie Controller发送的操作。

mutePartitionModifications

屏蔽主题分区数据变更监听器:取消/brokers/topics/节点数据变更的监听。

当该主题的分区数据发生变更后,由于对应zk监听器已被取消,因此不会触发Controller相应处理逻辑。

为何取消该监听器?为避免操作相互干扰:假设用户A发起主题删除,同时用户B为这个主题新增分区。此时,这两个操作就会冲突,若允许Controller同时处理这俩操作,势必会造成逻辑混乱及状态不一致。为应对这种情况,在移除主题副本和分区对象前,代码要先执行这个方法,确保不再响应用户对该主题的其它操作。

mutePartitionModifications调用unregisterPartitionModificationsHandlers,并接着调用KafkaZkClient#unregisterZNodeChangeHandler,取消zk上对给定主题的分区节点数据变更的监听。

sendMetadataUpdate

调用KafkaController#sendUpdateMetadataRequest,给集群所有Broker发送更新请求,告诉它们不要再为已删除主题的分区提供服务:

该方法会给集群中的所有Broker发送更新元数据请求,告知它们要同步给定分区的状态。

TopicDeletionManager定义及初始化

创建TopicDeletionManager类实例

在KafkaController类初始化时被创建:

实例化了一个全新的ControllerDeletionClient对象,然后利用该对象实例和replicaStateMachine、partitionStateMachine,一起创建TopicDeletionManager实例。

KafkaServerStartable.startup()=》KafkaServer.startup()=》KafkaController.init=》TopicDeletionManager

TopicDeletionManager重要API

除了类定义和初始化,还有resumeDeletions:重启主题删除操作过程。

主题因为某些事件可能一时无法完成删除,如主题分区正在进行副本重分配等。一旦这些事件完成,主题重新具备可删除资格。就需调用resumeDeletions重启删除操作。

  • 从元数据缓存中获取要删除主题列表,之后定义了两个空的主题列表,分别保存待重试删除主题和待删除主题
  • 遍历每个要删除的主题,去看它所有副本的状态。如果副本状态都是ReplicaDeletionSuccessful,就表明该主题已经被成功删除,此时,再调用completeDeleteTopic方法,完成后续的操作就可以了。对于那些删除操作尚未开始,并且暂时无法执行删除的主题,源码会把这类主题加到待重试主题列表中,用于后续重试;如果主题是能够被删除的,就将其加入到待删除列表中。
  • 最后,调用retryDeletionForIneligibleReplicas重试待重试主题列表中的主题删除操作。对待删除主题列表中的主题则调用onTopicDeletion删除。

retryDeletionForIneligibleReplicas重试主题删除:将对应主题副本的状态,从ReplicaDeletionIneligible变更到OfflineReplica。这样,后续再次调用resumeDeletions,就会尝试重新删除主题。

下面,我再用一张图来解释下resumeDeletions方法的执行流程:

resumeDeletions串联起了TopicDeletionManger中的很多方法,较关键的:

completeDeleteTopic:

onTopicDeletion:

onTopicDeletion会多次使用分区状态机,调整待删除主题的分区状态。最后调用onPartitionDeletion执行真正的底层物理磁盘文件删除。这是通过副本状态机状态转换操作完成的。

总结

在主题删除过程中,Kafka会调整集群中三个地方的数据:

  • ZooKeeper

    删除主题时,zk上与该主题相关的所有ZNode节点必须被清除

  • 元数据缓存

    Controller端元数据缓存中的相关项,也必须要被处理,并且要被同步到集群的其他Broker上

  • 磁盘日志文件

    要清理的首要目标

这三个地方须统一处理,就好似原子操作。回想“秘籍”,它无法清除Controller端的元数据缓存项。因此,避免使用这“大招”。

DeletionClient接口主要是操作ZooKeeper,实现ZooKeeper节点的删除等操作。

TopicDeletionManager,是在KafkaController创建过程中被初始化的,主要通过与元数据缓存进行交互的方式,来更新各类数据。

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。