首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >场景化 AI 回答采集任务,如何做队列调度和结果入库?

场景化 AI 回答采集任务,如何做队列调度和结果入库?

原创
作者头像
用户12582597
发布2026-06-29 11:39:32
发布2026-06-29 11:39:32
630
举报

当业务需求从“爬取网页”变成“批量向多个 AI 平台提问并分析回答”,后端架构面临一系列新问题:上千条任务如何调度?调用失败如何重试?原始回答怎么存?提取出的指标怎么落库?本文围绕消费品牌 AI 回答监测这一典型场景,拆解从队列调度、并发采集、失败重试到分层存储的完整数据链路设计。

一、场景特征:采集任务的三个特殊约束

消费品牌 AI 回答监测与普通数据采集有明显区别:

  1. 任务量波动大:单次测评可能覆盖 7-8 个消费场景、5-6 个 AI 平台,每个组合再乘以 3-5 轮采样,任务数轻松破千。
  2. 响应时间不可控:AI 生成回答从几秒到几十秒不等,联网搜索场景耗时更久。
  3. 结果非确定性:同一问题两次调用可能返回不同答案,单次调用结果不具备统计意义。

这三个约束决定了:架构上必须将“任务生产”与“任务执行”彻底解耦,同时为每条回答保留完整的原始证据。


二、整体数据链路

链路分五个环节:任务生产、队列缓冲、并发消费、分层存储、失败兜底。每一层的设计要点在下文逐一展开。


三、任务生产:用“三要素”拼装消息

调度器不直接调用 AI 平台,而是生成任务消息投递到队列。每条消息包含三个核心维度:

代码语言:javascript
复制
{
  "task_id": "uuid-xxxx",
  "scene_type": "RECOMMEND",
  "platform": "kimi",
  "query_text": "有哪些适合敏感肌的防晒霜推荐?",
  "target_brand": "某品牌",
  "round": 3,
  "max_retries": 3,
  "created_at": "2026-06-29T09:00:00Z"
}

问题来源于预先构建的问题库表,场景标签随任务一同下发,保证后续分析可按场景维度聚合。

