前往小程序,Get更优阅读体验!
立即前往
发布
社区首页 >专栏 >[MYSQL案例][022] 高版本mysql做审计

[MYSQL案例][022] 高版本mysql做审计

原创
作者头像
大大刺猬
发布2023-11-14 16:45:05
发布2023-11-14 16:45:05
44310
代码可运行
举报
文章被收录于专栏:大大刺猬大大刺猬
运行总次数:0
代码可运行

需求

mysql 8.0.33 环境需要审计:

审计内容:

要求记录DBA账号的所有操作, 业务账号的DDL操作(dml太多了.) 主要是记录哪个时间点, 哪个人执行的啥SQL

分析

常见的审计插件如下:

1. mysql-audit 只支持到8.0.30 (8.0.33测试无法安装)

2. Mariadb server_audit.so 5.7.3X之后就不支持了

3. Percona Audit Log Plugin 不兼容mysql社区版

当然还有mysql企业版的审计, 但没钱....

那就只能自己写脚本了呗, 可以参考我之前写的审计脚本 https://cloud.tencent.com/developer/article/2259748

原理很简单, 就是通过抓包解析, 匹配符合要求的SQL并记录下来即可.

演示

获取连接信息

登录mysql执行如下sql

代码语言:javascript
代码运行次数:0
复制
select concat('{',group_concat("'",host,"':'",user,"'"),'}') from information_schema.processlist;

把结果报错到配置文件中

我这里只记录新连接, 所以就置为空了. (其实也没得旧连接....)

启动审计脚本

由于环境问题, 我就提前编译好了. (pyinstaller -F auditmysql_by_ddcw.py), 这里旧直接使用二进制文件了

模拟连接

不支持SSL.

我这里演示的是mysql-router的. 因为mysql-router没得审计功能. 但又想要审计功能.

随便执行点SQL

随便跑点SQL后, 查看审计日志内容. 我这里旧只测试下dba账号的. 执行的sql都记录下来了. 跑不了了(不是)

配置文件说明

代码语言:javascript
代码运行次数:0
复制
FILENAME 审计日志的文件名
INTERFACE_NAME  网卡名字, 就是应用的流量到mysql的时候经过的网卡
INTERFACE_PORT  目标端口, 如果是mysql就是3306, 如果是router就是6446之类的, 根据你实际情况来

CHILD:  要监控的组
  - name: 'DBA '  #组名字, 随便取, 只是方便识别而已, 没啥物理意义
    users: ['root','dba1','db2','u2023'] #用户名
    record_begin: ['show','select','insert','delete','update','drop','truncate','alter','create','rename']

  - name: 'YEWU' #另一个组, 想分多少,就分多少, 没得限制, 
    users: ['u1','u2','u3']
    record_begin: ['drop','truncate','alter','create','rename'] #审计的操作, 要求小写, 我都是转换为小写后比较的

总结

写得比较匆忙, 还没时间整理, 就这样吧, 能用就行.

其实mysql-router加个审计插件也是不错的选择, 有机会可以试下. 但要求业务连接到mysql-router, 还对业务有入侵.

感觉还是不如现在这个旁挂审计方便, 主要是不影响现有架构, 但也有缺陷, 就是不支持SSL流量. 不过一般业务也不会使用ssl连接. 但dba会啊, 但dba很少误操作....(自信.jpg)

附脚本和配置文件

没时间放github了, 有机会再放上去

脚本

代码语言:javascript
代码运行次数:0
复制
from scapy.all import sniff
import datetime
import yaml
import sys,os

def btoint(bdata,t='little'):
	return int.from_bytes(bdata,t)

#定于全局变量
global userinfo
global user_dict
global f

def audit_sql(sql,username):
	sql = sql.strip().lower()
	for x in userinfo[username][1]:
		if sql.startswith(x):
			return True
	return False

