
From Amazon's "customers who bought this item also bought" to Netflix's personalized movie recommendations, from Douyin's short-video feed to LinkedIn's job recommendations, recommendation algorithms are everywhere. Building an efficient, scalable recommender system, however, poses real engineering challenges: real-time processing of massive user-behavior data, computational efficiency of complex feature engineering, unified management of heterogeneous multi-source data, and real-time feedback and optimization of recommendation results.
Traditionally, recommender stacks mix several databases: Redis for caching hot data, MongoDB for user profiles, Elasticsearch for full-text search, Spark for offline computation. That architecture meets the functional requirements, but it brings hard data-consistency problems, high operational cost, and a sprawling tech stack. PostgreSQL, which bills itself as the world's most advanced open-source relational database, has in recent years grown through its extension ecosystem, notably PostGIS (geospatial), pgvector (vectors), TimescaleDB (time series), and MADlib (machine learning), into a multi-model database platform capable of carrying an entire recommendation pipeline.
Suppose we are building a recommender for a large e-commerce platform with over 50 million registered users and 20 million SKUs (stock keeping units). User profiles were traditionally stored as discrete tags, such as "age: 25-30", "gender: female", "preferred category: beauty & skincare". Discrete tags, however, cannot capture the continuity and nuance of user interest. Modern systems prefer embeddings: user and item features are mapped from a high-dimensional sparse space into dense, low-dimensional vectors.
In our case the user-profile vector has 512 dimensions, produced by a deep model (for example a two-tower model) that captures both long-term preference and immediate intent. A user who regularly browses premium skincare, reads ingredient analyses, and purchases at a steady cadence ends up clearly separated in vector space from a budget-makeup user. When a new product is listed, computing the similarity between user vectors and the item vector is enough to drive per-user personalization.
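To make this concrete, here is a minimal numpy sketch of scoring items for one user by cosine similarity (shapes and data are illustrative; in practice the vectors come out of the trained towers):

import numpy as np

# Hypothetical tower outputs: one user vector and a bank of item vectors
user_vec = np.random.rand(512)
item_vecs = np.random.rand(10000, 512)

# Cosine similarity = dot product of L2-normalized vectors
u = user_vec / np.linalg.norm(user_vec)
items = item_vecs / np.linalg.norm(item_vecs, axis=1, keepdims=True)
scores = items @ u

top10 = np.argsort(-scores)[:10]  # indices of the 10 best-matching items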
The pgvector extension gives PostgreSQL a native vector type and similarity search, supporting L2 distance, inner product, and cosine similarity as metrics. Compared with dedicated vector databases (Milvus, Pinecone), PostgreSQL's advantages are transactional consistency, mature access control, and seamless joins against the rest of the business data (orders, inventory, user records).
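For reference, pgvector binds one operator to each metric; a minimal sketch (the vector literal is a truncated placeholder):

-- pgvector distance operators:
--   <->  L2 (Euclidean) distance
--   <#>  negative inner product
--   <=>  cosine distance (1 - cosine similarity)
SELECT item_id,
       embedding <=> '[0.1, 0.2, ...]'::vector AS cosine_distance
FROM item_embeddings
ORDER BY cosine_distance
LIMIT 10;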
Step 1: Environment setup and extension installation
-- Check the PostgreSQL version (pgvector requires PostgreSQL 12+)
SELECT version();
-- Install the pgvector extension
CREATE EXTENSION IF NOT EXISTS vector;
-- Verify the installation
SELECT * FROM pg_extension WHERE extname = 'vector';

Step 2: Designing the user-profile tables
-- Base user table
CREATE TABLE users (
    user_id BIGINT PRIMARY KEY,
    username VARCHAR(50) NOT NULL,
    register_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    last_active TIMESTAMP,
    user_segment VARCHAR(20) -- user tier: new, active, dormant, churned
);
-- User embedding table (separate from the base data so it can be tuned independently)
CREATE TABLE user_embeddings (
    user_id BIGINT PRIMARY KEY REFERENCES users(user_id) ON DELETE CASCADE,
    embedding VECTOR(512), -- 512-dimensional user-interest vector
    model_version VARCHAR(20), -- model versioning for A/B tests
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    metadata JSONB -- context captured when the vector was generated
);
-- Item embedding table (for similarity matching)
CREATE TABLE item_embeddings (
    item_id BIGINT PRIMARY KEY,
    category_id INT,
    embedding VECTOR(512),
    price_range INT, -- price tier: 1 = budget, 5 = luxury
    popularity_score FLOAT,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- IVFFlat indexes on the vector columns (trade query speed against recall)
CREATE INDEX idx_user_embedding ON user_embeddings
USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 100); -- tune to data volume; pgvector suggests lists = rows/1000 up to ~1M rows, sqrt(rows) beyond
CREATE INDEX idx_item_embedding ON item_embeddings
USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 200);

Step 3: Implementing vector similarity search
import psycopg2
import numpy as np
from psycopg2.extras import RealDictCursor, execute_batch
class VectorRecommendationEngine:
def __init__(self, db_config):
self.conn = psycopg2.connect(**db_config)
def get_similar_users(self, user_id, top_k=10):
"""
基于向量相似度寻找相似用户(用于UserCF)
使用余弦相似度衡量用户兴趣接近程度
"""
query = """
WITH target_vector AS (
SELECT embedding
FROM user_embeddings
WHERE user_id = %s
)
SELECT
u.user_id,
u.username,
1 - (ue.embedding <=> (SELECT embedding FROM target_vector)) as similarity
FROM user_embeddings ue
JOIN users u ON ue.user_id = u.user_id
WHERE ue.user_id != %s
ORDER BY ue.embedding <=> (SELECT embedding FROM target_vector)
LIMIT %s;
"""
        # The <=> operator returns cosine distance, so 1 - distance is cosine similarity
with self.conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute(query, (user_id, user_id, top_k))
return cur.fetchall()
def recommend_items_by_vector(self, user_id, top_k=20, category_filter=None):
"""
基于用户向量推荐相似商品(双塔模型召回)
支持按品类过滤,实现多目标优化
"""
base_query = """
WITH user_vec AS (
SELECT embedding FROM user_embeddings WHERE user_id = %s
)
SELECT
i.item_id,
i.category_id,
1 - (ie.embedding <=> (SELECT embedding FROM user_vec)) as similarity_score,
ie.popularity_score,
(1 - (ie.embedding <=> (SELECT embedding FROM user_vec))) * 0.7
+ ie.popularity_score * 0.3 as final_score
FROM item_embeddings ie
JOIN items i ON ie.item_id = i.item_id
WHERE ie.embedding IS NOT NULL
"""
params = [user_id]
if category_filter:
base_query += " AND i.category_id = ANY(%s)"
params.append(category_filter)
base_query += """
ORDER BY final_score DESC
LIMIT %s;
"""
params.append(top_k)
with self.conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute(base_query, params)
return cur.fetchall()
def batch_update_embeddings(self, user_embeddings_dict):
"""
批量更新用户向量,使用UPSERT避免重复插入
适用于模型每日全量更新场景
"""
upsert_sql = """
INSERT INTO user_embeddings (user_id, embedding, model_version, updated_at)
VALUES (%s, %s, %s, NOW())
ON CONFLICT (user_id)
DO UPDATE SET
embedding = EXCLUDED.embedding,
model_version = EXCLUDED.model_version,
updated_at = NOW();
"""
        data = [
            # serialize to pgvector's text input format '[v1,v2,...]'
            (uid, '[' + ','.join(map(str, emb)) + ']', 'v2.1')
            for uid, emb in user_embeddings_dict.items()
        ]
        with self.conn.cursor() as cur:
            execute_batch(cur, upsert_sql, data, page_size=1000)
        self.conn.commit()

Step 4: Performance-tuning strategies
| Optimization | Measure | Expected effect |
|---|---|---|
| Index strategy | IVFFlat index, lists = 100 (~1M users) | query latency drops from ~500 ms to ~20 ms |
| Quantization | half-precision storage via halfvec(512) (pgvector 0.7+) | ~50% less storage, <1% accuracy loss |
| Partitioning | partition by model_version | zero-downtime switchover between model versions |
| Warm-up | preload hot vectors with the pg_prewarm extension | ~80% lower cold-start query latency |
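Two of these measures can be sketched directly in SQL (values are illustrative; pg_prewarm ships in contrib, and ivfflat.probes is a pgvector session setting):

-- Recall/latency trade-off: probe more IVFFlat lists per query (default is 1)
SET ivfflat.probes = 10;
-- Warm the embedding table (and optionally its index) into shared buffers
CREATE EXTENSION IF NOT EXISTS pg_prewarm;
SELECT pg_prewarm('user_embeddings');
SELECT pg_prewarm('idx_user_embedding');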

In live-stream e-commerce, the recommender faces hard real-time constraints. When a user enters a live room, the system has roughly 100 ms to adapt its strategy to the user's in-session behavior (product clicks, chat messages, dwell time, likes). If the user just clicked a light-luxury handbag, should the next slot show the same brand's accessories or a similarly priced competitor? After three rapid swipe-aways in a row (negative feedback), should the strategy switch immediately?
The traditional T+1 offline pipeline cannot keep up. We need a unified stream-and-batch feature pipeline: real-time stream processing (current-session behavior) combined with offline batch processing (long-term interest modeling). PostgreSQL's TimescaleDB extension (time series) together with pg_partman (partition management) is a good fit here.
In our live-streaming case, the platform produces over 100,000 behavior events per second (clicks, favorites, shares, purchases). With PostgreSQL's streaming-ingest capability plus pre-aggregation through materialized views, we can compute short-term intent features in near real time (category mix of the last 5 minutes, current price sensitivity) alongside long-term statistics (30-day purchase frequency, category-preference stability), which then meet at serving time as sketched below.
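A sketch of how the two feature families are merged for the ranking model (the field names are illustrative, not part of the schemas defined below):

def build_ranking_features(realtime: dict, longterm: dict) -> dict:
    """Merge short-term streaming features with long-term batch features
    into one feature dict for the ranking model."""
    return {
        # short-term intent (streaming, minutes old)
        'ctr_15min': realtime.get('ctr_15min', 0.0),
        'actions_5min': realtime.get('actions_5min', 0),
        # long-term preference (batch, e.g. 30-day windows)
        'purchase_freq_30d': longterm.get('purchase_freq_30d', 0.0),
        'category_stability': longterm.get('category_stability', 0.0),
    }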
Step 1: Time-series table design and partitioning strategy
-- Install the TimescaleDB extension (shared_preload_libraries must be configured in postgresql.conf beforehand)
CREATE EXTENSION IF NOT EXISTS timescaledb;
-- Raw behavior-event table (to become a hypertable)
CREATE TABLE user_behavior_events (
    event_id BIGSERIAL,
    user_id BIGINT NOT NULL,
    session_id UUID,
    event_type VARCHAR(20) CHECK (event_type IN ('click', 'view', 'cart', 'purchase', 'like', 'share')),
    item_id BIGINT,
    category_id INT,
    channel VARCHAR(20), -- source: live_stream, search, recommendation
    event_time TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    event_data JSONB, -- extra attributes: dwell time, price, streamer ID, ...
    device_info JSONB
);
-- Convert to a hypertable, auto-partitioned by time into 7-day chunks
SELECT create_hypertable('user_behavior_events', 'event_time',
    chunk_time_interval => INTERVAL '7 days',
    if_not_exists => TRUE);
-- Composite indexes for the common query patterns
CREATE INDEX idx_behavior_user_time ON user_behavior_events (user_id, event_time DESC);
CREATE INDEX idx_behavior_event_type ON user_behavior_events (event_type, event_time DESC);

Step 2: Materialized view for real-time features
-- Real-time feature aggregation as a materialized view (5-minute windows)
CREATE MATERIALIZED VIEW user_realtime_features AS
WITH time_windows AS (
    SELECT
        user_id,
        -- activity over the last 5 minutes
        COUNT(*) FILTER (WHERE event_time > NOW() - INTERVAL '5 minutes') as actions_5min,
        COUNT(DISTINCT item_id) FILTER (WHERE event_time > NOW() - INTERVAL '5 minutes') as unique_items_5min,
        -- price sensitivity (average price of recently viewed items)
        AVG((event_data->>'price')::FLOAT) FILTER (WHERE event_type = 'view' AND event_time > NOW() - INTERVAL '1 hour') as avg_view_price_1h,
        -- real-time intent strength (click-to-view ratio)
        CASE
            WHEN COUNT(*) FILTER (WHERE event_type = 'view' AND event_time > NOW() - INTERVAL '15 minutes') > 0
            THEN COUNT(*) FILTER (WHERE event_type = 'click' AND event_time > NOW() - INTERVAL '15 minutes')::FLOAT /
                 COUNT(*) FILTER (WHERE event_type = 'view' AND event_time > NOW() - INTERVAL '15 minutes')
            ELSE 0
        END as ctr_15min,
        -- dominant category over the last hour
        MODE() WITHIN GROUP (ORDER BY category_id) as top_category_1h,
        NOW() as computed_at
    FROM user_behavior_events
    WHERE event_time > NOW() - INTERVAL '1 hour'
    GROUP BY user_id
)
SELECT * FROM time_windows
WITH DATA;
-- Unique index so the view can be refreshed CONCURRENTLY
CREATE UNIQUE INDEX idx_rt_features_user ON user_realtime_features (user_id);
-- Schedule automatic refreshes (pg_cron or an external scheduler, e.g. every 30 seconds)
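-- A minimal scheduling sketch with pg_cron (assumes the pg_cron extension is
-- installed and loaded; second-granularity schedules require pg_cron 1.5+):
-- CREATE EXTENSION IF NOT EXISTS pg_cron;
SELECT cron.schedule(
    'refresh-user-realtime-features',
    '30 seconds',
    'REFRESH MATERIALIZED VIEW CONCURRENTLY user_realtime_features'
);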
-- Note: frequent full refreshes are I/O-heavy; in production, consider trigger-based incremental updates

Step 3: Streaming ingestion and feature serving
import asyncio
import asyncpg
import json
from datetime import datetime, timezone
from typing import Dict, List
class RealtimeFeaturePipeline:
def __init__(self, dsn: str):
self.dsn = dsn
self.pool = None
async def initialize(self):
"""初始化连接池"""
self.pool = await asyncpg.create_pool(
self.dsn,
min_size=10,
max_size=50,
command_timeout=60
)
async def ingest_event_batch(self, events: List[Dict]):
"""
批量摄取行为事件,使用COPY协议优化写入性能
单批次建议1000-5000条,平衡延迟与吞吐量
"""
if not events:
return
        # Convert event dicts to tuples in table-column order
records = [
(
e['user_id'],
e.get('session_id'),
e['event_type'],
e.get('item_id'),
e.get('category_id'),
e.get('channel', 'unknown'),
                e.get('event_time', datetime.now(timezone.utc)),
json.dumps(e.get('event_data', {})),
json.dumps(e.get('device_info', {}))
)
for e in events
]
async with self.pool.acquire() as conn:
            # COPY-based bulk load; typically ~10-20x faster than row-by-row INSERTs
await conn.copy_records_to_table(
'user_behavior_events',
records=records,
columns=['user_id', 'session_id', 'event_type', 'item_id',
'category_id', 'channel', 'event_time', 'event_data', 'device_info']
)
async def get_user_realtime_profile(self, user_id: int) -> Dict:
"""
获取用户实时画像,用于在线推荐API
查询物化视图,响应时间<10ms
"""
query = """
SELECT
user_id,
actions_5min,
unique_items_5min,
avg_view_price_1h,
ctr_15min,
top_category_1h,
computed_at
FROM user_realtime_features
WHERE user_id = $1;
"""
async with self.pool.acquire() as conn:
row = await conn.fetchrow(query, user_id)
if row:
return dict(row)
return {
'user_id': user_id,
'actions_5min': 0,
'ctr_15min': 0.0,
'is_cold_start': True
}
async def compute_session_sequence_features(self, session_id: str) -> Dict:
"""
计算会话级序列特征(用于RNN/Transformer模型)
提取最近20个行为的序列模式
"""
query = """
WITH session_events AS (
SELECT
event_type,
item_id,
category_id,
event_time,
(event_data->>'dwell_time')::INT as dwell_time,
ROW_NUMBER() OVER (ORDER BY event_time DESC) as rn
FROM user_behavior_events
WHERE session_id = $1
ORDER BY event_time DESC
LIMIT 20
)
SELECT
ARRAY_AGG(event_type ORDER BY rn DESC) as event_sequence,
ARRAY_AGG(item_id ORDER BY rn DESC) as item_sequence,
ARRAY_AGG(category_id ORDER BY rn DESC) as category_sequence,
AVG(dwell_time) as avg_dwell_time,
MAX(event_time) - MIN(event_time) as session_duration
FROM session_events;
"""
async with self.pool.acquire() as conn:
row = await conn.fetchrow(query, session_id)
return dict(row) if row else {}
async def continuous_aggregate_maintenance(self):
"""
维护连续聚合(Continuous Aggregate)视图
TimescaleDB特性,比物化视图更高效
"""
# 创建连续聚合(仅需执行一次)
create_ca = """
CREATE MATERIALIZED VIEW user_hourly_stats
WITH (timescaledb.continuous) AS
SELECT
time_bucket('1 hour', event_time) as bucket,
user_id,
COUNT(*) as event_count,
COUNT(DISTINCT item_id) as unique_items,
COUNT(*) FILTER (WHERE event_type = 'purchase') as purchase_count
FROM user_behavior_events
GROUP BY bucket, user_id;
"""
        # Manual refresh (normally handled by a refresh policy instead)
refresh_sql = "CALL refresh_continuous_aggregate('user_hourly_stats', NULL, NULL);"
async with self.pool.acquire() as conn:
            await conn.execute(refresh_sql)

Step 4: Feature-store tuning parameters
| Setting | Suggested value | Notes |
|---|---|---|
| timescaledb.max_background_workers | 8 | parallelism for background compression/refresh jobs |
| shared_buffers | 25% of RAM | caching for time-series data |
| chunk_time_interval | 7 days | balances query cost against maintenance overhead |
| fillfactor | 90 | leaves room for in-place updates |
| autovacuum_naptime | 10s | reclaims dead tuples promptly |

Consider an online book community with 10 million titles and 5 million active users, where behavior includes ratings (1-5 stars), favorites, reviews, and adding books to lists. We want a "books for you" feature: find similar users (user-based CF) or similar items (item-based CF) from historical behavior, and predict ratings for unread books.
Matrix factorization is the classic approach to this problem: it decomposes the high-dimensional user-item rating matrix into low-dimensional user-factor and item-factor matrices. PostgreSQL's MADlib machine-learning extension provides in-database factorization routines, including low-rank matrix factorization and SVD (singular value decomposition); ALS-style (alternating least squares) and NMF-style models can be expressed on top of these primitives.
In our book-community case, the rating matrix is 99.2% sparse (most users rate only a handful of books). We model implicit feedback ALS-style, folding signals such as reading time and favoriting in alongside explicit ratings. Doing the factorization inside PostgreSQL avoids the ETL cost of exporting data to Spark/Mahout and keeps model training adjacent to online serving.
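Before wiring this into the database, the implicit-feedback ALS update itself is worth seeing once. Below is a toy, dense-matrix numpy sketch in the style of Hu, Koren & Volinsky (sizes and hyperparameters are illustrative; the in-database training follows in step 2):

import numpy as np

def als_implicit(R, k=16, alpha=40.0, reg=0.1, iters=10):
    """Factorize an implicit-feedback matrix R (users x items) into U @ V.T."""
    n_users, n_items = R.shape
    C = 1.0 + alpha * R                       # confidence weights
    P = (R > 0).astype(float)                 # binarized preferences
    U = np.random.default_rng(0).normal(scale=0.1, size=(n_users, k))
    V = np.random.default_rng(1).normal(scale=0.1, size=(n_items, k))
    for _ in range(iters):
        # alternate exact least-squares solves, one side fixed at a time
        for u in range(n_users):
            Cu = np.diag(C[u])
            U[u] = np.linalg.solve(V.T @ Cu @ V + reg * np.eye(k),
                                   V.T @ Cu @ P[u])
        for i in range(n_items):
            Ci = np.diag(C[:, i])
            V[i] = np.linalg.solve(U.T @ Ci @ U + reg * np.eye(k),
                                   U.T @ Ci @ P[:, i])
    return U, V  # predicted affinity: U[user] @ V[item]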
Step 1: Installing MADlib and building the rating matrix
-- MADlib is installed with its madpack utility (which creates the madlib
-- schema); download the package matching your PostgreSQL version, e.g.:
--   madpack -p postgres -c user@host:port/dbname install
-- User-book rating table (explicit + implicit feedback)
CREATE TABLE user_book_ratings (
    user_id BIGINT,
    book_id BIGINT,
    explicit_rating FLOAT CHECK (explicit_rating BETWEEN 1 AND 5),
    implicit_score FLOAT DEFAULT 0, -- behavior-derived implicit score
    combined_score FLOAT GENERATED ALWAYS AS (
        COALESCE(explicit_rating * 2, 0) +
        implicit_score * 0.5
    ) STORED,
    last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (user_id, book_id)
);
-- Sparse-matrix view in the (row, column, value) format the factorization routines expect
CREATE VIEW rating_matrix AS
SELECT
    user_id,
    book_id,
    combined_score as rating
FROM user_book_ratings
WHERE combined_score > 0;

Step 2: Training the ALS-style matrix factorization model
-- Train the factorization model with 100 latent factors.
-- Note: stock MADlib does not ship a function named als_train; its built-in
-- low-rank factorization is madlib.lmf_igd_run() (SGD-based), roughly:
--   SELECT madlib.lmf_igd_run('book_lmf_model', 'rating_matrix',
--                             'user_id', 'book_id', 'rating',
--                             <n_users>, <n_books>, 100, 0.01, 2, 20, 0.01);
-- The ALS-shaped call below is kept as pseudocode for an equivalent routine:
SELECT madlib.als_train(
    'rating_matrix',    -- input table
    'book_als_model',   -- output model table prefix
    'user_id',          -- user column
    'book_id',          -- item column
    'rating',           -- rating column
    100,                -- number of latent factors
    0.01,               -- regularization (guards against overfitting)
    0.01,               -- convergence threshold
    20                  -- max iterations
);
-- Resulting tables:
--   book_als_model_u: user factor matrix (user_id, factors[])
--   book_als_model_v: book factor matrix (book_id, factors[])
--   book_als_model_summary: training summary (RMSE, etc.)

Step 3: Computing similarities and generating recommendations
import numpy as np
import psycopg2
from scipy.spatial.distance import cosine
from typing import List, Tuple
class CollaborativeFilteringEngine:
def __init__(self, db_config):
self.conn = psycopg2.connect(**db_config)
self.user_factors = {}
self.item_factors = {}
self._load_factors()
def _load_factors(self):
"""加载因子矩阵到内存(适合中小规模数据)"""
with self.conn.cursor() as cur:
            # user factors
cur.execute("SELECT user_id, factors FROM book_als_model_u")
for row in cur.fetchall():
self.user_factors[row[0]] = np.array(row[1])
            # book factors
cur.execute("SELECT book_id, factors FROM book_als_model_v")
for row in cur.fetchall():
self.item_factors[row[0]] = np.array(row[1])
def predict_rating(self, user_id: int, book_id: int) -> float:
"""
预测用户对书籍的评分
计算用户向量与书籍向量的内积
"""
if user_id not in self.user_factors or book_id not in self.item_factors:
            return 3.0  # cold-start default rating
user_vec = self.user_factors[user_id]
item_vec = self.item_factors[book_id]
        # inner-product prediction (any bias terms are assumed folded into the factors)
prediction = np.dot(user_vec, item_vec)
return np.clip(prediction, 1.0, 5.0)
def find_similar_books(self, book_id: int, top_k: int = 10) -> List[Tuple[int, float]]:
"""
Item-based CF:找到相似书籍(基于隐向量余弦相似度)
"""
if book_id not in self.item_factors:
return []
target_vec = self.item_factors[book_id]
similarities = []
for other_id, other_vec in self.item_factors.items():
if other_id != book_id:
sim = 1 - cosine(target_vec, other_vec)
similarities.append((other_id, sim))
return sorted(similarities, key=lambda x: x[1], reverse=True)[:top_k]
def recommend_for_user(self, user_id: int, top_k: int = 20,
exclude_read: bool = True) -> List[Tuple[int, float]]:
"""
为用户推荐书籍(基于预测评分排序)
排除已读过的书籍
"""
if user_id not in self.user_factors:
return self._cold_start_recommend(top_k)
        # books the user has already rated
read_books = set()
if exclude_read:
with self.conn.cursor() as cur:
cur.execute(
"SELECT book_id FROM user_book_ratings WHERE user_id = %s",
(user_id,)
)
read_books = {row[0] for row in cur.fetchall()}
        # predicted scores for every unrated book
predictions = []
user_vec = self.user_factors[user_id]
for book_id, book_vec in self.item_factors.items():
if book_id not in read_books:
score = np.dot(user_vec, book_vec)
predictions.append((book_id, score))
return sorted(predictions, key=lambda x: x[1], reverse=True)[:top_k]
def _cold_start_recommend(self, top_k: int) -> List[Tuple[int, float]]:
"""冷启动策略:推荐热门高分书籍"""
with self.conn.cursor() as cur:
cur.execute("""
SELECT book_id, AVG(combined_score) as avg_score, COUNT(*) as cnt
FROM user_book_ratings
GROUP BY book_id
HAVING COUNT(*) > 100
ORDER BY avg_score DESC, cnt DESC
LIMIT %s
""", (top_k,))
return [(row[0], row[1]) for row in cur.fetchall()]
def update_model_incremental(self, new_ratings_table: str):
"""
增量更新模型(使用MADlib的增量ALS)
适用于新用户/新物品加入,避免全量重训练
"""
# MADlib支持增量更新,只需提供新数据
update_sql = f"""
SELECT madlib.als_train(
'{new_ratings_table}',
'book_als_model',
'user_id',
'book_id',
'rating',
100, 0.01, 0.01, 5,
            TRUE  -- incremental mode (assumed flag; see docstring)
);
"""
with self.conn.cursor() as cur:
cur.execute(update_sql)
self.conn.commit()
        # Reload the factor matrices
        self._load_factors()

Step 4: A hybrid recommendation strategy
-- Recommendation results table (blending collaborative filtering with content features)
CREATE TABLE hybrid_recommendations (
    user_id BIGINT,
    book_id BIGINT,
    cf_score FLOAT,         -- collaborative-filtering score
    content_score FLOAT,    -- content-based score (tag match)
    popularity_score FLOAT, -- popularity score (covers cold start)
    final_score FLOAT GENERATED ALWAYS AS (
        cf_score * 0.6 + content_score * 0.3 + popularity_score * 0.1
    ) STORED,
    reason VARCHAR(50), -- recommendation reason: liked by similar users, popular in category, ...
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (user_id, book_id)
);
-- Function for real-time hybrid ranking
CREATE OR REPLACE FUNCTION get_hybrid_recommendations(
    p_user_id BIGINT,
    p_limit INT DEFAULT 20
)
RETURNS TABLE (
    book_id BIGINT,
    title VARCHAR,
    final_score FLOAT,
    reason VARCHAR
) AS $$
BEGIN
    RETURN QUERY
    WITH cf_recs AS (
        -- Candidates from similar users (simplified; a production system would
        -- score against the ALS factors via PL/Python or an app server)
        SELECT
            r.book_id,
            AVG(r.combined_score) as cf_score
        FROM user_book_ratings r
        WHERE r.user_id IN (
            -- similar users by co-rating overlap (Jaccard-style)
            SELECT other.user_id
            FROM user_book_ratings other
            JOIN user_book_ratings me ON me.book_id = other.book_id
            WHERE me.user_id = p_user_id AND other.user_id != p_user_id
            GROUP BY other.user_id
            HAVING COUNT(*) > 3
            ORDER BY COUNT(*) DESC
            LIMIT 20
        )
        AND r.book_id NOT IN (
            SELECT ubr.book_id FROM user_book_ratings ubr WHERE ubr.user_id = p_user_id
        )
        GROUP BY r.book_id
    ),
    content_recs AS (
        -- Content-based: unread books by authors the user has already rated
        SELECT
            b2.book_id,
            0.5 as content_score
        FROM books b2
        WHERE b2.author_id IN (
            SELECT b3.author_id FROM books b3
            WHERE b3.book_id IN (
                SELECT ubr.book_id FROM user_book_ratings ubr
                WHERE ubr.user_id = p_user_id
            )
        )
        AND b2.book_id NOT IN (
            SELECT ubr.book_id FROM user_book_ratings ubr WHERE ubr.user_id = p_user_id
        )
    )
    SELECT
        b.book_id,
        b.title,
        (COALESCE(c.cf_score, 0) * 0.6 + COALESCE(ct.content_score, 0) * 0.3 + b.popularity * 0.1)::FLOAT,
        (CASE
            WHEN c.cf_score > 4 THEN 'highly rated by similar readers'
            WHEN ct.content_score > 0 THEN 'new from an author you like'
            ELSE 'popular in the community'
        END)::VARCHAR
    FROM books b
    LEFT JOIN cf_recs c ON c.book_id = b.book_id
    LEFT JOIN content_recs ct ON ct.book_id = b.book_id
    WHERE c.cf_score IS NOT NULL OR ct.content_score IS NOT NULL
    ORDER BY 3 DESC
    LIMIT p_limit;
END;
$$ LANGUAGE plpgsql;
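With the function in place, serving a blended list is one call (the user ID is illustrative):

-- Top 10 hybrid recommendations for user 12345
SELECT * FROM get_hybrid_recommendations(12345, 10);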
LinkedIn的"你可能认识的人"功能是图推荐算法的经典案例。在职场社交场景中,推荐逻辑不仅基于相似度,更依赖关系链:共同好友、同事关系、校友网络、技能重叠等。这类问题本质上是在异构图(Heterogeneous Graph)上进行路径推理。
PostgreSQL通过递归CTE(Common Table Expressions)和Apache AGE扩展(图数据库扩展,支持OpenCypher查询语言)提供了强大的图数据处理能力。在我们的职场社交案例中,构建了一个包含用户、公司、学校、技能、职位五类节点的异构图,边类型包括"关注"、"同事"、"校友"、"拥有技能"、"任职"等。
通过图算法,我们可以发现"二度人脉"(朋友的朋友)、"强三角关系"(三人互相关注)、"关键意见领袖"(高PageRank节点)。更重要的是,基于路径的推荐可解释性强,例如"你们有3位共同好友,曾在同一家公司工作,都擅长Python",这种推荐理由显著提升用户信任度和点击率。
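To give a feel for the AGE side before the setup below, here is a hedged sketch of a mutual-connection query in AGE's openCypher dialect (it assumes the professional_network graph created in step 1; a production version would also exclude people the user already follows, and the supported Cypher subset varies by AGE version):

SELECT * FROM cypher('professional_network', $$
    MATCH (me:User {id: 1})-[:FOLLOWS]->(mutual:User)-[:FOLLOWS]->(candidate:User)
    WHERE candidate.id <> 1
    RETURN candidate.name, count(mutual) AS mutual_connections
    ORDER BY mutual_connections DESC
    LIMIT 10
$$) AS (name agtype, mutual_connections agtype);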
Step 1: Graph data model (with the AGE extension)
-- Install the Apache AGE extension (a graph extension for PostgreSQL)
CREATE EXTENSION IF NOT EXISTS age;
LOAD 'age';
SET search_path = ag_catalog, "$user", public;
-- Create the graph (if it does not exist)
SELECT create_graph('professional_network');
-- Create a vertex (a user)
SELECT * FROM cypher('professional_network', $$
    CREATE (u:User {
        id: 1,
        name: 'Alice',
        current_company: 'TechCorp',
        job_title: 'Senior Engineer'
    })
    RETURN u
$$) as (u agtype);
-- Bulk-insert vertices: cypher() accepts a parameter map as its third
-- argument, but only from a prepared statement (map-style SET follows the
-- original sketch; verify support in your AGE version)
PREPARE bulk_insert_users(agtype) AS
SELECT * FROM cypher('professional_network', $$
    UNWIND $users as user
    CREATE (u:User)
    SET u = user
    RETURN id(u)
$$, $1) as (vid agtype);
EXECUTE bulk_insert_users('{"users": [{"id": 2, "name": "Bob", "company": "TechCorp"}, ...]}');
-- Create an edge (a follow relationship)
SELECT * FROM cypher('professional_network', $$
    MATCH (a:User {id: 1}), (b:User {id: 2})
    CREATE (a)-[:FOLLOWS {since: '2023-01-15', weight: 1.0}]->(b)
$$) as (result agtype);
-- Other edge types:
-- [:WORKS_AT {start_date, end_date}] - employment
-- [:STUDIED_AT {year}] - alumni
-- [:HAS_SKILL {proficiency}] - skill

Step 2: Graph traversal with recursive CTEs (plain-SQL approach)
-- Without the AGE extension: graph algorithms using only native recursive CTEs
-- Adjacency-list table for the graph structure
CREATE TABLE graph_edges (
    source_id BIGINT NOT NULL,
    target_id BIGINT NOT NULL,
    edge_type VARCHAR(20) NOT NULL, -- FOLLOWS, COLLEAGUE, ALUMNI
    weight FLOAT DEFAULT 1.0,
    properties JSONB,
    PRIMARY KEY (source_id, target_id, edge_type)
);
CREATE INDEX idx_graph_source ON graph_edges(source_id, edge_type);
CREATE INDEX idx_graph_target ON graph_edges(target_id, edge_type);
-- Second-degree connections (friends of friends, excluding direct connections)
WITH RECURSIVE connection_paths AS (
    -- Anchor: users directly followed (first degree)
    SELECT
        source_id as root,
        target_id as connected_id,
        1 as degree,
        ARRAY[source_id, target_id] as path,
        weight as total_weight,
        ARRAY[edge_type] as relationship_chain
    FROM graph_edges
    WHERE source_id = 12345 -- target user ID
      AND edge_type = 'FOLLOWS'
    UNION ALL
    -- Recursive step: expand to the next degree
    SELECT
        cp.root,
        ge.target_id,
        cp.degree + 1,
        cp.path || ge.target_id,
        cp.total_weight * ge.weight,
        cp.relationship_chain || ge.edge_type
    FROM connection_paths cp
    JOIN graph_edges ge ON cp.connected_id = ge.source_id
    WHERE cp.degree < 2 -- cap traversal at two degrees
      AND ge.target_id <> ALL(cp.path) -- avoid cycles
      AND ge.edge_type = 'FOLLOWS'
),
-- Score connection strength (note: one row per path; a production query
-- would aggregate over all paths reaching the same candidate)
scored_connections AS (
    SELECT
        connected_id,
        degree,
        path,
        total_weight,
        relationship_chain,
        -- path decay: a second-degree connection carries 0.3x the weight of a first-degree one
        CASE degree
            WHEN 1 THEN total_weight * 1.0
            WHEN 2 THEN total_weight * 0.3
            ELSE total_weight * 0.1
        END as connection_score,
        -- intermediate nodes on this path (mutual connections along it)
        cardinality(path) - 2 as common_friends_count
    FROM connection_paths
    WHERE degree <= 2 -- keep second degree at most
)
SELECT
    sc.connected_id,
    u.name,
    u.job_title,
    u.current_company,
    sc.degree,
    sc.connection_score,
    sc.common_friends_count,
    sc.path as connection_path,
    -- human-readable recommendation reason
    CASE
        WHEN sc.degree = 1 THEN 'already followed'
        WHEN sc.degree = 2 THEN
            sc.common_friends_count || ' mutual connection(s)'
        ELSE 'extended network'
    END as reason
FROM scored_connections sc
JOIN users u ON sc.connected_id = u.user_id
WHERE sc.connected_id NOT IN (
    -- exclude users already followed
    SELECT target_id FROM graph_edges
    WHERE source_id = 12345 AND edge_type = 'FOLLOWS'
)
ORDER BY sc.connection_score DESC, sc.common_friends_count DESC
LIMIT 20;

Step 3: PageRank (iterative computation)
-- PageRank, simplified, for small to medium graphs (<1M nodes).
-- A recursive CTE cannot aggregate over its own recursive reference in
-- PostgreSQL, so the iterations run procedurally over temp tables instead.
DO $$
DECLARE
    d CONSTANT FLOAT := 0.85; -- damping factor
    n_nodes BIGINT;
BEGIN
    DROP TABLE IF EXISTS pr_nodes, pr_current, pr_next;
    CREATE TEMP TABLE pr_nodes AS
        SELECT source_id AS node_id FROM graph_edges
        UNION
        SELECT target_id FROM graph_edges;
    SELECT COUNT(*) INTO n_nodes FROM pr_nodes;
    -- initialize every node's PageRank to 1/N
    CREATE TEMP TABLE pr_current AS
        SELECT node_id, 1.0 / n_nodes AS pr_value FROM pr_nodes;
    FOR i IN 1..5 LOOP -- 5 iterations, as in the original sketch
        CREATE TEMP TABLE pr_next AS
        SELECT
            n.node_id,
            (1 - d) / n_nodes + d * COALESCE(SUM(
                e.weight * p.pr_value / od.out_degree
            ), 0) AS pr_value
        FROM pr_nodes n
        LEFT JOIN graph_edges e ON e.target_id = n.node_id
        LEFT JOIN pr_current p ON p.node_id = e.source_id
        LEFT JOIN (
            SELECT source_id, COUNT(*) AS out_degree
            FROM graph_edges
            GROUP BY source_id
        ) od ON od.source_id = e.source_id
        GROUP BY n.node_id;
        DROP TABLE pr_current;
        ALTER TABLE pr_next RENAME TO pr_current;
    END LOOP;
END $$;
-- Top 100 nodes by PageRank
SELECT
    p.node_id,
    u.name,
    u.job_title,
    p.pr_value AS pagerank,
    ROW_NUMBER() OVER (ORDER BY p.pr_value DESC) AS rank
FROM pr_current p
JOIN users u ON p.node_id = u.user_id
ORDER BY p.pr_value DESC
LIMIT 100;

Step 4: A Python graph-recommendation service
from typing import List, Dict, Set, Tuple
import psycopg2
from collections import defaultdict, deque
class GraphRecommendationEngine:
def __init__(self, db_config: Dict):
self.conn = psycopg2.connect(**db_config)
        self.graph_cache = {}  # local cache for hot subgraphs
def get_connection_recommendations(self, user_id: int,
max_depth: int = 2,
min_common: int = 2) -> List[Dict]:
"""
基于共同连接数的推荐(Adamic-Adar指数)
"""
query = """
WITH mutual_connections AS (
SELECT
e2.target_id as candidate_id,
COUNT(*) as common_neighbors,
ARRAY_AGG(DISTINCT e1.target_id) as shared_connections,
                SUM(1.0 / LN(neighbor_degrees.degree + 1)) as adamic_adar_score -- natural log per Adamic-Adar; +1 guards degree-1 neighbors
FROM graph_edges e1
JOIN graph_edges e2 ON e1.target_id = e2.source_id
JOIN (
SELECT source_id, COUNT(*) as degree
FROM graph_edges
GROUP BY source_id
) neighbor_degrees ON e1.target_id = neighbor_degrees.source_id
WHERE e1.source_id = %s
AND e1.edge_type = 'FOLLOWS'
AND e2.edge_type = 'FOLLOWS'
AND e2.target_id != %s
AND e2.target_id NOT IN (
SELECT target_id FROM graph_edges
WHERE source_id = %s AND edge_type = 'FOLLOWS'
)
GROUP BY e2.target_id
HAVING COUNT(*) >= %s
)
SELECT
mc.candidate_id,
u.name,
u.job_title,
u.current_company,
mc.common_neighbors,
mc.adamic_adar_score,
mc.shared_connections
FROM mutual_connections mc
JOIN users u ON mc.candidate_id = u.user_id
ORDER BY mc.adamic_adar_score DESC
LIMIT 20;
"""
with self.conn.cursor() as cur:
cur.execute(query, (user_id, user_id, user_id, min_common))
columns = [desc[0] for desc in cur.description]
return [dict(zip(columns, row)) for row in cur.fetchall()]
def find_career_path_recommendations(self, user_id: int,
target_company: str) -> List[Dict]:
"""
找到通往目标公司的最短路径(基于BFS)
用于"内推"场景推荐
"""
# 使用PostgreSQL递归CTE进行最短路径搜索
query = """
WITH RECURSIVE path_search AS (
-- 起始点:用户的直接连接
SELECT
target_id as current_node,
ARRAY[source_id, target_id] as path,
1 as depth,
edge_type as last_relation
FROM graph_edges
WHERE source_id = %s
AND edge_type IN ('FOLLOWS', 'COLLEAGUE')
UNION ALL
-- 递归扩展
SELECT
ge.target_id,
ps.path || ge.target_id,
ps.depth + 1,
ge.edge_type
FROM path_search ps
JOIN graph_edges ge ON ps.current_node = ge.source_id
JOIN users u ON ge.target_id = u.user_id
WHERE ps.depth < 4 -- 最大深度3跳
AND ge.target_id <> ALL(ps.path) -- 避免环
AND u.current_company = %s -- 目标公司
)
SELECT
path,
depth,
(SELECT name FROM users WHERE user_id = path[array_length(path,1)]) as contact_name,
(SELECT job_title FROM users WHERE user_id = path[array_length(path,1)]) as contact_title
FROM path_search
ORDER BY depth ASC
LIMIT 5;
"""
        with self.conn.cursor() as cur:
            cur.execute(query, (user_id, target_company))
            columns = [desc[0] for desc in cur.description]
            return [dict(zip(columns, row)) for row in cur.fetchall()]
def detect_communities(self, user_id: int) -> Dict:
"""
使用标签传播算法(Label Propagation)检测用户所在社区
用于发现潜在兴趣群体
"""
# 获取用户 ego network(一度+二度连接)
ego_query = """
WITH ego_network AS (
SELECT DISTINCT target_id as node
FROM graph_edges
WHERE source_id = %s
UNION
SELECT DISTINCT e2.target_id
FROM graph_edges e1
JOIN graph_edges e2 ON e1.target_id = e2.source_id
WHERE e1.source_id = %s
)
SELECT source_id, target_id, weight
FROM graph_edges
WHERE source_id IN (SELECT node FROM ego_network)
AND target_id IN (SELECT node FROM ego_network);
"""
with self.conn.cursor() as cur:
cur.execute(ego_query, (user_id, user_id))
edges = cur.fetchall()
        # Label propagation in Python (fine for small subgraphs)
nodes = set()
graph = defaultdict(list)
for src, tgt, weight in edges:
nodes.add(src)
nodes.add(tgt)
graph[src].append((tgt, weight))
graph[tgt].append((src, weight))
        # initialize: each node starts with its own label
labels = {node: node for node in nodes}
        # propagate labels iteratively
        for _ in range(10):  # at most 10 rounds
updated = False
for node in nodes:
if not graph[node]:
continue
                # weighted frequency of neighbor labels
label_counts = defaultdict(float)
for neighbor, weight in graph[node]:
label_counts[labels[neighbor]] += weight
if label_counts:
new_label = max(label_counts, key=label_counts.get)
if new_label != labels[node]:
labels[node] = new_label
updated = True
if not updated:
break
        # collect the resulting communities
communities = defaultdict(list)
for node, label in labels.items():
communities[label].append(node)
return {
'user_community': labels.get(user_id),
'community_size': len(communities.get(labels.get(user_id), [])),
'communities': dict(communities)
}
def build_weighted_subgraph(self, center_user: int, radius: int = 2) -> Dict:
"""
构建以用户为中心的加权子图(用于可视化)
"""
query = """
WITH RECURSIVE subgraph AS (
SELECT
source_id,
target_id,
edge_type,
weight,
1 as distance
FROM graph_edges
WHERE source_id = %s
UNION ALL
SELECT
ge.source_id,
ge.target_id,
ge.edge_type,
ge.weight,
sg.distance + 1
FROM graph_edges ge
JOIN subgraph sg ON ge.source_id = sg.target_id
WHERE sg.distance < %s
)
SELECT DISTINCT * FROM subgraph;
"""
with self.conn.cursor() as cur:
cur.execute(query, (center_user, radius))
edges = cur.fetchall()
nodes = set()
links = []
for src, tgt, etype, weight, dist in edges:
nodes.add(src)
nodes.add(tgt)
links.append({
'source': src,
'target': tgt,
'type': etype,
'weight': weight,
'distance': dist
})
return {
'nodes': list(nodes),
'links': links,
'center': center_user
}
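A quick usage sketch for the service (connection parameters and user ID are placeholders):

if __name__ == '__main__':
    engine = GraphRecommendationEngine({
        'host': 'localhost',
        'dbname': 'recsys',
        'user': 'postgres',
        'password': 'secret'  # placeholder credentials
    })
    # "People you may know" for user 12345
    for rec in engine.get_connection_recommendations(12345):
        print(rec['name'], rec['adamic_adar_score'])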

Original-content statement: this article is published on the Tencent Cloud Developer Community with the author's authorization; reproduction without permission is prohibited.
For infringement concerns, contact cloudcommunity@tencent.com for removal.