kafka源码解析之五:New Producer源码分析

举报
步步清风 发表于 2017/12/21 17:32:50 2017/12/21
【摘要】 对于kafka,我们不仅在单机、集群、参数调优等方面,对kakfa全方位研究,更要在代码层解读,从原理到代码细节一一打开看看。(声明:所有这系列代码分析章节,有些图片/资料整理来自网络公开资料,站在巨人肩膀上二次仔细解读代码总结分享给大家)

1.    接口

接口代码:

image.png

image.png

该接口是一个异步接口,发送一个ProducerRecord类型的消息,后面会介绍这个类。返回类型是一个Future,具体看附录介绍。

image.png

这个接口与上面的区别是当成功发送后,会回调callback函数。

image.png

返回指定topic的分区信息列表

image.png

返回Producer维护的metrics数据。

image.png

关闭生产者


2.    示意图

image.png

如上图,蓝色的框图属于Producer对象中的部件。客户端调用Send接口发送消息,先会追加到RecordAccumulator中形成一个一个的RecordBatch,然后再由后台的Sender Thread针对每个节点发送请求,请求中带有每个分区的RecordBatchKafka集群给Sender Thread的请求回响应,每个RecordBatch中的消息要么都写入成功,要么都失败。

New ProducerOld Producer最主要的区别就是New Producer使用非阻塞式IO并且实现都是基于异步的,同步接口也是基于异步实现。而Old Producer的异步接口是基于同步接口来实现。用下面的图可以说明问题:

image.png

可以看出使用非阻塞式的设计可以使得Producer同样发送3个请求完成的时间大大减少,提升吞吐量。


3.    Common相关类

3.1  Record

这个类是对Kafka存储消息结构体的封装。类似core 包中的Message

 

3.2  LogEntry

这个类是offsetrecord配对类, 类似于core包中的MessageAndOffset

3.3  Records

这个类是一个接口类,继承了Iterable<LogEntry>以便遍历集合。相当于core包中的MessageSet

image.png


提供了两个接口

writeTo: records记录内容写到指定的channel

sizeInBytes: 记录集中所有消息的长度

 

3.1  MemoryRecords

Records的实现类,分为读模式和写模式,带有一个ByteBuffer,初始化的时候为写模式,有点类似Core包中的ByteBufferMessageSet。这个类还带有压缩功能,在写入记录到这个buffer的时候就能自动压缩。

当一个RecordBach被取出来的时候,调用memoryRecordsclose接口,则不能再写入,切换为读模式

 

3.2  Cluster

保存集群信息的类,提供不同维度的查询分区信息功能,主要有以下成员变量:

Nodes: 节点信息列表

partitionsByTopicPartition: 是一个以TopicPartitionKey ValuePartitionInfohashMap

partitionsByTopic topic名称为Key,  所有分区信息(List<PartitionInfo>)为ValuehashMap

availablePartitionsByTopic topic名称为Key 所有可用分区为ValuehashMap

partitionsByNode:   broker节点编号为Key,所有分区信息(List<PartitionInfo>)为ValuehashMap

 

4.    网络相关类

4.1  KafkaClient

是一个接口类,提供了KafkaClient端的一系列接口,例如检查节点是否准备好发送,进行网络发送poll等。

 

4.2  NetworkClient

是对KafkaClient的实现类。主要的关键成员变量有:

selector: 封装了nio Selector对象,实现了selectable接口,主要用来做网络连接、发送接收等实际的网络操作。

inFlightRequests:  正在发送和等待响应的请求的个数。 这个值设置为1,则每次在发送路途上的请求只有1个,是实现发送保序的关键参数。

 

实现的关键接口有:

ready:  判断节点是否准备好发送,如果未准备好则开始连接到某个节点。如果已经连接则返回true

isReady: 用来判断某个节点是否可以发送。可以发送是指连接已经建立或者当前inFlightRequests未达到设定值。

poll : 初始化请求消息,定期做元数据更新并做真正的网络发送和接收请求。返回接收到的响应。后面有详细介绍

 

4.3  InFlightRequests

这个类用来管理正在发送或者等待响应的请求,主要成员变量有:

maxInFlightRequestsPerConnection 每个链接的在途请求最大个数

