kafka源码解析之二:Producer代码分析(scala版本)(上)

举报
步步清风 发表于 2017/11/13 10:33:17 2017/11/13
【摘要】 上期链接:kafka源码解析之一:源代码工程目录介绍1. 初始化代码分析:首先Demo里封装了一个Producer类, 这个类提供了同步和异步两种方式来发送消息。 异步发送消息是基于同步发送消息的接口来实现的。异步发送消息的实现很简单,客户端消息发送过来以后,先放入到一个队列中然后就返回了。Producer再开启一个线程(ProducerSendThread)不断从队列中取出消息,然后调用同步

声明:所有这系列代码分析章节,有些图片/资料整理来自网络公开资料,站在巨人肩膀上二次仔细解读代码总结分享给大家

上期链接:

kafka源码解析之一:源代码工程目录介绍

https://portal.huaweicloud.com/blogs/17b21140c51e11e7b8317ca23e93a891


1.   初始化代码分析:

image.png

首先Demo里封装了一个Producer类, 这个类提供了同步和异步两种方式来发送消息。 异步发送消息是基于同步发送消息的接口来实现的。异步发送消息的实现很简单,客户端消息发送过来以后,先放入到一个队列中然后就返回了。Producer再开启一个线程(ProducerSendThread)不断从队列中取出消息,然后调用同步发送消息的接口将消息发送给Broker

Producer发送同步消息是委托给EventHandler做的,EventHandler是个接口,具体实现为DefaultEventHandler。它们的简化类图如下:


image.png

可以看到,Producer类中的成员producerSendThreadqueue是为了发送异步消息的,eventHandler是为了发送同步消息的,当然异步消息也需要它。KeyedMessage是封装了用户发送的消息。SeqScala中的序列,可以看成是Java中的List

在这个类的构造函数里面,首先创建ProduerConfig对象作为创建Kafka Producer的参数,然后生成Kafka Producer实例,实例构造函数其实做的事情是:

image.png

生成一个DefaultEventHandler对象, 同时需要生成ProducerPool对象、partition分类和序列化等。DefaultEventHandler这个类是接口EventHandler唯一的实现

简单类图如下:

image.png

2.   Producer发送流程

发送接口分为同步发送或异步发送,先来看看这两种方式的架构图:

Producer sync发送消息流程

image.png

Producer async发送消息流程

image.png


这两种方式的区别在于异步发送producer会创建一个send thread和一个Blocking Queueproducer把数据放入到Blocking Queue后就马上返回。Send thread线程再去Blocking Queue中批量取消息进行发送。后面发送的流程就跟同步发送相同了。

先看一下同步发送的流程:

image.png

客户端程序根据ProducerConfig配置生成Producer实例,当new Producer(new ProducerConfig()),其底层实现,实际会产生两个核心类的实例:ProducerDefaultEventHandler。在创建的同时,会默认new一个ProducerPool,即我们每new一个javaProducer类,就会有创建ProducerEventHandlerProducerPoolProducerPool为连接不同kafka broker的池,初始连接个数有broker.list参数决定。

Producer每隔一段时间会定期从broker.list中获取选取一个broker更新metedata信息,流程是右边的框架。同时这个功能还是producer实现broker动态扩容的关键,通过定时与现有的metadata.broker.list通信,获取新增broker信息,然后把新建的SyncProducer放入ProducerPool中(见2.2updateinfo)。

我们先看同步发送的发送接口的代码:

image.png

2.1.           DefaultEventHandler.handle

再来看DefaultEventHandlerhandle代码流程:

image.png

代码如下并在代码中添加了注释:

image.png

image.png


2.2.           updateinfo

从主流程可以看到,Producer会定时更新brokermetedata信息或者在有发送数据失败后也会更新,代码如下:

image.png

image.png

关于上面代码中的最后一行, 我们需要着重说一下。每个producer应用程序都会保存一个producer池对象来缓存每个broker上对应的同步producer实例。具体格式为brokerId -> SyncProducerSyncProducer表示一个同步producer,其主要的方法是send,支持两种请求的发送:ProducerRequestTopicMetadataRequest。前者是发送消息的请求,后者是更新topic元数据信息的请求。为什么需要这份缓存呢?我们知道,每个topic分区都应该有一个leader副本在某个broker上,而只有leader副本才能接收客户端发来的读写消息请求。对producer而言,即只有这个leader副本所在的broker才能接收ProducerRequest请求。在发送消息时候,我们会首先找出这个消息要发给哪个topic,然后发送更新topic元数据请求给任意broker去获取最新的元数据信息——这部分信息中比较重要的就是要获取topic各个分区的leader副本都在哪些broker上,这样我们稍后会创建连接那些broker的阻塞通道(blocking channel)去实现真正的消息发送。Kafka目前的做法就是重建所有topic分区的leader副本所属broker上对应的SyncProducer实例

     我们在进入ClientUtils.fetchTopicMetadata看看是怎样获取topic metadata

