kafka源码解析之五:New Producer源码分析
1. 接口
接口代码:
该接口是一个异步接口,发送一个ProducerRecord类型的消息,后面会介绍这个类。返回类型是一个Future,具体看附录介绍。
这个接口与上面的区别是当成功发送后,会回调callback函数。
返回指定topic的分区信息列表
返回Producer维护的metrics数据。
关闭生产者
2. 示意图
如上图,蓝色的框图属于Producer对象中的部件。客户端调用Send接口发送消息,先会追加到RecordAccumulator中形成一个一个的RecordBatch,然后再由后台的Sender Thread针对每个节点发送请求,请求中带有每个分区的RecordBatch。Kafka集群给Sender Thread的请求回响应,每个RecordBatch中的消息要么都写入成功,要么都失败。
New Producer与Old Producer最主要的区别就是New Producer使用非阻塞式IO并且实现都是基于异步的,同步接口也是基于异步实现。而Old Producer的异步接口是基于同步接口来实现。用下面的图可以说明问题:
可以看出使用非阻塞式的设计可以使得Producer同样发送3个请求完成的时间大大减少,提升吞吐量。
3. Common相关类
3.1 Record
这个类是对Kafka存储消息结构体的封装。类似core 包中的Message
3.2 LogEntry
这个类是offset与record配对类, 类似于core包中的MessageAndOffset
3.3 Records
这个类是一个接口类,继承了Iterable<LogEntry>以便遍历集合。相当于core包中的MessageSet。
提供了两个接口
writeTo: 把records记录内容写到指定的channel里
sizeInBytes: 记录集中所有消息的长度
3.1 MemoryRecords
Records的实现类,分为读模式和写模式,带有一个ByteBuffer,初始化的时候为写模式,有点类似Core包中的ByteBufferMessageSet。这个类还带有压缩功能,在写入记录到这个buffer的时候就能自动压缩。
当一个RecordBach被取出来的时候,调用memoryRecords的close接口,则不能再写入,切换为读模式
3.2 Cluster
保存集群信息的类,提供不同维度的查询分区信息功能,主要有以下成员变量:
Nodes: 节点信息列表
partitionsByTopicPartition: 是一个以TopicPartition为Key, Value为PartitionInfo的hashMap。
partitionsByTopic: 以topic名称为Key, 所有分区信息(List<PartitionInfo>)为Value的hashMap
availablePartitionsByTopic: 以topic名称为Key, 所有可用分区为Value的hashMap
partitionsByNode: 以broker节点编号为Key,所有分区信息(List<PartitionInfo>)为Value的hashMap
4. 网络相关类
4.1 KafkaClient
是一个接口类,提供了KafkaClient端的一系列接口,例如检查节点是否准备好发送,进行网络发送poll等。
4.2 NetworkClient
是对KafkaClient的实现类。主要的关键成员变量有:
selector: 封装了nio Selector对象,实现了selectable接口,主要用来做网络连接、发送接收等实际的网络操作。
inFlightRequests: 正在发送和等待响应的请求的个数。 这个值设置为1,则每次在发送路途上的请求只有1个,是实现发送保序的关键参数。
实现的关键接口有:
ready: 判断节点是否准备好发送,如果未准备好则开始连接到某个节点。如果已经连接则返回true
isReady: 用来判断某个节点是否可以发送。可以发送是指连接已经建立或者当前inFlightRequests未达到设定值。
poll : 初始化请求消息,定期做元数据更新并做真正的网络发送和接收请求。返回接收到的响应。后面有详细介绍
4.3 InFlightRequests
这个类用来管理正在发送或者等待响应的请求,主要成员变量有:
maxInFlightRequestsPerConnection: 每个链接的在途请求最大个数
requests: 是一个HashMap<Integer, Deque<ClientRequest>>, Key为每个节点的broker ID, Value为该节点所对应的请求。
主要接口如下:
add : 把某个请求添加到requests Map中。会自动从ClientRequest中找到node id,如果该id存在,则直接添加到其双向队列的头部,否则会创建这个node id的双向队列然后添加到队列中。
canSendMore: 判断某个节点是否能继续发送,能发送的条件是节点的双向队列不存在或者为空或者是第一个请求已经发送完毕(响应有可能未回来)并且队列个数未达到在途个数的最大值。
5. 生产者相关类
5.1 RecordMetadata
记录被服务器确认后的元数据。含有两个成员变量:
1. Offset: 记录所在topic和partition下的offset
2. topicPartition: 是TopicPartition类,即topic名称和分区所在的编号
5.2 ProduceRequestResult
这个类是作为producer请求的结果,异步接口返回的future对象里面包含这个result。所以这个接口需要实现等待和唤醒操作。先看一下它的成员变量:
latch: 一个同步计数器CountDownLatch, 是实现等待和唤醒的关键,这个latch的初始化值为1。
topicPartition:TopicPartition信息
baseOffset:
5.3 FutureRecordMetadata
这个类是异步send接口返回的结果,是对Future<RecordMetadata>的实现。有两个成员变量:
result: 是一个ProduceRequestResult
relativeOffset: 添加到RecordBatch时递增的offfset。例如第一条为0,第二条为1,依次类推
这个类主要实现接口:
get : Future的get接口实现,在异步发送后调用get会等待。等待的原理是用result里面的await方法,背后就是一个CountDownLatch的await等待。直到有线程调用result的done方法才返回。
5.4 RecodBatch
这个类是封装了一系列需要发送的一批消息。其中主要的成员变量有:
records: MemoryRecords对象,用于保存一批记录。
topicPartition: 保存topic名称和分区编号
produceFuture: ProduceRequestResult对象
5.5 ProducerRecord
这个类是对发送消息的封装,主要有4个成员:
topic: 消息发往的topic名称,这个在生成消息时必须有
partition: 指定消息发往的分区编号,这个可以为null。
key: 消息的key
value: 消息的具体内容
5.6 KafkaProducer
这个是生产者类,用户使用New Producer接口的时候需要先初始化这个类。这个类初始化的时候代码如下:
会生成累积器(RecordAccumulator)和Sender线程,并运行Sender线程。
异步发送接口也是在这个类里面实现。
5.7 BufferPool
这个类是记录累积器中的内存池。主要成员有:
totalMemory: 内存池所能分配内存的最大值。
free: 是一个ByteBuffer的数组双端队列ArrayDeque<ByteBuffer>。初始化为拥有16个ByteBuffer元素。用于分配空闲内存。
waiters: 条件变量的数组双端队列ArrayDeque<Condition>。当BufferPool中内存不足时,会生成一个条件变量用于等待空闲内存足够的时候被唤醒
5.8 RecordAccumulator
这个类类似一个queue,用于汇聚成记录以便批量发送到broker。它创建时会指定使用的内存总量,当内存不足时会阻塞不能再放入记录,直到已有记录被发送释放后才能继续放入记录。
它的一些成员变量如下:
batches: 是一个CopyOnWriteMap, 用于保存每个TopoicPartition所对应的RecordBatch。这些RecordBatch用双端队列Deque来管理。
free: 是一个BufferPool,用于管理从系统分配的内存。
累积器有两个子类:
RecordAppendResult:用做把记录添加到累加器时返回的结果,只是对FutureRecordMetadata的一些封装,代码如下:
ReadyCheckResult:用作检查节点是否Redy的结果。
5.9 Metadata
这个类对元数据的一些操作进行了封装。主要的成员变量有:
cluster: 封装了一个Cluster元素,用于查询集群状态信息。对象初始化时Cluster初始化为Empty。
topics: 是一个HashSet<String>对象,用于保存集群中的topic
提供如下接口:
fetch: 直接返回cluster对象
add: 把topic信息添加到topics 集合中
update:更新cluster信息,更新后并唤醒所有等待更新的线程。
6. 发送流程
调用发送接口的流程如下:
Send接口发送代码如下:
6.1 等待topic metadata可用
代码如下:
metadata.requestUpdate
这个函数比较简单,设置更新标志位,同时返回当前metadata的版本,代码如下:
metadata.awaitUpdate
这个函数的作用就是等待metadata版本更新,代码如下:
6.2 记录添加到累积器
这一步的代码如下:
我们再看看free.allocate,即BufferPool的allocate函数:
内存分配好后,看看RecordBatch.tryAppend函数,这个函数的作用是尝试把消息和callback函数添加到RecordBatch里,并返回一个元数据的Future,代码如下:
7. 后台io线程流程
后台io线程是Sender类,继承与Runnable接口,本身实现的run接口很简单,主要操作都在run(now)函数里面,代码如下:
下面我们主要看一次io迭代的流程:
代码以及注释如下:
我们先看看this.accumulator.drain是如何选取到要发送的batch的。代码就是RecordAccumulator.drain如下:
生成每个节点的发送RecordBatch后,就可以生成每个节点的请求,即createProduceRequests函数,代码如下:
我们再看看实际做网络请求的代码NetworkClient.poll:
最后再看如何处理响应请求,代码是Sender.handleResponse:
这里主要的函数就是completeBatch,代码如下:
8. 附录
Future
在Java中,如果需要设定代码执行的最长时间,即超时,可以用Java线程池ExecutorService类配合Future接口来实现。 Future接口是Java标准API的一部分,在java.util.concurrent包中。Future接口是Java线程Future模式的实现,可以来进行异步计算。
Future模式可以这样来描述:我有一个任务,提交给了Future,Future替我完成这个任务。期间我自己可以去做任何想做的事情。一段时间之后,我就便可以从Future那儿取出结果。就相当于下了一张订货单,一段时间后可以拿着提订单来提货,这期间可以干别的任何事情。其中Future 接口就是订货单,真正处理订单的是Executor类,它根据Future接口的要求来生产产品。
Future接口提供方法来检测任务是否被执行完,等待任务执行完获得结果,也可以设置任务执行的超时时间。这个设置超时的方法就是实现Java程序执行超时的关键。
Future接口是一个泛型接口,严格的格式应该是Future<V>,其中V代表了Future执行的任务返回值的类型。 Future接口的方法介绍如下:
boolean cancel (boolean mayInterruptIfRunning) 取消任务的执行。参数指定是否立即中断任务执行,或者等等任务结束
boolean isCancelled () 任务是否已经取消,任务正常完成前将其取消,则返回 true
boolean isDone () 任务是否已经完成。需要注意的是如果任务正常终止、异常或取消,都将返回true
V get () throws InterruptedException, ExecutionException 等待任务执行结束,然后获得V类型的结果。InterruptedException 线程被中断异常, ExecutionException任务执行异常,如果任务被取消,还会抛出CancellationException
V get (long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException 同上面的get功能一样,多了设置超时时间。参数timeout指定超时时间,uint指定时间的单位,在枚举类TimeUnit中有相关的定义。如果计算超时,将抛出TimeoutException
Future的实现类有java.util.concurrent.FutureTask<V>即 javax.swing.SwingWorker<T,V>。通常使用FutureTask来处理我们的任务。FutureTask类同时又实现了Runnable接口,所以可以直接提交给Executor执行。使用FutureTask实现超时执行的代码如下:
不直接构造Future对象,也可以使用ExecutorService.submit方法来获得Future对象,submit方法即支持以 Callable接口类型,也支持Runnable接口作为参数,具有很大的灵活性。使用示例如下:
利用Future接口实现程序执行超时大致用法就这么多,改天需要研究下Future接口的内部实现,特别是设定执行超时的实现。
CopyOnWrite
http://ifeve.com/java-copy-on-write/
Kafka Client Rewrite
https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite
- 点赞
- 收藏
- 关注作者
评论(0)