requests 是一个HashMap<Integer, Deque<ClientRequest>> Key为每个节点的broker ID Value为该节点所对应的请求。

 

主要接口如下:

add : 把某个请求添加到requests Map中。会自动从ClientRequest中找到node id,如果该id存在,则直接添加到其双向队列的头部,否则会创建这个node id的双向队列然后添加到队列中。

canSendMore: 判断某个节点是否能继续发送,能发送的条件是节点的双向队列不存在或者为空或者是第一个请求已经发送完毕(响应有可能未回来)并且队列个数未达到在途个数的最大值。

5.    生产者相关类

5.1  RecordMetadata

记录被服务器确认后的元数据。含有两个成员变量:

1.       Offset: 记录所在topicpartition下的offset

2.       topicPartition: TopicPartition类,即topic名称和分区所在的编号

 

5.2  ProduceRequestResult

这个类是作为producer请求的结果,异步接口返回的future对象里面包含这个result。所以这个接口需要实现等待和唤醒操作。先看一下它的成员变量:

latch: 一个同步计数器CountDownLatch 是实现等待和唤醒的关键,这个latch的初始化值为1

topicPartitionTopicPartition信息

baseOffset

 

 

 

5.3  FutureRecordMetadata

这个类是异步send接口返回的结果,是对Future<RecordMetadata>的实现。有两个成员变量:

result: 是一个ProduceRequestResult

relativeOffset 添加到RecordBatch时递增的offfset。例如第一条为0,第二条为1,依次类推

这个类主要实现接口:

get :  Futureget接口实现,在异步发送后调用get会等待。等待的原理是用result里面的await方法,背后就是一个CountDownLatchawait等待。直到有线程调用resultdone方法才返回。

 

5.4  RecodBatch

这个类是封装了一系列需要发送的一批消息。其中主要的成员变量有:

records MemoryRecords对象,用于保存一批记录。

topicPartition: 保存topic名称和分区编号

produceFuture: ProduceRequestResult对象

 

5.5  ProducerRecord

这个类是对发送消息的封装,主要有4个成员:

topic: 消息发往的topic名称,这个在生成消息时必须有

partition: 指定消息发往的分区编号,这个可以为null

key: 消息的key

value: 消息的具体内容

5.6  KafkaProducer

这个是生产者类,用户使用New Producer接口的时候需要先初始化这个类。这个类初始化的时候代码如下:


image.png

会生成累积器(RecordAccumulator)Sender线程,并运行Sender线程。

异步发送接口也是在这个类里面实现。

5.7  BufferPool

这个类是记录累积器中的内存池。主要成员有:

totalMemory 内存池所能分配内存的最大值。

free 是一个ByteBuffer的数组双端队列ArrayDeque<ByteBuffer>。初始化为拥有16ByteBuffer元素。用于分配空闲内存。

waiters: 条件变量的数组双端队列ArrayDeque<Condition>。当BufferPool中内存不足时,会生成一个条件变量用于等待空闲内存足够的时候被唤醒

5.8  RecordAccumulator

这个类类似一个queue,用于汇聚成记录以便批量发送到broker。它创建时会指定使用的内存总量,当内存不足时会阻塞不能再放入记录,直到已有记录被发送释放后才能继续放入记录。

image.png

它的一些成员变量如下:

batches: 是一个CopyOnWriteMap 用于保存每个TopoicPartition所对应的RecordBatch。这些RecordBatch用双端队列Deque来管理。

free:  是一个BufferPool,用于管理从系统分配的内存。

 

累积器有两个子类:

RecordAppendResult:用做把记录添加到累加器时返回的结果,只是对FutureRecordMetadata的一些封装,代码如下:

image.png

ReadyCheckResult:用作检查节点是否Redy的结果。

 

5.9 Metadata

这个类对元数据的一些操作进行了封装。主要的成员变量有:

cluster: 封装了一个Cluster元素,用于查询集群状态信息。对象初始化时Cluster初始化为Empty

topics: 是一个HashSet<String>对象,用于保存集群中的topic

提供如下接口:

fetch: 直接返回cluster对象

add: topic信息添加到topics 集合中

update:更新cluster信息,更新后并唤醒所有等待更新的线程。

 

