作者介绍:崔鹏,计算机学博士,专注 AI 与大数据管理领域研究,拥有十五年数据库、操作系统及存储领域实战经验,兼具 ORACLE OCM、MySQL OCP 等国际权威认证,PostgreSQL ACE,运营技术公众号 "CP 的 PostgreSQL 厨房",持续输出数据库技术洞察与实践经验。作为全球领先专网通信公司核心技术专家,深耕数据库高可用、高性能架构设计,创新探索 AI 在数据库领域的应用落地,其技术方案有效提升企业级数据库系统稳定性与智能化水平。学术层面,已在AI方向发表2篇SCI论文,将理论研究与工程实践深度结合,形成独特的技术研发视角。
系列文章介绍
第二阶段 : 技术融合基础篇
主要内容
主题:用 PostgreSQL 构建 AI 训练数据管道(ETL 全流程)
核心内容:Flink CDC 实时同步数据 / 数据版本管理(维表 Slowly Changing Dimensions)
实践案例:搭建一个动态更新的推荐系统数据集管道
正文
在 AI 训练领域,高质量的数据集是模型效果的核心保障。尤其是推荐系统,需要实时捕捉用户行为变化,同时保留历史维度数据用于模型回溯。本文将通过实战案例,带您搭建一套基于 PostgreSQL 的动态更新推荐系统数据集管道,深度解析 Flink CDC 实时同步技术与维表版本管理方案。
一、推荐系统数据管道核心痛点与架构设计
1.1 业务挑战
实时性要求:用户点击、收藏等行为数据需秒级同步至训练集,传统 T+1 离线 ETL 无法满足
维度变更管理:商品类目调整、用户标签更新等维度变化需保留历史版本,避免模型训练数据断层
数据一致性:跨库表的事实表(用户行为)与维表(用户 / 商品信息)需保证实时关联一致性
1.2 核心组件
Flink CDC:基于 Debezium 实现数据库变更数据捕获,支持增量同步
PostgreSQL:作为源端与目标端存储,利用 JSONB 字段存储扩展属性,通过 TRIGGER 实现版本控制
维表类型:采用 Slowly Changing Dimensions (SCD) 类型 2(历史版本保留)处理维度变更
二、Flink CDC 实时同步实战:以用户行为表为例
2.1 环境准备
# 安装Flink 1.17.1
wget https://archive.apache.org/dist/flink/flink-1.17.1/flink-1.17.1-bin-scala_2.12.tgz
tar -zxvf flink-1.17.1-bin-scala_2.12.tgz
# 添加PostgreSQL CDC依赖
cp flink-sql-connector-postgres-cdc-2.3.0.jar flink/lib/
2.2 源表与目标表 DDL 设计
源表(用户点击日志)
CREATE TABLE source.click_log (
log_id BIGINT PRIMARY KEY,
user_id BIGINT,
item_id BIGINT,
click_time TIMESTAMP,
client_ip VARCHAR(32)
);
目标事实表(带数据分区)
CREATE TABLE target.click_log (
log_id BIGINT,
user_id BIGINT,
item_id BIGINT,
click_time TIMESTAMP,
etl_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
op_type VARCHAR(10) -- 数据操作类型:INSERT/UPDATE/DELETE
);
2.3 Flink SQL 作业配置
CREATE TABLE postgres_source (
log_id BIGINT,
user_id BIGINT,
item_id BIGINT,
click_time TIMESTAMP,
__deleted BOOLEAN, -- CDC删除标记
PRIMARY KEY (log_id) NOT ENFORCED
) WITH (
'connector' = 'postgres-cdc',
'hostname' = 'source-postgres',
'port' = '5432',
'database-name' = 'recommendation_db',
'schema-name' = 'public',
'table-name' = 'click_log',
'username' = 'admin',
'password' = 'password'
);
INSERT INTO target.click_log (log_id, user_id, item_id, click_time, op_type)
SELECT
log_id,
user_id,
item_id,
click_time,
CASE WHEN __deleted THEN 'DELETE' ELSE 'INSERT' END -- 简化UPDATE处理,实际需处理字段级变更
FROM postgres_source;
2.4 生产环境优化点
并行度配置:根据 CPU 核数设置parallelism.default: 8
容错机制:启用 Checkpoint,配置execution.checkpointing.interval: 10min
延迟监控:通过 Flink Web UI 监控 Source Lag 指标,阈值设置为 1000 条 / 秒
三、维表版本管理:以商品维度表为例
3.1 SCD 类型 2 表结构设计
CREATE TABLE target.item_dim (
item_id BIGINT,
item_name VARCHAR(100),
category_id BIGINT,
brand VARCHAR(50),
version INT, -- 版本号,自增序列
effective_start TIMESTAMP, -- 生效时间
effective_end TIMESTAMP, -- 失效时间,默认NULL表示当前版本
is_current BOOLEAN DEFAULT TRUE, -- 是否当前有效版本
PRIMARY KEY (item_id, version)
);
3.2 版本更新逻辑实现
3.2.1 Flink 自定义 Sink 函数
public class SCD2Sink extends RichSinkFunction<Row> {
private Connection conn;
@Override
public void open(Configuration parameters) throws Exception {
conn = DriverManager.getConnection("jdbc:postgresql://target-postgres:5432/recommendation_db", "admin", "password");
}
@Override
public void invoke(Row row, Context context) throws Exception {
long itemId = row.getFieldAs("item_id");
String itemName = row.getFieldAs("item_name");
// 查询当前最新版本
PreparedStatement selectStmt = conn.prepareStatement(
"SELECT version, effective_end FROM item_dim WHERE item_id = ? AND is_current = true"
);
selectStmt.setLong(1, itemId);
ResultSet rs = selectStmt.executeQuery();
try (Connection txnConn = conn.getConnection();
PreparedStatement updateStmt = txnConn.prepareStatement(
"UPDATE item_dim SET is_current = false, effective_end = NOW() WHERE item_id = ? AND version = ?"
);
PreparedStatement insertStmt = txnConn.prepareStatement(
"INSERT INTO item_dim (item_id, item_name, category_id, brand, version, effective_start) " +
"VALUES (?, ?, ?, ?, ?, NOW())"
) {
if (rs.next()) {
long currentVersion = rs.getLong("version");
// 关闭旧版本
updateStmt.setLong(1, itemId);
updateStmt.setLong(2, currentVersion);
updateStmt.executeUpdate();
}
// 生成新版本号(实际需使用数据库序列)
long newVersion = rs.next() ? rs.getLong("version") + 1 : 1;
insertStmt.setLong(1, itemId);
insertStmt.setString(2, itemName);
// 填充其他字段...
insertStmt.executeUpdate();
txnConn.commit();
}
}
}
3.2.2 版本关联查询
-- 获取用户点击时的商品有效版本
SELECT cl.*, id.item_name, id.brand
FROM target.click_log cl
JOIN target.item_dim id
ON cl.item_id = id.item_id
AND cl.click_time BETWEEN id.effective_start AND COALESCE(id.effective_end, NOW())
四、推荐系统数据集管道实践案例
4.1 实验环境
组件 | 配置 | 数量 |
---|---|---|
PostgreSQL | 16 核 32G, SSD 1TB | 2 台 |
Flink 集群 | CPU 8 核,内存 16G | 3 节点 |
数据规模 | 事实表 500 万条 / 天 | |
维表规模 | 用户表 100 万条,商品表 50 万条 |
4.2 实施步骤
4.2.1 初始化数据同步
# 启动Flink作业
flink run -m yarn-cluster -c com.recommendation.PipelineJob /opt/jar/recommendation-pipeline.jar
4.2.2 模拟数据变更
用户行为实时写入:
import psycopg2
conn = psycopg2.connect("dbname=recommendation_db user=admin")
cur = conn.cursor()
cur.execute("INSERT INTO click_log (user_id, item_id, click_time) VALUES (1001, 2001, NOW())")
conn.commit()
商品维度变更(触发版本更新):
UPDATE source.item_info SET category_id = 1002 WHERE item_id = 2001; -- 类目调整
4.2.3 数据验证
实时同步延迟:通过SELECT NOW() - etl_time FROM click_log,95% 数据延迟 < 500ms
版本完整性:
-- 变更前版本
SELECT * FROM item_dim WHERE item_id = 2001 AND version = 1;
-- 变更后新版本
SELECT * FROM item_dim WHERE item_id = 2001 AND version = 2 AND is_current = true;
验证结果:旧版本effective_end更新为变更时间,新版本effective_start为变更时间,属性值正确更新
关联一致性:
-- 验证点击时间与版本时间对应关系
SELECT COUNT(*)
FROM click_log cl
LEFT JOIN item_dim id
ON cl.item_id = id.item_id
AND cl.click_time BETWEEN id.effective_start AND COALESCE(id.effective_end, NOW())
WHERE id.item_id IS NULL; -- 应返回0条空关联数据
五、生产环境最佳实践
5.1 性能优化
批量处理:维表更新采用 BATCH_SIZE=500 的批量 UPSERT
索引策略:事实表按click_time创建时间分区索引,维表按item_id, effective_start创建联合索引
连接池:Flink 作业使用 HikariCP 连接池,配置maxPoolSize=20
5.2 数据治理
血缘分析:通过 Apache Atlas 记录数据管道血缘关系,追踪维表变更对训练集的影响
质量监控:设置数据校验规则(如用户 ID 非空率 > 99.9%),触发报警阈值 80%
版本清理:定期删除超过 180 天的历史版本,使用 Cron Job 执行:
DELETE FROM item_dim WHERE effective_end < NOW() - INTERVAL '180 days';
5.3 容错恢复
Checkpoint 存储:使用 HDFS 存储 Checkpoint,配置state.backend: rocksdb
故障转移:Flink 自动从最新 Checkpoint 恢复,结合 PostgreSQL 的 WAL 日志保证数据不丢失
六、总结与扩展方向
通过 Flink CDC 与 PostgreSQL 的深度整合,我们实现了推荐系统数据管道的三大核心能力:
实时数据流:毫秒级捕获用户行为变化
维度可追溯:完整保留维表历史版本,支持模型训练数据重放
数据一致性:通过事务控制与版本时间关联,确保跨表数据匹配准确
未来可扩展方向:
数据湖集成:将版本化数据同步至 Delta Lake,构建湖仓一体架构
智能监控:引入 AI 模型预测数据管道瓶颈,自动调整 Flink 并行度
数据沙箱:为算法团队提供独立版本的维表快照,支持实验环境快速搭建
本文分享自 CP的postgresql厨房 微信公众号,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。
本文参与 腾讯云自媒体同步曝光计划 ,欢迎热爱写作的你一起参与!