
Agent2Agent(A2A)是一个专注于智能体间通信、协作与协调的框架,旨在促进多个智能体(Agent)之间的通信与协作。在人工智能领域,智能体是自主的软件实体,能够感知环境、执行动作、与其他智能体通信,单个智能体的能力有限,而多个智能体通过有效的交互可以解决更复杂的问题。A2A框架提供了标准化的通信协议、消息传递机制和协作模式,使智能体能够像人类团队一样协同工作,解决单个智能体难以处理的复杂问题。

智能体是A2A框架中的基本单位。每个智能体具有以下特性:
在A2A框架中,智能体通过消息进行通信。每个智能体有一个唯一的标识符(Agent ID),并可以注册多种能力(Capabilities),这些能力描述了智能体能够执行的任务类型。
1. 消息(Message)
消息是智能体之间通信的基本单元。A2A框架定义了标准的消息格式,包括:
2. 传输层(Transport)
传输层负责消息的实际传递。它可以是本地的(在同一进程内)或分布式的(通过网络)。传输层的主要职责是:
A2A框架可以支持多种传输实现,例如本地内存传输、基于消息队列(如RabbitMQ)的分布式传输等。
3. 任务分配器
任务分配器负责将任务分配给合适的智能体。它根据智能体的能力和任务的需求进行匹配。任务分配算法可以是简单的轮询,也可以是基于能力匹配的复杂算法。
4. 协商协议
协商协议定义了智能体之间如何通过多轮交互达成一致。常见的协商协议包括合同网协议和拍卖协议。
2.1 角色定义
角色是一组职责、权限和期望行为的封装。智能体通过扮演不同角色参与协作:
2.2 动态角色分配
根据任务需求和组织状态动态分配角色,使系统能够适应环境变化。考虑智能体能力、负载状态等因素。

流程说明:
阶段1:协作准备阶段
阶段2:通信与协商阶段
阶段3:任务执行与监控阶段
阶段4:结果整合与评估阶段
阶段5:协作终止与总结

详细说明:
决策因素分析:
协作模式特性:

任务分解过程:
任务分配机制:
并行执行管理:
结果收集与整合:
反馈与学习:

流程说明:
任务接收与分析:
分解决策过程:
分解策略选择:
子任务规格制定:
智能体能力匹配:
任务分配与依赖管理:

