Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >【Flink】第二十六篇:源码角度分析Task执行过程

【Flink】第二十六篇:源码角度分析Task执行过程

作者头像
章鱼carl
发布于 2022-03-31 03:20:09
发布于 2022-03-31 03:20:09
79400
代码可运行
举报
文章被收录于专栏:章鱼carl的专栏章鱼carl的专栏
运行总次数:0
代码可运行

源码分析系列推荐:

【Flink】第四篇:【迷思】对update语义拆解D-、I+后造成update原子性丢失

【Flink】第十五篇:Redis Connector 数据保序思考

【Flink】第十六篇:源码角度分析 sink 端的数据一致性

【Flink】第二十四篇:源码角度分析 DataStream API 调用逻辑

【Flink】第二十五篇:源码角度分析作业提交逻辑

继上篇 【Flink】第二十五篇:源码角度分析作业提交逻辑 我们分析了Flink在执行execute提交作业前,将用户编写的业务UDF逻辑封装成List<Transformation>数据结构,然后,在执行execute提交作业中,又用递归算法将其绘制成DAG数据结构,并且进行了四层的DAG转换,最终,转换为可调度的ExecutionGraph。

本文接着分析Task被调度到TaskManager上后,Task是如何处理输入数据和输出数据。

依旧以socket window wordcount程序为例,

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class WindowWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        env
                .socketTextStream("127.0.0.1", 5555)
                .flatMap(new Splitter())
                .keyBy(value -> value.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .sum(1)
                .print();
        env.execute("Window WordCount");
    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word : sentence.split(" ")) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }
}

这次笔者尝试优化之前文章的行文逻辑,将结论进行切分,然后一段结论再结合一段源码分析,本文的主要线索以wrodcount程序处理一次输入数据的过程为线索,在探索这个线索的过程中,以期能达到抽丝剥茧的方式为读者呈现。主要内容有两点:

  1. 从TaskManager的subtask线程如何执行调用到了用户的自定义UDF业务逻辑代码
  2. 输入分区和输出分区的对应关系

直接上flatmap算子的调用栈,如下

可以看到,栈底是Thread,这个Thread应该是MiniCluster启动的subtask的执行线程,在往上就是flink抽象的运行时角色的实例了,例如,Task,StreamTask,自底向上逐渐由面向Thread的层面过渡到面向flink的udf用户逻辑层面。

我们直接从Task调起StreamTask的入口看起,Task将接收到的输入数据给了StreamTask的processInput,

StreamTask又将其交给inputProcessor(StreamInputProcessor)处理。

而在StreamTask与StreamInputProcessor之间使用了Mailbox线程模型,它是一个单线程的模型,在此只做简单介绍,

先来看下这个改造/改进最初的动机,在之前 Flink 的线程模型中,会有多个潜在的线程去并发访问其内部的状态,比如 event-processing 和 checkpoint triggering,它们都是通过一个全局锁(checkpoint lock)来保证线程安全,这种实现方案带来的问题是:

1. 锁对象会在多个类中传递,代码的可读性比较差

2. 在使用时,如果没有获取锁,可能会造成很多问题,使得问题难以定位

3. 锁对象还暴露给了面向用户的 API

基于上面的这些问题,关于线程模型,提出了一个全新的解决方案 —— MailBox 模型,它可以让 StreamTask 中所有状态的改变都会像在单线程中实现得一样简单。方案借鉴了 Actor 模型的 MailBox 设计理念,它会让这些 action 操作(需要获取 checkpoint lock 的操作)先加入到一个 阻塞队列,然后主线程再从队列取相应的 mail task 去执行。

最后真正执行的是 MailboxProcessor 中的 runMailboxLoop() 方法,也就是上面说的 MailBox 主线程,StreamTask 运行的核心流程也是在这个方法中,其实现如下:

上面的方法中,最关键的有两个地方:

1. processMail(): 它会检测 MailBox 中是否有 mail 需要处理,如果有的话,就做相应的处理,一直将全部的 mail 处理完才会返回,只要 loop 还在进行,这里就会返回 true,否则会返回 false

2. runDefaultAction(): 这个最终调用的 StreamTask 的 processInput() 方法,event-processing 的处理就是在这个方法中进行的

我们沿着StreamTask的线索继续探索,在processInput中,StreamTask将消息交给了StreamInputProcessor,而StreamInputProcessor是对StreamTask中读取数据的行为抽象,具体由StreamTaskInput完成,如下就是StreamInputProcessor调用StreamTaskInput的emitNext处理输入数据,

而StreamTaskInput是StreamTask输入数据的抽象,将输入数据反序列后交给StreamTaskNetWorkOutput。同时,StreamTaskInput有两个主要子类:

1. StreamTaskNetworkInput:使用InputGate读取数据

2. StreamTaskSourceInput:使用SourceFunction读取数据