6.    发送流程

调用发送接口的流程如下:

image.png

Send接口发送代码如下:

image.png

6.1  等待topic metadata可用

  代码如下:

image.png

metadata.requestUpdate

这个函数比较简单,设置更新标志位,同时返回当前metadata的版本,代码如下:

image.png

metadata.awaitUpdate

   这个函数的作用就是等待metadata版本更新,代码如下:

image.png

6.2  记录添加到累积器

这一步的代码如下:

image.png

我们再看看free.allocate,即BufferPoolallocate函数:

image.png

image.png

内存分配好后,看看RecordBatch.tryAppend函数,这个函数的作用是尝试把消息和callback函数添加到RecordBatch里,并返回一个元数据的Future,代码如下:

image.png


7.    后台io线程流程

后台io线程是Sender类,继承与Runnable接口,本身实现的run接口很简单,主要操作都在run(now)函数里面,代码如下:

image.png

下面我们主要看一次io迭代的流程:

image.png

代码以及注释如下:

image.png

image.png

我们先看看this.accumulator.drain是如何选取到要发送的batch的。代码就是RecordAccumulator.drain如下:

image.png

生成每个节点的发送RecordBatch后,就可以生成每个节点的请求,即createProduceRequests函数,代码如下:

image.png

我们再看看实际做网络请求的代码NetworkClient.poll

image.png

最后再看如何处理响应请求,代码是Sender.handleResponse

image.png

这里主要的函数就是completeBatch,代码如下:

image.png


8.    附录

Future

Java中,如果需要设定代码执行的最长时间,即超时,可以用Java线程池ExecutorService类配合Future接口来实现。 Future接口是Java标准API的一部分,在java.util.concurrent包中。Future接口是Java线程Future模式的实现,可以来进行异步计算。

Future模式可以这样来描述:我有一个任务,提交给了FutureFuture替我完成这个任务。期间我自己可以去做任何想做的事情。一段时间之后,我就便可以从Future那儿取出结果。就相当于下了一张订货单,一段时间后可以拿着提订单来提货,这期间可以干别的任何事情。其中Future 接口就是订货单,真正处理订单的是Executor类,它根据Future接口的要求来生产产品。

Future接口提供方法来检测任务是否被执行完,等待任务执行完获得结果,也可以设置任务执行的超时时间。这个设置超时的方法就是实现Java程序执行超时的关键。

Future接口是一个泛型接口,严格的格式应该是Future<V>,其中V代表了Future执行的任务返回值的类型。 Future接口的方法介绍如下:

  • boolean cancel (boolean      mayInterruptIfRunning) 取消任务的执行。参数指定是否立即中断任务执行,或者等等任务结束

  • boolean isCancelled () 任务是否已经取消,任务正常完成前将其取消,则返回 true

  • boolean isDone () 任务是否已经完成。需要注意的是如果任务正常终止、异常或取消,都将返回true

  • V get () throws      InterruptedException, ExecutionException  等待任务执行结束,然后获得V类型的结果。InterruptedException      线程被中断异常, ExecutionException任务执行异常,如果任务被取消,还会抛出CancellationException

  • V get (long timeout,      TimeUnit unit) throws InterruptedException, ExecutionException,      TimeoutException 同上面的get功能一样,多了设置超时时间。参数timeout指定超时时间,uint指定时间的单位,在枚举类TimeUnit中有相关的定义。如果计算超时,将抛出TimeoutException

Future的实现类有java.util.concurrent.FutureTask<V> javax.swing.SwingWorker<T,V>。通常使用FutureTask来处理我们的任务。FutureTask类同时又实现了Runnable接口,所以可以直接提交给Executor执行。使用FutureTask实现超时执行的代码如下:

image.png

不直接构造Future对象,也可以使用ExecutorService.submit方法来获得Future对象,submit方法即支持以 Callable接口类型,也支持Runnable接口作为参数,具有很大的灵活性。使用示例如下:image.png

利用Future接口实现程序执行超时大致用法就这么多,改天需要研究下Future接口的内部实现,特别是设定执行超时的实现。

 

CopyOnWrite

http://ifeve.com/java-copy-on-write/

 

Kafka Client Rewrite

https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite



【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。