Apache Flink On Yarn模式高可用(HA)集群部署
本文介绍如何部署Apache Flink On YARN(也就是如何在YARN上运行Flink作业),采用HDP 2.6.3以及Apache Flink 1.7.1。
Yarn在Hadoop的生态系统中担任了资源管理和任务调度的角色,可以更好对集群资源进行调度和控制。
此处不对HDP安装做讲述,需要安装HDP的可以通过HDP官网安装指南进行安装。
官方文档QuickStart中包含两种Flink启动方式:
启动一个YARN session(Start a long-running Flink cluster on YARN)
直接在YARN上提交运行Flink作业(Run a Flink job on YARN)。
在讲解运行方式之前,我们先来讲解Flink基于HDP之上的On Yarn安装。
安装
下载Apache Flink安装包
从Apache Flink官网下载地址(http://flink.apache.org/downloads.html)下载对应版本的安装包并解压
curl -O <flink_hadoop2_download_url>
tar xvzf flink-1.8-SNAPSHOT-bin-hadoop2.tgz
与Hadoop集成
Flink On Yarn模式需要用户配置与Hadoop集群,设置HADOOP_CONF_DIR以及HADOOP_CLASSPATH。
将如下代码添加到~/.bash_profile
配置文件中
$ vi ~/.bash_profile
export HADOOP_CONF_DIR="/etc/hadoop/conf"
export HADOOP_CLASSPATH="/usr/hdp/current/hadoop-client/*:/usr/hdp/current/hadoop-client/lib/*:/usr/hdp/current/hadoop-hdfs-client/*:/usr/hdp/current/hadoop-hdfs-client/lib/*:/usr/hdp/current/hadoop-yarn-client/*:/usr/hdp/current/h配置yarn启动前环境变量
oop-yarn-client/lib/*"
source .bash_profile文件引入环境变量并检查变量是否设置正确
source ~/.bash_profile
echo $HADOOP_CONFIG_DIR
echo $HADOOP_CLASSPATH
配置
yarn-session.sh配置
由于HDP是运行Hadoop任务以及访问HDFS都是使用hdfs用户,我们需要在yarn启动前指定HADOOP_USER_NAME变量,flink才不会因为权限问题而无法启动。
$ vi /usr/local/flink-1.3.3/bin/yarn-session.sh
#!/usr/bin/env bash
...
bin=`dirname "$0"`
bin=`cd "$bin"; pwd`
# get Flink config
. "$bin"/config.sh
if [ "$FLINK_IDENT_STRING" = "" ]; then
FLINK_IDENT_STRING="$USER"
fi
export HADOOP_USER_NAME=hdfs
JVM_ARGS="$JVM_ARGS -Xmx512m"
CC_CLASSPATH=`manglePathList $(constructFlinkClassPath):$INTERNAL_HADOOP_CLASSPATHS`
log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-yarn-session-$HOSTNAME.log
log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j-yarn-session.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback-yarn.xml"
export FLINK_CONF_DIR
$JAVA_RUN $JVM_ARGS -classpath "$CC_CLASSPATH" $log_setting org.apache.flink.yarn.cli.FlinkYarnSessionCli -j "$FLINK_LIB_DIR"/flink-dist*.jar "$@"
注意:HADOOP_USER_NAME
参数必须在JAVA_RUN之前配置,否则程序运行之后无法读取到该环境变量
conf/flink-conf.yaml(HA配置)
要启动HA群集,需要在conf/flink-conf.yaml添加以下配置:
高可用性模式(必需):必须在conf/flink-conf.yaml中将高可用模式设置为zookeeper才能启用高可用模式。 或者,此选项可以设置为Flink应该用于创建HighAvailabilityServices实例的工厂类的FQN。
high-availability: zookeeper
ZooKeeper quorum(必需):ZooKeeper quorum是ZooKeeper服务器的复制组,它提供分布式协调服务。
high-availability.zookeeper.quorum: address1:2181[,...],addressX:2181
每个addressX:port指的是一个ZooKeeper服务器,Flink可以在给定的地址和端口访问它。
Zookeeper root(推荐):
ZooKeeper root节点,在该节点下放置所有集群节点。
high-availability.zookeeper.path.root: /flink
Zookeeper Cluster-id(推荐):
cluster-id ZooKeeper节点,在该节点下放置集群的所有必需的协调数据。
high-availability.cluster-id: /default_ns # important: customize per cluster
存储目录(必需):JobManager元数据保存在文件系统storageDir中,只有一个指向该状态的指针存储在ZooKeeper中。
high-availability.storageDir: hdfs:///flink/recovery
storageDir存储JobManager故障恢复所需的所有元数据。
配置主服务器和ZooKeeper quorum后,您可以像往常一样使用提供的集群启动脚本。他们将启动HA群集。请记住,调用脚本时必须运行ZooKeeper quorum,并确保为要启动的每个HA群集配置单独的ZooKeeper根路径。
除HA配置外,还需要配置最大尝试次数conf/flink-conf.yaml:
yarn.application-attempts: 10
这意味着在Yarn应用程序失败之前,应用程序可以重新启动9次(9次重试+ 1次初始尝试)。
由于我们是基于HDP创建的Hadoop集群,已有现成的zookeeper集群,所以这里我们使用现有的zookeeper进行HA配置,配置如下:
high-availability: zookeeper
high-availability.zookeeper.quorum: flink-dc-01:2181,flink-dc-02:2181,flink-dc-03:2181
high-availability.zookeeper.path.root: /flink
high-availability.storageDir: hdfs://ns1/flink/recovery
yarn.application-attempts: 10
配置hadoop yarn-site.xml
配置Application最大的尝试次数
<property>
<name>yarn.resourcemanager.am.max-attempts</name>
<value>4</value>
<description>
The maximum number of application master execution attempts.
</description>
</property>
当前YARN版本的默认值为2(表示允许单个JobManager失败)。
java.lang.IllegalAccessError:
tried to access method问题
hdp平台需要去掉uber shaded hadoop的包,同时添加mapreduce的包到yarn应用classpath,否则会出现如下问题:
Exception in thread "main" java.lang.IllegalAccessError: tried to access method org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider.getProxyInternal()Ljava/lang/Object; from class org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider
删除/重命名
rm -f /root/flink-1.7.1/lib/flink-shaded-hadoop2-uber-1.7.1.jar
添加mapreduce的包到yarn应用classpath
进入ambari界面,service->yarn->config->advanced->Advanced yarn-site->yarn.application.classpath添加
/usr/hdp/current/hadoop-mapreduce-client/*,/usr/hdp/current/hadoop-mapreduce-client/lib/*
修改后,需要重启yarn相关组件,ambari界面会有指示如何重启,一键搞定.
Flink日志配置
Flink默认包含两种配置方式:log4j以及logback
不配置的情况下运行flink集群或者运行flink job会提示建议移除其中一种。
org.apache.flink.yarn.AbstractYarnClusterDescriptor - The configuration directory ('/root/flink-1.7.1/conf') contains both LOG4J and Logback configuration files. Please delete or rename one of them.
直接移除或者重命名都可行。
例如:
$ mv logback.xml logback.xml_bak
示例配置:
vi /usr/local/flink-1.3.3/conf/log4j.properties
log4j.appender.file.append=true
log4j.appender.file.MaxFileSize=100M #最大文件大小
log4j.appender.file.MaxBackupIndex=10 # 最大备份索引大小
启动Flink
本节主要介绍Flink的两种启动方式。
启动一个长期运行的Flink集群
启动一个长期运行的flink集群通过yarn-session.sh执行部署。
yarn-session.sh使用指南
$ ./bin/yarn-session.sh
Usage:
Required
-n,--container <arg> Number of YARN container to allocate (=Number of Task Managers)
Optional
-D <property=value> use value for given property
-d,--detached If present, runs the job in detached mode
-h,--help Help for the Yarn session CLI.
-id,--applicationId <arg> Attach to running YARN session
-j,--jar <arg> Path to Flink jar file
-jm,--jobManagerMemory <arg> Memory for JobManager Container with optional unit (default: MB)
-m,--jobmanager <arg> Address of the JobManager (master) to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration.
-n,--container <arg> Number of YARN container to allocate (=Number of Task Managers)
-nl,--nodeLabel <arg> Specify YARN node label for the YARN application
-nm,--name <arg> Set a custom name for the application on YARN
-q,--query Display available YARN resources (memory, cores)
-qu,--queue <arg> Specify YARN queue.
-s,--slots <arg> Number of slots per TaskManager
-sae,--shutdownOnAttachedExit If the job is submitted in attached mode, perform a best-effort cluster shutdown when the CLI is terminated abruptly, e.g., in response to a user interrupt, such
as typing Ctrl + C.
-st,--streaming Start Flink in streaming mode
-t,--ship <arg> Ship files in the specified directory (t for transfer)
-tm,--taskManagerMemory <arg> Memory per TaskManager Container with optional unit (default: MB)
-yd,--yarndetached If present, runs the job in detached mode (deprecated; use non-YARN specific option instead)
-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-paths for high availability mode
主要参数讲解:
1、-n 指定TaskManager数量
2、-jm 指定JobManager使用内存
3、-m 指定JobManager地址
4、-tm 指定TaskManager使用内存
5、-D 指定动态参数
6、-d 客户端分离,指定后YarnSession部署到yarn之后,客户端会自行关闭。
7、-j 指定执行jar包
部署一个长期运行的Flink on Yarn实例
bin/yarn-session.sh -n 8 -s 5 -jm 2048 -tm 4096 -nm pinpoint-flink-job
实例说明:
8个TaskManager
每个TaskManager5个slot
每个TaskManager内存4g,
指定application名称为pinpoint-flink-job
注意:部署长期运行的flink on yarn实例后,在flink web上看到的TaskManager以及Slots都为0。只有在提交任务的时候,才会依据分配资源给对应的任务执行。
提交Job到长期运行的flink on yarn实例上
执行任务提交命令:
$ bin/flink run ./examples/batch/WordCount.jar --input hdfs://xdata2/tmp/LICENSE-2.0.txt --output hdfs://xdata2/tmp/wordcount_result.txt
指定输入文件:hdfs://xdata2/tmp/LICENSE-2.0.txt
指定输出文件:hdfs://xdata2/tmp/wordcount_result.txt
命令运行日志如下:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/root/flink-1.7.1/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/root/flink-1.7.1/lib/slf4j-log4j12-1.7.15.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/hdp/2.6.2.0-205/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2019-01-24 16:05:26,059 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - Found Yarn properties file under /tmp/.yarn-properties-root.
2019-01-24 16:05:26,059 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - Found Yarn properties file under /tmp/.yarn-properties-root.
2019-01-24 16:05:26,358 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - YARN properties set default parallelism to 40
2019-01-24 16:05:26,358 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - YARN properties set default parallelism to 40
YARN properties set default parallelism to 40
2019-01-24 16:05:26,618 INFO org.apache.hadoop.yarn.client.AHSProxy - Connecting to Application History server at vigor-dc-38/192.168.2.38:10200
2019-01-24 16:05:26,628 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-01-24 16:05:26,628 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-01-24 16:05:26,638 INFO org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider - Looking for the active RM in [rm1, rm2]...
2019-01-24 16:05:26,773 INFO org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider - Found active RM [rm1]
2019-01-24 16:05:26,779 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Found application JobManager host name 'vigor-dc-41' and port '39925' from supplied application id 'application_1548213441093_0011'
2019-01-24 16:05:27,186 WARN org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
Starting execution of program
Program execution finished
Job with JobID 7ab3cf90748c8d05c7aa2e7cbce85730 has finished.
Job Runtime: 8979 ms
提交后可以在Flink web页面上看到提交的任务信息及执行情况。
检查任务结果
使用hadoop命令查询执行结果信息
[root@vigor-dc-38 flink-1.7.1]# hadoop fs -cat /tmp/wordcount_result.txt
...
above 1
acceptance 1
accepting 3
act 1
acting 1
acts 1
add 2
addendum 1
additional 5
additions 1
advised 1
against 2
agree 1
agreed 3
agreement 1
all 3
...
在Yarn上运行单个Flink任务
若你想在Yarn上启动Flink用于单独任务执行,可以直接通过bin/flink run
的方式来实现。
示例:
./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar
Yarn会话的命令行选项也可以用于./bin/flink。使用y或yarn(对于长参数选项)作为前缀。
命令执行后,yarn会为任务单独启动一个flink on yarn实例,用于运行flink任务,在flink web界面上可以看到该任务。
查看后段执行结果:
Printing result to stdout. Use --output to specify output path.
(a,5)
(action,1)
(after,1)
(against,1)
(all,2)
(and,12)
(arms,1)
(arrows,1)
...
总结
Flink on Yarn两种部署方式可以根据自身的需求自行选择。可选择单独一种,也可以两种结合使用。
重要任务建议单独运行一个实例,其他的任务可以使用长时间运行方式,将多个任务部署到上面,不用到时候资源也会得到释放。
Standalone模式在后续的文章补上。
参考链接
https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/hadoop.html
https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/deployment/yarn_setup.html
https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/jobmanager_high_availability.html
关注公众号
- 点赞
- 收藏
- 关注作者
评论(0)