Flink性能调优(二)
1.设计分区方法
操作场景
合理的设计分区依据,可以优化task的切分。在程序编写过程中要尽量分区均匀,这样可以实现每个task数据不倾斜,防止由于某个task的执行时间过长导致整个任务执行缓慢。
操作步骤
以下是几种分区方法。
•随机分区:将元素随机地进行分区。dataStream.shuffle();
•Rebalancing (Round-robin partitioning):基于round-robin对元素进行分区,使得每个分区负责均衡。对于存在数据倾斜的性能优化是很有用的。dataStream.rebalance();
•Rescaling:以round-robin的形式将元素分区到下游操作的子集中。如果你想要将数据从一个源的每个并行实例中散发到一些mappers的子集中,用来分散负载,但是又不想要完全的rebalance 介入(引入rebalance()
),这会非常有用。dataStream.rescale();
•广播:广播每个元素到所有分区。dataStream.broadcast();
•自定义分区:使用一个用户自定义的Partitioner对每一个元素选择目标task,由于用户对自己的数据更加熟悉,可以按照某个特征进行分区,从而优化任务执行。
简单示例如下所示:
// fromElements构造简单的Tuple2流
DataStream<Tuple2<String, Integer>> dataStream = env.fromElements(Tuple2.of("hello",1), Tuple2.of("test",2), Tuple2.of("world",100));
// 定义用于分区的key值,返回即属于哪个partition的,该值加1就是对应的子任务的id号
Partitioner<Tuple2<String, Integer>> strPartitioner = new Partitioner<Tuple2<String, Integer>>() { public int partition(Tuple2<String, Integer> key, int numPartitions) { return (key.f0.length() + key.f1) % numPartitions; }
};
// 使用Tuple2进行分区的key值
dataStream.partitionCustom(strPartitioner, new KeySelector<Tuple2<String, Integer>, Tuple2<String, Integer>>() { public Tuple2<String, Integer> getKey(Tuple2<String, Integer> value) throws Exception { return value; }
}).print();
2.配置netty网络通信
操作场景
Flink通信主要依赖netty网络,所以在Flink应用执行过程中,netty的设置尤为重要,网络通信的好坏直接决定着数据交换的速度以及任务执行的效率。
操作步骤
以下配置均可在客户端的“conf/flink-conf.yaml”配置文件中进行修改适配,默认已经是相对较优解,请谨慎修改,防止性能下降。
•“taskmanager.network.netty.num-arenas”: 默认是“taskmanager.numberOfTaskSlots”,表示netty的域的数量。
•“taskmanager.network.netty.server.numThreads”和“taskmanager.network.netty.client.numThreads”:默认是“taskmanager.numberOfTaskSlots”,表示netty的客户端和服务端的线程数目设置。
•“taskmanager.network.netty.client.connectTimeoutSec”:默认是120s,表示taskmanager的客户端连接超时的时间。
•“taskmanager.network.netty.sendReceiveBufferSize”:默认是系统缓冲区大小(cat /proc/sys/net/ipv4/tcp_[rw]mem) ,一般为4MB,表示netty的发送和接收的缓冲区大小。
•“taskmanager.network.netty.transport”:默认为“nio”方式,表示netty的传输方式,有“nio”和“epoll”两种方式。
3.解决数据倾斜
当数据发生倾斜(某一部分数据量特别大),虽然没有GC(Gabage Collection,垃圾回收),但是task执行时间严重不一致。
•需要重新设计key,以更小粒度的key使得task大小合理化。
•修改并行度。
•调用rebalance操作,使数据分区均匀。
缓冲区超时设置
1由于task在执行过程中存在数据通过网络进行交换,数据在不同服务器之间传递的缓冲区超时时间可以通过setBufferTimeout进行设置。
2 当设置“setBufferTimeout(-1)”,会等待缓冲区满之后才会刷新,使其达到最大吞吐量;当设置“setBufferTimeout(0)”时,可以最小化延迟,数据一旦接收到就会刷新;当设置“setBufferTimeout”大于0时,缓冲区会在该时间之后超时,然后进行缓冲区的刷新。
示例可以参考如下:env.setBufferTimeout(timeoutMillis);
env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);
文章来源: www.jianshu.com,作者:百忍成金的虚竹,版权归原作者所有,如需转载,请联系作者。
原文链接:www.jianshu.com/p/4313950ee943
- 点赞
- 收藏
- 关注作者
评论(0)