This is only a demo; the query performance is not production-ready yet, it is just meant to spark ideas.
The core of the code below is that it relies on an external jar package for SQL parsing (I have not yet confirmed whether it can be used directly in an enterprise environment).
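To make the problem concrete before the full script: if masking only matches the result-set column names against a blacklist, it breaks as soon as a sensitive column is selected under an alias, which is exactly what the SQL parsing step below is for. A tiny standalone illustration (hypothetical rows, not part of the demo script):

# --- standalone illustration, separate from the demo script below ---
black_fields = ["phone"]

def naive_mask(row, fields):
    # Replace the value of any blacklisted key found in the row
    return {k: ("***" if k in fields else v) for k, v in row.items()}

# Selected as plain `phone`: the blacklist matches and the value is masked
print(naive_mask({"name": "Tom", "phone": "13800000000"}, black_fields))
# Selected as `phone AS "手机号"`: the key is now the alias, so the value leaks through
print(naive_mask({"name": "Tom", "手机号": "13800000000"}, black_fields))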
# -*- coding: utf-8 -*-
# pip install pymssql==2.2.7
# pip install mysql-connector-python==8.0.31
# The core here is column-level parsing, which relies on a Java component: https://github.com/sqlparser/gsp_demo_java/releases/
import json
import subprocess
import time
import mysql.connector
import pymssql
def get_black_fields():
    # The list of columns that must be masked, fetched from the database
    mysql_db = mysql.connector.connect(
        host="192.168.31.181",
        port=3306,
        user="dts",
        passwd="123456",
        db="sql_query_platform",
    )
    mysql_cursor = mysql_db.cursor()
    mysql_cursor.execute("select field_name from mssql_masked_field where status='ON'")
    result = mysql_cursor.fetchall()
    black_fields = []
    for ii in result:
        black_fields.append(ii[0])
    print(f"Current list of columns that need masking: {black_fields}")
    return black_fields
def fetch_all_as_dict(cursor):
    # Build dict rows keyed by the result-set column names (aliases become the keys)
    columns = [column[0] for column in cursor.description]
    return [dict(zip(columns, row)) for row in cursor.fetchall()]


def get_database_results(server, user, password, database, query):
    # Execute the business query against SQL Server and return rows as dicts
    connection = pymssql.connect(server, user, password, database)
    try:
        cursor = connection.cursor()
        cursor.execute(query)
        result = fetch_all_as_dict(cursor)
    finally:
        connection.close()
    return result
# Masking function: overwrite any blacklisted field that appears in the row
def desensitize_fields(customer_dict, fields_to_desensitize):
    desensitized_res = customer_dict.copy()
    for field in fields_to_desensitize:
        if field in desensitized_res:
            desensitized_res[field] = "Sensitive content hidden"
    return desensitized_res
if __name__ == "__main__":
    # Connect to the business database and run the business query
    server = "192.168.31.181:2433"
    user = "sa"
    password = "Abcd1234"
    database = "testdb"
    # Try a somewhat complex SQL statement with aliases and a subquery
    query = """
    SELECT *
    FROM (
        SELECT name as "用户名", phone as "手机号", address, remark
        FROM dbo.t1
        WHERE 1=1
    ) AS subquery;
    """
    start_ts = time.time()
    # Columns that must be masked, as configured in MySQL
    black_fields = get_black_fields()
    # Default the Java timing so the summary below also works when no alias is used
    start_ts_java = stop_ts_java = start_ts
    # Crude alias detection: any occurrence of "as" (even inside a word) triggers the parser
    if "as" in query.lower():
        # A DingTalk alert could also be sent here
        # print(f"The SQL uses an alias, which may cause masking to fail. Query: {query}")
        with open("/tmp/tmp.sql", "w", encoding="utf-8") as f:
            f.write(str(query))
        # Measure how long the Java parsing takes
        start_ts_java = time.time()
        # Resolve the column names with the Java component; complex queries can make this slow
        cmd = subprocess.Popen(
            r"/usr/local/software/jdk1.8.0_181/bin/java -jar /usr/local/Dlineage/bin/gudusoft.dlineage.jar /t mssql /f /tmp/tmp.sql /json > /tmp/json.log",
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        cmd.wait()
        stop_ts_java = time.time()
        with open("/tmp/json.log", "r", encoding="utf-8") as f:
            res = f.readlines()
        json_res = json.loads(res[0])
        # "relationships" maps each output (target) column to its source column(s)
        relationships = json_res["relationships"]
        for i in relationships:
            if (i["target"]["column"] != i["sources"][0]["column"]) and i["sources"][0][
                "column"
            ] in black_fields:
                # The alias of a blacklisted column must be blacklisted as well
                black_fields.append(i["target"]["column"])
    # Fetch the query result set
    results = get_database_results(server, user, password, database, query)
    desensitized_res = [desensitize_fields(res, black_fields) for res in results]
    for ii in desensitized_res:
        # Print each row as JSON for the frontend to display
        print(json.dumps(ii, ensure_ascii=False))
    stop_ts = time.time()
    print(f"Total time (seconds): {stop_ts - start_ts}")
    print(f"Java parsing time (seconds): {stop_ts_java - start_ts_java}")
    print(f"Database query time (seconds): {(stop_ts - start_ts) - (stop_ts_java - start_ts_java)}")
"""
-- MySQL建表
create database sql_query_platform;
use sql_query_platform;
CREATE TABLE `mssql_masked_field` (
`id` int unsigned NOT NULL AUTO_INCREMENT,
`field_name` varchar(128) NOT NULL,
`remark` varchar(128) NOT NULL DEFAULT '',
`status` char(3) NOT NULL DEFAULT 'ON',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci
-- 预先定义的数据
[sql_query_platform]> select * from mssql_masked_field ;
+----+------------+--------+---------+
| id | field_name | remark | status |
+----+------------+--------+---------+
| 1 | name | | OFF |
| 2 | address | | OFF |
| 3 | addr | | OFF |
| 4 | phone | | ON |
+----+------------+--------+---------+
4 rows in set (0.00 sec)
"""
In addition, the community's open-source Druid SQL parser should also be able to handle the SQL parsing; that still needs to be coded and tested. (For production use, I would suggest wrapping the parser as an HTTP service to avoid the jar startup overhead, instead of launching it with java -jar on every call as my demo script above does.)
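As a rough sketch of that HTTP-service idea (my own assumption of how it might look, not something I have tested): a small Flask app that accepts the SQL text, shells out to the same jar, and caches the result per statement so that at least repeated identical queries skip the JVM startup. The JDK and jar paths are copied from the demo; the port and the /parse route are made up for illustration, and a truly resident parser would still need the Java side to run as a long-lived service.

# Sketch only: a thin HTTP wrapper around the dlineage jar (port and route are assumptions)
import functools
import json
import os
import subprocess
import tempfile

from flask import Flask, jsonify, request  # pip install flask

app = Flask(__name__)

JAVA_BIN = "/usr/local/software/jdk1.8.0_181/bin/java"
DLINEAGE_JAR = "/usr/local/Dlineage/bin/gudusoft.dlineage.jar"


@functools.lru_cache(maxsize=1024)
def parse_sql(sql_text: str) -> str:
    # Write the SQL to a temp file and capture the jar's JSON output from stdout
    with tempfile.NamedTemporaryFile("w", suffix=".sql", delete=False, encoding="utf-8") as f:
        f.write(sql_text)
        sql_path = f.name
    try:
        out = subprocess.run(
            [JAVA_BIN, "-jar", DLINEAGE_JAR, "/t", "mssql", "/f", sql_path, "/json"],
            capture_output=True, text=True, check=True,
        )
    finally:
        os.unlink(sql_path)
    # Mirror the demo script: the first line of the output is the JSON document
    return out.stdout.splitlines()[0]


@app.route("/parse", methods=["POST"])
def parse():
    sql_text = request.get_data(as_text=True)
    return jsonify(json.loads(parse_sql(sql_text)))


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8080)

The demo script above could then replace its subprocess call with something like requests.post("http://127.0.0.1:8080/parse", data=query) and read the relationships list from the response body.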
Originality statement: this article was published on the Tencent Cloud developer community with the author's authorization and may not be reproduced without permission.
For infringement concerns, please contact cloudcommunity@tencent.com for removal.