Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Flink源码剖析:Jar包任务提交流程

Flink源码剖析:Jar包任务提交流程

作者头像
用户1154259
发布于 2021-01-20 11:42:18
发布于 2021-01-20 11:42:18
2.4K00
代码可运行
举报
运行总次数:0
代码可运行

Flink基于用户程序生成JobGraph,提交到集群进行分布式部署运行。本篇从源码角度讲解一下Flink Jar包是如何被提交到集群的。(本文源码基于Flink 1.11.3)

1 Flink run 提交Jar包流程分析

首先分析run脚本可以找到入口类CliFrontend,这个类在main方法中解析参数,基于第二个参数定位到run方法:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
try {
    // do action
    switch (action) {
        case ACTION_RUN:
            run(params);
            return 0;
        case ACTION_RUN_APPLICATION:
            runApplication(params);
            return 0;
        case ACTION_LIST:
            list(params);
            return 0;
        case ACTION_INFO:
            info(params);
            return 0;
        case ACTION_CANCEL:
            cancel(params);
            return 0;
        case ACTION_STOP:
            stop(params);
            return 0;
        case ACTION_SAVEPOINT:
            savepoint(params);
            return 0;
        case "-h":
        case "--help":
            ...
            return 0;
        case "-v":
        case "--version":
            ...
        default:
            ...
            return 1;
    }
}

在run方法中,根据classpath、用户指定的jar、main函数等信息创建PackagedProgram。在Flink中通过Jar方式提交的任务都封装成了PackagedProgram对象。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
protected void run(String[] args) throws Exception {
    ...
    final ProgramOptions programOptions = ProgramOptions.create(commandLine);
    final PackagedProgram program = getPackagedProgram(programOptions);

    // 把用户的jar配置到config里面
    final List<URL> jobJars = program.getJobJarAndDependencies();
    final Configuration effectiveConfiguration = getEffectiveConfiguration(
            activeCommandLine, commandLine, programOptions, jobJars);

    try {
        executeProgram(effectiveConfiguration, program);
    } finally {
        program.deleteExtractedLibraries();
    }
}

创建PackagedProgram后,有个非常关键的步骤就是这个effectiveConfig,这里面会把相关的Jar都放入pipeline.jars这个属性里,后面pipeline提交作业时,这些jar也会一起提交到集群。

其中比较关键的是Flink的类加载机制,为了避免用户自己的jar内与其他用户冲突,采用了逆转类加载顺序的机制。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
private PackagedProgram(
        @Nullable File jarFile,
        List<URL> classpaths,
        @Nullable String entryPointClassName,
        Configuration configuration,
        SavepointRestoreSettings savepointRestoreSettings,
        String... args) throws ProgramInvocationException {

    // 依赖的资源
    this.classpaths = checkNotNull(classpaths);

    // 保存点配置
    this.savepointSettings = checkNotNull(savepointRestoreSettings);

    // 参数配置
    this.args = checkNotNull(args);

    // 用户jar
    this.jarFile = loadJarFile(jarFile);

    // 自定义类加载
    this.userCodeClassLoader = ClientUtils.buildUserCodeClassLoader(
        getJobJarAndDependencies(),
        classpaths,
        getClass().getClassLoader(),
        configuration);

    // 加载main函数
    this.mainClass = loadMainClass(
        entryPointClassName != null ? entryPointClassName : getEntryPointClassNameFromJar(this.jarFile),
        userCodeClassLoader);
}

在类加载器工具类中根据参数classloader.resolve-order决定是父类优先还是子类优先,默认是使用子类优先模式。

executeProgram方法内部是启动任务的核心,在完成一系列的环境初始化后(主要是类加载以及一些输出信息),会调用packagedProgram的invokeInteractiveModeForExecution的,在这个方法里通过反射调用用户的main方法。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
private static void callMainMethod(Class<?> entryClass, String[] args) 
    throws ProgramInvocationException {
    ...
    Method mainMethod = entryClass.getMethod("main", String[].class);
    mainMethod.invoke(null, (Object) args);
    ...
}

执行用户的main方法后,就是flink的标准流程了。创建env、构建StreamDAG、生成Pipeline、提交到集群、阻塞运行。当main程序执行完毕,整个run脚本程序也就退出了。

总结来说,Flink提交Jar任务的流程是: 1 脚本入口程序根据参数决定做什么操作 2 创建PackagedProgram,准备相关jar和类加载器 3 通过反射调用用户Main方法 4 构建Pipeline,提交到集群

2 通过PackagedProgram获取Pipeline

有的时候不想通过阻塞的方式卡任务执行状态,需要通过类似JobClient的客户端异步查询程序状态,并提供停止退出的能力。

