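Kafka Source Code Analysis, Part 2: Producer Code Analysis (Scala version) (Part 2 of 2)
Here the enqueue strategy is selected according to the config.queueEnqueueTimeoutMs parameter, whose values mean the following: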
# Timeout for event enqueue:
# 0: events will be enqueued immediately or dropped if the queue is full
# -ve: enqueue will block indefinitely if the queue is full
# +ve: enqueue will block up to this many milliseconds if the queue is full
#queue.enqueue.timeout.ms=
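The three settings map naturally onto the three insertion methods of java.util.concurrent.BlockingQueue. The sketch below illustrates that mapping under this assumption. It is not the producer's actual code. EnqueueSketch and enqueue are hypothetical names, and treating an interrupted wait as a failed enqueue is a choice made only for this example.

```scala
import java.util.concurrent.{BlockingQueue, TimeUnit}

// Sketch (not Kafka's code): map a queue.enqueue.timeout.ms-style value onto
// the three BlockingQueue insertion methods. EnqueueSketch is a hypothetical name.
object EnqueueSketch {
  /** Returns true if the message was queued, false if it was dropped. */
  def enqueue[T](queue: BlockingQueue[T], message: T, timeoutMs: Long): Boolean =
    try {
      if (timeoutMs == 0L) queue.offer(message)              // 0: enqueue now, or drop if full
      else if (timeoutMs < 0L) { queue.put(message); true }  // negative: block until space is free
      else queue.offer(message, timeoutMs, TimeUnit.MILLISECONDS) // positive: wait at most timeoutMs
    } catch {
      case _: InterruptedException => false // assumption: an interrupted wait counts as a failed enqueue
    }
}
```

With a queue of capacity 1, a second call with timeout 0 returns false immediately, while a call with a positive timeout returns false only after waiting. A negative timeout would block the caller until a consumer frees space.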
Finally, let's see what the asynchronous send thread actually does. Its run method is simple: it just calls processEvents, shown below:
private def processEvents() {
  var lastSend = SystemTime.milliseconds
  var events = new ArrayBuffer[KeyedMessage[K,V]]
  var full: Boolean = false
  // drain the queue until you get a shutdown command
  Stream.continually(queue.poll(scala.math.max(0, (lastSend + queueTime) - SystemTime.milliseconds), TimeUnit.MILLISECONDS))
    .takeWhile(item => if(item != null) item ne shutdownCommand else true).foreach {
      currentQueueItem =>
        val elapsed = (SystemTime.milliseconds - lastSend)
        // check if the queue time is reached. This happens when the poll method above returns after a timeout and
        // returns a null object
        val expired = currentQueueItem == null
        if(currentQueueItem != null) {
          trace("Dequeued item for topic %s, partition key: %s, data: %s"
            .format(currentQueueItem.topic, currentQueueItem.key, currentQueueItem.message))
          events += currentQueueItem
        }
        // check if the batch size is reached
        full = events.size >= batchSize
        // a send is triggered in two cases: the events buffer reaches the batch size, or poll times out
        if(full || expired) {
          if(expired)
            debug(elapsed + " ms elapsed. Queue time reached. Sending..")
          if(full)
            debug("Batch full. Sending..")
          // if either queue time has reached or batch size has reached, dispatch to event handler
          tryToHandle(events)
          lastSend = SystemTime.milliseconds
          events = new ArrayBuffer[KeyedMessage[K,V]]
        }
    }
  // send the last batch of events
  tryToHandle(events)
  if(queue.size > 0)
    throw new IllegalQueueStateException("Invalid queue state! After queue shutdown, %d remaining items in the queue"
      .format(queue.size))
}
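This function keeps taking messages from the queue with queue.poll until it receives the shutdown command.
Each message is appended to the events buffer. When the buffer reaches the batch size (config.batchNumMessages) or the queue time expires, the batch is sent.
The batch is sent by tryToHandle, which calls handler.handle(events); from there on the flow is the same as for synchronous sending.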
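The same batching logic can be sketched without Kafka's types. This is a simplified sketch that assumes a generic BlockingQueue[AnyRef] and a callback in place of tryToHandle. BatchingLoop, drain, Shutdown and send are hypothetical names, and an explicit while loop replaces the Stream.continually/takeWhile pipeline for readability.

```scala
import java.util.concurrent.{BlockingQueue, TimeUnit}
import scala.collection.mutable.ArrayBuffer

// Simplified sketch of the processEvents batching logic (not Kafka's code).
// BatchingLoop, drain, Shutdown and the send callback are hypothetical names.
object BatchingLoop {
  // sentinel that plays the role of shutdownCommand
  val Shutdown: AnyRef = new Object

  def drain(queue: BlockingQueue[AnyRef], batchSize: Int, queueTimeMs: Long)
           (send: Seq[AnyRef] => Unit): Unit = {
    var lastSend = System.currentTimeMillis()
    var events = new ArrayBuffer[AnyRef]
    var running = true
    while (running) {
      // wait no longer than the deadline of the current batch
      val waitMs = math.max(0L, lastSend + queueTimeMs - System.currentTimeMillis())
      val item = queue.poll(waitMs, TimeUnit.MILLISECONDS)
      if (item != null && (item eq Shutdown)) {
        running = false
      } else {
        val expired = item == null // poll returned null: the queue time was reached
        if (item != null) events += item
        if (events.size >= batchSize || expired) {
          if (events.nonEmpty) send(events.toList) // like tryToHandle, skip empty batches
          lastSend = System.currentTimeMillis()
          events = new ArrayBuffer[AnyRef]
        }
      }
    }
    if (events.nonEmpty) send(events.toList) // flush the last partial batch after shutdown
  }
}
```

For example, with batchSize = 3 and seven queued messages followed by Shutdown, the callback receives batches of 3, 3 and 1 messages. The last batch comes from the final flush after shutdown. Because poll waits only until lastSend + queueTimeMs, a slow trickle of messages is still sent once the queue time expires.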
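3. How the Server Handles a Produce Request
When the Kafka server starts, it launches N threads to handle requests. N is set by the num.io.threads property and defaults to 8; Kafka recommends setting it to at least the number of disks on the machine. This happens in the startup method of KafkaServer: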
def startup() {
  ...
  // create the request-handler thread pool; its constructor starts the threads that wait for requests
  requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
  ...
}
class KafkaRequestHandlerPool {
  ...
  for(i <- 0 until numThreads) {
    runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis)
    threads(i) = Utils.daemonThread("kafka-request-handler-" + i, runnables(i))
    threads(i).start() // start each request-handler thread
  }
  ...
}
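KafkaRequestHandler is really just a Runnable. Its run method loops with while (true), repeatedly taking a request from the requestChannel and passing it to apis.handle(request), as shown below:
class KafkaRequestHandler(...) extends Runnable {
  ...
  def run() {
    ...
    while (true) {
      ...
      apis.handle(request) // hand the request to apis.handle for processing
    }
    ...
  }
  ...
}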
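The pool-plus-loop structure above (N threads created up front, each running an endless take-and-handle loop over a shared request channel) can be sketched with plain JDK classes. The sketch assumes string requests and a bounded ArrayBlockingQueue standing in for RequestChannel. HandlerPool, submit, threadCount and the thread names are hypothetical and are not Kafka APIs.

```scala
import java.util.concurrent.ArrayBlockingQueue

// Hypothetical sketch: a fixed number of daemon threads, each running an endless
// "take a request, handle it" loop over one shared queue.
// HandlerPool, submit and the thread names are illustrative, not Kafka APIs.
class HandlerPool(numThreads: Int, handle: String => Unit) {
  // shared request queue (plays the role of RequestChannel's request queue)
  private val requestQueue = new ArrayBlockingQueue[String](500)

  // threads are created and started in the constructor, as in KafkaRequestHandlerPool
  private val threads: IndexedSeq[Thread] = (0 until numThreads).map { i =>
    val t = new Thread(new Runnable {
      def run(): Unit =
        while (true) {
          val request = requestQueue.take() // blocks until a request is available
          handle(request)
        }
    }, "request-handler-" + i)
    t.setDaemon(true) // daemon threads do not keep the JVM alive
    t.start()
    t
  }

  def submit(request: String): Unit = requestQueue.put(request)
  def threadCount: Int = threads.size
}
```

Because every thread blocks on the same queue, requests are spread across whichever threads are idle, and adding threads (like raising num.io.threads) increases how many requests are processed concurrently.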
In KafkaApis, handle receives every type of request and dispatches it to the matching handler. This article only looks at ProducerRequest:
def handle(request: RequestChannel.Request) {
  ...
  request.requestId match {
    case RequestKeys.ProduceKey => handleProducerOrOffsetCommitRequest(request) // a ProducerRequest is handled by handleProducerOrOffsetCommitRequest
    case ...
  }
  ...
}
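The dispatch in handle is ordinary Scala pattern matching on the request ID. A minimal sketch with made-up key values and handler names follows; DispatchSketch, ProduceKey, FetchKey and Request here are illustrative, not Kafka's definitions.

```scala
// Hypothetical sketch of requestId-based dispatch in the style of KafkaApis.handle.
// The key values and handler names are made up for illustration.
object DispatchSketch {
  val ProduceKey: Short = 0
  val FetchKey: Short = 1

  final case class Request(requestId: Short, payload: String)

  def handle(request: Request): String = request.requestId match {
    case ProduceKey => handleProduce(request) // uppercase name: compared as a constant
    case FetchKey   => handleFetch(request)
    case other      => s"unknown request id $other"
  }

  private def handleProduce(r: Request): String = s"produce: ${r.payload}"
  private def handleFetch(r: Request): String = s"fetch: ${r.payload}"
}
```

Uppercase identifiers in a pattern are treated as constants to compare against rather than as fresh variables, which is why `case ProduceKey =>` works as a dispatch key while `case other =>` binds anything left over.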
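So the core method is handleProducerOrOffsetCommitRequest. It has this name because it handles both ProducerRequest and OffsetCommitRequest; the latter is really a special kind of ProducerRequest, because since Kafka 0.8.2 committed offsets are stored in a special topic named __consumer_offsets. This article focuses on genuine ProducerRequests. The logic of the method is illustrated in the figure below.
Overall the logic looks quite simple, as the following code shows: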
def handleProducerOrOffsetCommitRequest(request: RequestChannel.Request) {
  ...
  val localProduceResults = appendToLocalLog(produceRequest, offsetCommitRequestOpt.nonEmpty) // append the messages to the local commit log
  val numPartitionsInError = localProduceResults.count(_.error.isDefined) // count the partitions whose append failed
  if(produceRequest.requiredAcks == 0) { // code path for request.required.acks = 0
    if (numPartitionsInError != 0) {
      info(("Send the close connection response due to error handling produce request " +
        "[clientId = %s, correlationId = %s, topicAndPartition = %s] with Ack=0")
        .format(produceRequest.clientId, produceRequest.correlationId, produceRequest.topicPartitionMessageSizeMap.keySet.mkString(",")))
      requestChannel.closeConnection(request.processor, request) // close the underlying socket to tell the client that some sends failed
    } else {
      ...
    }
  } else if (produceRequest.requiredAcks == 1 || // code path for request.required.acks = 1 (plus two other conditions)
             produceRequest.numPartitions <= 0 ||
             numPartitionsInError == produceRequest.numPartitions) {
    val response = offsetCommitRequestOpt.map(_.responseFor(firstErrorCode, config.offsetMetadataMaxSize))
      .getOrElse(ProducerResponse(produceRequest.correlationId, statuses))
    requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) // send the response to the client
  } else { // code path for request.required.acks = -1
    // create a list of (topic, partition) pairs to use as keys for this delayed request
    val producerRequestKeys = produceRequest.data.keys.toSeq
    val statuses = localProduceResults.map(r =>
      r.key -> DelayedProduceResponseStatus(r.end + 1, ProducerResponseStatus(r.errorCode, r.start))).toMap
    val delayedRequest = new DelayedProduce(...) // a delayed request is built here; this requires understanding Purgatory and is left for a later article
    ...
  }
  ...
}
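As the code shows, whatever the value of request.required.acks, the message set is first appended to the local commit log. With the default value of 0, the broker then only checks whether every partition was appended successfully; if any partition failed, it closes the client's connection to the broker to signal the error to the client. With acks = 1 (or when the request has no partitions, or every partition failed), the response is sent right away; otherwise (acks = -1) a DelayedProduce is created and the response is deferred.
At the end of the call chain started by appendToLocalLog is Partition's appendMessagesToLeader, whose main code is as follows: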
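Stripped of I/O, the three branches reduce to a decision on requiredAcks, the number of partitions, and the number of failed partitions. The sketch below assumes that reading of the code. AckPaths, AckAction and its cases are hypothetical names, and NoResponse stands for the elided acks = 0 success branch, on the assumption that a producer using acks = 0 expects no response.

```scala
// Sketch of the three code paths in handleProducerOrOffsetCommitRequest,
// reduced to a pure decision function. AckAction and its cases are hypothetical.
object AckPaths {
  sealed trait AckAction
  case object CloseConnection extends AckAction // acks = 0 and some partition failed
  case object NoResponse      extends AckAction // acks = 0 and every partition succeeded (assumed)
  case object RespondNow      extends AckAction // acks = 1, no partitions, or all partitions failed
  case object DelayResponse   extends AckAction // any other acks (e.g. -1): wait via DelayedProduce

  def decide(requiredAcks: Int, numPartitions: Int, numPartitionsInError: Int): AckAction =
    if (requiredAcks == 0) {
      if (numPartitionsInError != 0) CloseConnection else NoResponse
    } else if (requiredAcks == 1 || numPartitions <= 0 || numPartitionsInError == numPartitions)
      RespondNow
    else
      DelayResponse
}
```

Note the middle branch: even with acks = -1, a request in which every partition failed is answered immediately, since there is nothing left to wait for.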
def appendMessagesToLeader(messages: ByteBufferMessageSet, requiredAcks: Int=0) = {
  inReadLock(leaderIsrUpdateLock) {
    val leaderReplicaOpt = leaderReplicaIfLocal() // is the leader replica of the target partition on this broker?
    leaderReplicaOpt match {
      case Some(leaderReplica) => // the leader replica is on this broker
        val log = leaderReplica.log.get // handle to the local commit log
        val minIsr = log.config.minInSyncReplicas
        val inSyncSize = inSyncReplicas.size
        // Avoid writing to leader if there are not enough insync replicas to make it safe
        if (inSyncSize < minIsr && requiredAcks == -1) { // the ISR size is checked only when request.required.acks == -1
          throw new NotEnoughReplicasException("Number of insync replicas for partition [%s,%d] is [%d], below required minimum [%d]"
            .format(topic, partitionId, inSyncSize, minIsr)) // arguments in message order: current ISR size, then the minimum
        }
        val info = log.append(messages, assignOffsets = true) // the actual log write; Kafka's low-level log storage deserves its own article
        // probably unblock some follower fetch requests since log end offset has been updated
        replicaManager.unblockDelayedFetchRequests(new TopicAndPartition(this.topic, this.partitionId))
        // we may need to increment high watermark since ISR could be down to 1
        maybeIncrementLeaderHW(leaderReplica)
        info
      case None => // otherwise, throw an exception saying the leader is not on this broker
        throw new NotLeaderForPartitionException("Leader not local for partition [%s,%d] on broker %d"
          .format(topic, partitionId, localBrokerId))
    }
  }
}
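The two guards in appendMessagesToLeader (leader locality first, then the ISR size when acks = -1) can be expressed as a pure function. This sketch returns Either instead of throwing. PartitionState, checkAppend and the error strings are hypothetical, and minInSyncReplicas stands in for log.config.minInSyncReplicas.

```scala
// Sketch of the two guard checks in appendMessagesToLeader, returning Either
// instead of throwing. PartitionState and the error strings are hypothetical.
object LeaderAppendSketch {
  final case class PartitionState(leaderIsLocal: Boolean, inSyncReplicas: Int, minInSyncReplicas: Int)

  def checkAppend(p: PartitionState, requiredAcks: Int): Either[String, Unit] =
    if (!p.leaderIsLocal)
      Left("NotLeaderForPartition") // checked first: only the leader may append
    else if (p.inSyncReplicas < p.minInSyncReplicas && requiredAcks == -1)
      Left("NotEnoughReplicas")     // the ISR size only matters when acks = -1
    else
      Right(())                     // safe to append to the local log
}
```

The usage shows the asymmetry in the original code: a shrunken ISR rejects an acks = -1 write but still accepts the same write with acks = 1.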
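This completes the walkthrough of the simplest Scala synchronous producer. The design idea is clear: each broker runs a server that continuously handles the various requests sent by clients, performs the corresponding work, and returns a response where one is required.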
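4. Q&A
1. Does the producer client interact with ZooKeeper?
No. The producer does not talk to ZooKeeper; all metadata is fetched from the brokers. Among the clients, only the Consumer needs to interact with ZooKeeper.
2. How is the compression.codec parameter used?
This is a producer-side parameter that controls whether message batches are compressed, and with which codec. According to the official documentation, the supported options are: This parameter allows you to specify the compression codec for all data generated by this producer. Valid values are "none", "gzip", "snappy" and "lz4".
For example: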
compression.codec=none
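Putting answers 1 and 2 together, an old-producer configuration lists brokers rather than ZooKeeper and names a compression codec. The sketch only builds a java.util.Properties; passing it to kafka.producer.ProducerConfig would need the Kafka 0.8.x client on the classpath, which is assumed here and not exercised. ProducerProps, build and the host name are hypothetical; the codec whitelist comes from the documentation quoted above.

```scala
import java.util.Properties

// Sketch: an old (Scala) producer configuration with compression enabled.
// ProducerProps and build are hypothetical; only java.util.Properties is used.
object ProducerProps {
  private val validCodecs = Set("none", "gzip", "snappy", "lz4") // values listed in the docs

  def build(brokers: String, codec: String): Properties = {
    require(validCodecs.contains(codec), s"unsupported compression.codec: $codec")
    val props = new Properties()
    props.setProperty("metadata.broker.list", brokers)  // metadata comes from brokers, not ZooKeeper
    props.setProperty("compression.codec", codec)
    props.setProperty("producer.type", "async")
    props.setProperty("queue.enqueue.timeout.ms", "-1") // block when the queue is full
    props
  }
}
```

Validating the codec up front is a choice of this sketch; it turns a typo into an immediate error rather than a misconfigured producer.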
3. Is blockingChannel.send(request) blocking or non-blocking?
It is a blocking send, because BlockingChannel.connect puts the socket channel into blocking mode:
class BlockingChannel( val host: String,
                       val port: Int,
                       val readBufferSize: Int,
                       val writeBufferSize: Int,
                       val readTimeoutMs: Int ) extends Logging {
  …
  def connect() = lock synchronized {
    if(!connected) {
      try {
        channel = SocketChannel.open()
        if(readBufferSize > 0)
          channel.socket.setReceiveBufferSize(readBufferSize)
        if(writeBufferSize > 0)
          channel.socket.setSendBufferSize(writeBufferSize)
        channel.configureBlocking(true)
        …
      }
      …
    }
  }
  …
}
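The effect of configureBlocking(true) can be seen with a loopback connection using plain java.nio. This is a standalone sketch, not BlockingChannel itself; BlockingSendDemo and roundTrip are hypothetical names. In blocking mode, connect returns only once the connection is established and write returns only after every byte in the buffer has been written, which is the sense in which blockingChannel.send blocks. Newly created SocketChannels are already in blocking mode, so the call makes that default explicit.

```scala
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.nio.channels.{ServerSocketChannel, SocketChannel}
import java.nio.charset.StandardCharsets

// Loopback sketch (not Kafka code): a SocketChannel in blocking mode,
// configured with the same call BlockingChannel.connect uses.
object BlockingSendDemo {
  def roundTrip(payload: String): (Boolean, String) = {
    val server = ServerSocketChannel.open()
    server.bind(new InetSocketAddress("127.0.0.1", 0)) // port 0: pick any free port
    val client = SocketChannel.open()
    try {
      client.configureBlocking(true)          // same call as in BlockingChannel.connect
      client.connect(server.getLocalAddress()) // blocks until the connection is established
      val out = ByteBuffer.wrap(payload.getBytes(StandardCharsets.UTF_8))
      client.write(out)                       // blocking mode: returns after all bytes are written
      val accepted = server.accept()
      try {
        val in = ByteBuffer.allocate(out.capacity())
        while (in.hasRemaining() && accepted.read(in) >= 0) {} // read until the payload has arrived
        in.flip()
        (client.isBlocking(), StandardCharsets.UTF_8.decode(in).toString)
      } finally accepted.close()
    } finally {
      client.close()
      server.close()
    }
  }
}
```

The trade-off is simplicity versus concurrency: a blocking channel needs no selector or readiness loop, but the calling thread cannot do anything else until the send completes.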
5. Appendix
1. Case classes: http://nerd-is.in/2013-09/scala-learning-pattern-matching-and-case-classes/
6. References
1. Kafka Producer: sending messages in synchronous mode. http://blog.csdn.net/itleochen/article/details/19926785