kafka源码解析之七:Delayed Request代码分析
1 涉及的类分析
1.1 RequestPurgatory
Purgatory:炼狱?暂时受苦的地方?名字比较有趣,延迟请求都被放到炼狱里接受拷问,满足条件的请求才能出狱,否则要*** 掉。该scala定义了两个类(包括一个抽象类)都是很多类会继承的。
首先来说说DelayedRequest类。DelayedRequest定义了一个延时处理请求,延迟的时间单位是ms,在构造函数中需要传入这个值。构造函数还要求传入一组key集合来触发一些action,比如检查该请求是否已经满足某些条件等。这些key可以是一对(topic, 分区)。这个类是放在和RequestPurgatory同一个scala文件中的。
定义的第二个类是一个抽象类:RequestPurgatory——它是一个帮助类用于处理带超时的异步请求。上面说到的DelayedRequest是一个延时请求并且定义了一组key能够触发某种操作。在具体实现的时候可以添加自定义的逻辑来定义对于一个给定的请求而已满足条件表示什么含义。比如说,可能是等待用户指定的acks数或者是等待积累了指定数目的字节数。DelayedRequest的key通常都是一个(topic, 分区)对。
该类定义了一个嵌套类:Watchers——一个DelayedRequest的链表来监听每个请求是否已满足条件(satisfied)。该类定义的方法有:
1. watched:按返回监听列表的个数
2. addIfNotSatisfied:如果给定的请求未satisfied,则将其加入到监听列表中
3. purgeSatisfield:遍历整个列表删除satisfied的请求
4. collectSatisfiedRequests:遍历整个列表尝试为每个被监听的请求检查是否满足条件,主要使用checkSatisfied方法
还有一个嵌套类:ExpiredRequestReaper,是一个Runnable对象,会将那些已经很长时间没能满足条件的请求置于过期状态。具体做法就是在内部维护了一个延时队列DelayQueue,提供了一些方法可以来操作此线程:
1. enqueue:将请求T入队
2. delayed:获取当前delayed的请求数
3. pollExpired:获取下一条已过期的请求
4. purgeSatisfied:从延时队列和监控列表中删除所有已满足条件的请求
该类作为RequestPurgatory后台线程的一个实现类,其中的重载Runnable对象run接口代码如下:
回到RequestPurgatory类, 该类维护了一个请求列表,每个key都对应了一个Watchers。另外该类还定义了一个过期请求处理器和一个专门的线程来执行这个过期请求处理器中指定的工作,并在构造函数时启动该线程。该类还提供了一些方法:
1. isSatisfiedByMe:判断这个请求是否是由调用者线程满足的条件——即是否由调用者线程设置satisfied为true
2. checkAndMaybeWatch:尝试添加请求以监听所有列表中的key。如果该请求本身已经satisfied了且是由调用者线程做出的则返回true
3. update:更新watchers并返回刚刚satisfied请求的列表
4. watched:返回watch列表的个数
5. delayed:返回在过期请求处理器队列中的请求数
6. watchersFor:根据给定的watch key返回对应的watch列表
7. checkSatisfied:检查这个请求是否已经satisfied
8. expire:处理过期请求
9. shutdown:关闭过期请求处理器
1.2 DelayedFetch
延迟获取请求,出现以下情况时该请求才不会被阻塞:
这个请求要获取的目标分区的leader已不再是这个broker了——这种情况下应该返回其他分区的可用数据
broker中无请求中有些目标分区的记录——这种情况下应该返回其他分区的可用数据
位移信息没有保存在日志的最后一个日志段中——这种情况下应该返回日志段中所有的数据
从所有目标分区中读取的字节数已超过最小字节数——这种情况下应该返回可用的数据
该类定义的方法有:
1. isSatisfied:判断目标请求是否没有被阻塞。从代码中来看貌似永远返回true,除非上述前三个条件都没满足。
2. respond:创建新的FetchResponse返回
1.3 DelayedProduce
一个延迟produce请求,继承了DelayedRequest。该请求不会被阻塞,如果对于它要生产消息的每个分区:
1) 该broker都不是leader,那么就不会阻塞——应该返回一个错误
2) 如果这个broker是leader:
a) 如果有一个本地错误(比如在写本地log时),那么该请求也不会被阻塞——而是返回一个错误
b) 否则,至少应该有requiredAcks个副本追上到请求的offset
该类还定义了一个case类:DelayedProduceResponseStatus封装了ProduceResponse的状态,包括offset和错误码。该类维护了一个volatile的状态标识位acksPending
在该类的构造函数中,会根据错误码来更新acks pending值。该类也定义了两个方法:respond和isSatisfied:
1. respond:如果response中无错误的话,将位移信息加入到位移管理器的缓存中,然后构建一个response返回
2. isSatisfied:检查每个分区是否仍然有未响应的应答。如果没有则置为unblocked状态
1.4 FetchRequestPurgatory
保存延迟Fetch请求的临时场所。该类定义了一个嵌套类DelayedFetchRequestMetrics和recordDelayedFetchExpired都是关于更新统计信息的。提供的其他方法还有:
1. checkSatisfied:检查一个给定的DelayedFetch请求是否是非阻塞状态
2. respond:把FetchRequest请求对应的响应发回到Socket server
3. expire:当一个请求FetchRequest过期时,把它原样发回去
1.5 ProducerRequestPurgatory
包含延时producer请求。内定义了一个嵌套类用于保存一些度量元信息,另外还定义了一些方法:
1. recordDelayedProducerKeyExpired:记录延迟Producer过期
2. checkSatisfied:检查一个特定的延时Fetch请求是否未被阻塞
3. expire:将延迟Producer请求过期
4. respond:将response发回
1.6 DelayedItem
继承Delayed接口,主要实现了getDelay和compareTo接口。
getDelay: 获取剩下的delay时间(毫秒)。用于判断是否到底延迟时间。
compareTo:用于判断队列内部排序的接口
2 代码分析
2.1 延迟Producer请求
主要是用于生产者发送消息后,需要确认副本写入成功后,broker才向生产者会响应的场景。其流程图如下:
这个一部分代码是broker处理生产者请求代码的一个分支(handleProducerOrOffsetCommitRequest)代码如下:
2.1.1 生成延迟请求
这个一步主要的工作就是根据本地Append的结果,生成一个延迟请求(DelayedProduce)。首先遍历localProduceResults的每个元素,它是一个ProduceResult的链表,ProduceResult包括的元素有: TopicAndPartition, 写入文件后的firstOffset和lastOffset。 然后生成一个status Map, 是每个TopicAndPartition 和其对应DelayedProduceResponseStatus。注意DelayedProduceResponseStatus需要等到收到lastOffset + 1的确认才是成功写入所有insync 副本, 等所有分区的延迟请求响应都得到满足后,才会回响应给生产者。
2.1.2 添加请求到监控炼狱
首先检测延迟请求是否能马上满足,如果是则返回true,否则添加到watch列表中监控。
上面代码中检测delayedRequest是否满足的代码checkSatisfied(delayedRequest)如下:
再来看看是如何检测是否有足够的副本到达请求的lastOffset+1:
2.1.3 返回响应
如果满足了请求,则调用producerRequestPurgatory.respond(delayedRequest)返回响应给生产者。
但是如果本线程检测的时候延迟请求还不满足副本写入足够多,是谁去给生产者回响应?答案是Broker在处理副本拉Leader分区时的Fetch请求时,就会知道每个副本已经拉到那个offset了,从而更新高水位,然后触发一批延迟请求的检测,最后返回响应。
在更新高水位代码:
2.2 延迟Fetch请求
当Fetch请求不满足以下4个条件,就会产生一个延迟请求:
(1)用户不想等待(fetch.wait.max.ms小于等于0, 这个参数会作为延迟请求的超时时间)
(2)Fetch不到数据
(3)获取的数据不足(未达到fetch.min.bytes)
(4)有错误产生
代码如下(handleFetchRequest里面):
2.2.1 生成延迟Fetch请求
我们主要看看延迟Fetch请求满足的条件是什么:
(1)Broker已经不是某个分区的Leader
(2)Broker对请求的分区未知
(3)已经fetch的Offset不在最新的Log segment上
(4)Fetch请求所有分区累计的数据,已经大于fetch.min.bytes
满足上面4个条件,延迟Fetch请求的isSatisfied函数都是返回true。
2.2.2 添加请求到监控炼狱
检测并添加到炼狱的代码checkAndMaybeWatch和延迟生产请求是一样的,不同的地方是checkSatisfied(delayedRequest)时用的是delayedFetch.isSatisfied,代码如下:
其实代码就是DelayedFetch被满足的4个条件,代码比较简单,不再展开。
2.2.3 返回响应
如果本线程能满足请求,则调用fetchRequestPurgatory.respond(delayedFetch)返回响应给消费者,这个跟延迟生产请求类似。
那么如果本线程不能满足,是如何返回响应的? 主要有3个地方:
1. 延迟请求超时(由fetch.wait.max.ms指定超时时间)
2. 更新高水位的时候
3. 生产者往Leader写入数据的时候。
前2个在前面已经有介绍,第3个地方代码如下:
unblockDelayedFetchRequests这个函数实现跟unblockDelayedProduceRequests都类似,不再赘述。
3 附录:
3.1 DelayQueue
3.1.1 应用场景
我们谈一下实际的场景吧。我们在开发中,有如下场景
a) 关闭空闲连接。服务器中,有很多客户端的连接,空闲一段时间之后需要关闭之。
b) 缓存。缓存中的对象,超过了空闲时间,需要从缓存中移出。
c) 任务超时处理。在网络协议滑动窗口请求应答式交互时,处理超时未响应的请求。
一种笨笨的办法就是,使用一个后台线程,遍历所有对象,挨个检查。这种笨笨的办法简单好用,但是对象数量过多时,可能存在性能问题,检查间隔时间不好设置,间隔时间过大,影响精确度,多小则存在效率问题。而且做不到按超时的时间顺序处理。 这场景,使用DelayQueue最适合了。
3.1.2 DelayQueue及其相关类介绍
DelayQueue 是java.util.concurrent中提供的一个很有意思的类。很巧妙,非常棒!但是java doc和Java SE 5.0的source中都没有提供Sample。我最初在阅读ScheduledThreadPoolExecutor源码时,发现DelayQueue 的妙用。随后在实际工作中,应用在session超时管理,网络应答通讯协议的请求超时处理。
本文将会对DelayQueue做一个介绍,然后列举应用场景。并且提供一个Delayed接口的实现和Sample代码。
DelayQueue是一个BlockingQueue,其特化的参数是Delayed。(不了解BlockingQueue的同学,先去了解BlockingQueue再看本文)
Delayed扩展了Comparable接口,比较的基准为延时的时间值,Delayed接口的实现类getDelay的返回值应为固定值(final)。此接口的实现必须定义一个 compareTo 方法,该方法提供与此接口的 getDelay 方法一致的排序。 DelayQueue队列中保存的是实现了Delayed接口的实现类,里面必须实现getDelay()和compareTo()方法,前者用于取DelayQueue里面的元素时判断是否到了延时时间,否则不予获取,是则获取。 compareTo()方法用于进行队列内部的排序
DelayQueue内部是使用PriorityQueue实现的。 DelayQueue = BlockingQueue +PriorityQueue + Delayed DelayQueue的关键元素BlockingQueue、PriorityQueue、Delayed。可以这么说,DelayQueue是一个使用优先队列(PriorityQueue)实现的BlockingQueue,优先队列的比较基准值是时间。
- 点赞
- 收藏
- 关注作者
评论(0)