【Hadoop】【Yarn】Yarn作业提交过程

举报
沙漠里的果果酱 发表于 2023/08/11 16:01:19 2023/08/11
【摘要】 步骤1,用户向Yarn提交应用程序,其中包括用户程序、相关文件、启动ApplicationMaster命令、ApplicationMaster程序等。步骤2,ResourceManager为该应用程序分配第一个Container,并且与Container所在的NodeManager通信,并且要求该NodeManager在这个Container中启动应用程序对应的ApplicationMast...

步骤1,用户向Yarn提交应用程序,其中包括用户程序、相关文件、启动ApplicationMaster命令、ApplicationMaster程序等。

步骤2,ResourceManager为该应用程序分配第一个Container,并且与Container所在的NodeManager通信,并且要求该NodeManager在这个Container中启动应用程序对应的ApplicationMaster。

步骤3,ApplicationMaster首先会向ResourceManager注册,这样用户才可以直接通过ResourceManager查看到应用程序的运行状态,然后它为准备为该应用程序的各个任务申请资源,并监控它们的运行状态直到运行结束,即重复后面4~7步骤。

步骤4,ApplicationMaster采用轮询的方式通过RPC协议向ResourceManager申请和领取资源。

步骤5,一旦ApplicationMaster申请到资源后,便会与申请到的Container所对应的NodeManager进行通信,并且要求它在该Container中启动任务。

步骤6,任务启动。NodeManager为要启动的任务配置好运行环境,包括环境变量、JAR包、二进制程序等,并且将启动命令写在一个脚本里,通过该脚本运行任务。

步骤7,各个任务通过RPC协议向其对应的ApplicationMaster汇报自己的运行状态和进度,以让ApplicationMaster随时掌握各个任务的运行状态,从而可以再任务运行失败时重启任务。

步骤8,应用程序运行完毕后,其对应的ApplicationMaster会向ResourceManager通信,要求注销和关闭自己。
 
  这个需要注意的是在整个工作流程当中,ResourceManager和NodeManager都是通过心跳保持联系的,NodeManager会通过心跳信息向ResourceManager汇报自己所在节点的资源使用情况。

上面介绍了一个作业从提交的结束在客户端,AM,ResourceManager以及NodeManager之间是怎么交互和流转的。下面将从代码层面介绍:

客户端的所有命令都是在YarnCLI接口中定义。总共有五种具体的Yarn客户端命令对其进行了实现:
ApplicationCLI      extends YarnCLI
NodeCLI               extends YarnCLI
ClusterCLI             extends YarnCLI
QueueCLI             entends YarnCLI
TopCLI                  entends YarnCLI

yarn命令脚本代码中定义了不同的命令的实现方式,例如application相关的:

ApplicationCLI   NodeCLI    ClusterCLI  QueueCLI   TopCLI里都有main方法,可以作为一个独立的进程运行。

Application 命令为例。例如yarn application -list查看所有的application列表,客户端进程的入口为ApplicationCLI

 ApplicationCLI extends  YarnCLI extends Configured implements Tool,因此会产生下面的调用链:

ApplicationCLI.main

---ToolRunner.runI()

------ToolRunner,run()

----------ApplicationCLI.run()

回归正题,下面我们分析下提交Yarn作业的整个流程。

我们使用客户端提交yarn作业一般命令为:yarn jar /opt/client/HDFS/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.1.jar pi -Dmapreduce.job.queuename=myqueue 10 1
使用这种方式提交yarn作业是,使用RunJar进行解析:

RunJar.main()开始放弃调用,最终也会调用到RunJar的run方法,该方法中主要做了下面几件事情:

1-获取jar包路径,并且构造JarFile对象;

2-在manifest文件中查找主类文件;

3-确保临时目录存在,权限足够。该目录用来解压jar包;

4-创建临时文件,位于上面的临时目录下面,确保能够创建;

5-删除上面创建的临时文件,确保能够删除;

6-将临时目录的删除动作作为jvm 钩子加入到ShutdownHook中,当进程推出的时候会自动删除上面创建的临时目录;

7-将jar包解压在上面创建的临时目录下面;

8-创建一个ApplicationClassLoader来加载jar包中的class

9-运行命令中jar包的main方法;启动进程;

我们知道,一个简单的mapreduce作业的实现需要包涵下面的部分:

Job job = Job.getInstance(conf, "Collect Female Info");
job.setJarByClass(FemaleInfoCollector.class);
job.setMapperClass(CollectionMapper.class);
job.setReducerClass(CollectionReducer.class);
job.setCombinerClass(CollectionCombiner.class);
// Set the output type of the job.
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
// Submit the job to a remote environment for execution.
System.exit(job.waitForCompletion(true) ? 0 : 1);

重点就在最后一句waitForCompletion()

Job.waitForCompletion

---Job.submit()

------submitClient.subimtJob()

submit中主要两件事:

1-connect() 创建一个客户端代理对象Cluster(属于Job类)用户和服务端RM简历RPC通信;

2-JobCommitter.submitJobInternal() 提交job

submitJobInternal中主要做了下面的事情:

1-检查作业配置的输入输出;

2-将mapreduce.application.framework.path路径配置的mr框架添加到分布式缓存中;

3-初始化并返回Staging目录,返回存放Job相关资源(例如job.xml,Splits文件等)路径的前缀;

4-获取命令行配置的job参数;

5-所有通过submitClient调用都是rpc调用,用过rpc调用想RM申请JobID;

6-在hdfs上面staging目录下创建一个以jonbid为名的目录;

7-获取hdfs的token;

8-如果配置了对mapreduce的溢写文件进行加密(默认是false),那么mapreduce.am.max-attempts设置为1,默认为2;

9-将-libjars, -files, -archives指定的文件上传到上面创建的staging目录;

10-获取job.xml的路径:./.staging/applicationID/job.xml ,并且将job.xml写入到该目录下面;

11-为job创建分片,将分片写入hdfs上的staging/applicationId/目录下面,分片(分区)的数量就是map的数量,但不能超过最大map数量(mapreduce.job.max.map)

12-调用SubmitClient.submitJob()

SubmitClient.submitJob()

---ResourceMgrDelegate.submitApplication()

------YarnClientImpl.submitApplication()

---------ClientRMService.submitApplication()

------------RMAppManager.submitApplication()

---------------RMAppManager.createAndPopulateNewApp()

------------------

【版权声明】本文为华为云社区用户原创内容,未经允许不得转载,如需转载请自行联系原作者进行授权。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。