如何处理Flink的反压问题
一、什么是反压
反压是流处理系统中用来保障应用可靠性的一个重要机制。由于流应用是7*24小时运行,数据输入速率也不是一成不变,可能随时间产生波峰波谷,当某个处理单元由于到来的数据忽然增加,暂时性超出其处理能力时,就会出现数据在接收队列上累积,当数据的累积量超出处理单元的容量时,会出现数据丢失现象甚至因为系统资源耗尽而导致应用崩溃。为此,需要一种反压机制来告知上游处理单元降低数据发送的速率,以缓解下游处理单元的压力。
二、Flink中如何实现反压机制
Flink由于是天然的流计算架构,算子之间的数据传输采取类似阻塞队列的方式,当接收者队列满了后,发送者就会被阻塞,从而产生反压。先来看下flink的网络栈。
逻辑视图:
上游数据输出到ResultPartition,下游任务从InputGate收取数据。
物理视图:
不同任务之间的每个(远程)网络连接将在 Flink 的网络栈中获得自己的 TCP 通道。但是,如果同一任务的不同子任务被安排到了同一个 TaskManager,则它们与同一个 TaskManager 的网络连接将被多路复用,并共享一个 TCP 信道以减少资源占用。如上示例中是 A.1→B.3、A.1→B.4 以及 A.2→B.3 和 A.2→B.4就是复用同一个通道。
每个子任务的结果称为结果分区,每个结果拆分到单独的子结果分区( ResultSubpartitions )中——每个逻辑通道有一个。在堆栈的这一部分中,Flink 不再处理单个记录,而是将一组序列化记录组装到网络缓冲区中。每当子任务的发送缓冲池耗尽时——也就是缓存驻留在结果子分区的缓存队列中或更底层的基于 Netty 的网络栈中时——生产者就被阻塞了,无法继续工作,并承受背压。接收器也是类似:较底层网络栈中传入的 Netty 缓存需要通过网络缓冲区提供给 Flink。如果相应子任务的缓冲池中没有可用的网络缓存,Flink 将在缓存可用前停止从该通道读取。
在1.5之前,这种机制会导致单个子任务的背压会影响同一个通道的所有子任务,因此1.5之后Flink引入了基于信用的流量控制。
基于信用的流量控制,是在Flink原有反压机制上,在ResultPartition和InputGate中间又加了一层信用反馈。每一次 ResultPartition 向 InputChannel 发送消息的时候都会发送一个 backlog size 告诉下游准备发送多少消息,下游就会去计算有多少的 Buffer 去接收消息,算完之后如果有充足的 Buffer 就会返还给上游一个 Credit 告知他可以发送消息。当Credit未0时,上游就知道下游无法在接收数据,就会停止发送,这样就不会把底层链路堵住。
三、如何定位产生反压的节点
一看反压:
FlinkUI上可以直观的看到每个算子的背压状态。背压状态显示的是请求的block率。
ration <= 0.1 | OK |
0.1 < ratio <= 0.5 | LOW |
0.5 < ratio <= 1 | HIGH |
二看监控:
通过反压状态可以大致锁定反压可能存在的算子,但具体反压是由于自身处理速度慢还是由于下游处理慢导致的,需要通过metric监控进一步监控。
Flink 1.9版本以上可以通过outPoolUsage,inPoolUsage,floatingBuffersUsage,exclusiveBuffersUsage来确认。其中inPoolUsage是floatingBUffersUsage和exclusiveBuffersUsagee的总和。
outPoolUsage低 | outPoolUsage高 | |
inPoolUsage低 | 正常 | 被背压了,此时上游还未被反压,有可能会被反压 |
inPoolUsage高 | 反压正在向上游传递,有可能是背压源 | 背压是由下游或者网络问题引起的 |
三看检查点
通过查看检查点历史情况,可以看到检查点在哪个task耗时最长,以及每个subtask的耗时时间,时间长的一般有两种可能,状态较大或者barrier被阻塞。
反压可能产生的原因包括:
1)资源不足:如果CPU/内存或者IO使用率较高,可通过增加并发、资源或优化代码等方式调整。
2)GC频繁:分析GC日志或Heap Dump,确认是否有内存泄露,可适当提高内存缓解。
3)数据倾斜:观察subtask的数据处理是否分布均匀,可通过对热点key进行二次分发或者使用local/global aggregation解决。
注:部分截图来自https://flink.apache.org/2019/06/05/flink-network-stack.html
- 点赞
- 收藏
- 关注作者
评论(0)