谈谈Flink DataStream流计算中的优化(持续更新)

举报
米兰的小铁匠 发表于 2020/06/23 13:31:07 2020/06/23
【摘要】 Flink是一款事件驱动的支持高吞吐、低延迟、高性能的分布式流处理框架,本文主要介绍Stream API模块的流计算,主要从内存、cpu、网络传输三个角度出发谈流作业优化,最后介绍一些大数据处理中常用的数据结构。

       Flink是一款事件驱动的支持高吞吐、低延迟、高性能的分布式流处理框架,本文主要介绍Stream API模块的流计算,主要从内存、cpu、网络传输三个角度出发谈流作业优化。本文主要基于flink1.7,后续会基于flink1.10补充些sql模块的优化以及状态方面的优化。

1.flink作业本身使用的优化

 1.0 一些通用的优化方式    

       1. 尽早fliter掉一些不需要的数据以及避免一些不必要的序列化。

       2. 避免使用深层嵌套数据类型。

       3. 对于数据倾斜使用调整并行度或者双层聚合的方式。

       4. 一些基数较少的并且本身较长维度可以采用数据字典的方式减少网络传输及内存占用、gc开销。

1.1 数据类型和序列化

       flink支持java、scala基本数据类型,以及java Tuples、scala Case Class、Flink Value,对于这些数据类型,flink会采用自身的序列化反序列化器去做序列化操作,对于其他数据类型,flink会采用kyro方式序列化,kyro序列化方式效率会比flink自带的方式低很多,根据flink doc提供的数据,各种不同类型序列化效率如图

如果你想测试你的代码是否可以被flink自带序列化工具识别,请使用

env.getConfig().enableObjectReuse();

来禁用kyro,以验证类型是否都能被flink识别。

序列化方面

       flink对数据进行处理时,如涉及到网络传输,读写状态,checkoutponit,savepoint等都会涉及到序列化反序列化操作。对于flink支持的数据类型,flink会为其生成特定的序列化器,反序列化器以及比较器。对于flink无法识别的类型会被当做泛型嫁给kryo进行序列化,如上图所示,kyro的序列化方式效率较低。针对各种类型,flink会选取如下序列化器。 

       尝试使用transient修饰不需要序列化的变量,或者修饰你可以在下游通过其他方式获取到变量,这个可以减少序列化流程和网络传输(但可能带来更多的内存占用用和gc消耗)。

       对于一些特殊的数据你可以尝试重写writeObject() 和 readObject() 来自己控制一些序列化方式,如果更高效的话。

       如果使用了lambda或者泛型的话,请显式的指定类型信息让flink类型提取系统识别到以提升性能。

1.2 谨慎的开启对象复用

       这里首先简单讲下flink的数据传输模型,主要分为进程间、线程间、同一个线程内三种。flink进程间的数据传输如taskManager之间的数据交换,发送任务将数据推给netty,通过netty将数据推导远程netty上,发送任务需要将要发送的记录序列化到字节缓冲区,接收任务会通过网络通信将发送方的记录接收过来并反序列化,这里flink采用一种基于Credit-Based的流量控制,

      如果是同一个taskManager的发送任务和接收任务的话,接收任务会将数据从字节缓冲区的数据拉取过来而不涉及网络通信开销,数据序列化后会先通过recordwriter,然后通过local channel发送给另一个task的input gate,然后反序列化,再交给recorder reader进行处理。

      对于线程内本地通信而言,flink某些情况下会采用一种叫做chain的方式降低本地通信开销,多个算子的函数会被融合到一个任务中从而在同一个线程执行,这样函数之间的通过传参的方式进行数据传输,就不涉及序列化和通信开销,但此时两个算子之间的数据传输会涉及到一个深copy的操作。

