首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >100天跟着CP学PostgreSQL+AI,第12天 : 用 PostgreSQL 构建 AI 训练数据管道(ETL 全流程)

100天跟着CP学PostgreSQL+AI,第12天 : 用 PostgreSQL 构建 AI 训练数据管道(ETL 全流程)

作者头像
用户8465142
发布2025-08-27 14:05:27
发布2025-08-27 14:05:27
10000
代码可运行
举报
运行总次数:0
代码可运行

作者介绍:崔鹏,计算机学博士,专注 AI 与大数据管理领域研究,拥有十五年数据库、操作系统及存储领域实战经验,兼具 ORACLE OCM、MySQL OCP 等国际权威认证,PostgreSQL ACE,运营技术公众号 "CP 的 PostgreSQL 厨房",持续输出数据库技术洞察与实践经验。作为全球领先专网通信公司核心技术专家,深耕数据库高可用、高性能架构设计,创新探索 AI 在数据库领域的应用落地,其技术方案有效提升企业级数据库系统稳定性与智能化水平。学术层面,已在AI方向发表2篇SCI论文,将理论研究与工程实践深度结合,形成独特的技术研发视角。

系列文章介绍

第二阶段 : 技术融合基础篇

主要内容

主题:用 PostgreSQL 构建 AI 训练数据管道(ETL 全流程)

核心内容:Flink CDC 实时同步数据 / 数据版本管理(维表 Slowly Changing Dimensions)

实践案例:搭建一个动态更新的推荐系统数据集管道

正文

在 AI 训练领域,高质量的数据集是模型效果的核心保障。尤其是推荐系统,需要实时捕捉用户行为变化,同时保留历史维度数据用于模型回溯。本文将通过实战案例,带您搭建一套基于 PostgreSQL 的动态更新推荐系统数据集管道,深度解析 Flink CDC 实时同步技术与维表版本管理方案。

一、推荐系统数据管道核心痛点与架构设计

1.1 业务挑战

实时性要求:用户点击、收藏等行为数据需秒级同步至训练集,传统 T+1 离线 ETL 无法满足

维度变更管理:商品类目调整、用户标签更新等维度变化需保留历史版本,避免模型训练数据断层

数据一致性:跨库表的事实表(用户行为)与维表(用户 / 商品信息)需保证实时关联一致性

1.2 核心组件

Flink CDC:基于 Debezium 实现数据库变更数据捕获,支持增量同步

PostgreSQL:作为源端与目标端存储,利用 JSONB 字段存储扩展属性,通过 TRIGGER 实现版本控制

维表类型:采用 Slowly Changing Dimensions (SCD) 类型 2(历史版本保留)处理维度变更

二、Flink CDC 实时同步实战:以用户行为表为例

2.1 环境准备

代码语言:javascript
代码运行次数:0
运行
复制
# 安装Flink 1.17.1
wget https://archive.apache.org/dist/flink/flink-1.17.1/flink-1.17.1-bin-scala_2.12.tgz
tar -zxvf flink-1.17.1-bin-scala_2.12.tgz
# 添加PostgreSQL CDC依赖
cp flink-sql-connector-postgres-cdc-2.3.0.jar flink/lib/

2.2 源表与目标表 DDL 设计

源表(用户点击日志)

代码语言:javascript
代码运行次数:0
运行
复制
CREATE TABLE source.click_log (
    log_id BIGINT PRIMARY KEY,
    user_id BIGINT,
    item_id BIGINT,
    click_time TIMESTAMP,
    client_ip VARCHAR(32)
);

