Flink性能调优(二)

举报
bigdata张凯翔 发表于 2021/03/29 02:53:08 2021/03/29
【摘要】 1.设计分区方法 操作场景 合理的设计分区依据,可以优化task的切分。在程序编写过程中要尽量分区均匀,这样可以实现每个task数据不倾斜,防止由于某个task的执行时间过长导致整个任务执行缓慢。 操作步骤 以下是几种分区方法。 •随机分区:将元素随机地进行分区。dataStream.shuffle(); •Rebalancing (Round-robin partitioni...

1.设计分区方法

操作场景

合理的设计分区依据,可以优化task的切分。在程序编写过程中要尽量分区均匀,这样可以实现每个task数据不倾斜,防止由于某个task的执行时间过长导致整个任务执行缓慢。

操作步骤

以下是几种分区方法。
•随机分区:将元素随机地进行分区。dataStream.shuffle();
•Rebalancing (Round-robin partitioning):基于round-robin对元素进行分区,使得每个分区负责均衡。对于存在数据倾斜的性能优化是很有用的。dataStream.rebalance();
•Rescaling:以round-robin的形式将元素分区到下游操作的子集中。如果你想要将数据从一个源的每个并行实例中散发到一些mappers的子集中,用来分散负载,但是又不想要完全的rebalance 介入(引入rebalance()),这会非常有用。dataStream.rescale();
•广播:广播每个元素到所有分区。dataStream.broadcast();
•自定义分区:使用一个用户自定义的Partitioner对每一个元素选择目标task,由于用户对自己的数据更加熟悉,可以按照某个特征进行分区,从而优化任务执行。
简单示例如下所示:

// fromElements构造简单的Tuple2流
DataStream<Tuple2<String, Integer>> dataStream = env.fromElements(Tuple2.of("hello",1), Tuple2.of("test",2), Tuple2.of("world",100));
// 定义用于分区的key值,返回即属于哪个partition的,该值加1就是对应的子任务的id号
Partitioner<Tuple2<String, Integer>> strPartitioner = new Partitioner<Tuple2<String, Integer>>() {  public int partition(Tuple2<String, Integer> key, int numPartitions) { return (key.f0.length() + key.f1) % numPartitions; }
};
// 使用Tuple2进行分区的key值
dataStream.partitionCustom(strPartitioner, new KeySelector<Tuple2<String, Integer>, Tuple2<String, Integer>>() {  public Tuple2<String, Integer> getKey(Tuple2<String, Integer> value) throws Exception { return value; }
}).print();

2.配置netty网络通信

操作场景

Flink通信主要依赖netty网络,所以在Flink应用执行过程中,netty的设置尤为重要,网络通信的好坏直接决定着数据交换的速度以及任务执行的效率。
操作步骤
以下配置均可在客户端的“conf/flink-conf.yaml”配置文件中进行修改适配,默认已经是相对较优解,请谨慎修改,防止性能下降。


•“taskmanager.network.netty.num-arenas”: 默认是“taskmanager.numberOfTaskSlots”,表示netty的域的数量。
•“taskmanager.network.netty.server.numThreads”和“taskmanager.network.netty.client.numThreads”:默认是“taskmanager.numberOfTaskSlots”,表示netty的客户端和服务端的线程数目设置。
•“taskmanager.network.netty.client.connectTimeoutSec”:默认是120s,表示taskmanager的客户端连接超时的时间。
•“taskmanager.network.netty.sendReceiveBufferSize”:默认是系统缓冲区大小(cat /proc/sys/net/ipv4/tcp_[rw]mem) ,一般为4MB,表示netty的发送和接收的缓冲区大小。
•“taskmanager.network.netty.transport”:默认为“nio”方式,表示netty的传输方式,有“nio”和“epoll”两种方式。

3.解决数据倾斜

当数据发生倾斜(某一部分数据量特别大),虽然没有GC(Gabage Collection,垃圾回收),但是task执行时间严重不一致。
•需要重新设计key,以更小粒度的key使得task大小合理化。
•修改并行度。
•调用rebalance操作,使数据分区均匀。

缓冲区超时设置

  • 1由于task在执行过程中存在数据通过网络进行交换,数据在不同服务器之间传递的缓冲区超时时间可以通过setBufferTimeout进行设置。

  • 2 当设置“setBufferTimeout(-1)”,会等待缓冲区满之后才会刷新,使其达到最大吞吐量;当设置“setBufferTimeout(0)”时,可以最小化延迟,数据一旦接收到就会刷新;当设置“setBufferTimeout”大于0时,缓冲区会在该时间之后超时,然后进行缓冲区的刷新。
    示例可以参考如下:env.setBufferTimeout(timeoutMillis);
    env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);

文章来源: www.jianshu.com,作者:百忍成金的虚竹,版权归原作者所有,如需转载,请联系作者。

原文链接:www.jianshu.com/p/4313950ee943

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。