kafka源码解析之六:New consumer协调者特性分析与代码走读

举报
步步清风 发表于 2017/12/22 16:46:09 2017/12/22
【摘要】 对于kafka,我们不仅在单机、集群、参数调优等方面,对kakfa全方位研究,更要在代码层解读,从原理到代码细节一一打开看看。(声明:所有这系列代码分析章节,有些图片/资料整理来自网络公开资料,站在巨人肩膀上二次仔细解读代码总结分享给大家)

Old Consumer问题

当前High LevelNew Consumer主要受到脑裂和羊群效应,这些问题包括:

 

脑裂:

在一个大集群中往往会有一个master存在,在长期运行过程中不可避免的会出现宕机等问题导致master不可用,在出现这样的情况以后往往会对系统产生很大的影响,所以一般的分布式集群中的master都采用了高可用的解决方案来避免这样的情况发生。

master-slaver方式,存在一个master节点,平时对外服务,同时有一个slaver节点,监控着master,同时有某种方式来进行数据的同步。如果在master挂掉以后slaver能很快获知并迅速切换成为新的master。在以往master-slaver的监控切换是个很大的难题,但是现在有了Zookeeper的话能比较优雅的解决这一类问题。

image.png

master-slaver实现起来非常简单,而且在master上面的各种操作效率要较其他HA解决方案要高,早期的时候监控和切换很难控制,但是后来zookeeper出现了,他的watch和分布式锁机制很好的解决了这一类问题。

我们的系统和同事的系统都是这种模式,但是后来都发现由于ZooKeeper使用上的问题存在脑裂的问题。

记得很久以前参加一个大牛的技术交流会他就提到过在集群中假死问题是一个非常让人头痛的问题,假死也是导致脑裂的根源。

根据一个什么样的情况能判断一个节点死亡了down掉了,人可能很容易判断,但是对于在分布式系统中这些是有监控者来判断的,对于监控者来说很难判定其他的节点的状态,唯一可靠点途径的就是心跳,包括ZooKeeper就是使用心跳来判断客户端是否仍然活着的,使用ZooKeeper来做master HA基本都是同样的方式,每个节点都尝试注册一个象征master的临时节点其他没有注册成功的则成为slaver,并且通过watch机制监控着master所创建的临时节点,Zookeeper通过内部心跳机制来确定master的状态,一旦master出现意外Zookeeper能很快获悉并且通知其他的slaver,其他slaver在之后作出相关反应。这样就完成了一个切换。这种模式也是比较通用的模式,基本大部分都是这样实现的,但是这里面有个很严重的问题,如果注意不到会导致短暂的时间内系统出现脑裂,因为心跳出现超时可能是master挂了,但是也可能是masterzookeeper之间网络出现了问题,也同样可能导致。这种情况就是假死,master并未死掉,但是与ZooKeeper之间的网络出现问题导致Zookeeper认为其挂掉了然后通知其他节点进行切换,这样slaver中就有一个成为了master,但是原本的master并未死掉,这时候client也获得master切换的消息,但是仍然会有一些延时,zookeeper需要通讯需要一个一个通知,这时候整个系统就很混乱可能有一部分client已经通知到了连接到新的master上去了,有的client仍然连接在老的master上如果同时有两个client需要对master的同一个数据更新并且刚好这两个client此刻分别连接在新老的master上,就会出现很严重问题。

出现这种情况的主要原因在与Zookeeper集群和Zookeeperclient判断超时并不能做到完全同步(这些还依赖于操作系统调度等,很难保证),也就是说可能一前一后,如果是集群先于client发现那就会出现上面的情况了。同时在发现并切换后通知各个客户端也有先后快慢。出现这种情况的几率很小,需要masterzookeeper集群网络断开但是与其他集群角色之间的网络没有问题,还要满足上面那些条件,但是一旦出现就会引发很严重的后果,数据不一致了。

