00:00
然后我们接下来再给大家专门解释一下,这里边最重要的这一步啊,就是为什么这里边后面我们拆成这个并行化执行版本,这个大家已经了解了,对吧?哎,那这里边为什么某些条件下满足某些条条件下这个有些任务可以合并成一个大的任务呢?哎,那这里边就是满足的这个条件到底应该是什么呢?这首先要给大家从任务之间的数据传输形式讲起啊,那么在代码里边大家会发现啊,程序里边不同的算子前后发生的这个不同的算子是不是可以设置不同的并行度啊?哎,比方说你看前面这个S,我设的并行度是一对吧,然后后边这个Fla map设的并行度就是二,它是完全可以不一样的,然后另外还有一个呢,就是不同的算子之间,它的这个数据传输模式本来也可以是完全不一样的,那这个指的是什么呢?就是说首先啊,大家会想到,如果要是map,或者说flat map像filter这样的任务的话,它有一个特点就是啥呀。
01:03
对,大家想就是我当前的这个简单的这个这个计算这个转换啊,它是不是可以维护着当前分区里边的元素的顺序啊,对吧,你就是做map,做filter,那是不是就是来一个处理一个,来一个处理一个,是不是如果我当前接下来当前分区就有下一步操作的话,我是不是直接直接还在当前分区继续做就可以了,大家看这个过程并不会打乱它的顺序,也不会进行重分区的这个调整,对吧?所以这样的任务的话。呃,就是假如说是这样的算子的话,我们就说它的这个数据传输形式是one to one的这样的一个传输形式。就是当前这个map,它做完计算之后传递到下游的时候,我并不需要做任何的这个重分区对应的这个操作,对吧?只要比方说我一个map后面接了一个filter,那是不是直接就是map做完了之后,本区内如果有filter操作的话,我是不是直接在当前这个分区直接filter就可以了啊,所以这就是所谓的这个one to one操作。
02:10
啊,那另外还对应的有一个。啊,就是所谓的redistributing redistributing的话指的就是分区会发生改变对吧?啊,那所以当前就是每一个算子任务啊,它其实是会根据接下来我要做的这个操作,我要去选择对应的分区,这里就涉及到了一个这个重分区的过程,对吧?所以比方说最经典的一个例子就是KBYKBY,它是基于什么重分区的哈希code,对K的那个哈希code做一个重分区啊那呃,就是我先算一下这个当前K哈code,然后呢,呃,可能要做一个曲模运算对吧?针对我当前下游并行的这个子任务到底有几个,然后去选取一个对应的分区,然后发送到对应的分区里面去。那另外还有一些flink里边还有一些其他的这个,呃相当于是这个数据传输和这个呃做做做这个呃转换啊分区呃分重分区的这样的一些操作,就是比方说像这个broadcast broadcast,那就相当于是。
03:13
直接是广播出去了,对不对啊,然后另外还有就是re balanceance re balance指的是什么呢?啊,这个说是随机重新问句啊,其实不是完全随机,其实指的是。Reb指的是什么呢?轮询,那就是大家看这里边啊,我们可以看一下之前在这个提交job之后,大家看到的这个。看到的这个直行图。执行的计划对吧,大家看这里边,这里边有一个连接的这个线对不对,这个线大家看这里边直接连起来写了一个forward,这代表什么?Forward其实代表他俩之间就是one to one的这种传输,对不对,就是一一对应,是不是直接转发呀,然后大家看rebance rebance指的是什么?
04:00
Rebance指的就是我们刚才说的这个再平衡,就是轮巡的这种方式,所谓的轮询方式就是这里边,如果我,呃对应的啊,下游有多个并行子任务的话,那是不是我当前这个任务来了数据之后就是第一个任务,呃,第一个数据发送到第一个分区,第二个数据来了,是不是我就发到第二个啊,第三个来了发到第三个对吧?啊,第四个来了,如果我只有下游只有三个并行子任务的话,那第四个是不是翻回头来继续发第一个啊,啊轮巡的一个卧程,这个叫re balance,那中间这条连线如果是哈希的话,这就代表什么?对,基于哈希扣的充分句,对吧?啊,所以这就是其实代表了他们之间的不同任务之间的数据传输的模式。所以大家其实也发现了啊,这如果对应这个Spark里边的概念的话,这就相当于什么呀,对,大家看这one to one,这不就相当于是窄依赖吗?对,那redistributing那就相当于是宽依赖对吧?啊,所以这个概念大家就相通了,这里大家重点要区分的一个概念就是Spark里边我们是有一个杀uffle的过程和这样一个概念的,对吧?诶,那对于这个flink而言,它没有严格意义上提出的这个uffle的概念,但是它其实也有一个算子啊,也有一个一个算子,就是我们在做重分区的时候,有一个那个方法就叫点杀否。
05:24
那这个点沙Le指的就是什么呢?指的就是完全随机,这个就是完全随机了啊,Re balanceence是轮循去选择下一个分区对吧?那沙Le就是完全随机啊,直接去选择下一个分区啊,所以这个其实也是一个redistributing啊,重分区的一个过程啊,那大家稍微注意的就是Spark里边杀杀uffle,我们指的是洗牌对吧?那所以它是不是必须要。我们一个stage完成,是不是所有数据都已经搞定了,数据都到这儿了,我们整个一副牌都收集齐了,然后去洗牌,这个才有意义。而对于flink而言,我们现在有一副牌收集齐的过程吗?没有,对吧,数据是源源不断来的呀,我现在处理的是不是只有一张牌,一张牌对吧?只有这个来了,这个来了对吧?所以大家想,那flink里边你如果要说这个沙佛的话,是相当于什么,或者说说这个重分区相当于什么?
06:17
相当于是发牌对不对,哎,就有点儿像发牌的一个过程,就是来了一张牌,诶,我我指定我到底要发给谁,我直接一张牌发出去对吧?所以大家看这个过程就跟呃,这个Spark里边稍微有一点区别,本质上还是批处理跟流处理的区别啊,所以大家要把这个过程理解清楚啊。那所以有了数据传输形式的这样的一个概念之后,那大家回忆一下前面我们讲到的这个合并,合并,呃,前后的两个任务到底应该是怎么样可以合并在一起呢?对,那大家是不是想到,假如说我这里边是宽依赖啊,类似于宽依赖这样重分区操作的话,它能直接合并在一起吗?当然不能合并,你如果合并在一起,是不是接下来它就必须在当前分区内做下一步操作啊,那你要重分区啊,这个数据得发送到其他的任那个分区任务里面去,那你还在当前任务当前分区做,那当然就不对了嘛,所以是不是必须得是。
07:16
窄依赖的这种形式啊,对吧,必须是one toone这样的操作,另外这里边还有一个要求就是啊,那有同学可能想到,那如果是窄依赖,就是比方说map filter flat map这样的算子,就一定可以把它们合并在一起吗?哎,这个还是没准的啊,所以这里边我们给大家提出这样的一个概念啊,啊就是在flink里边,我们采用了一种叫做任务链或者叫算子链,Operator chin这样的一个优化技术,它就是在特定情况下,我可以把满足条件的前后两个或者说几个任务直接合并成一个大的任务。然后这样就可以减少一些本地通信开销,大家想是不是这样,就假如说我只是这个本地做操作的话,那我是不是就不需要包装成两个任务,大家想如果包装成两个任务的话,我是不是任务之间做这个数据传输,还要有一个通信的过程啊,可能还要做这个序列化,反序列化,对吧?做一个包装,做一个传输,然后再呃这个获取到,然后再解析,那所以如果说我现在是把它变成一个任务链的话,那就是一个任务了。
08:23
一个任务的不同操作,是不是我直接用这个本地转发的话,那就相当于是一个本地过程调用啊,就相当于方法调用一样,对吧?诶,这个过程就节省了很多通信开销,那它的要求是什么呢?要求就必须得是并行度相同的弯to one操作,这里边所说的one twoone,大家可以认为是就是一个窄依赖的要求对吧?就要求必须是你像这个map filter Fla Fla map这些都可以作为这样的一个,呃呃,就是展依赖啊,One to one的这样的一个原则,然后另外还得要求它什么呢?对,要求并行度必须相同,大家可会看到,如果说我这里边啊,大家会想到我做完聚合之后,这个sum之后,然后再输,再输出这个过程其实是one to one对不对?
09:11
我当前是不是在在聚合完成之后,在当前分区又可以直接输出啊,但是大家会发现这里边我要做一个重分区,为什么呢?是不是并行度调整了呀。大家会想到,如果说我并行度二变成一,或者一变成二,你说你如果要是直接在本地分区去做这个输出的话,这个是不是就不对啊啊,这个二变成一的话,在在本地的话,这个是不是就有一些分区,它就直接就找不到了,输出不了了,对吧?那如果要是变大呢,一变成二的话,你如果直接在本地,那是不是就相当于我后面设了并行度是二,事实上只有一个分区在工作啊,哎,所以这个就没有意义了,对吧?所以他一定要有一个rebance这样一个轮询啊,啊,就让我们后面的这个并行任务要能工作起来,一定要有这样一个过程。
10:01
所以如果说你想要合并前后发生的两个任务的话,那就必须得是one to one操作还得是呃,并行度相同对吧?啊,所以这个可以给大家快速的把这个再再测试一下,给大家看一眼这个效果啊,我们可以把这个cancel掉。呃,然后我们在这个sum这里再看一眼。当前做一个提交啊,Host local host刚刚PORT7777啊,那大家看之前我们都是默认并行度的话,本来是一对吧,哎,那所以前面这个都是一,后面这个也是一,这里就不能合并了,那其实大家想到假如说我这里边给一个并行度是二呢啊,当然现在给一个并行度二也没什么用,为什么呢?对大家看给一个并行度是二的话,大家说首先啊,这里边它俩并度相同了,为什么不能合并?因为它是基于哈西扣的充分区对吧,对吧,你这即使并行度相同,它也不是one two one操作啊,所以这个不满足条件对吧?哎,不能合并,然后后面这个为什么病毒没没改过来呢?那因为这是在代码里边,我们是不是设死了呀,对吧?如果说这个在代码里边我们做一个调整的话,那其实是可以可以大家看到它可以合并的啊,那另外还有一个问题,就是大家回忆一下之前我们这个并行度是一的时候啊。
11:30
并行度是一的时候,是不是本身前边它俩是可以合并在一起的呀?诶为什么这里边它俩不能合并呢?它不本来不是forward的吗?这不是one one操作吗?并行路又相同,为什么不能合并呢?对,这里还得追加一个条件,就是如果它俩是不同的slo共享组的话,那大家想你还能合并吗?那是不是绝对不能合并啊,对吧?所以这这另外其实还应该有一个条件,就是必须得是同一个共享组对吧?Slot共享组啊,所以这里其实我们用前面的这个去做一个提交。
12:04
这个也可以看得很明显,Local host port 7777。大家看直接在这里边,是不是我前面就直接合并在一起了呀,早些时候我们没有设共享组的话,他俩就会直接合并在一起,那如果说这里边我给一个并行是二的话,给一个是二的话。就会怎么样,大家看前面这里边是不是就拆开了,为什么会拆开,因为我们说代码里边是不是那个socket必须得是并控式一啊,所以大家大家看它这个一到二并行度又不相同,是不是又变成了一个rebll又拆开了呀啊,所以这就是这个合并这个slot共享组的一个原则啊啊啊,那所以最后我们再来给大家简单的分析一下当前的这个流程的话,给大家看一下这个例子啊。当前这个例子。S Fla map,然后K之后聚合,最后think,大家看并行度呢,前面S是一,后边都是二,那到底是哪一步可以合并这个operator chain这样的一个任务链呢?
13:08
首先我们看S和fly map之间可以合并吗?并行度不相同,那是不是它的这个传输模式应该是re balance啊,哎,这个当然不能合并,然后Fla map和后面这个KBY之后的这个聚合能合并吗?并行度是相同了,但是它俩对是基于哈希code的重分区KBY了,对吧?所以它不是one to one操作,诶,所以这个传输模式是哈希的这种重分区的这个传输模式,这也不能合并,而后边是不是它俩的并行度也相同,而且又是one to one操作,那接下来是不是可以合并啊?所以大家看前面我们这个例子,就是最终是它俩合并在了一起。所以大家根据这个分析的话,是不是最终我们根据这个代码生成的任务一共是有合并任务链之后是不是有五个任务啊,拆开之后这个执行图里面有五个任务,那最后我们要几个共享,呃,这个几个slot才能把它这个运行起来呢?对,如果不设置共享组的话,那是不是就是相当于只要两个就可以运行起来啊,对吧?那就比方说这里边我只要有一个。
14:18
包含了这个SS任务,那剩下的是不是放在另外的一个lo里边执行就可以了,前后的不同的任务是不是可以共享slot呀?啊,这就是我们把前面提出的问题就彻底都解决掉了。那关于这个任务链呢,其实还得给大家稍微的说一点点扩展的内容就是。有同学可能想到了,就是默认情况下,对于弗link而言,这里是不是相当于他一定要合并这个任务链啊?大家想只要满足这个条件的是不是一定要合并,那假如说我不想合并的话怎么办呢?就比方说这里边我假如说这俩它本来就是这个满足并行度相同,而且又是one图one操作窄依赖,诶它默认是不是一定要合并,那我不想合并怎么办?诶有的同学可能想到了,那我设置共享组啊,前面设了共享组是不是相当于这里它就不能合并了,对吧,刚才大家看到了啊呃,但是这里边有一个问题,就是说设置共享组是不是相当于最后的结果是当前这个共享组里边,它一定就是是不光是当前的这个任务不能合并了,而且是不是他一定要单独的分配一个slot呀,多占一个slot资源啊,那我现在我的想法可能是slot共享的话,我还想让他可以共享,但是呢,哎,对,我不想让它,就是就是合并成一个任务,我想把它拆开。
15:41
哎,那这个怎么办呢。到哎,对,有同学可能就想到了,我可以是不是在中间给它做一个重分区操作啊,哎,这是一个方法对吧,可以直接做这样的,比方说中间我直接点re balance来,然后做一个这个重分区,对吧,我直接直接点这个沙uffle,直接做一个重分区,那是不是就把中间的这一个它的这个传输的过程,这个传输方式就相当于打乱了呀,哎,这是可以做到这样的一个操作的,那另外还有一个操作给大家说一下这个API里面的调用啊。
16:15
大家知道这个就是像这个,呃,假如你要做这个对应的重分区的话,其实非常简单啊,你比方说像像这儿这个做完sum之后,你要做重分区的话,我是不是直接可以点杀啊,对吧,大家看直接直接这样做就可以了啊,那后边是不是跟这个,呃,就是print这个过程,你就相当于做了一个杀后之后,然后才去传到那个printer这个操作里边,对吧?啊,那另外这里面要给大家说的是还有另外的一个操作叫做disable training。Disable指的就是说我不管你到底是怎么样的一个一个数据传输方式,因为大家知道我如果做这个重分区的话,相当于改变了它的那个数据传输方式,对吧?啊,相当于是做了一个这样的一个额外的追加的一个调整的,那我也可以所有的都不更改,直接disable。
17:06
意思就是说不管是什么条件,当前的这步操作,Sum操作不要参与到任务链的合并过程当中了,那它的这个特点其实就是什么意思呢?它不光是跟后边断开了,大家想是不是它跟前面如果能合并的话,是不是跟前边也要断开断开啊,对吧,因为它是disable嘛,当前这步操作就完全不不参与啊,那所以这是每一步操作我都可以单独指定,那假如说我有这个想法,就是说直接所有的任务啊,我都不要合并这个任务量,对吧?我看他那个合并就觉得奇怪,我就想每一步操作都是单独的一个任务,那怎么办呢?对,那也可以在环境里边直接调这个Env.disable operator training。对吧,这就是全局,接下来这个所有的任务都去禁用了一个任务链,对吧,就都不再去合并了。
18:00
这个大家也可以下来之后试一试啊,啊,当然有同学可能想,那你这个是直接把它禁用,那是前后都断开,假如说我就觉得我的这个需求就特别的诡异啊,特别奇怪,我要求就是前边不断后边断。后边要单独开对吧,那怎么办呢?哎,那我可以在它的后边。在它的后边。这步操作单独来一个,大家看有一个操啊,但当然这里边就是我这个已经是最后一步操作了,对吧,所以就没有对应的这样的一个操作了啊大家看,如果说我在前面这个sum的话,我可以直接做一个start newri对吧?Start new train是什么意思呢?开始一个新的任务链合并对吧?诶所以这这个意思,其实其实是啥意思啊,就是从这个,就是假如说我这里边有一个这个sum操作啊,是它前边是不是就断开了,然后从它后边开始,是不是我要开始一个新的这个任务啊,所以就是它跟后边该怎么合并,怎么合并,从从前边呢?啊,这就是两条不同的这个任务链的这个配置了,对吧,前边该怎么合并怎么合并,那后边我这里边中间是要断开的啊,所以还有这样的一个操作啊,叫做STEM。
19:18
这就是感兴趣的话,大家下来之后可以去测试一下,然后提交提交到这个集群上,看一下它的执行计划是不是符合我们的预期啊,啊,它的那个slot到底是需要占用多少个,然后它的任务到底是怎么划分的,是不是分开了啊,这都是大家可以下来之后做的测试的内容。
我来说两句