kafka源码解析之二:Producer代码分析(scala版本)(上)
声明:所有这系列代码分析章节,有些图片/资料整理来自网络公开资料,站在巨人肩膀上二次仔细解读代码总结分享给大家
上期链接:
kafka源码解析之一:源代码工程目录介绍
https://portal.huaweicloud.com/blogs/17b21140c51e11e7b8317ca23e93a891
1. 初始化代码分析:
首先Demo里封装了一个Producer类, 这个类提供了同步和异步两种方式来发送消息。 异步发送消息是基于同步发送消息的接口来实现的。异步发送消息的实现很简单,客户端消息发送过来以后,先放入到一个队列中然后就返回了。Producer再开启一个线程(ProducerSendThread)不断从队列中取出消息,然后调用同步发送消息的接口将消息发送给Broker。
Producer发送同步消息是委托给EventHandler做的,EventHandler是个接口,具体实现为DefaultEventHandler。它们的简化类图如下:
可以看到,Producer类中的成员producerSendThread和queue是为了发送异步消息的,eventHandler是为了发送同步消息的,当然异步消息也需要它。KeyedMessage是封装了用户发送的消息。Seq是Scala中的序列,可以看成是Java中的List。
在这个类的构造函数里面,首先创建ProduerConfig对象作为创建Kafka Producer的参数,然后生成Kafka Producer实例,实例构造函数其实做的事情是:
生成一个DefaultEventHandler对象, 同时需要生成ProducerPool对象、partition分类和序列化等。DefaultEventHandler这个类是接口EventHandler唯一的实现
简单类图如下:
2. Producer发送流程
发送接口分为同步发送或异步发送,先来看看这两种方式的架构图:
Producer sync发送消息流程
Producer async发送消息流程
这两种方式的区别在于异步发送producer会创建一个send thread和一个Blocking Queue,producer把数据放入到Blocking Queue后就马上返回。Send thread线程再去Blocking Queue中批量取消息进行发送。后面发送的流程就跟同步发送相同了。
先看一下同步发送的流程:
客户端程序根据ProducerConfig配置生成Producer实例,当new Producer(new ProducerConfig()),其底层实现,实际会产生两个核心类的实例:Producer、DefaultEventHandler。在创建的同时,会默认new一个ProducerPool,即我们每new一个java的Producer类,就会有创建Producer、EventHandler和ProducerPool,ProducerPool为连接不同kafka broker的池,初始连接个数有broker.list参数决定。
Producer每隔一段时间会定期从broker.list中获取选取一个broker更新metedata信息,流程是右边的框架。同时这个功能还是producer实现broker动态扩容的关键,通过定时与现有的metadata.broker.list通信,获取新增broker信息,然后把新建的SyncProducer放入ProducerPool中(见2.2的updateinfo)。
我们先看同步发送的发送接口的代码:
2.1. DefaultEventHandler.handle
再来看DefaultEventHandler的handle代码流程:
代码如下并在代码中添加了注释:
2.2. updateinfo
从主流程可以看到,Producer会定时更新broker的metedata信息或者在有发送数据失败后也会更新,代码如下:
关于上面代码中的最后一行, 我们需要着重说一下。每个producer应用程序都会保存一个producer池对象来缓存每个broker上对应的同步producer实例。具体格式为brokerId -> SyncProducer。SyncProducer表示一个同步producer,其主要的方法是send,支持两种请求的发送:ProducerRequest和TopicMetadataRequest。前者是发送消息的请求,后者是更新topic元数据信息的请求。为什么需要这份缓存呢?我们知道,每个topic分区都应该有一个leader副本在某个broker上,而只有leader副本才能接收客户端发来的读写消息请求。对producer而言,即只有这个leader副本所在的broker才能接收ProducerRequest请求。在发送消息时候,我们会首先找出这个消息要发给哪个topic,然后发送更新topic元数据请求给任意broker去获取最新的元数据信息——这部分信息中比较重要的就是要获取topic各个分区的leader副本都在哪些broker上,这样我们稍后会创建连接那些broker的阻塞通道(blocking channel)去实现真正的消息发送。Kafka目前的做法就是重建所有topic分区的leader副本所属broker上对应的SyncProducer实例
我们在进入ClientUtils.fetchTopicMetadata看看是怎样获取topic metadata的
2.3. fetchTopicMetadata
这个函数主要做的事情是构造TopicMetadataRequest,然后从第一个broker开始尝试发送请求。一旦成功从一个broker中获取了metadata就终止后面的请求发送尝试。
TopicMetadataRequest的Key是RequestKeys.MetadataKey,Broker端根据这个Key选择处理什么类型的请求。
2.4. dispatchSerializedData
我们再看消息发送的代码, 是在代码dispatchSerializedData中实现,该方法接收一组KeyedMessage消息集合并返回发送失败的消息集合。如果返回None自然表示发送成功。该方法主要的逻辑如下图所示:
dispatchSerializedData代码如下:
下面对里面的子函数再进一步进行分析。
2.5. partitionAndCollate
消息分组,函数是partitionAndCollate,代码如下:
该方法接收一组待发送的消息集合——用Scala表示的话就是Seq[KeyedMessage[K, Message]],在我们的例子中很显然这个集合中有4条消息。这个方法的输出比较复杂,完整的写法是:
Option[Map[Int, Map[TopicAndPartition, Seq[KeyedMessage[K, Message]]]]]
熟悉Scala语法的朋友可能会知道,这个返回值类型表示该方法可能会返回None——这表示producer代码没法对你要发送的消息按照broker进行分组或在分组过程中遇到了严重的错误,只能返回None由上层代码来处理这种情况。如果确实返回了值,这个值长的是什么样子呢?拿我们的例子来说,假定每条消息去被发送到的分区如下:(这里的对应关系是假设的,其实在partitionAndCollate方法中会为每条消息都分配它要去的分区!)
消息 | 要被发送到的分区ID | 该分区leader副本所在broker ID |
M1 | P0 | 0 |
M2 | P0 | 0 |
M3 | P1 | 1 |
M4 | P2 | 3 |
那么这个方法返回的结果就是:
{
0 - > {test-topic + P0 -> {M1, M2}},
1 -> {test-topic + P1 -> {M3}},
2 -> {test-topic + P2 -> {M4}}
}
该方法的效果就是将所有待发送的消息首先按照broker进行分组,然后再按照分区进行整理。
当然了,上面我们假定了每条消息要去的分区,其实这也是在partitionAndCollate方法中被计算出来的。主要的逻辑是:
1. 首先判断每条消息的分区key是否指定,如果指定了调用默认的分区类Partitioner的partition计算目标分区就是了。
2. 如果没有指定key,就像默认使用console-producer的情况,代码会首先从缓存中判断以前是否保存该topic的信息——即该topic下所有没有key的消息默认会被发送到同一个分区下。如果存在直接找出来就好了;否则随机挑选一个返回并把它加入到缓存中,如下面代码所示:
val index = Utils.abs(Random.nextInt) % availablePartitions.size // 随机确定broker id
val partitionId = availablePartitions(index).partitionId
sendPartitionPerTopicCache.put(topic, partitionId) // 加入缓存中以便后续使用
2.6. groupMessagesToSet
通过上一步中将待发送消息集合按照broker和topic分区进行分组,Kafka对要发送的消息进行了分区。该操作完成之后代码就需要遍历整理过的消息数据,获取消息数据中每个broker对应的分区消息映射,也就是类似于{test-topic + P0 -> {M1, M2}}这样的数据。然后将每个映射转换为这样的格式:
{(topic + 分区, ByteBufferMessageSet(message), (topic + 分区, ByteBufferMessage(message) }。
还是以我们的例子而言,经过groupMessageToSet之后,每个broker对应的数据变为:
{
(topic + P0, ByteBufferMessageSet(M1, M2)),
(topic + P1, ByteBufferMessageSet(M3)),
(topic + P2, ByteBufferMessageSet(M4)),
}
这个方法还考虑压缩的情况,即producer的属性compression.codec中指定的压缩策略。如果启用了压缩,追加写当前日志段的时候会先解压缩消息再写入(详见Log.scala的append方法)。
2.7. send
private def send(brokerId: Int, messagesPerTopic: collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]) = {
if(brokerId < 0) {
warn("Failed to send data since partitions %s don't have a leader".format(messagesPerTopic.map(_._1).mkString(",")))
messagesPerTopic.keys.toSeq
} else if(messagesPerTopic.size > 0) {
val currentCorrelationId = correlationId.getAndIncrement
//生产producer request
val producerRequest = new ProducerRequest(currentCorrelationId, config.clientId, config.requestRequiredAcks,
config.requestTimeoutMs, messagesPerTopic)
var failedTopicPartitions = Seq.empty[TopicAndPartition]
try {
//获取与brokerID对应的broker的syncProducer对象,对象里封装了与broker的链接
val syncProducer = producerPool.getProducer(brokerId)
debug("Producer sending messages with correlation id %d for topics %s to broker %d on %s:%d"
.format(currentCorrelationId, messagesPerTopic.keySet.mkString(","), brokerId, syncProducer.config.host, syncProducer.config.port))
val response = syncProducer.send(producerRequest)
debug("Producer sent messages with correlation id %d for topics %s to broker %d on %s:%d"
.format(currentCorrelationId, messagesPerTopic.keySet.mkString(","), brokerId, syncProducer.config.host, syncProducer.config.port))
if(response != null) {
if (response.status.size != producerRequest.data.size)
throw new KafkaException("Incomplete response (%s) for producer request (%s)".format(response, producerRequest))
if (logger.isTraceEnabled) {
val successfullySentData = response.status.filter(_._2.error == ErrorMapping.NoError)
successfullySentData.foreach(m => messagesPerTopic(m._1).foreach(message =>
trace("Successfully sent message: %s".format(if(message.message.isNull) null else Utils.readString(message.message.payload)))))
}
val failedPartitionsAndStatus = response.status.filter(_._2.error != ErrorMapping.NoError).toSeq
failedTopicPartitions = failedPartitionsAndStatus.map(partitionStatus => partitionStatus._1)
if(failedTopicPartitions.size > 0) {
val errorString = failedPartitionsAndStatus
.sortWith((p1, p2) => p1._1.topic.compareTo(p2._1.topic) < 0 ||
(p1._1.topic.compareTo(p2._1.topic) == 0 && p1._1.partition < p2._1.partition))
.map{
case(topicAndPartition, status) =>
topicAndPartition.toString + ": " + ErrorMapping.exceptionFor(status.error).getClass.getName
}.mkString(",")
warn("Produce request with correlation id %d failed due to %s".format(currentCorrelationId, errorString))
}
failedTopicPartitions
} else {
Seq.empty[TopicAndPartition]
}
} catch {
case t: Throwable =>
warn("Failed to send producer request with correlation id %d to broker %d with data for partitions %s"
.format(currentCorrelationId, brokerId, messagesPerTopic.map(_._1).mkString(",")), t)
messagesPerTopic.keys.toSeq
}
} else {
List.empty
}
}
这个方法基于上一步中构造的(topic+分区, ByteBufferMessageSet)元组构造ProducerRequest发送给对应的broker,并返回发送失败的topic分区集合。具体的逻辑如下:
1. 判断要发送到的broker id是否合法,如果小于0的话(通常是-1),说明消息要发送到的分区没有leader。这种情况下直接记录一个警告信息并直接返回未发送的消息集合
2. 如果broker id是合法的,那么还需要再判断一下要发送的消息是否为空,如果为空自然也不需要做什么,直接返回空集合就好了
3. 如果上一步中的确有要发送的消息,那么就根据request.required.acks以及超时时间等配置构造一个ProducerRequest将消息封装进这个请求中。
4. 获取这个broker上的syncProducer——这个也是从producer池缓存中拿到的,如果池缓存中没有的话也只是记录为一个警告,下次重试的时候刷新一下topic元数据信息就能够创建出来了。
5. 一旦拿到目标broker上的syncProducer,就可以使用它来发送请求了,即调用syncProducer.send(producerRequest)
6. 请求被Kafka server处理之后(如何处理的下面会有详细介绍)会发送一个对应的响应(response)给eventHandler。
7. 拿到response之后需要判断一下response是否为空。这其实还要看下request.required.acks的设置。当该值是默认值0时表示producer不需要等待broker的应答(acknowledgement),这可以带来最低的延迟但持久性也最差,因为如果一个broker宕机了有可能会丢失数据。如果该值是0, 那么Kafka处理完ProducerRequest之后并不发送任何response。因此若发现response是空,那么自然表示所有数据已经被发送了,返回空集合表示没有发送失败的分区消息
8. 但倘若request.required.acks是1(其实还有两种情况,比如分区数是0等——这里不做讨论),那么就表示producer在leader副本获得数据后需要等待broker的应答。这个值的设置有更好的持久化效果。假设request.required.acks是1的话,那么Kafka处理完请求后悔发送response,因此代码还要继续解析response中的数据以确定到底有无失败消息
9. 在开始解析response代码之前,先来说说ProduceResponse的格式,如下图所示:
response中比较重要的信息是topic下面多个分区对应的错误码和消息待追加的第一条消息的位移。
因此,在拿到response之后,需要先判断一下response中总的分区数是否和请求中的分区数一样,如果不同的话说明在返回的response不完整,Kafka代码会抛出异常。否则,就从response中找出那些有错误的分区(即错误码不是NoError的)并返回。
至此,客户端的producer程序就已经执行完毕了。可能有些人会感到奇怪?貌似消息只是以请求的方式被发送到Kafka server上,但消息不是还要被写入到日志中吗?这部分功能又是在哪里做的呢? 我们会在后面的章节介绍Server是如何处理ProducerRequest的。
下一篇接上。
- 点赞
- 收藏
- 关注作者
评论(0)