避免这种情况其实也很简单,在slaver切换的时候不在检查到老的master出现问题后马上切换,而是在休眠一段足够的时间,确保老的master已经获知变更并且做了相关的shutdown清理工作了然后再注册成为master就能避免这类问题了,这个休眠时间一般定义为与zookeeper定义的超时时间就够了,但是这段时间内系统可能是不可用的,但是相对于数据不一致的后果我想还是值得的。

当然最彻底的解决这类问题的方案是将master HA集群做成peer2peer的,屏蔽掉外部Zookeeper的依赖。每个节点都是对等的没有主次,这样就不会存在脑裂的问题,但是这种ha解决方案需要使用两阶段,paxos这类数据一致性保证协议来实现,不可避免的会降低系统数据变更的系统,如果系统中主要是对master的读取操作很少更新就很适合了。

 

 

羊群效应

例如Zookeeper的一个羊群效应例子是当一个特定的znode 改变的时候ZooKeper 触发了所有watches 的事件。

举个例子,如果有1000个客户端watch 一个znode的exists调用,当这个节点被创建的时候,将会有1000个通知被发送。这种由于一个被watch的znode变化,导致大量的通知需要被发送,将会导致在这个通知期间的其他操作提交的延迟。因此,只要可能,我们都强烈建议不要这么使用watch。仅仅有很少的客户端同时去watch一个znode比较好,理想的情况是只有1个。

举个例子,有n 个clients 需要去拿到一个全局的lock.一种简单的实现就是所有的client 去create 一个/lock znode.如果znode 已经存在,只是简单的watch 该znode 被删除。当该znode 被删除的时候,client收到通知并试图create /lock。这种策略下,就会存在上文所说的问题,每次变化都会通知所有的客户端。

 

New Consumer采用协调者来避免羊群效应,只由协调者来watch 分区的变化,不需要每个consumerwatch

 

 

New consumer协调者方案

每一个broker被选举为一部分消费组的协调者(coordinator)。其主要职责在于策划rebalance操作时分区的分配,同时负责把分区的分配信息告知消费者

 

Consumer的步骤

1.       启动时或者协调者宕机,consumer发送一个ConsumerMetadataRequest 到任意的broker list中,而在ConsumerMetadataResponse中会告知这组的协调者是哪个broker

2.       Consumer连接到协调者,并且发送HeartbeatRequest给协调者,如果在心跳的响应HeartbeatResponse中有IllegalGeneration 错误码则表示协调者正在初始化一个rebalance,消费者则停止fetch 数据,提交offset并且发送JoinGroupRequest请求到协调者。消费者接收到JoinGroupResponse后,会得到topic和分区的拥有情况和所在Group generation id  此时消费者就可以继续fetch 数据和提交offset

3.       如果在HeartbeatResponse中没有错误码,则consumer继续如常获取数据。

协调者步骤

1.       在稳定状态下,协调者通过故障检测协议(Failure Detect Protocol)检查组内消费者的健康情况

2.       在选举或者启动过程中,协调者会从zookeeper中读取其负责的消费组的成员信息, 如果没有之前的消费组信息,则直到有新的consumer加入才开始干活。

3.       在协调者完成读取消费组成员信息的之前, HearbeatRequests, OffsetCommitRequests JoinGroupRequests三个请求的响应都会返回CoordinatorStartupNotComplete 错误码,消费组需要重试这些请求。

4.       在选举或者重启的时候,协调者也会继续对消费组做故障检测,如果消费者被标记为dead则会从消费组中移除,同时触发组内的rebalance

5.       Rebalance通过在 HeartbeatResponse的错误码返回IllegalGeneration 来触发,消费者会向协调者发送JoinGroupRequests请求重新注册,当所有的消费者都完成注册后,协调者会向消费者发送JoinGroupResponse响应,里面含有分区的分配信息从而完成rebalacne操作。

6.       协调者负责检查topic和分区的变动,然后触发rebalance操作。

故障检测协议:

