首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >100天跟着CP学PostgreSQL+AI,第25天 :timescaledb结合机器学习实战(应PGer同学留言临时增加)

100天跟着CP学PostgreSQL+AI,第25天 :timescaledb结合机器学习实战(应PGer同学留言临时增加)

作者头像
用户8465142
发布2025-08-27 14:12:10
发布2025-08-27 14:12:10
17600
代码可运行
举报
运行总次数:0
代码可运行

作者介绍:崔鹏,计算机学博士,专注 AI 与大数据管理领域研究,拥有十五年数据库、操作系统及存储领域实战经验,兼具 ORACLE OCM、MySQL OCP 等国际权威认证,PostgreSQL ACE,运营技术公众号 "CP 的 PostgreSQL 厨房",持续输出数据库技术洞察与实践经验。作为全球领先专网通信公司核心技术专家,深耕数据库高可用、高性能架构设计,创新探索 AI 在数据库领域的应用落地,其技术方案有效提升企业级数据库系统稳定性与智能化水平。学术层面,已在AI方向发表2篇SCI论文,将理论研究与工程实践深度结合,形成独特的技术研发视角

应PGer同学留言临时增加一篇文章

一、引言

在工业物联网、金融量化分析、智能设备监控等领域,时间序列数据正以爆炸式增长。TimescaleDB 作为专为时间序列数据设计的高性能数据库,能够高效处理 TB 级规模的时序数据存储与查询。当结合机器学习技术时,我们可以从海量历史数据中挖掘规律,实现智能预测与异常检测。本文将通过完整的实战案例,演示如何利用 TimescaleDB 构建机器学习应用,包含完整的试验代码与技术解析。

二、技术栈准备

1. 环境配置

代码语言:javascript
代码运行次数:0
运行
复制
# 安装Python依赖
pip install psycopg2-binary pandas numpy scikit-learn tensorflow matplotlib
# TimescaleDB服务启动(Docker方式)
docker run -d -p 5432:5432 -e POSTGRES_PASSWORD=timescaledb --name timescaledb timescale/timescaledb:latest-pg15

2. 数据库连接配置

代码语言:javascript
代码运行次数:0
运行
复制
import psycopg2
DB_CONFIG = {
    "dbname": "tsdb",
    "user": "postgres",
    "password": "timescaledb",
    "host": "localhost",
    "port": 5432
}
def get_db_connection():
    conn = psycopg2.connect(**DB_CONFIG)
    conn.autocommit = True
    return conn

三、TimescaleDB 数据建模

1. 创建时间序列表

代码语言:javascript
代码运行次数:0
运行
复制
CREATE DATABASE tsdb;
\c tsdb;
CREATE EXTENSION timescaledb;
CREATE TABLE sensor_data (
    time        TIMESTAMP NOT NULL,
    device_id   INTEGER NOT NULL,
    temperature FLOAT,
    humidity    FLOAT,
    pressure    FLOAT
);
SELECT create_hypertable('sensor_data', 'time');

2. 生成模拟数据

代码语言:javascript
代码运行次数:0
运行
复制
import random
from datetime import datetime, timedelta
def generate_sample_data(start_time, device_id, num_samples):
    data = []
    for i in range(num_samples):
        current_time = start_time + timedelta(minutes=i)
        temp = 20 + random.uniform(-2, 2) + 0.1*i  # 模拟温度趋势
        hum = 60 + random.uniform(-5, 5)
        press = 1013 + random.uniform(-10, 10)
        data.append((current_time, device_id, temp, hum, press))
    return data
# 插入10万个模拟数据点
conn = get_db_connection()
cur = conn.cursor()
start_time = datetime(2023, 1, 1, 0, 0, 0)
for device_id in range(1, 11):  # 10个设备
    samples = generate_sample_data(start_time, device_id, 10000)
    insert_sql = "INSERT INTO sensor_data (time, device_id, temperature, humidity, pressure) VALUES %s"
    cur.execute(insert_sql, [tuple(samples)])
