Apache Flink On Yarn模式高可用(HA)集群部署

举报
Jonathan.Wei 发表于 2019/01/25 16:19:05 2019/01/25
【摘要】 本文介绍如何部署Apache Flink On YARN(也就是如何在YARN上运行Flink作业),采用HDP 2.6.3以及Apache Flink 1.7.1。

apache.png

本文介绍如何部署Apache Flink On YARN(也就是如何在YARN上运行Flink作业),采用HDP 2.6.3以及Apache Flink 1.7.1。

Yarn在Hadoop的生态系统中担任了资源管理和任务调度的角色,可以更好对集群资源进行调度和控制。
此处不对HDP安装做讲述,需要安装HDP的可以通过HDP官网安装指南进行安装。

官方文档QuickStart中包含两种Flink启动方式:

  • 启动一个YARN session(Start a long-running Flink cluster on YARN)

  • 直接在YARN上提交运行Flink作业(Run a Flink job on YARN)。
    在讲解运行方式之前,我们先来讲解Flink基于HDP之上的On Yarn安装。

安装

下载Apache Flink安装包

从Apache Flink官网下载地址(http://flink.apache.org/downloads.html)下载对应版本的安装包并解压

curl -O <flink_hadoop2_download_url>
tar xvzf flink-1.8-SNAPSHOT-bin-hadoop2.tgz

与Hadoop集成

Flink On Yarn模式需要用户配置与Hadoop集群,设置HADOOP_CONF_DIR以及HADOOP_CLASSPATH。
将如下代码添加到~/.bash_profile配置文件中

$ vi ~/.bash_profile

export HADOOP_CONF_DIR="/etc/hadoop/conf"
export HADOOP_CLASSPATH="/usr/hdp/current/hadoop-client/*:/usr/hdp/current/hadoop-client/lib/*:/usr/hdp/current/hadoop-hdfs-client/*:/usr/hdp/current/hadoop-hdfs-client/lib/*:/usr/hdp/current/hadoop-yarn-client/*:/usr/hdp/current/h配置yarn启动前环境变量
oop-yarn-client/lib/*"

source .bash_profile文件引入环境变量并检查变量是否设置正确

source ~/.bash_profile 
echo $HADOOP_CONFIG_DIR
echo $HADOOP_CLASSPATH

配置

yarn-session.sh配置

由于HDP是运行Hadoop任务以及访问HDFS都是使用hdfs用户,我们需要在yarn启动前指定HADOOP_USER_NAME变量,flink才不会因为权限问题而无法启动。

$ vi /usr/local/flink-1.3.3/bin/yarn-session.sh

#!/usr/bin/env bash
...
bin=`dirname "$0"`
bin=`cd "$bin"pwd`

# get Flink config
"$bin"/config.sh

if [ "$FLINK_IDENT_STRING" = "" ]; then
        FLINK_IDENT_STRING="$USER"
fi

export HADOOP_USER_NAME=hdfs

JVM_ARGS="$JVM_ARGS -Xmx512m"

CC_CLASSPATH=`manglePathList $(constructFlinkClassPath):$INTERNAL_HADOOP_CLASSPATHS`

log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-yarn-session-$HOSTNAME.log
log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j-yarn-session.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback-yarn.xml"

export FLINK_CONF_DIR

$JAVA_RUN $JVM_ARGS -classpath "$CC_CLASSPATH" $log_setting org.apache.flink.yarn.cli.FlinkYarnSessionCli -j "$FLINK_LIB_DIR"/flink-dist*.jar "$@"

注意:HADOOP_USER_NAME参数必须在JAVA_RUN之前配置,否则程序运行之后无法读取到该环境变量

conf/flink-conf.yaml(HA配置)

要启动HA群集,需要在conf/flink-conf.yaml添加以下配置:

高可用性模式(必需):必须在conf/flink-conf.yaml中将高可用模式设置为zookeeper才能启用高可用模式。 或者,此选项可以设置为Flink应该用于创建HighAvailabilityServices实例的工厂类的FQN。

high-availability: zookeeper

ZooKeeper quorum(必需):ZooKeeper quorum是ZooKeeper服务器的复制组,它提供分布式协调服务。

high-availability.zookeeper.quorumaddress1:2181[,...],addressX:2181

每个addressX:port指的是一个ZooKeeper服务器,Flink可以在给定的地址和端口访问它。

Zookeeper root(推荐): 
ZooKeeper root节点,在该节点下放置所有集群节点。

high-availability.zookeeper.path.root: /flink

Zookeeper Cluster-id(推荐): 
cluster-id ZooKeeper节点,在该节点下放置集群的所有必需的协调数据。

high-availability.cluster-id: /default_ns # important: customize per cluster

存储目录(必需):JobManager元数据保存在文件系统storageDir中,只有一个指向该状态的指针存储在ZooKeeper中。

high-availability.storageDir: hdfs:///flink/recovery

storageDir存储JobManager故障恢复所需的所有元数据。

配置主服务器和ZooKeeper quorum后,您可以像往常一样使用提供的集群启动脚本。他们将启动HA群集。请记住,调用脚本时必须运行ZooKeeper quorum,并确保为要启动的每个HA群集配置单独的ZooKeeper根路径。

除HA配置外,还需要配置最大尝试次数conf/flink-conf.yaml:

yarn.application-attempts: 10

这意味着在Yarn应用程序失败之前,应用程序可以重新启动9次(9次重试+ 1次初始尝试)。

由于我们是基于HDP创建的Hadoop集群,已有现成的zookeeper集群,所以这里我们使用现有的zookeeper进行HA配置,配置如下:

high-availability: zookeeper
high-availability.zookeeper.quorum: flink-dc-01:2181,flink-dc-02:2181,flink-dc-03:2181
high-availability.zookeeper.path.root: /flink
high-availability.storageDir: hdfs://ns1/flink/recovery
yarn.application-attempts: 10

配置hadoop yarn-site.xml

配置Application最大的尝试次数

<property>
  <name>yarn.resourcemanager.am.max-attempts</name>
  <value>4</value>
  <description>
    The maximum number of application master execution attempts.
  </description>
</property>

当前YARN版本的默认值为2(表示允许单个JobManager失败)。

java.lang.IllegalAccessError: 
tried to access method问题

hdp平台需要去掉uber shaded hadoop的包,同时添加mapreduce的包到yarn应用classpath,否则会出现如下问题:

Exception in thread "main" java.lang.IllegalAccessError: tried to access method org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider.getProxyInternal()Ljava/lang/Objectfrom class org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider
删除/重命名
rm -f /root/flink-1.7.1/lib/flink-shaded-hadoop2-uber-1.7.1.jar
添加mapreduce的包到yarn应用classpath

进入ambari界面,service->yarn->config->advanced->Advanced yarn-site->yarn.application.classpath添加

/usr/hdp/current/hadoop-mapreduce-client/*,/usr/hdp/current/hadoop-mapreduce-client/lib/*

修改后,需要重启yarn相关组件,ambari界面会有指示如何重启,一键搞定.

Flink日志配置

Flink默认包含两种配置方式:log4j以及logback
不配置的情况下运行flink集群或者运行flink job会提示建议移除其中一种。

org.apache.flink.yarn.AbstractYarnClusterDescriptor           - The configuration directory ('/root/flink-1.7.1/conf') contains both LOG4J and Logback configuration files. Please delete or rename one of them.

直接移除或者重命名都可行。
例如:

$ mv logback.xml logback.xml_bak

示例配置:

vi /usr/local/flink-1.3.3/conf/log4j.properties
log4j.appender.file.append=true
log4j.appender.file.MaxFileSize=100M  #最大文件大小
log4j.appender.file.MaxBackupIndex=10  # 最大备份索引大小

启动Flink

本节主要介绍Flink的两种启动方式。

启动一个长期运行的Flink集群

启动一个长期运行的flink集群通过yarn-session.sh执行部署。

yarn-session.sh使用指南
$ ./bin/yarn-session.sh
Usage:
   Required
     -n,--container <arg>   Number of YARN container to allocate (=Number of Task Managers)
   Optional
     -D <property=value>             use value for given property
     -d,--detached                   If present, runs the job in detached mode
     -h,--help                       Help for the Yarn session CLI.
     -id,--applicationId <arg>       Attach to running YARN session
     -j,--jar <arg>                  Path to Flink jar file
     -jm,--jobManagerMemory <arg>    Memory for JobManager Container with optional unit (default: MB)
     -m,--jobmanager <arg>           Address of the JobManager (master) to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration.
     -n,--container <arg>            Number of YARN container to allocate (=Number of Task Managers)
     -nl,--nodeLabel <arg>           Specify YARN node label for the YARN application
     -nm,--name <arg>                Set a custom name for the application on YARN
     -q,--query                      Display available YARN resources (memory, cores)
     -qu,--queue <arg>               Specify YARN queue.
     -s,--slots <arg>                Number of slots per TaskManager
     -sae,--shutdownOnAttachedExit   If the job is submitted in attached mode, perform a best-effort cluster shutdown when the CLI is terminated abruptly, e.g., in response to a user interrupt, such
                                     as typing Ctrl + C.
     -st,--streaming                 Start Flink in streaming mode
     -t,--ship <arg>                 Ship files in the specified directory (t for transfer)
     -tm,--taskManagerMemory <arg>   Memory per TaskManager Container with optional unit (default: MB)
     -yd,--yarndetached              If present, runs the job in detached mode (deprecated; use non-YARN specific option instead)
     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths for high availability mode

主要参数讲解:
1、-n 指定TaskManager数量
2、-jm 指定JobManager使用内存
3、-m 指定JobManager地址
4、-tm 指定TaskManager使用内存
5、-D 指定动态参数
6、-d 客户端分离,指定后YarnSession部署到yarn之后,客户端会自行关闭。
7、-j 指定执行jar包

部署一个长期运行的Flink on Yarn实例
bin/yarn-session.sh -n 8 -s 5 -jm 2048 -tm 4096 -nm pinpoint-flink-job

实例说明:

  • 8个TaskManager

  • 每个TaskManager5个slot

  • 每个TaskManager内存4g,

  • 指定application名称为pinpoint-flink-job

注意:部署长期运行的flink on yarn实例后,在flink web上看到的TaskManager以及Slots都为0。只有在提交任务的时候,才会依据分配资源给对应的任务执行。

406BA6AB-1271-4A45-8862-93DF0BE4542C.png

提交Job到长期运行的flink on yarn实例上

执行任务提交命令:

$ bin/flink run ./examples/batch/WordCount.jar --input hdfs://xdata2/tmp/LICENSE-2.0.txt --output hdfs://xdata2/tmp/wordcount_result.txt

指定输入文件:hdfs://xdata2/tmp/LICENSE-2.0.txt
指定输出文件:hdfs://xdata2/tmp/wordcount_result.txt

命令运行日志如下:

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/root/flink-1.7.1/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/root/flink-1.7.1/lib/slf4j-log4j12-1.7.15.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/hdp/2.6.2.0-205/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2019-01-24 16:05:26,059 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - Found Yarn properties file under /tmp/.yarn-properties-root.
2019-01-24 16:05:26,059 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - Found Yarn properties file under /tmp/.yarn-properties-root.
2019-01-24 16:05:26,358 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - YARN properties set default parallelism to 40
2019-01-24 16:05:26,358 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - YARN properties set default parallelism to 40
YARN properties set default parallelism to 40
2019-01-24 16:05:26,618 INFO  org.apache.hadoop.yarn.client.AHSProxy                        - Connecting to Application History server at vigor-dc-38/192.168.2.38:10200
2019-01-24 16:05:26,628 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-01-24 16:05:26,628 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-01-24 16:05:26,638 INFO  org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider  - Looking for the active RM in [rm1, rm2]...
2019-01-24 16:05:26,773 INFO  org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider  - Found active RM [rm1]
2019-01-24 16:05:26,779 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Found application JobManager host name 'vigor-dc-41' and port '39925' from supplied application id 'application_1548213441093_0011'
2019-01-24 16:05:27,186 WARN  org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory       - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
Starting execution of program
Program execution finished
Job with JobID 7ab3cf90748c8d05c7aa2e7cbce85730 has finished.
Job Runtime: 8979 ms

提交后可以在Flink web页面上看到提交的任务信息及执行情况。

885A91EE-A988-4105-9F9F-0C598355625D.png

检查任务结果

使用hadoop命令查询执行结果信息

[root@vigor-dc-38 flink-1.7.1]# hadoop fs -cat /tmp/wordcount_result.txt
...
above 1
acceptance 1
accepting 3
act 1
acting 1
acts 1
add 2
addendum 1
additional 5
additions 1
advised 1
against 2
agree 1
agreed 3
agreement 1
all 3
...

在Yarn上运行单个Flink任务

若你想在Yarn上启动Flink用于单独任务执行,可以直接通过bin/flink run的方式来实现。
示例:

./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar

Yarn会话的命令行选项也可以用于./bin/flink。使用y或yarn(对于长参数选项)作为前缀。

命令执行后,yarn会为任务单独启动一个flink on yarn实例,用于运行flink任务,在flink web界面上可以看到该任务。

885A91EE-A988-4105-9F9F-0C598355625D.png

查看后段执行结果:

Printing result to stdout. Use --output to specify output path.
(a,5)
(action,1)
(after,1)
(against,1)
(all,2)
(and,12)
(arms,1)
(arrows,1)
...

总结

Flink on Yarn两种部署方式可以根据自身的需求自行选择。可选择单独一种,也可以两种结合使用。
重要任务建议单独运行一个实例,其他的任务可以使用长时间运行方式,将多个任务部署到上面,不用到时候资源也会得到释放。

Standalone模式在后续的文章补上。

参考链接

https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/hadoop.html
https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/deployment/yarn_setup.html
https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/jobmanager_high_availability.html


关注公众号


0.jpeg


【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。