Flink任务常见的提交方式通过flink run命令方式提交,如果我们想自己通过API方式实现任务提交,那么就需要了解flink run执行过程,本篇主要透过源码分析其提交流程。(注:基于1.10.1分析)
提交入口
查看bin/flink脚本可以看到提交入口类为:org.apache.flink.client.cli.CliFrontend,传入的参数就是flink 命令后面的参数,查看main方法:
public static void main(final String[] args) {
EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);
// 1. $FLINK_HOME/conf
final String configurationDirectory = getConfigurationDirectoryFromEnv();
// 2. 加载flink-conf.yaml
final Configuration configuration = GlobalConfiguration.loadConfiguration(configurationDirectory);
// 3. 初始化所有的提交模式的参数解析器
final List<CustomCommandLine> customCommandLines = loadCustomCommandLines(
configuration,
configurationDirectory);
try {
//初始化执行入口
final CliFrontend cli = new CliFrontend(
configuration,
customCommandLines);
SecurityUtils.install(new SecurityConfiguration(cli.configuration));
int retCode = SecurityUtils.getInstalledContext()
//parseParameters 会根据不同的类型:run、info、list、modify等执行不同的流程
.runSecured(() -> cli.parseParameters(args));
System.exit(retCode);
}
catch (Throwable t) {
final Throwable strippedThrowable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
LOG.error("Fatal error while running command line interface.", strippedThrowable);
strippedThrowable.printStackTrace();
System.exit(31);
}
}
CustomCommandLine 表示的是一个命令行的参数解析的接口,其实现有FlinkYarnSessionCli、DefaultCLI,FlinkYarnSessionCli解析per-job或者session模式参数,DefaultCLI解析standalone模式参数。程序会根据传入的参数选项选择合适的参数解析器,通过其isActive方法其匹配,然后调用applyCommandLineOptionsToConfiguration解析参数。
RUN流程
protected void run(String[] args) throws Exception {
LOG.info("Running 'run' command.");
//savepoint恢复参数
final Options commandOptions = CliFrontendParser.getRunCommandOptions();
//将参数封装在CommandLine中
final CommandLine commandLine = getCommandLine(commandOptions, args, true);
//实例一个ProgramOptions对象,包含了jar路径、用户程序入口类、用户程序参数、classpath等
final ProgramOptions programOptions = new ProgramOptions(commandLine);
// 帮助命令
if (commandLine.hasOption(HELP_OPTION.getOpt())) {
CliFrontendParser.printHelpForRun(customCommandLines);
return;
}
if (!programOptions.isPython()) {
// Java program should be specified a JAR file
if (programOptions.getJarFilePath() == null) {
throw new CliArgsException("Java program should be specified a JAR file.");
}
}
//代表程序,包含jar、参数等信息
final PackagedProgram program;
try {
LOG.info("Building program from JAR file");
program = buildProgram(programOptions);
}
catch (FileNotFoundException e) {
throw new CliArgsException("Could not build the program from JAR file.", e);
}
//程序所需要jar信息,主要是用户jar包
final List<URL> jobJars = program.getJobJarAndDependencies();
//获取有效的配置信息,在这里会根据不同的参数解析器获取有效的配置信息
final Configuration effectiveConfiguration =
getEffectiveConfiguration(commandLine, programOptions, jobJars);
LOG.debug("Effective executor configuration: {}", effectiveConfiguration);
try {
executeProgram(effectiveConfiguration, program);
} finally {
program.deleteExtractedLibraries();
}
}
在getEffectiveConfiguration方法中,会根据参数选择不同的参数解析器,例如在per-job模式会使用 -m yarn-cluster,那么就会选择FlinkYarnSessionCli参数解析器,在这个过程中有一个重要的参数配置:execution.target,目标执行器,决定后面什么类型的执行器提交任务:yarn-session、yarn-per-job、remote,这个参数的配置也是通过不同的提交模式来配置的。
执行Program流程
executeProgram 方法直接调用ClientUtils.executeProgram方法:
public static void executeProgram(
PipelineExecutorServiceLoader executorServiceLoader,
Configuration configuration,
PackagedProgram program) throws ProgramInvocationException {
checkNotNull(executorServiceLoader);
final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();
final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(userCodeClassLoader);
LOG.info("Starting program (detached: {})", !configuration.getBoolean(DeploymentOptions.ATTACHED));
//用户创建程序执行的上下文
ContextEnvironmentFactory factory = new ContextEnvironmentFactory(
executorServiceLoader,
configuration,
userCodeClassLoader);
//会将factory赋给ExecutionEnvironment中变量
ContextEnvironment.setAsContext(factory);
try {
//调用程序main方法
program.invokeInteractiveModeForExecution();
} finally {
ContextEnvironment.unsetContext();
}
} finally {
Thread.currentThread().setContextClassLoader(contextClassLoader);
}
}
PipelineExecutorServiceLoader 用户Executor执行器的选择,参考Flink1.10基于工厂模式的任务提交与SPI机制;
ContextEnvironmentFactory用于创建程序执行的上下文ExecutionEnvironment,可以理解为其封装了程序与外界之间的交互方式,例如per-job模式还是standalone模式、需要的资源大小等等,同时也会根据其类型创建不同StreamExecutionEnvironment(看下文详解)。对于客户端提交方式创建的是ContextEnvironment类型的ExecutionEnvironment。
Main提交流程
program.invokeInteractiveModeForExecution方法用户调用用户程序的main方法,在main方法中会调用StreamExecutionEnvironment.getExecutionEnvironment 获取合适的StreamExecutionEnvironment:
//StreamExecutionEnvironment.java
public static StreamExecutionEnvironment getExecutionEnvironment() {
//threadLocalContextEnvironmentFactory、contextEnvironmentFactory默认都为空,所以会调用createStreamExecutionEnvironment方法
return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory)
.map(StreamExecutionEnvironmentFactory::createExecutionEnvironment)
.orElseGet(StreamExecutionEnvironment::createStreamExecutionEnvironment);
}
private static StreamExecutionEnvironment createStreamExecutionEnvironment() {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
if (env instanceof ContextEnvironment) {
return new StreamContextEnvironment((ContextEnvironment) env);
} else if (env instanceof OptimizerPlanEnvironment) {
return new StreamPlanEnvironment(env);
} else {
return createLocalEnvironment();
}
}
//ExecutionEnvironment.java
public static ExecutionEnvironment getExecutionEnvironment() {
return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory)
.map(ExecutionEnvironmentFactory::createExecutionEnvironment)
//在本地local的模式,创建LocalEnvironment
.orElseGet(ExecutionEnvironment::createLocalEnvironment);
}
在ClientUtils.executeProgram 中分析到,会通过ContextEnvironment. setAsContext( factory)给threadLocalContextEnvironment Factory与contextEnvironmentFactory赋值,那么调用ContextEnvironmentFactory. createExecutionEnvironment 得到一个ContextEnvironment。
最终StreamExecutionEnvironment. getExecutionEnvironment 得到一个内部封装了ContextEnvironment 对象的StreamExecutionEnvironment对象。
Execute流程
待main方法执行用户代码流程之后会调用StreamExecutionEnvironment.execute方法,接着会调用executeAsync(StreamGraph)方法:
public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
checkNotNull(streamGraph, "StreamGraph cannot be null.");
checkNotNull(configuration.get(DeploymentOptions.TARGET), "No execution.target specified in your configuration file.");
//根绝提交模式选择匹配的factory
final PipelineExecutorFactory executorFactory =
executorServiceLoader.getExecutorFactory(configuration);
checkNotNull(
executorFactory,
"Cannot find compatible factory for specified execution.target (=%s)",
configuration.get(DeploymentOptions.TARGET));
//选择合适的executor提交任务
CompletableFuture<? extends JobClient> jobClientFuture = executorFactory
.getExecutor(configuration)
.execute(streamGraph, configuration);
try {
JobClient jobClient = jobClientFuture.get();
jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(jobClient, null));
return jobClient;
} catch (Throwable t) {
jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(null, t));
ExceptionUtils.rethrow(t);
// make javac happy, this code path will not be reached
return null;
}
}
这里就是上一篇讲到的根据SPI机制加载出所有PipelineExecutorFactory,然后选择匹配的factory,匹配的条件就是符合上文提到的execution.target参数的factory,对于yarn-per-job就是YarnJobClusterExecutorFactory,最终会获取到YarnJobClusterExecutor类型的Executor去向yarn提交作业。
总结
本文主要分析了flink run的开始到提交到集群前的流程,我认为可以简化为三步:
下一篇将会以yarn-per-job提交模式为例分析其具体提交过程。