
[PostgreSQL] 5 Core Application Scenarios for PostgreSQL in Recommendation Algorithms

Original
Author: 二一年冬末
Published 2026-01-24 16:05:08

Why PostgreSQL Is an Ideal Choice for Recommendation Systems

From Amazon's "customers who bought this item also bought" to Netflix's personalized movie recommendations, and from Douyin's short-video feed to LinkedIn's job suggestions, recommendation algorithms are everywhere. Building an efficient, scalable recommender system, however, raises real engineering challenges: processing massive volumes of user-behavior data in real time, keeping complex feature engineering computationally efficient, managing heterogeneous multi-source data in one place, and feeding recommendation results back for real-time optimization.

Traditionally, recommendation stacks have been hybrid, multi-database architectures: Redis caches hot data, MongoDB stores user profiles, Elasticsearch handles full-text search, and Spark runs offline computation. This meets the functional requirements, but it makes data consistency hard to maintain, drives up operational cost, and complicates the technology stack. PostgreSQL, billed as the world's most advanced open-source relational database, has in recent years grown into a multi-model platform capable of carrying an entire recommendation pipeline, thanks to its extension ecosystem: PostGIS (geospatial), pgvector (vectors), TimescaleDB (time series), and MADlib (in-database machine learning), among others.


I. User Profile Storage and Vector Search

Case Study: User Interest Modeling on an E-commerce Platform

Suppose we are building the recommender for a large e-commerce platform with more than 50 million registered users and 20 million SKUs (stock keeping units). User profiles have traditionally been stored as discrete tags such as "age: 25-30", "gender: female", "preferred category: beauty & skincare". Discrete tags, however, cannot capture the continuity and nuance of user interests. Modern recommenders instead represent users and items as embeddings, mapping high-dimensional sparse features into a low-dimensional dense vector space.

In our case the user profile vectors are 512-dimensional, trained with a deep model (such as a two-tower network), and capture both long-term preferences and short-term intent. A user who regularly browses premium skincare, reads ingredient analyses, and purchases at a steady cadence will land far from a budget-makeup shopper in this vector space. When a new product is listed, computing the similarity between user vectors and the item vector is all it takes to personalize recommendations at the individual level.

Through the pgvector extension, PostgreSQL offers a native vector data type and similarity search supporting L2 distance, inner product, and cosine similarity. Compared with dedicated vector databases such as Milvus or Pinecone, PostgreSQL's advantages are transactional consistency, mature access control, and seamless joins against the rest of the business data (orders, inventory, user records).
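
As a quick illustration of those three metrics (a minimal sketch on vector literals; the real tables are defined in step 2 below):

Code language: sql

-- pgvector's three distance operators
SELECT '[1,2,3]'::vector <-> '[4,5,6]'::vector AS l2_distance,       -- Euclidean (L2)
       '[1,2,3]'::vector <#> '[4,5,6]'::vector AS neg_inner_product, -- negative inner product
       '[1,2,3]'::vector <=> '[4,5,6]'::vector AS cosine_distance;   -- 1 - cosine similarity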

Technical Implementation and Code Deployment

Step 1: Environment Setup and Extension Installation

Code language: sql
-- Check the PostgreSQL version (pgvector requires 12+)
SELECT version();

-- Install the pgvector extension
CREATE EXTENSION IF NOT EXISTS vector;

-- Verify the installation
SELECT * FROM pg_extension WHERE extname = 'vector';

Step 2: Designing the User Profile Tables

Code language: sql
-- Base user table
CREATE TABLE users (
    user_id BIGINT PRIMARY KEY,
    username VARCHAR(50) NOT NULL,
    register_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    last_active TIMESTAMP,
    user_segment VARCHAR(20) -- user tier: new, active, dormant, churned
);

-- User embedding table (kept separate from base data so each can be tuned independently)
CREATE TABLE user_embeddings (
    user_id BIGINT PRIMARY KEY REFERENCES users(user_id) ON DELETE CASCADE,
    embedding VECTOR(512), -- 512-dimensional user interest vector
    model_version VARCHAR(20), -- model versioning, useful for A/B tests
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    metadata JSONB -- context captured when the vector was generated
);

-- Item embedding table (for similarity matching)
CREATE TABLE item_embeddings (
    item_id BIGINT PRIMARY KEY,
    category_id INT,
    embedding VECTOR(512),
    price_range INT, -- price tier: 1 = budget, 5 = luxury
    popularity_score FLOAT,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- IVFFlat indexes on the vector columns (trade query speed against recall)
CREATE INDEX idx_user_embedding ON user_embeddings 
USING ivfflat (embedding vector_cosine_ops) 
WITH (lists = 100); -- tune to data size; pgvector's guidance is rows/1000 up to ~1M rows, sqrt(rows) beyond

CREATE INDEX idx_item_embedding ON item_embeddings 
USING ivfflat (embedding vector_cosine_ops) 
WITH (lists = 200);
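
One recall/latency knob worth setting alongside these indexes (the parameter is pgvector's; the value shown is an assumption to tune per workload):

Code language: sql

-- Scan more inverted lists per query: higher recall, higher latency (default is 1)
SET ivfflat.probes = 10;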

Step 3: Implementing Vector Similarity Search

Code language: python
import psycopg2
import numpy as np
from psycopg2.extras import RealDictCursor, execute_batch

class VectorRecommendationEngine:
    def __init__(self, db_config):
        self.conn = psycopg2.connect(**db_config)
        
    def get_similar_users(self, user_id, top_k=10):
        """
        基于向量相似度寻找相似用户(用于UserCF)
        使用余弦相似度衡量用户兴趣接近程度
        """
        query = """
        WITH target_vector AS (
            SELECT embedding 
            FROM user_embeddings 
            WHERE user_id = %s
        )
        SELECT 
            u.user_id,
            u.username,
            1 - (ue.embedding <=> (SELECT embedding FROM target_vector)) as similarity
        FROM user_embeddings ue
        JOIN users u ON ue.user_id = u.user_id
        WHERE ue.user_id != %s
        ORDER BY ue.embedding <=> (SELECT embedding FROM target_vector)
        LIMIT %s;
        """
        # The <=> operator computes cosine distance in pgvector; 1 - distance is the cosine similarity
        
        with self.conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute(query, (user_id, user_id, top_k))
            return cur.fetchall()
    
    def recommend_items_by_vector(self, user_id, top_k=20, category_filter=None):
        """
        基于用户向量推荐相似商品(双塔模型召回)
        支持按品类过滤,实现多目标优化
        """
        base_query = """
        WITH user_vec AS (
            SELECT embedding FROM user_embeddings WHERE user_id = %s
        )
        SELECT 
            i.item_id,
            i.category_id,
            1 - (ie.embedding <=> (SELECT embedding FROM user_vec)) as similarity_score,
            ie.popularity_score,
            (1 - (ie.embedding <=> (SELECT embedding FROM user_vec))) * 0.7 
                + ie.popularity_score * 0.3 as final_score
        FROM item_embeddings ie
        JOIN items i ON ie.item_id = i.item_id
        WHERE ie.embedding IS NOT NULL
        """
        
        params = [user_id]
        
        if category_filter:
            base_query += " AND i.category_id = ANY(%s)"
            params.append(category_filter)
            
        base_query += """
        ORDER BY final_score DESC
        LIMIT %s;
        """
        params.append(top_k)
        
        with self.conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute(base_query, params)
            return cur.fetchall()
    
    def batch_update_embeddings(self, user_embeddings_dict):
        """
        批量更新用户向量,使用UPSERT避免重复插入
        适用于模型每日全量更新场景
        """
        upsert_sql = """
        INSERT INTO user_embeddings (user_id, embedding, model_version, updated_at)
        VALUES (%s, %s, %s, NOW())
        ON CONFLICT (user_id) 
        DO UPDATE SET 
            embedding = EXCLUDED.embedding,
            model_version = EXCLUDED.model_version,
            updated_at = NOW();
        """
        
        data = [
            (uid, emb.tolist(), 'v2.1') 
            for uid, emb in user_embeddings_dict.items()
        ]
        
        with self.conn.cursor() as cur:
            execute_batch(cur, upsert_sql, data, page_size=1000)
            self.conn.commit()
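
A minimal usage sketch of the engine above (the connection settings and IDs are placeholders):

Code language: python

# Hypothetical connection settings; adjust to your environment
engine = VectorRecommendationEngine({
    'host': 'localhost', 'dbname': 'recsys',
    'user': 'app', 'password': 'secret'
})
print(engine.get_similar_users(user_id=12345, top_k=5))
print(engine.recommend_items_by_vector(user_id=12345, top_k=10, category_filter=[3, 7]))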

Step 4: Performance Optimization Strategies

| Dimension | Measure | Expected effect |
| --- | --- | --- |
| Index strategy | IVFFlat with lists = 100 (for ~1M users) | Query latency drops from ~500 ms to ~20 ms |
| Quantized storage | Half-precision halfvec(512) instead of vector(512) | ~50% less storage, <1% accuracy loss |
| Partitioning | Partition by model_version | Zero-downtime switchover between model versions |
| Cache warming | Load hot vectors with the pg_prewarm extension | ~80% lower cold-start query latency |
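
For the cache-warming row, a sketch of how pg_prewarm is typically invoked (object names are the ones defined above):

Code language: sql

CREATE EXTENSION IF NOT EXISTS pg_prewarm;
-- Pull the embedding table and its index into shared buffers after a restart
SELECT pg_prewarm('user_embeddings');
SELECT pg_prewarm('idx_user_embedding');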


II. Real-Time Behavior Ingestion and Feature Engineering

Case Study: Real-Time Recommendation Challenges in Live-Stream Commerce

In live-stream commerce, the recommender faces extreme latency requirements. When a user enters a live room, the system has roughly 100 milliseconds to adjust its strategy based on real-time behavior (product clicks, chat messages, dwell time, likes). If the user just clicked a light-luxury handbag, should the next slot show the same brand's accessories or a competitor at a similar price point? If the user swipes away three times in a row (negative feedback), should the strategy switch immediately?

The traditional T+1 offline pipeline cannot keep up. We need a unified stream-and-batch feature pipeline: real-time stream processing (current-session behavior) combined with offline batch processing (long-term interest modeling). PostgreSQL's TimescaleDB extension (time-series workloads) together with pg_partman (partition management) is a good fit here.

On our live-stream platform, more than 100,000 behavior events are produced per second (clicks, favorites, shares, purchases). Using PostgreSQL's streaming-ingest capability plus materialized views for pre-aggregation, we compute short-term intent features (category distribution over the last 5 minutes, real-time price sensitivity) and long-term statistical features (30-day purchase frequency, category-preference stability) in near real time.

Technical Implementation and Code Deployment

Step 1: Time-Series Table Design and Partitioning

Code language: sql
-- Install the TimescaleDB extension (requires shared_preload_libraries in postgresql.conf)
CREATE EXTENSION IF NOT EXISTS timescaledb;

-- Raw behavior log table (hypertable)
CREATE TABLE user_behavior_events (
    event_id BIGSERIAL,
    user_id BIGINT NOT NULL,
    session_id UUID,
    event_type VARCHAR(20) CHECK (event_type IN ('click', 'view', 'cart', 'purchase', 'like', 'share')),
    item_id BIGINT,
    category_id INT,
    channel VARCHAR(20), -- source: live_stream, search, recommendation
    event_time TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    event_data JSONB, -- extra attributes: dwell time, price, streamer ID, ...
    device_info JSONB
);

-- Convert to a hypertable, auto-partitioned by time with 7-day chunks
SELECT create_hypertable('user_behavior_events', 'event_time', 
                         chunk_time_interval => INTERVAL '7 days',
                         if_not_exists => TRUE);

-- Composite indexes for the common query shapes
CREATE INDEX idx_behavior_user_time ON user_behavior_events (user_id, event_time DESC);
CREATE INDEX idx_behavior_event_type ON user_behavior_events (event_type, event_time DESC);
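
On top of the hypertable, compression and retention policies keep storage in check (a sketch; the intervals are illustrative assumptions):

Code language: sql

-- Compress chunks older than 30 days; drop raw events after 180 days
ALTER TABLE user_behavior_events SET (
    timescaledb.compress,
    timescaledb.compress_segmentby = 'user_id'
);
SELECT add_compression_policy('user_behavior_events', INTERVAL '30 days');
SELECT add_retention_policy('user_behavior_events', INTERVAL '180 days');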

Step 2: Real-Time Feature Materialized View

Code language: sql
-- Real-time feature aggregation as a materialized view (5-minute window)
CREATE MATERIALIZED VIEW user_realtime_features AS
WITH time_windows AS (
    SELECT 
        user_id,
        -- Activity over the last 5 minutes
        COUNT(*) FILTER (WHERE event_time > NOW() - INTERVAL '5 minutes') as actions_5min,
        COUNT(DISTINCT item_id) FILTER (WHERE event_time > NOW() - INTERVAL '5 minutes') as unique_items_5min,
        -- Price sensitivity (average price of recently viewed items)
        AVG((event_data->>'price')::FLOAT) FILTER (WHERE event_type = 'view' AND event_time > NOW() - INTERVAL '1 hour') as avg_view_price_1h,
        -- Real-time intent strength (click/view ratio)
        CASE 
            WHEN COUNT(*) FILTER (WHERE event_type = 'view' AND event_time > NOW() - INTERVAL '15 minutes') > 0 
            THEN COUNT(*) FILTER (WHERE event_type = 'click' AND event_time > NOW() - INTERVAL '15 minutes')::FLOAT / 
                 COUNT(*) FILTER (WHERE event_type = 'view' AND event_time > NOW() - INTERVAL '15 minutes')
            ELSE 0 
        END as ctr_15min,
        -- Dominant category over the last hour (the mode; an entropy measure could replace it)
        MODE() WITHIN GROUP (ORDER BY category_id) as top_category_1h,
        NOW() as computed_at
    FROM user_behavior_events
    WHERE event_time > NOW() - INTERVAL '1 hour'
    GROUP BY user_id
)
SELECT * FROM time_windows
WITH DATA;

-- Unique index so the view can be refreshed CONCURRENTLY
CREATE UNIQUE INDEX idx_rt_features_user ON user_realtime_features (user_id);

-- Schedule the refresh (pg_cron or an external scheduler, e.g. every 30 seconds)
-- Note: frequent full refreshes are IO-heavy; in production prefer trigger-based incremental updates
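
If pg_cron is available, the refresh can be scheduled in-database (a sketch; second-granularity schedules require pg_cron 1.5+):

Code language: sql

CREATE EXTENSION IF NOT EXISTS pg_cron;
SELECT cron.schedule(
    'refresh-user-rt-features',
    '30 seconds',
    $$REFRESH MATERIALIZED VIEW CONCURRENTLY user_realtime_features$$
);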

Step 3: Streaming Ingestion and Feature Serving

Code language: python
import asyncio
import asyncpg
import json
from datetime import datetime
from typing import Dict, List

class RealtimeFeaturePipeline:
    def __init__(self, dsn: str):
        self.dsn = dsn
        self.pool = None
        
    async def initialize(self):
        """初始化连接池"""
        self.pool = await asyncpg.create_pool(
            self.dsn,
            min_size=10,
            max_size=50,
            command_timeout=60
        )
    
    async def ingest_event_batch(self, events: List[Dict]):
        """
        批量摄取行为事件,使用COPY协议优化写入性能
        单批次建议1000-5000条,平衡延迟与吞吐量
        """
        if not events:
            return
            
        # Convert events to tuples in the table's column order
        records = [
            (
                e['user_id'],
                e.get('session_id'),
                e['event_type'],
                e.get('item_id'),
                e.get('category_id'),
                e.get('channel', 'unknown'),
                e.get('event_time', datetime.now()),
                json.dumps(e.get('event_data', {})),
                json.dumps(e.get('device_info', {}))
            )
            for e in events
        ]
        
        async with self.pool.acquire() as conn:
            # COPY-based bulk load, typically 10-20x faster than row-by-row INSERTs
            await conn.copy_records_to_table(
                'user_behavior_events',
                records=records,
                columns=['user_id', 'session_id', 'event_type', 'item_id', 
                        'category_id', 'channel', 'event_time', 'event_data', 'device_info']
            )
    
    async def get_user_realtime_profile(self, user_id: int) -> Dict:
        """
        获取用户实时画像,用于在线推荐API
        查询物化视图,响应时间<10ms
        """
        query = """
        SELECT 
            user_id,
            actions_5min,
            unique_items_5min,
            avg_view_price_1h,
            ctr_15min,
            top_category_1h,
            computed_at
        FROM user_realtime_features
        WHERE user_id = $1;
        """
        
        async with self.pool.acquire() as conn:
            row = await conn.fetchrow(query, user_id)
            if row:
                return dict(row)
            return {
                'user_id': user_id,
                'actions_5min': 0,
                'ctr_15min': 0.0,
                'is_cold_start': True
            }
    
    async def compute_session_sequence_features(self, session_id: str) -> Dict:
        """
        计算会话级序列特征(用于RNN/Transformer模型)
        提取最近20个行为的序列模式
        """
        query = """
        WITH session_events AS (
            SELECT 
                event_type,
                item_id,
                category_id,
                event_time,
                (event_data->>'dwell_time')::INT as dwell_time,
                ROW_NUMBER() OVER (ORDER BY event_time DESC) as rn
            FROM user_behavior_events
            WHERE session_id = $1
            ORDER BY event_time DESC
            LIMIT 20
        )
        SELECT 
            ARRAY_AGG(event_type ORDER BY rn DESC) as event_sequence,
            ARRAY_AGG(item_id ORDER BY rn DESC) as item_sequence,
            ARRAY_AGG(category_id ORDER BY rn DESC) as category_sequence,
            AVG(dwell_time) as avg_dwell_time,
            MAX(event_time) - MIN(event_time) as session_duration
        FROM session_events;
        """
        
        async with self.pool.acquire() as conn:
            row = await conn.fetchrow(query, session_id)
            return dict(row) if row else {}
    
    async def continuous_aggregate_maintenance(self):
        """
        维护连续聚合(Continuous Aggregate)视图
        TimescaleDB特性,比物化视图更高效
        """
        # 创建连续聚合(仅需执行一次)
        create_ca = """
        CREATE MATERIALIZED VIEW user_hourly_stats
        WITH (timescaledb.continuous) AS
        SELECT 
            time_bucket('1 hour', event_time) as bucket,
            user_id,
            COUNT(*) as event_count,
            COUNT(DISTINCT item_id) as unique_items,
            COUNT(*) FILTER (WHERE event_type = 'purchase') as purchase_count
        FROM user_behavior_events
        GROUP BY bucket, user_id;
        """
        
        # Manual refresh (normally driven by a refresh policy)
        refresh_sql = "CALL refresh_continuous_aggregate('user_hourly_stats', NULL, NULL);"
        
        async with self.pool.acquire() as conn:
            await conn.execute(refresh_sql)

Step 4: Feature Store Tuning

| Setting | Suggested value | Notes |
| --- | --- | --- |
| timescaledb.max_background_workers | 8 | Parallelism for background compression/refresh jobs |
| shared_buffers | 25% of RAM | Caching for time-series data |
| chunk_time_interval | 7 days | Balances query cost against maintenance overhead |
| fillfactor | 90 | Leaves room for in-place (HOT) updates |
| autovacuum_naptime | 10s | Cleans up dead tuples promptly |
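
Most of these can be applied with ALTER SYSTEM (a sketch; the memory value assumes a 32 GB host, and shared_buffers only takes effect after a restart):

Code language: sql

ALTER SYSTEM SET timescaledb.max_background_workers = 8;
ALTER SYSTEM SET shared_buffers = '8GB';
ALTER SYSTEM SET autovacuum_naptime = '10s';
SELECT pg_reload_conf();  -- restart still required for shared_buffers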


III. Collaborative Filtering and Matrix Factorization

Case Study: The "Recommended for You" Feature of a Book Community

Consider an online book community with 10 million books and 5 million active users, where behavior includes ratings (1-5 stars), favorites, reviews, and adding books to lists. We want a "recommended for you" feature that finds similar users (user-based CF) or similar items (item-based CF) from historical behavior and predicts ratings for unread books.

Matrix factorization is the classic algorithm for this problem: it decomposes the high-dimensional user-item rating matrix into low-dimensional user-factor and item-factor matrices. PostgreSQL's MADlib machine-learning extension provides in-database matrix factorization, including low-rank matrix factorization (trained by incremental gradient descent) and SVD (singular value decomposition); ALS (alternating least squares) and NMF (non-negative matrix factorization) are closely related latent-factor formulations.

In our book community, the rating matrix is 99.2% sparse (most users rate only a handful of books). We train an ALS-style latent factor model on implicit feedback, incorporating not only explicit ratings but also implicit signals such as reading time and favoriting. Running the factorization inside PostgreSQL avoids the ETL cost of exporting data to Spark/Mahout and keeps model training and online serving on the same platform.
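
One way to derive implicit_score from a behavior log (a sketch; the event weights are illustrative, and it assumes a log shaped like section II's user_behavior_events):

Code language: sql

-- Fold implicit signals into one score per (user, book)
INSERT INTO user_book_ratings (user_id, book_id, implicit_score)
SELECT user_id, item_id,
       SUM(CASE event_type
               WHEN 'view'     THEN 0.5
               WHEN 'like'     THEN 1.0
               WHEN 'cart'     THEN 2.0
               WHEN 'purchase' THEN 4.0
               ELSE 0 END)
FROM user_behavior_events
GROUP BY user_id, item_id
ON CONFLICT (user_id, book_id)
DO UPDATE SET implicit_score = EXCLUDED.implicit_score,
              last_updated = NOW();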

Technical Implementation and Code Deployment

Step 1: Installing MADlib and Building the Rating Matrix

Code language: sql
-- Install MADlib (download the binary package matching your PostgreSQL version)
CREATE EXTENSION IF NOT EXISTS madlib;

-- User-book rating table (explicit + implicit feedback)
CREATE TABLE user_book_ratings (
    user_id BIGINT,
    book_id BIGINT,
    explicit_rating FLOAT CHECK (explicit_rating BETWEEN 1 AND 5),
    implicit_score FLOAT DEFAULT 0, -- behavior-derived implicit score
    combined_score FLOAT GENERATED ALWAYS AS (
        COALESCE(explicit_rating * 2, 0) + 
        implicit_score * 0.5
    ) STORED,
    last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (user_id, book_id)
);

-- Sparse-matrix view (the factorization routines expect this shape)
CREATE VIEW rating_matrix AS
SELECT 
    user_id,
    book_id,
    combined_score as rating
FROM user_book_ratings
WHERE combined_score > 0;

Step 2: Training the ALS Matrix Factorization Model

Code language: sql
-- Train a matrix factorization model with 100 latent factors.
-- Note: als_train is shown as an illustrative interface. Stock MADlib exposes
-- low-rank matrix factorization as madlib.lmf_igd_run(...) with a different
-- signature, so adapt this call to the MADlib version you actually run.
SELECT madlib.als_train(
    'rating_matrix',           -- input table
    'book_als_model',          -- output model table prefix
    'user_id',                 -- user column
    'book_id',                 -- item column
    'rating',                  -- rating column
    100,                       -- number of latent factors
    0.01,                      -- regularization (guards against overfitting)
    0.01,                      -- convergence threshold
    20                         -- maximum iterations
);

-- Tables produced:
-- book_als_model_u: user factor matrix (user_id, factors[])
-- book_als_model_v: book factor matrix (book_id, factors[])
-- book_als_model_summary: training summary (RMSE, etc.)

Step 3: Similarity Computation and Recommendation Generation

Code language: python
import numpy as np
import psycopg2
from scipy.spatial.distance import cosine
from typing import List, Tuple

class CollaborativeFilteringEngine:
    def __init__(self, db_config):
        self.conn = psycopg2.connect(**db_config)
        self.user_factors = {}
        self.item_factors = {}
        self._load_factors()
    
    def _load_factors(self):
        """加载因子矩阵到内存(适合中小规模数据)"""
        with self.conn.cursor() as cur:
            # Load user factors
            cur.execute("SELECT user_id, factors FROM book_als_model_u")
            for row in cur.fetchall():
                self.user_factors[row[0]] = np.array(row[1])
            
            # Load book factors
            cur.execute("SELECT book_id, factors FROM book_als_model_v")
            for row in cur.fetchall():
                self.item_factors[row[0]] = np.array(row[1])
    
    def predict_rating(self, user_id: int, book_id: int) -> float:
        """
        预测用户对书籍的评分
        计算用户向量与书籍向量的内积
        """
        if user_id not in self.user_factors or book_id not in self.item_factors:
            return 3.0  # cold-start default rating
        
        user_vec = self.user_factors[user_id]
        item_vec = self.item_factors[book_id]
        
        # Inner-product prediction (bias terms omitted in this simplified sketch)
        prediction = np.dot(user_vec, item_vec)
        return np.clip(prediction, 1.0, 5.0)
    
    def find_similar_books(self, book_id: int, top_k: int = 10) -> List[Tuple[int, float]]:
        """
        Item-based CF:找到相似书籍(基于隐向量余弦相似度)
        """
        if book_id not in self.item_factors:
            return []
        
        target_vec = self.item_factors[book_id]
        similarities = []
        
        for other_id, other_vec in self.item_factors.items():
            if other_id != book_id:
                sim = 1 - cosine(target_vec, other_vec)
                similarities.append((other_id, sim))
        
        return sorted(similarities, key=lambda x: x[1], reverse=True)[:top_k]
    
    def recommend_for_user(self, user_id: int, top_k: int = 20, 
                          exclude_read: bool = True) -> List[Tuple[int, float]]:
        """
        为用户推荐书籍(基于预测评分排序)
        排除已读过的书籍
        """
        if user_id not in self.user_factors:
            return self._cold_start_recommend(top_k)
        
        # Fetch the user's already-read books
        read_books = set()
        if exclude_read:
            with self.conn.cursor() as cur:
                cur.execute(
                    "SELECT book_id FROM user_book_ratings WHERE user_id = %s", 
                    (user_id,)
                )
                read_books = {row[0] for row in cur.fetchall()}
        
        # Predict a rating for every unread book
        predictions = []
        user_vec = self.user_factors[user_id]
        
        for book_id, book_vec in self.item_factors.items():
            if book_id not in read_books:
                score = np.dot(user_vec, book_vec)
                predictions.append((book_id, score))
        
        return sorted(predictions, key=lambda x: x[1], reverse=True)[:top_k]
    
    def _cold_start_recommend(self, top_k: int) -> List[Tuple[int, float]]:
        """冷启动策略:推荐热门高分书籍"""
        with self.conn.cursor() as cur:
            cur.execute("""
                SELECT book_id, AVG(combined_score) as avg_score, COUNT(*) as cnt
                FROM user_book_ratings
                GROUP BY book_id
                HAVING COUNT(*) > 100
                ORDER BY avg_score DESC, cnt DESC
                LIMIT %s
            """, (top_k,))
            return [(row[0], row[1]) for row in cur.fetchall()]
    
    def update_model_incremental(self, new_ratings_table: str):
        """
        增量更新模型(使用MADlib的增量ALS)
        适用于新用户/新物品加入,避免全量重训练
        """
        # MADlib支持增量更新,只需提供新数据
        update_sql = f"""
        SELECT madlib.als_train(
            '{new_ratings_table}',
            'book_als_model',
            'user_id',
            'book_id',
            'rating',
            100, 0.01, 0.01, 5,
            TRUE  -- incremental mode (illustrative)
        );
        """
        with self.conn.cursor() as cur:
            cur.execute(update_sql)
            self.conn.commit()
        # Reload the factors into memory
        self._load_factors()
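
A minimal usage sketch (connection settings and IDs are placeholders):

Code language: python

cf = CollaborativeFilteringEngine({
    'host': 'localhost', 'dbname': 'books',
    'user': 'app', 'password': 'secret'
})
print(cf.predict_rating(user_id=1001, book_id=2002))
print(cf.find_similar_books(book_id=2002, top_k=5))
print(cf.recommend_for_user(user_id=1001, top_k=10))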

Step 4: Hybrid Recommendation Strategy

Code language: sql
-- Recommendation results table (blends collaborative filtering with content features)
CREATE TABLE hybrid_recommendations (
    user_id BIGINT,
    book_id BIGINT,
    cf_score FLOAT, -- collaborative-filtering score
    content_score FLOAT, -- content-based score (tag/author match)
    popularity_score FLOAT, -- popularity score (helps with cold start)
    final_score FLOAT GENERATED ALWAYS AS (
        cf_score * 0.6 + content_score * 0.3 + popularity_score * 0.1
    ) STORED,
    reason VARCHAR(50), -- recommendation reason: liked by similar users, popular in category, ...
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (user_id, book_id)
);

-- Function for real-time hybrid ranking
CREATE OR REPLACE FUNCTION get_hybrid_recommendations(
    p_user_id BIGINT,
    p_limit INT DEFAULT 20
)
RETURNS TABLE (
    book_id BIGINT,
    title VARCHAR,
    final_score FLOAT,
    reason VARCHAR
) AS $$
BEGIN
    RETURN QUERY
    WITH cf_recs AS (
        -- Candidates from users with overlapping taste (simplified; a production
        -- system would read the ALS factors, e.g. via PL/Python)
        SELECT 
            r.book_id,
            AVG(r.combined_score) as cf_score
        FROM user_book_ratings r
        WHERE r.user_id IN (
            -- Similar users by co-rated books (Jaccard-style overlap)
            SELECT other.user_id
            FROM user_book_ratings other
            JOIN user_book_ratings me ON me.book_id = other.book_id
            WHERE me.user_id = p_user_id AND other.user_id != p_user_id
            GROUP BY other.user_id
            HAVING COUNT(*) > 3
            ORDER BY COUNT(*) DESC
            LIMIT 20
        )
        AND r.book_id NOT IN (
            SELECT ub.book_id FROM user_book_ratings ub WHERE ub.user_id = p_user_id
        )
        GROUP BY r.book_id
    ),
    content_recs AS (
        -- Content-based: unread books by authors the user has already rated
        SELECT 
            b.book_id,
            0.5 as content_score
        FROM books b
        WHERE b.author_id IN (
            SELECT b2.author_id FROM books b2
            WHERE b2.book_id IN (
                SELECT ub.book_id FROM user_book_ratings ub
                WHERE ub.user_id = p_user_id
            )
        )
        AND b.book_id NOT IN (
            SELECT ub.book_id FROM user_book_ratings ub WHERE ub.user_id = p_user_id
        )
    )
    SELECT 
        b.book_id,
        b.title,
        COALESCE(c.cf_score, 0) * 0.6 + COALESCE(ct.content_score, 0) * 0.3 + b.popularity * 0.1,
        CASE 
            WHEN c.cf_score > 4 THEN 'Strongly endorsed by similar readers'
            WHEN ct.content_score > 0 THEN 'New work by an author you like'
            ELSE 'Popular in the community'
        END
    FROM books b
    LEFT JOIN cf_recs c ON c.book_id = b.book_id
    LEFT JOIN content_recs ct ON ct.book_id = b.book_id
    WHERE c.cf_score IS NOT NULL OR ct.content_score IS NOT NULL
    ORDER BY 3 DESC  -- the blended score (positional, avoiding clashes with OUT params)
    LIMIT p_limit;
END;
$$ LANGUAGE plpgsql;
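
Calling the function is then a one-liner (the user ID is a placeholder):

Code language: sql

SELECT * FROM get_hybrid_recommendations(1001, 10);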

IV. Graph Reasoning and Path-Based Recommendation

Case Study: "Second-Degree Connection" Recommendations in Professional Networking

LinkedIn's "People You May Know" is the classic showcase of graph-based recommendation. In professional networking, the logic rests not only on similarity but on relationship chains: mutual connections, colleague ties, alumni networks, overlapping skills. At heart this is path inference over a heterogeneous graph.

PostgreSQL offers strong graph processing through recursive CTEs (common table expressions) and the Apache AGE extension (a graph layer that speaks the openCypher query language). For our professional-network case we built a heterogeneous graph with five node types (users, companies, schools, skills, job titles) and edge types including "follows", "colleague", "alumni", "has skill", and "works at".

Graph algorithms then surface "second-degree connections" (friends of friends), "strong triangles" (three users who all follow one another), and "key opinion leaders" (high-PageRank nodes). Just as important, path-based recommendations are explainable: a reason like "you share 3 connections, worked at the same company, and both know Python" measurably lifts user trust and click-through.

Technical Implementation and Code Deployment

Step 1: Graph Data Model (with the AGE Extension)

Code language: sql
-- Install the Apache AGE extension (graph database extension for PostgreSQL)
CREATE EXTENSION IF NOT EXISTS age;
LOAD 'age';
SET search_path = ag_catalog, "$user", public;

-- Create the graph (if it does not exist)
SELECT create_graph('professional_network');

-- Create a vertex (user)
SELECT * FROM cypher('professional_network', $$
    CREATE (u:User {
        id: 1, 
        name: 'Alice', 
        current_company: 'TechCorp',
        job_title: 'Senior Engineer'
    })
    RETURN u
$$) as (u agtype);

-- Batch-create vertices with UNWIND. (AGE only binds cypher() parameters
-- through prepared statements, so inline literals are shown here.)
SELECT * FROM cypher('professional_network', $$
    UNWIND [
        {id: 2, name: 'Bob', current_company: 'TechCorp'},
        {id: 3, name: 'Carol', current_company: 'DataInc'}
    ] as row
    CREATE (u:User {id: row.id, name: row.name, current_company: row.current_company})
    RETURN u
$$) as (u agtype);

-- Create an edge (follow relationship)
SELECT * FROM cypher('professional_network', $$
    MATCH (a:User {id: 1}), (b:User {id: 2})
    CREATE (a)-[:FOLLOWS {since: '2023-01-15', weight: 1.0}]->(b)
$$) as (result agtype);

-- Other edge types:
-- [:WORKS_AT {start_date, end_date}] - employment
-- [:STUDIED_AT {year}] - alumni
-- [:HAS_SKILL {proficiency}] - skills
Step 2: Graph Traversal with Recursive CTEs (Pure SQL)

Code language: sql
-- No AGE extension here: graph algorithms on plain recursive CTEs.
-- Adjacency-list table for the graph structure
CREATE TABLE graph_edges (
    source_id BIGINT NOT NULL,
    target_id BIGINT NOT NULL,
    edge_type VARCHAR(20) NOT NULL, -- FOLLOWS, COLLEAGUE, ALUMNI
    weight FLOAT DEFAULT 1.0,
    properties JSONB,
    PRIMARY KEY (source_id, target_id, edge_type)
);

CREATE INDEX idx_graph_source ON graph_edges(source_id, edge_type);
CREATE INDEX idx_graph_target ON graph_edges(target_id, edge_type);

-- Second-degree connections (friends of friends, excluding direct friends)
WITH RECURSIVE connection_paths AS (
    -- Anchor: users followed directly (first degree)
    SELECT 
        source_id as root,
        target_id as connected_id,
        1 as degree,
        ARRAY[source_id, target_id] as path,
        weight as total_weight,
        ARRAY[edge_type] as relationship_chain
    FROM graph_edges
    WHERE source_id = 12345  -- target user ID
    AND edge_type = 'FOLLOWS'
    
    UNION ALL
    
    -- Recursive step: expand to the next degree
    SELECT 
        cp.root,
        ge.target_id,
        cp.degree + 1,
        cp.path || ge.target_id,
        cp.total_weight * ge.weight,
        cp.relationship_chain || ge.edge_type
    FROM connection_paths cp
    JOIN graph_edges ge ON cp.connected_id = ge.source_id
    WHERE cp.degree < 2  -- stop expanding past the second degree
    AND ge.target_id <> ALL(cp.path)  -- avoid cycles
    AND ge.edge_type = 'FOLLOWS'
),
-- Score connection strength
scored_connections AS (
    SELECT 
        connected_id,
        degree,
        path,
        total_weight,
        relationship_chain,
        -- Path decay: a second-degree link counts 0.3x a first-degree one
        CASE degree
            WHEN 1 THEN total_weight * 1.0
            WHEN 2 THEN total_weight * 0.3
            ELSE total_weight * 0.1
        END as connection_score,
        -- Intermediate nodes on this path (per-path mutual contacts)
        cardinality(path) - 2 as common_friends_count
    FROM connection_paths
    WHERE degree <= 2  -- keep the second degree and closer
)
SELECT 
    sc.connected_id,
    u.name,
    u.job_title,
    u.current_company,
    sc.degree,
    sc.connection_score,
    sc.common_friends_count,
    sc.path as connection_path,
    -- Generate the recommendation reason
    CASE 
        WHEN sc.degree = 1 THEN 'Already following'
        WHEN sc.degree = 2 THEN 
            sc.common_friends_count || ' mutual connection(s)'
        ELSE 'Connected through your network'
    END as reason
FROM scored_connections sc
JOIN users u ON sc.connected_id = u.user_id
WHERE sc.connected_id NOT IN (
    -- Exclude people already followed
    SELECT target_id FROM graph_edges 
    WHERE source_id = 12345 AND edge_type = 'FOLLOWS'
)
ORDER BY sc.connection_score DESC, sc.common_friends_count DESC
LIMIT 20;

Step 3: PageRank (Iterative Computation)

Code language: sql
-- Iterative PageRank (simplified), suitable for small/medium graphs (<1M nodes).
-- PostgreSQL forbids aggregating over a recursive CTE's own reference, so the
-- iterations run in a PL/pgSQL loop over working tables instead.

-- All nodes in the graph
CREATE TEMP TABLE pr_nodes AS
SELECT source_id AS node_id FROM graph_edges
UNION
SELECT target_id FROM graph_edges;

-- Initialize PageRank to 1/N
CREATE TEMP TABLE pr_scores AS
SELECT node_id, 1.0 / (SELECT COUNT(*) FROM pr_nodes) AS pr
FROM pr_nodes;

CREATE TEMP TABLE pr_next (node_id BIGINT, pr DOUBLE PRECISION);

DO $$
DECLARE
    n BIGINT;
BEGIN
    SELECT COUNT(*) INTO n FROM pr_nodes;
    FOR i IN 1..5 LOOP  -- 5 iterations for demonstration
        TRUNCATE pr_next;
        INSERT INTO pr_next
        SELECT 
            nd.node_id,
            -- teleport term (1 - d = 0.15) plus damped incoming rank (d = 0.85)
            0.15 / n + 0.85 * COALESCE(SUM(
                e.weight * s.pr / od.out_degree
            ), 0)
        FROM pr_nodes nd
        LEFT JOIN graph_edges e ON nd.node_id = e.target_id
        LEFT JOIN pr_scores s ON e.source_id = s.node_id
        LEFT JOIN (
            SELECT source_id, COUNT(*) AS out_degree 
            FROM graph_edges 
            GROUP BY source_id
        ) od ON e.source_id = od.source_id
        GROUP BY nd.node_id;

        TRUNCATE pr_scores;
        INSERT INTO pr_scores SELECT * FROM pr_next;
    END LOOP;
END $$;

-- Highest-ranked users
SELECT 
    s.node_id,
    u.name,
    u.job_title,
    s.pr AS pagerank,
    ROW_NUMBER() OVER (ORDER BY s.pr DESC) AS rank
FROM pr_scores s
JOIN users u ON s.node_id = u.user_id
ORDER BY s.pr DESC
LIMIT 100;

Step 4: A Python Graph Recommendation Service

Code language: python
from typing import List, Dict, Set, Tuple
import psycopg2
from collections import defaultdict

class GraphRecommendationEngine:
    def __init__(self, db_config: Dict):
        self.conn = psycopg2.connect(**db_config)
        self.graph_cache = {}  # local cache for hot subgraphs
        
    def get_connection_recommendations(self, user_id: int, 
                                      max_depth: int = 2,
                                      min_common: int = 2) -> List[Dict]:
        """
        基于共同连接数的推荐(Adamic-Adar指数)
        """
        query = """
        WITH mutual_connections AS (
            SELECT 
                e2.target_id as candidate_id,
                COUNT(*) as common_neighbors,
                ARRAY_AGG(DISTINCT e1.target_id) as shared_connections,
                SUM(1.0 / LOG(neighbor_degrees.degree + 1)) as adamic_adar_score
            FROM graph_edges e1
            JOIN graph_edges e2 ON e1.target_id = e2.source_id
            JOIN (
                SELECT source_id, COUNT(*) as degree 
                FROM graph_edges 
                GROUP BY source_id
            ) neighbor_degrees ON e1.target_id = neighbor_degrees.source_id
            WHERE e1.source_id = %s
            AND e1.edge_type = 'FOLLOWS'
            AND e2.edge_type = 'FOLLOWS'
            AND e2.target_id != %s
            AND e2.target_id NOT IN (
                SELECT target_id FROM graph_edges 
                WHERE source_id = %s AND edge_type = 'FOLLOWS'
            )
            GROUP BY e2.target_id
            HAVING COUNT(*) >= %s
        )
        SELECT 
            mc.candidate_id,
            u.name,
            u.job_title,
            u.current_company,
            mc.common_neighbors,
            mc.adamic_adar_score,
            mc.shared_connections
        FROM mutual_connections mc
        JOIN users u ON mc.candidate_id = u.user_id
        ORDER BY mc.adamic_adar_score DESC
        LIMIT 20;
        """
        
        with self.conn.cursor() as cur:
            cur.execute(query, (user_id, user_id, user_id, min_common))
            columns = [desc[0] for desc in cur.description]
            return [dict(zip(columns, row)) for row in cur.fetchall()]
    
    def find_career_path_recommendations(self, user_id: int, 
                                        target_company: str) -> List[Dict]:
        """
        找到通往目标公司的最短路径(基于BFS)
        用于"内推"场景推荐
        """
        # 使用PostgreSQL递归CTE进行最短路径搜索
        query = """
        WITH RECURSIVE path_search AS (
            -- 起始点:用户的直接连接
            SELECT 
                target_id as current_node,
                ARRAY[source_id, target_id] as path,
                1 as depth,
                edge_type as last_relation
            FROM graph_edges
            WHERE source_id = %s
            AND edge_type IN ('FOLLOWS', 'COLLEAGUE')
            
            UNION ALL
            
            -- 递归扩展
            SELECT 
                ge.target_id,
                ps.path || ge.target_id,
                ps.depth + 1,
                ge.edge_type
            FROM path_search ps
            JOIN graph_edges ge ON ps.current_node = ge.source_id
            JOIN users u ON ge.target_id = u.user_id
            WHERE ps.depth < 4  -- 最大深度3跳
            AND ge.target_id <> ALL(ps.path)  -- 避免环
            AND u.current_company = %s  -- 目标公司
        )
        SELECT 
            path,
            depth,
            (SELECT name FROM users WHERE user_id = path[array_length(path,1)]) as contact_name,
            (SELECT job_title FROM users WHERE user_id = path[array_length(path,1)]) as contact_title
        FROM path_search
        ORDER BY depth ASC
        LIMIT 5;
        """
        
        with self.conn.cursor() as cur:
            cur.execute(query, (user_id, target_company))
            return cur.fetchall()
    
    def detect_communities(self, user_id: int) -> Dict:
        """
        使用标签传播算法(Label Propagation)检测用户所在社区
        用于发现潜在兴趣群体
        """
        # 获取用户 ego network(一度+二度连接)
        ego_query = """
        WITH ego_network AS (
            SELECT DISTINCT target_id as node
            FROM graph_edges
            WHERE source_id = %s
            UNION
            SELECT DISTINCT e2.target_id
            FROM graph_edges e1
            JOIN graph_edges e2 ON e1.target_id = e2.source_id
            WHERE e1.source_id = %s
        )
        SELECT source_id, target_id, weight 
        FROM graph_edges
        WHERE source_id IN (SELECT node FROM ego_network)
        AND target_id IN (SELECT node FROM ego_network);
        """
        
        with self.conn.cursor() as cur:
            cur.execute(ego_query, (user_id, user_id))
            edges = cur.fetchall()
            
        # Label propagation in Python (fine for small subgraphs)
        nodes = set()
        graph = defaultdict(list)
        for src, tgt, weight in edges:
            nodes.add(src)
            nodes.add(tgt)
            graph[src].append((tgt, weight))
            graph[tgt].append((src, weight))
        
        # Initialize labels
        labels = {node: node for node in nodes}
        
        # Propagate iteratively
        for _ in range(10):  # up to 10 rounds
            updated = False
            for node in nodes:
                if not graph[node]:
                    continue
                # Tally neighbor label frequencies (weighted)
                label_counts = defaultdict(float)
                for neighbor, weight in graph[node]:
                    label_counts[labels[neighbor]] += weight
                
                if label_counts:
                    new_label = max(label_counts, key=label_counts.get)
                    if new_label != labels[node]:
                        labels[node] = new_label
                        updated = True
            
            if not updated:
                break
        
        # Collect communities
        communities = defaultdict(list)
        for node, label in labels.items():
            communities[label].append(node)
            
        return {
            'user_community': labels.get(user_id),
            'community_size': len(communities.get(labels.get(user_id), [])),
            'communities': dict(communities)
        }
    
    def build_weighted_subgraph(self, center_user: int, radius: int = 2) -> Dict:
        """
        构建以用户为中心的加权子图(用于可视化)
        """
        query = """
        WITH RECURSIVE subgraph AS (
            SELECT 
                source_id,
                target_id,
                edge_type,
                weight,
                1 as distance
            FROM graph_edges
            WHERE source_id = %s
            
            UNION ALL
            
            SELECT 
                ge.source_id,
                ge.target_id,
                ge.edge_type,
                ge.weight,
                sg.distance + 1
            FROM graph_edges ge
            JOIN subgraph sg ON ge.source_id = sg.target_id
            WHERE sg.distance < %s
        )
        SELECT DISTINCT * FROM subgraph;
        """
        
        with self.conn.cursor() as cur:
            cur.execute(query, (center_user, radius))
            edges = cur.fetchall()
            
        nodes = set()
        links = []
        for src, tgt, etype, weight, dist in edges:
            nodes.add(src)
            nodes.add(tgt)
            links.append({
                'source': src,
                'target': tgt,
                'type': etype,
                'weight': weight,
                'distance': dist
            })
            
        return {
            'nodes': list(nodes),
            'links': links,
            'center': center_user
        }

Originality statement: This article was published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

If you believe it infringes your rights, contact cloudcommunity@tencent.com to request removal.

