跳到主要内容

MCP 渗透测试与安全:基础技术指南

目录

本文面向从零到一构建“可控、可复现”的 MCP(Model Context Protocol)渗透测试体系,聚焦工程化落地与安全边界。包含环境一键初始化、最小服务器模板、Nmap/Nuclei 安全封装、工作流编排、人机回环审批、远程传输与认证样例、性能基准、模板治理(CI)、威胁模型与红队测试。


目录

  • 环境一键初始化
  • MCP 起源与知识点速览
  • MCP 在渗透测试中的用途与价值
  • MCP 最小服务器模板
  • Nmap 接入(白名单 + XML 解析 + 基准)
  • Nuclei 接入(模板白名单 + JSONL 解析 + 治理)
  • 更多工具接入(只读与最小权限版)
  • 远程传输与认证(SSE/WebSocket + Bearer/mTLS)
  • 性能基准:Nmap 与 Nucle
  • 模板治理与供应链安全(CI 示例)
  • 威胁模型与红队测试(受控演练)
  • 端到端示例仓库骨架(可复现)

环境一键初始化

#!/bin/bash
set -e

PROJECT_ROOT="$HOME/mcp-pentest-lab"
mkdir -p "$PROJECT_ROOT"/{servers,tools,workflows,configs,logs}
cd "$PROJECT_ROOT"

# 1) Python 环境与核心依赖
python3.12 -m venv .venv
source .venv/bin/activate
pip install --upgrade pip
pip install "mcp[cli]>=1.0.0" pydantic httpx bandit kubernetes dnspython

# 2) 安装核心工具 (Nmap & Nuclei)
if ! command -v nmap &>/dev/null; then
  sudo apt-get update && sudo apt-get install -y nmap
fi
if ! command -v nuclei &>/dev/null; then
  wget -qO nuclei.zip <https://github.com/projectdiscovery/nuclei/releases/latest/download/nuclei_linux_amd64.zip>
  unzip -o nuclei.zip && sudo mv nuclei /usr/local/bin/
  rm nuclei.zip
  nuclei -update-templates
fi

# 3) 验证安装
mcp --version
nmap --version | head -n 1
nuclei -version

