00:00
诶,我们已经知道flink里边检查点算法的一个基本的思想思路啊,大家发现它是引入了一个叫做barrier的特殊的数据结构来跟在这个某一个数据后边,大家就发现这就表示,呃,是不是每一个任务收到barrier的时候,就表示之前的那个数据做完了呀,哎,所以之前我们说的同一个时间点处理完同一个数据之后做快照就是靠这个来保证的,哎,那有同学说,那要这么说的话,这就很简单了呀,呃,在实际应用的时候,这就是流失数据嘛,来一个数据处理一个,一旦要遇到那个barrier的时候,我就开始保存状态,这就完事了吗?然后接下来大家想这个bar处理完之后怎么办,处理完之后就丢了吗?哎,船往下游船对不对,这就是我们流失处理的特点啊,数据处理完了之后,它有输出传到下游watermark,如果说诶这边我对应的这个时间更新了之后啊,它是不是要广播发送到下游啊,那接下来这个be大家想是不是我也要朝下游传,因为我做完了是不是我得让下游的任务也知道,做完前一个数据现在就该保存状态了。
01:10
哎,所以这个这个思想是非常容易想到的,那为什么这里面我们还要专门去讲它的算法呢?不就是一个一个做就完了吗。这里面关键就在于大家还记得之前我们讲的是分布式快照算法,对吧,分布式的一大特点是任务要并行对不对?哎,所以接接下来就有这个问题了,大家想一下,之前我们想的是一条流上的话,这个barrier没问题啊,就是一个一个往下游传就对了,比方说现在大家看这样的一个一个状态,我们现在这是。这是两条流对吧?啊,或者大家也可以认为这是本来是一条流,然后这个SS任务读取的时候卡夫卡有两个分区啊,那大家想我这里边并行度是二的话,是不是就相当于有两个并行的任务啊好,我们现在为了区分,我就把它叫成蓝流和黄流吧,对吧?蓝流黄流大家看都一样啊,里边的数据都是自然数一二三四五六七八九十啊,就开始一个一个往后来,每一个数都是跟在后边按照顺序来的啊,然后现在后后边做的这个操作还是一样,呃,是按照奇偶性做了一个重分区对吧?呃,然后奇数求和,偶数求和,有两个并行的sum任务,然后后面呢,是两个S任务。
02:23
大家看一下这中间是不是涉及到这个数据要做一个重分区传递啊,啊所以大家看一下现在这个状态是处理到什么情况了呢。接下来是当前的S任务,蓝流和黄流,这是不是相当于是处理完蓝三皇四啊,这状态就是偏一亮嘛,三四对吧,所以我当前处理的这个状态应该是蓝三皇四,当前这个园这边消费完了,然后后边大家看。这个S任务读完了之后,后边这个求和任务他就真的已经全处理完了吗?没准对吧,当前这个时刻,这个偶数求和它是求完啥了呢。
03:03
他是不是只有一个二求完了呀。只有一个二,我们看蓝二还在路上,那是不是只处理了一个黄二啊,大家看黄二已经没了,是不是这里已经输出了呀,输出了一个求和黄二对吧?啊,这是当前这个偶数求和的状态是二,奇数求和状态是五,然后你看它后面的这个输出是蓝二黄五,这是啥意思呢?这表示的是每一个这个颜色,它对应的是大家想是不是蓝色,蓝流里边数据来了之后,我这边可能会会有一个求和,然后输出啊,哎,那我输出的这个就用蓝色来表示,如果是黄流里边的数据来做了计算输出,我就用黄色来表示,所以这里面的蓝二表示。大家想奇数求和,这是不是应该是135对吧,那我这里是不是有两个一,两个三两个五这样过来的,那这个二其实应该是对前面是不是一加一啊,哎,所以既然它这是蓝二,所以这个顺序应该是先来了一个。
04:01
黄一,然后来了一个蓝一,是不是就加出了一个蓝二输出了,那前面应该还有一个一的输出,对不对?那一去哪了呢?黄一去哪了呢?对,大家想是不是他既然没写,是不是就已经THINK2是不是已经输出了呀?哎,所以你看这就是流式处理啊,所有的任务都都同时在做,做自己的事情完全并行不悖,该干什么干干什么。啊,然后后面还有一个黄五,黄五自然就是后边有。是不是再加了一个三啊,这个三当然就是黄三对吧啊,那蓝三蓝三是不是还没加上啊,大家看蓝三还在路上呢,对吧?啊,所以当前这个是状态是五。好,那现在问题就来了,如果现在我们这样的两条流啊,看起来像是两条流,然后后面又做了这个整合,这样一个计算,两或者是两个分区,这种情况下。Barrier又该怎么传递呢?好,接下来我们一步一步来看,首先首先我们要看就是当前这个checkpoint,就是barri到底是怎么产生的,你像之前那个watermark的话,我们知道里边有一部操作叫ign time sample and water mark,对吧,我们把那个时间戳提取出来,然后周期性的啊去,或者是非周期性的啊,生成一个water mark,就就是在那一步产生出water mark来,插入到数据流里面去的。
05:21
那现在大家看这个barrier是在什么地方产生的?都是在SS任务这里产生的。由谁来控制它的产生呢?啊,这就是之前我们说的job manager是不是除了做这个任务图的,呃,就生成任务图,做这个任务分发管理,是不是还要做这个检查点的协调啊,做一些中央调控的一些工作啊,所以这里边发发起这个当前检查点,启动这个检查点保存的操作是由job manager发出的。它这里边发出一个信息,这个信息里边带着一个当前的这个二,这是表示啥呢?注意这不是数据二啊,也不是时间戳啊,就是当前检查点的ID就是当前,这是二号checkpoint对吧?啊就是这样的一个因,因为大家想那个drop manager那边是不是我们这个拆point的也是随时都要保存啊,啊对吧,他那边保存的可能检查点多了,他一定得有一个ID来知道这是几号,这是几号对应那个关系才能才能不找错啊,所以照manager那边发出这个指令,这个信息它是同时发给所有的并行S任务的,所以我们看到蓝色的SS和黄色的S,这个SS1SS2同时收到了这个信息。
06:40
SS任务在接收这个信息,大家看是不是就相当于插在了那个三和蓝三蓝四之间和黄四黄五之间,接下来就要干什么,是不是就要插入这个barrier了啊,所以当前这个SS任务是在做这件事,那同时大家看后边后边这个萨任务有影响吗?
07:01
没有任何的影响,你看前面我们这个这里面不是这个even在在加这个状态是二嘛,啊,当然这个状态没变,那是因为这个数据还没过来对吧?啊,他这个处理有点慢,但是你看后面这个黄二就没了。黄二是不是就相当于已经输出到S1里边去了,然后接下来下边的这一个,呃,黄五这里边也稍微有点变化,那就是哎,大家看蓝二是不是已经输出进去了,对吧,当前的状态是他还没读进来,还没处理完啊啊,所以后面也是该怎么做怎么做啊,完全不歇着,接下来我们就看当前的SS任务,这里边是不是就产生了一个。BARRIER2号,Checkpoint barrier,那这里面问题,呃,问题就来了,就是south任务这里边既然是有了一个barrier,我们说是不是收到barrier的时候,这就表示要做这个checkpoint的保存啊,那做这个checkpoint的保存,它要保存的状态就是当前的偏移量。当然就是蓝三黄四直接把把这个东西做一个保存,是不是写到这个状态后端对应的那个远程存储空间去啦,然后做完之后呢,他还要给job manager一个返回确认信息,带着当前的那个ID,就是向job manager报告说报告我的二号checkpoint你的快照完成。
08:22
那照manager那边是不是就知道哦,你已经存到那儿了,那我只要知道你保存的那个地址,我这边是不是就做一个记录啊,有一个原数据记原原数据记录就是当前哦,这个SS任务的二号checkpoint,它的那个状态保存到了,诶某个空间某个地址。这个照搬值里面就知道了啊,这就是我们的一部分快照已经放在这儿了,然后接下来他不是处理完这个了吗?那接下来我们数据处理是处理完一个要往下游发送,那它处理完barrier怎么办呢?对,他是不是也要朝下游发送啊,那发送的原则是什么。
09:01
这就是我们说的数据传输的原则嘛,你是KBY呢,还是reb呢?轮询发送呢?还是广播发送的broadcast呢?还是只发送到就是第一个分区global呢?哦,大家想一下,当前我们应该是怎么发送。Walmark当时的发送方式是广播,下游全有,因为walmark是不是要通知下游所有的任务,我当前这个分区时间是不是已经到这个这个时间点了,下游所有人,呃,所有的那个分区都应该知道,对吧?诶,那当前这个barrier表示的含义是什么?是不是表示我当前这个分区就是蓝三之前的这个数据处理完了呀。对吧,我们说是处理完同一个数据之后的那个标记嘛,那所以他是不是也应该告诉下游的所有任务,我当前蓝流SS1蓝三已经处理完了,那是不是也要广播啊。啊,所以这其实解决了之前我们的一个问题啊,就是大家想前面我们讲这个check po保存的时候,你看你看这个保存的时候啊,我们当时是把这个,呃,就是按照处理完五之后那个状态做了一个保存,但大家想五这个数据是不是只会发给奇数求和的这个这个算子不会发给偶数求和啊,那你说我偶数求和这里面怎么知道它是五做完了呢?
10:25
我前面只做完了四,只加完了四,对吧,那我怎么知道四加完了之后,他就不会来五,呃,直接就是六了呢?大家想现在我把那个五之后的barrier直接广播下去,是不是他就知道了呀,对吧?他只要看到这个barrier是不是我就知道,哦,原来你前面五已经做完了,只不过他没来我这个分区,他来了别的分区。大家想想是不是可以起到这个作用,哎,所以接下来就把之前的这个问题也解决掉了。那呃,有了这个想法之后,我们接下来就很简单,Source任务保存完了状态向job manager发起一个通知,然后接下来把自己的barrier带着帮你的ID的barrier向下游广播出去传递。
11:09
那大家看在SS任务做这些保存和这个确认传递的这个工作的过程当中啊,如果我们这里边是同步的话,那大家知道他不能再继续消费动消费数据了,对吧?啊这个是要停下来的,但是后面的任务会停吗?大家看根本没停对不对,跟之前如果做对比的话,你看之前我们那个萨一本不是二吗,现在状态变四了。是不是之前我们没有的那个蓝二在路上的蓝二现在已经进来了,处理完了呀,哎,现在是不是输出了一个蓝四,然后同样下边之前这个不是黄五吗?现在是不是之前的那个蓝三原先在路上,现在是不是也进来了,对吧?哎,这个变化就是这个蓝三,蓝二和蓝三都已经叠加进来,这里边的偶数奇数的这个状态变成了四和八。输出了,这就是当前的这个状态。然后接下来一个一个barrier朝下游传递,下一步是不是就是sum这个操作要接收到barrier了?诶,那大家想一下,这就带来了另外一个问题,我们说一个任务接收到barrier的时候,是接收到barrier就表示我要开始做快照了,对吧?但现在这个some even接收的barrier是不是有两个啊,蓝色的这个蓝流是不是要发出一个蓝色的barrier,黄流是不是也会给他发一个,因为是广播嘛,也会发一个黄色的bar瑞啊。
12:32
那他是以接到谁的这个Barry为准,我就开始保保持那个保存这个当前的状态呢。问题又来了,对吧?像之前我们那个watermark也有这样的问题,Watermark的处理方式是取那个最小的作为当前状态,对吧?那现在这个barrier这又没有大和小的问题,那我们这里边要怎么做呢?两诶这个大家想到这还是要回归到我们当时讲的barrier的本质,Barrier的本质说的是。
13:03
检查点分界线对吧,就是在他之前的数据全要带带来的那个状态变化,全要包含在当前的这个保存的checkpoint里边,那所以我们就看当前,如果我这个,呃,就是当前这个checkpoint,我当前对应的那个数据是哪个数据处理完的,因为有两个SS任务,所以我是不是要对应两个数据啊,就是蓝三黄四对吧,我是不是后面的任务都应该处理完蓝三黄四之后的状态。这个就是同一个时间点,对不对啊,那接下来这个sum任务,如果我收到一个蓝色的barrier的话,它能保证的是什么。是不是蓝三处理结束了,那能保证黄色处理结束吗?不能啊,你看黄四还在路上呢,对不对?啊,所以大家想到那接下来,那我收到黄色的be就可以保证结束了吗?是不是我同样也得等那个蓝色的眼来了之后才行啊,所以接下来我们的操作其实是要。
14:02
等待。它的上游所有分区给他发送的barrier都到齐了之后,我才能去保存当前的状态,去做一个去做一个这个拆换的保存啊,所以这个过程我们有时候会把它叫做所谓的分界线对齐,就是下游的任务啊,它是会等待所有的输入分区的Barry都到达。到达了之后我再去保存,那这里就是又涉及到另外一个问题啊,就假如说我们在这儿,这个比方说蓝二先来了对吧,蓝二来了之后我是要等那个就是蓝色的Barry对吧?蓝色Barry来了啊,不是说那个数据栏二蓝色bar来了之后,我是要还等那个黄色的Barry来了之后对齐才能保存,那所以这个蓝色的bar来了之后,是不是我得等着啊。不做任何操作,那假如说我是要等那个皇四嘛,对吧,皇四完了之后的那个黄色Barry,假如在皇四之前。
15:02
因为大家想这个分区传输嘛,这个顺序没准是吧,两两个这个分区的这个数据,谁先谁后,这个是完全保证不了顺序的,假如说。大家看这个状态在这个。皇四来之前,这个蓝四也来了,大家看那个皇四首先来了,大家想这个皇四来了之后,是不是我要正常处理啊,我等的就是他对吧,所以你看这里边黄色来了之后,我直接叠加在这儿变成状态变成八,然后输出一个黄八。那假如在这个Barry来之前,蓝四就又来了呢?因为之前我们这个SS任务是不是没停着呀,哎,你这儿发出这个bery之后,后面继续读这个蓝四嘛,蓝四蓝五继续读,读完了之后就往下发,那你这儿这个蓝二在这儿等着,我这儿不等啊,继续把蓝四就发下来了,那是不是有可能蓝四就直接到了萨一里面要做计算啊,问题来了,蓝四要叠加进来吗?
16:00
蓝四要不要直接叠加在这个八上面算成12。这就又回到checkpoint要保存的状态的定义,我们是同时处理完同一份儿数据的状态,我们要的是蓝三黄四,那不是说蓝四黄四对吧?那如果蓝四的状态已经放在里边的话,是不是我们这个分界线就相当于失效了?它是在分界线之后,它应该属于下一个checkpoint,所以它是不是不能被直接处理啊,所以大家要注意这里边我们的原则是对于当前barrier已经到达的分区,像上面这个蓝色分区里边。接下来继续到达的数据,蓝四先要做一个缓存,不能处理,先缓存在这儿,而对于那个没有到达的那个分区,黄色的,刚才那个黄色来的时候是不是就马上就要叠加进去啊,哎,所以大家看这里边是有这样的一个问题啊,Barry对齐,然后呢,对于到已经到达的分区和没有到达的分区,接下来的数据是要分别处理,有不同的处理策略。
17:05
哦,那大家知道这个下面的这个odd是不是也是一样啊,Odd这里边我是先来了一个黄色的barrier,大家看这个其实在实际处理场景里面经常会出现,为什么呢。是不是有可能我这就是同一个slot啊。大家知道吧,这个我们之前不是说前后发生的这个任务可以共享slot吗?那大家想我如果在同一个slot的话,是不是当前这个蓝色的bar肯定先来啊,而且这个数据发的特别快对吧?那甚至有可能后面这个slo我有可能甚至还跨了task manager,那你想这个发送就特别慢对不对?而下面这个some out是不是有可能就是当前就是在同一个slo里边,是不是直接这个barrier就先收到了啊,他收到这个黄色的barrier之后,是不是还得同样还得等这个蓝色的barrier啊啊,所以继续在等,好,那所以大家看这个前面我的这个SS任务没停的啊,这个蓝色已经读到了五蓝,然后这个黄色都已经读到六了,对吧?啊,然后继续再往后面发送数据。
18:02
啊,所以接下来。我是需要在这个some even大家看到终于等到的,蓝色和黄色的这个barrier对齐,黄色barrier也来了,对吧,这个时候是不是真的可以保存下来状态了?所以我保存的状态是当前是八对吧,那同样下边这个三也是,如果我如果这个五先来了的话,是不是我得缓存啊,不能加对吧,就黄五来了的话不能加,那蓝蓝色的这个beer来了,是不是接下来我就可以保存了,所以我要保存的状态就是。蓝八黄八,大家看是不是最后就把这个存储在我们的状态后端对应的那个存储空间去啊,然后它保存完了之后,接下来是不是又可以通知job manager,好,我也已经做完这个状态保存了,然后接下来同样是不是把他的barrier继续向下游广播出去啊,哎,这里边大家看它这个跟think就是直传,不会再去没有那个数据传输了,跟THINKK2没有直接数据传输了,大家知道这里边并行不相同,而且又是直传,它其实就是一个任务对不对啊,所以这里边就直接给他就完了,呃,就是把这个当前的这个bar直接传递到后边啊,那么当前的这个状态就保存完了。
19:17
然后呢,剩下的这个数据呢,大家要注意是不是我先要把。当时缓存的那个数据按照顺序一个一个先做处理啊。因为当前那个缓存的数据先来的嘛,我们流式处理你总得有个先来后到呀,来一个处理一个嘛,所以我要先把这个蓝四缓存的蓝四叠在那上面,那是不是应该先得到一个12,然后接下来诶,那是不是就是正常处理了,后面来什么数数据继续叠加就完事了。所以后边你会看到首先来了一个12对吧,蓝色的12输出,然后后面是不是黄六也叠加进来变成了18呀,啊,这就继续继续往后走就行了,前面的萨斯任务也没停,这已经读到了蓝八黄九啊,那下面的这个三奥的这个基数求和,这也是在之前八的基础上先叠加了黄五输出了黄13。
20:08
又叠加了蓝五,是不是输出了蓝13蓝18呀,哎,这就是当前的这个处理的一个流程啊,这就是我们做这个检查点啊,分布式进行检查点保存的一个过程。最后还有一步,那就是呃,大家想那个最后think任务其实也一样啊,就是他收到barrier的时候也要去保存自己的状态,如果有的话,然后保存完了之后,是不是要向John manager那边汇报一下呀?啊,那所有的任务都完成之后,大家想John manager那边是不是可以收到所有任务保存完成的确认信息,那大家想这个时候job manager是不是就相当于可以知道我这里边。大家看这个差不用专门再去拼了是吧,是不是只要确认所有都完成,他只要记录下来对应的那个位置,这就算拼好了呀。就我们的合照就已经拼好了,对吧,好了啊,所以接下来这就他可以向所有的任务再去确认当前拆的正式完成,接下来如果发生故障的话,你就从这个状态去做一个恢复就可以了。
21:16
蓝三黄四,这是我们的SS读取出出来的任呃这个状态,然后如果要恢复的话,这里边的后面的萨姆的状态是不是蓝八黄八呀,大家来算一下,看看是不是这个蓝八黄八就是当前处理完蓝三黄四之后的状态。这个偶数求和的话,大家想蓝三黄四里边应该是哪哪些数要求和就是蓝二是不是就是蓝二,蓝二再加上黄二再加上黄四,偶数是不是应该是八没问题对吧?哎,那奇数呢,奇数是蓝一黄一对,然后蓝三黄三。
22:00
哎,然后后面五五都没有了,对吧,所以是不是刚好也是八呀。所以两个偶数奇数求和都是八,没有任何问题,对不对?哎,这就是当前我们最终保存的checkpoint的状态。
我来说两句