
本文专注企业级、合法合规场景,提供一套可落地的工程化方案: 从公开/授权的格式化信息(厂商官方OpenAPI文档、Postman集合、SDK日志导出、威胁情报平台API文档)中自动解析接口描述 → 生成标准化MCP(Model/Module/Mapping/Proxy)规范 → 输出可复用适配器代码骨架。 核心价值:把奇安信、绿盟、深信服、360、微步、腾讯等厂商的公开威胁情报API(或企业内部授权片段)快速转化为上游自动化/AI系统可直接调用的安全接口。 声明:所有涉及敏感认证/签名/私钥一律为伪代码或Vault占位。
图:总体架构流程图(安全中转与审计)

实施步骤总览(企业落地):
可运行示例(Python,逐行中文注释,含环境说明)
# demo/parse_and_gen.py
"""
运行环境:
- Python 3.9+,安装依赖:pip install pyyaml pydantic requests
用途:
- 读取 Postman 集合或 OpenAPI 文件 → 提取端点 → 生成 MCP 规范 → 打印示例 JSON
"""
import json, yaml
from pathlib import Path
def load_any(path: str):
p = Path(path)
# 判断文件后缀,选择解析器
if p.suffix in {'.json', '.postman_collection'}:
return json.loads(p.read_text(encoding='utf-8'))
if p.suffix in {'.yml', '.yaml', '.openapi', '.swagger'}:
return yaml.safe_load(p.read_text(encoding='utf-8'))
raise ValueError('Unsupported format')
def extract_endpoints(data):
eps = []
# Postman v2.1 结构检测
if isinstance(data, dict) and 'item' in data:
for item in data['item']:
if 'request' in item:
req = item['request']
url = req.get('url', {})
raw_url = url.get('raw') if isinstance(url, dict) else str(url)
eps.append({'name': item.get('name'), 'method': req.get('method'), 'url': raw_url})
# OpenAPI 3.x 结构检测
elif isinstance(data, dict) and 'openapi' in data:
for path, methods in data.get('paths', {}).items():
for m, spec in methods.items():
eps.append({'name': spec.get('operationId') or path, 'method': m.upper(), 'url': path})
return eps
def to_mcp_spec(endpoint: dict, base_url: str = ""):
# 拼装简化版 MCP 规范
return {
'name': endpoint['name'],
'method': endpoint['method'],
'path': endpoint['url'],
'base_url': base_url,
'auth_type': 'api_key',
'responses': [{'code': 200, 'description': 'success'}]
}
if __name__ == '__main__':
# 示例:读取 examples/postman.json(请替换为你的路径)
data = load_any('examples/postman.json')
endpoints = extract_endpoints(data)
specs = [to_mcp_spec(ep, base_url='https://api.vendor.com') for ep in endpoints]
print(json.dumps(specs, ensure_ascii=False, indent=2))数据表格(解析结果示例):
名称 | 方法 | 路径 | 认证 | 响应 | 备注 |
|---|---|---|---|---|---|
getIPReputation | GET | /api/v1/ip/reputation | api_key | 200: success | 奇安信示例 |
checkIOC | POST | /api/v1/ioc/check | bearer | 200: ok | 腾讯 TIX 示例 |
quakeSearch | POST | /api/v3/search/quake_service | api_key | 200: rows | 360 Quake 示例 |
合法来源(推荐优先级):
禁止来源:
图:合规性决策流程(来源判断)
%%{init: {'theme': 'default', 'themeVariables': { 'primaryColor': '#FEF3C7', 'secondaryColor': '#DCFCE7', 'tertiaryColor': '#E0F2FE', 'lineColor': '#0f766e' }}}%%
flowchart TD
S[开始]:::start --> Q{来源是否公开/授权?}:::decision
Q -- 是 --> P[可使用:官方文档/公开API/授权Postman]:::pass
Q -- 否 --> B[禁止:抓包/反编译/未授权探测/绕白名单]:::ban
P --> N[进入格式读取与规范生成]:::next
B --> X[终止:不纳入流水线]:::stop
classDef start fill:#FEF3C7,stroke:#b45309,stroke-width:2;
classDef decision fill:#FFEDD5,stroke:#c2410c,stroke-width:2;
classDef pass fill:#DCFCE7,stroke:#166534,stroke-width:2;
classDef ban fill:#FEE2E2,stroke:#991b1b,stroke-width:2;
classDef next fill:#E0F2FE,stroke:#075985,stroke-width:2;
classDef stop fill:#F3E8FF,stroke:#6b21a8,stroke-width:2;支持8种主流格式,单文件<100ms解析完成。
图:多格式解析工作流