MCP 起源与知识点速览(2024—2025)

  • 起源与捐赠:Anthropic 于 2024 提出 MCP,后捐赠至 Linux Foundation 的 Agentic AI Foundation,形成中立治理(与 Block 的 goose、OpenAI 的 AGENTS.md 并列的 Agent 基础设施)。
  • 时间线(2024→2025):提出/开源 → 基金会托管 → SDK 多语言扩展(Python/Java/Swift/C#)→ IDE/桌面端支持(VS Code、Claude Desktop)→ 传输从 stdio 扩展至 SSE/WebSocket。
  • 协议设计:基于 JSON-RPC 2.0;能力握手(capabilities);工具(tools)、资源(resources)、提示(prompts);采样/审批(sampling);日志/追踪(logging/tracing);Schema 与参数校验。
  • 生态角色:Host/Client(宿主,如 IDE/Claude Desktop)、Server(能力提供者)、Tools/Resources(能力载体)。
  • 传输方式:本地 stdio;远程 SSE/WebSocket(建议配合 mTLS 或 Bearer)。

MCP 在渗透测试中的用途与价值

  • 攻防统一入口:将 Nmap、Nuclei、DNS/TLS、OSINT、CVE/NVD、K8s/Docker 等工具统一在受控接口下,减少脚本胶水与权限散落。
  • 工作流编排:把“资产发现→指纹识别→漏洞验证→告警联动”封装为预定义工作流工具,LLM 仅触发工作流而非逐步拼接,降低逻辑劫持风险。
  • 审计与合规:Logging/Sampling 原生支持;记录 Tool 调用链、参数与返回摘要;配合限速与审批门满足红队/蓝队留痕与合规要求。
  • 自动化与复现:声明式 Schema + 场景模板(tech_detect/token_leak/cve_critical),扫描可复现、可对比;结合缓存与版本钉死便于基线评估。
  • 降低集成成本:一次开发的 MCP Server 可在多宿主(Claude Desktop、VS Code)复用,促进团队协作。
  • 新攻击面验证:在受控沙箱安全复现 Prompt Injection、SSRF、过度权限、供应链包投毒、服务器伪装等风险,并用最小权限/白名单/输出净化防护。

MCP 最小服务器模板

import os, json, logging
from mcp.server.fastmcp import FastMCP, Context
from pydantic import Field, BaseModel

logging.basicConfig(
    format='{"time":"%(asctime)s","level":"%(levelname)s","msg":"%(message)s"}',
    level=logging.INFO
)
logger = logging.getLogger(__name__)

mcp = FastMCP("SecureTemplate")
SANDBOX_ROOT = os.path.abspath("./safe_data")

def validate_path(filename: str) -> str:
    target = os.path.abspath(os.path.join(SANDBOX_ROOT, filename))
    if not target.startswith(SANDBOX_ROOT):
        logger.warning(f"Security Alert: Path traversal attempt: {filename}")
        raise ValueError("Access Denied: Path outside sandbox")
    return target

class ReadFileParams(BaseModel):
    filename: str = Field(..., pattern=r"^[a-zA-Z0-9_\-\.]+$", max_length=64,
                          description="目标文件名,禁止路径分隔符")

@mcp.tool()
def read_secure_file(ctx: Context, params: ReadFileParams) -> str:
    try:
        path = validate_path(params.filename)
        if not os.path.exists(path):
            return json.dumps({"code": 404, "error": "File not found"})
        if os.path.getsize(path) > 1024 * 1024:
            return json.dumps({"code": 413, "error": "File too large"})
        with open(path, "r") as f:
            return f.read()
    except Exception as e:
        logger.error(f"System Error: {str(e)}")
        return json.dumps({"code": 500, "error": "Internal Error"})

if __name__ == "__main__":
    mcp.run()

Nmap 接入(白名单 + XML 解析 + 基准)

  • 安全边界:CIDR 白名单、令牌桶限速、禁止 -p- 全端口(除非特许)、强制 -oX - 输出 XML。
  • 参数矩阵(建议):-sS(SYN 扫描)、-sV(版本探测按需)、-T3/T4(时序)、-n(禁 DNS 反解析)。

示例 MCP 工具:

import time, subprocess, ipaddress, xml.etree.ElementTree as ET
from mcp.server.fastmcp import FastMCP
from pydantic import Field

mcp = FastMCP("NmapScanner")
ALLOWED_CIDR = [ipaddress.ip_network("192.168.1.0/24"), ipaddress.ip_network("10.0.0.0/8")]
LAST_SCAN = 0

def check_target(ip: str) -> bool:
    try:
        addr = ipaddress.ip_address(ip)
        return any(addr in net for net in ALLOWED_CIDR)
    except ValueError:
        return False

def parse_nmap_xml(xml_str: str) -> list:
    try:
        root = ET.fromstring(xml_str)
        hosts = []
        for host in root.findall("host"):
            addr = host.find("address").get("addr")
            ports = []
            for port in host.findall(".//port"):
                if port.find("state").get("state") == "open":
                    svc = port.find("service")
                    ports.append({
                        "port": int(port.get("portid")),
                        "service": svc.get("name", "unknown") if svc else "unknown",
                        "product": svc.get("product", "") if svc else ""
                    })
            if ports:
                hosts.append({"ip": addr, "open_ports": ports})
        return hosts
    except ET.ParseError:
        return [{"error": "XML Parse Failed"}]

@mcp.tool()
def scan_network(target_ip: str = Field(..., description="Target IP")) -> str:
    global LAST_SCAN
    if time.time() - LAST_SCAN < 2.0:
        return "Error: Rate limit exceeded"
    if not check_target(target_ip):
        return "Error: Target not allowed"
    cmd = ["nmap", "-sS", "-sV", "-T3", "-n", "-oX", "-", target_ip]
    try:
        LAST_SCAN = time.time()
        res = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
        return str(parse_nmap_xml(res.stdout))
    except subprocess.TimeoutExpired:
        return "Error: Scan Timeout"

Nuclei 接入(模板白名单 + JSONL 解析 + 治理)

  • 安全边界:仅允许审核后的模板集合(safe/discovery),场景到模板映射;限速(-rl)、并发(-bs)与超时;证据截断与脱敏。

示例 MCP 工具:

import subprocess, json
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("NucleiScanner")
SCENARIOS = {
    "tech_detect": ["http/technologies"],
    "misconfig": ["http/misconfiguration"],
    "cve_critical": ["cves/2023", "cves/2024"],  # 慎用
}

def truncate_evidence(data: dict, length: int = 100) -> dict:
    if "extracted-results" in data:
        data["extracted-results"] = [str(x)[:length] + "..." for x in data["extracted-results"]]
    return data

@mcp.tool()
def run_vuln_scan(target: str, scenario: str) -> str:
    if scenario not in SCENARIOS:
        return "Error: Invalid scenario"
    templates = []
    for t in SCENARIOS[scenario]:
        templates.extend(["-t", t])
    cmd = ["nuclei", "-u", target, "-jsonl", "-silent", "-rl", "50"] + templates
    try:
        res = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
        findings = []
        for line in res.stdout.splitlines():
            try:
                j = json.loads(line)
                clean = {
                    "id": j.get("template-id"),
                    "name": j.get("info", {}).get("name"),
                    "severity": j.get("info", {}).get("severity"),
                    "evidence": truncate_evidence(j).get("extracted-results", [])
                }
                findings.append(clean)
            except: pass
        return json.dumps(findings, indent=2)
    except Exception as e:
        return f"Error: {str(e)}"

更多工具接入、

  • Docker/K8s:只读 RBAC/IAM;返回结构化元信息;禁止系统命名空间访问。
  • CVE/NVD:带缓存与速率限制,返回精简字段。
  • DNS/TLS:限定记录类型与超时;证书到期与 SAN 列表(限长度)。
  • OSINT:必须授权与查询白名单;字段最小化;日志脱敏。
  • SIEM/Webhook:只写入脱敏摘要;重试与回退;去重与阈值。

远程传输与认证(SSE/WebSocket + Bearer/mTLS)

  • 网关层(Nginx/Envoy):mTLS 双向认证;SSE/WebSocket 代理与缓冲关闭;
  • 应用层:Bearer Token 中间件;健康检查白名单;
  • 证书轮换:定期更新与过期策略,避免长寿命密钥。

Nginx 片段:

server {
  listen 443 ssl http2;
  server_name mcp.internal.corp;
  ssl_certificate     /etc/certs/server.crt;
  ssl_certificate_key /etc/certs/server.key;
  ssl_client_certificate /etc/certs/ca.crt;
  ssl_verify_client on;
  location /sse {
    proxy_pass <http://localhost:8000>
    proxy_set_header Connection '';
    proxy_http_version 1.1;
    chunked_transfer_encoding off;
    proxy_buffering off;
    proxy_cache off;
  }
}

性能基准:Nmap 与 Nuclei(示例对比表)

示例靶场:2C4G/1000Mbps 内网;重复≥5 次取平均。

扫描器 配置 耗时 命中率 误报率 建议
Nmap -T3 -sS Top1000 15s 100% 巡检默认
Nmap -T4 -sS Top1000 4s 98% 应急摸排
Nmap -T3 -sV Top100 45s 100% 版本确认
Nuclei -rl 25 -bs 1 120s 100% 0% 低并发低误报
Nuclei -rl 50 -bs 2 55s 100% <1% 推荐生产
Nuclei -rl 150 -bs 4 18s 80% 5% 高并发慎用

注:高并发命中率下降通常源于 WAF 拦截或服务侧丢包,建议生产锁定在 -rl 50 -bs 2


模板治理与供应链安全(CI 示例)

  • 私有模板库:同步官方 safe/discovery 类;企业自定义模板入库。
  • 签名与锁定:模板文件 Hash 进 manifest.lock;运行时拒绝未签名变更。
  • CI 安审:nuclei -validate + bandit/pip-audit;依赖 CVE 映射;
  • 变更评审:双人审核;变更记录与回滚策略。

GitHub Actions 片段:

name: Template Governance
on: [push, pull_request]

jobs:
  validate:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Install Nuclei
        run: go install -v github.com/projectdiscovery/nuclei/v3/cmd/nuclei@latest
      - name: Validate Templates
        run: nuclei -validate -t configs/templates/
      - name: Check Hash
        run: python3 scripts/verify_hash.py

威胁模型与红队测试

风险→防御对照(示例):

  • Prompt Injection:拒绝 shell=True;严格正则与枚举白名单;结构化日志溯源;
  • SSRF:CIDR/域白名单;容器网络禁止访问 169.254.169.254
  • Excessive Privilege:禁止 --script 等高危参数;非 root 用户运行;
  • 供应链包投毒:版本钉死与哈希校验;CI 安审;
  • 服务器伪装:mTLS 双向认证;来源校验与网关白名单。

红队演练时间线:准备(靶场与日志)→ 攻击链验证(诱导/绕过)→ 防御有效性检查(拦截与审批门)→ 复盘(留痕与修复)。


端到端示例仓库骨架

mcp-pentest-lab/
├── servers/
│   ├── nmap_server.py
│   ├── nuclei_server.py
│   └── template_server.py
├── tools/
│   ├── auth.py
│   └── limits.py
├── workflows/
│   ├── wf_asset_scan.yaml
│   ├── wf_vuln_verify.yaml
│   └── wf_alert.json
├── configs/
│   ├── whitelist.cidr
│   ├── templates/
│   └── rate_limits.json
├── scripts/
│   ├── init_env.sh
│   ├── bench_nmap.sh
│   └── bench_nuclei.sh
├── Makefile
└── README.md

Makefile 片段:

.PHONY: init bench audit
init:
	@bash scripts/init_env.sh
run-servers:
	@mcp run servers/nmap_server.py &
	@mcp run servers/nuclei_server.py &
bench:
	@bash scripts/bench_nmap.sh
	@bash scripts/bench_nuclei.sh
audit:
	@bandit -r servers/ -ll
	@pip-audit

Nuclei 测试用例

注意:仅在自有或书面授权的靶场中执行。启用模板白名单(safe/discovery)与场景映射;统一使用 -jsonl -silent 输出,证据字段需截断与脱敏。

用例A:敏感信息泄露检测(token_leak 场景)

  • 目标:<http://192.168.10.55>(内网受控 Web 容器)
  • 模板集合:http/exposures/tokens, http/exposures/configs, http/exposures/git-config(已审计)
  • 命令(MCP 触发):
    mcp call NucleiScanner run_vuln_scan '{"target":"<http://192.168.10.55>","scenario":"token_leak"}'
  • 预期 JSONL/聚合输出(示例):
    [
      {
        "id": "http-exposures-env",
        "name": "Exposed .env file",
        "severity": "high",
        "evidence": ["API_KEY=*****..."]
      },
      {
        "id": "http-exposures-git-config",
        "name": "Exposed .git config",
        "severity": "medium",
        "evidence": ["[remote \"origin\"] url=https://..." ]
      }
    ]
  • 通过标准:至少命中 1 条暴露项;证据字段被截断/脱敏;不包含完整响应体。
  • 风险提示:对生产服务使用需下调速率(-rl 25)与并发(-bs 1)。

用例B:高危 CVE 快速确认(cve_critical 场景)

  • 目标:<http://10.0.0.5>(含已知高危组件的受控靶机)
  • 模板集合:cves/2023, cves/2024(企业私有模板库中保留经审计的子集)
  • 命令:
    mcp call NucleiScanner run_vuln_scan '{"target":"<http://10.0.0.5>","scenario":"cve_critical"}'
  • 预期输出(示例):
    [
      {
        "id": "CVE-2024-XXXX",
        "name": "Vulnerable XYZ component",
        "severity": "critical",
        "evidence": ["poc: GET /vulnerable?payload=..." ]
      }
    ]
  • 通过标准:命中 severity >= high 的条目;证据字段长度限制(默认 ≤100);同时记录审计日志(调用时间、模板ID、目标)。
  • 风险提示:cve_critical 场景可能包含主动探测 Payload,需严格授权与流量控制。

用例C:自定义企业模板验证(治理与签名)

  • 目标:<https://intranet.example.local>(受控内网门户)
  • 模板:企业私有库 configs/templates/http/baseline/banner-check.yaml(检测不安全横幅/版本泄露)
  • 前置校验:
    • nuclei -validate -t configs/templates/http/baseline/banner-check.yaml
    • 模板 Hash 写入 configs/templates/manifest.lock,运行时校验通过方可加载。
  • 命令:
    nuclei -u <https://intranet.example.local> \
      -t configs/templates/http/baseline/banner-check.yaml \
      -jsonl -silent -rl 50 -bs 2
  • 预期输出:命中横幅泄露(severity: low/medium),聚合结果写入 SIEM(仅摘要)。
  • 通过标准:模板签名校验通过;输出字段最小化;告警去重后阈值内投递。

用例D:限速/并发行为验证(抗封禁与稳定性)

  • 目标集:targets.txt(受控清单,10 个目标)
  • 测试维度:
    • 低速低并发:-rl 25 -bs 1
    • 推荐生产:-rl 50 -bs 2
    • 高速高并发(对照):-rl 150 -bs 4
  • 命令:
    nuclei -l targets.txt -jsonl -silent -rl 25 -bs 1 \
      -severity medium,high,critical \
      -t http/technologies -t http/misconfiguration
  • 记录指标:平均耗时、命中率、误报率(基于标注样本)、被 WAF 拦截比例。将结果入库做对比。
  • 通过标准:推荐生产档位下命中率与误报率满足阈值(如 命中率≥95%,误报≤1%),且无触发封禁。

用例E:证据截断与脱敏合规测试

  • 目标:前述各用例的输出。
  • 校验逻辑:
    • MCP 层 truncate_evidence() 函数对 extracted-results 字段长度做限制(默认 100);
    • 校验日志与 SIEM 中不存在敏感令牌/密码/私钥明文;
  • 通过标准:所有输出均通过脱敏检查,未发现敏感数据泄露;审计覆盖率 100%。

失败与异常场景(负面用例)

  • 非白名单目标:期望返回 Error: Target not allowed
  • 模板未签名或校验失败:拒绝加载并记录审计事件;
  • 超时:返回 Error: Scan timed out,并在下一次触发前执行冷却(限速)。

通过自然语言调用 MCP 进行渗透测试

目标:让安全工程师或助手在对话中以“自然语言”驱动 MCP 预定义工作流与工具,确保受控、可复现、可审计。

前提:Host(如 Claude Desktop/VS Code 插件)已连接到你的 MCP 服务器,并且服务器已注册原子工具与工作流工具(示例:scan_networkrun_vuln_scanauto_risk_assessmentrun_workflow 等)。

一、自然语言调用的通用结构(5 要素)

  • 意图(Intent):你希望执行的安全任务(资产盘点/端口扫描/漏洞验证/证据截断/写入告警等)。
  • 范围与约束(Scope/Constraints):限定目标(IP/CIDR/域名白名单)、速率与并发(rl/bs)、模板/场景(tech_detecttoken_leakcve_critical)。
  • 审批门(Approval Gate):是否需要人机回环;高危操作(利用/写入/外联)必须启用审批。
  • 输出格式(Output):结果以结构化摘要(JSON 表)返回;证据字段截断与脱敏。
  • 后续动作(Follow-up):如命中 severity>=high 则触发 SIEM 告警或生成工单。

示例提示模板(通用):

请使用【预定义工作流】完成安全任务:
- Workflow ID:{wf_asset_scan | wf_vuln_verify | wf_active_exploit}
- 目标:{IP/域/CIDR,需在白名单内}
- 限速并发:rl=50,bs=2(如超出则拒绝并提示)
- 模板场景:{tech_detect | token_leak | cve_critical}
- 输出:仅返回 JSON 摘要(id/name/severity/evidence[≤100]),不要返回原始响应体
- 审批:高危操作前先发起审批(Human-in-the-loop),通过后再继续
如遇权限/范围超出,应直接拒绝并记录审计日志

二、典型自然语言用法示例

  1. 资产盘点与端口指纹(安全巡检)
对 192.168.1.0/24 执行资产巡检:
- 使用工作流 wf_asset_scan
- Nmap:T3 + SYN 扫描,禁 DNS 反解析;仅 Top-1000 端口;rl=50,bs=2
- 输出 IP 与开放端口列表(服务与版本),以 JSON 返回
- 若发现 HTTP/HTTPS 服务,下一步在 tech_detect 场景下做轻量指纹识别
  1. 敏感信息泄露验证(token_leak 场景)
对 <http://192.168.10.55> 执行敏感信息泄露核查:
- 场景:token_leak(仅使用经过审计的 exposures 类模板)
- 速率并发:rl=50,bs=2;超时 120s
- 输出:模板 id/name/severity,evidence 截断至 100 字符
- 非白名单目标或模板未签名直接拒绝
  1. 高危 CVE 快速确认(需审批)
请针对 <http://10.0.0.5> 执行高危 CVE 快速确认:
- 工作流:wf_vuln_verify → cve_critical 场景
- 在执行前发起审批门(Human-in-the-loop),通过后方可继续
- 限速并发:rl=50,bs=2;输出仅摘要
- 命中 critical 时,调用 send_security_alert 将摘要写入 SIEM
  1. 端到端自动评估(封装工作流)
自动评估 10.0.0.8 的风险:
- 使用高阶工具 auto_risk_assessment,内部按顺序执行:
  Nmap 指纹 → web 服务 tech_detect → 关联 CVE → 汇总报告
- 输出结构化摘要,不做自由扩展
- 若需要写入告警,请先发起审批门并确认

三、对话保护与防注入指南(Host/Server 联动)

  • 明确声明“仅可触发预定义工作流或安全原子工具”,拒绝自由命令或 shell 执行;
  • 参数与范围均走白名单:CIDR/域/场景(模板)均需在服务器侧校验;
  • 证据字段统一截断与脱敏(如 extracted-results[≤100]),禁止返回原始响应体;
  • 对话中出现可疑指令(如绕过限制、请求访问元数据 IP 169.254.169.254)→ 直接拒绝并记录审计事件;
  • 高危动作前,Host 通过 MCP 的 Sampling/Approval 能力发起人机回环(弹窗/复核),审批通过后才继续。

自然语言拒绝模板示例:

请求被拒绝:该操作需要使用预定义工作流且目标在白名单内。
请提供:Workflow ID、授权目标(IP/域/CIDR)、速率并发、模板场景与输出格式。
如为高危操作,需经审批通过后才能继续。

四、输出与后续联动的自然语言约定

  • 输出 JSON 摘要字段:id/name/severity/evidence(≤100)/matched_at/timestamp
  • 结果过长时,Host 以分页或“展开更多”二次请求;
  • 对话中的“下一步”指令以自然语言触发工作流后续步骤:
若命中 severity≥high:
- 执行 send_security_alert,写入 SIEM(摘要脱敏)
- 生成工单:标题包含目标与漏洞编号;附上结构化摘要

五、落地建议

  • 提供“提示模板库”(Prompt Library):把常用任务的自然语言模板固化在知识库;
  • 训练协作习惯:所有高危操作都由 Host 强制弹出审批窗;
  • 指标与看板:统计自然语言触发的工作流成功率、错误Top5、审批通过率与告警去重率;
  • 持续治理:提示模板与工作流版本钉死、定期审查;在 MCP Server 侧强化参数校验与审计留痕。

架构图:MCP 服务器与企业安全网关、IAM、审计联动

说明:

  • Host(LLM/IDE)→ 企业安全网关(mTLS/Bearer、WAF、RateLimit)→ MCP Router。
  • MCP Router 将请求分发到各工具(NmapScanner / NucleiScanner / TemplateServer),并将结果写入 Resources Store。
  • Router 同步输出 OpenTelemetry Trace 到 Collector,Collector 汇总到 SIEM/日志湖。
  • Router 与 IAM(令牌校验、RBAC)联动,敏感凭证从 Vault 拉取(只读、短时令牌)。

Resources 机制:把扫描结果结构化为 MCP Resource

原则:结果以结构化 JSON/CSV 写入资源存储,并通过 MCP 的 resources/listresources/read 暴露。示例:

from mcp.server.fastmcp import FastMCP
import json, os, time

mcp = FastMCP("ResourcesExample")
RES_ROOT = os.path.abspath("./resources_store")
os.makedirs(RES_ROOT, exist_ok=True)

@mcp.tool()
def save_scan_result(target: str, findings_json: str) -> str:
    # 1) 持久化为资源文件(JSON)
    ts = int(time.time())
    path = os.path.join(RES_ROOT, f"scan_{target}_{ts}.json")
    with open(path, "w") as f:
        f.write(findings_json)
    return path

@mcp.resource("scan://latest")
def latest_scan_resource() -> str:
    # 2) 暴露最新结果作为资源(只读)
    files = sorted([p for p in os.listdir(RES_ROOT) if p.endswith(".json")])
    if not files:
        return json.dumps({"error":"no results"})
    with open(os.path.join(RES_ROOT, files[-1]), "r") as f:
        return f.read()

客户端可通过 resources/listscan://latestresources/read 获取 JSON 结果,用于后续分析/可视化。


合规审计:OpenTelemetry 追踪与结构化日志

建议:统一 Trace ID/Span ID,结构化日志(JSON),并将 Tool 调用、审批门、异常事件写入 SIEM。

import logging
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter

# OTel 初始化
provider = TracerProvider()
processor = BatchSpanProcessor(OTLPSpanExporter(endpoint="<https://otel-collector.example/otlp>"))
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)
tracer = trace.get_tracer(__name__)

