本文面向从零到一构建“可控、可复现”的 MCP(Model Context Protocol)渗透测试体系,聚焦工程化落地与安全边界。包含环境一键初始化、最小服务器模板、Nmap/Nuclei 安全封装、工作流编排、人机回环审批、远程传输与认证样例、性能基准、模板治理(CI)、威胁模型与红队测试。
目录
- 环境一键初始化
- MCP 起源与知识点速览
- MCP 在渗透测试中的用途与价值
- MCP 最小服务器模板
- Nmap 接入(白名单 + XML 解析 + 基准)
- Nuclei 接入(模板白名单 + JSONL 解析 + 治理)
- 更多工具接入(只读与最小权限版)
- 远程传输与认证(SSE/WebSocket + Bearer/mTLS)
- 性能基准:Nmap 与 Nucle
- 模板治理与供应链安全(CI 示例)
- 威胁模型与红队测试(受控演练)
- 端到端示例仓库骨架(可复现)
环境一键初始化
#!/bin/bash
set -e
PROJECT_ROOT="$HOME/mcp-pentest-lab"
mkdir -p "$PROJECT_ROOT"/{servers,tools,workflows,configs,logs}
cd "$PROJECT_ROOT"
# 1) Python 环境与核心依赖
python3.12 -m venv .venv
source .venv/bin/activate
pip install --upgrade pip
pip install "mcp[cli]>=1.0.0" pydantic httpx bandit kubernetes dnspython
# 2) 安装核心工具 (Nmap & Nuclei)
if ! command -v nmap &>/dev/null; then
sudo apt-get update && sudo apt-get install -y nmap
fi
if ! command -v nuclei &>/dev/null; then
wget -qO nuclei.zip <https://github.com/projectdiscovery/nuclei/releases/latest/download/nuclei_linux_amd64.zip>
unzip -o nuclei.zip && sudo mv nuclei /usr/local/bin/
rm nuclei.zip
nuclei -update-templates
fi
# 3) 验证安装
mcp --version
nmap --version | head -n 1
nuclei -version
MCP 起源与知识点速览(2024—2025)
- 起源与捐赠:Anthropic 于 2024 提出 MCP,后捐赠至 Linux Foundation 的 Agentic AI Foundation,形成中立治理(与 Block 的 goose、OpenAI 的 AGENTS.md 并列的 Agent 基础设施)。
- 时间线(2024→2025):提出/开源 → 基金会托管 → SDK 多语言扩展(Python/Java/Swift/C#)→ IDE/桌面端支持(VS Code、Claude Desktop)→ 传输从 stdio 扩展至 SSE/WebSocket。
- 协议设计:基于 JSON-RPC 2.0;能力握手(capabilities);工具(tools)、资源(resources)、提示(prompts);采样/审批(sampling);日志/追踪(logging/tracing);Schema 与参数校验。
- 生态角色:Host/Client(宿主,如 IDE/Claude Desktop)、Server(能力提供者)、Tools/Resources(能力载体)。
- 传输方式:本地 stdio;远程 SSE/WebSocket(建议配合 mTLS 或 Bearer)。
MCP 在渗透测试中的用途与价值
- 攻防统一入口:将 Nmap、Nuclei、DNS/TLS、OSINT、CVE/NVD、K8s/Docker 等工具统一在受控接口下,减少脚本胶水与权限散落。
- 工作流编排:把“资产发现→指纹识别→漏洞验证→告警联动”封装为预定义工作流工具,LLM 仅触发工作流而非逐步拼接,降低逻辑劫持风险。
- 审计与合规:Logging/Sampling 原生支持;记录 Tool 调用链、参数与返回摘要;配合限速与审批门满足红队/蓝队留痕与合规要求。
- 自动化与复现:声明式 Schema + 场景模板(
tech_detect/token_leak/cve_critical),扫描可复现、可对比;结合缓存与版本钉死便于基线评估。 - 降低集成成本:一次开发的 MCP Server 可在多宿主(Claude Desktop、VS Code)复用,促进团队协作。
- 新攻击面验证:在受控沙箱安全复现 Prompt Injection、SSRF、过度权限、供应链包投毒、服务器伪装等风险,并用最小权限/白名单/输出净化防护。
MCP 最小服务器模板
import os, json, logging
from mcp.server.fastmcp import FastMCP, Context
from pydantic import Field, BaseModel
logging.basicConfig(
format='{"time":"%(asctime)s","level":"%(levelname)s","msg":"%(message)s"}',
level=logging.INFO
)
logger = logging.getLogger(__name__)
mcp = FastMCP("SecureTemplate")
SANDBOX_ROOT = os.path.abspath("./safe_data")
def validate_path(filename: str) -> str:
target = os.path.abspath(os.path.join(SANDBOX_ROOT, filename))
if not target.startswith(SANDBOX_ROOT):
logger.warning(f"Security Alert: Path traversal attempt: {filename}")
raise ValueError("Access Denied: Path outside sandbox")
return target
class ReadFileParams(BaseModel):
filename: str = Field(..., pattern=r"^[a-zA-Z0-9_\-\.]+$", max_length=64,
description="目标文件名,禁止路径分隔符")
@mcp.tool()
def read_secure_file(ctx: Context, params: ReadFileParams) -> str:
try:
path = validate_path(params.filename)
if not os.path.exists(path):
return json.dumps({"code": 404, "error": "File not found"})
if os.path.getsize(path) > 1024 * 1024:
return json.dumps({"code": 413, "error": "File too large"})
with open(path, "r") as f:
return f.read()
except Exception as e:
logger.error(f"System Error: {str(e)}")
return json.dumps({"code": 500, "error": "Internal Error"})
if __name__ == "__main__":
mcp.run()
Nmap 接入(白名单 + XML 解析 + 基准)
- 安全边界:CIDR 白名单、令牌桶限速、禁止
-p-全端口(除非特许)、强制-oX -输出 XML。 - 参数矩阵(建议):
-sS(SYN 扫描)、-sV(版本探测按需)、-T3/T4(时序)、-n(禁 DNS 反解析)。
示例 MCP 工具:
import time, subprocess, ipaddress, xml.etree.ElementTree as ET
from mcp.server.fastmcp import FastMCP
from pydantic import Field
mcp = FastMCP("NmapScanner")
ALLOWED_CIDR = [ipaddress.ip_network("192.168.1.0/24"), ipaddress.ip_network("10.0.0.0/8")]
LAST_SCAN = 0
def check_target(ip: str) -> bool:
try:
addr = ipaddress.ip_address(ip)
return any(addr in net for net in ALLOWED_CIDR)
except ValueError:
return False
def parse_nmap_xml(xml_str: str) -> list:
try:
root = ET.fromstring(xml_str)
hosts = []
for host in root.findall("host"):
addr = host.find("address").get("addr")
ports = []
for port in host.findall(".//port"):
if port.find("state").get("state") == "open":
svc = port.find("service")
ports.append({
"port": int(port.get("portid")),
"service": svc.get("name", "unknown") if svc else "unknown",
"product": svc.get("product", "") if svc else ""
})
if ports:
hosts.append({"ip": addr, "open_ports": ports})
return hosts
except ET.ParseError:
return [{"error": "XML Parse Failed"}]
@mcp.tool()
def scan_network(target_ip: str = Field(..., description="Target IP")) -> str:
global LAST_SCAN
if time.time() - LAST_SCAN < 2.0:
return "Error: Rate limit exceeded"
if not check_target(target_ip):
return "Error: Target not allowed"
cmd = ["nmap", "-sS", "-sV", "-T3", "-n", "-oX", "-", target_ip]
try:
LAST_SCAN = time.time()
res = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
return str(parse_nmap_xml(res.stdout))
except subprocess.TimeoutExpired:
return "Error: Scan Timeout"
Nuclei 接入(模板白名单 + JSONL 解析 + 治理)
- 安全边界:仅允许审核后的模板集合(
safe/discovery),场景到模板映射;限速(-rl)、并发(-bs)与超时;证据截断与脱敏。
示例 MCP 工具:
import subprocess, json
from mcp.server.fastmcp import FastMCP
mcp = FastMCP("NucleiScanner")
SCENARIOS = {
"tech_detect": ["http/technologies"],
"misconfig": ["http/misconfiguration"],
"cve_critical": ["cves/2023", "cves/2024"], # 慎用
}
def truncate_evidence(data: dict, length: int = 100) -> dict:
if "extracted-results" in data:
data["extracted-results"] = [str(x)[:length] + "..." for x in data["extracted-results"]]
return data
@mcp.tool()
def run_vuln_scan(target: str, scenario: str) -> str:
if scenario not in SCENARIOS:
return "Error: Invalid scenario"
templates = []
for t in SCENARIOS[scenario]:
templates.extend(["-t", t])
cmd = ["nuclei", "-u", target, "-jsonl", "-silent", "-rl", "50"] + templates
try:
res = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
findings = []
for line in res.stdout.splitlines():
try:
j = json.loads(line)
clean = {
"id": j.get("template-id"),
"name": j.get("info", {}).get("name"),
"severity": j.get("info", {}).get("severity"),
"evidence": truncate_evidence(j).get("extracted-results", [])
}
findings.append(clean)
except: pass
return json.dumps(findings, indent=2)
except Exception as e:
return f"Error: {str(e)}"
更多工具接入、
- Docker/K8s:只读 RBAC/IAM;返回结构化元信息;禁止系统命名空间访问。
- CVE/NVD:带缓存与速率限制,返回精简字段。
- DNS/TLS:限定记录类型与超时;证书到期与 SAN 列表(限长度)。
- OSINT:必须授权与查询白名单;字段最小化;日志脱敏。
- SIEM/Webhook:只写入脱敏摘要;重试与回退;去重与阈值。
远程传输与认证(SSE/WebSocket + Bearer/mTLS)
- 网关层(Nginx/Envoy):mTLS 双向认证;SSE/WebSocket 代理与缓冲关闭;
- 应用层:Bearer Token 中间件;健康检查白名单;
- 证书轮换:定期更新与过期策略,避免长寿命密钥。
Nginx 片段:
server {
listen 443 ssl http2;
server_name mcp.internal.corp;
ssl_certificate /etc/certs/server.crt;
ssl_certificate_key /etc/certs/server.key;
ssl_client_certificate /etc/certs/ca.crt;
ssl_verify_client on;
location /sse {
proxy_pass <http://localhost:8000>
proxy_set_header Connection '';
proxy_http_version 1.1;
chunked_transfer_encoding off;
proxy_buffering off;
proxy_cache off;
}
}
性能基准:Nmap 与 Nuclei(示例对比表)
示例靶场:2C4G/1000Mbps 内网;重复≥5 次取平均。
| 扫描器 | 配置 | 耗时 | 命中率 | 误报率 | 建议 |
|---|---|---|---|---|---|
| Nmap | -T3 -sS Top1000 |
15s | 100% | – | 巡检默认 |
| Nmap | -T4 -sS Top1000 |
4s | 98% | – | 应急摸排 |
| Nmap | -T3 -sV Top100 |
45s | 100% | – | 版本确认 |
| Nuclei | -rl 25 -bs 1 |
120s | 100% | 0% | 低并发低误报 |
| Nuclei | -rl 50 -bs 2 |
55s | 100% | <1% | 推荐生产 |
| Nuclei | -rl 150 -bs 4 |
18s | 80% | 5% | 高并发慎用 |
注:高并发命中率下降通常源于 WAF 拦截或服务侧丢包,建议生产锁定在
-rl 50 -bs 2。
模板治理与供应链安全(CI 示例)
- 私有模板库:同步官方
safe/discovery类;企业自定义模板入库。 - 签名与锁定:模板文件 Hash 进
manifest.lock;运行时拒绝未签名变更。 - CI 安审:
nuclei -validate+bandit/pip-audit;依赖 CVE 映射; - 变更评审:双人审核;变更记录与回滚策略。
GitHub Actions 片段:
name: Template Governance
on: [push, pull_request]
jobs:
validate:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Install Nuclei
run: go install -v github.com/projectdiscovery/nuclei/v3/cmd/nuclei@latest
- name: Validate Templates
run: nuclei -validate -t configs/templates/
- name: Check Hash
run: python3 scripts/verify_hash.py
威胁模型与红队测试
风险→防御对照(示例):
- Prompt Injection:拒绝
shell=True;严格正则与枚举白名单;结构化日志溯源; - SSRF:CIDR/域白名单;容器网络禁止访问
169.254.169.254; - Excessive Privilege:禁止
--script等高危参数;非 root 用户运行; - 供应链包投毒:版本钉死与哈希校验;CI 安审;
- 服务器伪装:mTLS 双向认证;来源校验与网关白名单。
红队演练时间线:准备(靶场与日志)→ 攻击链验证(诱导/绕过)→ 防御有效性检查(拦截与审批门)→ 复盘(留痕与修复)。
端到端示例仓库骨架
mcp-pentest-lab/
├── servers/
│ ├── nmap_server.py
│ ├── nuclei_server.py
│ └── template_server.py
├── tools/
│ ├── auth.py
│ └── limits.py
├── workflows/
│ ├── wf_asset_scan.yaml
│ ├── wf_vuln_verify.yaml
│ └── wf_alert.json
├── configs/
│ ├── whitelist.cidr
│ ├── templates/
│ └── rate_limits.json
├── scripts/
│ ├── init_env.sh
│ ├── bench_nmap.sh
│ └── bench_nuclei.sh
├── Makefile
└── README.md
Makefile 片段:
.PHONY: init bench audit
init:
@bash scripts/init_env.sh
run-servers:
@mcp run servers/nmap_server.py &
@mcp run servers/nuclei_server.py &
bench:
@bash scripts/bench_nmap.sh
@bash scripts/bench_nuclei.sh
audit:
@bandit -r servers/ -ll
@pip-audit
Nuclei 测试用例
注意:仅在自有或书面授权的靶场中执行。启用模板白名单(safe/discovery)与场景映射;统一使用
-jsonl -silent输出,证据字段需截断与脱敏。
用例A:敏感信息泄露检测(token_leak 场景)
- 目标:
<http://192.168.10.55>(内网受控 Web 容器) - 模板集合:
http/exposures/tokens,http/exposures/configs,http/exposures/git-config(已审计) - 命令(MCP 触发):
mcp call NucleiScanner run_vuln_scan '{"target":"<http://192.168.10.55>","scenario":"token_leak"}' - 预期 JSONL/聚合输出(示例):
[ { "id": "http-exposures-env", "name": "Exposed .env file", "severity": "high", "evidence": ["API_KEY=*****..."] }, { "id": "http-exposures-git-config", "name": "Exposed .git config", "severity": "medium", "evidence": ["[remote \"origin\"] url=https://..." ] } ] - 通过标准:至少命中 1 条暴露项;证据字段被截断/脱敏;不包含完整响应体。
- 风险提示:对生产服务使用需下调速率(
-rl 25)与并发(-bs 1)。
用例B:高危 CVE 快速确认(cve_critical 场景)
- 目标:
<http://10.0.0.5>(含已知高危组件的受控靶机) - 模板集合:
cves/2023,cves/2024(企业私有模板库中保留经审计的子集) - 命令:
mcp call NucleiScanner run_vuln_scan '{"target":"<http://10.0.0.5>","scenario":"cve_critical"}' - 预期输出(示例):
[ { "id": "CVE-2024-XXXX", "name": "Vulnerable XYZ component", "severity": "critical", "evidence": ["poc: GET /vulnerable?payload=..." ] } ] - 通过标准:命中
severity >= high的条目;证据字段长度限制(默认 ≤100);同时记录审计日志(调用时间、模板ID、目标)。 - 风险提示:
cve_critical场景可能包含主动探测 Payload,需严格授权与流量控制。
用例C:自定义企业模板验证(治理与签名)
- 目标:
<https://intranet.example.local>(受控内网门户) - 模板:企业私有库
configs/templates/http/baseline/banner-check.yaml(检测不安全横幅/版本泄露) - 前置校验:
nuclei -validate -t configs/templates/http/baseline/banner-check.yaml- 模板 Hash 写入
configs/templates/manifest.lock,运行时校验通过方可加载。
- 命令:
nuclei -u <https://intranet.example.local> \ -t configs/templates/http/baseline/banner-check.yaml \ -jsonl -silent -rl 50 -bs 2 - 预期输出:命中横幅泄露(
severity: low/medium),聚合结果写入 SIEM(仅摘要)。 - 通过标准:模板签名校验通过;输出字段最小化;告警去重后阈值内投递。
用例D:限速/并发行为验证(抗封禁与稳定性)
- 目标集:
targets.txt(受控清单,10 个目标) - 测试维度:
- 低速低并发:
-rl 25 -bs 1 - 推荐生产:
-rl 50 -bs 2 - 高速高并发(对照):
-rl 150 -bs 4
- 低速低并发:
- 命令:
nuclei -l targets.txt -jsonl -silent -rl 25 -bs 1 \ -severity medium,high,critical \ -t http/technologies -t http/misconfiguration - 记录指标:平均耗时、命中率、误报率(基于标注样本)、被 WAF 拦截比例。将结果入库做对比。
- 通过标准:推荐生产档位下命中率与误报率满足阈值(如 命中率≥95%,误报≤1%),且无触发封禁。
用例E:证据截断与脱敏合规测试
- 目标:前述各用例的输出。
- 校验逻辑:
- MCP 层
truncate_evidence()函数对extracted-results字段长度做限制(默认 100); - 校验日志与 SIEM 中不存在敏感令牌/密码/私钥明文;
- MCP 层
- 通过标准:所有输出均通过脱敏检查,未发现敏感数据泄露;审计覆盖率 100%。
失败与异常场景(负面用例)
- 非白名单目标:期望返回
Error: Target not allowed; - 模板未签名或校验失败:拒绝加载并记录审计事件;
- 超时:返回
Error: Scan timed out,并在下一次触发前执行冷却(限速)。
通过自然语言调用 MCP 进行渗透测试
目标:让安全工程师或助手在对话中以“自然语言”驱动 MCP 预定义工作流与工具,确保受控、可复现、可审计。
前提:Host(如 Claude Desktop/VS Code 插件)已连接到你的 MCP 服务器,并且服务器已注册原子工具与工作流工具(示例:
scan_network、run_vuln_scan、auto_risk_assessment、run_workflow等)。
一、自然语言调用的通用结构(5 要素)
- 意图(Intent):你希望执行的安全任务(资产盘点/端口扫描/漏洞验证/证据截断/写入告警等)。
- 范围与约束(Scope/Constraints):限定目标(IP/CIDR/域名白名单)、速率与并发(
rl/bs)、模板/场景(tech_detect、token_leak、cve_critical)。 - 审批门(Approval Gate):是否需要人机回环;高危操作(利用/写入/外联)必须启用审批。
- 输出格式(Output):结果以结构化摘要(JSON 表)返回;证据字段截断与脱敏。
- 后续动作(Follow-up):如命中
severity>=high则触发 SIEM 告警或生成工单。
示例提示模板(通用):
请使用【预定义工作流】完成安全任务:
- Workflow ID:{wf_asset_scan | wf_vuln_verify | wf_active_exploit}
- 目标:{IP/域/CIDR,需在白名单内}
- 限速并发:rl=50,bs=2(如超出则拒绝并提示)
- 模板场景:{tech_detect | token_leak | cve_critical}
- 输出:仅返回 JSON 摘要(id/name/severity/evidence[≤100]),不要返回原始响应体
- 审批:高危操作前先发起审批(Human-in-the-loop),通过后再继续
如遇权限/范围超出,应直接拒绝并记录审计日志
二、典型自然语言用法示例
- 资产盘点与端口指纹(安全巡检)
对 192.168.1.0/24 执行资产巡检:
- 使用工作流 wf_asset_scan
- Nmap:T3 + SYN 扫描,禁 DNS 反解析;仅 Top-1000 端口;rl=50,bs=2
- 输出 IP 与开放端口列表(服务与版本),以 JSON 返回
- 若发现 HTTP/HTTPS 服务,下一步在 tech_detect 场景下做轻量指纹识别
- 敏感信息泄露验证(token_leak 场景)
对 <http://192.168.10.55> 执行敏感信息泄露核查:
- 场景:token_leak(仅使用经过审计的 exposures 类模板)
- 速率并发:rl=50,bs=2;超时 120s
- 输出:模板 id/name/severity,evidence 截断至 100 字符
- 非白名单目标或模板未签名直接拒绝
- 高危 CVE 快速确认(需审批)
请针对 <http://10.0.0.5> 执行高危 CVE 快速确认:
- 工作流:wf_vuln_verify → cve_critical 场景
- 在执行前发起审批门(Human-in-the-loop),通过后方可继续
- 限速并发:rl=50,bs=2;输出仅摘要
- 命中 critical 时,调用 send_security_alert 将摘要写入 SIEM
- 端到端自动评估(封装工作流)
自动评估 10.0.0.8 的风险:
- 使用高阶工具 auto_risk_assessment,内部按顺序执行:
Nmap 指纹 → web 服务 tech_detect → 关联 CVE → 汇总报告
- 输出结构化摘要,不做自由扩展
- 若需要写入告警,请先发起审批门并确认
三、对话保护与防注入指南(Host/Server 联动)
- 明确声明“仅可触发预定义工作流或安全原子工具”,拒绝自由命令或 shell 执行;
- 参数与范围均走白名单:CIDR/域/场景(模板)均需在服务器侧校验;
- 证据字段统一截断与脱敏(如
extracted-results[≤100]),禁止返回原始响应体; - 对话中出现可疑指令(如绕过限制、请求访问元数据 IP
169.254.169.254)→ 直接拒绝并记录审计事件; - 高危动作前,Host 通过 MCP 的 Sampling/Approval 能力发起人机回环(弹窗/复核),审批通过后才继续。
自然语言拒绝模板示例:
请求被拒绝:该操作需要使用预定义工作流且目标在白名单内。
请提供:Workflow ID、授权目标(IP/域/CIDR)、速率并发、模板场景与输出格式。
如为高危操作,需经审批通过后才能继续。
四、输出与后续联动的自然语言约定
- 输出 JSON 摘要字段:
id/name/severity/evidence(≤100)/matched_at/timestamp; - 结果过长时,Host 以分页或“展开更多”二次请求;
- 对话中的“下一步”指令以自然语言触发工作流后续步骤:
若命中 severity≥high:
- 执行 send_security_alert,写入 SIEM(摘要脱敏)
- 生成工单:标题包含目标与漏洞编号;附上结构化摘要
五、落地建议
- 提供“提示模板库”(Prompt Library):把常用任务的自然语言模板固化在知识库;
- 训练协作习惯:所有高危操作都由 Host 强制弹出审批窗;
- 指标与看板:统计自然语言触发的工作流成功率、错误Top5、审批通过率与告警去重率;
- 持续治理:提示模板与工作流版本钉死、定期审查;在 MCP Server 侧强化参数校验与审计留痕。
架构图:MCP 服务器与企业安全网关、IAM、审计联动

