"数据库又慢了!"这已经成为我们DBA团队的日常噩梦。随着业务快速发展,公司同时使用Oracle和MySQL数据库,性能问题频发。每次业务卡顿,我们都得像救火队员一样,手动登录各个数据库排查,效率低下且被动。今天我用AI帮忙生成了一个统一的数据库慢SQL监控系统,不但开发效率高而且还提供了比较智能化思路。
感兴趣的同学可以联系我获取源码,一起共同改造提升、丰富功能。
一、 背景
1. 痛点:分散监控的困境
之前我已经写过有两套独立的监控方案:
虽然也都配置了监控到就发送预警信息,但是这种分散的监控方式还是存在明显问题:
2. 转折点:统一监控及可视化的想法
直到最近,我在做日常巡检及统计的时候,我下定决心:必须建立一个统一的数据库监控平台给自己用,来提升效率及可视化展示,方便分析及处理。
二、设计过程
1. 第一版设计思路
最开始我的设想比较简单,就是通过采集程序将各个数据采集、存储在数据库的表里

2. 第二步优化
以MySQL为例,在原先的MySQL的监控程序基础上添加前端展示页面

3. 最终前端优化
最终采集了几个实例的数据库后发现数据量还是比较多的,整个页面就显得很长,因此考虑进行分页展示SQL明细。

