目录
背景
元数据是关于数据的数据,是对数据的描述,元数据又分为三类:管理元数据、业务元数据和技术元数据。而字段或表级血缘关系就是技术元数据,关于业务元数据和管理元数据等元数据相关知识笔者给出前期文章分享链接,笔者这里不再赘述。
前期几篇文章讲了元数据和血管关系整体思路,但没讲字段级血缘如何解析如何实现,此篇文章重点讲解跨引擎超完备字段血缘关系实现解题方法。
血缘关系使用场景
血缘关系重要性不言而喻,比喻为数据的“一条龙脉”都不为过。随着数据应用场景日益繁多,数据没有形成血管关系问题逐渐凸显,从宏观看,数据如同珍珠散落各处,逐渐形成数据孤岛;从微观看,数据如DNA断裂,无法快速地识别数据来源、加工逻辑或计算口径。数据没有形成一张网,没有一条龙脉,数据就无法变成一个可迁移、可溯源、可判断、可量化的活的生态有机体。这里列举一些血缘关系使用场景:
数据迁移工作可分几个步骤:数据迁移前数据盘点;迁移任务;迁移完成后数据下线。数据迁移前盘点,从宏观看有哪些数据集群间、系统间、系统表之间关系,没有血缘关系靠人工盘点识别血缘关系,难免会有错误、出现遗漏并且效率低;迁移任务会进行模型重构,需要清晰知道原字段或指标加工逻辑,低效地识别字段如何加工使用哪些系统和表;数据迁移过程中局部作业下线时无法清晰识别是否有依赖,模型是否自闭环完全可下线等等。血缘关系可形成不同粒度的如集群级血缘、系统级血缘、表级别血缘、字段级别关系网络,满足数据迁移盘点量化,迁移中逻辑梳理,迁移后血缘关系叶子节点层层下线。
影响度分析,也是较为血缘关系应用的一部分,其用来分析数据的下游流向。当系统进行升级改造或者数据延迟,能动态数据结构变更、删除及时告知下游系统。通过依赖数据的影响性分析,可以快速定位出元数据修改会影响到哪些下游系统,哪些表和哪些字段。从而减少系统升级改造带来的风险。
下线分析和影响度分析功能大致相同,只是应用的侧重点不同,下线分析是根据数据热度,对冷数据或冰数据归档下线、或者数据模型迭代升级,旧模型不在使用时,是否对其他应用造成依赖影响,便于数据归档操作。
数据价值分析主要是对数据表的被使用情况进行统计,价值密度、访问频次、使用方式、时效性等级等维度评估,从而评级出数据热度,热数据、温数据、冷数据和冰数据。数据价值访问评估一些常用的维度:表的访问频率分析、表分区数据访问分析、跨表访问分析、跨层访问分析、跨库访问分析、字段访问频率分析、表访问用户量分析和分层表访问总量分析等。数据热度应随着时间的推移,数据价值会变化,应动态更新数据热度等级,推动数据从产生到销毁数据生命周期管理。总之,在成本可控、可量化、可管理的前提下,从数据中挖掘出更多有效的数据价值。
如今开源时代,已经涌现出不同好用的计算引擎,因引擎特性被应用在不同的使用场景,这些计算引擎虽然语法上支持SQL或已在SQL化,即使国际上也有SQL标准,但是各个引擎语法方言却多少有点各不相同,元数据且不共用,难以形成一个跨语法跨引擎的字段级血缘关系网络。
跨引擎字段级血缘关系实现
据了解,无论是全球各大云厂商如亚马逊、阿里云、腾讯云、华为云等等,还是国内自研数据中台或其他数据平台还没有做到跨引擎完备的表或字段级血缘关系,大部分还支持表级别血缘关系,表级别和字段级别区别,比如在应用场景上,笔者这里列举几个常见场景:
现有表或字段血缘关系现有常见的做法:
等等以上更有做法各有千秋,但目前Hadoop上开源引擎种类丰富,有着不同数据服务场景,
时效性,准确性、高并发低延迟,或者历史组件迭代等,选择各种组件优点,使用各种技术栈,数据在hive、spark、flink、hbase、redis、clickhouse、mysql和redis等等组件之间流转之后,血缘关系已经断裂,如一辆跑车进入隧道后被隐藏到一辆卡车里开出来,永远地找不到了,跟丢了。虽然各种引擎都会往SQL简单化,易用化,使用人群高的SQL语言,但是SQL确实存在方言,虽然sql通通用性很高,但是多少有点不一样,就会导致血缘关系断裂,没有形成一个跨引擎完备血缘关系网络,下面笔者给出跨引擎完备血缘关系实现方法。
正解就是根据不同组件,编写不同词法文件,生成词法分析器、语法分析器,对抽象语法树遍历,生成血缘关系,再次整合局部血缘,最终生成全局血缘完备血缘关系。
1.准备词法文件
笔者这里使用Antlr4编写词法文件,词法分析器、语法分析器、对抽象语法树遍历来生成血缘关系,以Hive引擎为例,其他引擎方法类似。因为SQL90%语法相同(其他非SQL同样可以Antlr进行实现),于是笔者也使Presto的词法文件进行改写使其完备通用满足Hive SQL语法,至于词法文件如何实现,笔者给出往期文章链接,Antlr4实战:统一SQL路由多引擎,这里不再赘述。以下给出某些词法文件的新增核心改动点。
INSERT OVERWRITE TABLE DM.DM_CHANNEL_MAIN_1D_DI PARTITION(SDT)
SELECT
USER_ID
FROM ROWNUMBER
添加分区表达式以及分区内的表达式
//单独添加Insert into语句
insertIO
: INSERT (INTO|OVERWRITE) TABLE qualifiedName partitionSpec? query
;
//分区表达式以及分区内的表达式
partitionSpec
: PARTITION '(' partitionVal (',' partitionVal)* ')'
;
partitionVal
: identifier (EQ identifier)?
;
SELECT
TRIM(CONCAT('001'
,IF(CAST (GROUPING__ID AS INT) || FLOOR(POWER(2,13)) = 0 ,'1','')
,IF(CAST (GROUPING__ID AS INT) & FLOOR(POWER(2,11)) = 0 ,'2','')
FROM TABLE1
与和并写法支持改写:
valueExpression
: primaryExpression #valueExpressionDefault
| valueExpression AT timeZoneSpecifier #atTimeZone
| operator=(MINUS | PLUS) valueExpression #arithmeticUnary
| left=valueExpression operator=(ASTERISK | SLASH | PERCEN) right=valueExpression #arithmeticBinary
| left=valueExpression operator=(PLUS | MINUS) right=valueExpression #arithmeticBinary
| left=valueExpression CONCAT right=valueExpression #concatenation
;
CONCAT: '||' | '&';//这里添加’&‘并写法
2.遍历语法树解析字段级血缘
准备好词法文件,词法分析器、语法分析器、用访问者模式遍历抽象语法树来生成血缘关系。这一步是最复杂最关键的环节,这里重点讲解一些实现思路和部分代码。
对有些同学理解起来比较抽象,所以在讲解之前,笔者先讲个例子便于理解,举例SQL如下:
INSERT OVERWRITE TABLE DM.DM_TRAFFIC_CHANNEL_MAIN_1D_DI
SELECT
T1.AA,T1.CC,T2.FF
FROM (
SELECT
TA.AA+TA.BB AS AA,
CASE WHEN TB.CC='A' THEN TA.CC ELSE TB.AA END + CASE WHEN TB.CC='A' THEN TA.CC ELSE TB.AA END AS CC,
SUBSTRING(TB.CC,1,2) AS EEE,
SUBSTRING(HH,1,2) AS FF
FROM DIM.ROWNUMBER_INFO AS TA
LEFT JOIN DWS.CONSUMER_INFO AS TB ON TA.A=TB.B
) T1
LEFT JOIN TEST_INFO T2 ON T1.A = T2.A
下面是上述SQL的语法树的展示,因太过复杂展示不全面,给出部分,如图:
因为Antlr的遍历语法树的顺序是从左到右,从里向外遍历的,语法树太复杂,截图不全,笔者从SQL标准里内层1、外层2和目标表层级标注来便于理解。
目标表:DM.DM_TRAFFIC_CHANNEL_MAIN_1D_DI
源表:DIM.ROWNUMBER_INFO
DWS.CONSUMER_INFO
TEST_INFO
字段血缘关系:
目标表DM.DM_TRAFFIC_CHANNEL_MAIN_1D_DI字段血缘关系
T1.AA = TA.AA+TA.B=DIM.ROWNUMBER_INFO.AA + DIM.ROWNUMBER_INFO.BB
T1.CC=TB.CC+TA.CC+TB.AA=DWS.CONSUMER_INFO.CC+DWS.CONSUMER_INFO.AA + DIM.ROWNUMBER_INFO.CC
T2.FF = TEST_INFO.FF
从上述看,血缘关系网络需要识别的不光是从左到右、从内层到外层的、还需要识别数据库、系统表、表别名、字段名称、字段加工逻辑、字段别名以及之间的映射关系等等,如果做个和原生语法等价的,还需些特殊情况要考虑,如 SELECT * ;WITH 别名 AS ()的写法都要考虑,我们后面再细讲。
总体思路:
总体思路还是较为抽象,笔者贴下核心代码出来,协助理解。
定义Field字段对象
public class Field {
public String fieldID;//有字段别名就取字段别名,没别名就用字段名称作为fieldID
public String fieldName;
public String tableName;
public String tableAlias;
public String dataBase;
public String isSubQuery="0";//"1" 实体表;"0" 子查询
}
定义LogicField和Field字段对象是一对多的关系,一个逻辑字段可能是多个逻辑字段加工而成。
public class LogicField {
public String logicFiedlID;//存放字段别名,没有字段别名的字段名
public List<Field> fieldObjList;//拆解多个字段,并把数据库、表等信息完善后,把Field对象列表回写
public String fieldContent;//存放此字段的实际内容
public List<String> fieldList = new ArrayList<>(); //实际拆解出字段列表
public List<String> tableList = new ArrayList<>();//实际拆解出表列表,在遍历LogicField字段会判断是否为子查询,用真实数据库来覆盖
public List<String> tableAliasList = new ArrayList<>();//实际拆解出表-别名称列表,用于匹配上下层血缘字段构成唯一性查找的问题
public List<String> dataBaseList = new ArrayList<>() ;//作用同tableList;
public String isMultiFieldFlag="0"; //是否是多个字段组成 "1" 代表 是;"0"代表 否
}
LogicField对象与Field对象之间的区别:
其实在实现过程中,要比这些还要复杂遇到难点:union在这个现有词法文件语法是跨层级的,层级的判断通过visitQuerySpecification(HiveSqlBaseParser.QuerySpecificationContext ctx) 进入顺序来判断层级的,下图显然是进入两次,意味着两个不同层级,逻辑上一个层级,就需要特殊处理
需要判断把其规范到同一个层级即可,关键实现代码如下:
这里设置三个变量
1)UnionCnt UNion的个数 在SetOperation函数中去判断
2)isUnionFlag中判断是否有Uinon操作
3)UnionStep步长判断深度优先遍历的Union扫描何时结束,把此次的开关关上,计数器复位为0。
if(isUnionFlag && (UnionStep < UnionCnt)){
if((InvViable != 0)&&(UnionStep == 0)){//如果这个Union不是在最内层0开始的,说明前面已经有层级存在,需要+1 上升一个层级,但是union内部操作多次不变化,第一次加过不再加了
InvViable = InvViable + 1;
}else{
isStartFlag = false;//启动之后,就不再使用0的层级,之后每个层级都自动+1
}
if(querySpecificationLevel.keySet().contains(InvViable)){//如果这里Union是在同一调用层级,需要里面的LogicField集合拿出来进行合并操作
Multimap<String,LogicField> logicFieldMapOther = querySpecificationLevel.get(InvViable);//相同别名ID,可能对应多个LogicField对象
logicFieldMap.putAll(logicFieldMapOther);
querySpecificationLevel.put(InvViable,logicFieldMap);
}else{
querySpecificationLevel.put(InvViable,logicFieldMap);
}
UnionStep++;
}else{
UnionStep = 0;
UnionCnt = 0;
isUnionFlag = false;
if(InvViable == 0 && isStartFlag){//如果是从非0开始的 累计+1
querySpecificationLevel.put(InvViable,logicFieldMap);
isStartFlag = false;//启动之后,就不再使用0的层级,之后每个层级都自动+1
}else {
InvViable = InvViable + 1;
querySpecificationLevel.put(InvViable,logicFieldMap);
}
还如with 别名 as (子查询写法)的问题:
WITH BODY AS (
SELECT
TA.AA,
TA.BB,
TB.CC,
TB.DD,
TB.EE
FROM DIM.ROWNUMBER_INFO AS TA
LEFT JOIN DWS.CONSUMER_INFO AS TB
ON TA.A=TB.B AND TA.C='1' AND TB.D = 'AAAAA' )
INSERT OVERWRITE TABLE DM.DM_TRAFFIC_CHANNEL_MAIN_1D_DI PARTITION(SDT)
SELECT AA,BB,CC,DD,EE FROM BODY AS W
这个问题正面比较难解决,笔者换个思路,对这类SQL进行等价重写(等价很重要,否则都是错的),换成子查询方式来实现
首先,抹去with 别名 as (子查询)把别名作为key,子查询作为value存储在Map中,在下游From 别名直接到上述Map去判断查找即可。
去掉with 别名 as(子查询)写法,代码如下:
/*
* 把with 别名称 as 子查询()改写成:(表别名,子查询) 键值对。后面有引用别名的,直接替换为子查询。
* @param ctx
*/
@Override
public String visitWith(HiveSqlBaseParser.WithContext ctx) {
StringBuffer sb = null;
List<HiveSqlBaseParser.NamedQueryContext> namedQueryContexts = ctx.namedQuery();
for(HiveSqlBaseParser.NamedQueryContext context : namedQueryContexts){
sb = new StringBuffer();
for(int i=1;i<context.getChildCount();i++){
if(!context.getChild(i).getText().equals("AS")){
sb.append(" " + visitResult(context.getChild(i)) );
}
}
sourceTableMapWiht.put(context.getChild(0).getText(),"(" +sb.toString() + ")");
}
return ""; //这里就把with 别名 as ()子查询的写法直接删掉,
}
with as 真正内容换成子查询方式,代码如下:
public String visitTableName(HiveSqlBaseParser.TableNameContext ctx) {
String curTableName = ctx.qualifiedName().getText();
String replacedSubQueryTableName = null;
if(sourceTableMapWiht.containsKey(curTableName)){
replacedSubQueryTableName = sourceTableMapWiht.get(curTableName);
}else{
replacedSubQueryTableName = curTableName;
}
return replacedSubQueryTableName;
}
3.字段级血缘存储
其他实现代码还有很多,这里就一一不再讲述,解析完整的血缘关系存储到哪种数据库,常见关系型数据库处理这种会吃力很多不建议,笔者选择开源的Neo4J,其是图数据库有天然的处理关系网络的能力,Neo4J如何存储和展示,之前文章讲解数据血缘关系:图数据库Neo4j存储实现参考。
展望
有了完备的血缘关系和访问热度数据,可根据使用的数据源、表、字读直接关系,以及字段使用热度,以及等价逻辑抽象,自定义数仓建模的规则,可实现等价模型重构等场景应用。
有了完备的血缘关系,可以把相关的加工逻辑转化中文,字段comment、字段加工逻辑以文中转为数学公式等等应用场景。
总结
此篇以Hive引擎为例使用Antlr4编写词法文件,词法分析器、语法分析器、对抽象语法树遍历来生成血缘关系,源码中使用Antlr实现词法解析的还有Spark、Presto等,其他Flink、Clichouse、Mysql等引擎同样可以使用此方法来字段级血缘解析,从而形成跨各种引擎的完备的字段级血缘关系网络,形成数据的一条“龙脉”。