# 结构化日志
logging.basicConfig(
    format='{"ts":"%(asctime)s","level":"%(levelname)s","event":"%(message)s","trace_id":"%(trace_id)s"}',
    level=logging.INFO
)
logger = logging.getLogger("MCP")

def audit_event(msg: str):
    span = trace.get_current_span()
    tid = span.get_span_context().trace_id
    logger.info(msg, extra={"trace_id": hex(tid)})

audit_event() 嵌入 Tool 调用前后、审批通过/拒绝、异常捕获处;Collector 汇总到 SIEM/日志湖,实现可检索与告警。


秘密管理:引入 Vault 存储敏感凭证

方案:使用 Vault/K8s Secrets 注入短时令牌;在 MCP 层通过只读接口获取,不落盘、不硬编码。

import hvac, os

VAULT_ADDR = os.getenv("VAULT_ADDR")
VAULT_TOKEN = os.getenv("VAULT_TOKEN")  # 仅短时令牌,来自 Sidecar 注入
client = hvac.Client(url=VAULT_ADDR, token=VAULT_TOKEN)

def get_secret(path: str, key: str) -> str:
    # 只读拉取;失败返回空,避免暴露异常细节
    try:
        data = client.secrets.kv.v2.read_secret_version(path=path)
        return data["data"]["data"].get(key, "")
    except Exception:
        return ""

