kafka源码解析之四:性能关键路径controller分析

举报
步步清风 发表于 2017/12/20 10:31:27 2017/12/20
【摘要】 对于kafka,我们不仅在单机、集群、参数调优等方面,对kakfa全方位研究,更要在代码层解读,从原理到代码细节一一打开看看。(声明:所有这系列代码分析章节,有些图片/资料整理来自网络公开资料,站在巨人肩膀上二次仔细解读代码总结分享给大家)

1.      原理与代码分析

从kafka的一个节点中选举作为controller,负责管理分区和副本状态并执行管理着这些分区的重新分配。目前还包括AutoRebalance线程与Topic删除线程等内容

名词解释:

ISR:同步副本组

OfflinePartitionLeaderSelector:分区下线后新的领导者选举

OAR:老的分配副本

1.1      代码以及示意图

Controller代码文件夹中:

image.png

1.     ControllerChannelManager: 管理controller与broker之间连接的类

2.     KafkaController:  controller类主要逻辑都在 这个类里面

3.     PartitionLeaderSelector: 实现多种状态变迁的选举

4.     PartitionStateMachine: 分区状态机的实现类

5.     ReplicaStateMachine:  副本状态机的实现类

6.     TopicDeletionManager: Topic删除线程实现

内部结构图:

image.png


1.     Controller Context: 负责保存controller上下文信息,包括所有Topic列表、分区副本分配状态、分区Leader和ISR信息等。并提供了一些有用的接口例如或者所有可用Broker列表、可用副本等。

ControllerChannelManager:  负责管理controller 与各个Broker之间的连接。针对每个broker, controller会创建一个RequestSendThread线程。这个线程会一直阻塞等待在命令队列中,直到有命令过来就会和broker创建一个连接并发送请求命令,当命令执行完会断开与broker的连接。

2.     PartitionStateMachine:分区状态机的实现类。通过对Topic目录与分区目录的监控,在目录发送变化时触发状态变更。目前主要订阅的Listener有:AddPartitionListener、TopicChangeListener、DeleteTopicsListener。

例如创建新Topic,在/broker/topics目录下创建新的以Topic名称的目录, TopicChangeListener发现这个目录增加后,会查找出那些Topic是新增的然后调用onNewTopicCreation

3.     ReplicaStateMachine: 副本状态机的实现类。主要订阅的Listener有:BrokerChangeListener。如果有新创建的broker加入,则会从BrokerChangeListener触发onBrokerStartup;如果有Broker宕机,则会触发onBrokerFailure

4.     ZookeeperLeaderElector: 主要负责controller leader选举。主要订阅的Listener是LeaderChangeListener。因为/controller目录是临时节点,如果controller broker节点宕机,则LeaderChangeListener会触发其他节点进行重新选举。

5.     PartitionReassignedListener: 主要用于迁移分区的时候触发分区迁移操作的Listener,执行迁移操作onPartitionReassignment.

6.     PreferredReplicaElectionListener: 主要用于侦听优选副本选举目录的变化,触发优选副本选举,具体操作是onPreferredReplicaElection

7.     IsrChangeNotificationListener: 主要用于侦听ISR变迁,触发controller的更新ISR变化并通知其他节点。这个ISR的改动可以由副本追上Leader后更新ISR的变化,导致controller感知然后通知其他节点更新ISR;也可以由Leader选举直接修改ISR通知其他节点。

1.1      对分区的管理

PartitionStateChange

其有效状态如下:

  • NonExistentPartition:  这种状态表明该分区从来没有创建过或曾经创建过后来又删除了。

  • NewPartition:创建分区后,分区处于NewPartition状态。在这种状态下,分区副本应该分配给它,但还没有领导者/同步复制组。

  • OnlinePartition:一旦一个分区领导者被选出,就会为在线分区状态。

  • OfflinePartition:如果分区领导者成功选举后,当领导者分区崩溃或挂了,分区状态转变下线分区状态。

其有效的状态转移如下:

NonExistentPartition -> NewPartition

       1.群集中央控制器根据计算规则,从zk中读取分区信息,创建新分区和副本。

NewPartition -> OnlinePartition

       1.分配第一个活着的副本作为分区领导者,并且该分区所有副本作为一个同步复制组,写领导者和同步副本组数据到zk中。

       2.对于这个分区,发送LeaderAndIsr请求给每一个副本分区和并发送UpdateMetadata请求到每个活者的broker server。

