00:00
好,所以现在我们的主要任务都集中在了实现这个自定义的处理函数,对吧。呃,那个叫叫什么名来着,Marketing marketing by China。他需要去实现一个,哎,这个大家注意是K的process function吗?是K外国,但是现在它的处理是已经开窗之后的处理了,所以它其实不是基于kid stream,而是基于window stream,对,所以这里边要实现的这个process function就应该是一个。是一个process window function,对不对,Window function。
01:00
好,呃,这个里边大家记得又是四个四个K对吧?呃,一个in,一个out,一个K,还有一个这个window w,所以这里边我们的in in是什么?是不是这里边定义好的这样的一个数据类型啊啊,这里边应该是一个,哎哟,这个稍微有点麻烦,是一个二元组,里边还包了一个二元组,对吧,所以是外边这是一个long类型里边的那两个,一个channel,一个behavior,都是string,对。那么out类型相对简单一点,是我们想好的那个样例类,对吧?呃,Marketing view count,另外K的话就是那俩string了。最后还有一个window,是time window。好啊,这我们先把这个类型先写好,检查一下上面没有问题了,对吧。
02:03
接下来在这里具体去做实现啊,那其实这个过程大家会想到就是。我们一定要实现这样一个process方法,那么process的时候其实已经能够拿到所有的数据了,对不对?呃,拿到所有的数据的话,这个其实非常简单了,我们想要的是什么呀。其实想要的就是,如果这么看的话,我们前面你你后面如果是要用这个process的话,那其实前面连这个EL都没必要,对不对。呃,大家想想是不是这样,呃,根本没有必要有这样的一个一个计数的东西,我们到最后达到所有的数据,是不是直接判断它的大小就可以了,对吧?我们的所有数据是不是都在elements里边啊,它的它的长度,它的size是不是就代表了我们当前所有的,呃,这个就是统计出来的那个count值啊,对吧?呃,当前这个渠道里边,你点击是多少,或者是那个下载是多少,都在这个里边了,呃,当然我们现在用了process function,好处就是说你可以呃去直接把size拿出来也可以,还可以做驱虫,对不对?因为有时候有可能一个用户他他在那儿狂点,点了半天,算了好几次,这个其实应该也要驱虫,做做排除的,对吧?所以在这个过程当中,我们拿到了全量数据,能做的事情就非常非常多了啊,当然这里边我们就用最简单的给大家不做去重了啊,前面大家已经比较熟悉了,所以这里面。
03:36
那我们就把关心的那些参数都拿到,最后不是要包成一个marketing,呃,View count吗?它不是有一个start ts吗?Start ts应该是。是不是我们从哪里去找,是不是content。点window,点。他是不是有get and,还有get start啊,这个拿到的是一个一个long类型,我们最后要的是一个string,那是那是得把它转一下对吧?啊,所以我们可以比方说time STEM。
04:21
我们把它做一个转换是不是就可以了啊,当然这里边转就是本身这个转换啊,这个还还还不行,对吧,你这里边还得去。Get time对吧,还得拿到当前的这个时间,然后还得去大家看这个get,诶get time不对,Get time这里边又是一个那个,呃长整型了,大家回忆一下,当时我们在哪个代码里有用到了这个转换呢。是在hot itemsto里边,我们下边这里是不是曾经做过这个转换啊,对吧?呃,这里其实直接把它这样一转,是不是直接可以作为就是判到字符串里面去了啊,当然这里边本身它还不是这个字符串类型,所以我们需要。
05:16
把它应该这个to string就可以了,对吧,To string,然后最后应该得到的就是一个string,同样的and ts,你有一个time stamp。Contact window get to string。对吧,呃,然后接下来还有什么来着。还有哦,就是China behavior还有count对吧?那channel behavior都在哪里取啊,对,我们现在是不是把它都放在K里边了呀,所以现在我们channel就是K的,是不是下划线一啊,那么behavior就是。
06:11
K的下划线二,哎,另外还有一个count,那就是element,是不是点size啊,大小就是,所以这个其实输出非常简单,我们最后就out.collect marketing user behavior这里边,哎,是不是就。哎,怎么了?来大家是说什么,哪里有问题哦,这里边不是那个marketing user behavior啊,应该是market count,对吧?对,这个样利列一个是输入输出搞混了,所以这里边我们要的是start,就就是前面那个三参数一个一个填进来是不是就完事了,对吧,Channel behavior count。
07:11
哎,这样就没问题了,所以大家看这个非常简单啊,或者我们想做这个,呃,就是大家做浴聚盒的话,因为这个过程是相当于把所有元素都缓存下来了,最后一获取size直接取出来对不对啊,这个有个好处是你想做去虫也简单,这里边是不是直接把它遍利一遍,然后塞到size里边就就驱重了呀啊,当然如果要数据量特别大的话,我们还是得呃做这个布容过滤对吧?呃,就是这些大家都串起来就知道该怎么做了,呃,当然这里边如果想做这个浴具盒的话,我们是不是可以不要直接一个process,而是可以定义一个aggregate对吧?前边做一个浴具盒来一个元素就加一,来一个元素就加一,是不是跟我们前面那个做法一样,然后后边再把它包装成一个样例类啊,Window function输出是不是也是一样的啊啊,我们这就用不同的方法给大家做了一个这样简单的介绍而已。
08:07
好,然后现在我们来输出一下吧,运行,看一下这个运行的结果怎么样。大看到这里边运行起来已经输出结果了,这里边输出的是什么呢?诶大家看到这这里边是我们的,呃,第一个窗口的统计对不对?大家看50秒,呃,就是以五十五点三十五分50秒作为呃,这个window and呃,这是一个小时的时间窗口,然后大家统计这个,呃,微博的录的有56,微博的click有46对吧?呃,诶这个登录的居然比click还还多啊,这个数据测试数据生成的还是有点问题对吧?呃,这个逻辑那就没办法了,这个因为我们随机生成的嘛,呃大家可以如果要更加符合这个对实际的话,可能还得加一些权重之类的考虑,对吧?这概率上要调整一下啊,那当然按照这个的话,大家其实把我们每一个窗口的数据输出,你如果要是写到一个red,或者写到其他一个数据库里边的话,另外一个应用去查询,把它显示在屏幕当中,是不是就可以给别人做监控了。
09:14
对吧,看到当前,呃,到底实时的这一个,呃,当前的这个APP推广的渠分渠道的这个状态是什么样,这就是这一部分内容,但是如果我们这里不停的话,它其实会一直生成新的数据做统计。好,我们先把它退出,然后接下来除了这一部分之外呢,那肯定还有一个总量的统计,刚才是分渠道的对吧,那肯定就是还有不分渠道,我们把所有的全合到一起,那到底是一个什么样子的啊,这个当然也是值得关注的一个一个数据,所以我们另外简单的在实现一个所谓的不分渠道的。APP推广、市场推广统计。
10:00
呃,这个叫APP marketing啊,直接就叫APP marketing好了。然后这里边其实前面是不是全部照抄啊,对吧,然后幺零类不用定义了,都用这边的,这里我们先去创建执行环境,然后下边这个呃,按照这个呃不是卸载去做filter,然后map成自己想要的对应的这个结构,这里这里的map是不是我们就不需要这样去map了呀,对吧,我直接先抄到这里啊,哎,或者我直接全抄过来吧,这个好像没什么,稍微改一改就知道了。我们先把这些该引入的东西先引入。呃,然后大家仔细看一下,这里边我们做map的时候,是不是只需要啊,就根本不需要这样。
11:03
考虑它的channel和这个behavior了,如果真要map的话,是不是直接一个大K就可以了,对吧?啊,直接按照这个去做K办,然后我们开窗做统计就可以了啊,然后接下来这个process我们就叫做。Marketing count对吧,这里边叫total吧,所有的全量的一个输出啊,这里边我们也把这个删掉啊。所以呃,那大家会想到在这个过程当中,其实如果我们还跟刚才那个一样的话,大家可能会觉得这个全量数据全放到这里边有点不合适,那大家觉得什么样的方法可能会更更合适这个场景一点呢?是不是还是增量聚合啊,对吧?那这不就是来一个数统计一个吗?呃,一个count嘛,所以我们这里边还是没一个,比方说叫count AG。
12:01
然后后边我们叫这个弯。呃,Marketing count total对吧,那在这里去实现count a。他就应该是一个aggregate。Function啊,这是一个。Java的interface对吧,大家还记得它的那个类型是什么吗?输入的,然后累加器这是状态对吧,还有一个输出的,它的输出是不是会给到后边的这个window function啊啊,所以这里边我们的输输入是。这里的输入是什么?是不是这个元组类型啊,一个string,一个long。然后累加器就是就是浪对就是抗嘛,所以输出是不是也是浪啊。诶,这里边对,这里边没错了,对吧?好,我们把该复写的都复写一下,爱的时候来一个数就加一啊这这就完全一样啊,嗯,上面可以改成什么了。
13:14
改成。改换一个啊好,这里边不是元组了啊,是我们本身这个数据改成元组了,对不对?呃,这里大家注意一下啊,这里边我们本身的这一个,呃,就是K本身还是string,但是大家注意这里边我们要K吗?这里边是不是就是输入的数据类型啊,所以说in还是一个元组对不对,是刚才的那个二元组,如果要是有K的话,那是不是就又变成string了,单独的一个string就好了,好,然后接下来创建的时候是不是就0L对吧?Get的时候直接把cuumul累加器输出就可以了,当然这里边如果要合并的时候A加B对吧?啊,那这个就很简单,最后的这个marketing total是不是也要去实现一个window方式啊?
14:11
诶,这里边大家就记得window function的话。Scale window function对吧?这里它是不是得有一个in,有一个out,还有一个KR啊,还有一个window,这里我们得把那个确定好,首先它的输入,输入其实是前面语句和函数的输出,对,所以是long对吧?然后这里边它的输出就是我们的。定义好的样例类类型对不对,Will count,然后接下来对P的话,那就是string了,这就是我们已经定义好的这个一个K对吧,另外还有一个window time window。好。是不是上面类型就完全对了啊,现在就没问题啊,然后这里边我们要复写的是一个apply方法,那么大家想想,在这个apply方法里边,我们需要拿到什么东西呢?
15:10
是不是还是那几样啊,哎,这个我直接直接把这个copy过来都可以啊,是不是就这些啊对吧,当然这里边可能略有不同,哎,有哪些不同呢?比方说这里边这个不是上下文了。这里边你从哪里去拿,这里边是不是直接从window拿就可以啊,方法里面是不是直接有window,直接拿window的。然后。还有什么呢?这里边的channel跟behavior是不是都没用了呀,都不要了对吧?然后这里边诶,这里边多了一个C,那么这个呃,Count我们从哪里去拿呢?是不是相当于从这个input里边去拿啊。呃,当然这里边我们是不是input应该是一个,然后直接把我们之前得到的这个预句合出来的那个结果next拿出来是不就完事了,哎,这就是我们当前的这个值。
16:14
那当然了,这里边这个channel这两个没有吗?我们就直接给一个,呃,自定义的一个值就可以了,比方说这里我们叫APP marketing,后面这个我们叫total,对吧,总数的一个统计,然后给一个count,这样就可以了。所以在这个过程当中,其实并没有特别多的其他的东西出现,如果只是开窗做一个聚合的话,这样也就够了,大家可以把这个我们讲过的所有方法啊都复习起来,就都作为大家武器,武器库里边的武器啊,十八般兵器都要样样精通。好,我们测试一下吧,看看现在这个状态是什么样的。
17:02
我们这里面的数据源就还用了刚才那个by channel时候的那个,呃,自定义的测试源,对吧,还是直接去创建了这样的一个方式。好,大家可以看到这里边已经有这个结果输出了。诶,大家可以看到现在好像输出的就就就内容很少,对不对,为什么输出的很少啊。是不是现在就是标准的十秒钟一个窗口的结果输出啊,对吧?所以这个就是大家看标准的这个统计,当然一开始的统计出来的这个数据数量比较少,对不对,后边就越统计越多,因为我们统计的是一个小时的这个数据嘛,所以这个数据肯定是一直增长的,增长到一定程度,如果到了一小时之后的状态,再去划窗的时候,那基本上可能就趋于稳定了,因为我们前面的那个生成数据的速率几乎是一样的。大家也可以想到它后边大概的这个出现的状态是什么样的啊,这就是我们这部分代码。
我来说两句