要点:

  • Vault 配置最小权限与短 TTL;使用 AppRole/mTLS 绑定;
  • 禁止在日志中打印密钥值;仅记录路径与成功/失败状态。

异步处理:长耗时扫描任务的状态回调

模式:将任务入队(队列/线程池),即时返回 job_id;客户端通过 Resource 或 Webhook 订阅状态/结果。

import queue, threading, time, json, os
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("AsyncMCP")
JOBS = {}
Q = queue.Queue()
RES_ROOT = os.path.abspath("./async_results")
os.makedirs(RES_ROOT, exist_ok=True)

# 工作线程
def worker():
    while True:
        job_id, target = Q.get()
        # 1) 执行长耗时扫描(示例用 sleep 代替)
        time.sleep(5)
        findings = {"target": target, "status": "done", "open_ports": [80,443]}
        # 2) 写入资源文件
        path = os.path.join(RES_ROOT, f"job_{job_id}.json")
        with open(path, "w") as f:
            f.write(json.dumps(findings))
        JOBS[job_id] = {"status":"done", "path": path}
        Q.task_done()

threading.Thread(target=worker, daemon=True).start()

@mcp.tool()
def submit_scan(target: str) -> str:
    job_id = str(int(time.time()*1000))
    JOBS[job_id] = {"status": "queued"}
    Q.put((job_id, target))
    return json.dumps({"job_id": job_id, "status": "queued"})

