00:00
下来我们再讲另外一个概念,就是所谓的状态后端,哎,那状态后端到底是什么东西呢?因为我们说在flink里边有一整套这个关状态管理机制,那大家就想到了每来一条数据之后,我们那个任务里边如果有状态的话,那相当于是不是都会去读取状态,然后更新状态啊,那这个过程当中,这就涉及到了一个状态的读写操作,状态的访问,大家想对于这个状态访问而言,那是不是就是你这个状态到底放在哪里,到底怎么样去访问,这就对于我们的这个处理数据的延迟至关重要啊。哎,你像我们传统的这种事物型处理的话,数据或者说我们所依赖的那个状态,它是存在哪的?是不是都在那个关系数据库里啊,那现在如果说我们这个都直接放在关系数据库里的话,那大家想啊,这个数据量大的时候,你再去做连表查询,那肯定搞不定了,对吧?那如果这个直接在关心数据库里边做这件事情能搞定的话,我们也不会发展这样的大数据处理框架了嘛,所以。
01:06
在当前flink的架构设计里边,我们的每一个任务的状态,它其实都是在本地内存里边直接去维护的,正常情况下啊,都是在内存里面直接访问,这也是保证它低延迟,就是保证它快的一个重要的特点啊,那呃,大家就会想到,那既然它平常是都在这个内存里边保存着啊,呃,那我们平常使用的时候就直接当成一个本地变量拿出来用就可以了,但是呢,我们还得了解清楚它的这个状态类型到底是什么样啊,另外是不是前面我们说如果是K的state的话,是不是还要带着K相关的这个上下文信息,原数据要做一个保存啊,还得做这种管理对不对?另外还涉及到一个问题,就是说我们后续不是还要做这个容错保证吗?容错保证的话,当前这个状态是不是不仅仅是放在内存里边,还得定时间做快照,要落盘存在这个就持久化的存储空间里边。
02:07
去啊。那这就涉及到很多问题了,另外还有一个问题就是,呃,有可能我们在做这个数据传输的过程当中,对于那个状态啊,做完计算之后的那个结果,是不是还要做这个序列化,反序列化对吧?啊,或者你那个要做快照落盘的时候也得序列化嘛,啊,所以这里面就涉及到很多很多的问题。所有这些状态的存储、访问和维护,我们知道flink都是由底层的这个状态管理机制帮我们搞定的,那它具体来讲是谁?哪个组件来帮我们搞定这件事呢?这个组件是一个可插入的组件,在flink架构里边把它叫做状态后端state back end啊,那当然大家会发现这个状态后端,呃,它主要负责的事情呢,就是两件事儿了,一件就是本地的状态管理,对吧?啊,就当前这个状态内存里边怎么存,然后你这个上下文K是什么对吧?怎么样去分配,怎么样去读,怎么样去写,这都是由状态后端来来决定的。另外还有一件事就是。
03:12
做快照容错对吧?哎,怎么样去保存检查点写入到远程存储里面去,如果遇到遇到故障的时候,怎么样再从那个保存的checkpoint里边恢复出状态,重新执行,这都是状态后端做的事。啊,那大家其实就发现了这个状态后端,那要这么说的话,它到底是状态后端是task manager还是drop manager呢?对吧?这它到底是什么东西呢?我们之前不是说了运营架构的这个四大组件吗?那这个状态后端我们说是一个可插入组件,这到底又是个什么玩意儿呢?啊,所谓的可插入组件,其实就是它没有固定的配置,是一个可以调整的一个结构,对不对,就是逻辑上我们把它叫做状态后端,那具体使用的时候呢,它可以是不同的东西,所以接下来我们来给大家说一下,可以去你做一个选择,选择一个想要用的状态后端,那在flink里边呢,给我们提供了三种不同类型的状态后端。
04:14
啊,那大家看一下这个三种类型分别是什么样的状态后端啊,首先最简单的就是memory state back end,顾名思义memory state back end,那就是内存级的状态后端,它的特点就是说都在内存里面做,对吧?啊那之前我们说过所有的状态为了快,它不就是存在本地内存吗?诶,没错啊,就是所有的这个状态啊,它把这个监控状态,呃,但当然大家知道对于那个呃算子状态来讲,它是不是也是这个内存里边的一个,我们就当是那个本地变量来使用的嘛,对吧,只是多实现了一个那个做checkpoint那个接口而已,所以所有的状态它都作为当前内存里边的一个对象来进行管理啊,然后这里边说的是把它存储在task manager的GVM堆上,这个是在早期的版本里边,它是都放在堆内存里的,当然了后期的版本其实k set啊,它已经放在了堆外内存对吧,前面我们看到那个在。
05:14
集群配置内存的时候,它其实是分开去配的啊,但是不管怎么样,它都是在。内存里边对不对?那这个内存大家要注意它是在谁的内存呢?状态存在谁的内存上?是drop manager还是task manager状态存在?哎,有同学说存在job manager的内存上,那你如果存在job manager内存上的话,我们干活的人是task manager对吧?他干活的时候处理数据不是要访问这个状态吗?那是不是还要从drop manager那边去请求数据啊,我们这么设计这个合理吗?那这就显然导致我们这个要发很多网络请求对吧?然后数据还得再做序列化传输,这肯定就慢了嘛。所以我们现在的设计它是内存及状态后端,它是把状态都作为本地的这个内存里边的一个对象,直接存在了task manager的内存里面,干活的人直接保存。
06:11
那另外还有一件事是他还要做存盘那个检查点保存吗?那检查点存在哪里呢?那检查点是存在了job manager的内存里边,所以你看它为什么叫memory的内内存集呢?就是不管是本地状态还是检查点是不是全放在了内存里边啊啊那大家想它的特点是什么?特点就是快嘛,内存就是快对吧?啊那它的呃这个缺点,这是优点了啊,缺点也非常的明显,那就是不稳定对吧,你想你这个全放在内存的话,那是不是掉电要丢就丢了呀?啊特别是你这个拆炮,你给大家想这个你本来就是要做故障恢复的,结果你直接放在了内存里边,尽管是job manager的内存,那假如job manager挂了呢,是不是相当于我们现在就是所有的检查点都没了呀,哎,所以这个整体来讲啊,Memory state back end,它一般不会用在生产实际里边,因为这个容错性相对来讲就是不够稳定,对吧,啊,就是这个出现故障。
07:12
的问题,代价可能比较大,所以它一般是用来做测试啊,大家开发测试的时候用它比较多快嘛,那大家就想到了实际生产环境当中用什么呢?再看第二种状态后端,这个叫FS stay back end,顾名思义这个就应该是文件级的文件系统,对吧?File system啊,C back end状态后端,那它的特点就是。把这个checkpoint存盘,呃,做的这个检查点啊,它不要放在内存里了,它是直接存到了远程的持久化文件系统上,当然了,这里这个文件系统大家能想到一般我们会放在哪啊,对HTFS对吧,你去指定一个GFS的这个路径啊,我们直接放上去就可以了,那对于本地状态呢。我们干活的时候不是还得访问那个本地状态吗?这个怎么办呢?还是放在task manager自己的那个内存里面对吧?啊,所以这个就是它既保证了本地访问的速度,读写速度是不是还是很快啊,诶那你如果要是它怎么样,你就挂了,内存丢了,丢了没关系,我们当时的那个状态是不是检查点是保存在了远程的这个直文件系统上啊,那个可以完全可以给它恢复出来,然后我们继续做就可以了,对吧?啊,所以文件系统的状态后端整个来讲是应用比较多的,它的特点也非常明显,就是本地访问很快速,然后呢,呃,对于这种实际生产来讲,又有很好的容错,保证你发生故障之后很容易可以恢复出来。
08:42
那另外还有一种状态后端叫做rocks DB back,这个大家看到了一个新的概念,叫rocks DB rocks DB是什么呢?大家可能知道这是,呃,这应该是Facebook啊,基于谷歌的那个level DB,然后类似的研发出的一种no CQ的数据库,对吧,它底层的存储呢,也类似于就是相当于一个这个KV这样的一个存储啊,那么它的特点是本身啊,我们可以有这个首首先是先放在内存里边啊,就是内存里边我们可以去设置这样的一些kVA的保存,然后它如果数据量特别大的话,又可以直接落盘,直接把它放在硬盘里面。
09:24
那大家想这个是不是就特别,呃,能够能够容纳的这个数据量就特别特别大了呀,哎,所以它一般是应用在什么什么场景下呢。大家想,如果说我那个文件系统啊,File system这个back end,它在使用的过程当中有没有什么缺点呢?诶,它的缺点就是当前我的这个处理的这个状态都放在task manager的内存上,那假如说我这个内存越来越多,越来越大,因为有一些我们的那个状态的设计,确实是随着时间的进展会越堆越多的,对吧,那数据量越来越大之后,内存放不下了,那最后是不是就OM了呀,对吧,你分配的这个不够,那最后你就你就只能是再扩容,呃,就是出现故障挂掉之后再扩容重新启动,只能是做这种操作了,那假如说我们现在预算就不够,你扩不了容了,那怎么办呢?
10:20
还有另外一种方式,对,你可以换一个状态,后端用rocks DB rocks DB的话,大家就会想到,那就是平常我这里边内存做一个,这个内存就相当于变成一个缓存了,对吧?它大量的状态是怎么样呢?做序列化之后,直接就存在了本地的RODB里边啊,所以大家会想到它的代价是什么。代价是不是就是对读写速度,访问速度就会相对来讲慢一点,但是如果本身你那个内存啊,里面这个缓存里面有数据的话,它也可以很也可以很快,但是假如说要涉及到你访问硬盘的话,那就那就慢了,对吧?啊,所以整体来讲速度会慢一点,但是它的好处带来的是。
11:04
这是不是可以认为你在生产实际当中就根本不会OM了,对吧?你直接硬盘嘛,这个管够啊,一般情况下你肯定不会超过自己的这个,呃,就是硬盘存储的空间的啊,所以这个大家在实际生产项目当中就看自己的选择了,有一些项目里边,如果说你要保存的这个状态并不不是很多,也不会随着时间的推移这个增长特别快啊,那而且对于这个实时性的要求比较高,对吧,我们的资源也足够,那是不是你用file system就很好啊,那如果有些场景下,你的这个状态呢,是特别特别大,然后它的那个数据呢,又会随着时间的推移不停的增长,不停的增长,那大家想是不是你用RODB会好一点啊啊对吧,你就牺牲一点它的这个处理的性能啊,稍微的慢一点,但是你可以保证稳定性,这就是看大家具体的选择了,好,这是理论上怎么样去选择状态后端我们还是给大家在代码里边啊,就是这个,首先大家。
12:04
应该能想到我们在配置文件里边,是不是可以配置整个集群的对应的那个状态后端啊,啊,这个我们在回过头来看一眼啊,还是回到弗link下边,大家还记得com下边有这个弗link康压模对吧?啊我们用这个。VI啊,看一眼,呃,首先前面是我们讲到的common通用的这些配置项,这个我们就不说了,后边呢,其实前面我们过过。大概的看过一次,后边讲到这个for tolerance and checkpointing的时候,这里面是不是有有一个配置项,就叫做state.back end呀,然后大家看这里边你可以写的这个字段有哪几种呢?哎,它不叫memory,它叫。Job manager,为什么他写job manager呢?是不是就是因为我们当前的那个拆pod往哪存啊。呃,对啊,因为大家想它本地的这个状态的话,是不是默认都是以内存为主啊,内存快嘛,对吧,都是以这个test manager内存为主,实在放不下的话,RODB是你可以这个呃往rock DB里面写啊,那所以这里边最关键的就是你那个checkpoint往哪写,所以大家看它是drop manager,然后file system,还有这个rock DB对吧?啊,然后在这里边默认情况下,其实应该是什么呢。
13:22
默认file system对吧?啊,然后你看这个下边这里是不是就有这个对应的checkpoint的对应路径啊,如果我们设置这个file system这样的一个状态后端的话,一般我们都会给一个HTFS的一个路径啊,直接保存在远程的HTVHDFS上就可以了哦,然后下边这里边呢,还有一个选项叫做stay back and.incremental这个是什么意思呢?对,就是增量化的进行checkpoint保存的这个选项默认是false,因为有一些场景下面它不支持这样的增量化保存啊,比方说你像这个文件系统,大家想你这个增量化就不太好弄对吧?啊,那有一些它就支持,比方说RODB,它就支持这样的增量化的存储,所以在有一些场景下可以把这个打开。
14:09
然后另外还有一个选项叫job manager execution fill over strategy region啊,这个选项它其实是1.9版本之后新引入的一个配置项啊,它主要说的是什么呢?呃,就是假如说你在执行这个,呃,就是出现这个故障要重启的时候,它有一个策略是用这个region的方式重启,啥意思呢?就是区域化对吧?就之前我们的那个重启策略是一旦我们现在不是有那个并行的任务吗?然后它分别执行在不同的slot上,可能分别属于不同task manager,对吧?那之前是只要有一个任务挂掉,接下来我重启的时候就是所有的全部停掉,然后所有的重新启动。对吧,所有的再重新加载自己的那个状态,所以大家想这个是不是效率有点低啊,所以现在的话就是我划分片区相当于对吧,我把他们那个之间的关系先解析出来,假如说我当前这个挂了,它只影响这几个任务的话,那是不是我别的任务都不用停啊,该怎么做还还是继续做,我只把这些重启一下,然后恢复状态,是不是继续做就完事了啊?所以这就是。
15:20
当前用到的一个这个区域重启的一个策略啊,啊,这就是配置文件里边的一些集群的默认配置,那大家可能想到,如果说我不想在这个集群里边用那个默认的配置,我想针对当前的这个代码啊,当前的这个drop,每一个drop就单独的配置一下它的这个状态后端,那怎么去配呢?啊,这个也很简单,代码里面可以设对吧?啊这这里面给大家还是简单的写一下这个测试啊,这个因为跟状态相关,我们就放在这个state下边。State test。四当前是状态后端,State back end,呃,这个不光是stay back end,后边我们把所有的状态相关,比方说这个checkpoint啊,后面讲到checkpoint相关的机制我们都放在这儿吧,对吧?啊,这个我就叫做容错吧,Fault tolerance。
16:17
对吧,呃,容错机制相关的一些配置啊,都放在这里。呃,整个的这个,呃,这个整体还是一个相同的架构啊,我先把这个架子搭起来,Throws exception啊,然后接下来我们把前面的这些东西都拿过来,对吧,这个我就直接用上面这个好了。环境配置好对吧,然后数据流放过来。呃,下边还应该有这个en nv execute把它执行起来啊,主体来讲的话就都是这些啊,这就是我们随便写了一个流流处理的处理程序啊,然后主要要做的配置呢,是不是应该都在en nv里边去做配置啊,对吧?大家想想都是这样的啊,所以这里边我们讲一下这个状态后端配置也非常简单,Env data and set,前面我们不是讲过这个set吗?这里边有一个set state back end对吧?啊,那么这个set end里边必须要传的就是一个state back end,那back end具体来讲大家看它是一个interface,那是不是就得你自己去实现这样一个接口啊啊,我们这里面不用自己实现link给我们底层的实现已经有这么几种,对吧?我直接去可以去new一个memory step back end对吧?
17:43
啊,当然了,就是memory back end里边你可以不传参数,大家看是不是也可以传参啊,对吧,你可以传这个布尔类型,这个指的是是否开启异步做快照的那个机制,对吧?什么叫异步快照?就是我当前做这个快照的时候,大家会想到我做快照的时候,理论上来讲,我是不是后面就不能再去,呃,对执行对应的任务啊,啊,这就相当于是一个同步快照的过程,那有时候我想你这个耽误我执行任务了,这是不是相当于我的这个时效性就低了呀,相当于我暂停了嘛,那我是不是可以同时做快照,一边做快照一边后面继续执行任务啊,那这种方式就是异步对吧,异步做快照。
18:26
啊,那当然后面还有一些就是比方说我给一个max state size对吧,当前的这个大小,呃,另外就是还可以给这两个参数对吧,还可以给多个参数,这个大家大概知道就行了啊,这个我就不详细写了,呃,然后另外还有就是可以去拗一个是不是FS stay back and呀,但这里边你需要去给一个对应的那个checkpoint的uri对吧?啊,一般我们都是HTFS的路径啊啊,这个我也不详细写了,那最后还有一个rock DB对吧?诶,但是大家看到这里面好像没有rock DB stand。
19:01
因为RODB是啊对,是另外的一个组件,我们是不是需要再引入相关的依赖啊,啊,所以这一部分我们可以把这个rock DB,这个也是flink官方提供的啊,这个就叫做flink state back end rock DB,然后后面2.12这是啊,对那个scan的版本对吧,下面一点十点一跟的是当前的flink的版本,我把这个依赖直接引入。Po文件里边追加dependency。呃,然后有了这个依赖之后,接下来我们在这儿。就可以看到。先把这个刷新一下啊。我们把这个呃依赖引入,接下来stay back,哎,现在有了对吧。接下来大家看一下。Rock接下来就是rocks DB对吧?诶大家有了对吧,Rocks DB state back就放在这里就可以了,当然里边呢,是不是也必须要有一个存储,呃,当前这个checkpoint的一个路径啊,对吧?啊RODB的话,大家就是看到它本身啊,就是当前的这个,呃。
20:15
设置好的,呃,我们这个RODB之后,当前本地的状态就都会往这个RODB里面去放,那至于说这个内存做缓存的话,里边你就是绕CB的一些配置了,对吧?然后这里边可以传参,传的参数呢,这就是当前它的这个checkpoint存到远远程的存储空间,那大家看这个远程的存储空间是不是还是一个uri,一般情况我们还是可以指定一个he FS的这样的一个file system呀,所以这就相当于把之前我们讲的那些东西都糅合在一起了,对吧?啊,所有的这个file system和这个rock cb的处理啊,就放在一起了,后面还可以有其他的参数,因为之前我们说过它是不是还可以有一个。允许开启增量化执行这个拆point呀,所以大家看这里边还可以传一个布尔类型的值,如果你把这个设成处的话,就是增量化的去做这个拆po对吧?这就是当前我们对于这个状态后端的代码里边的配置啊,当然这里边大家看到这个这个方法是已经DEP depreated,对吧?它现在推荐的使用方法是直接set,呃,就是我们直接用那个set step back end,然后里边传一个这个stay back end就完事了,但是现在呢,我们基于这个就是外边这个stream execution environment啊,它并没有直接去set step back end这样的一个方法,所以我们现在能调的还是env,直接调这个方法去传就完事了啊,以后的话可能它会直接变成这个类里边的一个静态方法,对吧,你就不要先去创建出实例,然后再去做这个事情,这就关于状态后端的配置。
我来说两句