中间表示: Endpoint List
# real code - parsers/universal_parser.py
import json, yaml, csv, xml.etree.ElementTree as ET
from pathlib import Path
from typing import Dict, List, Any
SUPPORTED = {'.json', '.yaml', '.yml', '.csv', '.postman_collection', '.openapi', '.swagger', '.xml'}
def load_any(path: str) -> Dict[str, Any]:
p = Path(path)
if p.suffix in {'.json', '.postman_collection'}:
return json.loads(p.read_text(encoding='utf-8'))
if p.suffix in {'.yml', '.yaml', '.openapi', '.swagger'}:
return yaml.safe_load(p.read_text(encoding='utf-8'))
if p.suffix == '.csv':
return list(csv.DictReader(p.read_text(encoding='utf-8').splitlines()))
raise ValueError(f"Unsupported: {p.suffix}")
def extract_endpoints(data: Any, format_type: str = 'auto') -> List[Dict]:
endpoints = []
# Postman v2.1
if isinstance(data, dict) and 'item' in data:
for item in data['item']:
if 'request' in item:
req = item['request']
url = req.get('url', {})
raw_url = url.get('raw') if isinstance(url, dict) else str(url)
endpoints.append({
'name': item.get('name'),
'method': req.get('method'),
'url': raw_url,
'headers': req.get('header', []),
'query': [q['key'] for q in url.get('query', [])] if isinstance(url, dict) else [],
'body_example': req.get('body', {}).get('raw')
})
# OpenAPI 3.x
elif isinstance(data, dict) and 'openapi' in data:
for path, methods in data.get('paths', {}).items():
for method, spec in methods.items():
endpoints.append({
'name': spec.get('operationId') or path,
'method': method.upper(),
'url': path,
'parameters': spec.get('parameters', []),
'requestBody': spec.get('requestBody')
})
return endpoints图:MCP 规范数据模型类图