OnlinePartition,OfflinePartition -> OnlinePartition

       1.对于这个分区,需要选择新的领导者和同步副本组,一个副本组要接受LeaderAndIsr请求,最后写领导者和同步副本组信息到zk中。

               a.OfflinePartitionLeaderSelector:新领导者=存活副本(最好是在isr);新isr =存活isr如果不是空或恰好为新领导者,否则;正在接受中副本=存活已分配副本。

              b.ReassignedPartitionLeaderSelector:新领导者=存活分区重新分配副本;新isr =当前isr;正在接受中副本=重新分配副本

              c.PreferredReplicaPartitionLeaderSelector:新领导这=第一次分配副本(如果在isr);新isr =当前isr;接受副本=分配副本

              d.ControlledShutdownLeaderSelector:新领导者=当前副本在isr中且没有被关闭,新isr =当前isr -关闭副本;接受副本=存活已分配副本。

     2.对于这个分区,发送LeaderAndIsr请求给每一个接收副本和UpdateMetadata请求到每个broker server

     NewPartition,OnlinePartition -> OfflinePartition

     1.这只不过标识该分区为下线状态

     OfflinePartition -> NonExistentPartition

1.     这只不过标识该分区为不存在分区状态

 

1.2      Partition重新分配

管理工具发出重新分配Partition请求后,会将相应信息写到/admin/reassign_partitions上,而该操作会触发ReassignedPartitionsIsrChangeListener,从而通过执行回调函数KafkaController.onPartitionReassignment来完成以下操作:

1.  将ZooKeeper中的AR(Current Assigned Replicas)更新为OAR(Original list of replicas for partition) + RAR(Reassigned replicas)。

2.  强制更新ZooKeeper中的leader epoch,向AR中的每个Replica发送LeaderAndIsrRequest。

3.  将RAR - OAR中的Replica设置为NewReplica状态。

4.  等待直到RAR中所有的Replica都与其Leader同步。

5.  将RAR中所有的Replica都设置为OnlineReplica状态。

6.  将Cache中的AR设置为RAR。

7.  若Leader不在RAR中,则从RAR中重新选举出一个新的Leader并发送LeaderAndIsrRequest。若新的Leader不是从RAR中选举而出,则还要增加ZooKeeper中的leader epoch。

8.  将OAR - RAR中的所有Replica设置为OfflineReplica状态,该过程包含两部分。第一,将ZooKeeper上ISR中的OAR - RAR移除并向Leader发送LeaderAndIsrRequest从而通知这些Replica已经从ISR中移除;第二,向OAR - RAR中的Replica发送StopReplicaRequest从而停止不再分配给该Partition的Replica。

9.  将OAR - RAR中的所有Replica设置为NonExistentReplica状态从而将其从磁盘上删除。

10. 将ZooKeeper中的AR设置为RAR。

11. 删除/admin/reassign_partition。

注意:最后一步才将ZooKeeper中的AR更新,因为这是唯一一个持久存储AR的地方,如果Controller在这一步之前crash,新的Controller仍然能够继续完成该过程。

以下是Partition重新分配的案例,OAR = {1,2,3},RAR = {4,5,6},Partition重新分配过程中ZooKeeper中的AR和Leader/ISR路径如下

AR

leader/isr

Sttep

{1,2,3}

1/{1,2,3}

(initial state)

{1,2,3,4,5,6}

1/{1,2,3}

(step 2)

{1,2,3,4,5,6}

1/{1,2,3,4,5,6}

(step 4)

{1,2,3,4,5,6}

4/{1,2,3,4,5,6}

(step 7)

{1,2,3,4,5,6}

4/{4,5,6}

(step 8)

{4,5,6}

4/{4,5,6}

(step 10)


1.3      对副本的管理

ReplicaStateChange:

有效状态如下:

     1.NewReplica:当创建topic或分区重新分配期间副本被创建。在这种状态下,副本只能成为追随者变更请求状态。

     2.OnlineReplica:一旦此分区一个副本启动且部分分配副本,他将处于在线副本状态。在这种状态下,它可以成为领导者或成为跟随者状态变更请求。

     3.OfflineReplica:每当broker server副本宕机或崩溃发生时,如果一个副本崩溃或挂了,它将变为此状态。

     4.NonExistentReplica:如果一个副本被删除了,它将变为此状态。