@mcp.resource("job://status")
def job_status() -> str:
    # 最新作业状态(也可做 resources/list + resources/read 按 job_id)
    latest = sorted(JOBS.keys())[-1] if JOBS else None
    if not latest:
        return json.dumps({"error":"no jobs"})
    return json.dumps({latest: JOBS[latest]})

可选:

  • Webhook 模式:作业完成后 POST 到内部告警通道(带签名);
  • 轮询模式:客户端每隔 N 秒读取 job://statusjob://{id} 资源。

后续增强建议与路线图

1) 零信任与认证细化

  • mTLS 证书轮换 Playbook(签发、分发、失效与回滚流程)
  • Bearer Token 的签名与校验方案(HMAC/JWT),密钥滚动与吊销策略
  • Host 侧权限分级与会话隔离(多租户场景)

2) RBAC/IAM 最小权限样例库

  • K8s Role/RoleBinding 最小权限 YAML(仅 get/list 一组)
  • AWS IAM Policy(S3/GCS 只读、CloudWatch 只写、Secrets Manager 只读)
  • GitOps 管理策略:PR 审核 + 变更审批门

3) 模板治理的“白名单矩阵”

  • Nuclei 可用模板清单(safe/discovery 分类)与企业自定义模板的审核表
  • NSE/Nuclei 模板风险分级(探测/弱探测/POC/潜在DoS)与生产环境允许列表
  • 模板变更评审表:影响评估、测试报告、签名与版本锁定