cur.close()
conn.close()

四、机器学习任务定义:设备温度预测

1. 数据预处理模块

代码语言:javascript
代码运行次数:0
运行
复制
import pandas as pd
def load_data(device_id, lookback=24):
    """
    加载指定设备数据并生成时序特征
    :param device_id: 设备ID
    :param lookback: 时间窗口大小(小时)
    :return: 特征矩阵X,标签y
    """
    conn = get_db_connection()
    query = f"""
    SELECT time, temperature 
    FROM sensor_data 
    WHERE device_id = {device_id} 
    ORDER BY time ASC
    """
    df = pd.read_sql(query, conn, parse_dates=['time'])
    conn.close()
    
    # 生成时间序列特征
    df['timestamp'] = df['time'].map(pd.Timestamp.timestamp)
    df = df.set_index('time')
    
    X, y = [], []
    for i in range(lookback, len(df)):
        X.append(df['temperature'][i-lookback:i].values)
        y.append(df['temperature'][i])
    
    return np.array(X), np.array(y)

2. 模型构建(LSTM 神经网络)

代码语言:javascript
代码运行次数:0
运行
复制
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
def build_lstm_model(input_shape, units=64, dropout=0.2):
    model = Sequential()
    model.add(LSTM(units, input_shape=input_shape, return_sequences=True))
    model.add(Dropout(dropout))
    model.add(LSTM(units, return_sequences=False))
    model.add(Dropout(dropout))
    model.add(Dense(1))
    model.compile(optimizer='adam', loss='mean_squared_error')
    return model

3. 训练流程实现

代码语言:javascript
代码运行次数:0
运行
复制
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
# 数据加载与预处理
device_id = 1
X, y = load_data(device_id)
X = X.reshape((X.shape[0], X.shape[1], 1))  # LSTM输入格式
# 标准化处理
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X.reshape(-1, 1)).reshape(X.shape)
y_scaled = scaler.transform(y.reshape(-1, 1)).flatten()
# 划分数据集
X_train, X_test, y_train, y_test = train_test_split(
    X_scaled, y_scaled, test_size=0.2, shuffle=False
)
# 模型训练
model = build_lstm_model((X_train.shape[1], X_train.shape[2]))
history = model.fit(
    X_train, y_train,
    batch_size=32,
    epochs=50,
    validation_split=0.1,
    verbose=1
)

五、高级优化技术

1. 时间序列交叉验证

代码语言:javascript
代码运行次数:0
运行
复制
from sklearn.model_selection import TimeSeriesSplit
tss = TimeSeriesSplit(n_splits=5)
for train_idx, val_idx in tss.split(X_scaled):
    X_tr, X_val = X_scaled[train_idx], X_scaled[val_idx]
    y_tr, y_val = y_scaled[train_idx], y_scaled[val_idx]
    # 执行模型训练与评估

2. 数据库端数据聚合优化

代码语言:javascript
代码运行次数:0
运行
复制
def load_aggregated_data(device_id, lookback=24, interval='15 minutes'):
    """
    使用TimescaleDB进行数据降采样
    """
    conn = get_db_connection()
    query = f"""
    SELECT time_bucket('%s', time) AS bucket, 
           AVG(temperature) AS avg_temp,
           MAX(temperature) AS max_temp,
           MIN(temperature) AS min_temp
    FROM sensor_data 
    WHERE device_id = {device_id} 
    GROUP BY bucket 
    ORDER BY bucket ASC
    """ % interval
    df = pd.read_sql(query, conn, parse_dates=['bucket'])
    conn.close()
    
    # 后续特征工程...

3. 异常检测扩展(Isolation Forest)

代码语言:javascript
代码运行次数:0
运行
复制
from sklearn.ensemble import IsolationForest
# 构建异常检测模型
anomaly_model = IsolationForest(contamination=0.01)
anomaly_model.fit(X_scaled[-30*24:])  # 使用最近30天数据训练
# 实时检测逻辑
new_data = ... # 从数据库获取最新数据窗口
new_data_scaled = scaler.transform(new_data.reshape(-1,1)).reshape(1, -1, 1)
prediction = anomaly_model.predict(new_data_scaled)
if prediction == -1:
    print("检测到异常温度数据")