有效状态转移如下:

     NonExistentReplica - - > NewReplica

     1.使用当前领导者和isr分区发送LeaderAndIsr请求到新副本和UpdateMetadata请求给每一个存活borker

     NewReplica - > OnlineReplica

     1.添加新的副本到副本列表中

      OnlineReplica,OfflineReplica - > OnlineReplica

     1.使用当前领导者和isr分区发送LeaderAndIsr请求到新副本和UpdateMetadata请求给每一个存活borker

      NewReplica,OnlineReplica - > OfflineReplica

     1.发送StopReplicaRequest到相应副本(w / o删除)

     2.从isr中删除该副本,发送LeaderAndIsr(New ISR)请求到领导者副本去删除此副本和发送该分区的UpdateMetadata请求到每个存活broker。

     OfflineReplica - > NonExistentReplica

1.     发送StopReplicaRequest到副本(删除)

1.4      对Topic的管理

Topic创建

Controller通过partition状态机watch Zookeeper的/broker/topics目录,每当有新的topic创建时,就会由partition状态机的topic change listener触发onNewTopicCreation操作,具体步骤如下:

1.     为每个新创建的Topic注册partition change listener,侦听分区的变化

2.     对Topic的所有分区触发onNewPartitionCreation操作,流程如下:

2.1  所有分区的状态改变为NewPartition

2.2  所有副本的状态改变为NewReplica

2.3  把所有分区的从NewPartition状态改为OnlinePartition,主要是通过初始化每个分区的Leader和ISR,并写入zk的持久化路径:/brokers/topics/[topic]/partitions/[partition],然后发送LeaderAndISR请求到各个相关的Broker

2.4  所有分区的副本状态从NewReplica转变为OnlineReplica

Topic删除

   Topic的删除是通过Controller后台的删除Topic线程来完成的,因为涉及到数据的删除,所以处理流程要比创建Topic复杂,具体流程如下:

image.png

. Kafka controller在启动的时候会注册对于Zookeeper节点/admin/delete_topics的子节点变更监听器——上面的分析已经告诉 我们,delete命令实际上就是要在该节点下创建一个临时节点,名字是待删除topic名,标记该topic是待删除的

2. Kafka controller在启动时创建一个单独的线程,执行topic删除的操作 (由DeleteTopicsThread类实现)

3. 线程启动时查看是否有需要进行删除的topic——假设我们是在controller启动之后执行的topic删除命令,那么该线程刚启动的时候待删除的topic集合应该就是空的

4. 一旦发现待删除topic集合是空,topic删除线程会被挂起

5. 这时,我们执行delete操作,删除topic: test-topic,delete命令在/admin/delete_topics下创建子节点test-topic

6. 监听器捕获到该变更,立刻触发删除逻辑

    6.1 查询test-topic是否存在,不存在的话直接删除/admin/delete_topics/test-topic节点——随便删除一个不存在的 topic,删除命令也只是创建/admin/delete_topics/[topicName]的节点,它不负责做存在性校验

    6.2 查询一下test-topic是不是当前正在执行Preferred副本选举或分区重分配,如果是的话,肯定是不适合进行删除掉的。Controller 本地会缓存当前无法进行删除的topic集合,待分区重分配完成或preferred副本选举后单独处理该集合中的topic

    6.3 如何两者都不是的话说明现在可以进行删除操作,那么就恢复挂起的删除线程执行删除操作

删除线程执行删除操作的真正逻辑是:

1. 它首先会给当前所有broker发送更新元数据信息的请求,告诉这些broker说这个topic要删除了,你们可以把它的信息从缓存中删掉了

2. 开始删除这个topic的所有分区

    2.1 给所有broker发请求,告诉它们这些分区要被删除。broker收到后就不再接受任何在这些分区上的客户端请求了

    2.2 把每个分区下的所有副本都置于OfflineReplica状态,这样ISR就不断缩小,当leader副本最后也被置于OfflineReplica状态时leader信息将被更新为-1

    2.3 将所有副本置于ReplicaDeletionStarted状态

    2.4 副本状态机捕获状态变更,然后发起StopReplicaRequest给broker,broker接到请求后停止所有fetcher线程、移除缓存,然后删除底层log文件

    2.5 关闭所有空闲的Fetcher线程

3. 删除zookeeper下/brokers/topics/test-topic节点

4. 删除zookeeper下/config/topics/test-topic节点

5. 删除zookeeper下/admin/delete_topics/test-topic节点

6. 更新各种缓存,把test-topic相关信息移除出去

 

1.1      对Broker的管理

Broker启动

