当业务需求从“爬取网页”变成“批量向多个 AI 平台提问并分析回答”,后端架构面临一系列新问题:上千条任务如何调度?调用失败如何重试?原始回答怎么存?提取出的指标怎么落库?本文围绕消费品牌 AI 回答监测这一典型场景,拆解从队列调度、并发采集、失败重试到分层存储的完整数据链路设计。
消费品牌 AI 回答监测与普通数据采集有明显区别:
这三个约束决定了:架构上必须将“任务生产”与“任务执行”彻底解耦,同时为每条回答保留完整的原始证据。
链路分五个环节:任务生产、队列缓冲、并发消费、分层存储、失败兜底。每一层的设计要点在下文逐一展开。
调度器不直接调用 AI 平台,而是生成任务消息投递到队列。每条消息包含三个核心维度:
{
"task_id": "uuid-xxxx",
"scene_type": "RECOMMEND",
"platform": "kimi",
"query_text": "有哪些适合敏感肌的防晒霜推荐?",
"target_brand": "某品牌",
"round": 3,
"max_retries": 3,
"created_at": "2026-06-29T09:00:00Z"
}问题来源于预先构建的问题库表,场景标签随任务一同下发,保证后续分析可按场景维度聚合。
CREATE TABLE question_library (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
scene_type VARCHAR(30) NOT NULL COMMENT '场景: RECOMMEND/COMPARE/RISK等',
query_text TEXT NOT NULL,
target_brand VARCHAR(100),
status TINYINT DEFAULT 1,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
INDEX idx_scene (scene_type)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;消息队列在整个链路中承担三个角色:
以腾讯云 TDMQ 为例,推荐配置要点:
云函数侧采用单条消息单次处理的模式。核心消费逻辑伪代码:
def handle_message(message):
task = parse_message(message)
adapter = PlatformAdapterFactory.get(task.platform)
try:
raw_response = adapter.call_api(task.query_text, timeout=60)
cos_key = save_raw_to_cos(task, raw_response)
structured = extract_metrics(task, raw_response)
save_to_db(task, cos_key, structured)
return "SUCCESS"
except TimeoutError:
handle_retry(task)
except Exception as e:
send_to_dlq(task, str(e))消费品牌监测场景中,失败主要有三类,应对策略不同:
失败类型 | 典型原因 | 处理策略 |
|---|---|---|
临时故障 | AI 平台限流、网络抖动 | 指数退避重试(1min/2min/4min) |
超时 | 联网搜索耗时过长 | 延长云函数超时或降级为非联网模式 |
确定性失败 | API Key 失效、参数错误 | 直接进入死信队列,人工介入 |
重试次数通过消息体中的 max_retries 字段控制,每次重试 retry_count + 1。超出上限的消息投递到死信队列,避免无限重试浪费资源。
def handle_retry(task):
task['retry_count'] = task.get('retry_count', 0) + 1
if task['retry_count'] <= task['max_retries']:
delay = 60 * (2 ** (task['retry_count'] - 1)) # 指数退避
requeue_with_delay(task, delay)
else:
send_to_dlq(task, "MAX_RETRIES_EXCEEDED")存储分为两条独立路径,写入互不影响。
路径一:对象存储 COS,留档原始回答
// COS 文件命名: {date}/{platform}/{scene_type}/{task_id}.json
{
"task_id": "uuid-xxxx",
"platform": "kimi",
"scene_type": "RECOMMEND",
"query": "有哪些适合敏感肌的防晒霜推荐?",
"raw_response": "针对敏感肌,以下是几个值得考虑的品牌:...",
"response_at": "2026-06-29T09:00:35Z",
"duration_ms": 34800
}这一层不做任何加工,目的是事后可复核。任何指标争议都可以回溯到原始回答。
路径二:关系型数据库,结构化指标落库
CREATE TABLE monitoring_result (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
task_id VARCHAR(64) NOT NULL UNIQUE,
platform VARCHAR(30) NOT NULL,
scene_type VARCHAR(30) NOT NULL COMMENT '场景分类',
target_brand VARCHAR(100) NOT NULL,
is_mentioned TINYINT(1) DEFAULT 0 COMMENT '是否被提及',
is_recommended TINYINT(1) DEFAULT 0 COMMENT '是否被推荐',
recommendation_rank INT DEFAULT 0 COMMENT '推荐位次,0表示未推荐',
has_citation TINYINT(1) DEFAULT 0 COMMENT '是否引用来源',
is_valid TINYINT(1) DEFAULT 1 COMMENT '样本有效性',
raw_data_url VARCHAR(512) COMMENT 'COS文件链接',
sampled_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
INDEX idx_brand_scene (target_brand, scene_type),
INDEX idx_platform_date (platform, sampled_at)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;入库时注意两个要点:
task_id 设唯一约束,防止消息重复消费导致数据重复。is_valid 标记区分有效样本与超时、报错等无效样本。后续指标计算只统计 is_valid=1 的数据。结构化入库后,按场景维度聚合变得简单。消费品牌可以快速看到自己在不同决策环节的表现差异:
SELECT
scene_type,
COUNT(*) AS total,
SUM(is_mentioned) AS mentioned,
SUM(is_recommended) AS recommended,
ROUND(SUM(is_mentioned)*100.0/COUNT(*), 1) AS mention_rate,
ROUND(SUM(is_recommended)*100.0/COUNT(*), 1) AS recommend_rate
FROM monitoring_result
WHERE target_brand = '某品牌防晒霜'
AND is_valid = 1
AND sampled_at >= '2026-06-01'
GROUP BY scene_type
ORDER BY mention_rate DESC;输出示例:
场景 | 样本数 | 提及率 | 推荐率 |
|---|---|---|---|
推荐决策 | 45 | 73.3% | 51.1% |
对比分析 | 30 | 60.0% | 36.7% |
风险判断 | 20 | 25.0% | 5.0% |
这份数据直接反映出:该品牌在推荐场景中可见度不错,但在用户“查风险/口碑”环节几乎隐身,提示品牌方需要加强第三方测评、用户反馈类公开内容的建设。
1. 云函数超时设置要留余量
AI 平台在联网搜索模式下,单次调用可能耗时 60-90 秒。云函数超时建议设为 120 秒,并在代码层面设 API 调用超时(如 60 秒),两者配合使用。
2. 消息去重需要业务层保障
消息队列通常提供“至少一次”投递语义,可能重复消费。通过 task_id 唯一约束 + 插入前检查,或使用幂等写入(INSERT IGNORE / ON DUPLICATE KEY UPDATE),避免同一条采集结果重复入库。
3. 无效样本单独标记而非直接丢弃
超时、报错、AI 回复“我还不清楚”等情况,不要直接删除,统一标记 is_valid=0 并保留。事后分析无效样本比例本身也能反映系统稳定性。如果某平台无效率突然飙升,可能是接口变动或限流策略调整的信号。
场景化 AI 回答采集的工程核心,不在于“调用一次大模型”,而在于把成百上千次调用组织成一条可靠、可追溯、可分析的数据链路。消息队列负责调度与解耦,分层存储兼顾证据留存与指标提取,失败重试机制保证采集完整度,场景标签贯穿全链路让数据从采集那一刻就具备了业务语义。
这套设计已在消费品牌、企业服务等行业的多平台 AI 回答监测中实际使用,开发者可参考本文思路,结合自身业务场景在腾讯云上快速搭建原型。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。