三、 程序设计
根据上面的思路,最终在AI的帮助下,完成了总体的程序设计,基本满足了SQL展示及统计的需求,后面还有需求会在上面进行迭代。
1. 项目结构
由于涉及监控采集脚本、Flask主程序、配置文件(可选,我最终去掉了这文件,简化一下)、静态资源(前端页面、样式文件)、日志目录(运行日志),最终的项目结构如下:
long_sql_monitor/
├── db_long_sql_monitor.py # 监控采集程序
├── app.py # Flask主程序
├── requirements.txt # 依赖文件
├── config.py # 配置文件
├── static/
│ ├── index.html # 前端页面
│ └── css/ # 样式文件
└── logs/ # 日志目录2. 核心技术选型
3. 核心模块代码
3.1 管理模块
主要涉及数据库实例信息的获取,动态条件构建,灵活支持多种查询场景;另外考虑用缓存机制减少数据库访问压力,且统一的配置管理入口
class ConfigManager:
def get_database_instances(self, db_type=None):
"""智能获取数据库实例配置,支持缓存优化"""
instances = []
conn = self._get_mysql_connection()
try:
with conn.cursor() as cursor:
# 动态构建查询条件
conditions = ["db_ip IS NOT NULL", "is_active = 1"]
params = []
if db_type and db_type in ['Oracle', 'MySQL']:
conditions.append("db_type = %s")
params.append(db_type)
where_clause = " AND ".join(conditions)
sql = f"""
SELECT id, db_project, db_ip, db_port, instance_name,
db_user, db_password, db_type, monitor_interval
FROM db_instance_info
WHERE {where_clause}
ORDER BY db_type, db_project
"""
cursor.execute(sql, params)
return cursor.fetchall()
except Exception as e:
logging.error(f"配置获取失败: {str(e)}")
return []3.2 多数据库适配引擎
因为后续还会加其他的数据库,因此设计统一接口,新增数据库类型只需实现对应方法。连接池管理,避免频繁创建销毁连接;另外做了结果标准化,便于后续处理,对于的代码片段如下:
class DatabaseAdapter:
def get_long_running_sql(self, instance_info):
"""统一接口获取不同数据库的长时SQL"""
db_type = instance_info['db_type'].lower()
if db_type == 'oracle':
return self._get_oracle_long_sql(instance_info)
elif db_type == 'mysql':
return self._get_mysql_long_sql(instance_info)
else:
raise ValueError(f"不支持的数据库类型: {db_type}")
def _get_oracle_long_sql(self, instance_info):
"""Oracle长SQL检测实现"""
conn = self._connect_oracle(instance_info)
cursor = conn.cursor()
sql = """
SELECT s.sid, s.serial#, s.username, s.sql_id,
sq.sql_text, (SYSDATE - s.sql_exec_start) * 1440 as elapsed_minutes
FROM v$session s
JOIN v$sql sq ON s.sql_id = sq.sql_id
WHERE s.status = 'ACTIVE'
AND (SYSDATE - s.sql_exec_start) * 1440 > 1
AND s.username IS NOT NULL
"""
cursor.execute(sql)
return self._format_oracle_results(cursor.fetchall())
def _get_mysql_long_sql(self, instance_info):
"""MySQL长SQL检测实现"""
conn = self._connect_mysql(instance_info)
cursor = conn.cursor()
sql = """
SELECT id, user, host, db, command, time, state, info
FROM information_schema.processlist
WHERE command != 'Sleep'
AND time > 1
AND info IS NOT NULL
"""
cursor.execute(sql)
return self._format_mysql_results(cursor.fetchall())3.3 智能预警引擎
做了几个级别,进行多维度评分,作为评估问题严重性的参考(可按照自己的业务情况调整)。由于有几个库慢SQL比较多,因此加了防骚扰机制,避免告警风暴。
class SmartAlertEngine:
def analyze_sql_severity(self, sql_info):
"""智能分析SQL严重程度"""
base_score = 0
# 运行时长评分(指数级增长)
if sql_info['elapsed_minutes'] > 60: # 1小时以上
base_score += 100
elif sql_info['elapsed_minutes'] > 30: # 30分钟以上
base_score += 50
elif sql_info['elapsed_minutes'] > 10: # 10分钟以上
base_score += 20
elif sql_info['elapsed_minutes'] > 5: # 5分钟以上
base_score += 10
else: # 1-5分钟
base_score += 5
# 资源消耗评分
if hasattr(sql_info, 'cpu_time') and sql_info['cpu_time'] > 3600:
base_score += 30
if hasattr(sql_info, 'physical_reads') and sql_info['physical_reads'] > 10000:
base_score += 25
# 业务重要性加权
if '核心业务' in sql_info.get('db_project', ''):
base_score *= 1.5
return self._score_to_severity(base_score)
def should_alert(self, instance_ip, severity, alert_history):
"""智能防骚扰判断"""
from datetime import datetime, timedelta
# 同一实例相同严重程度的告警,5分钟内不重复
key = f"{instance_ip}_{severity}"
last_alert = alert_history.get(key)
if last_alert and datetime.now() - last_alert < timedelta(minutes=5):
return False
# 更新告警历史
alert_history[key] = datetime.now()
return True3.4 实时数据存储模块
在数据存储这块,目前考虑的安装如下方式提升存储速度和异常处理:
class DataManager:
def batch_save_sql_records(self, instance_id, sql_records):
"""批量保存SQL监控记录,高性能设计"""
if not sql_records:
return
conn = self._get_connection()
try:
with conn.cursor() as cursor:
# 批量插入数据
sql = """
INSERT INTO long_running_sql_log
(db_instance_id, session_id, username, sql_text,
elapsed_minutes, detect_time, db_type)
VALUES (%s, %s, %s, %s, %s, %s, %s)
"""
# 分批处理,避免单次事务过大
batch_size = 100
for i in range(0, len(sql_records), batch_size):
batch = sql_records[i:i + batch_size]
values = [
(instance_id, rec['session_id'], rec['username'],
rec['sql_text'][:1000], rec['elapsed_minutes'],
datetime.now(), rec.get('db_type', 'Unknown'))
for rec in batch
]
cursor.executemany(sql, values)
conn.commit()
logging.info(f"成功保存 {len(sql_records)} 条监控记录")
except Exception as e:
logging.error(f"数据保存失败: {str(e)}")
conn.rollback()
finally:
conn.close()3.5 RESTful API 接口
API设计这块不专业,就按照如下原则让AI帮忙生成了:
@app.route('/api/long_sql', methods=['GET'])
def get_long_running_sql():
"""获取长时SQL数据接口"""
try:
# 参数解析和验证
hours = request.args.get('hours', 24, type=int)
db_type = request.args.get('db_type', type=str)
page = request.args.get('page', 1, type=int)
page_size = min(request.args.get('page_size', 20, type=int), 100)
# 数据查询
sql_records = data_manager.query_long_sql(
hours=hours,
db_type=db_type,
page=page,
page_size=page_size
)
# 构建响应
return jsonify({
'success': True,
'data': sql_records,
'pagination': {
'page': page,
'page_size': page_size,
'total': len(sql_records)
},
'timestamp': datetime.now().isoformat()
})
except Exception as e:
logging.error(f"API查询失败: {str(e)}")
return jsonify({
'success': False,
'error': str(e)
}), 5003.6 实时前端展示
前端按照如下特性生成的代码:
class RealTimeDashboard {
constructor() {
this.charts = {};
this.autoRefresh = true;
}
// 初始化图表
initCharts() {
// 实例分布图表
this.charts.instanceChart = echarts.init(document.getElementById('instanceChart'));
this.charts.instanceChart.setOption({
title: { text: '实例监控统计' },
tooltip: { trigger: 'axis' },
xAxis: { type: 'category', data: [] },
yAxis: { type: 'value' },
series: [{ data: [], type: 'bar' }]
});
// 趋势分析图表
this.charts.trendChart = echarts.init(document.getElementById('trendChart'));
}
// 实时数据更新
async updateDashboard() {
try {
const [stats, sqlData] = await Promise.all([
this.fetchStatistics(),
this.fetchSQLData()
]);
this.updateStatsCards(stats);
this.updateInstanceChart(stats.instanceStats);
this.updateTrendChart(stats.trendData);
this.updateSQLTable(sqlData);
} catch (error) {
this.showError('数据更新失败: ' + error.message);
}
}
// 智能自动刷新
startAutoRefresh() {
setInterval(() => {
if (this.autoRefresh && this.isPageVisible()) {
this.updateDashboard();
}
}, 30000); // 30秒刷新
}
}4. 运行效果
4.1 启动应用
本次涉及采集程序(db_long_sql_monitor.py)及应用程序(app.py),因此需要启动这2个程序
nohup python3 db_long_sql_monitor.py &
nohup python3 app.py &4.2 运行的效果如下
当前实现的功能如下:实例个数、慢SQL数量、平均市场、严重告警数量等。 也可以进去时间范围(最近1h,24h、最近7天,30天,全部等),实例筛选、最小运行时间(后续在调整更细粒度及范围的),分页展示行数等

统计模块包含按照实例统计及时间统计趋势。 最终具体的SQL列表可以在上面进行筛选、每页默认20行记录。

5. 结语
通过这个AI生成的数据库监控系统,我们见证了AI如何重塑传统运维工作流。从需求描述到完整系统,AI不仅大幅提升了开发效率,更带来了意想不到的智能化水平。随着AI代码生成能力的持续进化,我们有理由相信:未来的运维系统将更加智能、自适应甚至自修复。而作为技术人,我们要做的不是抗拒变化,而是主动拥抱这场效率革命,将AI转化为提升运维效能的神兵利器。
你是否也曾尝试用AI生成运维脚本或系统?欢迎在评论区分享你的实践经验和心得体会!如果你对文中的监控系统感兴趣,也可以留言获取更多实现细节。