代码语言:javascript
复制
CREATE TABLE question_library (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    scene_type VARCHAR(30) NOT NULL COMMENT '场景: RECOMMEND/COMPARE/RISK等',
    query_text TEXT NOT NULL,
    target_brand VARCHAR(100),
    status TINYINT DEFAULT 1,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    INDEX idx_scene (scene_type)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

四、队列调度:削峰填谷与并发控制

消息队列在整个链路中承担三个角色:

  • 解耦:任务生产与执行互不阻塞。
  • 削峰:瞬时产生的上千条任务排队消费,避免打爆下游 API。
  • 并发控制:通过消费者实例数控制对 AI 平台的并发请求量。

以腾讯云 TDMQ 为例,推荐配置要点:

  • 开启批量投递,减少网络开销;
  • 设置消费限流,避免对同一 AI 平台短时间发起过高并发;
  • 配置消息超时时间与云函数执行超时一致(建议 120 秒)。

云函数侧采用单条消息单次处理的模式。核心消费逻辑伪代码:

代码语言:javascript
复制
def handle_message(message):
    task = parse_message(message)
    adapter = PlatformAdapterFactory.get(task.platform)
    
    try:
        raw_response = adapter.call_api(task.query_text, timeout=60)
        cos_key = save_raw_to_cos(task, raw_response)
        structured = extract_metrics(task, raw_response)
        save_to_db(task, cos_key, structured)
        return "SUCCESS"
    except TimeoutError:
        handle_retry(task)
    except Exception as e:
        send_to_dlq(task, str(e))

五、失败重试:分级策略与死信兜底

消费品牌监测场景中,失败主要有三类,应对策略不同:

失败类型

典型原因

处理策略

临时故障

AI 平台限流、网络抖动

指数退避重试(1min/2min/4min)

超时

联网搜索耗时过长

延长云函数超时或降级为非联网模式

确定性失败

API Key 失效、参数错误

直接进入死信队列,人工介入

重试次数通过消息体中的 max_retries 字段控制,每次重试 retry_count + 1。超出上限的消息投递到死信队列,避免无限重试浪费资源。

代码语言:javascript
复制
def handle_retry(task):
    task['retry_count'] = task.get('retry_count', 0) + 1
    if task['retry_count'] <= task['max_retries']:
        delay = 60 * (2 ** (task['retry_count'] - 1))  # 指数退避
        requeue_with_delay(task, delay)
    else:
        send_to_dlq(task, "MAX_RETRIES_EXCEEDED")

六、分层存储:原始留档 + 结构化入库

存储分为两条独立路径,写入互不影响。

路径一:对象存储 COS,留档原始回答

代码语言:javascript
复制
// COS 文件命名: {date}/{platform}/{scene_type}/{task_id}.json
{
  "task_id": "uuid-xxxx",
  "platform": "kimi",
  "scene_type": "RECOMMEND",
  "query": "有哪些适合敏感肌的防晒霜推荐?",
  "raw_response": "针对敏感肌,以下是几个值得考虑的品牌:...",
  "response_at": "2026-06-29T09:00:35Z",
  "duration_ms": 34800
}

这一层不做任何加工,目的是事后可复核。任何指标争议都可以回溯到原始回答。

路径二:关系型数据库,结构化指标落库

代码语言:javascript
复制
CREATE TABLE monitoring_result (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    task_id VARCHAR(64) NOT NULL UNIQUE,
    platform VARCHAR(30) NOT NULL,
    scene_type VARCHAR(30) NOT NULL COMMENT '场景分类',
    target_brand VARCHAR(100) NOT NULL,
    
    is_mentioned TINYINT(1) DEFAULT 0 COMMENT '是否被提及',
    is_recommended TINYINT(1) DEFAULT 0 COMMENT '是否被推荐',
    recommendation_rank INT DEFAULT 0 COMMENT '推荐位次,0表示未推荐',
    has_citation TINYINT(1) DEFAULT 0 COMMENT '是否引用来源',
    
    is_valid TINYINT(1) DEFAULT 1 COMMENT '样本有效性',
    raw_data_url VARCHAR(512) COMMENT 'COS文件链接',
    sampled_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    
    INDEX idx_brand_scene (target_brand, scene_type),
    INDEX idx_platform_date (platform, sampled_at)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

入库时注意两个要点:

  • task_id 设唯一约束,防止消息重复消费导致数据重复。
  • is_valid 标记区分有效样本与超时、报错等无效样本。后续指标计算只统计 is_valid=1 的数据。

七、场景化查询:数据如何被消费

结构化入库后,按场景维度聚合变得简单。消费品牌可以快速看到自己在不同决策环节的表现差异:

代码语言:javascript
复制
SELECT 
    scene_type,
    COUNT(*) AS total,
    SUM(is_mentioned) AS mentioned,
    SUM(is_recommended) AS recommended,
    ROUND(SUM(is_mentioned)*100.0/COUNT(*), 1) AS mention_rate,
    ROUND(SUM(is_recommended)*100.0/COUNT(*), 1) AS recommend_rate
FROM monitoring_result
WHERE target_brand = '某品牌防晒霜'
  AND is_valid = 1
  AND sampled_at >= '2026-06-01'
GROUP BY scene_type
ORDER BY mention_rate DESC;

输出示例:

场景

样本数

提及率

推荐率

推荐决策

45

73.3%

51.1%

对比分析

30

60.0%

36.7%

风险判断

20

25.0%

5.0%

这份数据直接反映出:该品牌在推荐场景中可见度不错,但在用户“查风险/口碑”环节几乎隐身,提示品牌方需要加强第三方测评、用户反馈类公开内容的建设。


八、三个工程实践建议

1. 云函数超时设置要留余量

AI 平台在联网搜索模式下,单次调用可能耗时 60-90 秒。云函数超时建议设为 120 秒,并在代码层面设 API 调用超时(如 60 秒),两者配合使用。

2. 消息去重需要业务层保障

消息队列通常提供“至少一次”投递语义,可能重复消费。通过 task_id 唯一约束 + 插入前检查,或使用幂等写入(INSERT IGNORE / ON DUPLICATE KEY UPDATE),避免同一条采集结果重复入库。

3. 无效样本单独标记而非直接丢弃

超时、报错、AI 回复“我还不清楚”等情况,不要直接删除,统一标记 is_valid=0 并保留。事后分析无效样本比例本身也能反映系统稳定性。如果某平台无效率突然飙升,可能是接口变动或限流策略调整的信号。


九、结语

场景化 AI 回答采集的工程核心,不在于“调用一次大模型”,而在于把成百上千次调用组织成一条可靠、可追溯、可分析的数据链路。消息队列负责调度与解耦,分层存储兼顾证据留存与指标提取,失败重试机制保证采集完整度,场景标签贯穿全链路让数据从采集那一刻就具备了业务语义。

这套设计已在消费品牌、企业服务等行业的多平台 AI 回答监测中实际使用,开发者可参考本文思路,结合自身业务场景在腾讯云上快速搭建原型。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、场景特征:采集任务的三个特殊约束
  • 二、整体数据链路
  • 三、任务生产:用“三要素”拼装消息
  • 四、队列调度:削峰填谷与并发控制
  • 五、失败重试:分级策略与死信兜底
  • 六、分层存储:原始留档 + 结构化入库
  • 七、场景化查询:数据如何被消费
  • 八、三个工程实践建议
  • 九、结语
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档