本文由 bilibili 大数据实时平台负责人郑志升分享,本次分享核心讲解万亿级传输分发架构的落地,以及 AI 领域如何基于 Flink 打造一套完善的预处理实时 Pipeline。本次分享主要围绕以下四个方面: 一、B 站实时的前世与今生 二、Flink On Yarn 的增量化管道的方案 三、Flink 和 AI 方向的一些工程实践 四、未来的发展与思考
GitHub 地址https://github.com/apache/flink欢迎大家给 Flink 点赞送 star~
说起实时计算的未来,关键词就在于数据的实效性。首先从整个大数据发展的生态上,来看它的核心场景辐射:在大数据发展的初期,核心是以面向天为粒度的离线计算的场景。 那时候的数据实效性多数都是以运算以天为单位,它更加注重时间和成本的平衡。
随着数据应用,数据分析以及数据仓库的普及与完善,越来越多的人对数据的实效性提出了更高的要求。比如,当需要做一些数据的实时推荐时,数据的实效将决定它的价值。在这种情况下,整个实时计算的场景就普遍诞生。
但在实际的运作过程当中,也遇到了很多场景 ,其实并没有对数据有非常高的实时性要求,在这种情况下必然会存在数据从毫秒,秒或者天的新的一些场景,实时场景数据更多是以分钟为粒度的一些增量计算的场景。对于离线计算,它更加注重成本;对实时计算,它更加注重价值实效;而对于增量计算,它更加注重去平衡成本,以及综合的价值和时间。
图片 2
在三个维度上,B 站的划分是怎样的?对于 B 站而言 ,目前有 75% 的数据是通过离线计算来进行支撑的,另外还有 20% 的场景是通过实时计算, 5% 是通过增量计算。
图片 3
对于实效性问题 ,其实早期遇到了很多痛点 ,核心集中在三个方面:
图片 4
除了实效性的问题 早期还遇到了 AI 实时工程比较复杂的问题:
图片 5
在这些关键痛点的背景下,我们集中针对 Flink 做了生态化的实践,核心包括了整个实时数仓的应用以及整个增量化的 ETL 管道,还有面向 AI 的机器学习的一些场景。本次的分享会更加侧重增量管道以及 AI 加 Flink 的方向上。下图展示了整体的规模,目前,整个传输和计算的体量,在万亿级的消息规模有 30000+ 计算核数,1000+ job 数以及 100 多个用户。
图片 11
先来看一下整个管道早期的架构,从下图可以看出,数据其实主要是通过 Flume 来消费 Kafka 落到 HDFS。Flume 用它的事务机制,来确保数据从 Source 到 Channel, 再到 Sink 时候的一致性,最后数据落到 HDFS 之后,下游的 Scheduler 会通过扫描目录下有没有 tmp 文件,来判断数据是否 Ready,以此来调度拉起下游的 ETL 离线作业。
图片 7
在早期遇到了不少痛点:
图片 6
在稳定性上也遇到很多问题:
Image1
在如上的痛点下,核心方案基于 Flink 构建了一套万亿级的增量管道,下图是整个运行时的 DAG 视图。
首先,在 Flink 架构下,KafkaSource 杜绝了 rebalance 的雪崩问题,即便整个 DAG 视图中有某个并发度出现数据写 HDFS 的堵塞,也不会导致全局所有 Kafka 分区的堵塞。此外的话,整个方案本质是通过 Transform 的模块来实现可扩展的节点。
图片 2
Flink On Yarn 的整体架构,可以看出其实整个管道视图是划分以 BU 为单位的。每个 Kafka 的 topic,都代表了某一种数据终端的分发,Flink 作业就会专门负责各种终端类型的写入处理。视图里面还可以看到,针对 blinlog 的数据,还实现了整个管道的组装,可以由多个节点来实现管道的运作。
图片 1
接下来来看一下整个架构方案核心的一些技术亮点,前三个是实时功能层面的一些特色,后三个主要是在一些非功能性层面的一些优化。
图片 3
业务的开发主要是通过拼装字符串,来组装数据的一条条记录的上报。后期则是通过了模型的定义和管理,以及它的开发来组织的,主要是通过在平台的入口提供给用户去录制每一条流、每个表,它的 Schema ,Schema 会将它生成 Protobuf 的文件,用户可以在平台上去下载 Protobuf 对应的 HDFS 模型文件,这样,client 端的开发完全就可以通过强 Schema 方式从 pb 来进行约束。
来看一下运行时的过程,首先 Kafka 的 Source 会去消费实际上游传过来的每一条 RawEvent 的记录,RawEvent 里面会有 PBEvent 的对象,PBEvent 其实是一条条的 Protobuf 的记录。数据从 Source 流到的 Parser 模块,解析后会形成 PBEvent,PBEvent 会将用户在平台录入的整个 Schema 模型,存储在 OSS 对象系统上,Exporter 模块会动态去加载模型的变更。然后通过 pb 文件去反射生成的具体事件对象,事件对象最后就可以映射落成 parquet 的格式。这里主要做了很多缓存反射的优化,使整个 pb 的动态解析性能达到六倍的提升。最后,我们会将数据会落地到 HDFS,形成 parquet 的格式。
图片 5
前面提到管道会处理上百条流,早期 Flume 的架构,其实每个 Flume 节点,很难去感应它自己处理的进度。同时,Flume 也没办法做到全局进度的处理。但是基于 Flink,就可以通过 Watermark 的机制来解决。
首先在 Source 会基于消息当中的 Eventime 来生成 Watermark,Watermark 会经过每一层的处理传递到 Sink,最后会通过 Commiter 模块,以单线程的方式来汇总所有 Watermark 消息的进度。当它发现全局 Watermark 已经推进到下个小时的分区的时候,它会下发一条消息到 Hive MetStore,或者是写入到 Kafka, 来通知上小时分区数据 ready,从而可以让下游的调度可以更快的通过消息驱动的方式来拉起作业的运行。
图片 4
下图右侧其实是整个 cdc 管道完整的链路。要实现 MySQL 数据到 Hive 数据的完整映射,就需要解决流和批处理的问题。
首先是通过 Datax 将 MySQL 的数据全量一次性同步到的 HDFS。紧接着通过 spark 的 job,将数据初始化成 HUDI 的初始快照,接着通过 Canal 来实现将 Mysql 的 binlog 的数据拖到的 Kafka 的 topic,然后是通过 Flink 的 Job 将初始化快照的数据结合增量的数据进行增量更新,最后形成 HUDI 表。
整个链路是要解决数据的不丢不重,重点是针对 Canal 写 Kafka 这块,开了事务的机制,保证数据落 Kafka topic 的时候,可以做到数据在传输过程当中的不丢不重。另外,数据在传输的上层其实也有可能出现数据的重复和丢失,这时候更多是通过全局唯一 id 加毫秒级的时间戳。在整个流式 Job 中,针对全局 id 来做数据的去重,针对毫秒级时间来做数据的排序,这样能保证数据能够有序的更新到的 HUDI。
紧接着通过 Trace 的系统基于 Clickhouse 来做存储,来统计各个节点数据的进出条数来做到数据的精确对比。
图片 12
前面提到,改造成 Flink 之后,我们是做了每分钟的 Checkpoint,文件数的放大非常严重。主要是在整个 DAG 当中去引入 merge 的 operater 来实现文件的合并,merge 的合并方式主要是基于并发度横向合并,一个 writer 会对应一个 merge。这样每五分钟的 Checkpoint,1 小时的 12 个文件,都会进行合并。通过种方式的话,可以将文件数极大的控制在合理的范围内。
图片 6
实际运作过程当中经常会遇到整个作业堆积比较严重的问题,实际分析其实主是和 HDFS 通信有很大的关系。
其实 HDFS 通讯,梳理了四个关键的步骤:初始化 state、Invoke、Snapshot 以及 Notify Checkpoint complete。
核心问题主要发生在 Invoke 阶段,Invoke 会达到文件的滚动条件,这时候会触发 flush 和 close。close 实际和 NameNode 通信的时候,会经常出现堵塞的情况。
Snapshot 阶段同样会遇到一个问题,一个管道上百条流一旦触发 Snapshot,串行执行 flush 和 close 也会非常的慢。
核心优化集中在三个方面:
图片 7
实际在管道多条流的情况下,有些流的数据并不是每个小时都是连续的。
这种情况会带来分区,它的 Watermark 没有办法正常推进,引发空分区的问题。所以我们在管道的运行过程当中,引入 PartitionRecover 模块,它会根据 Watermark 来推进分区的通知。针对有些流的 Watermark,如果在 ideltimeout 还没有更新的情况下,Recover 模块来进行分区的追加。它会在每个分区的末尾到达的时候,加上 delay time 来扫描所有流的 Watermark,由此来进行兜底。
在传输过程当中,当 Flink 作业重启的时候,会遇到一波僵尸的文件,我们是通过在 DAG 的 commit 的节点,去做整个分区通知前的僵尸文件的清理删除,来实现整个僵尸文件的清理,这些都属于非功能性层面的一些优化。
图片 8
下图是 AI 方向在实时架构完整的时间线。
图片 9
回顾一下整个 AI 工程,它的早期的架构图其实体现的是整个 AI 在 2019 年初的架构视图,其本质是通过一些 single task 的方式,各种混合语言来组成的一些计算节点,来支撑着整个模型训练的链路拉起。经过 2019 年的迭代,将整个近线的训练完全的替换成用 BSQL 的模式来进行开发和迭代。
图片 11
在 2019 年底,其实又遇到了一些新的问题,这些问题主要集中在功能和非功能两个维度上。
图片 12
归根结底,集中在三个方面:
完整的实验链路,背后其实是包含实时的一条工程加离线的一条工程链路组成,线上的问题很难去排查。
图片 13
在这样的一些痛点下,在 20 年主要是集中在 AI 方向上去打造实时工程的雏形。核心是通过下面三个方面来进行突破。
图片 14
我们在特征工程中遇到了一些难点。
图片 15
这里看一下我们怎么去做实时特征,图中的右侧是最典型的一些场景。比如说我要实时统计用户最近一分钟、6 小时、12 小时、24 小时,对各个 UP 主相关视频的播放次数。针对这样场景,其实里面有两个点:
在两个痛点下,针对滑动窗口,主要是改造成为 Group By 的模式,加上 agg 的 UDF 的模式,将整个一小时、六小时、十二小时、二十四小时的一些窗口数据,存放到整个 Rocksdb 当中。这样通过 UDF 模式,整个数据触发机制就可以基于 Group By 实现记录级的触发,整个语义、时效性都会提升的比较大。同时在整个 AGG 的 UDF 函数当中,通过 Rocksdb 来做 state,在 UDF 当中来维护数据的生命周期。此外还扩展了整个 SQL 实现了数组级别的维表查询。最后的整个效果其实可以在实时特征的方向上,通过超大窗口的模式来支持各种计算场景。
图片 16
接下来看一下离线,左侧视图上半部分是完整的实时特征的计算链路,可以看出要解决同样的一条 SQL,在离线的计算上也能够复用,那就需要去解决相应的一些计算的 IO 都能够复用的问题。比如在流式上是通过 Kafka 来进行数据的输入,在离线上需要通过 HDFS 来做数据的输入。在流式上是通过 KFC 或者 AVBase 等等的一些 kv 引擎来支持,在离线上就需要通过 hive 引擎来解决,归根结底,其实需要去解决三个方面的问题:
图片 17
分区有序的方案其实主要是基于数据在落 HDFS 时候,前置做了一些改造。首先数据在落 HDFS 之前,是传输的管道,通过 Kafka 消费数据。在 Flink 的作业从 Kafka 拉取数据之后,通过 Eventtime 去提取数据的 watermark,每一个 Kafka Source 的并发度会将 watermark 汇报到 JobManager 当中的 GlobalWatermark 模块,GlobalAgg 会汇总来自每一个并发度 Watermark 推进的进度,从而去统计 GlobalWatermark 的进展。根据 GlobalWatermark 的进展来计算出当中有哪些并发度的 Watermark 计算过快的问题,从而通过 GlobalAgg 下发给 Kafka Source 控制信息,Kafka Source 有些并发度过快的情况下,它的整个分区推进就降低速度。这样,在 HDFS Sink 模块,在同时间片上收到的数据记录的整个 Event time 基本上有序的,最终落到 HDFS 还会在文件名上去标识它相应的分区以及相应的时间片范围。最后在 HDFS 分区目录下,就可以实现数据分区的有序目录。
图片 18
数据在 HDFS 增量有序之后,实现了 HDFStreamingSource,它会针对文件做 Fecher 分区,针对每个文件都有 Fecher 的线程,且每个 Fecher 线程会统计每一个文件。它 offset 处理了游标的进度,会将状态根据 Checkpoint 的过程,将它更新到的 State 当中。
这样就可以实现整个文件消费的有序推进。在回溯历史数据的时候,离线作业就会涉及到整个作业的停止。实际是在整个 FileFetcher 的模块当中去引入一个分区结束的标识,且会在每一个线程去统计每一个分区的时候,去感应它分区的结束,分区结束后的状态最后汇总到的 cancellationManager,并进一步会汇总到 Job Manager 去更新全局分区的进度,当全局所有的分区都到了末尾的游标时候,会将整个 Flink 作业进行 cancel 关闭掉。
图片 19
前面讲到整个离线数据,其实数据都在 hive 上,hive 的 HDFS 表数据的整个表字段信息会非常的多,但实际做离线特征的时候,需要的信息其实是很少的,因此需要在 hive 的过程先做离线字段裁剪,将一张 ODS 的表清洗成 DW 的表,DW 的表会最后通过 Flink 运行 Job,内部会有个 reload 的 scheduler,它会定期的去根据数据当前推进的 Watermark 的分区,去拉取在 hive 当中每一个分区对应的表信息。通过去下载某 HDFS 的 hive 目录当中的一些数据,最后会在整个内存当中 reload 成 Rocksdb 的文件,Rocksdb 其实就是最后用来提供维表 KV 查询的组件。
组件里面会包含多个 Rocksdb 的 build 构建过程,主要是取决于整个数据流动的过程当中的 Eventtime,如果发现 Eventtime 推进已经快到小时分区结束的末尾时候,会通过懒加载的模式去主动 reload,构建下一个小时 Rocksdb 的分区,通过这种方式,来切换整个 Rocksdb 的读取。
图片 1
在上面三个优化,也就是分区有序增量,类 Kafka 分区 Fetch 消费,以及维表 Snapshot 的基础下,最终是实现了实时特征和离线特征,共用一套 SQL 的方案,打通了特征的流批计算。紧接着来看一下整个实验,完整的流批一体的链路,从图中可以看出最上面的粒度是整个离线的完整的计算过程。第二是整个近线的过程,离线过程其实所用计算的语义都是和近线过程用实时消费的语义是完全一致的,都是用 Flink 来提供 SQL 计算的。
来看一下近线,其实 Label join 用的是 Kafka 的一条点击流以及展现流,到了整个离线的计算链路,则用的一条 HDFS 点击的目录和 HDFS 展现目录。特征数据处理也是一样的,实时用的是 Kafka 的播放数据,以及 Hbase 的一些稿件数据。对于离线来说,用的是 hive 的稿件数据,以及 hive 的播放数据。除了整个离线和近线的流批打通,还将整个近线产生的实时的数据效果汇总到 OLAP 引擎上,通过 superset 来提供整个实时的指标可视化。其实从图可以看出完整的复杂流批一体的计算链路,当中包含的计算节点是非常的复杂和庞多的。
图片 2
下阶段挑战更多是在实验协作上,下图是将前面整个链路进行简化后的抽象。从图中可以看出,三个虚线的区域框内,分别是离线的链路加两个实时的链路,三个完整的链路构成作业的流批,实际上就是一个工作流最基本的过程。里面需要去完成工作流完整的抽象,包括了流批事件的驱动机制,以及,对于算法在 AI 领域上更多希望用 Python 来定义完整的 flow,此外还将整个输入,输出以及它的整个计算趋于模板化,这样可以做到方便整个实验的克隆。
图片 3
整个工作流上在下半年更多是和社区合作,引入了 AIFlow 的整套方案。
右侧其实是整个 AIFlow 完整链路的 DAG 视图,可以看出整个节点,其实它支持的类型是没有任何限制的,可以是流式节点,也可以是离线节点。此外的话,整个节点与节点之间通信的边是可以支持数据驱动以及事件驱动的。引入 AIFlow 的好处主要在于,AIFlow 提供基于 Python 语义来方便去定义完整的 AIFlow 的工作流,同时还包括整个工作流的进度的调度。
在节点的边上,相比原生的业界的一些 Flow 方案,他还支持基于事件驱动的整个机制。好处是可以帮助在两个 Flink 作业之间,通过 Flink 当中 watermark 处理数据分区的进度去下发一条事件驱动的消息来拉起下一个离线或者实时的作业。
此外还支持周边的一些配套服务,包括通知的一些消息模块服务,还有元数据的服务,以及在 AI 领域一些模型中心的服务。
图片 4
来看一下基于 AIFlow 是如何最终定义成 Python 的工作流。右边的视图是一个线上项目的完整工作流的定义。第一、是整个是 Spark job 的定义,当中通过配置 dependence 来描述整个下游的依赖关系,它会下发一条事件驱动的消息来拉起下面的 Flink 流式作业。流式作业也同样可以通过消息驱动的方式来拉起下面的 Spark 作业。整个语义的定义非常的简单,只需要四个步骤,配置每节点的 confg 的信息,以及定义每节点的 operation 的行为,还有它的 dependency 的依赖,最后去运行整个 flow 的拓扑视图。
图片 6
接下来看一下完整的流批调度的驱动机制,下图右侧是完整的三个工作节点的驱动视图。第一个是从 Source 到 SQL 到 Sink。引入的黄色方框是扩展的 supervisor,他可以收集全局的 watermark 进度。当整个流式作业发现 watermark 可以推进到下一个小时的分区的时候,它会下发一条消息,去给到 NotifyService。NotifyService 拿到这条消息之后,它会去下发给到下一个作业,下一个作业主要会在整个 Flink 的 DAG 当中去引入 flow 的 operator,operator 在没有收到上个作业下发了消息之前,它会堵塞整个作业的运行。直到收到消息驱动之后,就代表上游其实上一个小时分区已经完成了,这时下个 flow 节点就可以驱动拉起来运作。同样,下个工作流节点也引入了 GlobalWatermark Collector 的模块来汇总收集它的处理的进度。当上一个小时分区完成之后,它也会下发一条消息到 NotifyService,NotifyService 会将这条消息去驱动调用 AIScheduler 的模块,从而去拉起 spark 离线作业来做 spark 离线的收尾。从里你们可以看出,整个链路其实是支持批到批,批到流以及流到流,以及流到批的四个场景。
图片 7
在流和批的整个 flow 定义和调度的基础上,在 2020 年初步构建出来了实时 AI 全链路的雏形,核心是面向实验。算法同学也可以基于 SQL 来开发的 Node 的节点,Python 是可以定义完整的 DAG 工作流。监控,告警以及运维是一体化的。
同时,支持从离线到实时的打通,从数据处理到模型训练,从模型训练到实验效果的打通,以及面向端到端的打通。右侧是整个近线实验的链路。下面是将整个实验链路产出的物料数据提供给在线的预测训练的服务。整体会有三个方面的配套:
图片 8
在未来的一年,我们还会更加集中在两个方面的一些工作。
图片 10
领取专属 10元无门槛券
私享最新 技术干货