一、
除了日志数据,关系数据库中的数据也是数据分析的重要来源。在数据的采集方式上,用Spark
实现类 Sqoop 的分布式
抓取替代了早期定期用单机全量抓取 MySQL 数据表
的方式,有效的提升了抓取速度,突破了单机瓶颈。
再之后为了减少MySQL
压力,选用Canal
来接收MySQL binlog
,离线 merge 出全量表,这样就不再直接读 MySQL
了,而且对千万/亿级大表
的处理速度也会更快。
二、数据传输——Kafka
做消息总线连接在线和离线系统
数据在客户端向服务端回传或者直接在服务端产生时,可以认为是在线状态。当数据落地到统计分析相关的基础设施时,就变成离线的状态了。在线系统和离线系统采用消息队列来连接。
数据传输以Kafka
作为数据总线,所有实时和离线数据的接入都要通过 Kafka
,包括日志、binlog
等。这里值得注意的是:尽早引入消息队列,与业务系统解耦
。
因为以目前的数据和集群规模,直接使用社区版本乃至企业版的产品,都会遇到大量困难。像数据接入,就使用自研 Databus,作为单机 Agent,封装 Kafka 写入,提供异步写入、buffer、统一配置等 feature。
Kafka 数据通过 Dump 落地到 HDFS,供后续离线处理使用。随着数据规模的增加,Dump 的实现也经历了几个阶段。最初实现用的是类似 Flume 模式的单机上传,很快遇到了瓶颈,实现改成了通过 Storm 来实现多机分布式的上传,支持的数据吞吐量大幅增加。
现在开发了一个服务,作为托管服务方便整合到平台工具上,底层实现切换到了 SparkStreaming
,并实现了 exactly-once 语义
,保证 Dump
数据不丢不重。
三、数据入库—数据仓库、ETL
数据仓库中数据表的元信息都放在 Hivemetastore
里,数据表在 HDFS
上的存储格式以Parquet
为主,这是一种列式存储格式,对于嵌套数据结构的支持也很好。
有多种 ETL 的实现模式在并存,对于底层数据构建,一种选择是使用 Python 通过 HadoopStreaming 来实现 Map Reduce 的任务,但现在更倾向于使用 Spark 直接生成 Parquet 数据,Spark 相比 MapReduce 有更丰富的处理原语,代码实现可以更简洁,也减少了中间数据的落地量。对于高层次的数据表,会直接使用 HiveSQL 来描述 ETL 过程。
四、数据计算——计算引擎的演进
数据仓库中的数据表如何能被高效的查询很关键,因为这会直接关系到数据分析的效率。常见的查询引擎可以归到三个模式中:Batch 类、MPP 类、Cube 类
。
Hive
是一个很稳定的选择,但速度一般。
为了更好的支持 Adhoc 交互式查询
,调研 MPP 类
查询引擎,先后使用过 Impala 和 Presto
,但在超大数据量级下都遇到了稳定性的问题。
现在的方案是混合使用 Spark SQL 和 Hive
,并自研 查询分析系统,自动分析并分发查询 SQL 到适合的查询引擎。在Cube
类查询引擎上,采用了Kylin
。