# real code - spec_gen/mcp_generator.py
from pydantic import BaseModel, Field
from typing import List, Optional
class MCPParameter(BaseModel):
name: str
in_: str = Field(..., alias='in') # query/header/path/body
required: bool = True
type: str = 'string'
description: Optional[str]
class MCPResponse(BaseModel):
code: int
description: str
class MCPSpec(BaseModel):
name: str
method: str
path: str
base_url: str
parameters: List[MCPParameter] = []
responses: List[MCPResponse] = []
auth_type: str = 'api_key' # api_key/header/bearer/hmac
def to_mcp_spec(endpoint: dict, base_url: str = "") -> MCPSpec:
return MCPSpec(
name=endpoint['name'],
method=endpoint['method'],
path=endpoint['url'],
base_url=base_url,
parameters=[MCPParameter(name=p, in_='query') for p in endpoint.get('query', [])],
responses=[MCPResponse(code=200, description="success")]
).model_dump(by_alias=True, exclude_none=True)# real code - adapters/template_adapter.py.j2
import requests
from vault import get_secret # 企业Vault
class {{adapter_name}}Adapter:
def __init__(self):
self.base_url = "{{base_url}}"
self.api_key = get_secret("qianxin-ti-key") # Vault路径
def {{method_name}}(self, **kwargs):
headers = {"Authorization": f"Bearer {self.api_key}"}
url = self.base_url + "{{path}}"
{% if query_params %}
params = {k: kwargs[k] for k in {{query_params}} if k in kwargs}
{% else %}
params = {}
{% endif %}
resp = requests.{{method_lower}}(url, headers=headers, params=params, timeout=10)
resp.raise_for_status()
return resp.json()GET https://ti.qianxin.com/api/v1/ip/reputationip(必填)+ apikey
{
"data": {
"basic": {"asn": "133119", "geo": {"country": "ID"}},
"risk_level": "high",
"tags": ["scanner", "malware"]
}
}一键生成命令:
python main.py --input qianxin_ip_openapi.yaml --base-url https://ti.qianxin.com
# 输出: mcp_qianxin_ip.yaml + adapters/qianxin_ip_adapter.py生成后的adapter.py(可直接运行):
# real code - 完整可运行示例(只需替换YOUR_KEY)
import requests
from typing import Dict
class QianxinIPAdapter:
BASE_URL = "https://ti.qianxin.com"
ENDPOINT = "/api/v1/ip/reputation"
def __init__(self, api_key: str):
self.api_key = api_key
def query(self, ip: str) -> Dict:
params = {"ip": ip, "apikey": self.api_key}
resp = requests.get(self.BASE_URL + self.ENDPOINT, params=params, timeout=10)
resp.raise_for_status()
return resp.json()
# 使用(注册免费key:https://ti.qianxin.com)
adapter = QianxinIPAdapter("YOUR_FREE_API_KEY")
result = adapter.query("8.8.8.8")
print(result['data']['risk_level']) # 输出: low/high厂商 | 公开API示例 | 替换点 | 免费Key获取 |
|---|---|---|---|
腾讯TIX | POST https://tix.qq.com/api/v1/ioc/check | body: {“ioc”: “1.1.1.1”} | https://tix.qq.com |
微步XTI | GET https://api.threatbook.cn/v3/scene/ip_reputation | params: ip + apikey | https://x.threatbook.com |
360 Quake | POST https://quake.360.cn/api/v3/search/quake_service | body: {“query”: "ip:“1.1.1.1"”} | https://quake.360.cn |
图:厂商替换要点总览

附:端点示例规范(模板)与契约测试(v1.0.0) 说明:每个厂商需提供 3-5 个典型端点示例,字段以官方文档为准。
端点模板(text):
${端点名称}
• 标准请求格式:METHOD /path;Headers: { Authorization / X-API-Key / Content-Type }
• 请求参数说明:表格列出 name、required(必填/选填)、type、constraints
• 成功响应示例:HTTP 200;JSON 数据结构示例(字段含义、枚举值)
• 错误码对照表:200/400/401/403/429/500(以官方文档为准)契约测试模板(YAML):
version: v1.0.0
last_verified: 2025-11-10
endpoint: ${端点名称}
positive:
- name: ok_200
request:
method: GET
path: /api/v1/example
params: { key: value }
expect: { status: 200, schema_ref: official }
negative:
- name: invalid_param
request:
method: GET
path: /api/v1/example
params: { key: "@@@" }
expect: { status: 400 }
security:
auth_required: true
injection_payloads: ["' OR '1'='1", "${{bad}}"]
expect_status: [400, 422]
performance:
p95_ms_threshold: 1000常见错误码对照模板(需与官方一致):
状态码 | 说明 |
|---|---|
200 | 成功 |
400 | 参数错误/格式不合法 |
401 | 未授权/认证失败 |
403 | 权限不足/IP未在白名单 |
429 | 频率限制 |
500 | 服务器内部错误 |
填充与验证说明:
# ci/pipeline.yml
stages:
- parse
- gen
- test
parse:
script: python main.py --input *.yaml --output specs/
gen:
script: python generator.py --specs specs/
test:
script: pytest tests/contract/ -v图:CI/CD流水线阶段

图:一键运行流程

git clone https://github.com/yourname/mcp-security-adapter-template
cd mcp-security-adapter-template
pip install -r requirements.txt
# 放入奇安信OpenAPI.yaml
python main.py --input examples/qianxin_ip.yaml
# 生成适配器 → 立即可用引用与参考(InfoSec):