要了解这个流程,首先要了解Pipeline是什么。用户编写的Flink程序,无论是DataStream API还是SQL,最终编译出的都是Pipeline。只是DataStream API编译出的是StreamGraph,而SQL编译出的Plan。Pipeline会在env.execute()中进行编译并提交到集群。

既然这样,此时可以思考一个问题:Jar包任务是独立的Main方法,如何能抽取其中的用户程序获得Pipeline呢?

通过浏览源码的单元测试,发现了一个很好用的工具类:PackagedProgramUtils。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public static Pipeline getPipelineFromProgram(
        PackagedProgram program,
        Configuration configuration,
        int parallelism,
        boolean suppressOutput) throws CompilerException, ProgramInvocationException {

    // 切换classloader
    final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
    Thread.currentThread().setContextClassLoader(program.getUserCodeClassLoader());

    // 创建env
    OptimizerPlanEnvironment benv = new OptimizerPlanEnvironment(
        configuration,
        program.getUserCodeClassLoader(),
        parallelism);
    benv.setAsContext();
    StreamPlanEnvironment senv = new StreamPlanEnvironment(
        configuration,
        program.getUserCodeClassLoader(),
        parallelism);
    senv.setAsContext();

    try {
        // 执行用户main方法
        program.invokeInteractiveModeForExecution();
    } catch (Throwable t) {
        if (benv.getPipeline() != null) {
            return benv.getPipeline();
        }

        if (senv.getPipeline() != null) {
            return senv.getPipeline();
        }

        ...
    } finally {
        // 重置classloader
    }
}

这个工具类首先在线程内创建了一个env,这个env通过threadload保存到当前线程中。当通过反射调用用户代码main方法时,内部的getEnv函数直接从threadlocal中获取到这个env。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
ThreadLocal<StreamExecutionEnvironmentFactory> factory = new ThreadLocal<>();

public static StreamExecutionEnvironment getExecutionEnvironment() {
        return Utils.resolveFactory(factory , contextEnvironmentFactory)
            .map(StreamExecutionEnvironmentFactory::createExecutionEnvironment)
            .orElseGet(StreamExecutionEnvironment::createLocalEnvironment);
    }

再回头看看env有什么特殊的。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class StreamPlanEnvironment extends StreamExecutionEnvironment {

    private Pipeline pipeline;

    public Pipeline getPipeline() {
        return pipeline;
    }

    @Override
    public JobClient executeAsync(StreamGraph streamGraph) {
        pipeline = streamGraph;

        // do not go on with anything now!
        throw new ProgramAbortException();
    }
}

原来是重写了executeAysnc方法,当用户执行env.execute时,触发异常,从而在PackagedProgramUtils里面拦截异常,获取到用户到pipeline。

总结起来流程如下:

3 编程实战

通过阅读上述源码,可以学习到:

1 classloader类加载的父类优先和子类优先问题 2 threadlocal线程级本地变量的使用 3 PackagedProgramUtils 利用枚举作为工具类 4 PackagedProgramUtils 利用重写env,拦截异常获取pipeline。