任务需求维度分析:
智能体能力评估:
匹配策略:
示例实现一个简单的Agent2Agent通信框架,展示了多智能体系统的基本工作原理。下面我将逐部分分解代码并说明其设计意图。
1. 基础定义
1.1 消息类型枚举
class MessageType(Enum):
TASK_REQUEST = "task_request" # 任务请求
TASK_RESPONSE = "task_response" # 任务响应
GREETING = "greeting" # 问候消息1.2 消息数据结构
@dataclass
class A2AMessage:
message_id: str # 消息唯一ID
sender_id: str # 发送者ID
receiver_id: str # 接收者ID
message_type: MessageType # 消息类型
content: Dict[str, Any] # 消息内容
timestamp: float # 时间戳2. 通信传输层:SimpleTransport类
class SimpleTransport:
def __init__(self):
self.agents: Dict[str, callable] = {} # 智能体注册表2.1 注册智能体:
def register_agent(self, agent_id: str, message_handler: callable):
self.agents[agent_id] = message_handler2.2 发送消息:
async def send_message(self, message: A2AMessage):
# 检查接收者是否存在
# 模拟网络延迟
# 调用接收者的消息处理函数3. 智能体基类:SimpleAgent类
class SimpleAgent:
def __init__(self, agent_id: str, name: str, transport: SimpleTransport):
self.agent_id = agent_id
self.name = name
self.transport = transport
self.conversation_history: List[A2AMessage] = []3.1 消息发送:
async def send_message(self, receiver_id: str, message_type: MessageType, content: Dict[str, Any]):
# 创建标准化消息
# 记录到对话历史
# 通过传输层发送3.2 消息接收:
async def receive_message(self, message: A2AMessage):
# 记录接收到的消息
# 提供基础处理(子类可扩展)4. 专用智能体实现:TaskManagerAgent任务管理器
class TaskManagerAgent(SimpleAgent):
def __init__(self, agent_id: str, name: str, transport: SimpleTransport):
super().__init__(agent_id, name, transport)
self.pending_tasks: Dict[str, Dict] = {} # 待处理任务
self.completed_tasks: Dict[str, Any] = {} # 已完成任务4.1 任务创建与分配:
async def create_task(self, task_description: str, worker_id: str):
# 生成唯一任务ID
# 保存任务信息到待处理列表
# 发送任务请求给指定工作者4.2 消息处理扩展:
async def receive_message(self, message: A2AMessage):
await super().receive_message(message) # 基础处理
if message.message_type == MessageType.TASK_RESPONSE:
await self.handle_task_response(message)
elif message.message_type == MessageType.GREETING:
await self.handle_greeting(message)5. WorkerAgent类:工作者智能体
class WorkerAgent(SimpleAgent):
def __init__(self, agent_id: str, name: str, transport: SimpleTransport, skills: List[str]):
super().__init__(agent_id, name, transport)
self.skills = skills
self.tasks_completed = 05.1 任务处理:
async def handle_task_request(self, message: A2AMessage):
# 解析任务信息
# 模拟任务执行(异步等待)
# 生成任务结果
# 发送任务完成响应5.2 技能匹配处理:
def process_task(self, description: str) -> str:
if "计算" in description or "数学" in description:
return f"计算完成: 42 (使用 {self.skills} 技能)"
# 其他技能分支...6. 程序启动运行
6.1 基础演示
async def demo_agent_communication():
# 创建传输层和智能体实例
# 演示问候消息交换
# 演示任务分配和执行流程
# 展示统计信息6.2 高级演示
async def advanced_demo():
# 创建多个专业智能体
# 并行分配不同类型任务
# 展示跨领域协作运行结果:
================================================== Agent2Agent 通信演示开始 ================================================== 传输层: 智能体 manager_001 已注册 智能体初始化: 任务管理器 (ID: manager_001) 传输层: 智能体 worker_001 已注册 智能体初始化: AI工作者 (ID: worker_001)
============================== 场景1: 简单问候 ============================== 传输层: manager_001 -> worker_001 [greeting] AI工作者 收到来自 manager_001 的消息: greeting
============================== 场景2: 任务分配 ==============================
--- 任务 1 --- 任务管理器 创建任务: 计算复杂的数学公式 传输层: manager_001 -> worker_001 [task_request] AI工作者 收到来自 manager_001 的消息: task_request AI工作者 开始处理任务: 计算复杂的数学公式 传输层: worker_001 -> manager_001 [task_response] 任务管理器 收到来自 worker_001 的消息: task_response 任务管理器 收到任务完成通知: task_1759064250 结果: 计算完成: 42 (使用 ['Python编程', '数据分析', '机器学习'] 技能) AI工作者 完成任务: 计算复杂的数学公式
--- 任务 2 --- 任务管理器 创建任务: 分析销售数据趋势 传输层: manager_001 -> worker_001 [task_request] AI工作者 收到来自 manager_001 的消息: task_request AI工作者 开始处理任务: 分析销售数据趋势 传输层: worker_001 -> manager_001 [task_response] 任务管理器 收到来自 worker_001 的消息: task_response 任务管理器 收到任务完成通知: task_1759064253 结果: 分析报告: 数据趋势良好 AI工作者 完成任务: 分析销售数据趋势
--- 任务 3 --- 任务管理器 创建任务: 编写Python数据处理代码 传输层: manager_001 -> worker_001 [task_request] AI工作者 收到来自 manager_001 的消息: task_request AI工作者 开始处理任务: 编写Python数据处理代码 传输层: worker_001 -> manager_001 [task_response] 任务管理器 收到来自 worker_001 的消息: task_response 任务管理器 收到任务完成通知: task_1759064256 结果: 代码已编写: def solution(): return 'Hello World' AI工作者 完成任务: 编写Python数据处理代码
--- 任务 4 --- 任务管理器 创建任务: 生成月度报告摘要 传输层: manager_001 -> worker_001 [task_request] AI工作者 收到来自 manager_001 的消息: task_request AI工作者 开始处理任务: 生成月度报告摘要 传输层: worker_001 -> manager_001 [task_response] 任务管理器 收到来自 worker_001 的消息: task_response 任务管理器 收到任务完成通知: task_1759064259 结果: 任务 '生成月度报告摘要' 已处理完成 AI工作者 完成任务: 生成月度报告摘要
============================== 结果统计 ============================== 任务管理器统计: 总消息数: 9 完成任务: 4 待处理任务: 0
工作者统计: 总消息数: 9 完成任务: 4
================================================== Agent2Agent 通信演示完成! ==================================================
******************** 高级演示 ******************** 传输层: 智能体 manager_1 已注册 智能体初始化: 高级管理器 (ID: manager_1) 传输层: 智能体 worker_1 已注册 智能体初始化: 开发专家 (ID: worker_1) 传输层: 智能体 worker_2 已注册 智能体初始化: 数据分析师 (ID: worker_2) 传输层: 智能体 worker_3 已注册 智能体初始化: 技术作家 (ID: worker_3) 高级管理器 创建任务: 开发新的数据处理模块 传输层: manager_1 -> worker_1 [task_request] 高级管理器 创建任务: 分析用户行为数据 传输层: manager_1 -> worker_2 [task_request] 高级管理器 创建任务: 编写项目文档 传输层: manager_1 -> worker_3 [task_request] 高级管理器 创建任务: 优化算法性能 传输层: manager_1 -> worker_1 [task_request] 开发专家 收到来自 manager_1 的消息: task_request 开发专家 开始处理任务: 开发新的数据处理模块 技术作家 收到来自 manager_1 的消息: task_request 传输层: manager_1 -> worker_1 [task_request] 开发专家 收到来自 manager_1 的消息: task_request 开发专家 开始处理任务: 开发新的数据处理模块 技术作家 收到来自 manager_1 的消息: task_request 开发专家 收到来自 manager_1 的消息: task_request 开发专家 开始处理任务: 开发新的数据处理模块 技术作家 收到来自 manager_1 的消息: task_request 技术作家 收到来自 manager_1 的消息: task_request 技术作家 开始处理任务: 编写项目文档 数据分析师 收到来自 manager_1 的消息: task_request 数据分析师 收到来自 manager_1 的消息: task_request 数据分析师 开始处理任务: 分析用户行为数据 数据分析师 开始处理任务: 分析用户行为数据 开发专家 收到来自 manager_1 的消息: task_request 开发专家 开始处理任务: 优化算法性能 传输层: worker_1 -> manager_1 [task_response] 传输层: worker_2 -> manager_1 [task_response] 传输层: worker_3 -> manager_1 [task_response] 传输层: worker_1 -> manager_1 [task_response] 高级管理器 收到来自 worker_2 的消息: task_response 高级管理器 收到任务完成通知: task_1759064261 结果: 分析报告: 数据趋势良好 数据分析师 完成任务: 分析用户行为数据 高级管理器 收到来自 worker_3 的消息: task_response 技术作家 完成任务: 编写项目文档 高级管理器 收到来自 worker_1 的消息: task_response 开发专家 完成任务: 开发新的数据处理模块 高级管理器 收到来自 worker_1 的消息: task_response 开发专家 完成任务: 优化算法性能
高级演示完成! 管理器处理了 8 条消息 完成了 1 个跨领域任务
Agent2Agent框架为构建智能体间通信与协作系统提供了坚实的基础。通过标准化的消息格式、灵活的传输层和丰富的协作模式,开发者可以快速构建能够解决复杂问题的多智能体系统。随着人工智能技术的不断发展,A2A框架将在构建更智能、更自治的AI系统中发挥重要作用。
该框架具有良好的扩展性,可以根据具体应用场景添加新的通信协议、协作算法和容错机制,满足不同领域的需求。
import asyncio
import time
import uuid
from dataclasses import dataclass
from typing import Dict, Any, List, Optional
from enum import Enum
# ==================== 基础定义 ====================
class MessageType(Enum):
"""消息类型枚举"""
TASK_REQUEST = "task_request" # 任务请求
TASK_RESPONSE = "task_response" # 任务响应
GREETING = "greeting" # 问候消息
@dataclass
class A2AMessage:
"""
A2A消息类
定义智能体间通信的标准消息格式
"""
message_id: str # 消息唯一ID
sender_id: str # 发送者ID
receiver_id: str # 接收者ID
message_type: MessageType # 消息类型
content: Dict[str, Any] # 消息内容
timestamp: float # 时间戳
def to_dict(self) -> Dict[str, Any]:
"""将消息转换为字典格式,便于传输"""
return {
"message_id": self.message_id,
"sender_id": self.sender_id,
"receiver_id": self.receiver_id,
"message_type": self.message_type.value,
"content": self.content,
"timestamp": self.timestamp
}
class SimpleTransport:
"""
简单传输层
负责智能体间的消息传递(模拟网络通信)
"""
def __init__(self):
# 存储注册的智能体和它们的消息处理函数
self.agents: Dict[str, callable] = {}
def register_agent(self, agent_id: str, message_handler: callable):
"""
注册智能体到传输层
Args:
agent_id: 智能体ID
message_handler: 消息处理函数
"""
self.agents[agent_id] = message_handler
print(f" 传输层: 智能体 {agent_id} 已注册")
async def send_message(self, message: A2AMessage):
"""
发送消息到目标智能体
Args:
message: 要发送的消息
"""
print(f" 传输层: {message.sender_id} -> {message.receiver_id} "
f"[{message.message_type.value}]")
# 检查接收者是否存在
if message.receiver_id not in self.agents:
print(f" 错误: 智能体 {message.receiver_id} 未注册")
return
# 获取接收者的消息处理函数
receiver_handler = self.agents[message.receiver_id]
# 模拟网络延迟
await asyncio.sleep(0.1)
# 将消息传递给接收者
await receiver_handler(message)
# ==================== 智能体定义 ====================
class SimpleAgent:
"""
简单智能体基类
所有智能体的共同属性和行为
"""
def __init__(self, agent_id: str, name: str, transport: SimpleTransport):
"""
初始化智能体
Args:
agent_id: 智能体唯一标识
name: 智能体名称
transport: 传输层实例
"""
self.agent_id = agent_id
self.name = name
self.transport = transport
self.conversation_history: List[A2AMessage] = []
# 向传输层注册自己
self.transport.register_agent(self.agent_id, self.receive_message)
print(f" 智能体初始化: {self.name} (ID: {self.agent_id})")
async def send_message(self, receiver_id: str, message_type: MessageType, content: Dict[str, Any]):
"""
发送消息给其他智能体
Args:
receiver_id: 接收者ID
message_type: 消息类型
content: 消息内容
"""
# 创建消息对象
message = A2AMessage(
message_id=str(uuid.uuid4()), # 生成唯一ID
sender_id=self.agent_id,
receiver_id=receiver_id,
message_type=message_type,
content=content,
timestamp=time.time()
)
# 保存到对话历史
self.conversation_history.append(message)
# 通过传输层发送消息
await self.transport.send_message(message)
async def receive_message(self, message: A2AMessage):
"""
接收消息的通用处理(子类可以重写)
Args:
message: 接收到的消息
"""
print(f" {self.name} 收到来自 {message.sender_id} 的消息: {message.message_type.value}")
# 保存到对话历史
self.conversation_history.append(message)
def get_conversation_summary(self):
"""获取对话摘要"""
return f"{self.name} 进行了 {len(self.conversation_history)} 次消息交换"
class TaskManagerAgent(SimpleAgent):
"""
任务管理器智能体
负责创建和分配任务
"""
def __init__(self, agent_id: str, name: str, transport: SimpleTransport):
super().__init__(agent_id, name, transport)
self.pending_tasks: Dict[str, Dict] = {} # 待处理任务
self.completed_tasks: Dict[str, Any] = {} # 已完成任务
async def create_task(self, task_description: str, worker_id: str):
"""
创建新任务并分配给工作者
Args:
task_description: 任务描述
worker_id: 工作者智能体ID
"""
task_id = f"task_{int(time.time())}"
print(f" {self.name} 创建任务: {task_description}")
# 保存任务信息
self.pending_tasks[task_id] = {
"description": task_description,
"worker": worker_id,
"status": "assigned",
"created_time": time.time()
}
# 发送任务请求
await self.send_message(
receiver_id=worker_id,
message_type=MessageType.TASK_REQUEST,
content={
"task_id": task_id,
"description": task_description,
"priority": "normal"
}
)
async def receive_message(self, message: A2AMessage):
"""
处理接收到的消息(重写父类方法)
"""
await super().receive_message(message) # 调用父类基础处理
if message.message_type == MessageType.TASK_RESPONSE:
# 处理任务响应
await self.handle_task_response(message)
elif message.message_type == MessageType.GREETING:
# 处理问候消息
await self.handle_greeting(message)
async def handle_task_response(self, message: A2AMessage):
"""
处理任务完成响应
Args:
message: 任务响应消息
"""
task_id = message.content.get("task_id")
result = message.content.get("result")
status = message.content.get("status")
if task_id in self.pending_tasks:
if status == "completed":
# 任务完成
self.completed_tasks[task_id] = {
**self.pending_tasks[task_id],
"result": result,
"completion_time": time.time()
}
del self.pending_tasks[task_id]
print(f" {self.name} 收到任务完成通知: {task_id}")
print(f" 结果: {result}")
else:
print(f" 任务 {task_id} 失败: {result}")
async def handle_greeting(self, message: A2AMessage):
"""
处理问候消息
Args:
message: 问候消息
"""
greeting_text = message.content.get("text", "")
print(f" {self.name} 收到问候: {greeting_text}")
class WorkerAgent(SimpleAgent):
"""
工作者智能体
负责执行具体任务
"""
def __init__(self, agent_id: str, name: str, transport: SimpleTransport, skills: List[str]):
"""
初始化工作者智能体
Args:
skills: 技能列表
"""
super().__init__(agent_id, name, transport)
self.skills = skills
self.tasks_completed = 0
async def receive_message(self, message: A2AMessage):
"""
处理接收到的消息
"""
await super().receive_message(message)
if message.message_type == MessageType.TASK_REQUEST:
# 处理任务请求
await self.handle_task_request(message)
async def handle_task_request(self, message: A2AMessage):
"""
处理任务请求并执行任务
Args:
message: 任务请求消息
"""
task_id = message.content.get("task_id")
description = message.content.get("description")
print(f" {self.name} 开始处理任务: {description}")
# 模拟任务执行时间
await asyncio.sleep(1)
# 生成任务结果(根据任务描述和技能)
result = self.process_task(description)
# 发送任务完成响应
await self.send_message(
receiver_id=message.sender_id,
message_type=MessageType.TASK_RESPONSE,
content={
"task_id": task_id,
"status": "completed",
"result": result,
"worker_skills_used": self.skills
}
)
self.tasks_completed += 1
print(f" {self.name} 完成任务: {description}")
def process_task(self, description: str) -> str:
"""
处理具体任务(根据描述生成结果)
Args:
description: 任务描述
Returns:
任务结果
"""
if "计算" in description or "数学" in description:
return f"计算完成: 42 (使用 {self.skills} 技能)"
elif "分析" in description:
return f"分析报告: 数据趋势良好"
elif "编写" in description or "代码" in description:
return f"代码已编写: def solution(): return 'Hello World'"
else:
return f"任务 '{description}' 已处理完成"
# ==================== 演示程序 ====================
async def demo_agent_communication():
"""
演示智能体间通信的主函数
"""
print("=" * 50)
print(" Agent2Agent 通信演示开始")
print("=" * 50)
# 创建传输层实例
transport = SimpleTransport()
# 创建智能体实例
task_manager = TaskManagerAgent(
agent_id="manager_001",
name="任务管理器",
transport=transport
)
worker_agent = WorkerAgent(
agent_id="worker_001",
name="AI工作者",
transport=transport,
skills=["Python编程", "数据分析", "机器学习"]
)
# 等待一下让注册完成
await asyncio.sleep(0.1)
print("\n" + "="*30 + " 场景1: 简单问候 " + "="*30)
# 场景1: 简单问候
await task_manager.send_message(
receiver_id=worker_agent.agent_id,
message_type=MessageType.GREETING,
content={"text": "你好,我是任务管理器!"}
)
await asyncio.sleep(0.5) # 等待消息处理
print("\n" + "="*30 + " 场景2: 任务分配 " + "="*30)
# 场景2: 任务分配和执行
tasks = [
"计算复杂的数学公式",
"分析销售数据趋势",
"编写Python数据处理代码",
"生成月度报告摘要"
]
for i, task_desc in enumerate(tasks, 1):
print(f"\n--- 任务 {i} ---")
await task_manager.create_task(task_desc, worker_agent.agent_id)
await asyncio.sleep(1.5) # 等待任务完成
print("\n" + "="*30 + " 结果统计 " + "="*30)
# 显示结果统计
print(f" 任务管理器统计:")
print(f" 总消息数: {len(task_manager.conversation_history)}")
print(f" 完成任务: {len(task_manager.completed_tasks)}")
print(f" 待处理任务: {len(task_manager.pending_tasks)}")
print(f"\n 工作者统计:")
print(f" 总消息数: {len(worker_agent.conversation_history)}")
print(f" 完成任务: {worker_agent.tasks_completed}")
print("\n" + "="*50)
print(" Agent2Agent 通信演示完成!")
print("=" * 50)
async def advanced_demo():
"""
高级演示:多个智能体协作
"""
print("\n" + "*"*20 + " 高级演示 " + "*"*20)
transport = SimpleTransport()
# 创建多个智能体
manager = TaskManagerAgent("manager_1", "高级管理器", transport)
# 创建具有不同技能的多个工作者
developer = WorkerAgent("worker_1", "开发专家", transport,
["Python", "Java", "系统设计"])
analyst = WorkerAgent("worker_2", "数据分析师", transport,
["统计分析", "数据可视化", "机器学习"])
writer = WorkerAgent("worker_3", "技术作家", transport,
["文档编写", "技术说明", "API文档"])
await asyncio.sleep(0.1)
# 并行分配任务给不同的工作者
tasks = [
("开发新的数据处理模块", developer.agent_id),
("分析用户行为数据", analyst.agent_id),
("编写项目文档", writer.agent_id),
("优化算法性能", developer.agent_id),
]
# 同时启动所有任务
task_coroutines = [
manager.create_task(desc, worker_id)
for desc, worker_id in tasks
]
await asyncio.gather(*task_coroutines)
# 等待所有任务完成
await asyncio.sleep(3)
print(f"\n 高级演示完成!")
print(f" 管理器处理了 {len(manager.conversation_history)} 条消息")
print(f" 完成了 {len(manager.completed_tasks)} 个跨领域任务")
if __name__ == "__main__":
# 运行基础演示
asyncio.run(demo_agent_communication())
# 运行高级演示
asyncio.run(advanced_demo())原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。