文档大纲:
本文上半部分之前已经发过了,传送门:50000字,数仓建设保姆级教程,离线和实时一网打尽(理论+实战) 上
此篇文章是整个文档的下半部分,将接着上半部分从第五章开始。
虽然实时计算在最近几年才火起来,但是在早期也有部分公司有实时计算的需求,但是数据量比较少,所以在实时方面形成不了完整的体系,基本所有的开发都是具体问题具体分析,来一个需求做一个,基本不考虑它们之间的关系,开发形式如下:
早期实时计算
如上图所示,拿到数据源后,会经过数据清洗,扩维,通过Flink进行业务逻辑处理,最后直接进行业务输出。把这个环节拆开来看,数据源端会重复引用相同的数据源,后面进行清洗、过滤、扩维等操作,都要重复做一遍,唯一不同的是业务的代码逻辑是不一样的。
随着产品和业务人员对实时数据需求的不断增多,这种开发模式出现的问题越来越多:
大家看实时数仓的发展和出现的问题,和离线数仓非常类似,后期数据量大了之后产生了各种问题,离线数仓当时是怎么解决的?离线数仓通过分层架构使数据解耦,多个业务可以共用数据,实时数仓是否也可以用分层架构呢?当然是可以的,但是细节上和离线的分层还是有一些不同,稍后会讲到。
从方法论来讲,实时和离线是非常相似的,离线数仓早期的时候也是具体问题具体分析,当数据规模涨到一定量的时候才会考虑如何治理。分层是一种非常有效的数据治理方式,所以在实时数仓如何进行管理的问题上,首先考虑的也是分层的处理逻辑。
实时数仓的架构如下图:
从上图中我们具体分析下每层的作用:
我们可以看出,实时数仓和离线数仓的分层非常类似,比如 数据源层,明细层,汇总层,乃至应用层,他们命名的模式可能都是一样的。但仔细比较不难发现,两者有很多区别:
Lambda和Kappa架构的概念已在前文中解释,不了解的小伙伴可点击链接:一文读懂大数据实时计算
下图是基于 Flink 和 Kafka 的 Lambda 架构的具体实践,上层是实时计算,下层是离线计算,横向是按计算引擎来分,纵向是按实时数仓来区分:
Lambda架构是比较经典的架构,以前实时的场景不是很多,以离线为主,当附加了实时场景后,由于离线和实时的时效性不同,导致技术生态是不一样的。Lambda架构相当于附加了一条实时生产链路,在应用层面进行一个整合,双路生产,各自独立。这在业务应用中也是顺理成章采用的一种方式。
双路生产会存在一些问题,比如加工逻辑double,开发运维也会double,资源同样会变成两个资源链路。因为存在以上问题,所以又演进了一个Kappa架构。
Kappa架构相当于去掉了离线计算部分的Lambda架构,具体如下图所示:
Kappa架构从架构设计来讲比较简单,生产统一,一套逻辑同时生产离线和实时。但是在实际应用场景有比较大的局限性,因为实时数据的同一份表,会使用不同的方式进行存储,这就导致关联时需要跨数据源,操作数据有很大局限性,所以在业内直接用Kappa架构生产落地的案例不多见,且场景比较单一。
关于 Kappa 架构,熟悉实时数仓生产的同学,可能会有一个疑问。因为我们经常会面临业务变更,所以很多业务逻辑是需要去迭代的。之前产出的一些数据,如果口径变更了,就需要重算,甚至重刷历史数据。对于实时数仓来说,怎么去解决数据重算问题?
Kappa 架构在这一块的思路是:首先要准备好一个能够存储历史数据的消息队列,比如 Kafka,并且这个消息队列是可以支持你从某个历史的节点重新开始消费的。接着需要新起一个任务,从原来比较早的一个时间节点去消费 Kafka 上的数据,然后当这个新的任务运行的进度已经能够和现在的正在跑的任务齐平的时候,你就可以把现在任务的下游切换到新的任务上面,旧的任务就可以停掉,并且原来产出的结果表也可以被删掉。
随着实时 OLAP 技术的发展,目前开源的OLAP引擎在性能,易用等方面有了很大的提升,如Doris、Presto等,加上数据湖技术的迅速发展,使得流批结合的方式变得简单。
如下图是流批结合的实时数仓:
数据从日志统一采集到消息队列,再到实时数仓,作为基础数据流的建设是统一的。之后对于日志类实时特征,实时大屏类应用走实时流计算。对于Binlog类业务分析走实时OLAP批处理。
我们看到流批结合的方式与上面几种架构的存储方式发生了变化,由Kafka换成了Iceberg,Iceberg是介于上层计算引擎和底层存储格式之间的一个中间层,我们可以把它定义成一种“数据组织格式”,底层存储还是HDFS,那么为什么加了中间层,就对流批结合处理的比较好了呢?Iceberg的ACID能力可以简化整个流水线的设计,降低整个流水线的延迟,并且所具有的修改、删除能力能够有效地降低开销,提升效率。Iceberg可以有效支持批处理的高吞吐数据扫描和流计算按分区粒度并发实时处理。
注:本小节内容来自大数据技术与数仓
实时数仓主要解决传统数仓数据时效性低的问题,实时数仓通常会用在实时的OLAP分析,实时大屏展示,实时监控报警各个场景。虽然关于实时数仓架构及技术选型与传统的离线数仓会存在差异,但是关于数仓建设的基本方法论是一致的。接下来主要介绍Flink SQL从0到1搭建一个实时数仓的demo,涉及到数据采集、存储、计算、可视化整个流程。
本文以电商业务为例,展示实时数仓的数据处理流程。另外,本文旨在说明实时数仓的构建流程,所以不会涉及复杂的数据计算。为了保证案例的可操作性和完整性,本文会给出详细的操作步骤。为了方便演示,本文的所有操作都是在Flink SQL Cli中完成。
具体的架构设计如图所示:首先通过canal解析MySQL的binlog日志,将数据存储在Kafka中。然后使用Flink SQL对原始数据进行清洗关联,并将处理之后的明细宽表写入Kafka中。维表数据存储在MySQL中,通过Flink SQL对明细宽表与维表进行join,将聚合后的数据写入MySQL,最后通过FineBI进行可视化展示。
CREATE TABLE `order_info` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',
`consignee` varchar(100) DEFAULT NULL COMMENT '收货人',
`consignee_tel` varchar(20) DEFAULT NULL COMMENT '收件人电话',
`total_amount` decimal(10,2) DEFAULT NULL COMMENT '总金额',
`order_status` varchar(20) DEFAULT NULL COMMENT '订单状态',
`user_id` bigint(20) DEFAULT NULL COMMENT '用户id',
`payment_way` varchar(20) DEFAULT NULL COMMENT '付款方式',
`delivery_address` varchar(1000) DEFAULT NULL COMMENT '送货地址',
`order_comment` varchar(200) DEFAULT NULL COMMENT '订单备注',
`out_trade_no` varchar(50) DEFAULT NULL COMMENT '订单交易编号(第三方支付用)',
`trade_body` varchar(200) DEFAULT NULL COMMENT '订单描述(第三方支付用)',
`create_time` datetime DEFAULT NULL COMMENT '创建时间',
`operate_time` datetime DEFAULT NULL COMMENT '操作时间',
`expire_time` datetime DEFAULT NULL COMMENT '失效时间',
`tracking_no` varchar(100) DEFAULT NULL COMMENT '物流单编号',
`parent_order_id` bigint(20) DEFAULT NULL COMMENT '父订单编号',
`img_url` varchar(200) DEFAULT NULL COMMENT '图片路径',
`province_id` int(20) DEFAULT NULL COMMENT '地区',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='订单表';
CREATE TABLE `order_detail` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',
`order_id` bigint(20) DEFAULT NULL COMMENT '订单编号',
`sku_id` bigint(20) DEFAULT NULL COMMENT 'sku_id',
`sku_name` varchar(200) DEFAULT NULL COMMENT 'sku名称(冗余)',
`img_url` varchar(200) DEFAULT NULL COMMENT '图片名称(冗余)',
`order_price` decimal(10,2) DEFAULT NULL COMMENT '购买价格(下单时sku价格)',
`sku_num` varchar(200) DEFAULT NULL COMMENT '购买个数',
`create_time` datetime DEFAULT NULL COMMENT '创建时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='订单详情表';
CREATE TABLE `sku_info` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'skuid(itemID)',
`spu_id` bigint(20) DEFAULT NULL COMMENT 'spuid',
`price` decimal(10,0) DEFAULT NULL COMMENT '价格',
`sku_name` varchar(200) DEFAULT NULL COMMENT 'sku名称',
`sku_desc` varchar(2000) DEFAULT NULL COMMENT '商品规格描述',
`weight` decimal(10,2) DEFAULT NULL COMMENT '重量',
`tm_id` bigint(20) DEFAULT NULL COMMENT '品牌(冗余)',
`category3_id` bigint(20) DEFAULT NULL COMMENT '三级分类id(冗余)',
`sku_default_img` varchar(200) DEFAULT NULL COMMENT '默认显示图片(冗余)',
`create_time` datetime DEFAULT NULL COMMENT '创建时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='商品表';
CREATE TABLE `base_category1` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',
`name` varchar(10) NOT NULL COMMENT '分类名称',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='一级分类表';
CREATE TABLE `base_category2` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',
`name` varchar(200) NOT NULL COMMENT '二级分类名称',
`category1_id` bigint(20) DEFAULT NULL COMMENT '一级分类编号',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='二级分类表';
CREATE TABLE `base_category3` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',
`name` varchar(200) NOT NULL COMMENT '三级分类名称',
`category2_id` bigint(20) DEFAULT NULL COMMENT '二级分类编号',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='三级分类表';
CREATE TABLE `base_province` (
`id` int(20) DEFAULT NULL COMMENT 'id',
`name` varchar(20) DEFAULT NULL COMMENT '省名称',
`region_id` int(20) DEFAULT NULL COMMENT '大区id',
`area_code` varchar(20) DEFAULT NULL COMMENT '行政区位码'
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
CREATE TABLE `base_region` (
`id` int(20) NOT NULL COMMENT '大区id',
`region_name` varchar(20) DEFAULT NULL COMMENT '大区名称',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
关于ODS层的数据同步这里就不详细展开。主要使用canal解析MySQL的binlog日志,然后将其写入到Kafka对应的topic中。由于篇幅限制,不会对具体的细节进行说明。同步之后的结果如下图所示:
本案例中将维表存储在了MySQL中,实际生产中会用HBase存储维表数据。我们主要用到两张维表:区域维表和商品维表。处理过程如下:
首先将mydw.base_province
和mydw.base_region
这个主题对应的数据抽取到MySQL中,主要使用Flink SQL的Kafka数据源对应的canal-json格式,注意:在执行装载之前,需要先在MySQL中创建对应的表,本文使用的MySQL数据库的名字为dim,用于存放维表数据。如下:
-- -------------------------
-- 省份
-- kafka Source
-- -------------------------
DROP TABLE IF EXISTS `ods_base_province`;
CREATE TABLE `ods_base_province` (
`id` INT,
`name` STRING,
`region_id` INT ,
`area_code`STRING
) WITH(
'connector' = 'kafka',
'topic' = 'mydw.base_province',
'properties.bootstrap.servers' = 'kms-3:9092',
'properties.group.id' = 'testGroup',
'format' = 'canal-json' ,
'scan.startup.mode' = 'earliest-offset'
) ;
-- -------------------------
-- 省份
-- MySQL Sink
-- -------------------------
DROP TABLE IF EXISTS `base_province`;
CREATE TABLE `base_province` (
`id` INT,
`name` STRING,
`region_id` INT ,
`area_code`STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://kms-1:3306/dim',
'table-name' = 'base_province', -- MySQL中的待插入数据的表
'driver' = 'com.mysql.jdbc.Driver',
'username' = 'root',
'password' = '123qwe',
'sink.buffer-flush.interval' = '1s'
);
-- -------------------------
-- 省份
-- MySQL Sink Load Data
-- -------------------------
INSERT INTO base_province
SELECT *
FROM ods_base_province;
-- -------------------------
-- 区域
-- kafka Source
-- -------------------------
DROP TABLE IF EXISTS `ods_base_region`;
CREATE TABLE `ods_base_region` (
`id` INT,
`region_name` STRING
) WITH(
'connector' = 'kafka',
'topic' = 'mydw.base_region',
'properties.bootstrap.servers' = 'kms-3:9092',
'properties.group.id' = 'testGroup',
'format' = 'canal-json' ,
'scan.startup.mode' = 'earliest-offset'
) ;
-- -------------------------
-- 区域
-- MySQL Sink
-- -------------------------
DROP TABLE IF EXISTS `base_region`;
CREATE TABLE `base_region` (
`id` INT,
`region_name` STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://kms-1:3306/dim',
'table-name' = 'base_region', -- MySQL中的待插入数据的表
'driver' = 'com.mysql.jdbc.Driver',
'username' = 'root',
'password' = '123qwe',
'sink.buffer-flush.interval' = '1s'
);
-- -------------------------
-- 区域
-- MySQL Sink Load Data
-- -------------------------
INSERT INTO base_region
SELECT *
FROM ods_base_region;
经过上面的步骤,将创建维表所需要的原始数据已经存储到了MySQL中,接下来就需要在MySQL中创建维表,我们使用上面的两张表,创建一张视图:dim_province
作为维表:
-- ---------------------------------
-- DIM层,区域维表,
-- 在MySQL中创建视图
-- ---------------------------------
DROP VIEW IF EXISTS dim_province;
CREATE VIEW dim_province AS
SELECT
bp.id AS province_id,
bp.name AS province_name,
br.id AS region_id,
br.region_name AS region_name,
bp.area_code AS area_code
FROM base_region br
JOIN base_province bp ON br.id= bp.region_id;
这样我们所需要的维表:dim_province
就创建好了,只需要在维表join时,使用Flink SQL创建JDBC的数据源,就可以使用该维表了。同理,我们使用相同的方法创建商品维表,具体如下:
-- -------------------------
-- 一级类目表
-- kafka Source
-- -------------------------
DROP TABLE IF EXISTS `ods_base_category1`;
CREATE TABLE `ods_base_category1` (
`id` BIGINT,
`name` STRING
)WITH(
'connector' = 'kafka',
'topic' = 'mydw.base_category1',
'properties.bootstrap.servers' = 'kms-3:9092',
'properties.group.id' = 'testGroup',
'format' = 'canal-json' ,
'scan.startup.mode' = 'earliest-offset'
) ;
-- -------------------------
-- 一级类目表
-- MySQL Sink
-- -------------------------
DROP TABLE IF EXISTS `base_category1`;
CREATE TABLE `base_category1` (
`id` BIGINT,
`name` STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://kms-1:3306/dim',
'table-name' = 'base_category1', -- MySQL中的待插入数据的表
'driver' = 'com.mysql.jdbc.Driver',
'username' = 'root',
'password' = '123qwe',
'sink.buffer-flush.interval' = '1s'
);
-- -------------------------
-- 一级类目表
-- MySQL Sink Load Data
-- -------------------------
INSERT INTO base_category1
SELECT *
FROM ods_base_category1;
-- -------------------------
-- 二级类目表
-- kafka Source
-- -------------------------
DROP TABLE IF EXISTS `ods_base_category2`;
CREATE TABLE `ods_base_category2` (
`id` BIGINT,
`name` STRING,
`category1_id` BIGINT
)WITH(
'connector' = 'kafka',
'topic' = 'mydw.base_category2',
'properties.bootstrap.servers' = 'kms-3:9092',
'properties.group.id' = 'testGroup',
'format' = 'canal-json' ,
'scan.startup.mode' = 'earliest-offset'
) ;
-- -------------------------
-- 二级类目表
-- MySQL Sink
-- -------------------------
DROP TABLE IF EXISTS `base_category2`;
CREATE TABLE `base_category2` (
`id` BIGINT,
`name` STRING,
`category1_id` BIGINT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://kms-1:3306/dim',
'table-name' = 'base_category2', -- MySQL中的待插入数据的表
'driver' = 'com.mysql.jdbc.Driver',
'username' = 'root',
'password' = '123qwe',
'sink.buffer-flush.interval' = '1s'
);
-- -------------------------
-- 二级类目表
-- MySQL Sink Load Data
-- -------------------------
INSERT INTO base_category2
SELECT *
FROM ods_base_category2;
-- -------------------------
-- 三级类目表
-- kafka Source
-- -------------------------
DROP TABLE IF EXISTS `ods_base_category3`;
CREATE TABLE `ods_base_category3` (
`id` BIGINT,
`name` STRING,
`category2_id` BIGINT
)WITH(
'connector' = 'kafka',
'topic' = 'mydw.base_category3',
'properties.bootstrap.servers' = 'kms-3:9092',
'properties.group.id' = 'testGroup',
'format' = 'canal-json' ,
'scan.startup.mode' = 'earliest-offset'
) ;
-- -------------------------
-- 三级类目表
-- MySQL Sink
-- -------------------------
DROP TABLE IF EXISTS `base_category3`;
CREATE TABLE `base_category3` (
`id` BIGINT,
`name` STRING,
`category2_id` BIGINT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://kms-1:3306/dim',
'table-name' = 'base_category3', -- MySQL中的待插入数据的表
'driver' = 'com.mysql.jdbc.Driver',
'username' = 'root',
'password' = '123qwe',
'sink.buffer-flush.interval' = '1s'
);
-- -------------------------
-- 三级类目表
-- MySQL Sink Load Data
-- -------------------------
INSERT INTO base_category3
SELECT *
FROM ods_base_category3;
-- -------------------------
-- 商品表
-- Kafka Source
-- -------------------------
DROP TABLE IF EXISTS `ods_sku_info`;
CREATE TABLE `ods_sku_info` (
`id` BIGINT,
`spu_id` BIGINT,
`price` DECIMAL(10,0),
`sku_name` STRING,
`sku_desc` STRING,
`weight` DECIMAL(10,2),
`tm_id` BIGINT,
`category3_id` BIGINT,
`sku_default_img` STRING,
`create_time` TIMESTAMP(0)
) WITH(
'connector' = 'kafka',
'topic' = 'mydw.sku_info',
'properties.bootstrap.servers' = 'kms-3:9092',
'properties.group.id' = 'testGroup',
'format' = 'canal-json' ,
'scan.startup.mode' = 'earliest-offset'
) ;
-- -------------------------
-- 商品表
-- MySQL Sink
-- -------------------------
DROP TABLE IF EXISTS `sku_info`;
CREATE TABLE `sku_info` (
`id` BIGINT,
`spu_id` BIGINT,
`price` DECIMAL(10,0),
`sku_name` STRING,
`sku_desc` STRING,
`weight` DECIMAL(10,2),
`tm_id` BIGINT,
`category3_id` BIGINT,
`sku_default_img` STRING,
`create_time` TIMESTAMP(0),
PRIMARY KEY (tm_id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://kms-1:3306/dim',
'table-name' = 'sku_info', -- MySQL中的待插入数据的表
'driver' = 'com.mysql.jdbc.Driver',
'username' = 'root',
'password' = '123qwe',
'sink.buffer-flush.interval' = '1s'
);
-- -------------------------
-- 商品
-- MySQL Sink Load Data
-- -------------------------
INSERT INTO sku_info
SELECT *
FROM ods_sku_info;
经过上面的步骤,我们可以将创建商品维表的基础数据表同步到MySQL中,同样需要提前创建好对应的数据表。接下来我们使用上面的基础表在mySQL的dim库中创建一张视图:dim_sku_info
,用作后续使用的维表。
-- ---------------------------------
-- DIM层,商品维表,
-- 在MySQL中创建视图
-- ---------------------------------
CREATE VIEW dim_sku_info AS
SELECT
si.id AS id,
si.sku_name AS sku_name,
si.category3_id AS c3_id,
si.weight AS weight,
si.tm_id AS tm_id,
si.price AS price,
si.spu_id AS spu_id,
c3.name AS c3_name,
c2.id AS c2_id,
c2.name AS c2_name,
c3.id AS c1_id,
c3.name AS c1_name
FROM
(
sku_info si
JOIN base_category3 c3 ON si.category3_id = c3.id
JOIN base_category2 c2 ON c3.category2_id =c2.id
JOIN base_category1 c1 ON c2.category1_id = c1.id
);
至此,我们所需要的维表数据已经准备好了,接下来开始处理DWD层的数据。
经过上面的步骤,我们已经将所用的维表已经准备好了。接下来我们将对ODS的原始数据进行处理,加工成DWD层的明细宽表。具体过程如下:
-- -------------------------
-- 订单详情
-- Kafka Source
-- -------------------------
DROP TABLE IF EXISTS `ods_order_detail`;
CREATE TABLE `ods_order_detail`(
`id` BIGINT,
`order_id` BIGINT,
`sku_id` BIGINT,
`sku_name` STRING,
`img_url` STRING,
`order_price` DECIMAL(10,2),
`sku_num` INT,
`create_time` TIMESTAMP(0)
) WITH(
'connector' = 'kafka',
'topic' = 'mydw.order_detail',
'properties.bootstrap.servers' = 'kms-3:9092',
'properties.group.id' = 'testGroup',
'format' = 'canal-json' ,
'scan.startup.mode' = 'earliest-offset'
) ;
-- -------------------------
-- 订单信息
-- Kafka Source
-- -------------------------
DROP TABLE IF EXISTS `ods_order_info`;
CREATE TABLE `ods_order_info` (
`id` BIGINT,
`consignee` STRING,
`consignee_tel` STRING,
`total_amount` DECIMAL(10,2),
`order_status` STRING,
`user_id` BIGINT,
`payment_way` STRING,
`delivery_address` STRING,
`order_comment` STRING,
`out_trade_no` STRING,
`trade_body` STRING,
`create_time` TIMESTAMP(0) ,
`operate_time` TIMESTAMP(0) ,
`expire_time` TIMESTAMP(0) ,
`tracking_no` STRING,
`parent_order_id` BIGINT,
`img_url` STRING,
`province_id` INT
) WITH(
'connector' = 'kafka',
'topic' = 'mydw.order_info',
'properties.bootstrap.servers' = 'kms-3:9092',
'properties.group.id' = 'testGroup',
'format' = 'canal-json' ,
'scan.startup.mode' = 'earliest-offset'
) ;
-- ---------------------------------
-- DWD层,支付订单明细表dwd_paid_order_detail
-- ---------------------------------
DROP TABLE IF EXISTS dwd_paid_order_detail;
CREATE TABLE dwd_paid_order_detail
(
detail_id BIGINT,
order_id BIGINT,
user_id BIGINT,
province_id INT,
sku_id BIGINT,
sku_name STRING,
sku_num INT,
order_price DECIMAL(10,0),
create_time STRING,
pay_time STRING
) WITH (
'connector' = 'kafka',
'topic' = 'dwd_paid_order_detail',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = 'kms-3:9092',
'format' = 'changelog-json'
);
-- ---------------------------------
-- DWD层,已支付订单明细表
-- 向dwd_paid_order_detail装载数据
-- ---------------------------------
INSERT INTO dwd_paid_order_detail
SELECT
od.id,
oi.id order_id,
oi.user_id,
oi.province_id,
od.sku_id,
od.sku_name,
od.sku_num,
od.order_price,
oi.create_time,
oi.operate_time
FROM
(
SELECT *
FROM ods_order_info
WHERE order_status = '2' -- 已支付
) oi JOIN
(
SELECT *
FROM ods_order_detail
) od
ON oi.id = od.order_id;
经过上面的步骤,我们创建了一张dwd_paid_order_detail
明细宽表,并将该表存储在了Kafka中。接下来我们将使用这张明细宽表与维表进行JOIN,得到我们ADS应用层数据。
首先在MySQL中创建对应的ADS目标表:ads_province_index
CREATE TABLE ads.ads_province_index(
province_id INT(10),
area_code VARCHAR(100),
province_name VARCHAR(100),
region_id INT(10),
region_name VARCHAR(100),
order_amount DECIMAL(10,2),
order_count BIGINT(10),
dt VARCHAR(100),
PRIMARY KEY (province_id, dt)
) ;
向MySQL的ADS层目标装载数据:
-- Flink SQL Cli操作
-- ---------------------------------
-- 使用 DDL创建MySQL中的ADS层表
-- 指标:1.每天每个省份的订单数
-- 2.每天每个省份的订单金额
-- ---------------------------------
CREATE TABLE ads_province_index(
province_id INT,
area_code STRING,
province_name STRING,
region_id INT,
region_name STRING,
order_amount DECIMAL(10,2),
order_count BIGINT,
dt STRING,
PRIMARY KEY (province_id, dt) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://kms-1:3306/ads',
'table-name' = 'ads_province_index',
'driver' = 'com.mysql.jdbc.Driver',
'username' = 'root',
'password' = '123qwe'
);
-- ---------------------------------
-- dwd_paid_order_detail已支付订单明细宽表
-- ---------------------------------
CREATE TABLE dwd_paid_order_detail
(
detail_id BIGINT,
order_id BIGINT,
user_id BIGINT,
province_id INT,
sku_id BIGINT,
sku_name STRING,
sku_num INT,
order_price DECIMAL(10,2),
create_time STRING,
pay_time STRING
) WITH (
'connector' = 'kafka',
'topic' = 'dwd_paid_order_detail',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = 'kms-3:9092',
'format' = 'changelog-json'
);
-- ---------------------------------
-- tmp_province_index
-- 订单汇总临时表
-- ---------------------------------
CREATE TABLE tmp_province_index(
province_id INT,
order_count BIGINT,-- 订单数
order_amount DECIMAL(10,2), -- 订单金额
pay_date DATE
)WITH (
'connector' = 'kafka',
'topic' = 'tmp_province_index',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = 'kms-3:9092',
'format' = 'changelog-json'
);
-- ---------------------------------
-- tmp_province_index
-- 订单汇总临时表数据装载
-- ---------------------------------
INSERT INTO tmp_province_index
SELECT
province_id,
count(distinct order_id) order_count,-- 订单数
sum(order_price * sku_num) order_amount, -- 订单金额
TO_DATE(pay_time,'yyyy-MM-dd') pay_date
FROM dwd_paid_order_detail
GROUP BY province_id,TO_DATE(pay_time,'yyyy-MM-dd')
;
-- ---------------------------------
-- tmp_province_index_source
-- 使用该临时汇总表,作为数据源
-- ---------------------------------
CREATE TABLE tmp_province_index_source(
province_id INT,
order_count BIGINT,-- 订单数
order_amount DECIMAL(10,2), -- 订单金额
pay_date DATE,
proctime as PROCTIME() -- 通过计算列产生一个处理时间列
) WITH (
'connector' = 'kafka',
'topic' = 'tmp_province_index',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = 'kms-3:9092',
'format' = 'changelog-json'
);
-- ---------------------------------
-- DIM层,区域维表,
-- 创建区域维表数据源
-- ---------------------------------
DROP TABLE IF EXISTS `dim_province`;
CREATE TABLE dim_province (
province_id INT,
province_name STRING,
area_code STRING,
region_id INT,
region_name STRING ,
PRIMARY KEY (province_id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://kms-1:3306/dim',
'table-name' = 'dim_province',
'driver' = 'com.mysql.jdbc.Driver',
'username' = 'root',
'password' = '123qwe',
'scan.fetch-size' = '100'
);
-- ---------------------------------
-- 向ads_province_index装载数据
-- 维表JOIN
-- ---------------------------------
INSERT INTO ads_province_index
SELECT
pc.province_id,
dp.area_code,
dp.province_name,
dp.region_id,
dp.region_name,
pc.order_amount,
pc.order_count,
cast(pc.pay_date as VARCHAR)
FROM
tmp_province_index_source pc
JOIN dim_province FOR SYSTEM_TIME AS OF pc.proctime as dp
ON dp.province_id = pc.province_id;
当提交任务之后:观察Flink WEB UI:
查看ADS层的ads_province_index表数据:
首先在MySQL中创建对应的ADS目标表:ads_sku_index
CREATE TABLE ads_sku_index
(
sku_id BIGINT(10),
sku_name VARCHAR(100),
weight DOUBLE,
tm_id BIGINT(10),
price DOUBLE,
spu_id BIGINT(10),
c3_id BIGINT(10),
c3_name VARCHAR(100) ,
c2_id BIGINT(10),
c2_name VARCHAR(100),
c1_id BIGINT(10),
c1_name VARCHAR(100),
order_amount DOUBLE,
order_count BIGINT(10),
sku_count BIGINT(10),
dt varchar(100),
PRIMARY KEY (sku_id,dt)
);
向MySQL的ADS层目标装载数据:
-- ---------------------------------
-- 使用 DDL创建MySQL中的ADS层表
-- 指标:1.每天每个商品对应的订单个数
-- 2.每天每个商品对应的订单金额
-- 3.每天每个商品对应的数量
-- ---------------------------------
CREATE TABLE ads_sku_index
(
sku_id BIGINT,
sku_name VARCHAR,
weight DOUBLE,
tm_id BIGINT,
price DOUBLE,
spu_id BIGINT,
c3_id BIGINT,
c3_name VARCHAR ,
c2_id BIGINT,
c2_name VARCHAR,
c1_id BIGINT,
c1_name VARCHAR,
order_amount DOUBLE,
order_count BIGINT,
sku_count BIGINT,
dt varchar,
PRIMARY KEY (sku_id,dt) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://kms-1:3306/ads',
'table-name' = 'ads_sku_index',
'driver' = 'com.mysql.jdbc.Driver',
'username' = 'root',
'password' = '123qwe'
);
-- ---------------------------------
-- dwd_paid_order_detail已支付订单明细宽表
-- ---------------------------------
CREATE TABLE dwd_paid_order_detail
(
detail_id BIGINT,
order_id BIGINT,
user_id BIGINT,
province_id INT,
sku_id BIGINT,
sku_name STRING,
sku_num INT,
order_price DECIMAL(10,2),
create_time STRING,
pay_time STRING
) WITH (
'connector' = 'kafka',
'topic' = 'dwd_paid_order_detail',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = 'kms-3:9092',
'format' = 'changelog-json'
);
-- ---------------------------------
-- tmp_sku_index
-- 商品指标统计
-- ---------------------------------
CREATE TABLE tmp_sku_index(
sku_id BIGINT,
order_count BIGINT,-- 订单数
order_amount DECIMAL(10,2), -- 订单金额
order_sku_num BIGINT,
pay_date DATE
)WITH (
'connector' = 'kafka',
'topic' = 'tmp_sku_index',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = 'kms-3:9092',
'format' = 'changelog-json'
);
-- ---------------------------------
-- tmp_sku_index
-- 数据装载
-- ---------------------------------
INSERT INTO tmp_sku_index
SELECT
sku_id,
count(distinct order_id) order_count,-- 订单数
sum(order_price * sku_num) order_amount, -- 订单金额
sum(sku_num) order_sku_num,
TO_DATE(pay_time,'yyyy-MM-dd') pay_date
FROM dwd_paid_order_detail
GROUP BY sku_id,TO_DATE(pay_time,'yyyy-MM-dd')
;
-- ---------------------------------
-- tmp_sku_index_source
-- 使用该临时汇总表,作为数据源
-- ---------------------------------
CREATE TABLE tmp_sku_index_source(
sku_id BIGINT,
order_count BIGINT,-- 订单数
order_amount DECIMAL(10,2), -- 订单金额
order_sku_num BIGINT,
pay_date DATE,
proctime as PROCTIME() -- 通过计算列产生一个处理时间列
) WITH (
'connector' = 'kafka',
'topic' = 'tmp_sku_index',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = 'kms-3:9092',
'format' = 'changelog-json'
);
-- ---------------------------------
-- DIM层,商品维表,
-- 创建商品维表数据源
-- ---------------------------------
DROP TABLE IF EXISTS `dim_sku_info`;
CREATE TABLE dim_sku_info (
id BIGINT,
sku_name STRING,
c3_id BIGINT,
weight DECIMAL(10,2),
tm_id BIGINT,
price DECIMAL(10,2),
spu_id BIGINT,
c3_name STRING,
c2_id BIGINT,
c2_name STRING,
c1_id BIGINT,
c1_name STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://kms-1:3306/dim',
'table-name' = 'dim_sku_info',
'driver' = 'com.mysql.jdbc.Driver',
'username' = 'root',
'password' = '123qwe',
'scan.fetch-size' = '100'
);
-- ---------------------------------
-- 向ads_sku_index装载数据
-- 维表JOIN
-- ---------------------------------
INSERT INTO ads_sku_index
SELECT
sku_id ,
sku_name ,
weight ,
tm_id ,
price ,
spu_id ,
c3_id ,
c3_name,
c2_id ,
c2_name ,
c1_id ,
c1_name ,
sc.order_amount,
sc.order_count ,
sc.order_sku_num ,
cast(sc.pay_date as VARCHAR)
FROM
tmp_sku_index_source sc
JOIN dim_sku_info FOR SYSTEM_TIME AS OF sc.proctime as ds
ON ds.id = sc.sku_id;
当提交任务之后:观察Flink WEB UI:
查看ADS层的ads_sku_index表数据:
数仓建设真正的难点不在于数仓设计,而在于后续业务发展起来,业务线变的庞大之后的数据治理,包括资产治理、数据质量监控、数据指标体系的建设等。
其实数据治理的范围很⼴,包含数据本⾝的管理、数据安全、数据质量、数据成本等。在DAMA 数据管理知识体系指南中,数据治理位于数据管理“车轮图”的正中央,是数据架构、数据建模、数据存储、数据安全、数据质量、元数据管理、主数据管理等10大数据管理领域的总纲,为各项数据管理活动提供总体指导策略。
为发挥数据价值需要满足三个要素:合理的平台架构、完善的治理服务、体系化的运营手段。
根据企业的规模、所属行业、数据量等情况选择合适的平台架构;治理服务需要贯穿数据全生命周期,保证数据在采集、加工、共享、存储、应用整个过程中的完整性、准确性、一致性和实效性;运营手段则应当包括规范的优化、组织的优化、平台的优化以及流程的优化等等方面。
数据治理需要循序渐进,但在建设初期至少需要关注三个方面:数据规范、数据质量、数据安全。规范化的模型管理是保障数据可以被治理的前提条件,高质量的数据是数据可用的前提条件,数据的安全管控是数据可以共享交换的前提条件。
数据治理不是一堆规范文档的堆砌,而是需要将治理过程中所产生的的规范、流程、标准落地到IT平台上,在数据生产过程中通过“以终为始”前向的方式进行数据治理,避免事后稽核带来各种被动和运维成本的增加。
数据治理的本质是管理数据,因此需要加强元数据管理和主数据管理,从源头治理数据,补齐数据的相关属性和信息,比如:元数据、质量、安全、业务逻辑、血缘等,通过元数据驱动的方式管理数据生产、加工和使用。
数据模型血缘与任务调度的一致性是建管一体化的关键,有助于解决数据管理与数据生产口径不一致的问题,避免出现两张皮的低效管理模式。
如上面所说,数据治理的范围非常广,其中最重要的是数据质量治理,而数据质量涉及的范围也很广,贯穿数仓的整个生命周期,从数据产生->数据接入->数据存储->数据处理->数据输出->数据展示,每个阶段都需要质量治理,评价维度包括完整性、规范性、一致性、准确性、唯一性、关联性等。
在系统建设的各个阶段都应该根据标准进行数据质量检测和规范,及时进行治理,避免事后的清洗工作。
质量检测可参考以下维度:
维度 | 衡量标准 |
---|---|
完整性 | 业务指定必须的数据是否缺失,不允许为空字符或者空值等。例如,数据源是否完整、维度取值是否完整、数据取值是否完整等 |
时效性 | 当需要使用时,数据能否反映当前事实。即数据必须及时,能够满足系统对数据时间的要求。例如处理(获取、整理、清洗、加载等)的及时性 |
唯一性 | 在指定的数据集中数据值是否唯一 |
参照完整性 | 数据项是否在父表中有定义 |
依赖一致性 | 数据项取值是否满足与其他数据项之间的依赖关系 |
正确性 | 数据内容和定义是否一致 |
精确性 | 数据精度是否达到业务规则要求的位数 |
技术有效性 | 数据项是否按已定义的格式标准组织 |
业务有效性 | 数据项是否符合已定义的 |
可信度 | 根据客户调查或客户主动提供获得 |
可用性 | 数据可用的时间和数据需要被访问时间的比例 |
可访问性 | 数据是否便于自动化读取 |
下面是根据美团的技术文章总结的几点具体治理方式:
规范是数仓建设的保障。为了避免出现指标重复建设和数据质量差的情况,统一按照最详细、可落地的方法进行规范建设。
(1) 词根
词根是维度和指标管理的基础,划分为普通词根与专有词根,提高词根的易用性和关联性。
(2) 表命名规范
通用规范
表命名规则
统一的表命名规范
(3) 指标命名规范
结合指标的特性以及词根管理规范,将指标进行结构化处理。
3.日期修饰词,用于修饰业务发生的时间区间。
4.聚合修饰词,对结果进行聚集操作。
5.基础指标,单一的业务修饰词+基础指标词根构建基础指标 ,例如:交易金额-trade_amt。
6.派生指标,多修饰词+基础指标词根构建派生指标。派生指标继承基础指标的特性,例如:安装门店数量-install_poi_cnt。
7.普通指标命名规范,与字段命名规范一致,由词汇转换即可以。
(1) 数据分层
优秀可靠的数仓体系,往往需要清晰的数据分层结构,即要保证数据层的稳定又要屏蔽对下游的影响,并且要避免链路过长,一般的分层架构如下:
(2) 数据流向
稳定业务按照标准的数据流向进行开发,即ODS-->DWD-->DWA-->APP。非稳定业务或探索性需求,可以遵循ODS->DWD->APP或者ODS->DWD->DWT->APP两个模型数据流。在保障了数据链路的合理性之后,又在此基础上确认了模型分层引用原则:
元数据可分为技术元数据和业务元数据:
技术元数据为开发和管理数据仓库的IT 人员使用,它描述了与数据仓库开发、管理和维护相关的数据,包括数据源信息、数据转换描述、数据仓库模型、数据清洗与更新规则、数据映射和访问权限等。
常见的技术元数据有:
业务元数据为管理层和业务分析人员服务,从业务角度描述数据,包括商务术语、数据仓库中有什么数据、数据的位置和数据的可用性等,帮助业务人员更好地理解数据仓库中哪些数据是可用的以及如何使用。
元数据不仅定义了数据仓库中数据的模式、来源、抽取和转换规则等,而且是整个数据仓库系统运行的基础,元数据把数据仓库系统中各个松散的组件联系起来,组成了一个有机的整体。
元数据治理主要解决三个问题:
围绕数据安全标准,首先要有数据的分级、分类标准,确保数据在上线前有着准确的密级。第二,针对数据使用方,要有明确的角色授权标准,通过分级分类和角色授权,来保障重要数据拿不走。第三,针对敏感数据,要有隐私管理标准,保障敏感数据的安全存储,即使未授权用户绕过权限管理拿到敏感数据,也要确保其看不懂。第四,通过制定审计标准,为后续的审计提供审计依据,确保数据走不脱。
任何事物都具有一定的生命周期,数据也不例外。从数据的产生、加工、使用乃至消亡都应该有一个科学的管理办法,将极少或者不再使用的数据从系统中剥离出来,并通过核实的存储设备进行保留,不仅能够提高系统的运行效率,更好的服务客户,还能大幅度减少因为数据长期保存带来的储存成本。数据生命周期一般包含在线阶段、归档阶段(有时还会进一步划分为在线归档阶段和离线归档阶段)、销毁阶段三大阶段,管理内容包括建立合理的数据类别,针对不同类别的数据制定各个阶段的保留时间、存储介质、清理规则和方式、注意事项等。
从上图数据生命周期中各参数间的关系中我们可以了解到,数据生命周期管理可以使得高价值数据的查询效率大幅提升,而且高价格的存储介质的采购量也可以减少很多;但是随着数据的使用程度的下降,数据被逐渐归档,查询时间也慢慢的变长;最后随着数据的使用频率和价值基本没有了之后,就可以逐渐销毁了。
数据治理的范围非常广,包含数据本⾝的管理、数据安全、数据质量、数据成本等。在这么多治理内容中,大家想下最重要的治理是什么?当然是数据质量治理,因为数据质量是数据分析结论有效性和准确性的基础,也是这一切的前提。所以如何保障数据质量,确保数据可用性是数据仓库建设中不容忽视的环节。
数据质量涉及的范围也很广,贯穿数仓的整个生命周期,从数据产生->数据接入->数据存储->数据处理->数据输出->数据展示,每个阶段都需要质量治理。
在系统建设的各个阶段都应该根据标准进行数据质量检测和规范,及时进行治理,避免事后的清洗工作。
本文档首发于公众号【五分钟学大数据】,完整的数据治理及数仓建设文章公众号上都有!
很多刚入门的数据人,拿到数据后会立刻开始对数据进行各种探查、统计分析等,企图能立即发现数据背后隐藏的信息和知识。然而忙活了一阵才颓然发现,并不能提炼出太多有价值的信息,白白浪费了大量的时间和精力。比如和数据打交道的过程中,可能会出现以下的场景:
场景一:作为数据分析人员,要统计一下近 7 天用户的购买情况,结果从数仓中统计完发现,很多数据发生了重复记录,甚至有些数据统计单位不统一。
场景二:业务看报表,发现某一天的成交 gmv 暴跌,经过排查发现,是当天的数据缺失。
造成这一情况的一个重要因素就是忽视了对数据质量的客观评估,没有制定合理的衡量标准,导致没有发现数据已出现问题。所以,进行科学、客观的数据质量衡量标准是非常必要且十分重要的。
如何评估数据质量的好坏,业界有不同的标准,我总结了以下六个维度进行评估,包括完整性、规范性、一致性、准确性、唯一性、及时性。
完整性指的是数据信息是否存在缺失的状况,数据缺失的情况可能是整个数据记录缺失,也可能是数据中某个字段信息的记录缺失。
规范性指的是描述数据遵循预定的语法规则的程度,是否符合其定义,比如数据的类型、格式、取值范围等。
一致性是指数据是否遵循了统一的规范,数据集合是否保持了统一的格式。数据质量的一致性主要体现在数据记录的规范和数据是否符合逻辑,一致性并不意味着数值上的绝对相同,而是数据收集、处理的方法和标准的一致。常见的一致性指标有:ID 重合度、属性一致、取值一致、采集方法一致、转化步骤一致。
准确性是指数据记录的信息是否存在异常或错误。和一致性不一样,存在准确性问题的数据不仅仅只是规则上的不一致,更为常见的数据准确性错误就如乱码,其次异常的大或者小的数据也是不符合条件的数据。常见的准确性指标有:缺失值占比、错误值占比、异常值占比、抽样偏差、数据噪声。
唯一性指的是数据库的数据不存在重复的情形。比如真实成交 1 万条,但数据表有 3000 条重复了,成了 1.3 万条成交记录,这种数据不符合数据唯一性。
及时性是指数据从产生到可以查看的时间间隔,也叫数据的延时时长。比如一份数据是统计离线今日的,结果都是第二天甚至第三天才能统计完,这种数据不符合数据及时性。
还有一些其他的衡量标准,在此简单列出:
维度 | 衡量标准 |
---|---|
参照完整性 | 数据项是否在父表中有定义 |
依赖一致性 | 数据项取值是否满足与其他数据项之间的依赖关系 |
正确性 | 数据内容和定义是否一致 |
精确性 | 数据精度是否达到业务规则要求的位数 |
技术有效性 | 数据项是否按已定义的格式标准组织 |
业务有效性 | 数据项是否符合已定义的 |
可信度 | 根据客户调查或客户主动提供获得 |
可用性 | 数据可用的时间和数据需要被访问时间的比例 |
可访问性 | 数据是否便于自动化读取 |
本节流程如下图所示:
根据当数据质量不满足完整性、规范性、一致性、准确性、唯一性、及时性时,对业务的影响程度大小来划分数据的资产等级。
重要程度:L1>L2>L3>L4>Lx。如果一份数据出现在多个应用场景中,则根据其最重要程度进行标记。
定义数据资产等级后,我们可以从数据流程链路开始进行数据资产等级标记,完成数据资产等级确认,给不同的数据定义不同的重要程度。
1. 分析数据链路:
数据是从业务系统中产生的,经过同步工具进入数据仓库系统中,在数据仓库中进行一般意义上的清洗、加工、整合、算法、模型等一系列运算后,再通过同步工具输出到数据产品中进行消费。而从业务系统到数据仓库再到数据产品都是以表的形式体现的,其流转过程如下图所示:
2. 标记数据资产等级:
在所有数据链路上,整理出消费各个表的应用业务。通过给这些应用业务划分数据资产等级,结合数据的上下游依赖关系,将整个链路打上某一类资产等级标签。
举例:
假设公司有统一的订单服务中心。应用层的应用业务是按照业务线,商品类型和地域统计公司的订单数量和订单金额,命名为order_num_amount
。
假设该应用会影响到整个企业的重要业务决策,我们可以把应用定级为 L2,从而整个数据链路上的表的数据等级,都可以标记为L2-order_num_amount
,一直标记到源数据业务系统,如下图所示:
在线业务复杂多变,总是在不断地变更,每一次变更都会带来数据的变化,数据仓库需要适应这多变的业务发展,及时做到数据的准确性。
基于此,在线业务的变更如何高效地通知到离线数据仓库,同样也是需要考虑的问题。为了保障在线数据和离线数据的一致性,我们可以通过工具+人员管理并行的方式来尽可能的解决以上问题:既要在工具上自动捕捉每一次业务的变化,同时也要求开发人员在意识上自动进行业务变更通知。
1. 业务上线发布平台:
监控业务上线发布平台上的重大业务变更,通过订阅这个发布过程,及时将变更内容通知到数据部门。
由于业务系统复杂多变,若日常发布变更频繁,那么每次都通知数据部门,会造成不必要的资源浪费。这时,我们可以使用之前已经完成标记的数据资产等级标签,针对涉及高等级数据应用的数据资产,整理出哪些类型的业务变更会影响数据的加工或者影响数据统计口径的调整,则这些情况都必须及时通知到数据部门。
如果公司没有自己的业务发布平台,那么就需要与业务部门约定好,针对高等级的数据资产的业务变更,需要以邮件或者其他书面的说明及时反馈到数据部门。
2. 操作人员管理:
工具只是辅助监管的一种手段,而使用工具的人员才是核心。数据资产等级的上下游打通过程需要通知给在线业务系统开发人员,使其知道哪些是重要的核心数据资产,哪些暂时还只是作为内部分析数据使用,提高在线开发人员的数据风险意识。
可以通过培训的方式,把数据质量管理的诉求,数据质量管理的整个数据加工过程,以及数据产品的应用方式及应用场景告知在线开发人员,使其了解数据的重要性、价值及风险。确保在线开发人员在完成业务目标的同时,也要考虑数据的目标,保持业务端和数据段一致。
数据从在线业务系统到数据仓库再到数据产品的过程中,需要在数据仓库这一层完成数据的清洗、加工。正是有了数据的加工,才有了数据仓库模型和数据仓库代码的建设。如何保障数据加过程中的质量,是离线数据仓库保障数据质量的一个重要环节。
在这些环节中,我们可以采用以下方式来保障数据质量:
开发相关的规则引擎,辅助代码提交校验。规则分类大致为:
加强测试环节,测试环境测试后再发布到生成环境,且生成环境测试通过后才算发布成功。
在进行数据更新操作前,需要通知下游数据变更原因、变更逻辑、变更时间等信息。下游没有异议后,再按照约定时间执行变更发布操作。
风险点监控主要是针对数据在日常运行过程中容易出现的风险进行监控并设置报警机制,主要包括在线数据和离线数据运行风险点监控。
在线业务系统的数据生产过程需要保证数据质量,主要根据业务规则对数据进行监控。
比如交易系统配置的一些监控规则,如订单拍下时间、订单完结时间、订单支付金额、订单状态流转等都配置了校验规则。订单拍下时间肯定不会大于当天时间,也不会小于业务上线时间,一旦出现异常的订单创建时间,就会立刻报警,同时报警给到多人。通过这种机制,可以及时发现并解决问题。
随着业务负责程度的提升,会导致规则繁多、规则配置的运行成本增大,这时可以按照我们之前的数据资产等级有针对性的进行监控。
离线数据风险点监控主要包括对数据准确性和数据产出及时性的监控。对数据调度平台上所有数据处理调度进行监控。
我们以阿里的 DataWorks 数据调度工具为例,DataWorks 是基于 MaxCompute 计算引擎的一站式开发工场,帮助企业快速完成数据集成、开发、治理、质量、安全等全套数据研发工作。
DataWorks 中的 DQC 通过配置数据质量校验规则,实现离线数据处理中的数据质量监控报警机制。
下图是 DQC 的工作流程图:
DQC 数据监控规则有强规则和弱规则:
DQC 提供常用的规则模板,包括表行数较 N 天前波动率、表空间大小较 N 天前波动率、字段最大/最小/平均值相比 N 天前波动率、字段空值/唯一个数等。
DQC 检查其实也是运行 SQL 任务,只是这个任务是嵌套在主任务中的,一旦检查点太多自然就会影响整体的性能,因此还是依赖数据产等级来确定规则的配置情况。比如 L1、L2 类数据监控率要达到 90% 以上,规则类型需要三种及以上,而不重要的数据资产则不强制要求。
在确保数据准确性的前提下,需要进一步让数据能够及时地提供服务,否则数据的价值将大幅度降低,甚至没有价值,所以确保数据及时性也是保障数据质量重中之重的一环。
对于DataWorks平台的调度任务,可以通过智能监控工具进行优先级设置。DataWorks的调度是一个树形结构,当配置了叶子节点的优先级,这个优先级会传递到所有的上游节点,而叶子节点通常就是服务业务的消费节点。
因此,在优先级的设置上,要先确定业务的资产等级,等级越高的业务对应的消费节点优先级越高,优先调度并占用计算资源,确保高等级业务的准时产出。
总之,就是按照数据资产等级优先执行高等级数据资产的调度任务,优先保障高等级业务的数据需求。
任务报警和优先级类似,通过DataWorks的智能监控工具进行配置,只需要配置叶子节点即可向上游传递报警配置。任务执行过程中,可能出错或延迟,为了保障最重要数据(即资产等级高的数据)产出,需要立即处理出错并介入处理延迟。
DataWorks进行离线任务调度时,提供智能监控工具,对调度任务进行监控告警。根据监控规则和任务运行情况,智能监控决策是否报警、何时报警、如何报警以及给谁报警。智能监控会自动选择最合理的报警时间、报警方式以及报警对象。
要想真正解决数据质量问题,就要明确业务需求并从需求开始控制数据质量,并建立数据质量管理机制。从业务出发做问题定义,由工具自动、及时发现问题,明确问题责任人,通过邮件、短信等方式进行通知,保证问题及时通知到责任人。跟踪问题整改进度,保证数据质量问题全过程的管理。
稳定业务按照标准的数据流向进行开发,即 ODS –> DWD –> DWS –> APP。非稳定业务或探索性需求,可以遵循 ODS -> DWD -> APP 或者 ODS -> DWD -> DWM ->APP 两个模型数据流。
在保障了数据链路的合理性之后,也必须保证模型分层引用原则:
举例:
需统一规定不同的数据的数据类型,严格按照规定的数据类型执行:
宽表的冗余字段要确保:
保证主题域内,指标口径一致,无歧义。
通过数据分层,提供统一的数据出口,统一对外输出的数据口径,避免同一指标不同口径的情况发生。
指标口径的不一致使得数据使用的成本极高,经常出现口径打架、反复核对数据的问题。在数据治理中,我们将需求梳理到的所有指标进行进一步梳理,明确其口径,如果存在两个指标名称相同,但口径不一致,先判断是否是进行合并,如需要同时存在,那么在命名上必须能够区分开。
指标管理分为原子指标维护和派生指标维护。
原子指标:
派生指标:
新增数据,增量数据是上次导出之后的新数据。
每天的所有的最新状态的数据。
按日分区,记录截止数据日期的全量数据。
记录截止数据日期的全量数据。
这部分主要是要通过对历史数据的等级划分与对表类型的划分生成相应的生命周期管理矩阵。
主要将历史数据划分P0、Pl、P2、P3 四个等级,其具体定义如下:
事件型流水表(增量表)指数据无重复或者无主键数据,如日志。
事件型镜像表(增量表)指业务过程性数据,有主键,但是对于同样主键的属性会发生缓慢变化,如交易、订单状态与时间会根据业务发生变更。
维表包括维度与维度属性数据,如用户表、商品表。
Merge 全量表包括业务过程性数据或者维表数据。由于数据本身有新增的或者发生状态变更,对于同样主键的数据可能会保留多份,因此可以对这些数据根据主键进行 Merge 操作,主键对应的属性只会保留最新状态,历史状态保留在前一天分区 中。例如,用户表、交易表等都可以进行 Merge 操作。
ETL 临时表是指 ETL 处理过程中产生的临时表数据,一般不建议保留,最多7天。
TT 拉取的数据和 DbSync 产生的临时数据最终会流转到 DS 层,ODS 层数据作为原始数据保留下来,从而使得 TT&DbSync 上游数据成为临时数据。这类数据不建议保留很长时间,生命周期默认设置为 93天,可以根据实际情况适当减少保留天数。
7. 普通全量表
很多小业务数据或者产品数据,BI一般是直接全量拉取,这种方式效率快,对存储压力也不是很大,而且表保留很长时间,可以根据历史数据等级确定保留策略。
通过上述历史数据等级划分与表类型划分,生成相应的生命周期管理矩阵,如下表所示:
同步规范:
表分类与生命周期:
数据质量:
共维度在不同的物理表中的字段名称、数据类型、数据内容必须保持一致(历史原因不一致,要做好版本控制)
将维度与关联性强的字段进行组合,一起查询,一起展示,两个维度必须具有天然的关系,如:商品的基本属性和所属品牌。
无相关性:如一些使用频率较小的杂项维度,可以构建一个集合杂项维度的特殊属性。
行为维度:经过计算的度量,但下游当维度处理,例:点击量 0-1000,100-1000等,可以做聚合分类。
针对重要性,业务相关性、源、使用频率等可分为核心表、扩展表。
数据记录较大的维度,可以适当冗余一些子集。
建议按天分区。
建议按天分区。
数据仓库的性能是数据仓库建设是否成功的重要标准之一。聚集主要是通过汇总明细粒度数据来获得改进查询性能的效果。通过访问聚集数据,可以减少数据库在响应查询时必须执行的工作量,能够快速响应用户的查询,同时有利于减少不同用访问明细数据带来的结果不一致问题。
第一步:确定聚集维度
在原始明细模型中会存在多个描述事实的维度,如日期、商品类别、卖家等,这时候需要确定根据什么维度聚集,如果只关心商品的交易额情况,那么就可以根据商品维度聚集数据。
第二步:确定一致性上钻
这时候要关心是按月汇总还是按天汇总,是按照商品汇总还是按照类目汇总,如果按照类目汇总,还需要关心是按照大类汇总还是小类汇总。当然,我们要做的只是了解用户需要什么,然后按照他们想要的进行聚集。
第三步:确定聚集事实
在原始明细模型中可能会有多个事实的度量,比如在交易中有交易额、交易数量等,这时候要明确是按照交易额汇总还是按照成交数量汇总。
除了聚集基本的原则外,公共汇总层还必须遵循以下原则:
_Id
表示最近1天,_td
表示截至当天,_nd
表示最近N天。词根属于数仓建设中的规范,属于元数据管理的范畴,现在把这个划到数据治理的一部分。完整的数仓建设是包含数据治理的,只是现在谈到数仓偏向于数据建模, 而谈到数据治理,更多的是关于数据规范、数据管理。
表命名,其实在很大程度上是对元数据描述的一种体现,表命名规范越完善,我 们能从表名获取到的信息就越多。比如:一部分业务是关于货架的,英文名是:rack, rack 就是一个词根,那我们就在所有的表、字段等用到的地方都叫 rack,不要叫成 别的什么。这就是词根的作用,用来统一命名,表达同一个含义。
指标体系中有很多“率”的指标,都可以拆解成 XXX+率,率可以叫 rate,那我 们所有的指标都叫做 XXX+rate。
词根:可以用来统一表名、字段名、主题域名等等。
举例:以流程图的方式来展示,更加直观和易懂,本图侧重 dwm 层表的命名 规范,其余命名是类似的道理:
第一个判断条件是该表的用途,是中间表、原始日志还是业务展示用的表 如果该表被判断为中间表,就会走入下一个判断条件:表是否有 group 操作 通过是否有 group 操作来判断该表该划分在 dwd 层还是 dwm 和 dws 层 如果不是 dwd 层,则需要判断该表是否是多个行为的汇总表(即宽表) 最后再分别填上事业群、部门、业务线、自定义名称和更新频率等信息即可。
分层:表的使用范围
事业群和部门:生产该表或者该数据的团队
业务线:表明该数据是哪个产品或者业务线相关
主题域:分析问题的角度,对象实体
自定义:一般会尽可能多描述该表的信息,比如活跃表、留存表等
更新周期:比如说天级还是月级更新
数仓表的命名规范如下:
1. 数仓层次:
公用维度:dim
DM层:dm
ODS层:ods
DWD层:dwd
DWS层:dws
2. 周期/数据范围:
日快照:d
增量:i
全量:f
周:w
拉链表:l
非分区全量表:a
常规表是我们需要固化的表,是正式使用的表,是目前一段时间内需要去维护去 完善的表。
规范:分层前缀[dwd|dws|ads]_部门_业务域_主题域_XXX_更新周期|数据范围
业务域、主题域我们都可以用词根的方式枚举清楚,不断完善。
更新周期主要的是时间粒度、日、月、年、周等。
中间表一般出现在 Job 中,是 Job 中临时存储的中间数据的表,中间表的作 用域只限于当前 Job 执行过程中,Job 一旦执行完成,该中间表的使命就完 成了,是可以删除的(按照自己公司的场景自由选择,以前公司会保留几天 的中间表数据,用来排查问题)。
规范:mid_table_name_[0~9|dim]
table_name 是我们任务中目标表的名字,通常来说一个任务只有一个目标表。这里加上表名,是为了防止自由发挥的时候表名冲突,而末尾大家可以选择自由发挥,起一些有意义的名字,或者简单粗暴,使用数字代替,各有优劣吧,谨慎选择。
通常会遇到需要补全维度的表,这里使用 dim 结尾。
如果要保留历史的中间表,可以加上日期或者时间戳。
临时表是临时测试的表,是临时使用一次的表,就是暂时保存下数据看看,后续一般不再使用的表,是可以随时删除的表。
规范:tmp_xxx
只要加上 tmp 开头即可,其他名字随意,注意 tmp 开头的表不要用来实际使用,只是测试验证而已。
维度表是基于底层数据,抽象出来的描述类的表。维度表可以自动从底层表抽象出来,也可以手工来维护。
规范:dim_xxx
维度表,统一以 dim 开头,后面加上,对该指标的描述。
手工表是手工维护的表,手工初始化一次之后,一般不会自动改变,后面变更,也是手工来维护。
一般来说,手工的数据粒度是偏细的,所以暂时统一放在 dwd 层,后面如果有目标值或者其他类型手工数据,再根据实际情况分层。
规范:dwd_业务域_manual_xxx
手工表,增加特殊的主题域,manual,表示手工维护表。
--END--