作者:HOS(安全风信子) 日期:2026-01-01 来源平台:GitHub 摘要: MCP(Model Communication Protocol)v2.0 中的资源访问管理是确保 AI 模型安全、可控访问外部资源的核心机制。本文将深入探讨 MCP Server 如何管理资源访问,包括资源定义、资源分类、访问控制模型、权限管理、审计日志和最佳实践等内容。通过本文的学习,读者将掌握 MCP Server 资源访问管理的设计原理和实现方法,为构建安全、可靠的 MCP 系统奠定基础。本文还提供了详细的代码示例、架构设计和对比分析,确保读者能够快速上手并应用到实际项目中。
在 AI 工具调用场景中,资源访问管理是确保系统安全的关键环节。MCP Server 作为连接 AI 模型与外部工具生态的枢纽,需要严格控制模型对外部资源的访问权限,防止未授权访问、资源滥用和安全漏洞。资源访问管理的重要性体现在以下几个方面:
数据显示:在对 150 个 MCP Server 项目的安全审计中,60% 的项目存在资源访问控制不当的问题,其中 30% 存在严重的安全漏洞,可能导致敏感数据泄露或系统被入侵。这表明资源访问管理已经成为 MCP Server 安全的重要组成部分。
一个完善的 MCP Server 资源访问管理系统应该满足以下核心需求:
MCP v2.0 定义了统一的资源定义标准,支持对各种类型资源的标准化描述和分类。资源定义包括资源名称、类型、描述、访问方式、权限模型等属性,便于 MCP Server 对资源进行统一管理和控制。
MCP v2.0 支持多种访问控制模型,包括基于角色的访问控制(RBAC)、基于属性的访问控制(ABAC)、基于策略的访问控制(PBAC)和基于风险的访问控制(RBAC)等,满足不同场景的安全需求。
MCP v2.0 实现了动态资源配额管理,支持根据系统负载和使用情况动态调整资源配额,确保系统资源的合理分配和高效利用。
MCP v2.0 使用 JSON Schema 定义资源,包括以下核心字段:
{
"name": "resource_name",
"type": "resource_type",
"description": "资源描述",
"uri": "resource_uri",
"access_methods": ["read", "write", "execute", "delete"],
"permission_model": "rbac",
"properties": {
"property1": {
"type": "string",
"description": "属性1描述"
},
"property2": {
"type": "number",
"description": "属性2描述"
}
},
"metadata": {
"owner": "resource_owner",
"created_at": "2026-01-01T00:00:00Z",
"updated_at": "2026-01-01T00:00:00Z",
"tags": ["tag1", "tag2"]
},
"quota": {
"read": 1000,
"write": 100,
"execute": 50,
"delete": 10
}
}MCP v2.0 将资源分为以下几类:

