Kafka服务端性能优化指导
【摘要】 Kafka内核对于请求处理各个环节都打了断点,通过断点可以清楚的看到,服务在处理请求各个阶段的耗时,继而指导服务端性能优化。通过Request类,可以详细查看服务端如何计算请求处理各个阶段耗时:// RequstChannel#Requestclass Request(val processor: Int, val context: RequestContext, ...
Kafka内核对于请求处理各个环节都打了断点,通过断点可以清楚的看到,服务在处理请求各个阶段的耗时,继而指导服务端性能优化。
通过Request类,可以详细查看服务端如何计算请求处理各个阶段耗时:
// RequstChannel#Request
class Request(val processor: Int,
val context: RequestContext,
val startTimeNanos: Long,
memoryPool: MemoryPool,
@volatile private var buffer: ByteBuffer,
metrics: RequestChannel.Metrics) extends BaseRequest {
// These need to be volatile because the readers are in the network thread and the writers are in the request
// handler threads or the purgatory threads
@volatile var requestDequeueTimeNanos = -1L // request被IO线程从RequestQueue取出的时间点
@volatile var apiLocalCompleteTimeNanos = -1L // request被Broker本地处理完成的时间点
@volatile var responseCompleteTimeNanos = -1L // request被处理完成,执行RequestChannle#sendResponse的时间点(封装Response的时间点,默认也是入processor的response队列的时间点)
@volatile var responseDequeueTimeNanos = -1L // response被processor发送的时间点
@volatile var apiRemoteCompleteTimeNanos = -1L // request被远端Broker完成处理的时间点(部分请求涉及例如procuder请求)
@volatile var messageConversionsTimeNanos = 0L // 执行数据格式转换的时间
//
def updateRequestMetrics(networkThreadTimeNanos: Long, response: Response): Unit = {
val endTimeNanos = Time.SYSTEM.nanoseconds
if (apiLocalCompleteTimeNanos < 0)
apiLocalCompleteTimeNanos = responseCompleteTimeNanos
if (apiRemoteCompleteTimeNanos < 0)
apiRemoteCompleteTimeNanos = responseCompleteTimeNanos
... ...
// request从开始入RequestQueue到被IO线程处理耗时,此指标过大,有以下几个原因:
// 1. RequestQueue过小,不能承担大量的请求,可通过调大 queued.max.requests 参数来缓解
// 2. I/O线程少,不能及时处理RequestQueue里的请求,可通过调整IO线程个数(num.io.threads)来缓解
val requestQueueTimeMs = nanosToMs(requestDequeueTimeNanos - startTimeNanos)
// 请求在本节点处理耗时, 如果此指标过大,需做一下动作:
// 1. 检查节点CPU,磁盘IO 看是否存在瓶颈
// 2. 检查节点上的IO线程
val apiLocalTimeMs = nanosToMs(apiLocalCompleteTimeNanos - requestDequeueTimeNanos)
// 请求在其他节点处理耗时,如果此指标过大,需检查节点间网络、对端节点磁盘IO,CPU使用率等指标
val apiRemoteTimeMs = nanosToMs(apiRemoteCompleteTimeNanos - apiLocalCompleteTimeNanos)
// 限流的时间, 这个参数对于定位数据生产、数据同步慢有帮助作用
val apiThrottleTimeMs = nanosToMs(responseCompleteTimeNanos - apiRemoteCompleteTimeNanos)
// response在processor的response队列里待的时间长度,如果此指标过大,可能原因是:
// 1. processor个数过少,处理不过来,可通过适当调节 num.network.threads 来缓解
val responseQueueTimeMs = nanosToMs(responseDequeueTimeNanos - responseCompleteTimeNanos)
// response被成功发送的耗时,如果此指标较大,说明服务端到对端的网络存在较大延迟,需检查网络
val responseSendTimeMs = nanosToMs(endTimeNanos - responseDequeueTimeNanos)
// 执行数据格式转化耗时
val messageConversionsTimeMs = nanosToMs(messageConversionsTimeNanos)
// 请求从SocketChannel接收到被完全发送出去的总耗时
val totalTimeMs = nanosToMs(endTimeNanos - startTimeNanos)
上述代码转化成图片,如下图:
最后,上一段服务端Request debug log
2020-11-25 14:09:55,004 | DEBUG | [data-plane-kafka-network-thread-1-ListenerName(SASL_PLAINTEXT)-SASL_PLAINTEXT-1] | Completed request:RequestHeader(apiKey=FETCH, apiVersion=7, clientId=broker-4-fetcher-0, correlationId=166259) -- {replica_id=4,max_wait_time=500,min_bytes=1,max_bytes=10485760,isolation_level=0,session_id=1414976616,session_epoch=166259,topics=[],forgotten_topics_data=[]},response:{throttle_time_ms=0,error_code=0,session_id=1414976616,responses=[]} from connection 10.244.228.252:21007-10.244.228.89:56264-1;totalTime:500.256,requestQueueTime:0.112,localTime:0.214,remoteTime:499.676,throttleTime:0.075,responseQueueTime:0.1,sendTime:0.077,securityProtocol:SASL_PLAINTEXT,principal:User:kafka,listener:SASL_PLAINTEXT | kafka.request.logger (RequestChannel.scala:256)
2020-11-25 14:09:55,094 | DEBUG | [data-plane-kafka-network-thread-1-ListenerName(SASL_PLAINTEXT)-SASL_PLAINTEXT-2] | Completed request:RequestHeader(apiKey=FETCH, apiVersion=7, clientId=broker-5-fetcher-0, correlationId=161103) -- {replica_id=5,max_wait_time=500,min_bytes=1,max_bytes=10485760,isolation_level=0,session_id=21892705,session_epoch=161103,topics=[],forgotten_topics_data=[]},response:{throttle_time_ms=0,error_code=0,session_id=21892705,responses=[]} from connection 10.244.228.252:21007-10.244.229.85:45824-1;totalTime:501.224,requestQueueTime:0.085,localTime:0.31,remoteTime:500.463,throttleTime:0.105,responseQueueTime:0.111,sendTime:0.148,securityProtocol:SASL_PLAINTEXT,principal:User:kafka,listener:SASL_PLAINTEXT | kafka.request.logger (RequestChannel.scala:256)
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)