六、性能优化策略

1. TimescaleDB 查询优化

代码语言:javascript
代码运行次数:0
运行
复制
-- 创建时间+设备ID索引
CREATE INDEX ON sensor_data (device_id, time DESC);
-- 优化连续查询(用于实时聚合)
CREATE MATERIALIZED VIEW device_hourly_stats AS
SELECT device_id,
       time_bucket('1 hour', time) AS bucket,
       AVG(temperature) AS avg_temp,
       STDDEV(temperature) AS temp_std
FROM sensor_data
GROUP BY device_id, bucket
WITH DATA;

2. 模型部署优化

代码语言:javascript
代码运行次数:0
运行
复制
# 保存模型
model.save('temperature_prediction_model.h5')
# 加载模型进行预测
from tensorflow.keras.models import load_model
def predict_next_hour(device_id):
    model = load_model('temperature_prediction_model.h5')
    X, _ = load_data(device_id)
    last_window = X[-1:].reshape(1, X.shape[1], 1)
    scaled_input = scaler.transform(last_window.reshape(-1, 1)).reshape(last_window.shape)
    prediction = model.predict(scaled_input)
    return scaler.inverse_transform(prediction.reshape(-1, 1))[0][0]

七、可视化与结果分析

1. 训练过程可视化

代码语言:javascript
代码运行次数:0
运行
复制
import matplotlib.pyplot as plt
plt.plot(history.history['loss'], label='Training Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.title('Model Training Loss')
plt.xlabel('Epoch')
plt.ylabel('MSE')
plt.legend()
plt.show()

2. 预测结果对比

代码语言:javascript
代码运行次数:0
运行
复制
# 生成预测数据
X_test_reshaped = X_test.reshape((X_test.shape[0], X_test.shape[1]))
y_pred = model.predict(X_test).flatten()
y_pred = scaler.inverse_transform(y_pred.reshape(-1, 1)).flatten()
y_test_original = scaler.inverse_transform(y_test.reshape(-1, 1)).flatten()
plt.figure(figsize=(12, 6))
plt.plot(y_test_original, label='Actual Temperature')
plt.plot(y_pred, label='Predicted Temperature')
plt.title(f'Device {device_id} Temperature Prediction')
plt.xlabel('Time Step')
plt.ylabel('Temperature (°C)')
plt.legend()
plt.show()

八、生产环境部署建议

数据管道:使用 Apache Kafka 进行实时数据 ingestion,通过 Flink 进行预处理后写入 TimescaleDB

模型服务:通过 FastAPI 封装预测接口,使用 Docker 进行容器化部署

监控系统:建立模型性能监控仪表盘,实时跟踪预测误差与数据分布变化

自动更新:设置定时任务,每周自动用最新数据重新训练模型

九、总结与展望

本文通过完整的温度预测案例,展示了 TimescaleDB 与机器学习结合的全流程实现。实际应用中,这种技术组合在以下领域具有广阔前景:

工业预测性维护:通过设备传感器数据预测故障发生时间

能源管理:实现电网负荷的精准预测以优化资源调配

金融风控:对高频交易数据进行实时异常检测

智能农业:根据环境数据预测作物生长趋势

随着边缘计算的普及,未来可以探索在边缘节点部署轻量级机器学习模型,结合 TimescaleDB 的边缘端版本,实现 "数据采集 - 实时分析 - 智能决策" 的闭环。这种本地化处理方案将进一步提升系统响应速度,降低数据传输成本。

通过持续优化数据库查询性能与机器学习模型精度,我们能够构建更智能、更高效的时序数据处理系统,为各行业的数字化转型提供强大的技术支撑。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2025-05-18,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 CP的postgresql厨房 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 应PGer同学留言临时增加一篇文章
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档