Kafka服务端性能优化指导

举报
尘枫 发表于 2021/07/26 10:18:54 2021/07/26
【摘要】 Kafka内核对于请求处理各个环节都打了断点,通过断点可以清楚的看到,服务在处理请求各个阶段的耗时,继而指导服务端性能优化。通过Request类,可以详细查看服务端如何计算请求处理各个阶段耗时:// RequstChannel#Requestclass Request(val processor: Int, val context: RequestContext, ...

Kafka内核对于请求处理各个环节都打了断点,通过断点可以清楚的看到,服务在处理请求各个阶段的耗时,继而指导服务端性能优化。

通过Request类,可以详细查看服务端如何计算请求处理各个阶段耗时:

// RequstChannel#Request
class Request(val processor: Int,
              val context: RequestContext,
              val startTimeNanos: Long,
              memoryPool: MemoryPool,
              @volatile private var buffer: ByteBuffer,
              metrics: RequestChannel.Metrics) extends BaseRequest {
  // These need to be volatile because the readers are in the network thread and the writers are in the request
  // handler threads or the purgatory threads
  @volatile var requestDequeueTimeNanos = -1L          // request被IO线程从RequestQueue取出的时间点
  @volatile var apiLocalCompleteTimeNanos = -1L        // request被Broker本地处理完成的时间点
  @volatile var responseCompleteTimeNanos = -1L        // request被处理完成,执行RequestChannle#sendResponse的时间点(封装Response的时间点,默认也是入processor的response队列的时间点) 
  @volatile var responseDequeueTimeNanos = -1L         // response被processor发送的时间点
  @volatile var apiRemoteCompleteTimeNanos = -1L       // request被远端Broker完成处理的时间点(部分请求涉及例如procuder请求)
  @volatile var messageConversionsTimeNanos = 0L       // 执行数据格式转换的时间

// 
def updateRequestMetrics(networkThreadTimeNanos: Long, response: Response): Unit = {
  val endTimeNanos = Time.SYSTEM.nanoseconds
  if (apiLocalCompleteTimeNanos < 0)
    apiLocalCompleteTimeNanos = responseCompleteTimeNanos
  if (apiRemoteCompleteTimeNanos < 0)
    apiRemoteCompleteTimeNanos = responseCompleteTimeNanos

  ... ... 

  // request从开始入RequestQueue到被IO线程处理耗时,此指标过大,有以下几个原因:
  // 1. RequestQueue过小,不能承担大量的请求,可通过调大 queued.max.requests 参数来缓解
  // 2. I/O线程少,不能及时处理RequestQueue里的请求,可通过调整IO线程个数(num.io.threads)来缓解
  val requestQueueTimeMs = nanosToMs(requestDequeueTimeNanos - startTimeNanos)
  // 请求在本节点处理耗时, 如果此指标过大,需做一下动作:
  // 1. 检查节点CPU,磁盘IO 看是否存在瓶颈
  // 2. 检查节点上的IO线程
  val apiLocalTimeMs = nanosToMs(apiLocalCompleteTimeNanos - requestDequeueTimeNanos)
  // 请求在其他节点处理耗时,如果此指标过大,需检查节点间网络、对端节点磁盘IO,CPU使用率等指标
  val apiRemoteTimeMs = nanosToMs(apiRemoteCompleteTimeNanos - apiLocalCompleteTimeNanos)
  // 限流的时间, 这个参数对于定位数据生产、数据同步慢有帮助作用
  val apiThrottleTimeMs = nanosToMs(responseCompleteTimeNanos - apiRemoteCompleteTimeNanos)
  // response在processor的response队列里待的时间长度,如果此指标过大,可能原因是:
  // 1. processor个数过少,处理不过来,可通过适当调节 num.network.threads 来缓解
  val responseQueueTimeMs = nanosToMs(responseDequeueTimeNanos - responseCompleteTimeNanos)
  // response被成功发送的耗时,如果此指标较大,说明服务端到对端的网络存在较大延迟,需检查网络
  val responseSendTimeMs = nanosToMs(endTimeNanos - responseDequeueTimeNanos)
  // 执行数据格式转化耗时
  val messageConversionsTimeMs = nanosToMs(messageConversionsTimeNanos)
  // 请求从SocketChannel接收到被完全发送出去的总耗时
  val totalTimeMs = nanosToMs(endTimeNanos - startTimeNanos)

上述代码转化成图片,如下图:
timeline.PNG

最后,上一段服务端Request debug log

2020-11-25 14:09:55,004 | DEBUG | [data-plane-kafka-network-thread-1-ListenerName(SASL_PLAINTEXT)-SASL_PLAINTEXT-1] | Completed request:RequestHeader(apiKey=FETCH, apiVersion=7, clientId=broker-4-fetcher-0, correlationId=166259) -- {replica_id=4,max_wait_time=500,min_bytes=1,max_bytes=10485760,isolation_level=0,session_id=1414976616,session_epoch=166259,topics=[],forgotten_topics_data=[]},response:{throttle_time_ms=0,error_code=0,session_id=1414976616,responses=[]} from connection 10.244.228.252:21007-10.244.228.89:56264-1;totalTime:500.256,requestQueueTime:0.112,localTime:0.214,remoteTime:499.676,throttleTime:0.075,responseQueueTime:0.1,sendTime:0.077,securityProtocol:SASL_PLAINTEXT,principal:User:kafka,listener:SASL_PLAINTEXT | kafka.request.logger (RequestChannel.scala:256)
2020-11-25 14:09:55,094 | DEBUG | [data-plane-kafka-network-thread-1-ListenerName(SASL_PLAINTEXT)-SASL_PLAINTEXT-2] | Completed request:RequestHeader(apiKey=FETCH, apiVersion=7, clientId=broker-5-fetcher-0, correlationId=161103) -- {replica_id=5,max_wait_time=500,min_bytes=1,max_bytes=10485760,isolation_level=0,session_id=21892705,session_epoch=161103,topics=[],forgotten_topics_data=[]},response:{throttle_time_ms=0,error_code=0,session_id=21892705,responses=[]} from connection 10.244.228.252:21007-10.244.229.85:45824-1;totalTime:501.224,requestQueueTime:0.085,localTime:0.31,remoteTime:500.463,throttleTime:0.105,responseQueueTime:0.111,sendTime:0.148,securityProtocol:SASL_PLAINTEXT,principal:User:kafka,listener:SASL_PLAINTEXT | kafka.request.logger (RequestChannel.scala:256)
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。