4) 误报/漏报治理与处置流程

  • 误报判定标准与处置路径(降级、列入忽略、模板修正)
  • 漏报检测方法(样本基准、对照扫描器与人工复核)
  • 告警去重与阈值策略(同一目标/模板/指纹的去重规则)

5) 性能与成本看板(SLO/SLA)

  • 核心指标:工具调用成功率、P95/P99 时延、失败 Top5、模板命中率、告警去重率
  • 成本维度:计算/网络流量占用、模板执行总时长、WAF 命中率与封禁风险
  • Prometheus/Grafana 示例仪表(抓取 OTel 指标与自定义计数器)

6) 红队剧本仓库骨架

  • 目录结构:scenarios/(攻防链路)、playbooks/(动作清单)、evidence/(脱敏证据)
  • 受控演练 SOP:审批门 → 执行 → 取证 → 审计入库 → 复盘报告
  • 演练成果沉淀:问题清单、修复项、二次验证脚本

7) 合规映射与审计留痕

  • 合规条款映射(ISO 27001/27002、SOC 2、NIST 800 系列)到控制项(认证、审计、密钥管理、最小权限)
  • 数据保留策略与隐私条款:日志保留期限、脱敏字段、访问稽核
  • 电子取证(Forensics)要点:时间线、证据保全、链路完整性

