mysql 8.0.33 环境需要审计:
审计内容:
要求记录DBA账号的所有操作, 业务账号的DDL操作(dml太多了.) 主要是记录哪个时间点, 哪个人执行的啥SQL
常见的审计插件如下:
1. mysql-audit 只支持到8.0.30 (8.0.33测试无法安装)
2. Mariadb server_audit.so 5.7.3X之后就不支持了
3. Percona Audit Log Plugin 不兼容mysql社区版
当然还有mysql企业版的审计, 但没钱....
那就只能自己写脚本了呗, 可以参考我之前写的审计脚本 https://cloud.tencent.com/developer/article/2259748
原理很简单, 就是通过抓包解析, 匹配符合要求的SQL并记录下来即可.
登录mysql执行如下sql
select concat('{',group_concat("'",host,"':'",user,"'"),'}') from information_schema.processlist;
把结果报错到配置文件中
我这里只记录新连接, 所以就置为空了. (其实也没得旧连接....)
由于环境问题, 我就提前编译好了. (pyinstaller -F auditmysql_by_ddcw.py
), 这里旧直接使用二进制文件了
不支持SSL.
我这里演示的是mysql-router的. 因为mysql-router没得审计功能. 但又想要审计功能.
随便跑点SQL后, 查看审计日志内容. 我这里旧只测试下dba账号的. 执行的sql都记录下来了. 跑不了了(不是)
FILENAME 审计日志的文件名
INTERFACE_NAME 网卡名字, 就是应用的流量到mysql的时候经过的网卡
INTERFACE_PORT 目标端口, 如果是mysql就是3306, 如果是router就是6446之类的, 根据你实际情况来
CHILD: 要监控的组
- name: 'DBA ' #组名字, 随便取, 只是方便识别而已, 没啥物理意义
users: ['root','dba1','db2','u2023'] #用户名
record_begin: ['show','select','insert','delete','update','drop','truncate','alter','create','rename']
- name: 'YEWU' #另一个组, 想分多少,就分多少, 没得限制,
users: ['u1','u2','u3']
record_begin: ['drop','truncate','alter','create','rename'] #审计的操作, 要求小写, 我都是转换为小写后比较的
写得比较匆忙, 还没时间整理, 就这样吧, 能用就行.
其实mysql-router加个审计插件也是不错的选择, 有机会可以试下. 但要求业务连接到mysql-router, 还对业务有入侵.
感觉还是不如现在这个旁挂审计方便, 主要是不影响现有架构, 但也有缺陷, 就是不支持SSL流量. 不过一般业务也不会使用ssl连接. 但dba会啊, 但dba很少误操作....(自信.jpg)
没时间放github了, 有机会再放上去
from scapy.all import sniff
import datetime
import yaml
import sys,os
def btoint(bdata,t='little'):
return int.from_bytes(bdata,t)
#定于全局变量
global userinfo
global user_dict
global f
def audit_sql(sql,username):
sql = sql.strip().lower()
for x in userinfo[username][1]:
if sql.startswith(x):
return True
return False
def save_pack(pack):
if hasattr(pack,'load') and len(pack.load) >= 5:
bdata = pack.load
ip_port = f"{pack['IP'].src}:{pack['TCP'].sport}"
if len(bdata) == btoint(bdata[:3])+4: #不支持SSL
bdata = bdata[4:]
else:
return None
#print(bdata)
lbdata = len(bdata)
msg = ''
if bdata[:1] == b'\x03': #SQL PACK
if ip_port in user_dict:
username = user_dict[ip_port]
sql = bdata[3:].decode()
if audit_sql(sql,username): #如果符合审计要求,就返回True
msg = f"[{datetime.datetime.now()}] [{userinfo[username][0]}] [{ip_port}] [{username}] : {sql}\n"
f.write(msg)
f.flush()
elif bdata[:1] == b'\x01':
if ip_port not in user_dict: #不是需要审计的账号, 就直接跳过
return None
username = user_dict[ip_port]
msg = f"[{datetime.datetime.now()}] [{userinfo[username][0]}] [{ip_port}] [{username}] : DISCONNECT\n"
#del user_dict[ip_port]
f.write(msg)
f.flush()
elif lbdata > 32 and len(set(bdata[9:32])) == 1: #CONNECT PACK
username = bdata[32:32+bdata[32:].find(b'\x00')].decode()
if username not in userinfo: #不是需要审计的账号, 就直接跳过
return None
user_dict[ip_port] = username
msg = f"[{datetime.datetime.now()}] [{userinfo[username][0]}] [{ip_port}] [{username}] : CONNECTING\n"
f.write(msg)
f.flush()
else:
#print('FAILD...',ip_port)
pass
if __name__ == '__main__':
if len(sys.argv) <= 1:
print(f'python {sys.argv[0]} xxx.yaml')
sys.exit(0)
conf_filename = sys.argv[1]
if os.path.exists(conf_filename):
with open(conf_filename, 'r', encoding="utf-8") as f:
inf_data = f.read()
conf = yaml.load(inf_data,Loader=yaml.Loader)
else:
print(f'{sys.argv[0]} does not exist')
sys.exit(1)
user_dict = conf['GLOBAL']['USER_DICT'] #记录连接和账号对应关系的dict
userinfo = {} #要审计的账号信息
for x in conf['CHILD']:
for username in x['users']:
userinfo[username] = [x['name'],x['record_begin']]
f = open(conf['GLOBAL']['FILENAME'],'a')
sniff(filter=f"dst port {conf['GLOBAL']['INTERFACE_PORT']}", iface=conf['GLOBAL']['INTERFACE_NAME'], prn=save_pack)
yaml格式的, 最近写的脚本 都是这种格式了..
GLOBAL:
FILENAME: 'audit.log' #审计日志的名字
INTERFACE_NAME: 'ens32' #要监控的网卡名字
INTERFACE_PORT: 6446 #要监控的网卡的端口, 就是mysql的端口
USER_DICT : {}
#USER_DICT : {'192.168.101.1:54364':'root','192.168.101.1:54365':'root','localhost':'event_scheduler','192.168.101.1:54786':'u1','192.168.101.1:54787':'u1','ddcw21:50718':'root'} #旁挂审计无法识别已连接的账号信息, 只能识别新连接. 故可以初始化的时候就告诉审计插件相关信息. 参考SQL: select concat('{',group_concat("'",host,"':'",user,"'"),'}') from information_schema.processlist;
CHILD:
- name: 'DBA '
users: ['root','dba1','db2','u2023']
record_begin: ['show','select','insert','delete','update','drop','truncate','alter','create','rename']
- name: 'YEWU'
users: ['u1','u2','u3']
record_begin: ['drop','truncate','alter','create','rename']
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。