前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >数据血缘分析-Python代码的智能解析

数据血缘分析-Python代码的智能解析

作者头像
大数据技术架构
发布2022-12-01 15:54:46
1.8K0
发布2022-12-01 15:54:46
举报
文章被收录于专栏:大数据技术架构

如果你的工作是从事数据挖掘、数据仓库建设或者信息系统开发/维护,有没有曾经遇到过如下的烦恼?

  • 面对着几百上千张数据表,不知该如何关联,不知哪些表更有价值
  • 执行着长的令人绝望,慢的无法忍受的SQL脚本,却不敢挥刀整改
  • 准备着新功能上线,但总担心一行代码的修改会造成严重的生产事故

有没有科学的办法,来管理表之间、代码之间的复杂关系?从而帮助开发人员更好地认识和理解业务系统业务与底层表关系、底层表的表间关系,理清当前数据(字段、关键指标或者数据标签)从哪里来、到哪里去,搞清楚哪些下游系统在使用这些数据。

血缘分析是解决这类问题的一种技术手段。数据血缘(Data Lineage),指的是数据从产生、ETL处理、加工、融合、流转到最终消亡,数据之间自然形成一种关系。这些关系就是描述数据的数据(元数据)。掌握了这个元数据,就能最大程度的做好数据的应用和管理。

那么如何推导数据之间的血缘关系呢?其实在开发或者分析师团队提供的成百上千的数据脚本中,每一行代码背后都蕴含着业务逻辑和数据关系。有没有可能通过批量解析这些数据脚本,自动提炼出背后的数据逻辑,以及脚本之间的依赖关系呢?

本文介绍一种针对python代码的推导方法。

既然解析的对象是python代码,我们首先要理解python的编译过程。以 CPython 为例,编译过程如下:

  1. 将源代码解析为解析树(Parser Tree)
  2. 将解析树转换为抽象语法树(Abstract Syntax Tree)
  3. 将抽象语法树转换到控制流图(Control Flow Graph)
  4. 根据流图将字节码(bytecode)发送给虚拟机(eval)

我们平常在python开发环境中编写代码时,IDE会提示各种编写过程中的语法错误,本质上是代码静态检查,对代码的内容和结构进行解析和分析,类似编译过程中的前三个步骤,让机器读懂代码并且判断其是否符合规范。

因此我们就可以利用这个原理,通过代码解析自动提取代码中的关键信息,例如代码

  • 引用了哪些外部函数,进一步调用了哪些数据脚本
  • SQL语句使用了哪个数据源,查询了哪些表,更新了哪些字段
  • 处理逻辑中对字段做了哪些衍生操作,用了什么算法

这种方法虽不能一键生成完整的开发文档,但却能提供大量丰富的线索,有助于快速开展梳理工作,事半功倍。

我们参考Pyflakes来实现以上功能,Pyflakes是Python的一个代码分析包,用来分析代码,发现潜在的代码问题,例如:引入但没有用到的模块、变量创建但是没有使用。查看Pyflakes的源码,可以发现其进一步使用ast 模块,其用于生成和编译 Python 代码的抽象语法树,关于ast的介绍可以进一步查看https://blog.csdn.net/ThinkTimes/article/details/110831176

一般来说pyFlakes是用cmd命令来执行,但是我们这里为了了解其运行机制,使用python的方式来调用其API函数,方便使用debug的方式跟踪程序。具体代码如下:

代码语言:javascript
复制
from pyflakes import reporter as modReporter
from pyflakes import api

if __name__ == "__main__":
    reporter = modReporter._makeDefaultReporter()
    args = ['C:\\Users\\yzeng\\PycharmProjects\\pythonProject\\flakes']
    warnings = api.checkRecursive(args, reporter)

Args传入的是文件夹信息,然后调用程序checkRecursive检查该文件夹下的所有代码,进一步调用函数checkPath,使用f.read读取代码内容,调用核心函数check(codestr, filename, reporter)。

在这个函数中,首先使用tree = ast.parse(codeString, filename=filename),生成抽象语法树。在pycharm的debug窗口查看tree的结构,如下