8) 容器与网络硬化增补

  • 容器运行时加固(Seccomp、AppArmor、只读根文件系统、非 root 用户)
  • 出站 egress 控制示例(仅白名单域/IP,阻断云元数据地址)
  • 供应链安全(SBOM、依赖扫描、镜像签名与策略)

9) 高级异步与队列治理

  • 重试/回退策略(指数退避 + 抖动),最大重试次数与死信队列
  • Webhook 签名(HMAC)与幂等处理、事件去重
  • 队列指标:积压量、处理速率、失败率与最长等待时间

10) 测试与质量保障

  • 单元测试(工具参数校验、资源读写函数)
  • 集成测试(端到端工作流:资产 → 指纹 → 漏洞 → 告警)
  • Mock/Stub:外部 API 与扫描器输出的可控模拟;回归测试集

11) Host 专用提示模板库(Prompt Library)

  • Claude Desktop/VS Code 插件的自然语言模板,绑定工作流 ID 与场景参数
  • 高危操作→审批门 的统一提示语与拒绝模板(可直接复制)
  • 常见任务的“安全提示模板”卡片(资产巡检、泄露核查、CVE确认、阈值告警)

零信任与认证细化(mTLS 轮换 · Bearer 签名/滚动 · 会话隔离)

A) mTLS 证书轮换 Playbook(示例:OpenSSL)

目标:为 MCP Router 与 Gateway 配置双向 TLS,支持定期轮换与快速回滚。

  1. 生成 CA 与服务端/客户端证书(短 TTL)
# 生成 CA(离线保存)
openssl genrsa -out ca.key 4096
openssl req -x509 -new -nodes -key ca.key -sha256 -days 365 \
  -subj "/C=CN/ST=Shanghai/O=Corp/OU=Sec/CN=Corp-CA" -out ca.crt

# 服务端证书(72h TTL,示例)
openssl genrsa -out server.key 4096
openssl req -new -key server.key -out server.csr -subj \
  "/C=CN/ST=Shanghai/O=Corp/OU=MCP/CN=mcp.internal.corp"
openssl x509 -req -in server.csr -CA ca.crt -CAkey ca.key -CAcreateserial \
  -out server.crt -days 3 -sha256 -extfile <(cat <<EOF
subjectAltName=DNS:mcp.internal.corp,IP:10.10.10.10
EOF
)

# 客户端证书(Host)
openssl genrsa -out client.key 4096
openssl req -new -key client.key -out client.csr -subj \
  "/C=CN/ST=Shanghai/O=Corp/OU=Host/CN=host.client"
openssl x509 -req -in client.csr -CA ca.crt -CAkey ca.key -CAcreateserial \
  -out client.crt -days 3 -sha256
  1. Gateway(Nginx/Envoy)加载新证书并灰度:
server {
  listen 443 ssl http2;
  server_name mcp.internal.corp;
  ssl_certificate     /etc/certs/server.crt;
  ssl_certificate_key /etc/certs/server.key;
  ssl_client_certificate /etc/certs/ca.crt;
  ssl_verify_client on;
  # 通过路由权重实现灰度(可在 Envoy 使用 weighted_clusters)
  location /sse { proxy_pass <http://mcp-router:8000> }
}
  1. 轮换策略:
  • TTL:≤72h;重发前 24h 通知;
  • 回滚:保留上一版证书与配置副本,支持一键回退;
  • 监控:连接失败率、握手错误、证书即将过期告警。

B) Bearer Token(JWT)签名/滚动/吊销

建议使用 RS256(不对称)或 HS256(对称,需严格保管)

验证中间件(示例):

import jwt, time, os
from starlette.responses import JSONResponse

JWT_ISSUER = "mcp.corp"
JWT_AUDIENCE = "mcp-host"
JWT_PUBKEY = os.getenv("JWT_PUBKEY")

def verify_jwt(auth_header: str):
    if not auth_header or not auth_header.startswith("Bearer "):
        return False, "Missing Token"
    token = auth_header.split(" ")[1]
    try:
        payload = jwt.decode(token, JWT_PUBKEY, algorithms=["RS256"], \
            audience=JWT_AUDIENCE, issuer=JWT_ISSUER)
        now = int(time.time())
        if payload.get("nbf",0) > now or payload.get("exp",0) < now:
            return False, "Token expired/not before"
        if payload.get("scope") not in ["read", "workflow:scan"]:
            return False, "Invalid scope"
        # 可选:去 Redis 检查吊销列表(jti)
        return True, payload
    except Exception as e:
        return False, str(e)