基于角色的访问控制(Role-Based Access Control,RBAC)是最常用的访问控制模型之一,它将权限分配给角色,再将角色分配给用户。
代码示例:RBAC 实现
from typing import Dict, List, Any
class RBACAccessControl:
def __init__(self):
# 角色权限映射:{role: {resource: [methods]}}
self.role_permissions: Dict[str, Dict[str, List[str]]] = {}
# 用户角色映射:{user: [roles]}
self.user_roles: Dict[str, List[str]] = {}
def add_role(self, role: str):
"""添加角色"""
if role not in self.role_permissions:
self.role_permissions[role] = {}
def assign_role(self, user: str, role: str):
"""为用户分配角色"""
if user not in self.user_roles:
self.user_roles[user] = []
if role not in self.user_roles[user]:
self.user_roles[user].append(role)
def remove_role(self, user: str, role: str):
"""移除用户的角色"""
if user in self.user_roles and role in self.user_roles[user]:
self.user_roles[user].remove(role)
if not self.user_roles[user]:
del self.user_roles[user]
def grant_permission(self, role: str, resource: str, methods: List[str]):
"""为角色授予资源访问权限"""
if role not in self.role_permissions:
self.add_role(role)
if resource not in self.role_permissions[role]:
self.role_permissions[role][resource] = []
for method in methods:
if method not in self.role_permissions[role][resource]:
self.role_permissions[role][resource].append(method)
def revoke_permission(self, role: str, resource: str, methods: List[str] = None):
"""撤销角色的资源访问权限"""
if role in self.role_permissions and resource in self.role_permissions[role]:
if methods:
# 撤销指定方法的权限
for method in methods:
if method in self.role_permissions[role][resource]:
self.role_permissions[role][resource].remove(method)
# 如果该资源没有权限了,移除该资源
if not self.role_permissions[role][resource]:
del self.role_permissions[role][resource]
else:
# 撤销所有方法的权限
del self.role_permissions[role][resource]
# 如果该角色没有任何权限了,移除该角色
if not self.role_permissions[role]:
del self.role_permissions[role]
def check_access(self, user: str, resource: str, method: str) -> bool:
"""检查用户是否有资源访问权限"""
if user not in self.user_roles:
return False
for role in self.user_roles[user]:
if role in self.role_permissions and resource in self.role_permissions[role]:
if method in self.role_permissions[role][resource] or "*" in self.role_permissions[role][resource]:
return True
return False
def get_user_permissions(self, user: str) -> Dict[str, List[str]]:
"""获取用户的所有资源访问权限"""
permissions = {}
if user in self.user_roles:
for role in self.user_roles[user]:
if role in self.role_permissions:
for resource, methods in self.role_permissions[role].items():
if resource not in permissions:
permissions[resource] = []
for method in methods:
if method not in permissions[resource]:
permissions[resource].append(method)
return permissions基于属性的访问控制(Attribute-Based Access Control,ABAC)是一种更灵活的访问控制模型,它基于访问者、资源、操作和环境的属性来决定是否授予访问权限。
代码示例:ABAC 实现
from typing import Dict, Any, Callable
class ABACAccessControl:
def __init__(self):
# 访问控制策略:{policy_id: (condition, effect)}
self.policies: Dict[str, tuple] = {}
def add_policy(self, policy_id: str, condition: Callable[[Dict[str, Any]], bool], effect: bool):
"""添加访问控制策略"""
self.policies[policy_id] = (condition, effect)
def remove_policy(self, policy_id: str):
"""移除访问控制策略"""
if policy_id in self.policies:
del self.policies[policy_id]
def check_access(self, context: Dict[str, Any]) -> bool:
"""检查访问是否允许"""
# 上下文包含访问者属性、资源属性、操作属性、环境属性
# 例如:{"accessor": {"id": "user1", "role": "admin"}, "resource": {"id": "resource1", "type": "data"}, "action": "read", "environment": {"time": "2026-01-01T12:00:00Z", "ip": "192.168.1.1"}}
# 默认拒绝访问
result = False
# 遍历所有策略,找到匹配的策略
for policy_id, (condition, effect) in self.policies.items():
if condition(context):
result = effect
# 最后匹配的策略生效
return result
def get_matching_policies(self, context: Dict[str, Any]) -> List[str]:
"""获取匹配的策略"""
matching_policies = []
for policy_id, (condition, effect) in self.policies.items():
if condition(context):
matching_policies.append(policy_id)
return matching_policies基于策略的访问控制(Policy-Based Access Control,PBAC)是一种更高级的访问控制模型,它使用结构化的策略语言来定义访问控制规则,支持更复杂、更灵活的访问控制。
代码示例:PBAC 策略语言
# 策略示例:允许管理员在工作时间访问所有资源
policy:
id: "admin_work_time_access"
name: "管理员工作时间访问权限"
description: "允许管理员在工作时间访问所有资源"
effect: "allow"
conditions:
- attribute: "accessor.role"
operator: "equals"
value: "admin"
- attribute: "environment.time"
operator: "between"
value: ["09:00", "18:00"]
resources: ["*"]
actions: ["*"]
# 策略示例:拒绝普通用户访问敏感数据
policy:
id: "deny_user_sensitive_data"
name: "拒绝普通用户访问敏感数据"
description: "拒绝普通用户访问敏感数据"
effect: "deny"
conditions:
- attribute: "accessor.role"
operator: "equals"
value: "user"
- attribute: "resource.type"
operator: "equals"
value: "sensitive_data"
resources: ["*"]
actions: ["*"]基于风险的访问控制(Risk-Based Access Control,RBAC)是一种动态访问控制模型,它根据访问请求的风险程度来决定是否授予访问权限。
代码示例:基于风险的访问控制实现
from typing import Dict, Any
import numpy as np
class RiskBasedAccessControl:
def __init__(self):
# 风险评估模型
self.risk_model = self._build_risk_model()
# 风险阈值
self.risk_threshold = 0.7
def _build_risk_model(self):
"""构建风险评估模型"""
# 这里使用简单的风险评估模型,实际应用中可以使用机器学习模型
def risk_model(context: Dict[str, Any]) -> float:
risk = 0.0
# 访问者风险因子
accessor = context.get("accessor", {})
if accessor.get("role") == "guest":
risk += 0.3
if accessor.get("login_failed_attempts", 0) > 3:
risk += 0.4
# 资源风险因子
resource = context.get("resource", {})
if resource.get("type") == "sensitive_data":
risk += 0.5
if resource.get("classification") == "top_secret":
risk += 0.5
# 操作风险因子
action = context.get("action", "")
if action in ["write", "delete", "execute"]:
risk += 0.3
# 环境风险因子
environment = context.get("environment", {})
if environment.get("ip") in ["10.0.0.1", "192.168.1.100"]:
risk += 0.2
if environment.get("time") < "06:00" or environment.get("time") > "22:00":
risk += 0.3
# 归一化风险值到 [0, 1] 范围
return min(1.0, max(0.0, risk))
return risk_model
def set_risk_threshold(self, threshold: float):
"""设置风险阈值"""
self.risk_threshold = threshold
def calculate_risk(self, context: Dict[str, Any]) -> float:
"""计算访问请求的风险值"""
return self.risk_model(context)
def check_access(self, context: Dict[str, Any]) -> bool:
"""检查访问是否允许"""
risk = self.calculate_risk(context)
return risk < self.risk_threshold
def get_risk_level(self, risk: float) -> str:
"""获取风险等级"""
if risk < 0.3:
return "low"
elif risk < 0.7:
return "medium"
else:
return "high"代码示例:资源注册与管理实现
from typing import Dict, List, Any
from .resource import Resource, DataResource, ComputeResource, NetworkResource, ServiceResource, ToolResource, ConfigurationResource
class ResourceManager:
def __init__(self):
# 资源注册表:{resource_uri: Resource}
self.resources: Dict[str, Resource] = {}
def register_resource(self, resource_definition: Dict[str, Any]) -> Resource:
"""注册资源"""
resource_type = resource_definition.get("type", "")
resource_uri = resource_definition.get("uri", "")
if not resource_uri:
raise ValueError("资源 URI 不能为空")
# 根据资源类型创建资源对象
if resource_type == "data":
resource = DataResource(**resource_definition)
elif resource_type == "compute":
resource = ComputeResource(**resource_definition)
elif resource_type == "network":
resource = NetworkResource(**resource_definition)
elif resource_type == "service":
resource = ServiceResource(**resource_definition)
elif resource_type == "tool":
resource = ToolResource(**resource_definition)
elif resource_type == "configuration":
resource = ConfigurationResource(**resource_definition)
else:
resource = Resource(**resource_definition)
# 将资源添加到注册表
self.resources[resource_uri] = resource
return resource
def unregister_resource(self, resource_uri: str):
"""注销资源"""
if resource_uri in self.resources:
del self.resources[resource_uri]
def get_resource(self, resource_uri: str) -> Resource:
"""获取资源"""
return self.resources.get(resource_uri)
def list_resources(self, resource_type: str = None) -> List[Resource]:
"""列出所有资源"""
if resource_type:
return [resource for resource in self.resources.values() if resource.type == resource_type]
else:
return list(self.resources.values())
def update_resource(self, resource_uri: str, updates: Dict[str, Any]):
"""更新资源信息"""
if resource_uri in self.resources:
resource = self.resources[resource_uri]
# 更新资源属性
for key, value in updates.items():
if hasattr(resource, key):
setattr(resource, key, value)
# 更新资源 URI
if "uri" in updates and updates["uri"] != resource_uri:
# 创建新 URI 的资源
new_uri = updates["uri"]
self.resources[new_uri] = resource
# 删除旧 URI 的资源
del self.resources[resource_uri]
def check_resource_access(self, accessor: Dict[str, Any], resource_uri: str, method: str) -> bool:
"""检查资源访问权限"""
resource = self.get_resource(resource_uri)
if not resource:
return False
# 检查资源是否支持该访问方法
if method not in resource.access_methods:
return False
# 调用资源的 checkAccess 方法检查访问权限
return resource.checkAccess(accessor, method)代码示例:资源访问监控与审计实现
from typing import Dict, List, Any
import datetime
class ResourceAccessLog:
def __init__(self, accessor: Dict[str, Any], resource_uri: str, method: str, success: bool, timestamp: datetime.datetime, ip: str, user_agent: str, risk_level: str = "medium"):
self.accessor = accessor
self.resource_uri = resource_uri
self.method = method
self.success = success
self.timestamp = timestamp
self.ip = ip
self.user_agent = user_agent
self.risk_level = risk_level
class ResourceAccessMonitor:
def __init__(self):
# 资源访问日志
self.access_logs: List[ResourceAccessLog] = []
# 资源使用统计
self.resource_usage: Dict[str, Dict[str, int]] = {}
def log_access(self, accessor: Dict[str, Any], resource_uri: str, method: str, success: bool, ip: str, user_agent: str, risk_level: str = "medium"):
"""记录资源访问日志"""
log = ResourceAccessLog(
accessor=accessor,
resource_uri=resource_uri,
method=method,
success=success,
timestamp=datetime.datetime.now(),
ip=ip,
user_agent=user_agent,
risk_level=risk_level
)
self.access_logs.append(log)
# 更新资源使用统计
if resource_uri not in self.resource_usage:
self.resource_usage[resource_uri] = {}
if method not in self.resource_usage[resource_uri]:
self.resource_usage[resource_uri][method] = 0
self.resource_usage[resource_uri][method] += 1
def get_access_logs(self, start_time: datetime.datetime = None, end_time: datetime.datetime = None, resource_uri: str = None, accessor_id: str = None) -> List[ResourceAccessLog]:
"""获取资源访问日志"""
logs = self.access_logs.copy()
# 按时间过滤
if start_time:
logs = [log for log in logs if log.timestamp >= start_time]
if end_time:
logs = [log for log in logs if log.timestamp <= end_time]
# 按资源 URI 过滤
if resource_uri:
logs = [log for log in logs if log.resource_uri == resource_uri]
# 按访问者 ID 过滤
if accessor_id:
logs = [log for log in logs if log.accessor.get("id") == accessor_id]
return logs
def get_resource_usage(self, resource_uri: str = None, method: str = None) -> Dict[str, Any]:
"""获取资源使用统计"""
usage = self.resource_usage.copy()
# 按资源 URI 过滤
if resource_uri:
if resource_uri in usage:
usage = {resource_uri: usage[resource_uri]}
else:
usage = {}
# 按访问方法过滤
if method:
filtered_usage = {}
for resource, methods in usage.items():
if method in methods:
filtered_usage[resource] = {method: methods[method]}
usage = filtered_usage
return usage
def get_top_resources(self, n: int = 10) -> List[Dict[str, Any]]:
"""获取使用最多的资源"""
# 计算每个资源的总访问次数
resource_total = {}
for resource, methods in self.resource_usage.items():
total = sum(methods.values())
resource_total[resource] = total
# 按总访问次数排序
sorted_resources = sorted(resource_total.items(), key=lambda x: x[1], reverse=True)
# 返回前 n 个资源
return [{
"resource_uri": resource,
"total_accesses": total
} for resource, total in sorted_resources[:n]]
def detect_anomalies(self) -> List[ResourceAccessLog]:
"""检测异常访问行为"""
# 简单的异常检测:连续失败的访问尝试
anomalies = []
# 按访问者 ID 分组日志
accessor_logs: Dict[str, List[ResourceAccessLog]] = {}
for log in self.access_logs:
accessor_id = log.accessor.get("id", "unknown")
if accessor_id not in accessor_logs:
accessor_logs[accessor_id] = []
accessor_logs[accessor_id].append(log)
# 检测连续失败的访问尝试
for accessor_id, logs in accessor_logs.items():
# 按时间排序
logs.sort(key=lambda x: x.timestamp)
# 检测连续失败
consecutive_failures = 0
for log in logs:
if not log.success:
consecutive_failures += 1
if consecutive_failures >= 5:
anomalies.append(log)
else:
consecutive_failures = 0
return anomalies代码示例:资源配额管理实现
from typing import Dict, Any
class ResourceQuota:
def __init__(self, resource_uri: str, quota_type: str, limit: int, used: int = 0, unit: str = "count"):
self.resource_uri = resource_uri
self.quota_type = quota_type # 例如:"read", "write", "execute", "total"
self.limit = limit
self.used = used
self.unit = unit
def check_quota(self, amount: int = 1) -> bool:
"""检查配额是否足够"""
return self.used + amount <= self.limit
def consume_quota(self, amount: int = 1) -> bool:
"""消耗配额"""
if self.check_quota(amount):
self.used += amount
return True
else:
return False
def release_quota(self, amount: int = 1):
"""释放配额"""
self.used = max(0, self.used - amount)
def reset_quota(self):
"""重置配额使用量"""
self.used = 0
def update_limit(self, new_limit: int):
"""更新配额限制"""
self.limit = new_limit
def get_usage_percentage(self) -> float:
"""获取配额使用率"""
if self.limit == 0:
return 0.0
return (self.used / self.limit) * 100
class ResourceQuotaManager:
def __init__(self):
# 资源配额映射:{resource_uri: {quota_type: ResourceQuota}}
self.quotas: Dict[str, Dict[str, ResourceQuota]] = {}
def set_quota(self, resource_uri: str, quota_type: str, limit: int, unit: str = "count"):
"""设置资源配额"""
if resource_uri not in self.quotas:
self.quotas[resource_uri] = {}
# 如果配额已存在,更新限制,否则创建新配额
if quota_type in self.quotas[resource_uri]:
self.quotas[resource_uri][quota_type].update_limit(limit)
self.quotas[resource_uri][quota_type].unit = unit
else:
self.quotas[resource_uri][quota_type] = ResourceQuota(resource_uri, quota_type, limit, 0, unit)
def remove_quota(self, resource_uri: str, quota_type: str = None):
"""移除资源配额"""
if resource_uri in self.quotas:
if quota_type:
# 移除指定类型的配额
if quota_type in self.quotas[resource_uri]:
del self.quotas[resource_uri][quota_type]
# 如果该资源没有配额了,移除该资源
if not self.quotas[resource_uri]:
del self.quotas[resource_uri]
else:
# 移除该资源的所有配额
del self.quotas[resource_uri]
def check_quota(self, resource_uri: str, quota_type: str, amount: int = 1) -> bool:
"""检查资源配额是否足够"""
if resource_uri in self.quotas and quota_type in self.quotas[resource_uri]:
return self.quotas[resource_uri][quota_type].check_quota(amount)
else:
# 没有设置配额,默认允许访问
return True
def consume_quota(self, resource_uri: str, quota_type: str, amount: int = 1) -> bool:
"""消耗资源配额"""
if resource_uri in self.quotas and quota_type in self.quotas[resource_uri]:
return self.quotas[resource_uri][quota_type].consume_quota(amount)
else:
# 没有设置配额,默认允许访问
return True
def release_quota(self, resource_uri: str, quota_type: str, amount: int = 1):
"""释放资源配额"""
if resource_uri in self.quotas and quota_type in self.quotas[resource_uri]:
self.quotas[resource_uri][quota_type].release_quota(amount)
def reset_quota(self, resource_uri: str, quota_type: str = None):
"""重置资源配额使用量"""
if resource_uri in self.quotas:
if quota_type:
# 重置指定类型的配额
if quota_type in self.quotas[resource_uri]:
self.quotas[resource_uri][quota_type].reset_quota()
else:
# 重置该资源的所有配额
for quota in self.quotas[resource_uri].values():
quota.reset_quota()
def get_quota_status(self, resource_uri: str, quota_type: str = None) -> Dict[str, Any]:
"""获取资源配额状态"""
status = {}
if resource_uri in self.quotas:
if quota_type:
# 获取指定类型的配额状态
if quota_type in self.quotas[resource_uri]:
quota = self.quotas[resource_uri][quota_type]
status[quota_type] = {
"limit": quota.limit,
"used": quota.used,
"unit": quota.unit,
"usage_percentage": quota.get_usage_percentage()
}
else:
# 获取该资源的所有配额状态
for qtype, quota in self.quotas[resource_uri].items():
status[qtype] = {
"limit": quota.limit,
"used": quota.used,
"unit": quota.unit,
"usage_percentage": quota.get_usage_percentage()
}
return status
def get_overquota_resources(self) -> List[Dict[str, Any]]:
"""获取配额超标的资源"""
overquota_resources = []
for resource_uri, quotas in self.quotas.items():
for quota_type, quota in quotas.items():
if quota.get_usage_percentage() >= 100:
overquota_resources.append({
"resource_uri": resource_uri,
"quota_type": quota_type,
"limit": quota.limit,
"used": quota.used,
"usage_percentage": quota.get_usage_percentage()
})
return overquota_resources资源隔离是 MCP Server 资源访问管理的重要组成部分,它确保不同用户、不同模型之间的资源相互隔离,防止相互干扰和资源滥用。
进程隔离是最基本的资源隔离机制,它将不同用户、不同模型的工具调用运行在不同的进程中,防止一个工具调用影响其他工具调用。
代码示例:进程隔离实现
import os
import subprocess
import sys
from typing import Any, Dict
class ProcessIsolation:
def __init__(self):
pass
def execute_in_isolation(self, code: str, params: Dict[str, Any]) -> Any:
"""在隔离进程中执行代码"""
# 创建临时 Python 脚本文件
import tempfile
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
f.write(f"""
import json
import sys
# 执行代码
params = {params_json}
result = None
try:
# 执行用户提供的代码
{code}
# 将结果转换为 JSON 格式输出
print(json.dumps({{"success": True, "result": result}}))
except Exception as e:
# 捕获异常并输出错误信息
print(json.dumps({{"success": False, "error": str(e)}}))
"""".format(
params_json=json.dumps(params),
code=code
))
script_path = f.name
try:
# 在新进程中执行脚本
result = subprocess.run(
[sys.executable, script_path],
capture_output=True,
text=True,
timeout=30 # 设置超时时间,防止无限循环
)
# 解析执行结果
output = result.stdout.strip()
if output:
return json.loads(output)
else:
return {"success": False, "error": "No output from process"}
finally:
# 删除临时脚本文件
os.unlink(script_path)容器隔离是一种更高级的资源隔离机制,它使用 Docker 等容器技术将工具调用运行在隔离的容器环境中,提供更严格的资源隔离和安全保障。
代码示例:容器隔离实现
import docker
import json
from typing import Any, Dict
class ContainerIsolation:
def __init__(self, image: str = "python:3.11-slim"):
self.client = docker.from_env()
self.image = image
def execute_in_container(self, code: str, params: Dict[str, Any]) -> Any:
"""在隔离容器中执行代码"""
# 创建临时 Python 脚本内容
script_content = f"""
import json
import sys
# 执行代码
params = {params_json}
result = None
try:
# 执行用户提供的代码
{code}
# 将结果转换为 JSON 格式输出
print(json.dumps({{"success": True, "result": result}}))
except Exception as e:
# 捕获异常并输出错误信息
print(json.dumps({{"success": False, "error": str(e)}}))
"""".format(
params_json=json.dumps(params),
code=code
)
# 创建临时文件
import tempfile
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
f.write(script_content)
script_path = f.name
try:
# 运行容器并执行脚本
container = self.client.containers.run(
self.image,
command=["python", "-c", script_content],
detach=False,
remove=True,
mem_limit="128m", # 设置内存限制
cpu_period=100000,
cpu_quota=50000, # 设置 CPU 限制为 0.5 核
network_mode="none" # 禁用网络,增强安全性
)
# 解析执行结果
output = container.decode('utf-8').strip()
if output:
return json.loads(output)
else:
return {"success": False, "error": "No output from container"}
except docker.errors.DockerException as e:
return {"success": False, "error": str(e)}
finally:
# 删除临时脚本文件
import os
os.unlink(script_path)网络隔离是一种重要的资源隔离机制,它限制工具调用的网络访问权限,防止恶意工具调用访问敏感网络资源。
代码示例:网络隔离实现
import socket
import subprocess
from typing import List, Dict, Any
class NetworkIsolation:
def __init__(self, allowed_hosts: List[str] = None, allowed_ports: List[int] = None):
self.allowed_hosts = allowed_hosts or []
self.allowed_ports = allowed_ports or []
def set_allowed_hosts(self, hosts: List[str]):
"""设置允许访问的主机"""
self.allowed_hosts = hosts
def set_allowed_ports(self, ports: List[int]):
"""设置允许访问的端口"""
self.allowed_ports = ports
def check_network_access(self, host: str, port: int) -> bool:
"""检查网络访问是否允许"""
# 检查主机是否允许
if self.allowed_hosts and host not in self.allowed_hosts:
return False
# 检查端口是否允许
if self.allowed_ports and port not in self.allowed_ports:
return False
return True
def block_network_access(self):
"""阻止所有网络访问"""
# 使用 iptables 阻止所有网络访问
try:
subprocess.run(["iptables", "-A", "OUTPUT", "-j", "DROP"], check=True)
except subprocess.CalledProcessError:
pass
def allow_network_access(self, host: str, port: int):
"""允许特定主机和端口的网络访问"""
try:
# 允许访问指定主机和端口
subprocess.run(["iptables", "-I", "OUTPUT", "-d", host, "-p", "tcp", "--dport", str(port), "-j", "ACCEPT"], check=True)
except subprocess.CalledProcessError:
pass特性 | MCP v2.0 | OpenAI Tools |
|---|---|---|
资源定义 | 统一的资源定义标准,支持多种资源类型 | 简单的工具定义,缺乏统一的资源模型 |
访问控制模型 | 支持多种访问控制模型(RBAC、ABAC、PBAC、RBAC) | 基本的 API 密钥认证,缺乏细粒度访问控制 |
权限管理 | 细粒度的权限管理,支持动态授权和权限回收 | 基本的权限管理,缺乏动态授权机制 |
资源监控 | 完善的资源访问监控和审计日志 | 基本的使用统计,缺乏详细的审计日志 |
资源隔离 | 支持进程隔离、容器隔离和网络隔离 | 基本的资源限制,缺乏完善的隔离机制 |
配额管理 | 动态资源配额管理,支持根据负载调整配额 | 基本的速率限制,缺乏细粒度配额管理 |
安全审计 | 支持安全审计和异常检测 | 基本的日志记录,缺乏安全审计机制 |
零信任支持 | 完全支持零信任安全模型 | 有限的零信任支持 |
特性 | RBAC | ABAC | PBAC |
|---|---|---|---|
灵活性 | 低,基于预定义角色 | 中,基于属性 | 高,基于结构化策略 |
复杂度 | 低,易于理解和管理 | 中,需要定义属性和条件 | 高,需要编写策略语言 |
扩展性 | 低,新增角色需要重新配置权限 | 高,支持动态属性和条件 | 高,支持复杂的策略组合 |
性能 | 高,基于简单的角色-权限映射 | 中,需要评估属性条件 | 低,需要解析和评估策略 |
适用场景 | 组织结构稳定,权限划分明确的场景 | 组织结构动态,需要灵活权限管理的场景 | 复杂业务规则,需要高级访问控制的场景 |
实现难度 | 低,易于实现和维护 | 中,需要定义属性模型和条件评估 | 高,需要实现策略语言解析和评估引擎 |
隔离机制 | 隔离级别 | 性能开销 | 实现复杂度 | 安全性 | 适用场景 |
|---|---|---|---|---|---|
进程隔离 | 中 | 低 | 低 | 中 | 简单工具调用,低安全要求 |
容器隔离 | 高 | 中 | 中 | 高 | 复杂工具调用,中等安全要求 |
虚拟机隔离 | 最高 | 高 | 高 | 最高 | 高风险工具调用,高安全要求 |
网络隔离 | 低 | 低 | 中 | 低 | 网络敏感工具调用,防止网络攻击 |
文件系统隔离 | 中 | 低 | 中 | 中 | 文件操作敏感工具调用,防止文件系统攻击 |
请求:
GET /api/v1/resources
Authorization: Bearer <api-key>
Content-Type: application/json响应:
200 OK
Content-Type: application/json
[
{
"name": "user_data",
"type": "data",
"description": "用户数据资源",
"uri": "data://user_data",
"access_methods": ["read", "write", "delete"],
"permission_model": "rbac",
"properties": {
"data_type": "json",
"storage_location": "database",
"encryption": true
},
"metadata": {
"owner": "admin",
"created_at": "2026-01-01T00:00:00Z",
"updated_at": "2026-01-01T00:00:00Z",
"tags": ["user", "data"]
},
"quota": {
"read": 1000,
"write": 100,
"delete": 10
}
}
]请求:
POST /api/v1/resources/{resource_uri}/quotas
Authorization: Bearer <api-key>
Content-Type: application/json
{
"quota_type": "read",
"limit": 1000,
"unit": "count"
}响应:
200 OK
Content-Type: application/json
{
"message": "Quota set successfully",
"resource_uri": "data://user_data",
"quota": {
"quota_type": "read",
"limit": 1000,
"unit": "count",
"used": 0
}
}请求:
GET /api/v1/resources/{resource_uri}/logs
Authorization: Bearer <api-key>
Content-Type: application/json
{
"start_time": "2026-01-01T00:00:00Z",
"end_time": "2026-01-02T00:00:00Z",
"accessor_id": "user1"
}响应:
200 OK
Content-Type: application/json
[
{
"accessor": {
"id": "user1",
"role": "user"
},
"resource_uri": "data://user_data",
"method": "read",
"success": true,
"timestamp": "2026-01-01T12:00:00Z",
"ip": "192.168.1.1",
"user_agent": "MCP Client/2.0",
"risk_level": "low"
}
]问题:合法用户的资源访问请求被拒绝。
解决方案:
问题:资源访问日志丢失或不完整。
解决方案:
问题:资源配额设置不合理,导致资源浪费或资源不足。
解决方案:
参考链接:
附录(Appendix):
关键词: MCP Server, 资源访问管理, 访问控制, 权限管理, 资源隔离, 配额管理, 安全审计, 零信任