这个过程类似语言处理技术,对文字的词法和句法解析以便让机器了解文字含义。Python运行时需要对python脚本内容进行解析,也就是把python脚本的每一个语句进行分类,并且建立语句之间的语法关系,也就是抽象语法树。在这个截图中,可以看到有5个节点,对应源代码的5段代码片段,例如 ImportFrom代码 和 函数定义function define的代码。这5个节点还有其各自的子节点,例如assign节点的子节点是call类型的节点(如果是调用一个函数)。更多语法树的节点类型,可参考https://docs.python.org/3/library/ast.html 不同类型的节点其属性不一样,通用的属性有位置信息,例如col_offset和end_col_offset指的是该代码片段在列的起始和结束位置,type_comment指的是该代码是否有type 类型的注释(可以为函数参数、返回值、变量等添加类型提示,主要目的在于帮助开发工具通过静态检查发现代码中的 Bug)。

而后调用file_tokens = checker.make_tokens(codeString)将代码的所有内容进行分词,记录每一个词在代码中的起始位置,结果如下:

最后,w = checker.Checker(tree, file_tokens=file_tokens, filename=filename) 是主要实现代码检查的函数。在checker类中scopeStack约定语法树的范围,scope本质是一个字典,默认是代码文件级别(即module)。针对所有python内置的标识符(函数),将其赋值到字典self.scope里

代码语言:javascript
复制
for builtin in self.builtIns:
 self.addBinding(None, Builtin(builtin))

调用self.handleChildren(tree),遍历树里面的每一个节点,计算整棵树的深度nodeDepth,记录每个节点所在的深度,进一步调用handler = self.getNodeHandler(node.__class__),为每一种类型的节点动态加载针对该节点类型的处理函数,并且执行它,例如 函数IMPORTFROM就是针对import from节点执行的函数。在该函数中,解析到具体引入的包名,然后使用addBinding函数,检查这个节点的属性是否在self.scope里,如果没有就新建这个值,如果有则会根据python语法规则进行判断提示警告,例如是否是“重复导入相同的包”。当代码片段是赋值操作的时候,就会调用handleNodeLoad,判断之前引入的变量是否存在且完成赋值。后续在函数checkDeadScopes就会检查该变量是否被使用,没有就会报“imported but unused”的警告。

在了解Pyflakes源码基础上,我们采用下面的代码来遍历语法树,会有一个更直观的感受。

代码语言:javascript
复制
#借鉴flakes的类Checker
class linkage_Checker:
    nodeDepth = 0

    def __init__(self, tree, file_tokens=(), filename='(none)',codestr='none'):
        self._nodeHandlers = {}
        self.codelines = codestr.decode().split('\r\n')
        self.handleChildren(tree)


    #遍历语法树
    def handleChildren(self, tree, omit=None):
        for node in checker.iter_child_nodes(tree, omit=omit):
            self.handleNode(node, tree)

    #针对节点处理
    def handleNode(self, node, parent):
        if node is None:
            return
        self.nodeDepth += 1
        print('-----------------')
        print('节点类型:%s' % node.__class__)
        print('节点层次:%s' % self.nodeDepth)
        try:
           fields =  '/'.join([field for field in node.__class__._fields])
           print('节点属性:%s' % fields)
        except:
           print(123)

        lineno = getattr(node, 'lineno', 0)
        end_lineno = getattr(node, 'end_lineno', 0)
        col_offset = getattr(node, 'col_offset', 0)
        end_col_offset = getattr(node, 'end_col_offset', 0)

        print('起始行:%s' %lineno )
        print('结束行:%s' %end_lineno )
        print('起始列:%s' %col_offset )
        print('结束列:%s' %end_col_offset )
        if lineno > 0:
           getCodebyposition(self.codelines, lineno, end_lineno, col_offset, end_col_offset)

        try:
            handler = self.getNodeHandler(node.__class__)
            handler(node)
        finally:
            self.nodeDepth -= 1

    #针对节点类型获得对应的处理函数
    def getNodeHandler(self, node_class):
        try:
            return self._nodeHandlers[node_class]
        except KeyError:
            nodeType = checker.getNodeType(node_class)
        self._nodeHandlers[node_class] = handler = getattr(
            self, nodeType, self._unknown_handler,
        )
        return handler

    #默认就使用 遍历的函数
    def _unknown_handler(self, node):
        self.handleChildren(node)