当你开启对象复用后,flink会把两个算子的深copy操作去掉,上一步算子的输出直接作为下一步算子的输入,这样就把深copy的的对象的创建和销毁的操作去掉了,但是使用对象复用是有条件的,可能对导致线程安全问题(下游function只有一种,或者下游function不会改变对象的值,这个务必注意。开启对象复用代码如下,

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse();

1.3 多组相同keyby可使用DataStreamUtils 

       大家都知道keyBy时会涉及shuffle操作,shuffle会涉及到序列化反序列化以及网络传输操作(1.1和1.2节有介绍),那怎么尽可能的减少shuffle就是一个很值得我们思考的问题,在多组keyby的场景可以采用DataStreamUtils.reinterpretAsKeyedStream的方式避免多次shuffle操作。但是一些负载较重的函数,需要在不同的slot中执行的话,不建议使用此方式,同理对于一些负载较重的算子,也可以使用disableOperatorChaining来禁用任务链接。

stream.keyBy(0)
           .map()
           .keyBy(0)
           .timeWindow()
           .processor()
           .key(0)
           .flatmap()

1.4 优先使用增量计算

       除却一些求中位数和众数需要全量计算的必须采用全量计算的其他尽量采用增加计算Function,对于窗口的长度过长或者滑动步长的大窗口计算要谨慎,可考虑使用processFunction替代窗口处理。

1.5 尽量减少状态的大小

     1. 设置合适的state TTL, 清洗过期状态,避免状态无限增大。

      2. 减少状态字段数, 比如使用aggreteFunction 做窗口聚合时,可以只将要聚合的信息放入状态,其他keyBy字段以及窗口信息,可以通过processWindowFunction的方式获取,这样就是 aggregateFunction + ProcessWindowFunction,agg函数获取聚合信息,输出的结果到processwindowFunction中取获取窗口信息。

     3. OperatorState 慎重使用长list。

     4. checkpoint频率不宜过高,超时时间不要太长,可以异步化的地方尽量异步化

1.6 维表join

        一个基本原则是尽量减少与外部数据源的交互,一些数据需要去mysql或者habase等外部数据源获取的信息,数据量较小的可以采用广播变量或者本地缓存的方式,数据量较大的维表可以使用热存储加lru。采用异步函数加载到Lru,lru中命中不到,再去外部数据源获取,同时更新lru。但如果你的维表数据实在过大,可以考虑将热点数据cache,其他长尾规则数据去外存查询。检测下缓存命中率,外部服务调用性能Async IO。

1.7 锋利的miniBatch

       flink的计算模型是来一条算一条,但很多时候我们并不需要这么低的延迟,那是否可以用时延获取更多的吞吐量呢,大家知道flink的状态的存取是一个非常耗性能的操作,当有状态计算时,一条数据进来通常是这样的流程

那这样一条数据过来是会先从State反序列化获取状态,计算后在序列化更新状态,

当使用miniBatch的方式时,可以缓存一批数据,一起计算,是这样的流程,该图片来自于flink forward云邪。


这样的话在聚合之前将多条数据放在heap中,计算时只需要存取一次状态,这样就可以降低状态频繁存取所带来的开销。

但是这个功能只在flink sql 1.10中实现,stream模块暂时未实现。

      如果你想要cache一批数据做计算可以使用processFunction + 自定义timwe或者窗口buffer机制实现攒批后再处理,比如和外部系统如mysql、hbase交互,或者写入kafka,也可以做批量转化,这样可以有效降低一些cpu使用和外部存储的压力,这种攒批机制也涉及到了状态的使用,这本身并不是sql对应的minibatch机制,还是等DataSteam中实现吧,小编还是非常期待stream中能够早日实现该功能。

1.8 并行度与调参

       一个基本公式是最大并行度<=tm数量*slot数量,当kafka是数据源时,数据量较小时可以尝试设置与source相同的并行度,对于数据量较大的,下游可以适当调大并行度,比如调成source的整数倍,同时根据反压来辅助定位性能瓶颈,你也可以根据cpu使用情况和内存占用、GC状况来调整taskmanager的cpu和内存分配。根据业务类型、运行状况调整source、中间operator节点、sink节点的并行度。

2. 内存模型与jvm 上的优化(后续补充内存模型内容)

        可考虑使用G1垃圾回收器替代cms垃圾回收器,在多组keyby的场景中通常有多字符串的重复,你可以选择选择G1垃圾回收器,并开启字符串压缩(-XX:-UseStringDeduplication),对比cms垃圾回收器,可以优化一部分内存和cpu使用,降低GC的频率(GC高耗cpu), 在小编的业务场中可以减少20%的内存,同时降低15%的cpu使用。同时G1可以动态调整年轻代老年代空间,并通过并行充分利用cpu,从而缩短stop the world时间。

3. 一些大数据中常用数据结构的使用

      大数据中一些数据结构可以有效的减少存储空间,加快计算速度,比如精确计算中常用的bitmap,近似计算(大数据计算中常用准确度换取时间和空间)中常用的bloomFliter、hyperloglog等,你可以根据你的业务场景选择适合的数据结构,本文只简单介绍这几种数据结构,具体原理你可以找资料了解。

3.1 bitmap

      bitmap可以使用一个bit位来标记某个元素的value,key即是该元素,bitmap可以用来大量数据的排序、查询和去重。一个难题是你要根据你的数据去设计去一个适合的hash算法。

3.2 bloomFilter

       bloomfilter是一种概率性数据结构,基于多路哈希函数映射实现,可以进行高效的插入和查询,通常用来判断一个元素是否在一个集合中,当判断为否时准确率为100%,而判断为是时则准确率不一定为100%。在google guava实现的bloomfilter,你只需要传入要存的数据量n和期望的误判率fpp即可

// the total number of elements be 
intexpectedInsertions = ….;
// desired false positive probability
doublefpp = 0.01;
BloomFilter<CharSequence> bloomFilter = BloomFilter.create(Funnels.stringFunnel(Charset.forName("UTF-8")), expectedInsertions,fpp)

3.3 HyperLogLog 

       hyperLogLog是一种基于基数统计的数据结构,主要用到伯努利过程,它不会存储每个元素的值,可以用来做不精确去重,比如常用的统计uv的场景,占用空间很少,可以用极少的内存来统计巨量的数据,一个概念是redis中实现的hyperLogLog使用12k的内存能够统计2^64个数据,误差率为0.81%,你可以对比下精确统计2^64个uuid需要多大的空间。使用起来很简单,如下

//number of bucket and bits per bucket
final HLL hll = new HLL(13, 5);
for (int item : data) {
  final long value = hash.newHasher().putInt(item).hash().asLong();
  hll.addRaw(value);
 }
System.out.println("Distinct count="+ hll.cardinality());

4. 结语与展望

       本文会结合小编的工作学习和flink的发展持续更新,flink1.8后flink主要在flink sql方向发力。小编后续会更多的关注flink sql方面的调优,如本帖有错误的地方,请批评指正,如果你有好的优化方案,也请你留言交流。

本文一部分内容参考自flink doc和ververica。

https://ci.apache.org/projects/flink/flink-docs-release-1.10/

https://ververica.cn/





【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。