Consumer在发送JoinGroupRequest会设置一个sesssion timeout时间,当consumer成功加入消费组后,故障检测就开始了。消费者就会每session.timeout.ms/heartbeat.frequency 时间就发送一次HeartbeatRequest并等待响应。 如果协调者在session.timeout.ms时间内都没有收到消费者的心跳请求,则判断消费者为dead;同样地如果消费者在session.timeout.ms时间内没有心跳响应,则认为协调者为dead并启动协调者发现流程。session.timeout.msheartbeat.frequency参数都必须设置得比较合理才行,如果session.timeout.ms设置比较大,heartbeat.frequency设置比较小则会导致协调者负载增高

具体的步骤如下:

1.     After receiving a ConsumerMetadataResponse or a JoinGroupResponse, a consumer periodically sends a HeartbeatRequest to the coordinator every session.timeout.ms/heartbeat.frequency milliseconds.

2.     Upon receiving the HeartbeatRequest, coordinator checks the generation number, the consumer id and the consumer group. If the consumer specifies an invalid or stale generation id, it send an IllegalGeneration error code in the HeartbeatResponse.

3.     If the coordinator does not receive a HeartbeatRequest from a consumer at least once in session.timeout.ms, it marks the consumer dead and triggers a rebalance process for the group.

4.     If the consumer does not receive a HeartbeatResponse from the coordinator after session.timeout.ms or finds the socket channel to the coordinator to be closed, it treats the coordinator as failed and triggers the co-ordinator re-discovery process.

协调者失败的时候,消费者可能在新的协调者完成failover 过程(包括加载metadata信息等)前或者后发现新的协调者。  如果是后者,协调者只需要想正常场景地接收消费者的请求即可。如果是前者,协调者会拒接消费者的请求,导致消费者重新查找协调者。如果消费者连接新协调者超时,则会被标记为dead并且认为消费者的请求为一个新的consumer从而导致rebalance

image.png

Down - The co-ordinator is dead or demoted

Catch up - In this state, the co-ordinator is elected but not ready to serve requests

Ready - In this state, the newly elected co-ordinator has finished loading the group metadata for all groups that it is responsible for

Prepare for rebalance - In this state, the co-ordinator sends the IllegalGeneration error in the HeartbeatResponse for all consumers in the group and waits for the consumers to send it a JoinGroupRequest

Rebalancing - In this state, the co-ordinator has received a JoinGroupRequest from the consumers in the current generation and itincrements the group generation idassigns consumer ids where required and does the partition assignment (注意这个状态下如果又有消费者失去联系,会重新变为Prepare for rebalance状态)

Steady - In the steady state, the co-ordinator accepts OffsetCommitRequests and heartbeats from all consumers in every group(图中应该是ready状态,不断收到所有消费者的心跳请求)

 

Down - The consumer process is down

Startup up & discover co-ordinator - In this state, the consumer discovers the co-ordinator for it's group. The consumer sends a JoinGroupRequest (with no consumer id) once it discovers the co-ordinator. The JoinGroupRequest can receive InconsistentPartitioningStrategy error code if some consumers in the same group specify conflicting partition assignment strategies. It can receive an UnknownPartitioningStrategy error code if the friendly name of the strategy in the JoinGroupRequest is unknown to the brokers. In this case, the consumer is unable to join a group.

Part of a group - In this state, the consumer is part of a group if it receives a JoinGroupResponse with no error code, a consumer id and the generation id for it's group. In this state, the consumer sends a HeartbeatRequest. Depending on the error code received, it either stays in this state or moves to Stopped Consumption or Rediscover co-ordinator.

Re-discover co-ord - In this state, the consumer does not stop consumption but tries to re-discover the co-ordinator by sending a ConsumerMetadataRequest and waiting for a response as long as it gets one with no error code. (在这个状态下还能继续消费,说明协调者失去联系并不代表其订阅的分区不能使用)

Stopped consumption - In this state, the consumer stops consumption and commits offsets, until it joins the group again

image.png

Down - The co-ordinator is dead or demoted