说明:
- Host(LLM/IDE)→ 企业安全网关(mTLS/Bearer、WAF、RateLimit)→ MCP Router。
- MCP Router 将请求分发到各工具(NmapScanner / NucleiScanner / TemplateServer),并将结果写入 Resources Store。
- Router 同步输出 OpenTelemetry Trace 到 Collector,Collector 汇总到 SIEM/日志湖。
- Router 与 IAM(令牌校验、RBAC)联动,敏感凭证从 Vault 拉取(只读、短时令牌)。
Resources 机制:把扫描结果结构化为 MCP Resource
原则:结果以结构化 JSON/CSV 写入资源存储,并通过 MCP 的 resources/list、resources/read 暴露。示例:
from mcp.server.fastmcp import FastMCP
import json, os, time
mcp = FastMCP("ResourcesExample")
RES_ROOT = os.path.abspath("./resources_store")
os.makedirs(RES_ROOT, exist_ok=True)
@mcp.tool()
def save_scan_result(target: str, findings_json: str) -> str:
# 1) 持久化为资源文件(JSON)
ts = int(time.time())
path = os.path.join(RES_ROOT, f"scan_{target}_{ts}.json")
with open(path, "w") as f:
f.write(findings_json)
return path
@mcp.resource("scan://latest")
def latest_scan_resource() -> str:
# 2) 暴露最新结果作为资源(只读)
files = sorted([p for p in os.listdir(RES_ROOT) if p.endswith(".json")])
if not files:
return json.dumps({"error":"no results"})
with open(os.path.join(RES_ROOT, files[-1]), "r") as f:
return f.read()
客户端可通过 resources/list → scan://latest → resources/read 获取 JSON 结果,用于后续分析/可视化。
合规审计:OpenTelemetry 追踪与结构化日志
建议:统一 Trace ID/Span ID,结构化日志(JSON),并将 Tool 调用、审批门、异常事件写入 SIEM。
import logging
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
# OTel 初始化
provider = TracerProvider()
processor = BatchSpanProcessor(OTLPSpanExporter(endpoint="<https://otel-collector.example/otlp>"))
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)
tracer = trace.get_tracer(__name__)
# 结构化日志
logging.basicConfig(
format='{"ts":"%(asctime)s","level":"%(levelname)s","event":"%(message)s","trace_id":"%(trace_id)s"}',
level=logging.INFO
)
logger = logging.getLogger("MCP")
def audit_event(msg: str):
span = trace.get_current_span()
tid = span.get_span_context().trace_id
logger.info(msg, extra={"trace_id": hex(tid)})
将 audit_event() 嵌入 Tool 调用前后、审批通过/拒绝、异常捕获处;Collector 汇总到 SIEM/日志湖,实现可检索与告警。
秘密管理:引入 Vault 存储敏感凭证
方案:使用 Vault/K8s Secrets 注入短时令牌;在 MCP 层通过只读接口获取,不落盘、不硬编码。
import hvac, os
VAULT_ADDR = os.getenv("VAULT_ADDR")
VAULT_TOKEN = os.getenv("VAULT_TOKEN") # 仅短时令牌,来自 Sidecar 注入
client = hvac.Client(url=VAULT_ADDR, token=VAULT_TOKEN)
def get_secret(path: str, key: str) -> str:
# 只读拉取;失败返回空,避免暴露异常细节
try:
data = client.secrets.kv.v2.read_secret_version(path=path)
return data["data"]["data"].get(key, "")
except Exception:
return ""
要点:
- Vault 配置最小权限与短 TTL;使用 AppRole/mTLS 绑定;
- 禁止在日志中打印密钥值;仅记录路径与成功/失败状态。
异步处理:长耗时扫描任务的状态回调
模式:将任务入队(队列/线程池),即时返回 job_id;客户端通过 Resource 或 Webhook 订阅状态/结果。
import queue, threading, time, json, os
from mcp.server.fastmcp import FastMCP
mcp = FastMCP("AsyncMCP")
JOBS = {}
Q = queue.Queue()
RES_ROOT = os.path.abspath("./async_results")
os.makedirs(RES_ROOT, exist_ok=True)
# 工作线程
def worker():
while True:
job_id, target = Q.get()
# 1) 执行长耗时扫描(示例用 sleep 代替)
time.sleep(5)
findings = {"target": target, "status": "done", "open_ports": [80,443]}
# 2) 写入资源文件
path = os.path.join(RES_ROOT, f"job_{job_id}.json")
with open(path, "w") as f:
f.write(json.dumps(findings))
JOBS[job_id] = {"status":"done", "path": path}
Q.task_done()
threading.Thread(target=worker, daemon=True).start()
@mcp.tool()
def submit_scan(target: str) -> str:
job_id = str(int(time.time()*1000))
JOBS[job_id] = {"status": "queued"}
Q.put((job_id, target))
return json.dumps({"job_id": job_id, "status": "queued"})
@mcp.resource("job://status")
def job_status() -> str:
# 最新作业状态(也可做 resources/list + resources/read 按 job_id)
latest = sorted(JOBS.keys())[-1] if JOBS else None
if not latest:
return json.dumps({"error":"no jobs"})
return json.dumps({latest: JOBS[latest]})
可选:
- Webhook 模式:作业完成后 POST 到内部告警通道(带签名);
- 轮询模式:客户端每隔 N 秒读取
job://status或job://{id}资源。
后续增强建议与路线图
1) 零信任与认证细化
- mTLS 证书轮换 Playbook(签发、分发、失效与回滚流程)
- Bearer Token 的签名与校验方案(HMAC/JWT),密钥滚动与吊销策略
- Host 侧权限分级与会话隔离(多租户场景)
2) RBAC/IAM 最小权限样例库
- K8s Role/RoleBinding 最小权限 YAML(仅
get/list一组) - AWS IAM Policy(S3/GCS 只读、CloudWatch 只写、Secrets Manager 只读)
- GitOps 管理策略:PR 审核 + 变更审批门
3) 模板治理的“白名单矩阵”
- Nuclei 可用模板清单(
safe/discovery分类)与企业自定义模板的审核表 - NSE/Nuclei 模板风险分级(探测/弱探测/POC/潜在DoS)与生产环境允许列表
- 模板变更评审表:影响评估、测试报告、签名与版本锁定
4) 误报/漏报治理与处置流程
- 误报判定标准与处置路径(降级、列入忽略、模板修正)
- 漏报检测方法(样本基准、对照扫描器与人工复核)
- 告警去重与阈值策略(同一目标/模板/指纹的去重规则)
5) 性能与成本看板(SLO/SLA)
- 核心指标:工具调用成功率、P95/P99 时延、失败 Top5、模板命中率、告警去重率
- 成本维度:计算/网络流量占用、模板执行总时长、WAF 命中率与封禁风险
- Prometheus/Grafana 示例仪表(抓取 OTel 指标与自定义计数器)
6) 红队剧本仓库骨架
- 目录结构:
scenarios/(攻防链路)、playbooks/(动作清单)、evidence/(脱敏证据) - 受控演练 SOP:审批门 → 执行 → 取证 → 审计入库 → 复盘报告
- 演练成果沉淀:问题清单、修复项、二次验证脚本
7) 合规映射与审计留痕
- 合规条款映射(ISO 27001/27002、SOC 2、NIST 800 系列)到控制项(认证、审计、密钥管理、最小权限)
- 数据保留策略与隐私条款:日志保留期限、脱敏字段、访问稽核
- 电子取证(Forensics)要点:时间线、证据保全、链路完整性
8) 容器与网络硬化增补
- 容器运行时加固(Seccomp、AppArmor、只读根文件系统、非 root 用户)
- 出站 egress 控制示例(仅白名单域/IP,阻断云元数据地址)
- 供应链安全(SBOM、依赖扫描、镜像签名与策略)
9) 高级异步与队列治理
- 重试/回退策略(指数退避 + 抖动),最大重试次数与死信队列
- Webhook 签名(HMAC)与幂等处理、事件去重
- 队列指标:积压量、处理速率、失败率与最长等待时间
10) 测试与质量保障
- 单元测试(工具参数校验、资源读写函数)
- 集成测试(端到端工作流:资产 → 指纹 → 漏洞 → 告警)
- Mock/Stub:外部 API 与扫描器输出的可控模拟;回归测试集
11) Host 专用提示模板库(Prompt Library)
- Claude Desktop/VS Code 插件的自然语言模板,绑定工作流 ID 与场景参数
高危操作→审批门的统一提示语与拒绝模板(可直接复制)- 常见任务的“安全提示模板”卡片(资产巡检、泄露核查、CVE确认、阈值告警)
零信任与认证细化(mTLS 轮换 · Bearer 签名/滚动 · 会话隔离)
A) mTLS 证书轮换 Playbook(示例:OpenSSL)
目标:为 MCP Router 与 Gateway 配置双向 TLS,支持定期轮换与快速回滚。
- 生成 CA 与服务端/客户端证书(短 TTL)
# 生成 CA(离线保存)
openssl genrsa -out ca.key 4096
openssl req -x509 -new -nodes -key ca.key -sha256 -days 365 \
-subj "/C=CN/ST=Shanghai/O=Corp/OU=Sec/CN=Corp-CA" -out ca.crt
# 服务端证书(72h TTL,示例)
openssl genrsa -out server.key 4096
openssl req -new -key server.key -out server.csr -subj \
"/C=CN/ST=Shanghai/O=Corp/OU=MCP/CN=mcp.internal.corp"
openssl x509 -req -in server.csr -CA ca.crt -CAkey ca.key -CAcreateserial \
-out server.crt -days 3 -sha256 -extfile <(cat <<EOF
subjectAltName=DNS:mcp.internal.corp,IP:10.10.10.10
EOF
)
# 客户端证书(Host)
openssl genrsa -out client.key 4096
openssl req -new -key client.key -out client.csr -subj \
"/C=CN/ST=Shanghai/O=Corp/OU=Host/CN=host.client"
openssl x509 -req -in client.csr -CA ca.crt -CAkey ca.key -CAcreateserial \
-out client.crt -days 3 -sha256
- Gateway(Nginx/Envoy)加载新证书并灰度:
server {
listen 443 ssl http2;
server_name mcp.internal.corp;
ssl_certificate /etc/certs/server.crt;
ssl_certificate_key /etc/certs/server.key;
ssl_client_certificate /etc/certs/ca.crt;
ssl_verify_client on;
# 通过路由权重实现灰度(可在 Envoy 使用 weighted_clusters)
location /sse { proxy_pass <http://mcp-router:8000> }
}
- 轮换策略:
- TTL:≤72h;重发前 24h 通知;
- 回滚:保留上一版证书与配置副本,支持一键回退;
- 监控:连接失败率、握手错误、证书即将过期告警。
B) Bearer Token(JWT)签名/滚动/吊销
建议使用 RS256(不对称)或 HS256(对称,需严格保管)
验证中间件(示例):
import jwt, time, os
from starlette.responses import JSONResponse
JWT_ISSUER = "mcp.corp"
JWT_AUDIENCE = "mcp-host"
JWT_PUBKEY = os.getenv("JWT_PUBKEY")
def verify_jwt(auth_header: str):
if not auth_header or not auth_header.startswith("Bearer "):
return False, "Missing Token"
token = auth_header.split(" ")[1]
try:
payload = jwt.decode(token, JWT_PUBKEY, algorithms=["RS256"], \
audience=JWT_AUDIENCE, issuer=JWT_ISSUER)
now = int(time.time())
if payload.get("nbf",0) > now or payload.get("exp",0) < now:
return False, "Token expired/not before"
if payload.get("scope") not in ["read", "workflow:scan"]:
return False, "Invalid scope"
# 可选:去 Redis 检查吊销列表(jti)
return True, payload
except Exception as e:
return False, str(e)
滚动与吊销:
- Key Rotation:采用 JWKS(公钥集合)发布,Host 定期拉取;
- 吊销列表:按
jti存储于 Redis,命中则拒绝; - 短 TTL:建议 ≤1h;高危操作使用更短 TTL 的 step-up token。
C) 会话隔离(多租户/多 Host)
- 令牌作用域(scope):按工作流/只读资源划分,例如
workflow:asset_scan、resource:read; - 命名空间隔离:在 MCP Router 对每个租户/Host 绑定隔离的资源目录与速率阈值;
- 速率与并发:按
tenant_id/host_id维度限速,避免相互影响; - 会话 TTL:闲置超时自动关闭;审批门过期需重新确认。
误报/漏报治理
A) 判定标准
- 误报(False Positive):模板命中但不满足业务/技术证据标准(如无有效凭据泄露、非生产路径),要求证据字段与二次验证失败;
- 漏报(False Negative):已知漏洞或暴露项应命中但未命中,通常由速率/并发、WAF 拦截或模板缺陷导致;
- 严重性映射:结合 CVSS、业务资产重要性,统一规则表(critical/high/medium/low)。
B) 去重与阈值
- 去重键:
(target, template_id, fingerprint); - 时间窗口:在 24h 内同一去重键仅保留一次告警;
- 阈值策略:相同目标/模板在 1h 内超过 N 次(如 5 次)则进入降噪,不再投递 IM,仅入库;
示例(Python 聚合):
from collections import defaultdict
def dedup_alerts(alerts):
seen = set(); result = []
for a in alerts:
fp = (a["target"], a["id"], a.get("fingerprint",""))
if fp in seen: continue
seen.add(fp); result.append(a)
return result
C) 复核流程(SOP)
- 自动聚合 → 去重 → 阈值过滤 → 写入 SIEM(仅摘要);
- 复核人领取:检查证据(脱敏后)与模板版本;
- 二次验证:轻量二次扫描或静态校验;
- 结论与处置:误报→忽略并记录;漏报→修复模板或限速策略;
- 反馈闭环:更新白名单/黑名单与场景映射;每周审查误报/漏报比。
Host 提示模板库(Claude / VS Code 专用)
A) Claude Desktop(对话模板)
- 安全巡检(资产 → 指纹)
请使用【预定义工作流】完成安全巡检:
- Workflow ID:wf_asset_scan
- 目标:192.168.1.0/24(白名单网段)
- Nmap:T3 + SYN,Top-1000,rl=50,bs=2;禁 DNS 反解析
- 输出:JSON 摘要(ip/open_ports/service/version,evidence≤100)
- 注意:若发现 HTTP/HTTPS 服务,再执行 tech_detect 场景指纹识别
- 泄露核查(exposures/token)
对 <http://192.168.10.55> 执行敏感信息泄露核查:
- 场景:token_leak(模板白名单)
- 速率并发:rl=50,bs=2;超时 120s
- 审批门:无(只读场景)
- 输出:id/name/severity/evidence≤100;不返回原始响应体
- 高危 CVE 快速确认(需审批)
请针对 <http://10.0.0.5> 执行高危 CVE 快速确认:
- 工作流:wf_vuln_verify → cve_critical 场景
- 审批门:先发起审批窗,通过后继续
- 限速并发:rl=50,bs=2
- 输出:JSON 摘要;若 severity≥high,调用 send_security_alert 写入 SIEM
- 拒绝模板(超范围/异常)
请求被拒绝:该操作需要预定义工作流且目标在白名单内。
请提供:Workflow ID、授权目标(IP/域/CIDR)、速率并发、模板场景与输出格式。
如为高危操作,需经审批通过后才能继续。
B) VS Code(命令面板 / 任务片段)
tasks.json 示例,将自然语言意图映射到 MCP 调用脚本:
{
"version": "2.0.0",
"tasks": [
{
"label": "MCP: Asset Scan",
"type": "shell",
"command": "mcp call NmapScanner scan_network '{\"target_ip\":\"${input:ip}\"}'",
"problemMatcher": []
},
{
"label": "MCP: Nuclei token_leak",
"type": "shell",
"command": "mcp call NucleiScanner run_vuln_scan '{\"target\":\"${input:url}\",\"scenario\":\"token_leak\"}'"
}
],
"inputs": [
{ "id": "ip", "type": "promptString", "description": "目标 IP (白名单内)" },
{ "id": "url", "type": "promptString", "description": "目标 URL (白名单与授权)" }
]
}
审批门:通过执行前置脚本 precheck.sh 检查当前用户是否通过审批窗:
#!/bin/bash
# precheck.sh:检验审批状态(可从 IM/工单系统查询)
STATUS=$(curl -s <https://approval.example/api/status?user=$USER>
if [[ "$STATUS" != "approved" ]]; then
echo "需要审批:请在 IM 中确认高危操作。"
exit 1
fi
在 tasks.json 里为高危任务设置 dependsOn: precheck 或将 command 包装为 precheck.sh && 实际命令。