滚动与吊销:

  • Key Rotation:采用 JWKS(公钥集合)发布,Host 定期拉取;
  • 吊销列表:按 jti 存储于 Redis,命中则拒绝;
  • 短 TTL:建议 ≤1h;高危操作使用更短 TTL 的 step-up token。

C) 会话隔离(多租户/多 Host)

  • 令牌作用域(scope):按工作流/只读资源划分,例如 workflow:asset_scanresource:read
  • 命名空间隔离:在 MCP Router 对每个租户/Host 绑定隔离的资源目录与速率阈值;
  • 速率与并发:按 tenant_id/host_id 维度限速,避免相互影响;
  • 会话 TTL:闲置超时自动关闭;审批门过期需重新确认。

误报/漏报治理

A) 判定标准

  • 误报(False Positive):模板命中但不满足业务/技术证据标准(如无有效凭据泄露、非生产路径),要求证据字段与二次验证失败;
  • 漏报(False Negative):已知漏洞或暴露项应命中但未命中,通常由速率/并发、WAF 拦截或模板缺陷导致;
  • 严重性映射:结合 CVSS、业务资产重要性,统一规则表(critical/high/medium/low)。

B) 去重与阈值

  • 去重键:(target, template_id, fingerprint)
  • 时间窗口:在 24h 内同一去重键仅保留一次告警;
  • 阈值策略:相同目标/模板在 1h 内超过 N 次(如 5 次)则进入降噪,不再投递 IM,仅入库;

示例(Python 聚合):

from collections import defaultdict

def dedup_alerts(alerts):
    seen = set(); result = []
    for a in alerts:
        fp = (a["target"], a["id"], a.get("fingerprint",""))
        if fp in seen: continue
        seen.add(fp); result.append(a)
    return result

C) 复核流程(SOP)

  1. 自动聚合 → 去重 → 阈值过滤 → 写入 SIEM(仅摘要);
  2. 复核人领取:检查证据(脱敏后)与模板版本;
  3. 二次验证:轻量二次扫描或静态校验;
  4. 结论与处置:误报→忽略并记录;漏报→修复模板或限速策略;
  5. 反馈闭环:更新白名单/黑名单与场景映射;每周审查误报/漏报比。

Host 提示模板库(Claude / VS Code 专用)

A) Claude Desktop(对话模板)

  1. 安全巡检(资产 → 指纹)
请使用【预定义工作流】完成安全巡检:
- Workflow ID:wf_asset_scan
- 目标:192.168.1.0/24(白名单网段)
- Nmap:T3 + SYN,Top-1000,rl=50,bs=2;禁 DNS 反解析
- 输出:JSON 摘要(ip/open_ports/service/version,evidence≤100)
- 注意:若发现 HTTP/HTTPS 服务,再执行 tech_detect 场景指纹识别
  1. 泄露核查(exposures/token)
对 <http://192.168.10.55> 执行敏感信息泄露核查:
- 场景:token_leak(模板白名单)
- 速率并发:rl=50,bs=2;超时 120s
- 审批门:无(只读场景)
- 输出:id/name/severity/evidence≤100;不返回原始响应体
  1. 高危 CVE 快速确认(需审批)
请针对 <http://10.0.0.5> 执行高危 CVE 快速确认:
- 工作流:wf_vuln_verify → cve_critical 场景
- 审批门:先发起审批窗,通过后继续
- 限速并发:rl=50,bs=2
- 输出:JSON 摘要;若 severity≥high,调用 send_security_alert 写入 SIEM
  1. 拒绝模板(超范围/异常)
请求被拒绝:该操作需要预定义工作流且目标在白名单内。
请提供:Workflow ID、授权目标(IP/域/CIDR)、速率并发、模板场景与输出格式。
如为高危操作,需经审批通过后才能继续。

B) VS Code(命令面板 / 任务片段)

tasks.json 示例,将自然语言意图映射到 MCP 调用脚本:

{
  "version": "2.0.0",
  "tasks": [
    {
      "label": "MCP: Asset Scan",
      "type": "shell",
      "command": "mcp call NmapScanner scan_network '{\"target_ip\":\"${input:ip}\"}'",
      "problemMatcher": []
    },
    {
      "label": "MCP: Nuclei token_leak",
      "type": "shell",
      "command": "mcp call NucleiScanner run_vuln_scan '{\"target\":\"${input:url}\",\"scenario\":\"token_leak\"}'"
    }
  ],
  "inputs": [
    { "id": "ip", "type": "promptString", "description": "目标 IP (白名单内)" },
    { "id": "url", "type": "promptString", "description": "目标 URL (白名单与授权)" }
  ]
}

审批门:通过执行前置脚本 precheck.sh 检查当前用户是否通过审批窗:

#!/bin/bash
# precheck.sh:检验审批状态(可从 IM/工单系统查询)
STATUS=$(curl -s <https://approval.example/api/status?user=$USER>
if [[ "$STATUS" != "approved" ]]; then
  echo "需要审批:请在 IM 中确认高危操作。"
  exit 1
fi

tasks.json 里为高危任务设置 dependsOn: precheck 或将 command 包装为 precheck.sh && 实际命令