Flink 1.10内核源码梳理-client提交作业流程源码梳理
【摘要】 以flink1.10开源代码未例子。梳理flink client提交作业的源码流程 1、/bin/flink run xxx用户通过flink client的命令行工具提交作业flink只是个shell脚本,原理就是调用如下类 org.apache.flink.client.cli.CliFrontend “$@” 2、CliFrontend的main()函数内部逻辑getConfigura...
以flink1.10开源代码未例子。梳理flink client提交作业的源码流程
1、/bin/flink run xxx
用户通过flink client的命令行工具提交作业
flink只是个shell脚本,原理就是调用如下类 org.apache.flink.client.cli.CliFrontend “$@”
2、CliFrontend的main()函数内部逻辑
getConfigurationDirectoryFromEnv(); - 获取配置文件的目录-FLINK_CONF_DIR环境变量
GlobalConfiguration.loadConfiguration(configurationDirectory) - 解析flink-conf.yaml文件
loadCustomCommandLines - 这个是为不同cli解析模式而设计的
cli = new CliFrontend(configuration,customCommandLines);
cli.parseParameters(args)
run(params) - 根据flink run这个关键参数走run流程
3、run(params)
run(params) - 此流程是client解析出作业流程并提交的主要流程
CliFrontendParser.getRunCommandOptions
CliFrontendParser.parse -解析参数
program = buildProgram(programOptions);
dependencyFiles
PackagedProgram.newBuilder()
effectiveConfiguration = getEffectiveConfiguration(commandLine, programOptions, jobJars);
executeProgram(effectiveConfiguration, program) - 核心的执行用户自定义的jar作业
ContextEnvironment.setAsContext(factory); - 设置classloader和上下文环境
program.invokeInteractiveModeForExecution(); - 用反射执行用户的用户自定义的jar作业的main函数
callMainMethod
mainMethod = entryClass.getMethod("main", String[].class);
mainMethod.invoke(null, (Object) args);
program.deleteExtractedLibraries();
4、核心的ExecutionEnvironment.execute()
用户写 jar作业最后都会调用env.execute()执行这个函数就会开始解析流图,并往集群提交
jobClient = executeAsync(jobName);
Plan plan = createProgramPlan(jobName);
xecutorFactory.getExecutor(configuration).execute(plan, configuration);
AbstractSessionClusterExecutor.execute - 以session部署模式为例子
jobGraph = ExecutorUtils.getJobGraph(pipeline, configuration);
clusterClient = clusterClientProvider.getClusterClient();
clusterClient.submitJob(jobGraph) - 向集群提交作业
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)