#解析语法树
def check(codestr, filename, reporter=None):
    try:
        tree = ast.parse(codestr, filename=filename)
    except SyntaxError:
        value = sys.exc_info()[1]
        msg = value.args[0]
        (lineno, offset, text) = value.lineno, value.offset, value.text
        print(lineno, offset, text)
    # 分词
    file_tokens = checker.make_tokens(codestr)
w = linkage_Checker(tree, file_tokens=file_tokens,filename=filename,codestr=codestr)
    return 1

def getCodebyposition(codelines,lineno,end_lineno,col_offset,end_col_offset):
    for i in range(lineno,end_lineno+1):
        if i == lineno and lineno == end_lineno:
           print(codelines[lineno-1][col_offset:end_col_offset])
        elif i == lineno:
            print(codelines[lineno - 1][col_offset:])
        elif i == end_lineno:
            print(codelines[end_lineno - 1][:end_col_offset])
        else:
            print(codelines[i - 1])
    return 1

通过代码的运行结果可以看到树状的层次结构,

例如代码:

records = pd.read_sql_query('''select cast(date as date) as NaturalDay, symbol from table where date between begin and end’’’)

是一个ast.assign类型,它的一个子节点是ast.call,对应的代码是

pd.read_sql_query('''select cast(date as date) as NaturalDay, symbol from table where date between begin and end’’’)

其有一个类型为ast.Attribute子节点,对应代码是pd.read_sql_query。这个节点又有一个ast.Name的子节点,对应代码是pd。可见,语法树是把一段代码按照语法结构解析的树状结果,以便编译器进一步将抽象语法树转换为更接近机器代码的 control flow Graph。

进一步,在充分理解ast解析过程和语法树结构的基础上,我们可以针对关心的代码片段进行提取,例如关心“类相关”或者“sql”相关的代码片段。这里我们自定义一个解析sql代码的函数,能够自动提取其用到的表名和字段名。

代码语言:javascript
复制
#解析其中一段sql语句中的字段和表名
def getTableField(statement):
    result = {}
    matchObj = re.search( r'select(.*)from(.*)', statement, re.M|re.I)
    if pd.notnull(matchObj):
        fields = re.split(',', matchObj.group(1))
        fields = [field.strip() for field in fields]
        table = matchObj.group(2)
        # table = table.split()
        table = table.strip()
        result[table] = fields
    return result

#解析sql语句中的字段和表名, 参考 https://www.robin.eu.org/programming/extracting-table-and-column-names-from-sql-query/
def sqlparse(sql_str):
    sql_str = sql_str.replace('SELECT', 'select')
    sql_str = sql_str.replace('WHERE', 'where')
    sql_str = sql_str.replace('FROM', 'from')
    re_skip_detail = re.compile("([a-zA-Z0-9]+)")  # 匹配英文和数字
    tmp = re_skip_detail.split(sql_str)
    # tmp =[x for x in tmp if len(x.strip())>0]
    select_index = from_index  = 0
    parse_result = []
    for index,item in enumerate(tmp):
        # print(item)
        if item in ('select','where'):
           if from_index > 0 : # 如果前面已经有比较完整的sql语句了,也就是已经出现from了
              statement = ''.join(tmp[select_index : index]) # 截取到当前的位置
              # print('语句:%s' %statement)
              if len(statement) > 0:
                  table_fields = getTableField(statement)
                  parse_result.append(table_fields)
                  # print(parse_result)
              # 新的开始
              from_index = 0
           if item == 'select':
              select_index = index
        elif item == 'from':
            from_index = index

    return parse_result

好啦,大功告成,最后展示的结果如下:

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-06-29,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据技术架构 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
腾讯云代码分析
腾讯云代码分析(内部代号CodeDog)是集众多代码分析工具的云原生、分布式、高性能的代码综合分析跟踪管理平台,其主要功能是持续跟踪分析代码,观测项目代码质量,助力维护团队卓越代码文化。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档