2.3.           fetchTopicMetadata


image.png

image.png

这个函数主要做的事情是构造TopicMetadataRequest,然后从第一个broker开始尝试发送请求。一旦成功从一个broker中获取了metadata就终止后面的请求发送尝试。

   TopicMetadataRequestKeyRequestKeys.MetadataKeyBroker端根据这个Key选择处理什么类型的请求。

2.4.           dispatchSerializedData

我们再看消息发送的代码, 是在代码dispatchSerializedData中实现,该方法接收一组KeyedMessage消息集合并返回发送失败的消息集合。如果返回None自然表示发送成功。该方法主要的逻辑如下图所示:


image.png

dispatchSerializedData代码如下:

image.png

image.png


下面对里面的子函数再进一步进行分析。

2.5.           partitionAndCollate

消息分组,函数是partitionAndCollate,代码如下:

image.png

image.png

image.png

该方法接收一组待发送的消息集合——Scala表示的话就是Seq[KeyedMessage[K, Message]],在我们的例子中很显然这个集合中有4条消息。这个方法的输出比较复杂,完整的写法是:

Option[Map[Int, Map[TopicAndPartition, Seq[KeyedMessage[K, Message]]]]]

熟悉Scala语法的朋友可能会知道,这个返回值类型表示该方法可能会返回None——这表示producer代码没法对你要发送的消息按照broker进行分组或在分组过程中遇到了严重的错误,只能返回None由上层代码来处理这种情况。如果确实返回了值,这个值长的是什么样子呢?拿我们的例子来说,假定每条消息去被发送到的分区如下:(这里的对应关系是假设的,其实在partitionAndCollate方法中会为每条消息都分配它要去的分区!)


消息

要被发送到的分区ID

该分区leader副本所在broker ID

M1

P0

0

M2

P0

0

M3

P1

1

M4

P2

3



 


 



那么这个方法返回的结果就是:

0 - > {test-topic + P0 -> {M1, M2}},

1 -> {test-topic + P1 -> {M3}},

2 -> {test-topic + P2 -> {M4}} 

}


该方法的效果就是将所有待发送的消息首先按照broker进行分组,然后再按照分区进行整理。 

当然了,上面我们假定了每条消息要去的分区,其实这也是在partitionAndCollate方法中被计算出来的。主要的逻辑是:

 

1. 首先判断每条消息的分区key是否指定,如果指定了调用默认的分区类Partitionerpartition计算目标分区就是了。

 

2. 如果没有指定key,就像默认使用console-producer的情况,代码会首先从缓存中判断以前是否保存该topic的信息——即该topic下所有没有key的消息默认会被发送到同一个分区下。如果存在直接找出来就好了;否则随机挑选一个返回并把它加入到缓存中,如下面代码所示:

val index = Utils.abs(Random.nextInt) % availablePartitions.size // 随机确定broker id

val partitionId = availablePartitions(index).partitionId

sendPartitionPerTopicCache.put(topic, partitionId) // 加入缓存中以便后续使用


2.6.           groupMessagesToSet

通过上一步中将待发送消息集合按照brokertopic分区进行分组,Kafka对要发送的消息进行了分区。该操作完成之后代码就需要遍历整理过的消息数据,获取消息数据中每个broker对应的分区消息映射,也就是类似于{test-topic + P0 -> {M1, M2}}这样的数据。然后将每个映射转换为这样的格式:

{(topic + 分区,  ByteBufferMessageSet(message),  (topic + 分区, ByteBufferMessage(message) }

还是以我们的例子而言,经过groupMessageToSet之后,每个broker对应的数据变为:

{

(topic + P0 ByteBufferMessageSet(M1, M2)),

(topic + P1 ByteBufferMessageSet(M3)),

(topic + P2 ByteBufferMessageSet(M4)),

}

这个方法还考虑压缩的情况,即producer的属性compression.codec中指定的压缩策略。如果启用了压缩,追加写当前日志段的时候会先解压缩消息再写入(详见Log.scalaappend方法)

2.7.           send

private def send(brokerId: Int, messagesPerTopic: collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]) = {

    if(brokerId < 0) {

      warn("Failed to send data since partitions %s don't have a leader".format(messagesPerTopic.map(_._1).mkString(",")))

      messagesPerTopic.keys.toSeq

    } else if(messagesPerTopic.size > 0) {

      val currentCorrelationId = correlationId.getAndIncrement

      //生产producer request

      val producerRequest = new ProducerRequest(currentCorrelationId, config.clientId, config.requestRequiredAcks,

        config.requestTimeoutMs, messagesPerTopic)

      var failedTopicPartitions = Seq.empty[TopicAndPartition]

      try {

        //获取与brokerID对应的brokersyncProducer对象,对象里封装了与broker的链接

        val syncProducer = producerPool.getProducer(brokerId)

        debug("Producer sending messages with correlation id %d for topics %s to broker %d on %s:%d"

          .format(currentCorrelationId, messagesPerTopic.keySet.mkString(","), brokerId, syncProducer.config.host, syncProducer.config.port))

        val response = syncProducer.send(producerRequest)

        debug("Producer sent messages with correlation id %d for topics %s to broker %d on %s:%d"

          .format(currentCorrelationId, messagesPerTopic.keySet.mkString(","), brokerId, syncProducer.config.host, syncProducer.config.port))

        if(response != null) {

          if (response.status.size != producerRequest.data.size)

            throw new KafkaException("Incomplete response (%s) for producer request (%s)".format(response, producerRequest))

          if (logger.isTraceEnabled) {

            val successfullySentData = response.status.filter(_._2.error == ErrorMapping.NoError)

            successfullySentData.foreach(m => messagesPerTopic(m._1).foreach(message =>

              trace("Successfully sent message: %s".format(if(message.message.isNull) null else Utils.readString(message.message.payload)))))

          }

          val failedPartitionsAndStatus = response.status.filter(_._2.error != ErrorMapping.NoError).toSeq

          failedTopicPartitions = failedPartitionsAndStatus.map(partitionStatus => partitionStatus._1)

          if(failedTopicPartitions.size > 0) {

            val errorString = failedPartitionsAndStatus

              .sortWith((p1, p2) => p1._1.topic.compareTo(p2._1.topic) < 0 ||

                                    (p1._1.topic.compareTo(p2._1.topic) == 0 && p1._1.partition < p2._1.partition))

              .map{

                case(topicAndPartition, status) =>

                  topicAndPartition.toString + ": " + ErrorMapping.exceptionFor(status.error).getClass.getName

              }.mkString(",")

            warn("Produce request with correlation id %d failed due to %s".format(currentCorrelationId, errorString))

          }

          failedTopicPartitions

        } else {

          Seq.empty[TopicAndPartition]

        }

      } catch {

        case t: Throwable =>

          warn("Failed to send producer request with correlation id %d to broker %d with data for partitions %s"

            .format(currentCorrelationId, brokerId, messagesPerTopic.map(_._1).mkString(",")), t)

          messagesPerTopic.keys.toSeq

      }

    } else {

      List.empty

    }

  }


   这个方法基于上一步中构造的(topic+分区, ByteBufferMessageSet)元组构造ProducerRequest发送给对应的broker,并返回发送失败的topic分区集合。具体的逻辑如下:

1. 判断要发送到的broker id是否合法,如果小于0的话(通常是-1),说明消息要发送到的分区没有leader。这种情况下直接记录一个警告信息并直接返回未发送的消息集合

2. 如果broker id是合法的,那么还需要再判断一下要发送的消息是否为空,如果为空自然也不需要做什么,直接返回空集合就好了

3. 如果上一步中的确有要发送的消息,那么就根据request.required.acks以及超时时间等配置构造一个ProducerRequest将消息封装进这个请求中。

4. 获取这个broker上的syncProducer——这个也是从producer池缓存中拿到的,如果池缓存中没有的话也只是记录为一个警告,下次重试的时候刷新一下topic元数据信息就能够创建出来了。

5. 一旦拿到目标broker上的syncProducer,就可以使用它来发送请求了,即调用syncProducer.send(producerRequest)

6. 请求被Kafka server处理之后(如何处理的下面会有详细介绍)会发送一个对应的响应(response)eventHandler

7. 拿到response之后需要判断一下response是否为空。这其实还要看下request.required.acks的设置。当该值是默认值0时表示producer不需要等待broker的应答(acknowledgement),这可以带来最低的延迟但持久性也最差,因为如果一个broker宕机了有可能会丢失数据。如果该值是0 那么Kafka处理完ProducerRequest之后并不发送任何response。因此若发现response是空,那么自然表示所有数据已经被发送了,返回空集合表示没有发送失败的分区消息

8. 但倘若request.required.acks1(其实还有两种情况,比如分区数是0——这里不做讨论),那么就表示producerleader副本获得数据后需要等待broker的应答。这个值的设置有更好的持久化效果。假设request.required.acks1的话,那么Kafka处理完请求后悔发送response,因此代码还要继续解析response中的数据以确定到底有无失败消息

9. 在开始解析response代码之前,先来说说ProduceResponse的格式,如下图所示:

image.png

response中比较重要的信息是topic下面多个分区对应的错误码和消息待追加的第一条消息的位移。

 

因此,在拿到response之后,需要先判断一下response中总的分区数是否和请求中的分区数一样,如果不同的话说明在返回的response不完整,Kafka代码会抛出异常。否则,就从response中找出那些有错误的分区(即错误码不是NoError)并返回。

 

至此,客户端的producer程序就已经执行完毕了。可能有些人会感到奇怪?貌似消息只是以请求的方式被发送到Kafka server上,但消息不是还要被写入到日志中吗?这部分功能又是在哪里做的呢? 我们会在后面的章节介绍Server是如何处理ProducerRequest的。


下一篇接上。

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。