目标事实表(带数据分区

代码语言:javascript
代码运行次数:0
运行
复制
CREATE TABLE target.click_log (
    log_id BIGINT,
    user_id BIGINT,
    item_id BIGINT,
    click_time TIMESTAMP,
    etl_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    op_type VARCHAR(10)  -- 数据操作类型:INSERT/UPDATE/DELETE
);

2.3 Flink SQL 作业配置

代码语言:javascript
代码运行次数:0
运行
复制
CREATE TABLE postgres_source (
    log_id BIGINT,
    user_id BIGINT,
    item_id BIGINT,
    click_time TIMESTAMP,
    __deleted BOOLEAN,  -- CDC删除标记
    PRIMARY KEY (log_id) NOT ENFORCED
) WITH (
    'connector' = 'postgres-cdc',
    'hostname' = 'source-postgres',
    'port' = '5432',
    'database-name' = 'recommendation_db',
    'schema-name' = 'public',
    'table-name' = 'click_log',
    'username' = 'admin',
    'password' = 'password'
);
INSERT INTO target.click_log (log_id, user_id, item_id, click_time, op_type)
SELECT 
    log_id, 
    user_id, 
    item_id, 
    click_time,
    CASE WHEN __deleted THEN 'DELETE' ELSE 'INSERT' END  -- 简化UPDATE处理,实际需处理字段级变更
FROM postgres_source;

2.4 生产环境优化点

并行度配置:根据 CPU 核数设置parallelism.default: 8

容错机制:启用 Checkpoint,配置execution.checkpointing.interval: 10min

延迟监控:通过 Flink Web UI 监控 Source Lag 指标,阈值设置为 1000 条 / 秒

三、维表版本管理:以商品维度表为例

3.1 SCD 类型 2 表结构设计

代码语言:javascript
代码运行次数:0
运行
复制
CREATE TABLE target.item_dim (
    item_id BIGINT,
    item_name VARCHAR(100),
    category_id BIGINT,
    brand VARCHAR(50),
    version INT,  -- 版本号,自增序列
    effective_start TIMESTAMP,  -- 生效时间
    effective_end TIMESTAMP,    -- 失效时间,默认NULL表示当前版本
    is_current BOOLEAN DEFAULT TRUE,  -- 是否当前有效版本
    PRIMARY KEY (item_id, version)
);

3.2 版本更新逻辑实现

3.2.1 Flink 自定义 Sink 函数

代码语言:javascript
代码运行次数:0
运行
复制
public class SCD2Sink extends RichSinkFunction<Row> {
    private Connection conn;
    @Override
    public void open(Configuration parameters) throws Exception {
        conn = DriverManager.getConnection("jdbc:postgresql://target-postgres:5432/recommendation_db", "admin", "password");
    }
    @Override
    public void invoke(Row row, Context context) throws Exception {
        long itemId = row.getFieldAs("item_id");
        String itemName = row.getFieldAs("item_name");
        // 查询当前最新版本
        PreparedStatement selectStmt = conn.prepareStatement(
            "SELECT version, effective_end FROM item_dim WHERE item_id = ? AND is_current = true"
        );
        selectStmt.setLong(1, itemId);
        ResultSet rs = selectStmt.executeQuery();

        try (Connection txnConn = conn.getConnection();
             PreparedStatement updateStmt = txnConn.prepareStatement(
                 "UPDATE item_dim SET is_current = false, effective_end = NOW() WHERE item_id = ? AND version = ?"
             );
             PreparedStatement insertStmt = txnConn.prepareStatement(
                 "INSERT INTO item_dim (item_id, item_name, category_id, brand, version, effective_start) " +
                 "VALUES (?, ?, ?, ?, ?, NOW())"
             ) {
            if (rs.next()) {
                long currentVersion = rs.getLong("version");
                // 关闭旧版本
                updateStmt.setLong(1, itemId);
                updateStmt.setLong(2, currentVersion);
                updateStmt.executeUpdate();
            }
            // 生成新版本号(实际需使用数据库序列)
            long newVersion = rs.next() ? rs.getLong("version") + 1 : 1;
            insertStmt.setLong(1, itemId);
            insertStmt.setString(2, itemName);
            // 填充其他字段...
            insertStmt.executeUpdate();
            txnConn.commit();
        }
    }
}

3.2.2 版本关联查询

代码语言:javascript
代码运行次数:0
运行
复制
-- 获取用户点击时的商品有效版本
SELECT cl.*, id.item_name, id.brand
FROM target.click_log cl
JOIN target.item_dim id 
ON cl.item_id = id.item_id 
AND cl.click_time BETWEEN id.effective_start AND COALESCE(id.effective_end, NOW())

四、推荐系统数据集管道实践案例

4.1 实验环境

组件

配置

数量

PostgreSQL

16 核 32G, SSD 1TB

2 台

Flink 集群

CPU 8 核,内存 16G

3 节点

数据规模

事实表 500 万条 / 天

维表规模

用户表 100 万条,商品表 50 万条

4.2 实施步骤

4.2.1 初始化数据同步

# 启动Flink作业

代码语言:javascript
代码运行次数:0
运行
复制
flink run -m yarn-cluster -c com.recommendation.PipelineJob /opt/jar/recommendation-pipeline.jar

4.2.2 模拟数据变更

用户行为实时写入:

代码语言:javascript
代码运行次数:0
运行
复制
import psycopg2
conn = psycopg2.connect("dbname=recommendation_db user=admin")
cur = conn.cursor()
cur.execute("INSERT INTO click_log (user_id, item_id, click_time) VALUES (1001, 2001, NOW())")
conn.commit()

商品维度变更(触发版本更新):

代码语言:javascript
代码运行次数:0
运行
复制
UPDATE source.item_info SET category_id = 1002 WHERE item_id = 2001;  -- 类目调整

4.2.3 数据验证

实时同步延迟:通过SELECT NOW() - etl_time FROM click_log,95% 数据延迟 < 500ms

版本完整性:

代码语言:javascript
代码运行次数:0
运行
复制
-- 变更前版本
SELECT * FROM item_dim WHERE item_id = 2001 AND version = 1;
-- 变更后新版本
SELECT * FROM item_dim WHERE item_id = 2001 AND version = 2 AND is_current = true;
验证结果:旧版本effective_end更新为变更时间,新版本effective_start为变更时间,属性值正确更新

关联一致性:

-- 验证点击时间与版本时间对应关系

代码语言:javascript
代码运行次数:0
运行
复制
SELECT COUNT(*) 
FROM click_log cl
LEFT JOIN item_dim id 
ON cl.item_id = id.item_id 
AND cl.click_time BETWEEN id.effective_start AND COALESCE(id.effective_end, NOW())
WHERE id.item_id IS NULL;  -- 应返回0条空关联数据

五、生产环境最佳实践

5.1 性能优化

批量处理:维表更新采用 BATCH_SIZE=500 的批量 UPSERT

索引策略:事实表按click_time创建时间分区索引,维表按item_id, effective_start创建联合索引

连接池:Flink 作业使用 HikariCP 连接池,配置maxPoolSize=20

5.2 数据治理

血缘分析:通过 Apache Atlas 记录数据管道血缘关系,追踪维表变更对训练集的影响

质量监控:设置数据校验规则(如用户 ID 非空率 > 99.9%),触发报警阈值 80%

版本清理:定期删除超过 180 天的历史版本,使用 Cron Job 执行:

代码语言:javascript
代码运行次数:0
运行
复制
DELETE FROM item_dim WHERE effective_end < NOW() - INTERVAL '180 days';

5.3 容错恢复

Checkpoint 存储:使用 HDFS 存储 Checkpoint,配置state.backend: rocksdb

故障转移:Flink 自动从最新 Checkpoint 恢复,结合 PostgreSQL 的 WAL 日志保证数据不丢失

六、总结与扩展方向

通过 Flink CDC 与 PostgreSQL 的深度整合,我们实现了推荐系统数据管道的三大核心能力:

实时数据流:毫秒级捕获用户行为变化

维度可追溯:完整保留维表历史版本,支持模型训练数据重放

数据一致性:通过事务控制与版本时间关联,确保跨表数据匹配准确

未来可扩展方向:

数据湖集成:将版本化数据同步至 Delta Lake,构建湖仓一体架构

智能监控:引入 AI 模型预测数据管道瓶颈,自动调整 Flink 并行度

数据沙箱:为算法团队提供独立版本的维表快照,支持实验环境快速搭建

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2025-05-05,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 CP的postgresql厨房 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档