那么,接着来看StreamTaskNetworkInput是如何处理StreamTask传递进来的输入数据的,

这里把流数据元素的抽象StreamElement划分为了四类,与我们在之前介绍的一致:

  • Record
  • Watermark
  • LatencyMarker
  • StreamStatus

在此我们先顺着调用栈的线索,进入OneInputStreamTask,

OneInputStreamTask持有了OneInputStreamOperator对输入进行处理,而OneInputStreamOperator我们在之前已经介绍过,它其实就是用户的UDF业务逻辑的封装,在这里因为我们进入的是FlatMap的调用栈,所以,运行时的实例是StreamFlatMap,所以继续进入这个类的处理元素的方法,

在这里,我们就和wordcount自定义的FlatMapFunction对接上了,他调用了userFunction的flatMap接口运行wordcount中的分词逻辑,即最终执行了如下wordcount代码,

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
        for (String word : sentence.split(" ")) {
            out.collect(new Tuple2<>(word, 1));
        }
    }
}

讲完本文的第一个内容再来看看第二个:输入分区和输出分区的对应关系

以上是一个典型的物理执行计划,

在数据输出方面,主要包含两个核心抽象:

  • ResultPartition:是一个Task的输出的抽象,包含若干ResultSubPartition。
  • ResultSubPartition:下游请求数据是请求ResultSubPartition,而不是ResultPartition,负责实际上存储Buffer

ResultPartition数量决定因素主要是上游并行度。

ResultSubPartition数量决定因素主要是:下游并行度 + 上游数据分发模式

另外,关于Buffer,我们在【Flink】第八篇:Flink 内存管理 中已经介绍过Flink的内存模型。在Flink中Java对象的有效信息被序列化,在内存中连续存储,保存在预分配的内存块上,内存块叫作MemorySegment,即内存分配的最小单元。很多运算可以直接操作序列化的二进制数据,而不需要反序列化。MemorySegment可以在堆上:Java byte数组;也可以在堆外:ByteBuffer。Task算子处理完数据后,将结果交给下游的时候,使用的抽象或者说内存对象是Buffer。其实现类是NetworkBuffer。一个NetworkBuffer包装了一个MemorySegment。

在数据输入方面,主要包含两个核心抽象:

  • InputGate:是一个Task的输入数据的抽象,包含若干InputChannel,主要包含SignleInputGate和UnionInputGate两个实现类
  • InputChannel:实际负责数据消费的是InputChannel,主要包含LocalInputChannel,即数据本地性;RmoteInputChannel,即跨网络数据交换Flink选择了Netty

