00:00
现在已经知道整个flink啊,到底该怎么安装集群,到底是怎么样去做一些配置,这些都明白了,那接下来就是啊,最后一步就应该要启动集群了,那启动集群的命令非常简单,直接执行并下边的。Start cluster.sh啊,所以接下来我给大家在这儿也是直接启动一下。呃,我现在还是在这个flink一点十点一根目录下啊,所以我是先到并下边来,然后start cluster sh,对吧,我现在的这个集群大家知道是不是就只有一个job manager,一个task manager啊,对吧,那就这样的一个简单的状态啊,大家看这个启动其实非常明显,就是它起的是什么呢。起了一个sta session demon demon对吧,哎,这个这个东西大家会想到这主要是用来干什么呀。这个东西是不是主要就是用来这是我们当前这个集群模式集集群的一个session,一个绘画对吧。
01:04
这主要就是用来接下来我提交job,是不是就通过这个入口去提交啊,对吧,就相当于我这里边提供了这样一个接入点啊,然后另外还有一个是task demon,这就相当于是我们具体的那个。就是执行者了,对吧,Task manager的对应的那个东西了啊,所以这两个一个是对应job manager的那个,一个是对应task manager那个东西啊好,那接下来我们启动之后,你也可以GPS看一下当前我们正在运行的这些Java进程,呃,这里边GPM进程啊,这里边有一个task manager runner。啊,这就是启动task manager的这样一个东西,另外还有一个就是sta session对吧,这个这样一个接入点啊,集群的接入点,呃,那当前只要这么一行命令启动起来之后,我们怎么样去看到当前的集群的状态呢?啊,其实这个非常简单,大家看在。
02:01
Com目录下啊,我们找到当时的这个配置文件flink com yamo,之前我们讲的是前面这个common通用下边的配置,我们说一般也就是配置几个对吧,那后边还有什么呢?我们大概看一下啊,后边还有ha啊,高可用啊相关的一些配置对吧?如果说我们想要用到的话,你可以把这里面的一些放开大家看,有这个对应的那个存储地址HEFS对吧?呃,然后另外还有下面这个是。For tolerance,这是容错对吧,容错和checkpointing啊,做这个检查点操作啊,那当然这里面就得配置你的这个检查点到底怎么怎么做呢?这里边有一个概念叫state back end,这个概念叫状态后端,这个我们会放在后边讲到状态机制的时候给大家做详细讲解啊,啊就是我可以配置到底把这个状态存在哪做拆point存在那个远程对吧?远程的存储空间到底存在哪呢?这些都可以去做配置,后边我们给大家看这个,诶大家看在这儿,Rest and web front end。
03:11
这是。Rest接口和网页的前端对吧,一个网页UI,那这一部分是谁提供的呢?啊,这也是我们当前这个flink有一个组件提供的啊,它默认的端口是8081啊,所以当前其实我就应该在当前的这个。Job manager对应的这个节点上去访问它的8081端口对应的就能看到当前集群的效果了。那我们先把后面的这些配置也大概过了,后面就是一些高级选项了啊,这也没什么其他需要说的,所以接下来我们还是在页面上来看一下local host8081啊,我刚才那个冒号写错了啊,Local host。
04:01
8081。大家看这就是我们当前flink启动之后,Web前端页面啊,看到的这样一个监控的页面,这样一个flink的dashboard,那然后首先这里是一个overview啊,一个一个概览,一个整整体的一个查看,那首先这里边大家看到的是一个available task slots。Task,之前我们说过,这相当于是。当前能够去运行多线程程序的一个。一个划分开的资源对不对,有点像这样一个东西啊,一个槽位,所以这里面我们关心的是什么呢?就是有多少可用的这样的task槽位插槽。我们现在是只有一个,为什么一个呢?那我们当时那个集群是不是配置配的就只有一个啊,一个task manager,然后每一个task manager里边有一个槽位,那是不是总共就是一个啊,对吧?所以你看这里边就是总共的slot是一个,然后可用的现在也是一个,然后task manager是一对吧,一个task manager,那对应的这里面有这个当前正在运行的drop,还有这个已经结束的啊,取消的失败的各有多少个,现在都是零没有。
05:19
下面是这个正在运行的列表啊,具体的一些参数就都有了啊,当然现在我们是什么东西都没有啊啊,那接下来就是job了,Job这里边就是具体的正在运行的和已经结束的,其实就是overview里边,下面的这两个列表在在的详细的展开,那后边呢,这里边有task manager相关的一些信息。这里大家可以看一下我当前的task manager是什么样子呢?一个slot对吧,然后当前可用的一个,然后另外诶当前大家看就可以看到我当前的这个机器大概的一个配置是什么样子啊啊就是CPU是四个核心,然后内存7.7GB啊,那就是我当天是一个8G内存对吧?然后接下来是GVM的堆大小,大家看这里边配出来是多少。
06:07
512兆对吧,然后另外还有是flink managed memory,这个我们当时说这个主要是存放什么呢?是不是就是处理过程当中我们用到的那些状态是不是都是放在这里的呀?这是一个对外内存对吧?对外内存里面的一部分,所以大家看它跟它是不是分开的呀。这俩并不包含在里边,对吧?这个堆应内存设了512兆,这个managed memory也设了512兆啊,那当然当时我们还记得那个总共设的是多少来着,对啊,1728对吧,大概是这样一个数,所以剩下还有一些,那就是应该也是对外内存对吧?那就是除了这个flink manager的啊,管理起来的这些之外,还有一些就是原始的用户可以直接管理的一些啊,这个对外内存,另外还有一些就是本身GVM进程,它自己占用的一些对外内存,都是包含在我们分配的那个总内存里的。
07:10
啊,所以这里边就是,呃,如果我们给那个默认的1728的话啊,这里边得到的其实就是这样一个512 512的这样一个分配,对吧?所以这是当前做的这样一个操作,然后接下来还有就是照manager啊,其实你看这里面点进来之后是不是还可以看到它的启动日志啊,大家看这里边是不是有logs啊,就如果说启动失败的话,你也可以在这里边看它的这个启动信息对吧?啊,这都是没问题的啊啊,那同样job manager这边你也可以看到当前照manager对应的这些信息对吧?Logo host1024兆,呃,6123对吧?Test manager这个1728兆啊这些都是可以看到的,然后这里边还有一个这个它启动对应的这个日志,另外s TD out这是什么?对标准输出,那大家想到我们不是有那个控制台打印吗?是不是就应该打印在标准输出啊,哎,所以到时候我们可以到这儿来查看结果啊好,说了这么多,这都是一些可以查看,可以去监控的一些信息,那我们当前如果要现在还没有job呀,我要提交job,怎么提交呢?
08:17
大家看,当然就是最后一个选项了,Submit new job对吧?然后在这里我是不是可以直接at new啊,那关键是现在我的job在哪呢?对,Idea里边我们是不是得需打包一个抓抓包出来,然后上传就可以在这里执行啊,所以接下来大家看一下这个完整的流程啊,那比方说我现在就把当前这个任务要做一个打包处理,对吧?呃,然后打包处理执行的时候,可能还会涉及到这个并行度的问题,那大家可能会想到这里边我是直接在代码里面就把它设死了,设成一了,对吧?啊,这个我不要这么这么死板啊,我先把它放开。我这里边不涉及,呃,然后接下来大家可能会想到,呃,那我提交这样一个drop上去的话,它的并行度应该是几呢?
09:08
诶,大家会想到这里边我执行开发环境啊,就是我刚当前这个开发环境里边运行的时候,默认并行度是四对吧,那我如果要到了集群环境里边默认也是四吗?是我的开发环境的这个吗?对,大家注意啊,到了集群环境里边默认的并行度是。我们集群配置文件里边的那个默认并行度大家还记得吧,拍出来怎么点default那个我们配的是一对,所以这里边的这个是有一个区别的啊,使用的是这样的一个区分,然后这里还要给大家稍微多说一句的是在这个flink里边啊,这个并行度其实有很多方非常灵活的配置方式,大家看这里我是在这个就是因为啊这个环境全局直接给当前的程序配了一个并行,都是一,那其实我还可以干什么呢?
10:02
我这里边有每一步是不是不同的操作啊,前面这是读取数据对吧,后边这是flat map做了一个转换,然后后边KBY这个分组,按照哈希code重分区之后是不是做了一个sum操作啊,做了一个计算,然后最后还做了一个print,做了一个打印输出。这里面其实每一步具体的操作都可以设置set parallelism。就是我这里可以设一个一对吧,上面可以设一个二,大家看这都是完全没问题的。可以去这样去定义,哎,那有些同学可能会想到这不对啊,你这个并行图,这不是相当于多线程运行吗?难道说我这一个代码,它它这个是还要拆开不同的线程去运行吗?还真的是。大家注意啊,真正我们分布式执行的过程当中,我是整个完整的这个代码,所有的这些操作都放到一个分区上去,要执行吗?
11:03
显然不是,之前Spark也不是对吧,Spark是划分stage对不对,而我们现在不划分stage了,现在只要有什么就可以了,是不是只要有每一步操作就可以了呀,对吧,我只要知道这一步,诶你做了一个Fla map,然后下一步做了一个sum,再下一步做了一个呃,Print,那大家就会想到这每一步操作是不是就是可以用一个线程来跑的呀,然后我这个线这个任务提起来之后,我是不是就等待数据进来就可以了,这里只要你前面source,这里边socket文本流读进来数据它是不是?呃,处理完数据之后就把它传递给下一步的这个任务,这个任务只要接收数据是不是它就是这个线程跑就运行啊,运行得到的结果是不是它就传递给再下一步的这个数据,呃,这这个任务对吧,所以数据其实就是在不同的任务之间。做传输的先后发生的任务之间做传输,每一个任务是不是相当于都可以占用一个线程来执行啊,那所以是不是每一个都可以设置它的并行度。
12:07
我都可以并行啊,都可以多线程啊,所以大家看这个就是使用起来非常灵活,特别灵活啊,这就是我们这一部分代码里面并行度的设置啊,然后另外呢,我们现在既然是要打包了,那那怎么打包呢?在这个ma里边啊,方便的就是这里边是不是有一些生命周期啊,哎,我这里边可以先先做一个编译啊,当然之前如果说呃,我们直接已经运行过的话,应该是已经有编译出来,有有class文件的对吧?啊,那这里边我们最好是先做一个编译,生成对应的class文件,然后点。Package对吧,大家看这里边target下边这里已经有这个了,对吧,已经已经有这些对应的这个class文件了啊,然后接下来我点一下这个package,这就是做一个打包操作,然后我们生成的那个它默认有一个呃包名啊,就是叫做。
13:00
Flink tutorial,大家看1.0step short点这对吧,哎,那所以接下来是不是我把这个提交上就完成了,好所以接下来我们在这儿啊,直接ADD new去提交一下,这个我要去找一下对应的目录啊。我们当前在这个big data下边,Link tutor在他的下边,就是它对吧?好,我把它提交上来。哦,那大家可能会想,这个提交上来之后,这这就运行起来了吗。没东西啊。我们缺了什么东西了?啊,大家注意这个提交这个A只是把我们当前的jar包上传到了当前的这个flink集群而已,对吧?大家想一下,我要是真正去提交的话,当时我们运行是不是还有那个参数的呀,你执行还要有参数的呀,另外我们当前这个项目里边是不是有两个程序两这里有两个类啊,那谁是我们的入口类呢?那你这里都没指定对吧?啥都没指定,所以这里边其实我要点一下,诶大家看接下来是不是有各种各样的配置项。
14:05
Entry class入口类啊,那比方说现在我就用这个stream啊,流式处理的这个我copy这个reference啊,直接把它copy过来,然后下边大家看program arguments很熟悉是不是参数啊,对吧?Host local host,然后杠杠PORT7777,然后这里边有一个选项是parallelism,这是并行度,大家看又来了一个这个设置并行度的地方,这个并行度是我们在提交作业,提交job的时候可以配置的一个并行度,对吧。哎,那这里边我问一下大家,假如说我这里边并行都给一个三。那我们当前执行这个任务的时候,到底这个应该是什么样子呢?是以谁为准呢?以代码里边的这个并行度为准,还是以这里的为准,还是以另外还有一个位置。集群的那个默认并行度对吧,那以谁为准呢?哦,这个大家要注意啊,现在我们配置的这些,它的优先级是以对肯定是代码这里边是写死的对吧?以这里这是我们最最优先的一个一个选取啊,如果每一个任务他自己后边单独设置了这个并行度的话,就以这个为准。
15:23
如果这里面没配,比方说Fla map这里没配对吧,那以谁为准呢?因为每个任务可以不一样,对吧,他以我们是不是前面全局还可以给一个环境里边配这个并行度啊,也是代码里边对吧?哎,就是以这个为准,那这里面又没给呢。这里又没给的话,对以提交job的时候,这里边给的这个并行度为准,那如果这里边我们又不给呢,这个parallelism如果又不给呢,那就以对集群配置文件里边那个默认明晰度为主,所以大家看这就是一级一级推过来的对吧?啊,每一步都是这样推过来的,我先给一个这个三大家看一下啊,因为这里可以收plan对吧?来看一下啊,我直接收plan看一眼。
16:10
大家看这里面得到的这个结果是什么呀?这个结果大家能看得懂吗?这是一个执行计划对吧?这里边的每一个框是不是就代表了我们的一步操作,一个任务啊,类似于就是一个任务,这里跟我们那个代码里边写的那个每一步操作非常的类似,几乎就是一对应的,对吧?啊,那首先大家看一下前面这个source socket stream,这是不是就是从那个so文本流读取数据源啊,啊读取数据啊,大家看它的并行度是一。这是为什么呢?我们好像这里面没有任何没没有设呀,对吧,那我们当时说如果这不设的话,就用全局的,全局也没设的话,就用我们提交时候的,那我们提交的不是三吗。这给大家解释一下啊,这是因为socket文本流本身这个算子决定的这个任务读取当前的socket文本流,大家想它可以多线程同时去读吗?这肯定不能,对吧,你你当前那个socket就一个端口嘛,对吧,然后就发一个数,那就是只要有有一个读了这个数就没了,对吧?所以你当然它不可能是多个这个线程同时连接到这个端口,就同时消费这个数据,所以不存在并行的这种场景,所以这里边呢,默认这个算子默认它的并行度,你设不设都是一。
17:32
啊,这是一种特殊的情况,好,那这个我们先不看了,然后后面这个flat map,它接下来是不是读进来之后就是Fla map呀,Map这步操作为什么它的大家看这里边paraism并行度三为什么是三呢。对,代码没设,这里边任何地方都没设对吧,那用的是不是就是哎提交时候给的那个并进度,那是三对吧?好,然后我们接下来看,大家看这里边后面有一个哈希,这里边大家看是不是有一个哈希啊,然后后面这步操作叫kid aggregation。
18:07
就是根据建分组之后的一个聚合操作,这指的是什么?是不是就是KBY之后做的这个sum操作啊,所以这个大家要注意一下啊,KBY并不是一部真正做计算的操作,大家发现了吧。KY其实只代表了一个什么呀?是不是类似于我们之前的那个杀否的那个过程啊,所以KY它其实只是指定了我当前数据要做朝下游传输的时候,分配到哪一个下游任务,我现在是按照它的K的哈希值做了一个传输,对吧。然后传递过来之后,是不是做聚合啊,做了一个sum,那这个sum大家看并行度是二,原因就是对代码里边直接定死了,并行度是二。那同样最后一步是think对吧,那是不是这里边think并行度一也是代码里边已经设好的呀,那所以大家看这就是这个原理,原理就是这样来做的。
19:05
啊,所以这个过程其实还是一目了然啊,啊,那这里面就有一个问题,就是假如说我这里面给一呢,或者说如果我不给呢,不给的话应该是什么,不给是不是默认就是。哎,那大家看一下这个手烂啊,这大家能理解吗?这个大家看前边的这个操作,大家看它是把socket读取数据和后面Fla map直接合并在一起了,哎,这里面有一个很诡异的操作啊,为什么这俩可以合在一起呢?大家看有一个小箭头是吧?好像两个任务合在一起了,哎,这是相当于就是我们说的啊,两个简单的操作合成一个的操作,为什么可以这么做呢?这个我们放在后面运行架构给大家做解释,首先我们现在看到就是它俩合在一起了,然后它们的并行度是都是一,这个可以理解吗?因为代码里边诶,首先这个socket文本流是必须就是一对吧,这个我们知道,然后这个Fla map呢没设置,然后提交这个任务的时候是不是也没设置啊啊,那是不是就相当于是用默认的集群里边的配置对吧?啊就用那个集群里边默认的那个,呃,并且就是一啊做做一个提取,那就是一了,那后面这两个都一样,都是代码里边指定的对吧,指定是几就是集。
20:24
啊,所以这其实就是当前我们对于这个呃,运行的一个过程啊,所以接下来我们就把这个来直接做一个submit,大家看啊,下面还有一个选项,这个叫c point pass,这个主要是说的保存点,保存点呢,呃,这个我们可以到后面再给大家说啊,它主要是用来类似于像检查点一样,检查点大家知道是存盘之后。就是发生故障的时候可以恢复对吧?保存点也类似,只不过呢,Checkpoint是一个自动存盘,而c point是一个啊手动存盘对吧,就是我想要存盘的时候单独给它存一份啊啊所以就是假如说我当前想要从之前我手动存盘的某个位置去启动的话,你这样给一个c point就可以了,我们现在没有对吧,没有就不管了啊,我直接sum把它这个提交上去。
21:15
现在就可以,大家看已经提交在这儿了,对吧,当前的这个状态是create,诶大家看到下面这个一共有多少个任务呢?我们最终执行的任务一共有多少个呢。大家看一共有四个对不对,为什么是四个任务呢?对,大家注意啊,是不是并行度,这里边代表的其实最终执行的任务就应该是对并行度是几,是不是就有几个并行的任务啊,所以当前这步操作是不是就应该有两个任务,哎,所以大家看总共应该就是四个任务对吧?你算上这个并行的话,总共就是四个任务,然后这里面就就会涉及到一个问题,大家看这里边一直在转圈圈对吧。为什么会转圈圈呢?这这里提交成功了吗?
22:03
当年这个drop drop提交成功了吗?大家可能会觉得这里面有所怀疑,有问题对吧?那我们可以直接在这儿给他发一个数据试试啊,我们看看这个这个里边有没有数据,对吧。哈,Flink,我们发一条数据。大家看这里面他能接收到数据吗。还在转圈圈对吧,一直都没有数据接收到,那这里面为什么呢?为什么会出现这种情况呢。啊,所以大家其实会发现当前我们这个提交是失败了的,对吧。没有提交成功,那如果说我们看到回到最初的这个overview这里面来来的话,大家其实会发现我当前的可用的资料还有吗?发生了一点变化,这里边是不是没了,说明这个lo是不是已经分配给他了,但是为什么他还没跑起来呢?
23:00
诶对,大家其实想到了,是不是可能是slot不够啊,因为当时我们说这里边的这个执行的话,一共有四个任务,我们当时是不是说每一个任务每一步操作都可以是放在不同的线程上执行啊,那大家想你现在要多线程执行的话,四个任务只有一个slot,那肯定跑不起来,对不对,所以这个转圈圈它其实在在干什么呀,在等待分配更多的资源对不对啊,他在等待你给我更多的smart,要不然我跑不起来哦,所以这就是刚刚才我们这个这个任务的这个问题啊,其实这里边是没有真正提交成功的。啊,那算了,那我们我们把它停掉吧,但是如果等足够多的时间,它也会超时报错对吧,那我们不要等了,那怎么取消它呢?大家看上面右上角有一个大大的cancel draw对吧?直接点这个OK,这样的话我就可以把它直接取消掉。啊,那所以当我们把这个,诶大家看变成这个黄色的了,对吧,取消掉之后,Running这边就没有了,Completed这里边是不是就有一个啊,然后我们再看overview,大家看资源是不是释放了。
24:12
对吧,又变成一了,然后这个running的没有了,这就是这个提交job的一个过程啊。
我来说两句