Broker启动后首先根据其ID在ZooKeeper的/brokers/ids zonde下创建临时子节点(Ephemeral node),创建成功后Controller的ReplicaStateMachine注册其上的Broker Change Watch会被fire,从而通过回调KafkaController.onBrokerStartup方法完成以下步骤:

  1. 向所有活跃Brokers或者ShutingDown Brokers发送UpdateMetadataRequest请求,老的Broker会通过这个请求知道这个新的Broker。其定义如下:

image.png


  1. 将新启动的Broker上的所有Replica设置为OnlineReplica状态,同时这些Broker会为这些Partition启动high watermark线程。

  2. 通过partitionStateMachine触发OnlinePartitionStateChange。

  3. 如果有分区迁移任务的分区落在新启动的broker上,则触发这些分区的迁移onPartitionReassignment

  4. 检查新启动broker的副本是否属于正在删除topic,如果有则发送信号触发topic删除线程的工作。

Broker failover

  1. Controller在Zookeeper的/brokers/ids节点上注册Watch。一旦有Broker宕机(本文用宕机代表任何让Kafka认为其Broker die的情景,包括但不限于机器断电,网络不可用,GC导致的Stop The World,进程crash等),其在Zookeeper对应的Znode会自动被删除,Zookeeper会fire Controller注册的Watch,Controller即可获取最新的幸存的Broker列表。

  2. Controller决定set_p,该集合包含了宕机的所有Broker上的所有Partition。

  3. 对set_p中的每一个Partition:
           3.1 从/brokers/topics/[topic]/partitions/[partition]/state读取该Partition当前的ISR。
           3.2 决定该Partition的新Leader。如果当前ISR中有至少一个Replica还幸存,则选择其中一个作为新Leader,新的ISR则包含当前ISR中所有幸存的Replica。否则选择该Partition中任意一个幸存的Replica作为新的Leader以及ISR(该场景下可能会有潜在的数据丢失)。如果该Partition的所有Replica都宕机了,则将新的Leader设置为-1。
           3.3 将新的Leader,ISR和新的leader_epoch及controller_epoch写入/brokers/topics/[topic]/partitions/[partition]/state。注意,该操作只有Controller版本在3.1至3.3的过程中无变化时才会执行,否则跳转到3.1。

直接通过RPC向set_p相关的Broker发送LeaderAndISRRequest命令。Controller可以在一个RPC操作中发送多个命令从而提高效率。
  Broker failover顺序图如下所示。

image.png

LeaderAndIsrRequest结构如下
image.png

LeaderAndIsrResponse结构如下
image.png

1.1      Controller Failover

Controller也需要Failover。每个Broker都会在Controller Path (/controller)上注册一个Watch。当前Controller失败时,对应的Controller Path会自动消失(因为它是Ephemeral Node),此时该Watch被fire,所有“活”着的Broker都会去竞选成为新的Controller(创建新的Controller Path),但是只会有一个竞选成功(这点由Zookeeper保证)。竞选成功者即为新的Leader,竞选失败者则重新在新的Controller Path上注册Watch。因为Zookeeper的Watch是一次性的,被fire一次之后即失效,所以需要重新注册。

Broker成功竞选为新Controller后会触发KafkaController.onControllerFailover方法,并在该方法中完成如下操作:

  1. 读取并增加Controller Epoch。

  2. 在ReassignedPartitions Path(/admin/reassign_partitions)上注册Watch。

  3. 在PreferredReplicaElection Path(/admin/preferred_replica_election)上注册Watch。

  4. 通过partitionStateMachine在Broker      Topics Patch(/brokers/topics)上注册Watch。

  5. 若delete.topic.enable设置为true(默认值是false),则partitionStateMachine在Delete      Topic Patch(/admin/delete_topics)上注册Watch。

  6. 通过replicaStateMachine在Broker      Ids Patch(/brokers/ids)上注册Watch。

  7. 初始化ControllerContext对象,设置当前所有Topic,“活”着的Broker列表,所有Partition的Leader及ISR等。

  8. 启动replicaStateMachine和partitionStateMachine。

  9. 将brokerState状态设置为RunningAsController。

  10. 将每个Partition的Leadership信息发送给所有“活”着的Broker。

  11. 若auto.leader.rebalance.enable配置为true(默认值是true),则启动partition-rebalance线程。

  12. 若delete.topic.enable设置为true且Delete      Topic Patch(/admin/delete_topics)中有值,则删除相应的Topic。

1.      可能瓶颈

1.     所有的状态变迁事件由单一controller管理,并且每次操作都要加一把大锁。

2.     Controller节点资源的受限(线程、网络连接、CPU、内存等),例如需要对每个Broker创建一个RequestSend线程

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。