如何打造通用性极强的交互架构,同时精准适配多样化、差异化的业务场景,是智能体业务落地的关键。
本文聚焦:
大模型 IO 接口,依托 Strategy+Adapter 双层抽象架构,实现上层 Agent逻辑与下层LLM通信的完全解耦。
职责:决定"为什么调"和"怎么组合多轮调用"
这一层的组件不直接调用 LLM,而是通过执行层发起业务语义明确的请求。每个调用方的场景、通过 AgentRunSpec 实现不同的参数配置。
设计优势:
AgentRunSpec 声明式地描述需求,不需要关心内部实现。职责:解决"如何让 LLM 反复思考、调用工具、直到完成任务"
这是整个系统的核心引擎。AgentRunner 的 run() 方法实现了一个完整的 ReAct(Reasoning + Acting)循环。
ReAct循环实质是:
通过预设的 max_iterations 循环调用大模型,模型思考返回结果和是否停止标志,如果需要调用工具结果后继续思考,就获取工具结果后继续循环。当大模型认为已经得到最终的结果,就会输出最终结果,触发停止条件,跳出循环,完成整个 ReAct 循环。
在执行层,包装了两个输入和输出的类型。通过输入的参数配置,区分不同的业务需求。通过统一输出,适配不同业务。
输入数据结构:
@dataclass(slots=True)
class AgentRunSpec: # 输入:运行规格(多个可配字段)
initial_messages: list[dict] # 起始消息列表
tools: ToolRegistry # 可用工具集
model: str # 模型名
max_iterations: int # 最大迭代次数
max_tool_result_chars: int # 单工具结果截断长度
hook: AgentHook | None # 生命周期钩子(流式/进度)
concurrent_tools: bool # 是否并行执行工具
fail_on_tool_error: bool # 工具出错是否终止
context_window_tokens: int | None # Token 预算上限
# ... 还有其他字段输出数据结构:
@dataclass(slots=True)
class AgentRunResult: # 输出:运行结果
final_content: str | None # 最终文本回复
messages: list[dict] # 完整对话历史
tools_used: list[str] # 使用过的工具名
usage: dict # Token 用量统计
stop_reason: str # completed / tool_error / max_iterations ...
error: str | None # 错误信息
tool_events: list[dict] # 工具调用事件日志职责:解决"如何与不同的 LLM 服务通信"
统一大模型响应结果LLMResponse, 作为在不同层之间的数据传输对象,即 DTO(Data Transfer Object)。所有 Provider 无论底层是什么 API,返回的都是同一个类型。
即所有 Provider 的输入输出被强制统一为两种类型:
输入: list[dict] ← OpenAI Chat 格式作为"通用中间语言"
[{"role": "system", "content": "..."},
{"role": "user", "content": "..."},
{"role": "assistant", "content": "...", "tool_calls": [...]}]
输出: LLMResponse ← 统一响应信封
├── content: str | None # 文本回复
├── tool_calls: list[ToolCallRequest] # 工具调用请求(归一化)
├── finish_reason: str # stop / tool_calls / length
├── usage: dict # token 用量统计
├── error_*: fields # 结构化错误信息
└── reasoning_content: str # 推理内容 (DeepSeek-R1 等)ToolCallRequest 是关键,无论底层是 Anthropic 的 tool_use 还是 OpenAI 的 function_call,最终都变成:
@dataclass
class ToolCallRequest:
id: str # 统一 ID 格式
name: str # 工具名
arguments: str # JSON 字符串参数DTO 归一化的意义:上层的 AgentRunner 完全不需要知道底层是 Anthropic 还是 DeepSeek,它只检查 LLMResponse 中既定的字段,进行业务逻辑即可。
外部 LLM APILLMProvider(第三层)AgentRunner(第二层)AgentLoop(第一层)用户外部 LLM APILLMProvider(第三层)AgentRunner(第二层)AgentLoop(第一层)用户→ continue 下一轮→ break 结束循环alt[有 tool_calls][纯文本回复]loop[AR ReAct 循环 (最多 N 轮)]"帮我重构这个函数"ContextBuilder.build_messages()组装 system + history + runtime + user msgrun(AgentRunSpec)tools=all, iterations=10, hook=stream_hook① 上下文治理管道drop_orphan → backfill → microcompact → budget → snipchat_stream_with_retry(messages, tools)② 重试引擎_safe → chat(abstract) → retry if needed③ 协议转换 + HTTP 请求(Anthropic/OpenAI/DeepSeek...)原始响应LLMResponse DTO④ 并行/串行执行工具read_file / grep / edit_file ...追加 tool_results 到 messagesAgentRunResultfinal_content + messages + usage_save_turn(session, messages)OutboundMessage(final_content)小结:
第一层回答"做什么"(用户对话/Dream 分析/子代理任务)
第二层回答"怎么做"(ReAct 循环、上下文管理、工具调度)
第三层回答"跟谁通信"(Claude/GPT/DeepSeek/本地模型)
每层通过明确的接口契约交互,各自独立演进。
AgentRunSpec#AgentLoop 调用 AgentRunner
result = await self.runner.run(AgentRunSpec(
initial_messages=messages, # 完整对话上下文
tools=self.tools, # 工具注册表
model=self.model, # 模型名称
max_iterations=10, # 最大循环次数
max_tool_result_chars=16000, # 单工具结果截断长度
hook=hook, # 流式回调钩子
fail_on_tool_error=False, # 工具出错是否终止
concurrent_tools=True, # 允许并行工具执行
...
))这是整个系统的核心引擎,每轮迭代流程如下:
ToolRegistryLLMProviderAgentRunnerToolRegistryLLMProviderAgentRunner① 上下文治理管道② 调用大模型追加到 messages → 下一轮构建最终结果 → breakalt[有工具调用][纯文本回复]loop[每轮迭代 (max N 次)]清理孤立工具结果回填缺失结果微压缩旧工具输出截断超长结果Token 预算滑动窗口裁剪chat_stream_with_retry(messages, tools)LLMResponseexecute(tool_call_1)execute(tool_call_2) [可并行]tool_resultsAgentRunResult上下文治理管道 是一大亮点:
处理步骤 | 解决什么问题 |
|---|---|
_drop_orphan_tool_results() | 删除无父级的工具结果(异常中断后残留) |
_backfill_missing_tool_results() | 为声明但未执行的调用插入占位符 |
_microcompact() | 将 read_file/grep 等旧结果压缩为一行摘要 |
_apply_tool_result_budget() | 截断单个工具返回的超长文本 |
_snip_history() | Token 预算感知的滑动窗口裁剪 |
LLMProvider 重试 + 协议适配# 完整调用链
AgentRunner._request_model() # 流式 or 非流式
→ provider.chat_stream_with_retry(**kwargs)
→ provider._run_with_retry(call=chat_stream, ...) # 重试引擎
→ provider._safe_chat_stream(...) # 异常兜底
→ concrete.chat_stream(...) # 真正的 API 调用
→ [协议转换: 内部格式 → 厂商原生格式]
→ [HTTP 请求]
→ [响应解析: 厂商格式 → LLMResponse DTO]Base 类内置了重试基础设施,这是生产级稳定性的关键
正常响应异常/错误响应瞬时错误永久性错误retry_mode="standard"retry_mode="persistent"指数退避 1s→2s→4s (最多3次)无限重试, 最大60s间隔, 相同错误上限10次HTTP 429 有 Retry-After 头?无 Retry-After按 chunk 周期性回调 UI同上含图片? → 去掉图片重试一次重试图片已去掉仍失败LLMResponse错误 LLMResponseLLMResponse (不抛异常!)调用API成功判断可重试可重试不可重试standard模式persistent模式等待提取RetryAfter固定等待心跳等待特殊策略返回错误响应关键特性:
LLMResponse(error_*=...) 对象,保证返回类型一致使用场景与优势
场景 | 怎么用 | 设计带来的好处 |
|---|---|---|
主对话 | AgentLoop → AgentRunner.run(spec) → ReAct 循环 | 自动工具调用、上下文管理、流式输出 |
Dream Phase 1 | 直接 provider.chat_with_retry(messages, tools=None) | 纯分析,不需要工具,轻量级 |
Dream Phase 2 | 新建 AgentRunner(provider).run(AgentRunSpec(tools=[read_file, edit_file])) | 需要 edit_file 能力,复用完整的 ReAct 循环 |
Consolidator | 直接 provider.chat_with_retry(messages, tools=None) | 单次摘要任务 |
子代理 | SubagentManager 创建新的 AgentRunner | 隔离的执行环境,独立工具集 |
切换模型 | 改配置 model: "claude-sonnet" 或 model: "deepseek-chat" | 零代码改动,Provider Registry 自动路由 |
核心优势一句话:通过 Strategy + Adapter 双层抽象,让上层的 Agent 逻辑(ReAct 循环、工具调度、上下文管理)与下层的 LLM 通信(不同厂商、不同协议、不同认证方式)完全解耦,同时 Base 类内建的生产级重试引擎保证了可靠性。