From d01c4d3d0665f1a289f5735ab33b29e474c71e61 Mon Sep 17 00:00:00 2001 From: dark Date: Mon, 1 Jun 2026 16:45:52 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=AE=8C=E5=96=84=E4=BA=A4=E4=BA=92?= =?UTF-8?q?=E5=BC=8F=E9=83=A8=E7=BD=B2=E4=B8=8E=20MCP/LLM=20=E9=85=8D?= =?UTF-8?q?=E7=BD=AE=E8=83=BD=E5=8A=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 MCP client 配置加载,支持 CLI/chat 通过配置文件接入 MCP - 完善 chat 交互命令,支持参数查看、事件查看、checkpoint 列表与加载 - 增加 LLM action 后诊断能力,支持真实 LLM 和本地规则兜底 - 将 chat 人工确认点接入 LangGraph interrupt/checkpointer - 更新 README、流程图、待办文档和打包说明 - 补充相关单元测试 --- README.md | 52 +++- docs/current_logic_flow.md | 32 ++- docs/todo.md | 21 ++ packaging/README_linux_package.md | 4 +- packaging/README_packaged_agent.md | 84 ++++++- packaging/build_linux_self_contained.sh | 18 +- packaging/mcp_client.example.json | 23 ++ pam_deploy_graph/agent.py | 55 ++++ pam_deploy_graph/cli.py | 30 ++- pam_deploy_graph/interactive.py | 289 +++++++++++++++++++++- pam_deploy_graph/langgraph_runtime.py | 148 +++++++++++ pam_deploy_graph/llm/base.py | 19 +- pam_deploy_graph/llm/openai_compatible.py | 44 +++- pam_deploy_graph/llm/prompts.py | 19 ++ pam_deploy_graph/llm/rule_based.py | 57 +++++ pam_deploy_graph/mcp_client.py | 69 ++++++ pam_deploy_graph/mcp_factory.py | 28 +++ pam_deploy_graph/models.py | 14 ++ pyproject.toml | 1 + tests/test_agent_flow.py | 27 ++ tests/test_interactive_cli.py | 52 +++- tests/test_langgraph_runtime.py | 54 ++++ tests/test_llm_structured.py | 48 ++++ tests/test_mcp_client.py | 28 ++- 24 files changed, 1189 insertions(+), 27 deletions(-) create mode 100644 docs/todo.md create mode 100644 packaging/mcp_client.example.json create mode 100644 pam_deploy_graph/langgraph_runtime.py create mode 100644 pam_deploy_graph/mcp_factory.py create mode 100644 tests/test_langgraph_runtime.py diff --git a/README.md b/README.md index 9ff6542..72500b1 100644 --- a/README.md +++ b/README.md @@ -28,6 +28,7 @@ pam_deploy_graph/ params_loader.py # 读取 JSON 或 config.txt 风格参数文件 llm/ # LLM structured output 接口、真实 HTTP client、提示词、规则 fallback 和 guardrails graph.py # LangGraph StateGraph 集成入口 + langgraph_runtime.py # chat 人工确认点的 LangGraph interrupt 运行器 mcp_client.py # MCP session/callable adapter 与 client 配置读取 interactive.py # 常驻式 CLI 对话框,会话命令、确认和续跑 cli.py # CLI 入口 @@ -42,10 +43,12 @@ tests/ docs/ current_logic_flow.md # 当前整体逻辑结构流程图 + todo.md # chat 优化和 LLM action 后分析待办 packaging/ build_linux_self_contained.sh # Linux 解压即用包构建脚本 README_linux_package.md # Linux 打包说明和包大小评估 + mcp_client.example.json # MCP stdio 配置示例 ``` ## 当前进度 @@ -70,11 +73,15 @@ packaging/ - 增加规则 fallback `RuleBasedLlmClient`,用于本地开发和测试。 - 增加 LLM 输出 guardrails,禁止计划中出现可执行脚本命令和非法 action。 - 引入 `langgraph` 依赖,并提供 `build_langgraph()` 图工厂。 +- chat 人工确认点已接入 LangGraph interrupt/checkpointer:`run` 到待回滚确认时暂停,`approve/reject` 通过 `Command(resume=...)` 恢复。 - 引入 MCP client adapter,可包装 SDK session 或普通 callable,并提供 JSON client 配置读取。 +- CLI/chat 支持 `--mcp-config` 直接加载 stdio MCP 配置并构造 MCP runner。 - 本地已安装 `langgraph` 和 `mcp`,并完成 LangGraph fake 全局流程 smoke。 - CLI `analyze` 输出已做敏感字段脱敏。 -- 增加 `chat` 常驻式 CLI 对话框,支持自然语言分析、参数设置、执行确认、回滚确认、状态查看和续跑。 -- 添加基础测试,当前本地结果为 `31 passed, 1 skipped`。 +- 增加 `chat` 常驻式 CLI 对话框,支持自然语言分析、参数设置、执行确认、回滚确认、状态查看、事件查看、checkpoint 选择和续跑。 +- chat 可选启用 `rich` / `prompt_toolkit`,支持更清晰输出、命令补全和输入历史。 +- 增加 action 后 LLM/规则诊断,可通过 `--analyze-actions` 或 `llm action-analysis on` 显式开启。 +- 添加基础测试,当前本地结果为 `37 passed, 1 skipped`。 未完成: @@ -108,16 +115,25 @@ python -m pam_deploy_graph.cli analyze \ ## MCP Client 配置 -真实 MCP session 由外部接入,Agent 只依赖同步 `call_tool(name, arguments)` 接口。接入方式: +CLI/chat 已支持通过 `--mcp-config` 直接加载 MCP 配置。当前内置支持 stdio transport;配置文件里提供 MCP server 启动命令后,Agent 会在调用 PAM_NODE action 时创建 MCP stdio session。 + +CLI 示例: + +```bash +python -m pam_deploy_graph.cli chat \ + --config doc_scripts/config.txt.example \ + --strategy hybrid_node_mcp \ + --mcp-config mcp_client.json \ + --checkpoint runtime/checkpoints/demo.json +``` + +代码内嵌方式: ```python from pam_deploy_graph.agent import PamDeployAgent -from pam_deploy_graph.mcp_client import SessionMcpToolClient, load_mcp_client_config -from pam_deploy_graph.mcp_runner import McpActionRunner +from pam_deploy_graph.mcp_factory import build_mcp_runner_from_config -config = load_mcp_client_config("mcp_client.json") -client = SessionMcpToolClient(session) # session 是你接入真实 MCP 后得到的 SDK session -runner = McpActionRunner(client=client, tool_names=config.tool_names or None) +runner = build_mcp_runner_from_config("mcp_client.json") agent = PamDeployAgent(mcp_runner=runner) ``` @@ -126,6 +142,14 @@ agent = PamDeployAgent(mcp_runner=runner) ```json { "server_name": "pam-node-prod", + "transport": "stdio", + "command": "/opt/pam-node-mcp/server", + "args": ["--stdio"], + "cwd": "/opt/pam-node-mcp", + "env": { + "PAM_NODE_ENV": "prod" + }, + "timeout_seconds": 60, "tool_names": { "get-online-ips": "pam_get_online_ips", "create-download-task": "pam_create_download_task", @@ -164,7 +188,7 @@ dist/linux_self_contained/pam-deploy-agent-linux-x86_64/ dist/linux_self_contained/pam-deploy-agent-linux-x86_64.tar.gz ``` -发布包内的 `doc_scripts` 只包含运行必需文件:`deploy.sh`、`config.txt.example`、`PAM_AUTO_DEPLY_SKILL.md`。发布包内的 `README.md` 使用 `packaging/README_packaged_agent.md`,只介绍打包后 Agent 的使用方式。 +发布包内的 `doc_scripts` 只包含运行必需文件:`deploy.sh`、`config.txt.example`、`PAM_AUTO_DEPLY_SKILL.md`。发布包内的 `README.md` 使用 `packaging/README_packaged_agent.md`,只介绍打包后 Agent 的使用方式;同时会带上 `mcp_client.example.json` 作为 MCP 配置示例。 目标机器解压后运行: @@ -192,12 +216,18 @@ PAM> set VERSION_NUMBER=2.0.6 PAM> run 即将执行真实 action;确认执行请输入 yes: yes PAM> status +PAM> params +PAM> events 5 +PAM> llm action-analysis on +PAM> mcp config mcp_client.example.json +PAM> list checkpoints +PAM> load checkpoint runtime/checkpoints/chat-demo.json PAM> approve PAM> resume PAM> exit ``` -`chat` 默认仍要求在会话内显式输入 `run` 和 `yes` 才会执行 action;如果某个 IP 失败,会提示输入 `approve` 或 `reject [原因]`。`chat` 也支持 `--llm-base-url` / `--llm-api-key` / `--llm-model`,配置方式和 `analyze` 一致。 +`chat` 默认仍要求在会话内显式输入 `run`,并确认参数、目标 IP 范围和最终执行后才会执行 action;如果某个 IP 失败,会通过 LangGraph interrupt 暂停并提示输入 `approve` 或 `reject [原因]`,确认后恢复同一个图线程继续执行。`chat` 也支持 `--llm-base-url` / `--llm-api-key` / `--llm-model`、`--mcp-config` 和 `--analyze-actions`。 预演: @@ -248,5 +278,5 @@ pytest -q 1. 接入真实 PAM_NODE MCP session,并用 `SessionMcpToolClient` 包装。 2. 在测试环境中做 smoke:HOME 脚本 `get-token/get-node-url` + NODE MCP `get-online-ips`。 -3. 把当前 checkpoint/confirmation 语义继续接入 LangGraph interrupt/checkpointer。 +3. 在测试环境验证真实脚本 action 的失败、回滚确认和续跑链路。 4. 继续细化参数确认、IP 范围确认的交互式 UI 或上层编排。 diff --git a/docs/current_logic_flow.md b/docs/current_logic_flow.md index b3aae77..e5bedbf 100644 --- a/docs/current_logic_flow.md +++ b/docs/current_logic_flow.md @@ -20,20 +20,25 @@ flowchart TD CLI --> AGENT[PamDeployAgent] CHAT --> AGENT + CHAT --> LGR[langgraph_runtime.py chat interrupt 运行器] PARAMS --> AGENT RULE --> AGENT REAL --> AGENT + LGR --> AGENT + LGR --> LGCHECK[LangGraph InMemorySaver checkpointer] AGENT --> ROUTER[ActionRouter] ROUTER --> SCRIPT[ScriptActionRunner] ROUTER --> MCP[McpActionRunner] ROUTER --> FAKE[FakeActionRunner] SCRIPT --> DEPLOY[doc_scripts/deploy.sh 或 deploy.ps1] - MCP --> MCPCLIENT[mcp_client.py: Session/Function adapter] + MCP --> MCPFACTORY[mcp_factory.py 读取 --mcp-config] + MCPFACTORY --> MCPCLIENT[mcp_client.py: stdio/Session/Function adapter] FAKE --> FIXTURE[测试 fixture 或默认 fake 返回值] AGENT --> CHECKPOINT[checkpoint_store.py] + AGENT --> ACTIONLLM[action 后 LLM/规则诊断] AGENT --> REPORT[render_report 部署报告] ``` @@ -99,6 +104,22 @@ flowchart LR C -- PAM_NODE action --> NM[MCP tool 执行] ``` +## action 后诊断 + +```mermaid +flowchart TD + A[action 执行完成] --> B{是否开启 analyze-actions} + B -- 否 --> X[只记录 ACTION_DONE/ACTION_FAIL] + B -- 是 --> C[整理 ActionResult 和 AgentState 摘要] + C --> D[敏感字段脱敏并截断长日志] + D --> E{真实 LLM 是否配置} + E -- 是 --> F[OpenAICompatibleLlmClient 输出结构化诊断] + E -- 否 --> G[RuleBasedLlmClient 本地规则诊断] + F --> H[追加 ACTION_ANALYSIS 事件] + G --> H + H --> I[诊断只作建议,不自动继续/回滚/改参数] +``` + ## 失败、人工确认和续跑 ```mermaid @@ -110,7 +131,11 @@ flowchart TD E --> F[设置 pending_confirmation=rollback-ip:IP] F --> G[保存 checkpoint 并暂停] - G --> H{用户决定} + G --> LG{是否来自 chat} + LG -- 是 --> LGI[LangGraph interrupt 输出确认请求] + LGI --> LGRS[approve/reject 通过 Command resume 恢复] + LGRS --> H{用户决定} + LG -- 否 --> H{用户决定} H -- approve --> I[confirm_pending 执行 rollback-ip] I --> J{rollback 是否成功} J -- 是 --> K[清空 pending_confirmation] @@ -128,10 +153,11 @@ flowchart TD - `ip_states[ip].status == SUCCESS`:成功 IP 会跳过。 - `ip_states[ip].completed_steps`:同一个 IP 已完成的 action 会跳过。 - `pending_confirmation`:存在待确认事项时,部署流程不继续执行,必须先 `approve` 或 `reject`。 +- chat 会话内的确认点由 `langgraph_runtime.py` 通过 LangGraph interrupt 和 InMemorySaver 托管;命令行一次性 `confirm/resume` 仍读取业务 checkpoint JSON。 - checkpoint 为了真实续跑会保存完整参数,请放在受控目录中。 ## 真实外部能力接入点 - 真实 LLM:`llm.openai_compatible.OpenAICompatibleLlmClient`,通过 `PAM_LLM_BASE_URL`、`PAM_LLM_API_KEY`、`PAM_LLM_MODEL` 或 CLI 参数配置。 -- 真实 MCP:外部建立 MCP session 后,用 `SessionMcpToolClient` 包装,再传给 `McpActionRunner`。 +- 真实 MCP:CLI/chat 可通过 `--mcp-config` 加载 stdio MCP 配置,内部由 `mcp_factory.py` 构造 `McpActionRunner`。 - 真实脚本:PAM_HOME action 通过 `doc_scripts/deploy.sh` 或 `deploy.ps1` 调用。 diff --git a/docs/todo.md b/docs/todo.md new file mode 100644 index 0000000..89d47d2 --- /dev/null +++ b/docs/todo.md @@ -0,0 +1,21 @@ +# 待办事项 + +## chat 交互优化 + +- [x] 使用 `rich` 输出表格、状态、错误和报告;未安装时自动降级为普通输出。 +- [x] 使用 `prompt_toolkit` 支持命令补全和历史记录;未安装时自动降级为 `input()`。 +- [x] 增加 `params` 命令,脱敏展示当前会话参数。 +- [x] 增加 `events` 命令,查看最近 action 执行记录。 +- [x] 增加 `load checkpoint` 和 `list checkpoints`,方便选择历史任务续跑。 +- [x] 增加参数确认和目标 IP 范围确认,不只在回滚阶段确认。 +- [x] 增加 LLM/MCP 配置热加载,例如 `llm config`、`mcp config`。 +- [x] 将 chat 的人工确认点接入 LangGraph interrupt/checkpointer;`run` 执行到回滚确认点后由 interrupt 暂停,`approve/reject` 通过 `Command(resume=...)` 恢复同一图线程。跨进程续跑仍保留业务 checkpoint JSON。 + +## LLM action 后分析 + +- [x] 每次 action 完成后,可把 `action`、`backend`、`ok`、`values`、`stderr`、`error_summary` 和当前 `AgentState` 摘要交给 LLM 分析。 +- [x] LLM 输出结构化结果:是否异常、异常等级、可能原因、建议动作、是否需要人工确认。 +- [x] LLM 分析只作为辅助建议,不直接决定继续执行、回滚或修改参数。 +- [x] 本地保留规则兜底:exit code、`verify-ip SUCCESS=false`、pending confirmation 等硬规则优先于 LLM。 +- [x] 对 LLM 输入做脱敏,禁止把 `CLIENT_SECRET`、token、Authorization、完整日志原文发送给模型。 +- [x] 通过 `--analyze-actions` 或 `llm action-analysis on` 显式开启,真实部署默认不启用。 diff --git a/packaging/README_linux_package.md b/packaging/README_linux_package.md index 7edf9d8..dc51069 100644 --- a/packaging/README_linux_package.md +++ b/packaging/README_linux_package.md @@ -19,7 +19,7 @@ bash packaging/build_linux_self_contained.sh ``` -默认会安装 `.[mcp]`,即包含 MCP 可选依赖。如果只想打最小包: +默认会安装 `.[mcp,chat]`,即包含 MCP 可选依赖和 chat 交互增强依赖。如果只想打最小包: ```bash PACKAGE_EXTRAS= bash packaging/build_linux_self_contained.sh @@ -42,6 +42,7 @@ pam-deploy-agent-linux-x86_64/ deploy.sh config.txt.example PAM_AUTO_DEPLY_SKILL.md + mcp_client.example.json README.md LICENSE ``` @@ -50,6 +51,7 @@ pam-deploy-agent-linux-x86_64/ - `doc_scripts` 不会打入项目设计文档、测试脚本、Windows bat/PowerShell 脚本。 - 发布包内的 `README.md` 来自 `packaging/README_packaged_agent.md`,只说明打包后 Agent 的使用方式。 +- 发布包内的 `mcp_client.example.json` 是 MCP stdio 配置示例,需要按真实 MCP server 修改。 - 项目开发用 README 不会复制到发布包内。 ## 解压后运行 diff --git a/packaging/README_packaged_agent.md b/packaging/README_packaged_agent.md index de44e02..0aa07a6 100644 --- a/packaging/README_packaged_agent.md +++ b/packaging/README_packaged_agent.md @@ -12,6 +12,7 @@ pam-deploy-agent-linux-x86_64/ deploy.sh # Linux 脚本 action 入口 config.txt.example # 参数配置示例 PAM_AUTO_DEPLY_SKILL.md + mcp_client.example.json README.md # 当前说明 LICENSE ``` @@ -31,6 +32,9 @@ pam-deploy-agent-linux-x86_64/ ./run.sh run-deploy --help ``` +发布包默认包含 `rich` 和 `prompt_toolkit`。如果终端支持,chat 会自动启用更清晰的输出、命令补全和输入历史;不可用时会自动降级为普通文本输入输出。 +chat 内的失败回滚确认由 LangGraph interrupt 托管;执行停在确认点后,输入 `approve` 或 `reject [原因]` 会恢复同一个图线程继续处理。 + ## 交互式使用 推荐先用 fake 策略验证流程: @@ -39,6 +43,16 @@ pam-deploy-agent-linux-x86_64/ ./run.sh chat --config doc_scripts/config.txt.example --strategy fake --checkpoint runtime/checkpoints/demo.json ``` +如果要启用 MCP,先按真实 MCP server 修改 `mcp_client.example.json`,再使用 `hybrid_node_mcp`: + +```bash +./run.sh chat \ + --config doc_scripts/config.txt.example \ + --strategy hybrid_node_mcp \ + --mcp-config mcp_client.example.json \ + --checkpoint runtime/checkpoints/demo.json +``` + 进入对话框后可输入: ```text @@ -48,6 +62,12 @@ PAM> set VERSION_NUMBER=2.0.6 PAM> run 即将执行真实 action;确认执行请输入 yes: yes PAM> status +PAM> params +PAM> events 5 +PAM> llm action-analysis on +PAM> mcp config mcp_client.example.json +PAM> list checkpoints +PAM> load checkpoint runtime/checkpoints/demo.json PAM> approve PAM> resume PAM> exit @@ -73,6 +93,28 @@ PAM> exit ./run.sh run-deploy --config doc_scripts/config.txt.example --strategy fake --checkpoint runtime/checkpoints/demo.json --confirm ``` +执行时开启 action 后诊断: + +```bash +./run.sh run-deploy \ + --config doc_scripts/config.txt.example \ + --strategy fake \ + --checkpoint runtime/checkpoints/demo.json \ + --analyze-actions \ + --confirm +``` + +使用 MCP 的完整部署: + +```bash +./run.sh run-deploy \ + --config doc_scripts/config.txt.example \ + --strategy hybrid_node_mcp \ + --mcp-config mcp_client.example.json \ + --checkpoint runtime/checkpoints/demo.json \ + --confirm +``` + 处理失败后的回滚确认: ```bash @@ -109,14 +151,54 @@ export PAM_LLM_MODEL="your-model-name" --llm-model your-model-name ``` +chat 内也可以热加载 LLM: + +```text +PAM> llm config base_url=https://your-llm.example.com/v1 api_key=your-api-key model=your-model-name +PAM> llm action-analysis on +PAM> llm fallback +``` + ## 策略说明 - `fake`:全部使用 fake runner,不访问真实环境。 - `script_only`:全部 action 走脚本。 - `hybrid_node_mcp`:PAM_HOME 走脚本,PAM_NODE 走 MCP。 +## MCP 配置 + +`--mcp-config` 指向 MCP client JSON 配置文件。当前支持 stdio transport: + +```json +{ + "server_name": "pam-node-prod", + "transport": "stdio", + "command": "/opt/pam-node-mcp/server", + "args": ["--stdio"], + "cwd": "/opt/pam-node-mcp", + "env": { + "PAM_NODE_ENV": "prod" + }, + "timeout_seconds": 60, + "tool_names": { + "get-online-ips": "pam_get_online_ips", + "verify-ip": "pam_verify_ip", + "rollback-ip": "pam_rollback_ip" + } +} +``` + +字段说明: + +- `command`:MCP server 启动命令。 +- `args`:MCP server 启动参数。 +- `cwd`:MCP server 工作目录,可为空。 +- `env`:传给 MCP server 的环境变量,可为空。 +- `timeout_seconds`:单次 tool 调用超时时间。 +- `tool_names`:Agent action 到 MCP tool name 的映射。 + ## 注意事项 - 执行真实 action 前请确认配置文件中的 `HOME_BASE_URL`、`CLIENT_ID`、`CLIENT_SECRET`、`AIRPORT_CODE`、`APP_NAME`、`MODULE_NAME`、`VERSION_NUMBER`、`ZIP_FILE_PATH`。 - `checkpoint` 会保存完整运行参数,请放在受控目录。 -- 真实 MCP session 需要你在外部接入;当前包包含 MCP client adapter 和 action 映射能力。 +- `hybrid_node_mcp`、`resume`、`confirm` 如果需要执行 MCP action,请同时传入 `--mcp-config`。 diff --git a/packaging/build_linux_self_contained.sh b/packaging/build_linux_self_contained.sh index 0f4758c..b2e7e94 100644 --- a/packaging/build_linux_self_contained.sh +++ b/packaging/build_linux_self_contained.sh @@ -9,7 +9,7 @@ cd "$ROOT_DIR" PYTHON_BIN="${PYTHON_BIN:-python3}" APP_NAME="pam-deploy-agent" RELEASE_NAME="${APP_NAME}-linux-x86_64" -PACKAGE_EXTRAS="${PACKAGE_EXTRAS:-mcp}" +PACKAGE_EXTRAS="${PACKAGE_EXTRAS:-mcp,chat}" BUILD_DIR="${BUILD_DIR:-$ROOT_DIR/build/linux_self_contained}" DIST_DIR="${DIST_DIR:-$ROOT_DIR/dist/linux_self_contained}" RELEASE_DIR="$DIST_DIR/$RELEASE_NAME" @@ -70,6 +70,7 @@ cp -a doc_scripts/PAM_AUTO_DEPLY_SKILL.md "$RELEASE_DIR/doc_scripts/PAM_AUTO_DEP chmod +x "$RELEASE_DIR/doc_scripts/deploy.sh" cp -a packaging/README_packaged_agent.md "$RELEASE_DIR/README.md" +cp -a packaging/mcp_client.example.json "$RELEASE_DIR/mcp_client.example.json" cp -a LICENSE "$RELEASE_DIR/LICENSE" cat > "$RELEASE_DIR/run.sh" <<'RUN_SCRIPT' @@ -112,10 +113,19 @@ PAM 部署 Agent 解压即用包 --target-ip 指定目标工作站 IP。可重复传入多次。 + --mcp-config <路径> + MCP client JSON 配置文件。hybrid_node_mcp 策略、resume 或 confirm + 需要执行 MCP action 时使用。 + 示例:mcp_client.example.json + --confirm 非交互命令执行真实 action 前必须显式传入。 chat 模式会在会话中要求输入 run 和 yes。 + --analyze-actions + 每个 action 完成后追加 LLM/规则诊断建议。诊断只作为辅助建议, + 不会自动决定继续、回滚或修改参数。 + LLM 参数: --llm-base-url OpenAI-compatible LLM 服务地址,例如 https://example.com/v1 @@ -134,6 +144,8 @@ LLM 环境变量: 示例: ./run.sh chat --config doc_scripts/config.txt.example --strategy fake --checkpoint runtime/checkpoints/demo.json + ./run.sh chat --config doc_scripts/config.txt.example --strategy hybrid_node_mcp --mcp-config mcp_client.example.json --checkpoint runtime/checkpoints/demo.json + ./run.sh analyze --config doc_scripts/config.txt.example --text "请用 MCP 预演部署 HET PAM Node 版本 2.0.5,不要动环境" ./run.sh run-deploy --config doc_scripts/config.txt.example --strategy fake --checkpoint runtime/checkpoints/demo.json --confirm @@ -148,7 +160,9 @@ LLM 环境变量: 说明: 1. 本包已包含 Python 运行时和 Python 依赖,目标机器不需要安装 Python 包。 2. doc_scripts 只包含运行必需文件:deploy.sh、config.txt.example、PAM_AUTO_DEPLY_SKILL.md。 - 3. checkpoint 会保存完整运行参数,请放在受控目录。 + 3. mcp_client.example.json 是 MCP stdio 配置示例,需要按真实 MCP server 修改。 + 4. chat 内可使用 params、events、list checkpoints、load checkpoint、llm config、mcp config 等命令。 + 5. checkpoint 会保存完整运行参数,请放在受控目录。 HELP_TEXT } diff --git a/packaging/mcp_client.example.json b/packaging/mcp_client.example.json new file mode 100644 index 0000000..6a96c02 --- /dev/null +++ b/packaging/mcp_client.example.json @@ -0,0 +1,23 @@ +{ + "server_name": "pam-node-prod", + "transport": "stdio", + "command": "/opt/pam-node-mcp/server", + "args": ["--stdio"], + "cwd": "/opt/pam-node-mcp", + "env": { + "PAM_NODE_ENV": "prod" + }, + "timeout_seconds": 60, + "tool_names": { + "get-online-ips": "pam_get_online_ips", + "create-download-task": "pam_create_download_task", + "poll-download-progress": "pam_poll_download_progress", + "upgrade-ip": "pam_upgrade_ip", + "poll-upgrade-progress": "pam_poll_upgrade_progress", + "start-ip": "pam_start_ip", + "stop-ip": "pam_stop_ip", + "verify-ip": "pam_verify_ip", + "download-log": "pam_download_log", + "rollback-ip": "pam_rollback_ip" + } +} diff --git a/pam_deploy_graph/agent.py b/pam_deploy_graph/agent.py index ee04687..ce508b5 100644 --- a/pam_deploy_graph/agent.py +++ b/pam_deploy_graph/agent.py @@ -7,6 +7,7 @@ from __future__ import annotations import time +from dataclasses import asdict from pathlib import Path from typing import Any @@ -33,6 +34,7 @@ class PamDeployAgent: mcp_runner: McpActionRunner | None = None, fake_runner: FakeActionRunner | None = None, llm_client: LlmClient | None = None, + action_analysis_enabled: bool = False, ) -> None: """初始化策略、脚本 runner、MCP runner、fake runner 和 LLM client。""" self.skill_policy = load_skill_policy(skill_path) @@ -41,6 +43,7 @@ class PamDeployAgent: self.fake_runner = fake_runner or FakeActionRunner() self.mcp_runner = mcp_runner self.llm_client = llm_client or RuleBasedLlmClient() + self.action_analysis_enabled = action_analysis_enabled self.router = ActionRouter( script_runner=self.script_runner, mcp_runner=mcp_runner, @@ -180,6 +183,7 @@ class PamDeployAgent: "message": result.error_summary or "ok", } ) + self._append_action_analysis(state, action, result) if not result.ok: state.last_failed_step = action self._save_checkpoint(state) @@ -243,6 +247,7 @@ class PamDeployAgent: "message": result.error_summary or result.values.get("MESSAGE", "ok"), } ) + self._append_action_analysis(state, action, result, ip=ip) if failed: self._record_ip_failure(state, ip, action, result.error_summary or str(result.values)) @@ -322,6 +327,7 @@ class PamDeployAgent: "message": result.error_summary or result.values.get("MESSAGE", "ok"), } ) + self._append_action_analysis(state, "rollback-ip", result, ip=ip) if result.ok: state.pending_confirmation = "" state.last_success_step = "rollback-ip" @@ -427,12 +433,61 @@ class PamDeployAgent: "message": result.error_summary or "尽力下载日志失败", } ) + self._append_action_analysis(state, "download-log", result, ip=ip) def _save_checkpoint(self, state: AgentState) -> None: """如果配置了 checkpoint 路径,则保存完整运行状态。""" if state.checkpoint_path: save_checkpoint(state, state.checkpoint_path, redact=False) + def _append_action_analysis( + self, + state: AgentState, + action: str, + result, + *, + ip: str | None = None, + ) -> None: + """启用 action 后分析时,把诊断结果追加到 events。""" + if not self.action_analysis_enabled: + return + try: + analysis = self.llm_client.analyze_action_result( + action=action, + result=result, + state_summary=self._state_summary_for_llm(state, ip=ip), + ) + except Exception as exc: # pragma: no cover - 诊断失败不应影响部署主流程 + state.events.append( + { + "type": "ACTION_ANALYSIS_FAIL", + "stage": action, + "ip": ip or "", + "message": str(exc), + } + ) + return + payload = asdict(analysis) + payload.update({"type": "ACTION_ANALYSIS", "stage": action}) + if ip: + payload["ip"] = ip + state.events.append(payload) + + def _state_summary_for_llm(self, state: AgentState, *, ip: str | None = None) -> dict[str, Any]: + """生成给 LLM action 分析使用的脱敏状态摘要。""" + return { + "run_id": state.run_id, + "execution_strategy": state.execution_strategy, + "completed_global_steps": state.completed_global_steps, + "online_ip_count": len(state.online_ips), + "target_ips": state.target_ips, + "current_ip": ip or "", + "current_ip_state": state.ip_states.get(ip, {}) if ip else {}, + "pending_confirmation": state.pending_confirmation, + "last_success_step": state.last_success_step, + "last_failed_step": state.last_failed_step, + } + def render_report(self, state: AgentState) -> str: """渲染当前部署状态报告。""" success = sum(1 for item in state.ip_states.values() if item.get("status") == "SUCCESS") diff --git a/pam_deploy_graph/cli.py b/pam_deploy_graph/cli.py index 6000650..226733b 100644 --- a/pam_deploy_graph/cli.py +++ b/pam_deploy_graph/cli.py @@ -10,6 +10,7 @@ from .agent import PamDeployAgent from .checkpoint_store import load_agent_state, redact_mapping from .interactive import run_interactive_chat from .llm import build_llm_client +from .mcp_factory import build_mcp_runner_from_config from .params_loader import load_params_file @@ -20,6 +21,16 @@ def add_llm_args(parser: argparse.ArgumentParser) -> None: parser.add_argument("--llm-model") +def add_mcp_args(parser: argparse.ArgumentParser) -> None: + """为需要执行 MCP action 的子命令追加 MCP 配置参数。""" + parser.add_argument("--mcp-config", help="MCP client JSON 配置文件路径") + + +def add_action_analysis_arg(parser: argparse.ArgumentParser) -> None: + """为执行类子命令追加 action 后诊断开关。""" + parser.add_argument("--analyze-actions", action="store_true", help="每个 action 后追加 LLM/规则诊断建议") + + def require_confirm(args: argparse.Namespace) -> None: """真实执行前强制要求命令行显式传入 --confirm。""" if not getattr(args, "confirm", False): @@ -54,12 +65,16 @@ def main() -> None: chat.add_argument("--target-ip", action="append", default=[]) chat.add_argument("--checkpoint") add_llm_args(chat) + add_mcp_args(chat) + add_action_analysis_arg(chat) run = sub.add_parser("run-global") run.add_argument("--config", required=True) run.add_argument("--strategy", default="fake", choices=["hybrid_node_mcp", "script_only", "fake"]) run.add_argument("--checkpoint") run.add_argument("--confirm", action="store_true") + add_mcp_args(run) + add_action_analysis_arg(run) deploy = sub.add_parser("run-deploy") deploy.add_argument("--config", required=True) @@ -67,16 +82,22 @@ def main() -> None: deploy.add_argument("--target-ip", action="append", default=[]) deploy.add_argument("--checkpoint") deploy.add_argument("--confirm", action="store_true") + add_mcp_args(deploy) + add_action_analysis_arg(deploy) resume = sub.add_parser("resume") resume.add_argument("--checkpoint", required=True) resume.add_argument("--confirm", action="store_true") + add_mcp_args(resume) + add_action_analysis_arg(resume) confirm = sub.add_parser("confirm") confirm.add_argument("--checkpoint", required=True) confirm.add_argument("--decision", required=True, choices=["approve", "reject"]) confirm.add_argument("--note", default="") confirm.add_argument("--confirm", action="store_true") + add_mcp_args(confirm) + add_action_analysis_arg(confirm) args = parser.parse_args() params = load_params_file(args.config) if getattr(args, "config", None) else {} @@ -87,7 +108,14 @@ def main() -> None: api_key=args.llm_api_key, model=args.llm_model, ) - agent = PamDeployAgent(llm_client=llm_client) + mcp_runner = None + if getattr(args, "mcp_config", None): + mcp_runner = build_mcp_runner_from_config(args.mcp_config) + agent = PamDeployAgent( + llm_client=llm_client, + mcp_runner=mcp_runner, + action_analysis_enabled=bool(getattr(args, "analyze_actions", False)), + ) if args.command == "analyze": result = agent.analyze_request(args.text, params) diff --git a/pam_deploy_graph/interactive.py b/pam_deploy_graph/interactive.py index e492183..22b9097 100644 --- a/pam_deploy_graph/interactive.py +++ b/pam_deploy_graph/interactive.py @@ -3,12 +3,19 @@ from __future__ import annotations import time +import json +import shlex +import builtins from dataclasses import asdict from pathlib import Path from typing import Any, Callable from .agent import PamDeployAgent from .checkpoint_store import load_agent_state, redact_mapping +from .langgraph_runtime import LangGraphDeploymentRuntime, LangGraphRunResult +from .llm import build_llm_client +from .llm.rule_based import RuleBasedLlmClient +from .mcp_factory import build_mcp_runner_from_config from .models import AgentState, ExecutionStrategy InputFunc = Callable[[str], str] @@ -18,12 +25,20 @@ COMMAND_HELP = """可用命令: help 显示帮助 preview 查看当前参数和执行策略 analyze <需求> 只做理解和计划,不执行 + params 脱敏展示当前会话参数 + events [数量] 查看最近 action 事件,默认 10 条 set KEY=VALUE 修改当前会话参数 + llm config KEY=VALUE 配置真实 LLM,支持 base_url/api_key/model + llm fallback 切回本地规则 fallback + llm action-analysis on|off 开关 action 后诊断 + mcp config <路径> 加载 MCP client JSON 配置 run 创建部署任务并执行 status 查看当前运行状态 approve 确认待处理回滚 reject [原因] 拒绝待处理回滚 resume 从当前 checkpoint 续跑 + list checkpoints 列出 checkpoint 目录下的 JSON 文件 + load checkpoint <路径> 加载指定 checkpoint checkpoint 显示 checkpoint 路径 exit 退出 @@ -51,10 +66,13 @@ class InteractiveCliSession: self.strategy = strategy self.checkpoint_path = checkpoint_path or _default_checkpoint_path() self.target_ips = list(target_ips or []) - self.input = input_func - self.output = output_func + self.input = _build_prompt_input(input_func) + self.output = _build_output_func(output_func) self.state: AgentState | None = None self.last_analysis: dict[str, Any] | None = None + self.llm_config: dict[str, str] = {} + self.mcp_config_path: str = "" + self.graph_runtime: LangGraphDeploymentRuntime | None = None def run(self) -> None: """启动 REPL 循环,直到用户 exit 或输入流结束。""" @@ -88,12 +106,24 @@ class InteractiveCliSession: if normalized == "preview": self.output(self.agent.preview(self.params, self.strategy)) return True + if normalized == "params": + self._show_params() + return True + if normalized == "events": + self._show_events(rest.strip()) + return True if normalized == "analyze": self._analyze(rest.strip()) return True if normalized == "set": self._set_param(rest.strip()) return True + if normalized == "llm": + self._configure_llm(rest.strip()) + return True + if normalized == "mcp": + self._configure_mcp(rest.strip()) + return True if normalized in ("run", "deploy", "execute"): self._run_deploy() return True @@ -112,6 +142,12 @@ class InteractiveCliSession: if normalized == "checkpoint": self.output(f"checkpoint: {self.checkpoint_path}") return True + if normalized == "list" and rest.strip().lower() == "checkpoints": + self._list_checkpoints() + return True + if normalized == "load" and rest.strip().lower().startswith("checkpoint"): + self._load_checkpoint(rest.strip()[len("checkpoint") :].strip()) + return True self._analyze(text) return True @@ -159,12 +195,122 @@ class InteractiveCliSession: self.params[key] = value.strip() self.output(f"已设置 {key}") + def _show_params(self) -> None: + """脱敏展示当前会话参数。""" + self.output(_format_redacted_params(redact_mapping(self.params))) + + def _show_events(self, count_text: str) -> None: + """展示最近若干条事件。""" + if self.state is None or not self.state.events: + self.output("当前没有事件。") + return + try: + count = int(count_text) if count_text else 10 + except ValueError: + self.output("格式:events [数量]") + return + events = self.state.events[-max(count, 1) :] + self.output(json.dumps(redact_mapping(events), ensure_ascii=False, indent=2, default=str)) + + def _configure_llm(self, text: str) -> None: + """热加载 LLM 配置,或开关 action 后诊断。""" + if not text: + self.output("格式:llm config base_url=... api_key=... model=... | llm fallback | llm action-analysis on|off") + return + parts = shlex.split(text) + if parts[0] == "fallback": + self.agent.llm_client = RuleBasedLlmClient() + self.llm_config = {} + self.output("已切回本地规则 LLM fallback。") + return + if parts[0] == "action-analysis": + if len(parts) < 2 or parts[1] not in ("on", "off"): + self.output("格式:llm action-analysis on|off") + return + self.agent.action_analysis_enabled = parts[1] == "on" + self.output(f"action 后诊断已{'开启' if self.agent.action_analysis_enabled else '关闭'}。") + return + if parts[0] != "config": + self.output("未知 llm 命令。") + return + updates = _parse_key_values(parts[1:]) + self.llm_config.update(updates) + try: + self.agent.llm_client = build_llm_client( + base_url=self.llm_config.get("base_url"), + api_key=self.llm_config.get("api_key"), + model=self.llm_config.get("model"), + ) + except Exception as exc: + self.output(f"LLM 配置失败: {exc}") + return + safe = {**self.llm_config} + if safe.get("api_key"): + safe["api_key"] = "***" + self.output("LLM 配置已加载: " + json.dumps(safe, ensure_ascii=False)) + + def _configure_mcp(self, text: str) -> None: + """热加载 MCP client 配置。""" + command, _, path = text.partition(" ") + if command != "config" or not path.strip(): + self.output("格式:mcp config ") + return + path = path.strip().strip('"') + try: + runner = build_mcp_runner_from_config(path) + except Exception as exc: + self.output(f"MCP 配置失败: {exc}") + return + self.agent.mcp_runner = runner + self.agent.router.mcp_runner = runner + self.mcp_config_path = path + self.output(f"MCP 配置已加载: {path}") + + def _list_checkpoints(self) -> None: + """列出当前 checkpoint 目录下的 JSON 文件。""" + checkpoint_dir = Path(self.checkpoint_path).parent + if not checkpoint_dir.exists(): + self.output(f"checkpoint 目录不存在: {checkpoint_dir}") + return + files = sorted(checkpoint_dir.glob("*.json"), key=lambda item: item.stat().st_mtime, reverse=True) + if not files: + self.output(f"checkpoint 目录没有 JSON 文件: {checkpoint_dir}") + return + lines = ["checkpoint 列表:"] + for file in files[:20]: + lines.append(f"- {file}") + self.output("\n".join(lines)) + + def _load_checkpoint(self, path_text: str) -> None: + """加载指定 checkpoint 文件。""" + if not path_text: + self.output("格式:load checkpoint <路径>") + return + checkpoint = Path(path_text) + if not checkpoint.exists(): + self.output(f"checkpoint 不存在: {checkpoint}") + return + self.state = load_agent_state(checkpoint) + self.state.checkpoint_path = str(checkpoint) + self.checkpoint_path = str(checkpoint) + self.params = dict(self.state.params) + self.strategy = self.state.execution_strategy + self.target_ips = list(self.state.target_ips) + self.graph_runtime = None + self.output(f"已加载 checkpoint: {checkpoint}") + if self.state.pending_confirmation: + self._print_confirmation() + def _run_deploy(self) -> None: """在用户确认后创建状态并执行完整部署流程。""" if self.state and self.state.pending_confirmation: self._print_confirmation() return + if not self._confirm_params_and_scope(): + self.output("已取消执行。") + return + if not self._ask_yes_no("即将执行真实 action;确认执行请输入 yes: "): self.output("已取消执行。") return @@ -175,8 +321,20 @@ class InteractiveCliSession: checkpoint_path=self.checkpoint_path, target_ips=self.target_ips, ) + self.graph_runtime = None self._execute_current_state() + def _confirm_params_and_scope(self) -> bool: + """执行前确认参数和目标 IP 范围。""" + self.output(_format_redacted_params(redact_mapping(self.params))) + if not self._ask_yes_no("确认以上参数请输入 yes: "): + return False + if self.target_ips: + self.output("目标 IP: " + ", ".join(self.target_ips)) + else: + self.output("目标 IP: 未指定,将在 get-online-ips 后使用全部在线 IP。") + return self._ask_yes_no("确认目标范围请输入 yes: ") + def _resume(self) -> None: """从内存状态或 checkpoint 文件继续执行部署流程。""" if self.state is None: @@ -186,6 +344,9 @@ class InteractiveCliSession: return self.state = load_agent_state(checkpoint) self.state.checkpoint_path = self.state.checkpoint_path or str(checkpoint) + if self.graph_runtime and self.graph_runtime.waiting_confirmation: + self._print_confirmation() + return self._execute_current_state() def _execute_current_state(self) -> None: @@ -193,7 +354,36 @@ class InteractiveCliSession: if self.state is None: self.output("当前没有运行状态。") return - self.state = self.agent.run_deploy_flow(self.state) + try: + if self.graph_runtime is None or not self.graph_runtime.waiting_confirmation: + self.graph_runtime = LangGraphDeploymentRuntime(agent=self.agent) + result = self.graph_runtime.start(self.state) + except RuntimeError as exc: + self.output(f"LangGraph 确认运行器不可用,降级为本地执行: {exc}") + self.graph_runtime = None + self.state = self.agent.run_deploy_flow(self.state) + self._print_state_report_and_checkpoint() + return + self._apply_graph_result(result) + + def _apply_graph_result(self, result: LangGraphRunResult) -> None: + """把 LangGraph 运行结果同步回 chat 会话并输出用户可见状态。""" + if result.state is not None: + self.state = result.state + if self.state is None: + self.output("当前没有运行状态。") + return + self.output(result.report or self.agent.render_report(self.state)) + if result.interrupted and result.confirmation: + self._print_confirmation_request(result.confirmation) + elif self.state.pending_confirmation: + self._print_confirmation() + self.output(f"checkpoint: {self.state.checkpoint_path or self.checkpoint_path}") + + def _print_state_report_and_checkpoint(self) -> None: + """输出本地执行路径的状态报告和 checkpoint。""" + if self.state is None: + return self.output(self.agent.render_report(self.state)) if self.state.pending_confirmation: self._print_confirmation() @@ -223,6 +413,15 @@ class InteractiveCliSession: self.output("当前没有待确认任务。") return + if self.graph_runtime and self.graph_runtime.waiting_confirmation: + try: + result = self.graph_runtime.resume(approved=approved, note=note) + except RuntimeError as exc: + self.output(f"LangGraph 确认恢复失败,降级为本地确认: {exc}") + else: + self._apply_graph_result(result) + return + self.state = self.agent.confirm_pending(self.state, approved=approved, operator_note=note) self.output(self.agent.render_report(self.state)) if self.state.pending_confirmation: @@ -235,6 +434,10 @@ class InteractiveCliSession: request = self.agent.build_confirmation_request(self.state) if not request: return + self._print_confirmation_request(request) + + def _print_confirmation_request(self, request: dict[str, Any]) -> None: + """输出指定的人工确认请求。""" self.output("需要人工确认:") self.output(f"- type: {request.get('type')}") if request.get("ip"): @@ -307,3 +510,83 @@ def _format_redacted_params(params: dict[str, Any]) -> str: for key in sorted(params): lines.append(f"- {key}: {params[key]}") return "\n".join(lines) + + +def _parse_key_values(parts: list[str]) -> dict[str, str]: + """解析 KEY=VALUE 参数列表。""" + values: dict[str, str] = {} + for part in parts: + if "=" not in part: + continue + key, value = part.split("=", 1) + if key: + values[key] = value + return values + + +def _build_prompt_input(input_func: InputFunc) -> InputFunc: + """如果安装了 prompt_toolkit,则启用历史记录和命令补全。""" + if input_func is not builtins.input: + return input_func + try: + from prompt_toolkit import PromptSession + from prompt_toolkit.completion import WordCompleter + from prompt_toolkit.history import FileHistory + except ImportError: + return input_func + + commands = [ + "help", + "preview", + "analyze", + "params", + "events", + "set", + "llm config", + "llm fallback", + "llm action-analysis on", + "llm action-analysis off", + "mcp config", + "run", + "status", + "approve", + "reject", + "resume", + "list checkpoints", + "load checkpoint", + "checkpoint", + "exit", + ] + session = PromptSession( + history=FileHistory(str(Path("runtime") / "chat_history.txt")), + completer=WordCompleter(commands, ignore_case=True, sentence=True), + ) + return session.prompt + + +def _build_output_func(output_func: OutputFunc) -> OutputFunc: + """如果安装了 rich,则使用 rich 输出;否则保持原输出函数。""" + if output_func is not builtins.print: + return output_func + try: + from rich.console import Console + from rich.markdown import Markdown + except ImportError: + return output_func + console = Console() + + def rich_print(value: str) -> None: + text = str(value) + stripped = text.lstrip() + if stripped.startswith("{") or stripped.startswith("["): + try: + console.print_json(text) + return + except Exception: + pass + if text.startswith("## ") or "\n| ---" in text: + console.print(Markdown(text)) + return + console.print(text) + + return rich_print diff --git a/pam_deploy_graph/langgraph_runtime.py b/pam_deploy_graph/langgraph_runtime.py new file mode 100644 index 0000000..190e028 --- /dev/null +++ b/pam_deploy_graph/langgraph_runtime.py @@ -0,0 +1,148 @@ +"""chat 人工确认点的 LangGraph interrupt 运行器。""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any +from uuid import uuid4 + +from .agent import PamDeployAgent +from .models import AgentState + + +@dataclass(slots=True) +class LangGraphRunResult: + """一次 LangGraph 执行或恢复后的结果摘要。""" + + state: AgentState | None = None + report: str = "" + confirmation: dict[str, Any] = field(default_factory=dict) + interrupted: bool = False + chunks: list[dict[str, Any]] = field(default_factory=list) + + +class LangGraphDeploymentRuntime: + """用 LangGraph interrupt/checkpointer 托管 chat 中的人工确认流程。""" + + def __init__(self, *, agent: PamDeployAgent, thread_id: str | None = None) -> None: + """初始化图实例和会话线程 ID。""" + self.agent = agent + self.thread_id = thread_id or str(uuid4()) + self._waiting_confirmation = False + self._graph = self._build_graph() + + @property + def waiting_confirmation(self) -> bool: + """返回当前 LangGraph 会话是否停在 interrupt 确认点。""" + return self._waiting_confirmation + + def start(self, state: AgentState) -> LangGraphRunResult: + """从给定 AgentState 开始执行,直到结束或遇到人工确认点。""" + self._waiting_confirmation = False + return self._consume(self._graph.stream({"agent_state": state}, self._config())) + + def resume(self, *, approved: bool, note: str = "") -> LangGraphRunResult: + """把人工确认结果交回 LangGraph,并继续执行。""" + try: + from langgraph.types import Command + except ImportError as exc: # pragma: no cover - 依赖缺失时由调用方降级 + raise RuntimeError("未安装 langgraph,无法恢复 interrupt。") from exc + + decision = {"approved": approved, "note": note} + return self._consume(self._graph.stream(Command(resume=decision), self._config())) + + def _build_graph(self): + """构建 deploy -> confirm interrupt -> deploy 的循环图。""" + try: + from langgraph.checkpoint.memory import InMemorySaver + from langgraph.graph import END, START, StateGraph + from langgraph.types import interrupt + except ImportError as exc: # pragma: no cover - 依赖缺失时由调用方降级 + raise RuntimeError("未安装 langgraph,无法启用 chat interrupt。") from exc + + def deploy_node(state: dict[str, Any]) -> dict[str, Any]: + """执行部署流,遇到 pending_confirmation 时由路由转入确认节点。""" + agent_state = self.agent.run_deploy_flow(state["agent_state"]) + return {"agent_state": agent_state} + + def confirm_node(state: dict[str, Any]) -> dict[str, Any]: + """把确认请求交给 LangGraph interrupt,并在恢复后执行确认动作。""" + agent_state = state["agent_state"] + request = self.agent.build_confirmation_request(agent_state) + decision = interrupt(request) + approved, note = _parse_confirmation_decision(decision) + agent_state = self.agent.confirm_pending( + agent_state, + approved=approved, + operator_note=note, + ) + return {"agent_state": agent_state} + + def report_node(state: dict[str, Any]) -> dict[str, Any]: + """渲染当前状态报告。""" + return {"report": self.agent.render_report(state["agent_state"])} + + def route_after_deploy(state: dict[str, Any]) -> str: + """根据是否存在 pending_confirmation 决定下一步。""" + agent_state = state["agent_state"] + return "confirm" if agent_state.pending_confirmation else "report" + + graph = StateGraph(dict) + graph.add_node("deploy", deploy_node) + graph.add_node("confirm", confirm_node) + graph.add_node("report", report_node) + graph.add_edge(START, "deploy") + graph.add_conditional_edges( + "deploy", + route_after_deploy, + {"confirm": "confirm", "report": "report"}, + ) + graph.add_edge("confirm", "deploy") + graph.add_edge("report", END) + return graph.compile(checkpointer=InMemorySaver()) + + def _config(self) -> dict[str, Any]: + """生成 LangGraph checkpointer 使用的线程配置。""" + return {"configurable": {"thread_id": self.thread_id}} + + def _consume(self, chunks: Any) -> LangGraphRunResult: + """消费 LangGraph stream 输出,提取状态、报告和 interrupt 请求。""" + result = LangGraphRunResult() + for chunk in chunks: + result.chunks.append(chunk) + if "__interrupt__" in chunk: + result.interrupted = True + result.confirmation = _extract_interrupt_value(chunk["__interrupt__"]) + continue + + for value in chunk.values(): + if not isinstance(value, dict): + continue + if isinstance(value.get("agent_state"), AgentState): + result.state = value["agent_state"] + if isinstance(value.get("report"), str): + result.report = value["report"] + + self._waiting_confirmation = result.interrupted + return result + + +def _extract_interrupt_value(interrupts: Any) -> dict[str, Any]: + """从 LangGraph interrupt 对象中提取确认请求字典。""" + if not interrupts: + return {} + first = interrupts[0] + value = getattr(first, "value", first) + return value if isinstance(value, dict) else {"value": value} + + +def _parse_confirmation_decision(value: Any) -> tuple[bool, str]: + """把 interrupt resume 值解析为 approved/note。""" + if isinstance(value, dict): + return bool(value.get("approved", False)), str(value.get("note", "")) + if isinstance(value, bool): + return value, "" + if isinstance(value, str): + normalized = value.strip().lower() + return normalized in ("approve", "approved", "yes", "y", "true"), value + return False, str(value) diff --git a/pam_deploy_graph/llm/base.py b/pam_deploy_graph/llm/base.py index ee5c2ca..9d8a31c 100644 --- a/pam_deploy_graph/llm/base.py +++ b/pam_deploy_graph/llm/base.py @@ -4,7 +4,14 @@ from __future__ import annotations from typing import Any, Protocol -from pam_deploy_graph.models import ExecutionStrategy, LlmDeployPlan, LlmIntentResult, LlmParamResult +from pam_deploy_graph.models import ( + ActionResult, + ExecutionStrategy, + LlmActionAnalysis, + LlmDeployPlan, + LlmIntentResult, + LlmParamResult, +) class LlmClient(Protocol): @@ -27,3 +34,13 @@ class LlmClient(Protocol): ) -> LlmDeployPlan: """根据参数和意图生成部署计划。""" ... + + def analyze_action_result( + self, + *, + action: str, + result: ActionResult, + state_summary: dict[str, Any], + ) -> LlmActionAnalysis: + """分析 action 执行结果,并给出辅助诊断建议。""" + ... diff --git a/pam_deploy_graph/llm/openai_compatible.py b/pam_deploy_graph/llm/openai_compatible.py index 73b865c..c5b515a 100644 --- a/pam_deploy_graph/llm/openai_compatible.py +++ b/pam_deploy_graph/llm/openai_compatible.py @@ -20,8 +20,9 @@ from pam_deploy_graph.constants import ( SENSITIVE_KEYS, ) from pam_deploy_graph.models import ExecutionStrategy, LlmDeployPlan, LlmIntentResult, LlmParamResult +from pam_deploy_graph.models import ActionResult, LlmActionAnalysis -from .prompts import INTENT_PROMPT, PARAM_PROMPT, PLAN_PROMPT, SYSTEM_PROMPT +from .prompts import ACTION_ANALYSIS_PROMPT, INTENT_PROMPT, PARAM_PROMPT, PLAN_PROMPT, SYSTEM_PROMPT JsonTransport = Callable[[str, dict[str, str], dict[str, Any], float], dict[str, Any]] @@ -127,6 +128,40 @@ class OpenAICompatibleLlmClient: execution_strategy=_string(payload, "execution_strategy", strategy), # type: ignore[arg-type] ) + def analyze_action_result( + self, + *, + action: str, + result: ActionResult, + state_summary: dict[str, Any], + ) -> LlmActionAnalysis: + """调用 LLM 分析 action 结果,返回结构化诊断建议。""" + payload = self._complete_json( + ACTION_ANALYSIS_PROMPT, + { + "action": action, + "result": { + "backend": result.backend, + "ok": result.ok, + "exit_code": result.exit_code, + "tool_name": result.tool_name, + "values": _redact_sensitive(result.values), + "stderr": _truncate_text(result.stderr), + "error_summary": result.error_summary, + }, + "state_summary": _redact_sensitive(state_summary), + }, + ) + return LlmActionAnalysis( + action=_string(payload, "action", action), + has_anomaly=bool(payload.get("has_anomaly", False)), + severity=_string(payload, "severity", "info"), # type: ignore[arg-type] + possible_reason=_string(payload, "possible_reason", ""), + suggested_action=_string(payload, "suggested_action", ""), + requires_confirmation=bool(payload.get("requires_confirmation", False)), + notes=_string_list(payload.get("notes")), + ) + def _complete_json(self, instruction: str, input_payload: dict[str, Any]) -> dict[str, Any]: """发送 chat/completions 请求,并解析 JSON 对象响应。""" request_payload = { @@ -229,6 +264,13 @@ def _redact_sensitive(value: Any) -> Any: return value +def _truncate_text(value: str, limit: int = 1000) -> str: + """截断发送给 LLM 的长文本,避免传入完整日志。""" + if len(value) <= limit: + return value + return value[:limit] + "...[已截断]" + + def _string(payload: dict[str, Any], key: str, default: str) -> str: """安全读取字符串字段。""" value = payload.get(key, default) diff --git a/pam_deploy_graph/llm/prompts.py b/pam_deploy_graph/llm/prompts.py index a9f291d..674eed7 100644 --- a/pam_deploy_graph/llm/prompts.py +++ b/pam_deploy_graph/llm/prompts.py @@ -65,3 +65,22 @@ PLAN_PROMPT = """生成 PAM 部署计划。 计划只能使用允许 action;不要包含可执行脚本命令、命令行参数或密钥。 PAM_HOME action 仍由脚本 action 执行;PAM_NODE action 在 hybrid_node_mcp 策略下走 MCP。 """ + +ACTION_ANALYSIS_PROMPT = """分析一次 PAM action 执行结果。 + +输出 JSON schema: +{ + "action": "...", + "has_anomaly": false, + "severity": "info|low|medium|high", + "possible_reason": "...", + "suggested_action": "...", + "requires_confirmation": false, + "notes": ["..."] +} + +要求: +- 只给诊断建议,不决定继续执行、回滚或修改参数。 +- 如果 exit_code 非 0、ok=false、verify-ip SUCCESS=false、出现 pending_confirmation,应标记异常。 +- 不要输出密钥、token、Authorization 或完整日志原文。 +""" diff --git a/pam_deploy_graph/llm/rule_based.py b/pam_deploy_graph/llm/rule_based.py index 566371f..363371e 100644 --- a/pam_deploy_graph/llm/rule_based.py +++ b/pam_deploy_graph/llm/rule_based.py @@ -11,7 +11,9 @@ from typing import Any from pam_deploy_graph.constants import GLOBAL_ACTION_SEQUENCE, REQUIRED_PARAMS from pam_deploy_graph.models import ( + ActionResult, ExecutionStrategy, + LlmActionAnalysis, LlmDeployPlan, LlmIntentResult, LlmParamResult, @@ -145,6 +147,61 @@ class RuleBasedLlmClient: execution_strategy=strategy, ) + def analyze_action_result( + self, + *, + action: str, + result: ActionResult, + state_summary: dict[str, Any], + ) -> LlmActionAnalysis: + """用本地规则分析 action 结果,作为真实 LLM 不可用时的兜底。""" + notes: list[str] = [] + has_anomaly = not result.ok + severity = "info" + possible_reason = "" + suggested_action = "继续观察。" + requires_confirmation = False + + if not result.ok: + severity = "medium" + possible_reason = result.error_summary or "action 返回失败状态。" + suggested_action = "查看 action stderr/raw_output,确认参数、网络和目标服务状态。" + notes.append("硬规则检测到 action 执行失败。") + + if action == "verify-ip": + success = result.values.get("SUCCESS") + if success is not None and str(success).lower() not in ("true", "1", "yes"): + has_anomaly = True + severity = "high" + possible_reason = result.values.get("MESSAGE", "") or "工作站健康检查未通过。" + suggested_action = "先下载日志并人工确认是否执行回滚。" + requires_confirmation = True + notes.append("verify-ip SUCCESS 非成功值。") + + if action == "rollback-ip" and not result.ok: + severity = "high" + suggested_action = "保持待确认状态,人工排查回滚失败原因后重试或转人工处理。" + requires_confirmation = True + notes.append("rollback-ip 失败需要人工处理。") + + if result.values.get("PENDING_AGENT_CONFIRMATION"): + has_anomaly = True + severity = "high" + possible_reason = str(result.values["PENDING_AGENT_CONFIRMATION"]) + suggested_action = "暂停自动流程,等待人工确认。" + requires_confirmation = True + notes.append("action 返回待人工确认标记。") + + return LlmActionAnalysis( + action=action, + has_anomaly=has_anomaly, + severity=severity, # type: ignore[arg-type] + possible_reason=possible_reason, + suggested_action=suggested_action, + requires_confirmation=requires_confirmation, + notes=notes, + ) + def _extract_key_values(self, text: str) -> dict[str, str]: """抽取 KEY=VALUE 形式的参数。""" params: dict[str, str] = {} diff --git a/pam_deploy_graph/mcp_client.py b/pam_deploy_graph/mcp_client.py index 8820739..3bdddd5 100644 --- a/pam_deploy_graph/mcp_client.py +++ b/pam_deploy_graph/mcp_client.py @@ -7,6 +7,7 @@ callable 或 SDK session 适配成这个接口,避免业务代码绑定具体 from __future__ import annotations import json +from datetime import timedelta from collections.abc import Callable from dataclasses import dataclass, field from pathlib import Path @@ -18,6 +19,12 @@ class McpClientConfig: """真实 MCP session 建立后需要传给 runner 的配置。""" server_name: str = "pam-node" + transport: str = "stdio" + command: str = "" + args: list[str] = field(default_factory=list) + env: dict[str, str] | None = None + cwd: str = "" + timeout_seconds: float = 60 tool_names: dict[str, str] = field(default_factory=dict) @classmethod @@ -26,8 +33,20 @@ class McpClientConfig: tool_names = payload.get("tool_names") or payload.get("tools") or {} if not isinstance(tool_names, dict): raise ValueError("MCP tool_names 必须是 JSON object") + args = payload.get("args") or [] + if not isinstance(args, list): + raise ValueError("MCP args 必须是数组") + env = payload.get("env") + if env is not None and not isinstance(env, dict): + raise ValueError("MCP env 必须是 JSON object") return cls( server_name=str(payload.get("server_name", "pam-node")), + transport=str(payload.get("transport", "stdio")), + command=str(payload.get("command", "")), + args=[str(item) for item in args], + env={str(key): str(value) for key, value in env.items()} if env else None, + cwd=str(payload.get("cwd", "")), + timeout_seconds=float(payload.get("timeout_seconds", 60)), tool_names={str(key): str(value) for key, value in tool_names.items()}, ) @@ -74,6 +93,56 @@ class SessionMcpToolClient: return normalize_mcp_sdk_result(result) +class StdioMcpToolClient: + """通过 MCP Python SDK 启动 stdio server 并调用 tool。""" + + def __init__( + self, + *, + command: str, + args: list[str] | None = None, + env: dict[str, str] | None = None, + cwd: str | None = None, + timeout_seconds: float = 60, + ) -> None: + """保存 stdio server 启动参数。""" + if not command: + raise ValueError("stdio MCP 配置必须提供 command") + self.command = command + self.args = list(args or []) + self.env = env + self.cwd = cwd or None + self.timeout_seconds = timeout_seconds + + def call_tool(self, tool_name: str, arguments: dict[str, Any]) -> Any: + """创建一次 MCP stdio session,调用 tool 后关闭 session。""" + try: + import anyio + from mcp import ClientSession + from mcp.client.stdio import StdioServerParameters, stdio_client + except ImportError as exc: # pragma: no cover - 依赖安装状态 + raise RuntimeError("未安装 MCP Python SDK,请安装项目的 mcp 可选依赖") from exc + + async def call_once() -> Any: + server = StdioServerParameters( + command=self.command, + args=self.args, + env=self.env, + cwd=self.cwd, + ) + async with stdio_client(server) as (read_stream, write_stream): + async with ClientSession(read_stream, write_stream) as session: + await session.initialize() + result = await session.call_tool( + tool_name, + arguments, + read_timeout_seconds=timedelta(seconds=self.timeout_seconds), + ) + return normalize_mcp_sdk_result(result) + + return anyio.run(call_once) + + def normalize_mcp_sdk_result(result: Any) -> Any: """把常见 MCP SDK 返回结构归一化成 dict/list/string。""" if hasattr(result, "structuredContent"): diff --git a/pam_deploy_graph/mcp_factory.py b/pam_deploy_graph/mcp_factory.py new file mode 100644 index 0000000..9e3c7ce --- /dev/null +++ b/pam_deploy_graph/mcp_factory.py @@ -0,0 +1,28 @@ +"""根据配置文件构造 MCP runner。""" + +from __future__ import annotations + +from pathlib import Path + +from .mcp_client import McpClientConfig, StdioMcpToolClient, load_mcp_client_config +from .mcp_runner import McpActionRunner + + +def build_mcp_runner_from_config(path: str | Path) -> McpActionRunner: + """读取 MCP 配置文件,并构造可直接给 Agent 使用的 runner。""" + config = load_mcp_client_config(path) + client = build_mcp_client(config) + return McpActionRunner(client=client, tool_names=config.tool_names or None) + + +def build_mcp_client(config: McpClientConfig): + """根据 transport 类型创建 MCP client。""" + if config.transport == "stdio": + return StdioMcpToolClient( + command=config.command, + args=config.args, + env=config.env, + cwd=config.cwd or None, + timeout_seconds=config.timeout_seconds, + ) + raise ValueError(f"不支持的 MCP transport: {config.transport}") diff --git a/pam_deploy_graph/models.py b/pam_deploy_graph/models.py index d621496..90c8e91 100644 --- a/pam_deploy_graph/models.py +++ b/pam_deploy_graph/models.py @@ -10,6 +10,7 @@ ExecutionStrategy = Literal["hybrid_node_mcp", "script_only", "fake"] IntentName = Literal["deploy", "show_usage", "preview", "query_node_ips", "rollback"] ModePreference = Literal["MCP", "API脚本", "未指定"] StrategyPreference = Literal["hybrid_node_mcp", "script_only", "fake", "未指定"] +ActionAnalysisSeverity = Literal["info", "low", "medium", "high"] @dataclass(slots=True) @@ -88,6 +89,19 @@ class LlmDeployPlan: execution_strategy: StrategyPreference = "未指定" +@dataclass(slots=True) +class LlmActionAnalysis: + """LLM 或规则对单次 action 结果的诊断建议。""" + + action: str + has_anomaly: bool = False + severity: ActionAnalysisSeverity = "info" + possible_reason: str = "" + suggested_action: str = "" + requires_confirmation: bool = False + notes: list[str] = field(default_factory=list) + + @dataclass(slots=True) class AgentState: """一次部署运行的完整状态,可序列化到 checkpoint。""" diff --git a/pyproject.toml b/pyproject.toml index 8036ab8..0d8e50b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,6 +9,7 @@ dependencies = [ [project.optional-dependencies] mcp = ["mcp>=1"] +chat = ["rich>=13", "prompt_toolkit>=3"] test = ["pytest"] [tool.pytest.ini_options] diff --git a/tests/test_agent_flow.py b/tests/test_agent_flow.py index a573b92..05a196f 100644 --- a/tests/test_agent_flow.py +++ b/tests/test_agent_flow.py @@ -60,6 +60,33 @@ def test_run_deploy_flow_stops_on_verify_failure(tmp_path: Path): assert any(event["type"] == "CONFIRMATION_REQUIRED" for event in state.events) +def test_action_analysis_event_is_recorded_when_enabled(tmp_path: Path): + fake = FakeActionRunner( + { + "verify-ip:192.168.1.10": { + "ACTION": "verify-ip", + "IP": "192.168.1.10", + "SUCCESS": "false", + "MESSAGE": "health check failed", + } + } + ) + agent = PamDeployAgent(fake_runner=fake, action_analysis_enabled=True) + state = agent.create_state( + params=PARAMS, + execution_strategy="fake", + config_path=str(tmp_path / "config.txt"), + ) + + agent.run_deploy_flow(state) + + analyses = [event for event in state.events if event["type"] == "ACTION_ANALYSIS"] + verify_analysis = [event for event in analyses if event["stage"] == "verify-ip"][0] + assert verify_analysis["has_anomaly"] is True + assert verify_analysis["severity"] == "high" + assert verify_analysis["requires_confirmation"] is True + + def test_confirm_pending_rollback_runs_rollback_and_resume_continues(tmp_path: Path): fake = FakeActionRunner( { diff --git a/tests/test_interactive_cli.py b/tests/test_interactive_cli.py index 092ffb5..7aaf7d1 100644 --- a/tests/test_interactive_cli.py +++ b/tests/test_interactive_cli.py @@ -50,7 +50,7 @@ def test_chat_run_executes_fake_deploy_and_writes_checkpoint(tmp_path: Path): checkpoint_path=str(checkpoint), ) - run_session(session, ["run", "yes", "exit"]) + run_session(session, ["run", "yes", "yes", "yes", "exit"]) assert checkpoint.exists() assert session.state is not None @@ -76,9 +76,57 @@ def test_chat_approve_then_resume_continues_after_failed_ip(tmp_path: Path): checkpoint_path=str(tmp_path / "checkpoint.json"), ) - run_session(session, ["run", "yes", "approve", "resume", "exit"]) + run_session(session, ["run", "yes", "yes", "yes", "approve", "resume", "exit"]) assert session.state is not None assert session.state.pending_confirmation == "" assert session.state.ip_states["192.168.1.10"]["rollback_status"] == "ROLLBACK_DONE" assert session.state.ip_states["192.168.1.11"]["status"] == "SUCCESS" + + +def test_chat_params_events_and_checkpoint_commands(tmp_path: Path): + checkpoint = tmp_path / "checkpoint.json" + session = InteractiveCliSession( + agent=PamDeployAgent(fake_runner=FakeActionRunner(), action_analysis_enabled=True), + params=PARAMS, + strategy="fake", + checkpoint_path=str(checkpoint), + ) + + output = run_session( + session, + [ + "params", + "llm action-analysis on", + "run", + "yes", + "yes", + "yes", + "events 2", + "list checkpoints", + "load checkpoint " + str(checkpoint), + "exit", + ], + ) + + assert session.state is not None + assert any("CLIENT_SECRET: ***" in item for item in output) + assert any("ACTION_ANALYSIS" in item for item in output) + assert any("checkpoint 列表" in item for item in output) + + +def test_chat_can_hot_load_mcp_config(tmp_path: Path): + mcp_config = tmp_path / "mcp.json" + mcp_config.write_text('{"transport": "stdio", "command": "python"}', encoding="utf-8") + session = InteractiveCliSession( + agent=PamDeployAgent(), + params=PARAMS, + strategy="hybrid_node_mcp", + checkpoint_path=str(tmp_path / "checkpoint.json"), + ) + + output = run_session(session, ["mcp config " + mcp_config.as_posix(), "exit"]) + + assert session.agent.mcp_runner is not None + assert session.agent.router.mcp_runner is not None + assert any("MCP 配置已加载" in item for item in output) diff --git a/tests/test_langgraph_runtime.py b/tests/test_langgraph_runtime.py new file mode 100644 index 0000000..7cbdc02 --- /dev/null +++ b/tests/test_langgraph_runtime.py @@ -0,0 +1,54 @@ +from pathlib import Path + +from pam_deploy_graph.agent import PamDeployAgent +from pam_deploy_graph.fake_runner import FakeActionRunner +from pam_deploy_graph.langgraph_runtime import LangGraphDeploymentRuntime + + +PARAMS = { + "HOME_BASE_URL": "https://pam.home.example.com", + "CLIENT_ID": "client", + "CLIENT_SECRET": "secret", + "AIRPORT_CODE": "HET", + "APP_NAME": "PAM", + "MODULE_NAME": "Node", + "VERSION_NUMBER": "2.0.5", + "ZIP_FILE_PATH": "C:/pkg.zip", +} + + +def test_langgraph_runtime_interrupts_and_resumes_confirmation(tmp_path: Path): + fake = FakeActionRunner( + { + "verify-ip:192.168.1.10": { + "ACTION": "verify-ip", + "IP": "192.168.1.10", + "SUCCESS": "false", + "MESSAGE": "health check failed", + } + } + ) + agent = PamDeployAgent(fake_runner=fake) + state = agent.create_state( + params=PARAMS, + execution_strategy="fake", + config_path=str(tmp_path / "config.txt"), + checkpoint_path=str(tmp_path / "checkpoint.json"), + ) + runtime = LangGraphDeploymentRuntime(agent=agent) + + first = runtime.start(state) + + assert first.interrupted is True + assert runtime.waiting_confirmation is True + assert first.confirmation["type"] == "rollback-ip" + assert first.confirmation["ip"] == "192.168.1.10" + + second = runtime.resume(approved=True) + + assert second.interrupted is False + assert runtime.waiting_confirmation is False + assert second.state is not None + assert second.state.pending_confirmation == "" + assert second.state.ip_states["192.168.1.10"]["rollback_status"] == "ROLLBACK_DONE" + assert second.state.ip_states["192.168.1.11"]["status"] == "SUCCESS" diff --git a/tests/test_llm_structured.py b/tests/test_llm_structured.py index 09f5b89..31193b2 100644 --- a/tests/test_llm_structured.py +++ b/tests/test_llm_structured.py @@ -6,6 +6,7 @@ from pam_deploy_graph.llm.openai_compatible import OpenAICompatibleLlmClient from pam_deploy_graph.llm.rule_based import RuleBasedLlmClient from pam_deploy_graph.llm.validators import validate_deploy_plan from pam_deploy_graph.models import LlmDeployPlan +from pam_deploy_graph.models import ActionResult def test_understand_request_prefers_hybrid_for_mcp(): @@ -141,3 +142,50 @@ def test_openai_compatible_client_does_not_send_base_secret(): serialized_prompt = str(calls[0]) assert "real-secret" not in serialized_prompt assert result.extracted_params["CLIENT_SECRET"] == "real-secret" + + +def test_openai_compatible_client_analyzes_action_result_with_redaction(): + calls = [] + + def transport(url, headers, payload, timeout_sec): + calls.append(payload) + return { + "choices": [ + { + "message": { + "content": ( + '{"action":"verify-ip","has_anomaly":true,"severity":"high",' + '"possible_reason":"health check failed",' + '"suggested_action":"download logs","requires_confirmation":true,' + '"notes":["verify failed"]}' + ) + } + } + ] + } + + client = OpenAICompatibleLlmClient( + base_url="https://llm.example/v1", + api_key="secret-key", + model="model-a", + transport=transport, + ) + + analysis = client.analyze_action_result( + action="verify-ip", + result=ActionResult( + action="verify-ip", + backend="fake", + ok=False, + values={"CLIENT_SECRET": "real-secret", "SUCCESS": "false"}, + stderr="x" * 1200, + error_summary="failed", + ), + state_summary={"params": {"CLIENT_SECRET": "real-secret"}}, + ) + + serialized_prompt = str(calls[0]) + assert analysis.has_anomaly is True + assert analysis.severity == "high" + assert "real-secret" not in serialized_prompt + assert "[已截断]" in serialized_prompt diff --git a/tests/test_mcp_client.py b/tests/test_mcp_client.py index 4af77aa..3b64a20 100644 --- a/tests/test_mcp_client.py +++ b/tests/test_mcp_client.py @@ -2,8 +2,10 @@ from pam_deploy_graph.mcp_client import ( FunctionMcpToolClient, load_mcp_client_config, SessionMcpToolClient, + StdioMcpToolClient, normalize_mcp_sdk_result, ) +from pam_deploy_graph.mcp_factory import build_mcp_runner_from_config def test_function_mcp_client_wraps_callable(): @@ -31,11 +33,35 @@ def test_session_mcp_client_normalizes_text_json_content(): def test_load_mcp_client_config(tmp_path): path = tmp_path / "mcp.json" path.write_text( - '{"server_name": "pam-node-prod", "tool_names": {"get-online-ips": "custom_ips"}}', + ( + '{"server_name": "pam-node-prod", "transport": "stdio", ' + '"command": "python", "args": ["-m", "server"], ' + '"env": {"PAM_ENV": "test"}, "cwd": "/tmp", "timeout_seconds": 3, ' + '"tool_names": {"get-online-ips": "custom_ips"}}' + ), encoding="utf-8", ) config = load_mcp_client_config(path) assert config.server_name == "pam-node-prod" + assert config.transport == "stdio" + assert config.command == "python" + assert config.args == ["-m", "server"] + assert config.env == {"PAM_ENV": "test"} + assert config.cwd == "/tmp" + assert config.timeout_seconds == 3 assert config.tool_names["get-online-ips"] == "custom_ips" + + +def test_build_mcp_runner_from_stdio_config(tmp_path): + path = tmp_path / "mcp.json" + path.write_text( + '{"transport": "stdio", "command": "python", "tool_names": {"verify-ip": "custom_verify"}}', + encoding="utf-8", + ) + + runner = build_mcp_runner_from_config(path) + + assert isinstance(runner.client, StdioMcpToolClient) + assert runner.tool_names["verify-ip"] == "custom_verify"