【Hadoop】【Yarn】Yarn作业提交过程
步骤1,用户向Yarn提交应用程序,其中包括用户程序、相关文件、启动ApplicationMaster命令、ApplicationMaster程序等。
步骤2,ResourceManager为该应用程序分配第一个Container,并且与Container所在的NodeManager通信,并且要求该NodeManager在这个Container中启动应用程序对应的ApplicationMaster。
步骤3,ApplicationMaster首先会向ResourceManager注册,这样用户才可以直接通过ResourceManager查看到应用程序的运行状态,然后它为准备为该应用程序的各个任务申请资源,并监控它们的运行状态直到运行结束,即重复后面4~7步骤。
步骤4,ApplicationMaster采用轮询的方式通过RPC协议向ResourceManager申请和领取资源。
步骤5,一旦ApplicationMaster申请到资源后,便会与申请到的Container所对应的NodeManager进行通信,并且要求它在该Container中启动任务。
步骤6,任务启动。NodeManager为要启动的任务配置好运行环境,包括环境变量、JAR包、二进制程序等,并且将启动命令写在一个脚本里,通过该脚本运行任务。
步骤7,各个任务通过RPC协议向其对应的ApplicationMaster汇报自己的运行状态和进度,以让ApplicationMaster随时掌握各个任务的运行状态,从而可以再任务运行失败时重启任务。
步骤8,应用程序运行完毕后,其对应的ApplicationMaster会向ResourceManager通信,要求注销和关闭自己。
这个需要注意的是在整个工作流程当中,ResourceManager和NodeManager都是通过心跳保持联系的,NodeManager会通过心跳信息向ResourceManager汇报自己所在节点的资源使用情况。
上面介绍了一个作业从提交的结束在客户端,AM,ResourceManager以及NodeManager之间是怎么交互和流转的。下面将从代码层面介绍:
客户端的所有命令都是在YarnCLI接口中定义。总共有五种具体的Yarn客户端命令对其进行了实现:
ApplicationCLI extends YarnCLI
NodeCLI extends YarnCLI
ClusterCLI extends YarnCLI
QueueCLI entends YarnCLI
TopCLI entends YarnCLI
yarn命令脚本代码中定义了不同的命令的实现方式,例如application相关的:
ApplicationCLI NodeCLI ClusterCLI QueueCLI TopCLI里都有main方法,可以作为一个独立的进程运行。
Application 命令为例。例如yarn application -list查看所有的application列表,客户端进程的入口为ApplicationCLI
ApplicationCLI extends YarnCLI extends Configured implements Tool,因此会产生下面的调用链:
ApplicationCLI.main
---ToolRunner.runI()
------ToolRunner,run()
----------ApplicationCLI.run()
回归正题,下面我们分析下提交Yarn作业的整个流程。
我们使用客户端提交yarn作业一般命令为:yarn jar /opt/client/HDFS/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.1.jar pi -Dmapreduce.job.queuename=myqueue 10 1
使用这种方式提交yarn作业是,使用RunJar进行解析:
RunJar.main()开始放弃调用,最终也会调用到RunJar的run方法,该方法中主要做了下面几件事情:
1-获取jar包路径,并且构造JarFile对象;
2-在manifest文件中查找主类文件;
3-确保临时目录存在,权限足够。该目录用来解压jar包;
4-创建临时文件,位于上面的临时目录下面,确保能够创建;
5-删除上面创建的临时文件,确保能够删除;
6-将临时目录的删除动作作为jvm 钩子加入到ShutdownHook中,当进程推出的时候会自动删除上面创建的临时目录;
7-将jar包解压在上面创建的临时目录下面;
8-创建一个ApplicationClassLoader来加载jar包中的class
9-运行命令中jar包的main方法;启动进程;
我们知道,一个简单的mapreduce作业的实现需要包涵下面的部分:
Job job = Job.getInstance(conf, "Collect Female Info");
job.setJarByClass(FemaleInfoCollector.class);
job.setMapperClass(CollectionMapper.class);
job.setReducerClass(CollectionReducer.class);
job.setCombinerClass(CollectionCombiner.class);
// Set the output type of the job.
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
// Submit the job to a remote environment for execution.
System.exit(job.waitForCompletion(true) ? 0 : 1);
重点就在最后一句waitForCompletion()
Job.waitForCompletion
---Job.submit()
------submitClient.subimtJob()
submit中主要两件事:
1-connect() 创建一个客户端代理对象Cluster(属于Job类)用户和服务端RM简历RPC通信;
2-JobCommitter.submitJobInternal() 提交job
submitJobInternal中主要做了下面的事情:
1-检查作业配置的输入输出;
2-将mapreduce.application.framework.path路径配置的mr框架添加到分布式缓存中;
3-初始化并返回Staging目录,返回存放Job相关资源(例如job.xml,Splits文件等)路径的前缀;
4-获取命令行配置的job参数;
5-所有通过submitClient调用都是rpc调用,用过rpc调用想RM申请JobID;
6-在hdfs上面staging目录下创建一个以jonbid为名的目录;
7-获取hdfs的token;
8-如果配置了对mapreduce的溢写文件进行加密(默认是false),那么mapreduce.am.max-attempts设置为1,默认为2;
9-将-libjars, -files, -archives指定的文件上传到上面创建的staging目录;
10-获取job.xml的路径:./.staging/applicationID/job.xml ,并且将job.xml写入到该目录下面;
11-为job创建分片,将分片写入hdfs上的staging/applicationId/目录下面,分片(分区)的数量就是map的数量,但不能超过最大map数量(mapreduce.job.max.map)
12-调用SubmitClient.submitJob()
SubmitClient.submitJob()
---ResourceMgrDelegate.submitApplication()
------YarnClientImpl.submitApplication()
---------ClientRMService.submitApplication()
------------RMAppManager.submitApplication()
---------------RMAppManager.createAndPopulateNewApp()
------------------
- 点赞
- 收藏
- 关注作者
评论(0)