def save_pack(pack):
	if hasattr(pack,'load') and len(pack.load) >= 5:
		bdata = pack.load
		ip_port = f"{pack['IP'].src}:{pack['TCP'].sport}"
		if len(bdata) == btoint(bdata[:3])+4: #不支持SSL
			bdata = bdata[4:]
		else:
			return None
		#print(bdata)
		lbdata = len(bdata)
		msg = ''

		if bdata[:1] == b'\x03': #SQL PACK
			if ip_port in user_dict:
				username = user_dict[ip_port]
				sql = bdata[3:].decode()
				if audit_sql(sql,username): #如果符合审计要求,就返回True
					msg = f"[{datetime.datetime.now()}] [{userinfo[username][0]}] [{ip_port}] [{username}] : {sql}\n"
					f.write(msg)
					f.flush()

		elif bdata[:1] == b'\x01':
			if ip_port not in user_dict: #不是需要审计的账号, 就直接跳过
				return None
			username = user_dict[ip_port]
			msg = f"[{datetime.datetime.now()}] [{userinfo[username][0]}] [{ip_port}] [{username}] : DISCONNECT\n"
			#del user_dict[ip_port]
			f.write(msg)
			f.flush()

		elif lbdata > 32 and len(set(bdata[9:32])) == 1: #CONNECT PACK
			username = bdata[32:32+bdata[32:].find(b'\x00')].decode()
			if username not in userinfo: #不是需要审计的账号, 就直接跳过
				return None
			user_dict[ip_port] = username
			msg = f"[{datetime.datetime.now()}] [{userinfo[username][0]}] [{ip_port}] [{username}] : CONNECTING\n"
			f.write(msg)
			f.flush()
		else:
			#print('FAILD...',ip_port)
			pass
			

			

if __name__ == '__main__':
	if len(sys.argv) <= 1:
		print(f'python {sys.argv[0]} xxx.yaml')
		sys.exit(0)

	conf_filename = sys.argv[1]
	if os.path.exists(conf_filename):
		with open(conf_filename, 'r', encoding="utf-8") as f:
			inf_data =  f.read()
		conf = yaml.load(inf_data,Loader=yaml.Loader)
	else:
		print(f'{sys.argv[0]} does not exist')
		sys.exit(1)

	user_dict = conf['GLOBAL']['USER_DICT'] #记录连接和账号对应关系的dict
	userinfo = {} #要审计的账号信息
	for x in conf['CHILD']:
		for username in x['users']:
			userinfo[username] = [x['name'],x['record_begin']]
	f = open(conf['GLOBAL']['FILENAME'],'a')
	sniff(filter=f"dst port {conf['GLOBAL']['INTERFACE_PORT']}", iface=conf['GLOBAL']['INTERFACE_NAME'], prn=save_pack)

配置文件

yaml格式的, 最近写的脚本 都是这种格式了..

代码语言:javascript
代码运行次数:0
复制
GLOBAL:
  FILENAME: 'audit.log' #审计日志的名字
  INTERFACE_NAME: 'ens32' #要监控的网卡名字
  INTERFACE_PORT: 6446 #要监控的网卡的端口, 就是mysql的端口
  USER_DICT : {}
  #USER_DICT : {'192.168.101.1:54364':'root','192.168.101.1:54365':'root','localhost':'event_scheduler','192.168.101.1:54786':'u1','192.168.101.1:54787':'u1','ddcw21:50718':'root'} #旁挂审计无法识别已连接的账号信息, 只能识别新连接. 故可以初始化的时候就告诉审计插件相关信息. 参考SQL: select concat('{',group_concat("'",host,"':'",user,"'"),'}') from information_schema.processlist;

CHILD:
  - name: 'DBA '
    users: ['root','dba1','db2','u2023']
    record_begin: ['show','select','insert','delete','update','drop','truncate','alter','create','rename']

  - name: 'YEWU'
    users: ['u1','u2','u3']
    record_begin: ['drop','truncate','alter','create','rename']

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 需求
  • 分析
  • 演示
    • 获取连接信息
    • 启动审计脚本
    • 模拟连接
    • 随便执行点SQL
  • 配置文件说明
  • 总结
  • 附脚本和配置文件
    • 脚本
    • 配置文件
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档