Kafka Source Code Analysis (2): Producer Code Analysis (Scala Version), Part 2

步步清风, published on 2017/11/13 10:41:08

Which enqueue method is used here depends on the config.queueEnqueueTimeoutMs parameter (queue.enqueue.timeout.ms). Its value has the following meaning (see the sketch after the comment block below):

# Timeout for event enqueue:

# 0: events will be enqueued immediately or dropped if the queue is full

# -ve: enqueue will block indefinitely if the queue is full

# +ve: enqueue will block up to this many milliseconds if the queue is full

#queue.enqueue.timeout.ms=
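For concreteness, here is a minimal sketch of how these three cases map onto java.util.concurrent.BlockingQueue operations. It is illustrative only; the method name and parameters below are not the actual fields in Producer.scala:

import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

// Illustrative sketch only: how queue.enqueue.timeout.ms selects the enqueue call.
def enqueue[T](queue: LinkedBlockingQueue[T], message: T, enqueueTimeoutMs: Int): Boolean =
  enqueueTimeoutMs match {
    case 0 =>
      // 0: try to enqueue immediately, report failure (drop) if the queue is full
      queue.offer(message)
    case t if t < 0 =>
      // negative: block indefinitely until space frees up
      queue.put(message)
      true
    case t =>
      // positive: block for up to t milliseconds, then give up
      queue.offer(message, t.toLong, TimeUnit.MILLISECONDS)
  }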


Finally, let's look at what the asynchronous send thread actually does. Its run method is very simple: it just calls processEvents, shown below:

private def processEvents() {
    var lastSend = SystemTime.milliseconds
    var events = new ArrayBuffer[KeyedMessage[K,V]]
    var full: Boolean = false

    // drain the queue until you get a shutdown command
    Stream.continually(queue.poll(scala.math.max(0, (lastSend + queueTime) - SystemTime.milliseconds), TimeUnit.MILLISECONDS))
                      .takeWhile(item => if(item != null) item ne shutdownCommand else true).foreach {
      currentQueueItem =>
        val elapsed = (SystemTime.milliseconds - lastSend)
        // check if the queue time is reached. This happens when the poll method above returns after a timeout and
        // returns a null object
        val expired = currentQueueItem == null
        if(currentQueueItem != null) {
          trace("Dequeued item for topic %s, partition key: %s, data: %s"
              .format(currentQueueItem.topic, currentQueueItem.key, currentQueueItem.message))
          events += currentQueueItem
        }

        // check if the batch size is reached
        full = events.size >= batchSize
        // a send is triggered in either of two cases: the events buffer has reached the batch size,
        // or the poll above timed out (the queue time was reached)
        if(full || expired) {
          if(expired)
            debug(elapsed + " ms elapsed. Queue time reached. Sending..")
          if(full)
            debug("Batch full. Sending..")
          // if either queue time has reached or batch size has reached, dispatch to event handler
          tryToHandle(events)
          lastSend = SystemTime.milliseconds
          events = new ArrayBuffer[KeyedMessage[K,V]]
        }
    }
    // send the last batch of events
    tryToHandle(events)
    if(queue.size > 0)
      throw new IllegalQueueStateException("Invalid queue state! After queue shutdown, %d remaining items in the queue"
        .format(queue.size))
  }

This function keeps pulling messages from queue.poll until it receives the shutdown command. Each message is appended to the events buffer; once the buffer reaches config.batchNumMessages, or the queue time expires, the batch is sent. The actual send calls handler.handle(events), so from there the flow is the same as for synchronous sending.
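The batchSize and queueTime that drive this loop come from the async producer configuration. Below is a minimal configuration sketch; the broker address is an assumption, and only the settings relevant here are shown:

import java.util.Properties
import kafka.producer.ProducerConfig

val props = new Properties()
props.put("metadata.broker.list", "localhost:9092")   // assumed broker address
props.put("serializer.class", "kafka.serializer.StringEncoder")
props.put("producer.type", "async")                   // use the async path (ProducerSendThread)
props.put("batch.num.messages", "200")                // batchSize: send once this many messages are buffered
props.put("queue.buffering.max.ms", "5000")           // queueTime: send at least every 5 seconds
props.put("queue.enqueue.timeout.ms", "0")            // drop new messages instead of blocking when the queue is full
val config = new ProducerConfig(props)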


3. How the Server Handles the Send Flow

When it starts up, the Kafka server opens N threads to handle requests, where N is set by the num.io.threads property (default 8); Kafka recommends setting it to at least the number of disks on the machine. This happens in KafkaServer's startup method, as shown here:

def startup() {
    ...
    // create the request handler pool; its constructor starts the threads that will take requests
    requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
    ...
}

 

class KafkaRequestHandlerPool {
    ...
    for(i <- 0 until numThreads) {
        runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis)
        threads(i) = Utils.daemonThread("kafka-request-handler-" + i, runnables(i))
        threads(i).start() // start each request handler thread
    }
    ...
}

KafkaRequestHandler is actually a Runnable; in its core run method it loops with while (true), calling apis.handle(request) over and over to take requests and process them, as shown below:

class KafkaRequestHandler... extends Runnable {
    ...
    def run() {
        ...
        while (true) {
            ...
            apis.handle(request) // hand the request to apis.handle for processing
        }
        ...
    }
    ...
}

The main job of KafkaApis' handle method is to dispatch the various request types. This article only looks at ProducerRequest:

def handle(request: RequestChannel.Request) {
    ...
    request.requestId match {
        case RequestKeys.ProduceKey => handleProducerOrOffsetCommitRequest(request) // a ProducerRequest is handed to handleProducerOrOffsetCommitRequest
        case ...
    }
    ...
}
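For readers less used to Scala, the dispatch above is just a pattern match on the request key. A self-contained sketch of the same idiom is shown below; the key values and handler names here are illustrative, not Kafka's real ones:

// Illustrative sketch of the dispatch idiom used by KafkaApis.handle.
object SketchRequestKeys {
  val ProduceKey: Short = 0
  val FetchKey: Short = 1
}

def dispatch(requestId: Short): String = requestId match {
  case SketchRequestKeys.ProduceKey => "handleProducerOrOffsetCommitRequest"
  case SketchRequestKeys.FetchKey   => "handleFetchRequest"
  case other                        => "unknown request key " + other
}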

So the core method is handleProducerOrOffsetCommitRequest. It has this name because it handles both ProducerRequest and OffsetCommitRequest; the latter is really just a special kind of ProducerRequest, because from Kafka 0.8.2 onwards committed offsets are stored in a special topic named __consumer_offsets. Here we care about the genuine ProducerRequest. The method's logic is shown in the figure below:

(Figure: processing flow of handleProducerOrOffsetCommitRequest)

The overall logic looks quite simple, as the following code shows:

def handleProducerOrOffsetCommitRequest(request: RequestChannel.Request) {
    ...
    val localProduceResults = appendToLocalLog(produceRequest, offsetCommitRequestOpt.nonEmpty) // append the messages to the local commit log
    val numPartitionsInError = localProduceResults.count(_.error.isDefined) // count the partitions whose append failed
    if(produceRequest.requiredAcks == 0) { // code path for request.required.acks = 0
      if (numPartitionsInError != 0) {
        info(("Send the close connection response due to error handling produce request " +
          "[clientId = %s, correlationId = %s, topicAndPartition = %s] with Ack=0")
          .format(produceRequest.clientId, produceRequest.correlationId, produceRequest.topicPartitionMessageSizeMap.keySet.mkString(",")))
        requestChannel.closeConnection(request.processor, request) // close the underlying socket so the client learns that some sends failed
      } else {
        ...
      }
    } else if (produceRequest.requiredAcks == 1 || // code path for request.required.acks = 1, plus two other conditions
        produceRequest.numPartitions <= 0 ||
        numPartitionsInError == produceRequest.numPartitions) {
      val response = offsetCommitRequestOpt.map(_.responseFor(firstErrorCode, config.offsetMetadataMaxSize))
                                           .getOrElse(ProducerResponse(produceRequest.correlationId, statuses))
      requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) // send the response back to the client
    } else { // code path for request.required.acks = -1
      // create a list of (topic, partition) pairs to use as keys for this delayed request
      val producerRequestKeys = produceRequest.data.keys.toSeq
      val statuses = localProduceResults.map(r =>
        r.key -> DelayedProduceResponseStatus(r.end + 1, ProducerResponseStatus(r.errorCode, r.start))).toMap
      val delayedRequest =  new DelayedProduce(...) // a delayed request must be built here; this involves the Purgatory mechanism, which is beyond the scope of this article
        ...
}

As the code above shows, whatever the value of request.required.acks, the messages to be sent are first appended to the local commit log. If acks has its default value of 0, then after the write the broker checks whether every message was appended successfully; if any partition failed, it closes the client's connection to the broker's SocketServer, which is how the client is told that an error occurred.
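To connect these three server-side paths back to the client, the corresponding producer setting is request.required.acks. A hedged sketch of the values (the broker address is an assumption):

import java.util.Properties

val props = new Properties()
props.put("metadata.broker.list", "localhost:9092")   // assumed broker address
// "0"  -> broker sends no response; a failure only shows up as a closed connection
// "1"  -> broker responds once the leader has appended the messages to its log
// "-1" -> broker responds only after the in-sync replicas have the messages (DelayedProduce path)
props.put("request.required.acks", "1")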



The last method of interest is Partition's appendMessagesToLeader; its main code is:

def appendMessagesToLeader(messages: ByteBufferMessageSet, requiredAcks: Int=0) = {
    inReadLock(leaderIsrUpdateLock) {
      val leaderReplicaOpt = leaderReplicaIfLocal() // check whether the leader replica of the target partition lives on this broker
      leaderReplicaOpt match {
        case Some(leaderReplica) => // the leader replica is on this broker
          val log = leaderReplica.log.get // get a handle on the local commit log
          val minIsr = log.config.minInSyncReplicas
          val inSyncSize = inSyncReplicas.size

          // Avoid writing to leader if there are not enough insync replicas to make it safe
          if (inSyncSize < minIsr && requiredAcks == -1) { // the ISR size is only checked when request.required.acks is -1
            throw new NotEnoughReplicasException("Number of insync replicas for partition [%s,%d] is [%d], below required minimum [%d]"
              .format(topic,partitionId,minIsr,inSyncSize))
          }
          val info = log.append(messages, assignOffsets = true) // the actual log write; Kafka's low-level log layer deserves an article of its own
          // probably unblock some follower fetch requests since log end offset has been updated
          replicaManager.unblockDelayedFetchRequests(new TopicAndPartition(this.topic, this.partitionId))
          // we may need to increment high watermark since ISR could be down to 1
          maybeIncrementLeaderHW(leaderReplica)
          info
        case None => // otherwise throw: the leader is not on this broker
          throw new NotLeaderForPartitionException("Leader not local for partition [%s,%d] on broker %d"
            .format(topic, partitionId, localBrokerId))
      }
    }
  }
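The min.insync.replicas check above only fires when acks is -1. Below is a hedged client-side sketch of that combination; the broker address, topic name, and retry count are assumptions, and with the old Scala producer the broker-side NotEnoughReplicasException eventually surfaces to the caller as FailedToSendMessageException once the retries are exhausted:

import java.util.Properties
import kafka.common.FailedToSendMessageException
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

val props = new Properties()
props.put("metadata.broker.list", "localhost:9092")   // assumed broker address
props.put("serializer.class", "kafka.serializer.StringEncoder")
props.put("request.required.acks", "-1")              // wait for the full ISR
props.put("message.send.max.retries", "3")

val producer = new Producer[String, String](new ProducerConfig(props))
try {
  producer.send(new KeyedMessage[String, String]("replicated-topic", "key", "value"))
} catch {
  case e: FailedToSendMessageException =>
    // all retries failed, e.g. because the ISR shrank below min.insync.replicas
    println("send failed: " + e.getMessage)
} finally {
  producer.close()
}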

This completes the walkthrough of the simplest synchronous Scala producer. It shows that Kafka's design is to run a server on every broker that keeps processing the various requests coming from clients, does the corresponding work, and returns the appropriate response when one is required.



4. Questions and Answers

1. Does the producer client talk to ZooKeeper?

No. The producer does not interact with ZooKeeper; all metadata is fetched from the brokers themselves. Only the consumer needs to talk to ZooKeeper.


2. How is the compression.codec parameter used?

This is a producer-side parameter that controls whether outgoing messages are compressed when they are packed. According to the official documentation: "This parameter allows you to specify the compression codec for all data generated by this producer. Valid values are 'none', 'gzip', 'snappy' and 'lz4'."

For example:

compression.codec=none
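And a hedged sketch of setting the codec from code (the broker address and topic name are assumptions):

import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

val props = new Properties()
props.put("metadata.broker.list", "localhost:9092")   // assumed broker address
props.put("serializer.class", "kafka.serializer.StringEncoder")
props.put("compression.codec", "gzip")                // "none", "gzip", "snappy" or "lz4"

val producer = new Producer[String, String](new ProducerConfig(props))
producer.send(new KeyedMessage[String, String]("test-topic", "key", "a compressed message"))
producer.close()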


3. Is blockingChannel.send(request) blocking or non-blocking?

It is a blocking send: in BlockingChannel's connect method, the socket channel is configured as blocking:

class BlockingChannel( val host: String,
                       val port: Int,
                       val readBufferSize: Int,
                       val writeBufferSize: Int,
                       val readTimeoutMs: Int ) extends Logging {
  ...
  def connect() = lock synchronized  {
    if(!connected) {
      try {
        channel = SocketChannel.open()
        if(readBufferSize > 0)
          channel.socket.setReceiveBufferSize(readBufferSize)
        if(writeBufferSize > 0)
          channel.socket.setSendBufferSize(writeBufferSize)
        channel.configureBlocking(true)
        ...

5. Appendix

1. Scala pattern matching and case classes: http://nerd-is.in/2013-09/scala-learning-pattern-matching-and-case-classes/

6. References

1. Source code analysis of the Kafka Producer sending messages in synchronous mode: http://blog.csdn.net/itleochen/article/details/19926785

2. Kafka producer internals (Scala synchronous producer)