关于pipeline如何提交到集群、如何运行,就后文再谈了。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021-01-19 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
Flink1.10任务提交流程分析(一)
Flink任务常见的提交方式通过flink run命令方式提交,如果我们想自己通过API方式实现任务提交,那么就需要了解flink run执行过程,本篇主要透过源码分析其提交流程。(注:基于1.10.1分析)
Flink实战剖析
2022/04/18
6450
一文搞定 Flink Job 的运行过程
之前我们知道了Flink 是如何生成 StreamGraph 以及 如何生成 job 和 如何生成Task,现在我们通过 Flink Shell 将他们串起来,这样我们就学习了从写代码开始到 Flink 运行 task 的整个过程是怎么样的。
shengjk1
2021/04/25
2.2K0
一文搞定 Flink Job 的运行过程
Flink命令行 - 1.10
结合Flink官方文档,整理关于Flink命令行的操作参数,包含命令行接口和Scala Shell
Eights
2020/07/10
2.8K0
Flink命令行 - 1.10
Flink1.4 并发执行
本节介绍如何在Flink中配置程序的并行执行。一个Flink程序由多个任务(transformations/operators,data sources和sinks)组成。一个任务被分成多个并发实例来执行,每个并发实例只处理任务输入数据的一个子集。一个任务的并发实例的个数称为并发度(parallelism)。
smartsi
2019/08/07
1.2K0
聊聊flink的Parallel Execution
flink-java-1.7.1-sources.jar!/org/apache/flink/api/java/ExecutionEnvironment.java
code4it
2019/02/12
3K0
聊聊flink的Parallel Execution
Flink Client 实现原理与源码解析(保姆级教学)
这次我们的目的是,在本地的 IDEA 中去 debug flink-clients 代码,然后远程提交给 flink standalone 集群上去执行,看一看 flink 客户端在提交代码之前都干了什么。
kk大数据
2021/02/07
2.5K0
一文搞定 Flink Job 提交全流程
前面,我们已经分析了 一文搞定 Flink 消费消息的全流程 、写给大忙人看的 Flink Window原理 还有 一文搞定 Flink Checkpoint Barrier 全流程 等等,接下来也该回归到最初始的时候,Flink Job 是如何提交的。
shengjk1
2020/07/13
2.6K0
Flink1.10任务提交流程分析(二)
在Flink1.10任务提交流程分析(一)中分析了从flink run开始到任务提交到集群前的流程分析,对于不同的提交模式Flink中使用不同的PipelineExecutor,本篇基于yarn-per-job模式分析向yarn-cluster提交任务的流程。(注:基于1.10.1分析)
Flink实战剖析
2022/04/18
6770
一文搞定 Flink Task 提交执行全流程
这里创建了一个 Task 对象并启动,我们来看一下 Task 启动的时候都做了什么
shengjk1
2020/07/14
1.4K0
flink指定jobid
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
路过君
2022/06/01
8810
flink on yarn部分源码解析
转发请注明原创地址:https://www.cnblogs.com/dongxiao-yang/p/9403427.html
sanmutongzi
2020/03/05
1.1K0
Flink 源码阅读环境准备,并调试 Flink-Clients 模块
读文档和读源码的目的是不一样的,就拿 Apache Flink 这个项目来说,如果你想知道 Flink 的使用功能,设计思想,实现原理,看官方文档就足够了;如果你想了解的就是具体细节,比如说 StreamGraph 是怎么生成的或者是 Exactly Once 究竟如何实现的,那么就需要去阅读源码了。
kk大数据
2021/02/24
9630
Flink 源码阅读环境准备,并调试 Flink-Clients 模块
Mapreduce 任务提交源码分析1
提交过程 一般我们mapreduce任务是通过如下命令进行提交的 $HADOOP_HOME/bin/hadoop jar $MR_JAR $MAIN_CLASS hadoop脚本中有如下代码 elif [ "$COMMAND" = "jar" ] ; then CLASS=org.apache.hadoop.util.RunJar //... 略 exec "$JAVA" $JAVA_HEAP_MAX $HADOOP_OPTS $CLASS "$@" 可以看到hadoop命令提
囚兔
2018/02/08
1.1K0
Flink深入浅出: 应用部署与原理图解(v1.11)
Flink在1.11版本新增了一种部署模式,目前支持三种:Session 模式、Per job 模式、Application 模式,这三种模式主要在集群管理、资源隔离、用户main方法执行位置几个方面有所不同。
用户1154259
2020/10/26
1.3K0
Flink深入浅出: 应用部署与原理图解(v1.11)
听说你熟悉Flink-On-Yarn的部署模式?
Flink提供了两种在yarn上运行的模式,分别为Session-Cluster和Per-Job-Cluster模式,本文分析两种模式及启动流程。
王知无-import_bigdata
2019/07/15
2.9K0
听说你熟悉Flink-On-Yarn的部署模式?
追源索骥:透过源码看懂Flink核心框架的执行流程
写在最前:因为这篇博客太长,所以我把它转成了带书签的pdf格式,看起来更方便一点。想要的童鞋可以到我的公众号“老白讲互联网”后台留言flink即可获取。
老白
2018/08/01
10.3K4
追源索骥:透过源码看懂Flink核心框架的执行流程
【Flink】第二十五篇:源码角度分析作业提交逻辑
【Flink】第四篇:【迷思】对update语义拆解D-、I+后造成update原子性丢失
章鱼carl
2022/03/31
9480
【Flink】第二十五篇:源码角度分析作业提交逻辑
详解flink 1.11中的新部署模式-Application模式
目前对于flink来说,生产环境一般有两个部署模式,一个是 session模式,一个是per job模式。
大数据技术与应用实战
2020/09/15
2.4K0
Flink并行度
本节介绍如何在Flink中配置程序的并行执行。FLink程序由多个任务(转换/操作符、数据源和sinks)组成。任务被分成多个并行实例来执行,每个并行实例处理任务的输入数据的子集。任务的并行实例的数量称之为并行性。
Spark学习技巧
2018/08/01
2.6K0
Flink并行度
Flink源码解读系列 | 任务提交流程
Flink在1.10版本对整个作业提交流程有了较大改动,详情请见FLIP-73。本文基于1.10对作业提交的关键流程进行分析,不深究。 入口: 依旧是main函数最后env.execute();
大数据真好玩
2020/09/22
9210
相关推荐
Flink1.10任务提交流程分析(一)
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验