kafka源码解析之四:性能关键路径controller分析
1. 原理与代码分析
从kafka的一个节点中选举作为controller,负责管理分区和副本状态并执行管理着这些分区的重新分配。目前还包括AutoRebalance线程与Topic删除线程等内容
名词解释:
ISR:同步副本组
OfflinePartitionLeaderSelector:分区下线后新的领导者选举
OAR:老的分配副本
1.1 代码以及示意图
Controller代码文件夹中:
1. ControllerChannelManager: 管理controller与broker之间连接的类
2. KafkaController: controller类主要逻辑都在 这个类里面
3. PartitionLeaderSelector: 实现多种状态变迁的选举
4. PartitionStateMachine: 分区状态机的实现类
5. ReplicaStateMachine: 副本状态机的实现类
6. TopicDeletionManager: Topic删除线程实现
内部结构图:
1. Controller Context: 负责保存controller上下文信息,包括所有Topic列表、分区副本分配状态、分区Leader和ISR信息等。并提供了一些有用的接口例如或者所有可用Broker列表、可用副本等。
ControllerChannelManager: 负责管理controller 与各个Broker之间的连接。针对每个broker, controller会创建一个RequestSendThread线程。这个线程会一直阻塞等待在命令队列中,直到有命令过来就会和broker创建一个连接并发送请求命令,当命令执行完会断开与broker的连接。
2. PartitionStateMachine:分区状态机的实现类。通过对Topic目录与分区目录的监控,在目录发送变化时触发状态变更。目前主要订阅的Listener有:AddPartitionListener、TopicChangeListener、DeleteTopicsListener。
例如创建新Topic,在/broker/topics目录下创建新的以Topic名称的目录, TopicChangeListener发现这个目录增加后,会查找出那些Topic是新增的然后调用onNewTopicCreation
3. ReplicaStateMachine: 副本状态机的实现类。主要订阅的Listener有:BrokerChangeListener。如果有新创建的broker加入,则会从BrokerChangeListener触发onBrokerStartup;如果有Broker宕机,则会触发onBrokerFailure
4. ZookeeperLeaderElector: 主要负责controller leader选举。主要订阅的Listener是LeaderChangeListener。因为/controller目录是临时节点,如果controller broker节点宕机,则LeaderChangeListener会触发其他节点进行重新选举。
5. PartitionReassignedListener: 主要用于迁移分区的时候触发分区迁移操作的Listener,执行迁移操作onPartitionReassignment.
6. PreferredReplicaElectionListener: 主要用于侦听优选副本选举目录的变化,触发优选副本选举,具体操作是onPreferredReplicaElection
7. IsrChangeNotificationListener: 主要用于侦听ISR变迁,触发controller的更新ISR变化并通知其他节点。这个ISR的改动可以由副本追上Leader后更新ISR的变化,导致controller感知然后通知其他节点更新ISR;也可以由Leader选举直接修改ISR通知其他节点。
1.1 对分区的管理
PartitionStateChange
其有效状态如下:
NonExistentPartition: 这种状态表明该分区从来没有创建过或曾经创建过后来又删除了。
NewPartition:创建分区后,分区处于NewPartition状态。在这种状态下,分区副本应该分配给它,但还没有领导者/同步复制组。
OnlinePartition:一旦一个分区领导者被选出,就会为在线分区状态。
OfflinePartition:如果分区领导者成功选举后,当领导者分区崩溃或挂了,分区状态转变下线分区状态。
其有效的状态转移如下:
NonExistentPartition -> NewPartition
1.群集中央控制器根据计算规则,从zk中读取分区信息,创建新分区和副本。
NewPartition -> OnlinePartition
1.分配第一个活着的副本作为分区领导者,并且该分区所有副本作为一个同步复制组,写领导者和同步副本组数据到zk中。
2.对于这个分区,发送LeaderAndIsr请求给每一个副本分区和并发送UpdateMetadata请求到每个活者的broker server。
OnlinePartition,OfflinePartition -> OnlinePartition
1.对于这个分区,需要选择新的领导者和同步副本组,一个副本组要接受LeaderAndIsr请求,最后写领导者和同步副本组信息到zk中。
a.OfflinePartitionLeaderSelector:新领导者=存活副本(最好是在isr);新isr =存活isr如果不是空或恰好为新领导者,否则;正在接受中副本=存活已分配副本。
b.ReassignedPartitionLeaderSelector:新领导者=存活分区重新分配副本;新isr =当前isr;正在接受中副本=重新分配副本
c.PreferredReplicaPartitionLeaderSelector:新领导这=第一次分配副本(如果在isr);新isr =当前isr;接受副本=分配副本
d.ControlledShutdownLeaderSelector:新领导者=当前副本在isr中且没有被关闭,新isr =当前isr -关闭副本;接受副本=存活已分配副本。
2.对于这个分区,发送LeaderAndIsr请求给每一个接收副本和UpdateMetadata请求到每个broker server
NewPartition,OnlinePartition -> OfflinePartition
1.这只不过标识该分区为下线状态
OfflinePartition -> NonExistentPartition
1. 这只不过标识该分区为不存在分区状态
1.2 Partition重新分配
管理工具发出重新分配Partition请求后,会将相应信息写到/admin/reassign_partitions上,而该操作会触发ReassignedPartitionsIsrChangeListener,从而通过执行回调函数KafkaController.onPartitionReassignment来完成以下操作:
1. 将ZooKeeper中的AR(Current Assigned Replicas)更新为OAR(Original list of replicas for partition) + RAR(Reassigned replicas)。
2. 强制更新ZooKeeper中的leader epoch,向AR中的每个Replica发送LeaderAndIsrRequest。
3. 将RAR - OAR中的Replica设置为NewReplica状态。
4. 等待直到RAR中所有的Replica都与其Leader同步。
5. 将RAR中所有的Replica都设置为OnlineReplica状态。
6. 将Cache中的AR设置为RAR。
7. 若Leader不在RAR中,则从RAR中重新选举出一个新的Leader并发送LeaderAndIsrRequest。若新的Leader不是从RAR中选举而出,则还要增加ZooKeeper中的leader epoch。
8. 将OAR - RAR中的所有Replica设置为OfflineReplica状态,该过程包含两部分。第一,将ZooKeeper上ISR中的OAR - RAR移除并向Leader发送LeaderAndIsrRequest从而通知这些Replica已经从ISR中移除;第二,向OAR - RAR中的Replica发送StopReplicaRequest从而停止不再分配给该Partition的Replica。
9. 将OAR - RAR中的所有Replica设置为NonExistentReplica状态从而将其从磁盘上删除。
10. 将ZooKeeper中的AR设置为RAR。
11. 删除/admin/reassign_partition。
注意:最后一步才将ZooKeeper中的AR更新,因为这是唯一一个持久存储AR的地方,如果Controller在这一步之前crash,新的Controller仍然能够继续完成该过程。
以下是Partition重新分配的案例,OAR = {1,2,3},RAR = {4,5,6},Partition重新分配过程中ZooKeeper中的AR和Leader/ISR路径如下
AR | leader/isr | Sttep |
{1,2,3} | 1/{1,2,3} | (initial state) |
{1,2,3,4,5,6} | 1/{1,2,3} | (step 2) |
{1,2,3,4,5,6} | 1/{1,2,3,4,5,6} | (step 4) |
{1,2,3,4,5,6} | 4/{1,2,3,4,5,6} | (step 7) |
{1,2,3,4,5,6} | 4/{4,5,6} | (step 8) |
{4,5,6} | 4/{4,5,6} | (step 10) |
1.3 对副本的管理
ReplicaStateChange:
有效状态如下:
1.NewReplica:当创建topic或分区重新分配期间副本被创建。在这种状态下,副本只能成为追随者变更请求状态。
2.OnlineReplica:一旦此分区一个副本启动且部分分配副本,他将处于在线副本状态。在这种状态下,它可以成为领导者或成为跟随者状态变更请求。
3.OfflineReplica:每当broker server副本宕机或崩溃发生时,如果一个副本崩溃或挂了,它将变为此状态。
4.NonExistentReplica:如果一个副本被删除了,它将变为此状态。
有效状态转移如下:
NonExistentReplica - - > NewReplica
1.使用当前领导者和isr分区发送LeaderAndIsr请求到新副本和UpdateMetadata请求给每一个存活borker
NewReplica - > OnlineReplica
1.添加新的副本到副本列表中
OnlineReplica,OfflineReplica - > OnlineReplica
1.使用当前领导者和isr分区发送LeaderAndIsr请求到新副本和UpdateMetadata请求给每一个存活borker
NewReplica,OnlineReplica - > OfflineReplica
1.发送StopReplicaRequest到相应副本(w / o删除)
2.从isr中删除该副本,发送LeaderAndIsr(New ISR)请求到领导者副本去删除此副本和发送该分区的UpdateMetadata请求到每个存活broker。
OfflineReplica - > NonExistentReplica
1. 发送StopReplicaRequest到副本(删除)
1.4 对Topic的管理
Topic创建
Controller通过partition状态机watch Zookeeper的/broker/topics目录,每当有新的topic创建时,就会由partition状态机的topic change listener触发onNewTopicCreation操作,具体步骤如下:
1. 为每个新创建的Topic注册partition change listener,侦听分区的变化
2. 对Topic的所有分区触发onNewPartitionCreation操作,流程如下:
2.1 所有分区的状态改变为NewPartition
2.2 所有副本的状态改变为NewReplica
2.3 把所有分区的从NewPartition状态改为OnlinePartition,主要是通过初始化每个分区的Leader和ISR,并写入zk的持久化路径:/brokers/topics/[topic]/partitions/[partition],然后发送LeaderAndISR请求到各个相关的Broker
2.4 所有分区的副本状态从NewReplica转变为OnlineReplica
Topic删除
Topic的删除是通过Controller后台的删除Topic线程来完成的,因为涉及到数据的删除,所以处理流程要比创建Topic复杂,具体流程如下:
. Kafka controller在启动的时候会注册对于Zookeeper节点/admin/delete_topics的子节点变更监听器——上面的分析已经告诉 我们,delete命令实际上就是要在该节点下创建一个临时节点,名字是待删除topic名,标记该topic是待删除的
2. Kafka controller在启动时创建一个单独的线程,执行topic删除的操作 (由DeleteTopicsThread类实现)
3. 线程启动时查看是否有需要进行删除的topic——假设我们是在controller启动之后执行的topic删除命令,那么该线程刚启动的时候待删除的topic集合应该就是空的
4. 一旦发现待删除topic集合是空,topic删除线程会被挂起
5. 这时,我们执行delete操作,删除topic: test-topic,delete命令在/admin/delete_topics下创建子节点test-topic
6. 监听器捕获到该变更,立刻触发删除逻辑
6.1 查询test-topic是否存在,不存在的话直接删除/admin/delete_topics/test-topic节点——随便删除一个不存在的 topic,删除命令也只是创建/admin/delete_topics/[topicName]的节点,它不负责做存在性校验
6.2 查询一下test-topic是不是当前正在执行Preferred副本选举或分区重分配,如果是的话,肯定是不适合进行删除掉的。Controller 本地会缓存当前无法进行删除的topic集合,待分区重分配完成或preferred副本选举后单独处理该集合中的topic
6.3 如何两者都不是的话说明现在可以进行删除操作,那么就恢复挂起的删除线程执行删除操作
删除线程执行删除操作的真正逻辑是:
1. 它首先会给当前所有broker发送更新元数据信息的请求,告诉这些broker说这个topic要删除了,你们可以把它的信息从缓存中删掉了
2. 开始删除这个topic的所有分区
2.1 给所有broker发请求,告诉它们这些分区要被删除。broker收到后就不再接受任何在这些分区上的客户端请求了
2.2 把每个分区下的所有副本都置于OfflineReplica状态,这样ISR就不断缩小,当leader副本最后也被置于OfflineReplica状态时leader信息将被更新为-1
2.3 将所有副本置于ReplicaDeletionStarted状态
2.4 副本状态机捕获状态变更,然后发起StopReplicaRequest给broker,broker接到请求后停止所有fetcher线程、移除缓存,然后删除底层log文件
2.5 关闭所有空闲的Fetcher线程
3. 删除zookeeper下/brokers/topics/test-topic节点
4. 删除zookeeper下/config/topics/test-topic节点
5. 删除zookeeper下/admin/delete_topics/test-topic节点
6. 更新各种缓存,把test-topic相关信息移除出去
1.1 对Broker的管理
Broker启动
Broker启动后首先根据其ID在ZooKeeper的/brokers/ids zonde下创建临时子节点(Ephemeral node),创建成功后Controller的ReplicaStateMachine注册其上的Broker Change Watch会被fire,从而通过回调KafkaController.onBrokerStartup方法完成以下步骤:
向所有活跃Brokers或者ShutingDown Brokers发送UpdateMetadataRequest请求,老的Broker会通过这个请求知道这个新的Broker。其定义如下:
将新启动的Broker上的所有Replica设置为OnlineReplica状态,同时这些Broker会为这些Partition启动high watermark线程。
通过partitionStateMachine触发OnlinePartitionStateChange。
如果有分区迁移任务的分区落在新启动的broker上,则触发这些分区的迁移onPartitionReassignment
检查新启动broker的副本是否属于正在删除topic,如果有则发送信号触发topic删除线程的工作。
Broker failover
Controller在Zookeeper的/brokers/ids节点上注册Watch。一旦有Broker宕机(本文用宕机代表任何让Kafka认为其Broker die的情景,包括但不限于机器断电,网络不可用,GC导致的Stop The World,进程crash等),其在Zookeeper对应的Znode会自动被删除,Zookeeper会fire Controller注册的Watch,Controller即可获取最新的幸存的Broker列表。
Controller决定set_p,该集合包含了宕机的所有Broker上的所有Partition。
对set_p中的每一个Partition:
3.1 从/brokers/topics/[topic]/partitions/[partition]/state读取该Partition当前的ISR。
3.2 决定该Partition的新Leader。如果当前ISR中有至少一个Replica还幸存,则选择其中一个作为新Leader,新的ISR则包含当前ISR中所有幸存的Replica。否则选择该Partition中任意一个幸存的Replica作为新的Leader以及ISR(该场景下可能会有潜在的数据丢失)。如果该Partition的所有Replica都宕机了,则将新的Leader设置为-1。
3.3 将新的Leader,ISR和新的leader_epoch及controller_epoch写入/brokers/topics/[topic]/partitions/[partition]/state。注意,该操作只有Controller版本在3.1至3.3的过程中无变化时才会执行,否则跳转到3.1。
直接通过RPC向set_p相关的Broker发送LeaderAndISRRequest命令。Controller可以在一个RPC操作中发送多个命令从而提高效率。
Broker failover顺序图如下所示。
LeaderAndIsrRequest结构如下
LeaderAndIsrResponse结构如下
1.1 Controller Failover
Controller也需要Failover。每个Broker都会在Controller Path (/controller)上注册一个Watch。当前Controller失败时,对应的Controller Path会自动消失(因为它是Ephemeral Node),此时该Watch被fire,所有“活”着的Broker都会去竞选成为新的Controller(创建新的Controller Path),但是只会有一个竞选成功(这点由Zookeeper保证)。竞选成功者即为新的Leader,竞选失败者则重新在新的Controller Path上注册Watch。因为Zookeeper的Watch是一次性的,被fire一次之后即失效,所以需要重新注册。
Broker成功竞选为新Controller后会触发KafkaController.onControllerFailover方法,并在该方法中完成如下操作:
读取并增加Controller Epoch。
在ReassignedPartitions Path(/admin/reassign_partitions)上注册Watch。
在PreferredReplicaElection Path(/admin/preferred_replica_election)上注册Watch。
通过partitionStateMachine在Broker Topics Patch(/brokers/topics)上注册Watch。
若delete.topic.enable设置为true(默认值是false),则partitionStateMachine在Delete Topic Patch(/admin/delete_topics)上注册Watch。
通过replicaStateMachine在Broker Ids Patch(/brokers/ids)上注册Watch。
初始化ControllerContext对象,设置当前所有Topic,“活”着的Broker列表,所有Partition的Leader及ISR等。
启动replicaStateMachine和partitionStateMachine。
将brokerState状态设置为RunningAsController。
将每个Partition的Leadership信息发送给所有“活”着的Broker。
若auto.leader.rebalance.enable配置为true(默认值是true),则启动partition-rebalance线程。
若delete.topic.enable设置为true且Delete Topic Patch(/admin/delete_topics)中有值,则删除相应的Topic。
1. 可能瓶颈
1. 所有的状态变迁事件由单一controller管理,并且每次操作都要加一把大锁。
2. Controller节点资源的受限(线程、网络连接、CPU、内存等),例如需要对每个Broker创建一个RequestSend线程
- 点赞
- 收藏
- 关注作者
评论(0)