00:12
大家好,我是来自每日互动支付平台的志昂,我今天跟大家分享的主题是标签存算在每日支付平台的一个时间之路。今天整体内容主要分三块进行。第一块的话,我会整体介绍一下标签计算的一个核心逻辑,以及我们每日支出平台是如何存储这些标签的。第二块的话,我跟大家聊一下我们是如何基于阿帕奇的set来加速我们的标签计算。第三块,嗯,跟大家分享一下我们标签数据在写入click house过程的一个优化实践。首先我们来看第一块。嗯,这是我们每日自助平台的一个产品首页。每日制助平台呢?呃,我们对它的定位是技术数据智能操作系统。
01:06
呃,整个系统的话,提供从我们数据接入到再到我们的数据处理,以及最后到数据应用的一个一站式的一个工作台。其中呢?包含的模块是非常丰富的,包括像数据集成啊,记忆学习,以及我们今天所讲的标间平台。呃,目前呢,我们每日技术平台呢,已经对接了很多垂直行业,帮助客户进行一个大数据的建设和管理。呃,如果大家感兴趣,可以到我们的官网进行进一步的查看。呃,从从产品首页图也能看到,就是我们标签平台呢,它其实呃,主要是帮助我们企业构建标签以及画像体系的,呃。赋能业务,实现我们精细化的运营和精准的营销。
02:07
呃。标签平台呢?他提供的整体服务呢,就是可以从页面上能看到用户呢,在。页面上填写好,填写好自己的一个规则,构建自己的一些标标签规则。呃,这些标签规则呢,会进入到我们的一个标签计算引擎。引擎计算完的标签数据呢?它其实会回流到我们整体的一个每日互动平台,进行统一的资产化管理。另外一块的数据会进入到我们的一个ola引擎中,用来为我们的人群洞察提供数据的准备。下面看一下我们整个标签计算的一个核心链路。首先对于来源数据,无论是。呃,当然了,从整体上来看,呃,整个标签计算呢,它是分两块的,实体计算以及标签计算,呃,我们首先来看一下实体计算。
03:06
实体记账的输入来源呢,是我们的。实际的呃数据表这些表呢,其实并不是说直接是用户的表,那其实是经过我们治理平台的一系列的接入勘探的工作,接入到我们整个质数平台中,呃,我们对于呃已经达标了,这些数据属性表,时间表会首先会进行一个七源组的一个计算。七组计算,这是七元组,这是我们一个代号,嗯,大家只要呃知道这是其实是对我们这些表中的一个ID进行一个全量的reduce的这么一个操作,呃,以得到每一个唯一ID的一些属性事件的这么一个计算的过程。在进行七人组计算完之后呢,我们会做一个ID de的这么一个工程计算这块其实是主要是为我们后续呃。
04:02
洞察,做一些做一些bit map的数据的准备。在进行,在进行完全量以及增量的这些idex工程之后呢,我们还有很多其他的一些计算任务,比如说像全量的一些虚拟标签的计算,以及像事件属性生成b map的这么一个过程,呃,最终的数据都会是写入到我们click house中。在完成实体计算完之后呢。就会进入到我们标签计算的一个过程中,呃,整个标签计算它的输入。来源可以有很多,包括像上面计算出气源,计算出的气源组结果,呃,外部实体的一些引用标签结果,以及甚至是映射标签,比如说用户已经有一些自由的。呃,标签,那其实如果说想接入到我们平台中也是可以的。呃,在进行完标签计算之后呢,那这些标签。计算的这些结果,他会。
05:02
转成b map也会最终会写入到我们的click house中。呃,这一整块是我们整个的一个核心链路。呃,那其实在整个计算链路过程中呢,是有很大量的Spark的一个计算任务,那其实在优化这块的一个工程时,我们也是借鉴了,呃结合时间吴磊老师的一些设计原则,呃结合我们的实践进行了一个优化,呃首先一块,第一块是叫做坐享其成,那其实呃是这样,就是我们会把所有的SPA工程,就原始基于Spark克API开发的这些代码全部全部化,Spark circle化。其实呢,核心是想就是。呃,利用本身Spark circle带来的带来的红利,那其中比如说像呃,Catal list,它提供的这些位置下推以及内减支的能力。
06:02
的确是,呃,在性能上是有所提升的,以及在我们实践过程中也是明显,也是可见的。嗯,熟悉Spark的同学,呃,可能会知道Spark3.0呢,其实对Spark的优化其实有大大约40%左右。而对于像比如Spark ML以及streaming这一块,它其实只有10%几,那其实本身Spark circle化的,呃,这个方向其实是从社区层面是能看出来的。呃,我们也在逐渐推吧,就是嗯,Spark3.0其实也是也是也是在逐渐升级过程中,我们也是想用到比如说Spark3.0中的一些,呃,AQE啊,DPB这样的一个一些能力,来帮助我们进一步的去提高我们计算的效率。呃,第二块的话是一个叫能省则省,能拖就拖。省的话,其实呃,顾名思义就是说在进入到最终action计算之前,能先过滤数据就过滤数据,能能先做一些呃数据简单的一些过滤操作,那其实能帮助我们。
07:10
呃,能够进一步优化后面的呃计算。另外的话是沙uffle,其实沙uffle这个事情在整个大数据计算过程中其实是比较讨厌的,那其实对于沙uffle的话,我们呃尽量是避免,就是如果不能避免就尽量也是往后拖吧,呃我们实践过程中会呃使用的一些方式啊,将我们一些呃杀Le so merge join啊这种一些方式,我们也逐逐渐的转化成了基于broadcat的一些哈,Join。那关于人审则审这一块的话,后后面我们会有一个bit map的一个案例来进行进一步的讲解。呃,第三块的话是叫跳出单机思维,那其实这个东西在我们大数据计,这个在大数据计算领域其实还是。嗯,比较重要的,尤其是分布式,呃。
08:01
其中它有一块,比如比如说像循环语句中反复访问idd,呃,或者说临时变量缓存数据。转换的一些中间结果,再比如说像比如说我们图中所示的这些这个案例一样,对于像这些map,嗯,Map啊,或者说Fla map啊,这些分布式的一些算子里面,其实我们在写代码的过程中,并不是说呃,我想比如说我想六个U就是做这个事情,那其实呃在算子内部的所有的操作逻辑,其实已经在分布式的环境下了,所以说对于这种U的构建,那其实我们并不需要说对于每条数据都去做一次,那其实这样是很很浪费资源的,所以说我们可以把呃,U提到外面来,那在我们map这里面。免去大量的对象创建。这是我们呃,结合实际的一些工程优化原则做的一些事情。
09:03
呃,下面来看一下。整个标签计算在转b map上的一些优化案例,那其实这个也是对应前面讲的一些能省则省。的一些优化。呃,首先数据结构我们可以看一下,呃,数据结构的话,首先输入的数据结构是就是ID以及ex我们的标签,还有一个就是index,这个其实就是跟bit map的index有关系,另外输出的话就是输出我们每个标签所对应的map的一些数据。数据结构比较简单。呃,先看一下我们在优化之前做了些是是怎么做的,就是对于呃,我们输入数据,我们首先会做一个Fla map的打平操作。呃,同样的一个标签值,我们在每个分区中我们都会去,呃,录一个mid mid map的对象,之后呢,我们统一的进行一个reduce by key的操作。那其实这个。
10:00
过程中。就会有,就是可以看出有大量的对单条数据,我们会有大量的map对象的建立。呃,从而最终得到我们的一个结果。那看我们优化后是怎么做的。呃,优优化之后呢,我们在对Fla对原始数据进行Fla map的时候呢。我们并没有说对每一条数据都会去进行rolling bit map对象的监理,而是我们采用了great by k这样的算子,我们在一个分区之内保证相同的tag。他的。如b map的对象只有一个,呃,同一个分区内同一个同样的标签,只我,我们对b map数据只会做爱的操作,这样就免去我每条数据都会构建一个单独的roll bit map对象的这么的一件事情。从而最终使我们在。保证数据量以及资源配置一致的情况下,整体计算的效率提高了大约56%。
11:10
呃,前面讲的主要是计算以及一些通用的优化原则。这这边讲一下我们,呃。标签平台是怎么存储我们的标签数据的?呃,在引擎方面的话,我们选择了click house,主要是click house这块,对bit map的这些数据结构它支持比较好。嗯。右边这是我们一个实际的一个案例,就是。我们他这整个select语句是其实是从嗯,我们是一级的人群中,呃。通过一些标签。洞察出大约满足某一个标签的人群的一个这么一个数量,可以看到我们从级B级人群中,呃,挑选出大约两千三千万左右的一些人的,呃。
12:01
人的这些tag的时候。大约的处理时间只要四秒左右,其实已经呃,完全实现了。秒级洞察的这么一个能力。呃,下面一块是我会介绍一下。呃,使用阿尔法萨来加速标签计算。呃,首先看一下我们最开始标签计算是怎么做的。呃,我们服务端标签阶段的服务端首先会准备好我们的一些GQ语句,呃,GQ是我们每日注入平台基于Spark的一个统一SQ引擎,那其实呃在准备好这些GQ语句之后呢,就是服务端会把它会给我们的一个呃标记计算引擎。标签计算是基于我们即可去进行计算的,呃,准备好的这些SQL语句呢,呃,首先比如说对于我们标签是有依赖关系的这种。
13:07
这种场景,那我们标记上会去读我们的一个圈组数据之后,再基于规则进行一个标签的构建,构建完标签之后呢,比如说我们还有一些依赖关系。我们还需要再去再重复的去读原始的资源组,结果结合我们标签。一的结果来,算出我们最后最终第二个标签的结果来,就是我们图中的天二这种目录以死往复。那这样的话,我们可以看到其实有大量的数据在来回的呃杀,在来回的重复写。同样一个对去组计算结果数据我们也是重复读了很多次,这个效率是非常低的,呃,整体的计算时间呢,也是按天计。一旦标签一多,可能甚至都计算不出来。那我们后来就做了一些调研啊,就是引入了一个。
14:01
阿帕奇的。呃,阿Q赛是什么呢?它是一个用于呃优化异构数据源查询处理的这么一个框架,怎么理解呢?比方说我们有三份数据,呃,一份是在我们的文件,一份是在,比如说在MYSQL。还有一份,比如说在ES,那我们怎么去通过一个统一的搜狗去做这个。接入这三个圆呢,那其实Q赛就能够提提供这个标准化的能力。嗯,左边这张图呢。呃,是一个是cos的这个整体的架构图。呃,我们可以简单看一下。呃,首先通过呃,JD bc server这块引入到我们K的一个。优化以及验证的这么一个。呃,这么一个模块,那其实这个模块其实是跟我们呃Spark的开list是非常像的,它其实是做了一些呃ASD语法数的一个抽象。
15:01
呃,以及。以及一些嗯,原则的,以及以及一些基于一些原则的优化,那其实cat list是做了一些启发式原则的优化那。Co这边也是有自己的原则。优化的。之后的话会进入到我们的一个整体的查询优化引擎。呃,其实呃,除了本身的查询优化,其实可Q赛的最大的一个特色能力就是它的可插拔能力。嗯,可以看右边这张图,比如说呃,对于我们的have,它其实就已经用了Q,它只是用了Q的一个优化引擎。而像。Link,它其实是完全基于我们开赛建立了一套自己的S引擎。呃,介绍完q set之后呢,我们来看一下我们的标签计算是如何结结合Q进行呃计算的。首先对于我们原始的时间表、属性表,我们会进行建筑计算,这前面其实也提到过,切组计算的结果大家可以看,就是对于每个无论是事件还是属性,在全组中都是一列,并且列的方式都是基于呃多层map嵌套的。
16:13
这样的话,嗯,我们在读入千组数据之后,呃,我们是构建了一个基于呃科的一个udf来来进行处理的,那这个。Udf的话,在读到呃每七人组的每行数据之后呢,它会构造出这么一张呃原始来源表。之后基于来源表来做我们呃标签的一个SQL计算,同时呢,我们这里也能够说有一张中间以及结果表,这张表的话可以从我们呃中间计算出来的一些标签,这个就是为了适配我们标签呃存在依赖关系的这种场景。呃,从右边这张图也能看到,其实呃,引入Q赛之后呢,我们。
17:01
在标签计算的过程中不会说有重复的,这种数据的读取以及大量的IO操作其实显得更加简。简洁。OK,那我们其实在呃设计完之后,我们也是呃做了一个测试,呃我们使用了大大约10亿级的圈组数据,以及几百个标签规则,我们呃位入到我们这套的一个标签引擎中,我们发现整个处理速度呢,呃大约是每分钟2万条,然后整个耗时大约呃1.7。那其实呃,可能直接看这些数据没有什么直接的感受,那其实如果说这样的一个数据量位到我们最开始的标间计算引擎中,那其实他根本。说是跑不出来的。呃,如果说前面前面说的是由技术驱动来做了一些改造,那其实这边更多的是业务驱动啊。
18:01
为什么去做一些改造的事情,我们可以看一下这个背景,呃,首先有一张事件表,这个表里面记录了一个用户他在不同的时间段,在不同的水果店买的购买的行为,那其实在经过我们组计算之后。嗯。我们可以看到,呃,一个用户我们可以知道他是在每个哪个点去了哪个水果店,以及我们知道他买了什么,但是呢,呃,很明显我们发现这时候列与列关系是缺失的,这时候如果说呃,我业务方需要我知道,需要知道用户在比如说在张三水果店到底买了哪些水果,那这时候我。我的计算,我的SQL计算逻辑是没办法体现出来的。为此,我们。做了相应的一个改造。其实改造更多体现在从七元组。过程中就开始体现迁入的结果的话,我们并不是说像之前用多层map的嵌套结构,而是改成了呃,List的一个嵌套结构,其实记录的话是一些明细数据。
19:10
在嗯,计算完起源组之后呢,在呃之后会位入到我们那个科萨的udf中,这时候我们可以发现跟之前有所不同,之前的话起源组整个一张表结构进来,进到Q赛这边来的话,它也是一张表,那经过我们改造之后呢,其实进入到Q赛里面来。每对于千组的每一列,我们都变成了一张表。这时候我去,如果说七元组的面列数呃很多,那这其实对于我cos的处理。还是更复杂的,因为要渲染出以及预编译出很多的表结构。呃。呃,在对于中间跟结构表其实还是一样的,跟之前一样,只是说我们可能处理的表结构变多了。呃,整个变复杂了,但是呢,对于业务方而言,这样的话对他们来说更加的灵活,他们不用说对于某一列可以做,呃,他们在做呃,数据搜呃,SQ在构建的时候并不需要知道基于某个UDF去做一些SQL的操作,而是直接说对原始表进行写SQ就可以了。
20:18
那其实在经过改造之后呢,我们用同样的数据,呃,进行了一次测试。呃,我们发现这个处理速度已经降到了每分钟9000条。同时耗时也。呃,到了3.9个小时,其实这个是意料之中的,因为我们本身Q赛的计算。更加的复杂化了。那。我们后面还进行了进一步的一个处理优化吧,我们发现。有几个点,第一点就是说我们GQ在对UDF进行处理的时候,呃后面如果说跟上一些,嗯,其他的一些UDF,它本身这个UDF的计算可能会重复计算,那我们其实呃应对措施比较简单,就是说我们会把重复计算的这个呃,UDF在不改变业务行业的前提下,把它挪到了后面。
21:15
呃,第二块的话是一个udf中发自Jason解析类对象的这么一个使用,其实我们呃,通过测试发现其实有几个我们用到了这些,呃解析类的一些方法,我们发现耗时还是比较久的。那其实这块的话,我们都统一到预编译上,而不是说每一条数据进来,我都会去调用这这块解析的一些方法,呃,第三的话是我们,呃k set呢,对数组类型这种map map的转化处理,我们并不会说放到我们k set底层核心逻辑去计算。还是说我们会去放到其他的uf去处理本身我们保证Q赛去减少我们这些对象的一些创建跟封装,呃,经过这一一系列一些处理优化,我们同样的对之前这份数据进行了一个测试,呃测试之后呢,我们发现整个的处理速度,呃又到了我们之前,就是改造之前的一个2万条每分钟,然后耗时的话也很巧,正好也是一点个七个小时。
22:22
呃,在嗯,保证了灵活性的基础上,我们的一个处理速度已经也比之前。也和也是和之前Q赛的处理是相同的了。嗯。这块呃,介绍完了,那下面我们来看一下第三块,第三块是我们一个标签数据在写入click house的一个优化。嗯。最开始我们的一个标签平台。中的一些标签啊,行为啊,属性,这些数据都是基于我们house的一个外表引擎。
23:00
就是HGS外表已经建立了,那其实写入过程也很简单,我们其实。直接使用了ER into这样的语句来把我们的一个bit的数据来写入到click house中,呃,整个过程还比较简单。那随着我们数据越来越多,那我们只用me这种本地表演肯定是不够的。那我们就是引入了分布式表。同时呢,保证数据高可用的前提下,我们也是选用了呃KD的这样的一个表引擎,呃分布式表。的建立,其中我们可以看到,其实我们也是加入了一个物化列,来减少我们一个数据在计算过程中的一个呃,计算性能的消耗。呃,引入分布式表之后呢,并不是说我之前呃银色的语句就直接可以改成说往分布式表写就好了,那其实这样做呃是会有问题的。
24:04
呃,看右边这张图,那其实如果说我click对于click house,如果说我数据直接写入分式表,那其实它的底层它是会把数据全部会写入到一个其中一个分片上,呃之后的话,他会把呃这个分片的数据分发到其他的分片上。那所以说这其中其实它是会有单点问题的,因为呃,毕竟是所有的数据都会放到同一个分片的一个本地目录。所以呢,无论是k house官方的社区呢,还是呃,业界的一个实践都是推荐我们是写本地表啊,分布式,分布式表主要是用来读的。那嗯,是不是。那怎么写我们这个本地表呢?那其实我们并没有说,呃,上来就直接去写,我们去参,我们看了一下业界有没有什么比较好的一些经验,那的确是有找到,呃,我们这边引用了一个叫。
25:06
Can的这样的一个,呃,开源的这么一个。嗯,组件。其他诺呢,它是一个呃,非常应用的,它是支持一个超高性能那个分布式数据集的平台。呃,应该有很多公司之前都是有用过,他以前叫world job应该是在呃去年,去年的话他刚刚改名,呃目前的话也是进入了阿巴奇的孵化,呃我们从他们官网的这个示意图来看啊,就是他们是支持我们,我们这样的场景,也就是我们从这边的数据怎么去写入到我们的k house中。整体的话,它的一个时间策略是这样的,就是对于呃,同样的去读,读到我们内存中每个分区的数据,它并没有说是一条一条的,都会是往我们c house的分面上写,它是有一个bech的一个策略。
26:04
只有说到达了这个B的量,比如说他默认的是10万,到达这个量级,他才会去写一次click house的一个本本地表。另外呢,另外的话,他在呃分片的选择上也是采用了一个随机的一个策略。嗯,那我们就把它的这个整个实现引用到我们的数据场景下来。呃。我们有个数据场景啊,是这样的,就是说我们有540个分区的一个数据。这样的数据在引用这样的一个计算方式,我们发现的问题,呃,我发现我们b map导入click house速度非常的慢,同时有时候可能直接就报错,后来我们就分析了一下,我们发现。嗯。这个分区,呃,分区量太多,可能我们刚开始并不是很清楚这个分区有这么多,呃,后来经过一些一系列的数据洞察,我们发现540个分区存在,可能每个分区里面有一条数据都会存在,存在在呃,我们Spark的相同的一个分区里面。
27:07
那其实这样的话,对于一批数据在写入到我们可house的本地表上,它会产生,因为一条数据可能就会有个目录,那其实如果说即使1000条数据写进来之后,那可能里面有540个分区,那就是说540个目录,那如此往复。的话,因为我们数据是分散在整个呃集群里面的,所以说呃。这种方式直接写到我们下里面会。会让整个呃me呃,Click house是吃不消的,他的那个merge tree根本就merge不过来。同时的话,这种策略的话,有可能会把数据全部打到同样同样的一个下上面,整个我们的IO也是很有问题的。呃,对于这个场景我们做了一些优化,呃。对于click house同样的一个分区的一些数据,我们做了一些呃,Group by的一些操作,保证相同的一些分区数据尽量的落到我同样的一个Spark的一个分区上。
28:08
同时呢,我们的一个写入下的的一个策略,我们也改为了轮询,呃,基于我们的一个下载分片数进行取模的一个轮询策略,呃,这样改改造完之后,我们相比之前效率的话是提升了大约八到十倍。呃,本以为。那样就已经OK了,那其实我们后来又遇到另外一个数据场景,其实这个数据场景跟呃数据场景一恰恰相反,它的分区量就很少。这时候我们如果还是采用刚刚的方式做。做写入的话,我们发现了,就是很明显在这一层的话,就可能会存在大量的一个数据倾斜,我相同的分局可能全部全全部到了同样的一个。ID,嗯,ID的一个某一个分区里面。那这样的话。呃,写入过程中可能会也是会产生多次的一个失败情况,嗯,对于这样的数据场景,我们呃又进进行了进一步的一个优化,呃我们自己去自定的一个,呃,Spark的一个分支器,基于他的position来进行做的,呃这样的话是第一一方面。
29:17
是保证我们呃click相同的分区在写入之前。呃,在读入Spark内写入click号之前,就已经尽量的是分布在相同的一个呃,Spark分区里面。同时呢,为了避免数据倾斜的问题,我们也加了一个随机数的一个这么一个策略。另外的话,在为了。控制我们click house的写入并发,我们也是做了一个动态适配,呃,基于我们一个Spark的分均术,以及我们click click house集群的一个沙特数做了一个动态适配。这样的话,呃,采用这样的策略的话,其实面对其实前面两个场景都是没有问题的。
30:02
OK,那我们最后对今天讲了一个内容做个整体的总结,呃,第一块的话,我们呃聊到了一些Spark计算工程的一些优化原则。呃,这块的话,我们做的工作主要是把呃基于SPA扣的这些工程开发的工程全部是迁移到了基于SPA来做来做。这样的话,我们主要也是想用到SPA本身底层的一些呃优化。来提升我们本身Spark计算的一些性能。第二块的话,我们通过一个转b map的一个呃,计算案例。呃,有聊到整个计算过程中,其实还是尽量的减少我们GVM的对象的一些创建销毁,因为这块是比较消耗性能。另外的话,我们也呃简单介绍了一下我们自主平台是怎么去存储我们那个标签的,目前的话我们是基于呃,我们的O引擎是选择的时候click house,然后我们数据结构主要围绕bit map为主。
31:08
呃,第二块的话。介绍了,主要是介绍嗯,漫画Q赛的引进之后给我们带来的一些标签计算的性能提升吧,呃。除此之外,基于呃业务的一些需求,我们也重构了一些数据结构证,我们Q在计算保证灵活性的同时,也能够在性能上不会说。太慢。第三块的话,呃,我我们聊到了我们标签数据在写入click过程的一些优化实践,嗯。本身标签平台的话也是。在click使用上面也是从本地表,随着业务业务应用的规模扩大,我们也是逐逐渐逐渐迁移到了我们的分布式表,同时呢,为了嗯提高我们的一个存入性能,我们也是引入了一些bit的一些物化链。
32:02
呃,同时呢,在写入house的一个过程中,我们也是自定义了一些写入策略。呃,保证我们click house的整体并发度可控。保证整个cho集群的稳定性。好了,以上就是我今天分享的全部内容,如果大家对内容感兴趣的话,可以关注我们的公众号以及加入我们的社群进行一起探讨。啊,谢谢大家。
我来说两句