如何处理Flink的反压问题

举报
kyo 发表于 2020/05/26 21:12:21 2020/05/26
【摘要】 一、什么是反压 反压是流处理系统中用来保障应用可靠性的一个重要机制。由于流应用是7*24小时运行,数据输入速率也不是一成不变,可能随时间产生波峰波谷,当某个处理单元由于到来的数据忽然增加,暂时性超出其处理能力时,就会出现数据在接收队列上累积,当数据的累积量超出处理单元的容量时,会出现数据丢失现象甚至因为系统资源耗尽而导致应用崩溃。为此,需要一种反压机制来告知上游处理单元降低数据发送的速率,...

一、什么是反压

  反压是流处理系统中用来保障应用可靠性的一个重要机制。由于流应用是7*24小时运行,数据输入速率也不是一成不变,可能随时间产生波峰波谷,当某个处理单元由于到来的数据忽然增加,暂时性超出其处理能力时,就会出现数据在接收队列上累积,当数据的累积量超出处理单元的容量时,会出现数据丢失现象甚至因为系统资源耗尽而导致应用崩溃。为此,需要一种反压机制来告知上游处理单元降低数据发送的速率,以缓解下游处理单元的压力。

二、Flink中如何实现反压机制

Flink由于是天然的流计算架构,算子之间的数据传输采取类似阻塞队列的方式,当接收者队列满了后,发送者就会被阻塞,从而产生反压。先来看下flink的网络栈。

逻辑视图:

 上游数据输出到ResultPartition,下游任务从InputGate收取数据。

物理视图:

  不同任务之间的每个(远程)网络连接将在 Flink 的网络栈中获得自己的 TCP 通道。但是,如果同一任务的不同子任务被安排到了同一个 TaskManager,则它们与同一个 TaskManager 的网络连接将被多路复用,并共享一个 TCP 信道以减少资源占用。如上示例中是 A.1→B.3、A.1→B.4 以及 A.2→B.3 和 A.2→B.4就是复用同一个通道。

  每个子任务的结果称为结果分区,每个结果拆分到单独的子结果分区( ResultSubpartitions )中——每个逻辑通道有一个。在堆栈的这一部分中,Flink 不再处理单个记录,而是将一组序列化记录组装到网络缓冲区中。每当子任务的发送缓冲池耗尽时——也就是缓存驻留在结果子分区的缓存队列中或更底层的基于 Netty 的网络栈中时——生产者就被阻塞了,无法继续工作,并承受背压。接收器也是类似:较底层网络栈中传入的 Netty 缓存需要通过网络缓冲区提供给 Flink。如果相应子任务的缓冲池中没有可用的网络缓存,Flink 将在缓存可用前停止从该通道读取。

  在1.5之前,这种机制会导致单个子任务的背压会影响同一个通道的所有子任务,因此1.5之后Flink引入了基于信用的流量控制。

    基于信用的流量控制,是在Flink原有反压机制上,在ResultPartition和InputGate中间又加了一层信用反馈。每一次 ResultPartition 向 InputChannel 发送消息的时候都会发送一个 backlog size 告诉下游准备发送多少消息,下游就会去计算有多少的 Buffer 去接收消息,算完之后如果有充足的 Buffer 就会返还给上游一个 Credit 告知他可以发送消息。当Credit未0时,上游就知道下游无法在接收数据,就会停止发送,这样就不会把底层链路堵住。


三、如何定位产生反压的节点

   一看反压:

   FlinkUI上可以直观的看到每个算子的背压状态。背压状态显示的是请求的block率。

ration <= 0.1 OK
0.1 < ratio <= 0.5  LOW
0.5 < ratio <= 1 HIGH

 image.png

  二看监控:

  通过反压状态可以大致锁定反压可能存在的算子,但具体反压是由于自身处理速度慢还是由于下游处理慢导致的,需要通过metric监控进一步监控。

  Flink 1.9版本以上可以通过outPoolUsage,inPoolUsage,floatingBuffersUsage,exclusiveBuffersUsage来确认。其中inPoolUsage是floatingBUffersUsage和exclusiveBuffersUsagee的总和。


outPoolUsage低 outPoolUsage高
inPoolUsage低 正常 被背压了,此时上游还未被反压,有可能会被反压
inPoolUsage高 反压正在向上游传递,有可能是背压源 背压是由下游或者网络问题引起的

image.png

三看检查点

通过查看检查点历史情况,可以看到检查点在哪个task耗时最长,以及每个subtask的耗时时间,时间长的一般有两种可能,状态较大或者barrier被阻塞。

image.png

反压可能产生的原因包括:

1)资源不足:如果CPU/内存或者IO使用率较高,可通过增加并发、资源或优化代码等方式调整。

2)GC频繁:分析GC日志或Heap Dump,确认是否有内存泄露,可适当提高内存缓解。

3)数据倾斜:观察subtask的数据处理是否分布均匀,可通过对热点key进行二次分发或者使用local/global aggregation解决。



注:部分截图来自https://flink.apache.org/2019/06/05/flink-network-stack.html


【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。