一个InputChannel对应上游一个ResultSubPartition。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-09-22,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 章鱼沉思录 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
[源码分析] 从源码入手看 Flink Watermark 之传播过程
本文将通过源码分析,带领大家熟悉Flink Watermark 之传播过程,顺便也可以对Flink整体逻辑有一个大致把握。
罗西的思考
2020/09/07
2K0
一文搞定 Flink 消费消息的全流程
当 Flink 程序启动,leader、blobServer 等都创建完毕,当 ExecutionGraph 构建完成,提交成功之后。就到了,task 正式执行的阶段了。这个时候,一条消息是如何流转的呢? 首先,进入了 Task 的 run 方法
shengjk1
2020/06/04
1.5K0
一文搞定 Flink 消费消息的全流程
追源索骥:透过源码看懂Flink核心框架的执行流程
写在最前:因为这篇博客太长,所以我把它转成了带书签的pdf格式,看起来更方便一点。想要的童鞋可以到我的公众号“老白讲互联网”后台留言flink即可获取。
老白
2018/08/01
10.3K4
追源索骥:透过源码看懂Flink核心框架的执行流程
【Flink】第二十四篇:源码角度分析 DataStream API 调用逻辑
【Flink】第四篇:【迷思】对update语义拆解D-、I+后造成update原子性丢失
章鱼carl
2022/03/31
8300
【Flink】第二十四篇:源码角度分析 DataStream API 调用逻辑
聊聊flink KeyedStream的reduce操作
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/KeyedStream.java
code4it
2018/12/29
4.2K0
聊聊flink KeyedStream的reduce操作
聊聊flink的BoltWrapper
flink-storm_2.11-1.6.2-sources.jar!/org/apache/flink/storm/wrappers/BoltWrapper.java
code4it
2018/11/25
8880
聊聊flink的BoltWrapper
Flink原理 | Flink中的数据抽象及数据交换过程
场景描述:Flink作为一个高效的流框架,为了避免JVM的固有缺陷(java对象存储密度低,FGC影响吞吐和响应等),必然走上自主管理内存的道路。
王知无-import_bigdata
2019/12/05
2.2K1
Flink原理 | Flink中的数据抽象及数据交换过程
flink源码分析之kafka consumer的执行流程
线上flink任务稳定运行了两个多月了,突然之间收到了消息堆积较多的报警,kafka上看到的现象是消息堆积较多。问过业务人员得知,对应的流表在前一天重新刷了一遍数据,在我们的这个任务中有两次维表关联,而且内层有一个split操作会造成外层维表关联的数据量膨胀(最大可能为80倍,即split之后产生了80条新记录)。开始了问题分析之路。
山行AI
2021/04/29
3.3K0
flink源码分析之kafka consumer的执行流程
【Flink】第三十三篇: 任务线程模型
线程模型能帮助我们更深刻的理解Flink任务执行原理,更精确的控制Flink程序,这些是使用Flink解决复杂问题、写出高性能和高可用程序的基础。
章鱼carl
2022/03/31
2.2K0
【Flink】第三十三篇: 任务线程模型
一文搞定 Flink Checkpoint Barrier 全流程
上文中,我们一起了解了 一文搞定 Flink 消费消息的全流程,接下来呢,我们一起来看一下 checkpoint barrier 的全流程。
shengjk1
2020/06/21
1.3K0
用Java实现samza转换成flink
在大数据处理领域,Apache Samza和Apache Flink是两个流行的流处理框架。虽然它们都能处理实时数据流,但在架构、API特性和使用场景上有所不同。随着技术的演进,开发者可能需要将基于Samza的应用迁移到Flink,以利用Flink在吞吐量、延迟和高级功能方面的优势。本文将详细介绍如何使用Java将Samza应用转换成Flink应用。
编程小白狼
2024/12/31
1050
flink线程模型源码分析1之前篇将StreamTask中的线程模型更改为基于Mailbox的方法
本文中关于将StreamTask中的线程模型更改为基于Mailbox的方法主要译自如下两处:
山行AI
2021/03/11
2.9K0
flink线程模型源码分析1之前篇将StreamTask中的线程模型更改为基于Mailbox的方法
flink分析之Task的生命周期
之前有想过系统地来一番flink源码分析系列,谁曾想工作中需要完成的需求有些多,完整的flink源码分析系列只能一再往后拖了。之前公众号后台有想学习flink的朋友留言想看更多学习flink的资料,现在先发一些之前收藏的关于flink相关的文章,其中大多翻译自flink社区,希望能给大家带来一些帮助。本文[1]主要围绕flink任务的生命周期展开。
山行AI
2021/04/16
1.6K0
flink分析之Task的生命周期
Flink重点难点:内存模型与内存结构
Java 虚拟机在执行Java程序的过程中会把它在主存中管理的内存部分划分成多个区域,每个区域存放不同类型的数据。下图所示为java虚拟机运行的时候,主要的内存分区:
王知无-import_bigdata
2021/09/22
1.5K0
一文搞定 Flink Task 提交执行全流程
这里创建了一个 Task 对象并启动,我们来看一下 Task 启动的时候都做了什么
shengjk1
2020/07/14
1.4K0
轻装上阵Flink--在IDEA上开发基于Flink的实时数据流程序
链接:https://pan.baidu.com/s/12rXlY_z_Fck8-NRXdZ5row
壮壮熊
2022/08/18
5700
轻装上阵Flink--在IDEA上开发基于Flink的实时数据流程序
Flink UDF自动注册实践
通过上面得方法,发现在检查完类的实例化之后,便是对该类进行注册使用,分别针对Table API和SQL API两种不同形式去进行注册。
王知无-import_bigdata
2019/07/23
1.7K0
Flink UDF自动注册实践
Flink入门宝典(详细截图版)
本文基于java构建Flink1.9版本入门程序,需要Maven 3.0.4 和 Java 8 以上版本。需要安装Netcat进行简单调试。
用户6070864
2019/09/18
9110
Flink入门宝典(详细截图版)
Flink源码解读系列 | Flink中接收端反压以及Credit机制
可以看到每个task都会有自己对应的IG(inputgate)对接上游发送过来的数据和RS(resultPatation)对接往下游发送数据, 整个反压机制通过inputgate,resultPatation公用一个一定大小的memorySegmentPool来实现(Flink中memorySegment作为内存使用的抽象,类比bytebuffer), 公用一个pool当接收上游数据时Decoder,往下游发送数据时Encoder,都会向pool中请求内存memorySegment 。因为是公共pool,也就是说运行时,当接受的数据占用的内存多了,往下游发送的数据就少了,这样是个什么样的情况呢?
大数据真好玩
2020/09/22
9340
Flink源码解读系列 | Flink中接收端反压以及Credit机制
Flink入门宝典(详细截图版)
本文基于java构建Flink1.9版本入门程序,需要Maven 3.0.4 和 Java 8 以上版本。需要安装Netcat进行简单调试。
大数据流动
2019/09/29
8260
Flink入门宝典(详细截图版)
相关推荐
[源码分析] 从源码入手看 Flink Watermark 之传播过程
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验