Catch up - In this state, the co-ordinator is elected but not ready to serve requests

Ready - In this state, the newly elected co-ordinator has finished loading the group metadata for all groups that it is responsible for

Prepare for rebalance - In this state, the co-ordinator sends the IllegalGeneration error in the HeartbeatResponse for all consumers in the group and waits for the consumers to send it a JoinGroupRequest

Rebalancing - In this state, the co-ordinator has received a JoinGroupRequest from the consumers in the current generation and it increments the group generation idassigns consumer ids where required and does the partition assignment (注意这个状态下如果又有消费者失去联系,会重新变为Prepare for rebalance状态)

Steady - In the steady state, the co-ordinator accepts OffsetCommitRequests and heartbeats from all consumers in every group (图中应该是ready状态,不断收到所有消费者的心跳请求)

 

消费者ID的分配

1.     After startup, a consumer learns it's consumer id in the very first JoinGroupResponse it receives from the co-ordinator. From that point onwards, the consumer must include this consumer id in every HeartbeatRequest and OffsetCommitRequest it sends to the co-ordinator. If the co-ordinator receives a HeartbeatRequest or an OffsetCommitRequest with a consumer id that is different from the ones in the group, it sends an UnknownConsumer error code in the corresponding responses. 

2.     The co-ordinator assigns a consumer id to a consumer on a successful rebalance and sends it in the JoinGroupResponse. The consumer can choose to include this id in every subsequent JoinGroupRequest as well until it is shutdown or dies. The advantage of including the consumer id in subsequent JoinGroupRequests is a lower latency on rebalance operations. When a rebalance is triggered, the co-ordinator waits for all consumers in the previous generation to send it a JoinGroupRequest. The way it identifies a consumer is by it's consumer id. If the consumer chooses to send JoinGroupRequest with empty consumer id, the co-ordinator waits a full session.timeout.ms before it proceeds with the rest of the rebalance operation. It does this since there is no way to map the incoming JoinGroupRequest with a consumer in the absence of a consumer id. This puts a lower bound on the rebalance latency (session.timeout.ms). On the other hand, if the consumer sends it's consumer id on subsequent JoinGroupRequests, the co-ordinator can immediately identify the consumers and proceed with a rebalance once all the known consumers have sent a JoinGroupRequest. 

3.     The co-ordinator does consumer id assignment after it has received a JoinGroupRequest from all existing consumers in a group. At this point, it assigns a new id <group>-<uuid> to every consumer that did not have a consumer id in the JoinGroupRequest. The assumption is that such consumers are either newly started up or choose to not send the previously assigned consumer id. 

4.     If a consumer id is specified in the JoinGroupRequest but it does not match the ids in the current group membership, the co-ordinator sends an UnknownConsumer error code in the JoinGroupResponse and prevents the consumer from joining the group. This does not cause a rebalance operation for the rest of the consumers in the group, but also does not allow such a consumer to join an existing group.

Old Consumer的比较

分区变动和组内消费者的变动而做RebalanceOld Consumer都是由消费者每个人自己触发。而New Consumer则是由协调者触发

 

对应代码走读

消费者对象

image.png

生成Coordinator对象,设置session_timeout_msheartbeat_interval_ms

 

在基类中,设置memberIDunknown ID。还会启动心跳线程:

image.png

HeartbeatTaskrun方法中,首先判断协调者是否已经dead,如果是标记协调者为dead,然后则改变状态;否则发心跳

image.png


协调者

kafkaKafkaServer 中的startup函数中,会启动协调者

image.png

处理心跳的几个函数

image.png

kafkaAPI中,多了几种处理协调者请求的handler:  handleGroupCoordinatorRequesthandleJoinGroupRequesthandleHeartbeatRequesthandleLeaveGroupRequest

image.png


handleJoinGroupRequest GroupCoordinator处理JoinGroup 请求:

image.png

JoniGroup里面就是doJoinGroup,实际就是状态机的实现:

image.png



【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。