From 8d390aa4166255b9cf637b700b851e6e7fe840c5 Mon Sep 17 00:00:00 2001 From: dark Date: Wed, 3 Jun 2026 17:02:17 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E5=96=84=20chat/runtime=20=E7=9A=84?= =?UTF-8?q?=20LLM=20=E5=AE=A1=E6=A0=B8=E3=80=81=E6=96=AD=E7=82=B9=E7=BB=AD?= =?UTF-8?q?=E8=B7=91=E4=B8=8E=E7=83=AD=E6=9B=B4=E6=96=B0=EF=BC=8C=E5=B9=B6?= =?UTF-8?q?=E5=90=8C=E6=AD=A5=E6=89=93=E5=8C=85=E6=96=87=E6=A1=A3=20?= =?UTF-8?q?=E8=B0=83=E6=95=B4=20workflow=20=E6=89=A7=E8=A1=8C=E9=80=BB?= =?UTF-8?q?=E8=BE=91=EF=BC=9A=E6=AF=8F=E4=B8=AA=20action=20=E5=AE=8C?= =?UTF-8?q?=E6=88=90=E5=90=8E=E7=BB=9F=E4=B8=80=E8=BF=9B=E5=85=A5=20LLM/?= =?UTF-8?q?=E8=A7=84=E5=88=99=E5=AE=A1=E6=A0=B8=EF=BC=8C=E5=AE=A1=E6=A0=B8?= =?UTF-8?q?=E5=BC=80=E5=A7=8B/=E7=BB=93=E6=9E=9C=E5=8F=AF=E6=92=AD?= =?UTF-8?q?=E6=8A=A5=EF=BC=8C=E5=AE=A1=E6=A0=B8=E9=98=BB=E6=96=AD=E6=97=B6?= =?UTF-8?q?=E8=87=AA=E5=8A=A8=E6=9A=82=E5=81=9C=E5=B9=B6=E7=BB=99=E5=87=BA?= =?UTF-8?q?=E5=BB=BA=E8=AE=AE=20=E5=A2=9E=E5=BC=BA=20chat=20=E4=BA=A4?= =?UTF-8?q?=E4=BA=92=EF=BC=9A=E6=94=AF=E6=8C=81=E6=89=A7=E8=A1=8C=E4=B8=AD?= =?UTF-8?q?=20Ctrl+C=20=E4=B8=AD=E6=96=AD=E5=B9=B6=E4=BF=9D=E5=AD=98=20che?= =?UTF-8?q?ckpoint=EF=BC=8C=E5=90=8E=E7=BB=AD=E5=8F=AF=20resume=20?= =?UTF-8?q?=E7=BB=A7=E7=BB=AD=20=E5=A2=9E=E5=8A=A0=E8=BF=90=E8=A1=8C?= =?UTF-8?q?=E6=97=B6=E7=83=AD=E6=9B=B4=E6=96=B0=E8=83=BD=E5=8A=9B=EF=BC=9A?= =?UTF-8?q?=E6=94=AF=E6=8C=81=20set=20KEY=3DVALUE=20=E5=92=8C=20load=20par?= =?UTF-8?q?ams=20<=E8=B7=AF=E5=BE=84>=20=E5=90=8C=E6=AD=A5=E6=9B=B4?= =?UTF-8?q?=E6=96=B0=E5=BD=93=E5=89=8D=20state=E3=80=81config.txt=20?= =?UTF-8?q?=E5=92=8C=20checkpoint=20=E6=94=AF=E6=8C=81=E8=87=AA=E5=AE=9A?= =?UTF-8?q?=E4=B9=89=20action=20=E5=AE=A1=E6=A0=B8=E6=8F=90=E7=A4=BA?= =?UTF-8?q?=E8=AF=8D=EF=BC=9A=E6=96=B0=E5=A2=9E=20--llm-action-analysis-pr?= =?UTF-8?q?ompt-file=20/=20PAM=5FLLM=5FACTION=5FANALYSIS=5FPROMPT=5FFILE?= =?UTF-8?q?=20=E6=96=B0=E5=A2=9E=20prompts/action=5Freview.txt=EF=BC=8C?= =?UTF-8?q?=E8=90=BD=E5=9C=B0=E4=BF=9D=E5=AD=98=E5=BD=93=E5=89=8D=E9=BB=98?= =?UTF-8?q?=E8=AE=A4=E5=AE=A1=E6=A0=B8=E6=8F=90=E7=A4=BA=E8=AF=8D=EF=BC=8C?= =?UTF-8?q?=E4=BE=BF=E4=BA=8E=E5=90=8E=E7=BB=AD=E6=8C=89=E5=9F=BA=E7=BA=BF?= =?UTF-8?q?=E8=B0=83=E6=95=B4=20=E6=9B=B4=E6=96=B0=20Linux=20=E6=89=93?= =?UTF-8?q?=E5=8C=85=E8=84=9A=E6=9C=AC=EF=BC=8C=E5=B0=86=20prompts/action?= =?UTF-8?q?=5Freview.txt=20=E4=B8=80=E5=B9=B6=E5=B8=A6=E5=85=A5=E5=8F=91?= =?UTF-8?q?=E5=B8=83=E5=8C=85=20=E5=90=8C=E6=AD=A5=E6=9B=B4=E6=96=B0=20REA?= =?UTF-8?q?DME=E3=80=81=E6=B5=81=E7=A8=8B=E5=9B=BE=E3=80=81todo=20?= =?UTF-8?q?=E5=92=8C=E6=89=93=E5=8C=85=E6=96=87=E6=A1=A3=EF=BC=8C=E4=BF=AE?= =?UTF-8?q?=E6=AD=A3=20--analyze-actions=20=E8=AF=AD=E4=B9=89=E8=AF=B4?= =?UTF-8?q?=E6=98=8E=E4=B8=8E=20chat=20=E6=9C=80=E6=96=B0=E8=A1=8C?= =?UTF-8?q?=E4=B8=BA=E8=AF=B4=E6=98=8E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 29 ++- docs/current_logic_flow.md | 54 +++++- docs/todo.md | 7 +- packaging/README_linux_package.md | 11 ++ packaging/README_packaged_agent.md | 30 ++- packaging/build_linux_self_contained.sh | 16 +- pam_deploy_graph/agent.py | 175 ++++++++++++++++-- pam_deploy_graph/cli.py | 16 +- pam_deploy_graph/interactive.py | 122 +++++++++++- pam_deploy_graph/llm/base.py | 2 +- pam_deploy_graph/llm/factory.py | 15 +- pam_deploy_graph/llm/openai_compatible.py | 14 +- pam_deploy_graph/llm/prompts.py | 3 +- pam_deploy_graph/llm/rule_based.py | 6 + pam_deploy_graph/models.py | 4 + pam_deploy_graph/tool_catalog.py | 214 ++++++++++++++++++++++ prompts/action_review.txt | 18 ++ tests/test_agent_flow.py | 114 ++++++++++++ tests/test_interactive_cli.py | 79 ++++++++ 19 files changed, 876 insertions(+), 53 deletions(-) create mode 100644 pam_deploy_graph/tool_catalog.py create mode 100644 prompts/action_review.txt diff --git a/README.md b/README.md index 27ab02b..831f268 100644 --- a/README.md +++ b/README.md @@ -82,8 +82,13 @@ packaging/ - chat 在开发环境可选启用 `rich` / `prompt_toolkit`;PyInstaller 打包环境默认使用普通文本输入,避免交互兼容问题。 - chat 执行前会归一化参数并展示实际写入脚本配置的值;`script_only` / `hybrid_node_mcp` 会提前检查 `ZIP_FILE_PATH` 是否存在。 - chat 执行中会播报每个 action 的开始、完成或失败;action 执行失败会停在当前 checkpoint,不再误报 LangGraph 不可用。 -- 增加 action 后 LLM/规则诊断,可通过 `--analyze-actions` 或 `llm action-analysis on` 显式开启。 -- 添加基础测试,当前本地结果为 `51 passed, 2 skipped`。 +- 每个 action 完成后都会进入一次 LLM/规则审核;如果审核建议停止,流程会暂停并给出建议,等待用户 `resume`。 +- `--analyze-actions` 和 `llm action-analysis on` 改为只控制是否把详细审核结果写入 `events`,不再控制审核是否执行。 +- chat 会播报 action 审核开始、审核完成和审核失败,避免黑盒执行。 +- chat 支持执行中按 `Ctrl+C` 中断,保存 checkpoint 后再 `resume`。 +- chat 支持 `set KEY=VALUE` 和 `load params <路径>` 热更新当前运行参数,并同步回写运行中的 `config.txt` 与 checkpoint。 +- 支持通过 `--llm-action-analysis-prompt-file`、`PAM_LLM_ACTION_ANALYSIS_PROMPT_FILE` 或 chat 内 `llm config action_analysis_prompt_file=...` 自定义 action 审核提示词。 +- 添加基础测试,当前本地结果为 `57 passed, 2 skipped`。 未完成: @@ -113,6 +118,19 @@ python -m pam_deploy_graph.cli analyze \ --llm-model your-model-name ``` +如需自定义 action 审核提示词,可再补充: + +```bash +python -m pam_deploy_graph.cli analyze \ + --config doc_scripts/config.txt.example \ + --text "请分析这次部署" \ + --llm-base-url https://your-llm.example.com/v1 \ + --llm-model your-model-name \ + --llm-action-analysis-prompt-file prompts/action_review.txt +``` + +仓库内已提供 [prompts/action_review.txt](/e:/AIcoding/agent_deply/prompts/action_review.txt) 作为“当前默认 action 审核提示词”的落地副本,后续自定义时可以先复制它再改,便于和内置默认行为对照。 + 真实 LLM 调用位置在 `pam_deploy_graph/llm/openai_compatible.py`,提示词在 `pam_deploy_graph/llm/prompts.py`。发送给 LLM 的 `base_params` 会脱敏,`CLIENT_SECRET` 不会进入 prompt;本地生成计划后仍会执行 guardrails 校验。 如果服务需要鉴权,再补充: @@ -253,14 +271,17 @@ python -m pam_deploy_graph.cli chat --config doc_scripts/config.txt.example --st PAM> 请用 MCP 预演部署 HET PAM Node 版本 2.0.5,不要动环境 PAM> preview PAM> set VERSION_NUMBER=2.0.6 +PAM> load params runtime/override.txt PAM> run 即将执行真实 action;确认执行请输入 yes: yes 开始执行 action: get-token [backend=fake] +开始分析 action 结果: get-token [backend=fake] 完成 action: get-token [backend=fake] PAM> status PAM> params PAM> events 5 PAM> llm action-analysis on +PAM> llm config action_analysis_prompt_file=prompts/action_review.txt PAM> mcp config mcp_client.example.json PAM> list checkpoints PAM> load checkpoint runtime/checkpoints/chat-demo.json @@ -269,7 +290,7 @@ PAM> resume PAM> exit ``` -`chat` 默认仍要求在会话内显式输入 `run`,并确认参数、目标 IP 范围和最终执行后才会执行 action。输入 `你好`、`hello` 这类问候不会触发 LLM/结构化分析;需要分析部署需求时可直接描述部署任务,或显式使用 `analyze <需求>`。如果某个 IP 失败,会通过 LangGraph interrupt 暂停并提示输入 `approve` 或 `reject [原因]`,确认后恢复同一个图线程继续执行。`chat` 也支持 `--llm-base-url` / `--llm-api-key` / `--llm-model`、`--mcp-config` 和 `--analyze-actions`。 +`chat` 默认仍要求在会话内显式输入 `run`,并确认参数、目标 IP 范围和最终执行后才会执行 action。输入 `你好`、`hello` 这类问候不会触发 LLM/结构化分析;需要分析部署需求时可直接描述部署任务,或显式使用 `analyze <需求>`。每个 action 完成后都会自动进入一次 LLM/规则审核,并播报审核开始/结束;如果审核建议停止或审核本身失败,流程会暂停并输出建议,等待用户决定是否 `resume`。`--analyze-actions` 仅控制详细审核结果是否写入 `events`。执行中可按 `Ctrl+C` 中断,chat 会保存当前 checkpoint 并把流程标记为 `user_interrupted`。`set KEY=VALUE` 和 `load params <路径>` 会把更新同步到当前运行 state、`config.txt` 和 checkpoint。`chat` 也支持 `--llm-base-url` / `--llm-api-key` / `--llm-model` / `--llm-action-analysis-prompt-file`、`--mcp-config` 和 `--analyze-actions`。 预演: @@ -295,7 +316,7 @@ python -m pam_deploy_graph.cli run-deploy --config doc_scripts/config.txt.exampl python -m pam_deploy_graph.cli confirm --checkpoint runtime/checkpoints/demo.json --decision approve --confirm ``` -`confirm` 会通过 LangGraph interrupt resume 处理确认,并在确认后继续执行后续图节点;如果进程中断或需要再次续跑,再执行 `resume` 即可。 +`confirm` 会通过 LangGraph interrupt resume 处理确认,并在确认后继续执行后续图节点;如果流程此前处于 `paused` 状态,`resume` 会先清理暂停标记,再从 checkpoint 继续执行。 拒绝回滚: diff --git a/docs/current_logic_flow.md b/docs/current_logic_flow.md index ec8d54a..70f01db 100644 --- a/docs/current_logic_flow.md +++ b/docs/current_logic_flow.md @@ -1,6 +1,6 @@ # 当前整体逻辑结构流程图 -本文描述当前 PAM 部署 Agent 的主要模块、运行路径、人工确认点和断点续跑逻辑。 +本文描述当前 PAM 部署 Agent 的主要模块、运行路径、LLM 审核、人工确认点、热更新和断点续跑逻辑。 ## 模块结构 @@ -105,28 +105,41 @@ flowchart LR C -- PAM_NODE action --> NM[MCP tool 执行] ``` -## action 后诊断 +## action 后审核 ```mermaid flowchart TD - A[action 执行完成] --> B{是否开启 analyze-actions} - B -- 否 --> X[只记录 ACTION_DONE/ACTION_FAIL] - B -- 是 --> C[整理 ActionResult 和 AgentState 摘要] + A[action 执行完成] --> C[整理 ActionResult 和 AgentState 摘要] C --> D[敏感字段脱敏并截断长日志] D --> E{真实 LLM 是否配置} - E -- 是 --> F[OpenAICompatibleLlmClient 输出结构化诊断] - E -- 否 --> G[RuleBasedLlmClient 本地规则诊断] - F --> H[追加 ACTION_ANALYSIS 事件] + E -- 是 --> F[OpenAICompatibleLlmClient 输出结构化审核] + E -- 否 --> G[RuleBasedLlmClient 本地规则审核] + F --> H{should_continue} G --> H - H --> I[诊断只作建议,不自动继续/回滚/改参数] + H -- true --> I[继续后续 action] + H -- false --> J[暂停流程并写入 review_context] + J --> K[chat/CLI 播报审核建议并等待 resume] + F --> L{是否开启 analyze-actions} + G --> L + L -- 是 --> M[追加 ACTION_ANALYSIS 事件] + L -- 否 --> N[不写详细事件,仅播报审核过程] ``` +说明: + +- 每个 action 完成后都会进入一次审核,不再依赖 `--analyze-actions` 开关。 +- `--analyze-actions` 或 `llm action-analysis on` 只控制是否把详细审核结果写入 `events`。 +- 如果审核本身失败,也会生成“停止继续”的审核结果并暂停流程,避免黑盒继续执行。 + ## 失败、人工确认和续跑 ```mermaid flowchart TD A[逐 IP action 执行] --> B{action 失败或业务校验失败} B -- 否 --> C[记录 completed_steps 并保存 checkpoint] + C --> C1{LLM 审核是否允许继续} + C1 -- 是 --> C2[继续后续 action] + C1 -- 否 --> G[保存 checkpoint 并暂停] B -- 是 --> D[记录 ip_state 为 FAILED] D --> E[download-log 尽力下载日志] E --> F[设置 pending_confirmation=rollback-ip:IP] @@ -148,18 +161,39 @@ flowchart TD N --> O[跳过已完成全局步骤、成功 IP 和单 IP 已完成 action] ``` +## 用户中断与热更新 + +```mermaid +flowchart TD + A[chat 执行中] --> B{用户是否按 Ctrl+C} + B -- 是 --> C[pause_state 标记 paused=user_interrupted] + C --> D[保存 checkpoint] + D --> E[chat 播报可 resume] + B -- 否 --> F[继续执行] + + G[用户输入 set KEY=VALUE] --> H[normalize_params] + I[用户输入 load params <路径>] --> J[读取参数文件] + J --> H + H --> K[update_state_params] + K --> L[回写 state.params] + L --> M[回写运行中的 config.txt] + M --> N[保存 checkpoint] +``` + ## checkpoint 续跑语义 - `completed_global_steps`:全局阶段已经完成的 action 会跳过。 - `ip_states[ip].status == SUCCESS`:成功 IP 会跳过。 - `ip_states[ip].completed_steps`:同一个 IP 已完成的 action 会跳过。 - `pending_confirmation`:存在待确认事项时,部署流程不继续执行,必须先 `approve` 或 `reject`。 +- `paused` / `pause_reason`:流程可能因 LLM 审核阻断、用户中断、回滚失败等原因暂停;`resume` 会先清理暂停标记,再继续执行。 +- `review_context`:保存最近一次暂停时的审核建议、失败原因、IP 和阶段,供 chat/CLI 输出给用户。 - CLI/chat 的运行调度由 `langgraph_runtime.py` 通过 action 级 LangGraph 节点执行;chat 和 CLI confirm 的确认点使用 LangGraph interrupt 和 InMemorySaver。 - 跨进程续跑仍读取业务 checkpoint JSON;LangGraph checkpointer 负责单进程图恢复和 interrupt resume。 - checkpoint 为了真实续跑会保存完整参数,请放在受控目录中。 ## 真实外部能力接入点 -- 真实 LLM:`llm.openai_compatible.OpenAICompatibleLlmClient`,通过 `PAM_LLM_BASE_URL`、`PAM_LLM_API_KEY`、`PAM_LLM_MODEL` 或 CLI 参数配置。 +- 真实 LLM:`llm.openai_compatible.OpenAICompatibleLlmClient`,通过 `PAM_LLM_BASE_URL`、`PAM_LLM_API_KEY`、`PAM_LLM_MODEL`、`PAM_LLM_ACTION_ANALYSIS_PROMPT_FILE` 或 CLI 参数配置。 - 真实 MCP:CLI/chat 可通过 `--mcp-config` 加载 streamable_http、sse 或 stdio MCP 配置,HTTP/SSE 支持独立 token 鉴权,并通过 `list_tools` 自动发现 server tools。 - 真实脚本:PAM_HOME action 通过 `doc_scripts/deploy.sh` 或 `deploy.ps1` 调用。 diff --git a/docs/todo.md b/docs/todo.md index 89d47d2..e04367d 100644 --- a/docs/todo.md +++ b/docs/todo.md @@ -7,15 +7,18 @@ - [x] 增加 `params` 命令,脱敏展示当前会话参数。 - [x] 增加 `events` 命令,查看最近 action 执行记录。 - [x] 增加 `load checkpoint` 和 `list checkpoints`,方便选择历史任务续跑。 +- [x] 增加 `load params <路径>`,允许从参数文件热更新当前会话和当前运行任务。 - [x] 增加参数确认和目标 IP 范围确认,不只在回滚阶段确认。 - [x] 增加 LLM/MCP 配置热加载,例如 `llm config`、`mcp config`。 +- [x] 增加执行中 `Ctrl+C` 中断处理:保存 checkpoint、标记 `user_interrupted`,再由 `resume` 继续。 - [x] 将 chat 的人工确认点接入 LangGraph interrupt/checkpointer;`run` 执行到回滚确认点后由 interrupt 暂停,`approve/reject` 通过 `Command(resume=...)` 恢复同一图线程。跨进程续跑仍保留业务 checkpoint JSON。 ## LLM action 后分析 - [x] 每次 action 完成后,可把 `action`、`backend`、`ok`、`values`、`stderr`、`error_summary` 和当前 `AgentState` 摘要交给 LLM 分析。 - [x] LLM 输出结构化结果:是否异常、异常等级、可能原因、建议动作、是否需要人工确认。 -- [x] LLM 分析只作为辅助建议,不直接决定继续执行、回滚或修改参数。 +- [x] LLM 分析结果会影响流程是否继续:`should_continue=false` 时自动暂停,并把建议输出给用户。 - [x] 本地保留规则兜底:exit code、`verify-ip SUCCESS=false`、pending confirmation 等硬规则优先于 LLM。 - [x] 对 LLM 输入做脱敏,禁止把 `CLIENT_SECRET`、token、Authorization、完整日志原文发送给模型。 -- [x] 通过 `--analyze-actions` 或 `llm action-analysis on` 显式开启,真实部署默认不启用。 +- [x] 每个 action 都会执行审核;`--analyze-actions` 或 `llm action-analysis on` 只控制是否把详细审核结果写入 `events`。 +- [x] 支持通过 `--llm-action-analysis-prompt-file`、环境变量或 chat 命令热加载自定义 action 审核提示词。 diff --git a/packaging/README_linux_package.md b/packaging/README_linux_package.md index 7907ea6..6a56c92 100644 --- a/packaging/README_linux_package.md +++ b/packaging/README_linux_package.md @@ -42,6 +42,8 @@ pam-deploy-agent-linux-x86_64/ deploy.sh config.txt.example PAM_AUTO_DEPLY_SKILL.md + prompts/ + action_review.txt mcp_client.example.json README.md LICENSE @@ -50,6 +52,7 @@ pam-deploy-agent-linux-x86_64/ 说明: - `doc_scripts` 不会打入项目设计文档、测试脚本、Windows bat/PowerShell 脚本。 +- `prompts/action_review.txt` 会随发布包一起带上,作为当前默认 action 审核提示词的参照版本。 - 发布包内的 `README.md` 来自 `packaging/README_packaged_agent.md`,只说明打包后 Agent 的使用方式。 - 发布包内的 `mcp_client.example.json` 是 MCP server URL + 独立鉴权配置示例,需要按真实 MCP server 和 token 地址修改。 - 项目开发用 README 不会复制到发布包内。 @@ -65,6 +68,14 @@ cd pam-deploy-agent-linux-x86_64 `run.sh --help` 是发布包专用的中文帮助,会解释命令、参数、环境变量和常见示例。`run.sh` 会切换到发布目录再启动可执行程序,因此默认的 `doc_scripts/...` 相对路径可以正常工作。 +本次发布包对应的运行时行为也已同步到包内 `README.md`: + +- 每个 action 完成后都会自动执行一次 LLM/规则审核。 +- `--analyze-actions` 只控制是否把详细审核结果写入 `events`。 +- chat 支持执行中 `Ctrl+C` 中断后保存 checkpoint,再通过 `resume` 继续。 +- chat 支持 `set KEY=VALUE` 和 `load params <路径>` 热更新当前运行任务参数。 +- 支持通过 `--llm-action-analysis-prompt-file` 或 chat 内 `llm config action_analysis_prompt_file=...` 自定义 action 审核提示词。 + ## 包大小评估 最终大小以脚本末尾打印的 `du` 结果为准。按当前依赖结构预估: diff --git a/packaging/README_packaged_agent.md b/packaging/README_packaged_agent.md index e23cbfb..630acf6 100644 --- a/packaging/README_packaged_agent.md +++ b/packaging/README_packaged_agent.md @@ -12,12 +12,14 @@ pam-deploy-agent-linux-x86_64/ deploy.sh # Linux 脚本 action 入口 config.txt.example # 参数配置示例 PAM_AUTO_DEPLY_SKILL.md + prompts/ + action_review.txt # 当前默认 action 审核提示词基线 mcp_client.example.json README.md # 当前说明 LICENSE ``` -`doc_scripts` 只保留运行必需文件,不包含项目设计文档、测试脚本或 Windows 脚本。 +`doc_scripts` 只保留运行必需文件,不包含项目设计文档、测试脚本或 Windows 脚本。`prompts/action_review.txt` 是当前默认 action 审核提示词的落地副本,便于复制后按需修改。 ## 查看帮助 @@ -34,7 +36,7 @@ pam-deploy-agent-linux-x86_64/ 发布包默认使用普通文本输入,避免 PyInstaller 环境下 `prompt_toolkit` 兼容性问题;输出仍会在可用时使用 `rich` 做更清晰的文本展示。 chat 内的失败回滚确认由 LangGraph interrupt 托管;执行停在确认点后,输入 `approve` 或 `reject [原因]` 会恢复同一个图线程继续处理。 -chat 会在执行前归一化并展示实际写入脚本配置的参数;`script_only` / `hybrid_node_mcp` 会先检查 `ZIP_FILE_PATH` 是否存在,避免脚本运行后才用默认路径失败。执行过程中每个 action 都会输出开始、完成或失败状态。 +chat 会在执行前归一化并展示实际写入脚本配置的参数;`script_only` / `hybrid_node_mcp` 会先检查 `ZIP_FILE_PATH` 是否存在,避免脚本运行后才用默认路径失败。执行过程中每个 action 都会输出开始、完成或失败状态;每个 action 完成后还会自动进入一次 LLM/规则审核,并播报审核开始和审核结果。 ## 交互式使用 @@ -60,14 +62,17 @@ chat 会在执行前归一化并展示实际写入脚本配置的参数;`scrip PAM> 请用 MCP 预演部署 HET PAM Node 版本 2.0.5,不要动环境 PAM> preview PAM> set VERSION_NUMBER=2.0.6 +PAM> load params runtime/override.txt PAM> run 即将执行真实 action;确认执行请输入 yes: yes 开始执行 action: get-token [backend=fake] +开始分析 action 结果: get-token [backend=fake] 完成 action: get-token [backend=fake] PAM> status PAM> params PAM> events 5 PAM> llm action-analysis on +PAM> llm config action_analysis_prompt_file=prompts/action_review.txt PAM> mcp config mcp_client.example.json PAM> list checkpoints PAM> load checkpoint runtime/checkpoints/demo.json @@ -96,7 +101,7 @@ PAM> exit ./run.sh run-deploy --config doc_scripts/config.txt.example --strategy fake --checkpoint runtime/checkpoints/demo.json --confirm ``` -执行时开启 action 后诊断: +执行时把详细 action 审核结果写入 `events`: ```bash ./run.sh run-deploy \ @@ -138,12 +143,13 @@ PAM> exit ```bash export PAM_LLM_BASE_URL="https://your-llm.example.com/v1" -export PAM_LLM_API_KEY="your-api-key" export PAM_LLM_MODEL="your-model-name" ./run.sh analyze --config doc_scripts/config.txt.example --text "请分析这次部署" ``` +如果服务需要鉴权,再补 `PAM_LLM_API_KEY`;如果不需要鉴权,可以不配置,程序不会发送 `Authorization` 请求头。 + 也可以用 CLI 参数: ```bash @@ -151,14 +157,25 @@ export PAM_LLM_MODEL="your-model-name" --config doc_scripts/config.txt.example \ --text "请分析这次部署" \ --llm-base-url https://your-llm.example.com/v1 \ - --llm-api-key your-api-key \ --llm-model your-model-name ``` +如需自定义 action 审核提示词: + +```bash +./run.sh analyze \ + --config doc_scripts/config.txt.example \ + --text "请分析这次部署" \ + --llm-base-url https://your-llm.example.com/v1 \ + --llm-model your-model-name \ + --llm-action-analysis-prompt-file prompts/action_review.txt +``` + chat 内也可以热加载 LLM: ```text PAM> llm config base_url=https://your-llm.example.com/v1 api_key=your-api-key model=your-model-name +PAM> llm config action_analysis_prompt_file=prompts/action_review.txt PAM> llm action-analysis on PAM> llm fallback ``` @@ -203,5 +220,8 @@ MCP token 获取方式与 HOME 一致,默认按 `client_credentials` POST 到 - 执行真实 action 前请确认配置文件中的 `HOME_BASE_URL`、`CLIENT_ID`、`CLIENT_SECRET`、`AIRPORT_CODE`、`APP_NAME`、`MODULE_NAME`、`VERSION_NUMBER`、`ZIP_FILE_PATH`。 - `chat` 中输入 `你好`、`hello` 这类问候不会触发 LLM/结构化分析;需要分析部署需求时请直接描述部署任务,或显式使用 `analyze <需求>`。 +- 每个 action 完成后都会自动执行一次 LLM/规则审核;`--analyze-actions` 和 `llm action-analysis on` 只控制是否把详细审核结果写入 `events`。 +- 如果审核建议停止、审核本身失败,或用户在执行中按下 `Ctrl+C`,流程都会保存 checkpoint 并进入暂停状态;后续可使用 `resume` 继续。 +- `set KEY=VALUE` 和 `load params <路径>` 会热更新当前运行任务的参数,并回写运行中的 `config.txt` 和 checkpoint。 - `checkpoint` 会保存完整运行参数,请放在受控目录。 - `hybrid_node_mcp`、`resume`、`confirm` 如果需要执行 MCP action,请同时传入 `--mcp-config`。 diff --git a/packaging/build_linux_self_contained.sh b/packaging/build_linux_self_contained.sh index 118ff81..83da61e 100644 --- a/packaging/build_linux_self_contained.sh +++ b/packaging/build_linux_self_contained.sh @@ -69,6 +69,9 @@ cp -a doc_scripts/config.txt.example "$RELEASE_DIR/doc_scripts/config.txt.exampl cp -a doc_scripts/PAM_AUTO_DEPLY_SKILL.md "$RELEASE_DIR/doc_scripts/PAM_AUTO_DEPLY_SKILL.md" chmod +x "$RELEASE_DIR/doc_scripts/deploy.sh" +mkdir -p "$RELEASE_DIR/prompts" +cp -a prompts/action_review.txt "$RELEASE_DIR/prompts/action_review.txt" + cp -a packaging/README_packaged_agent.md "$RELEASE_DIR/README.md" cp -a packaging/mcp_client.example.json "$RELEASE_DIR/mcp_client.example.json" cp -a LICENSE "$RELEASE_DIR/LICENSE" @@ -162,12 +165,13 @@ LLM 环境变量: 说明: 1. 本包已包含 Python 运行时和 Python 依赖,目标机器不需要安装 Python 包。 2. doc_scripts 只包含运行必需文件:deploy.sh、config.txt.example、PAM_AUTO_DEPLY_SKILL.md。 - 3. mcp_client.example.json 是 MCP server URL + 独立鉴权配置示例,需要按真实 MCP server 修改。 - 4. confirm 会通过 LangGraph interrupt resume 处理确认,并继续后续图节点;进程中断时再使用 resume。 - 5. chat 会在执行前归一化并展示实际写入脚本配置的参数;script_only / hybrid_node_mcp 会先检查 ZIP_FILE_PATH 是否存在。 - 6. chat 执行过程中会播报每个 action 的开始、完成或失败;普通问候不会触发 LLM/结构化分析。 - 7. chat 内可使用 params、events、list checkpoints、load checkpoint、llm config、mcp config 等命令。 - 8. checkpoint 会保存完整运行参数,请放在受控目录。 + 3. prompts/action_review.txt 是当前默认 action 审核提示词基线,可复制后自行修改。 + 4. mcp_client.example.json 是 MCP server URL + 独立鉴权配置示例,需要按真实 MCP server 修改。 + 5. confirm 会通过 LangGraph interrupt resume 处理确认,并继续后续图节点;进程中断时再使用 resume。 + 6. chat 会在执行前归一化并展示实际写入脚本配置的参数;script_only / hybrid_node_mcp 会先检查 ZIP_FILE_PATH 是否存在。 + 7. chat 执行过程中会播报每个 action 的开始、完成或失败;普通问候不会触发 LLM/结构化分析。 + 8. chat 内可使用 params、events、list checkpoints、load checkpoint、load params、llm config、mcp config 等命令。 + 9. checkpoint 会保存完整运行参数,请放在受控目录。 HELP_TEXT } diff --git a/pam_deploy_graph/agent.py b/pam_deploy_graph/agent.py index 5b4b8ea..4684bfa 100644 --- a/pam_deploy_graph/agent.py +++ b/pam_deploy_graph/agent.py @@ -19,7 +19,7 @@ from .constants import DEFAULT_PARAMS, GLOBAL_ACTION_SEQUENCE, IP_ACTION_SEQUENC from .fake_runner import FakeActionRunner from .llm import LlmClient, RuleBasedLlmClient, validate_deploy_plan, validate_intent_result from .mcp_runner import McpActionRunner -from .models import ActionResult, AgentState, ExecutionStrategy, LlmDeployPlan, LlmIntentResult, LlmParamResult +from .models import ActionResult, AgentState, ExecutionStrategy, LlmActionAnalysis, LlmDeployPlan, LlmIntentResult, LlmParamResult from .script_runner import ScriptActionRunner, select_script_entry from .skill_policy import load_skill_policy @@ -144,6 +144,38 @@ class PamDeployAgent: target_ips=target_ips or [], ) + def pause_state( + self, + state: AgentState, + *, + reason: str, + review_context: dict[str, Any] | None = None, + ) -> AgentState: + """将当前 state 标记为暂停,并持久化 checkpoint。""" + state.paused = True + state.pause_reason = reason + state.review_context = dict(review_context or {}) + self._save_checkpoint(state) + return state + + def resume_state(self, state: AgentState) -> AgentState: + """清理暂停标记,允许后续继续执行。""" + state.paused = False + state.pause_reason = "" + state.review_context = {} + self._save_checkpoint(state) + return state + + def update_state_params(self, state: AgentState, updates: dict[str, Any]) -> AgentState: + """热更新 state 中的参数,并回写 config 文件。""" + merged = {**state.params, **updates} + normalized = self.normalize_params(merged) + state.params = normalized + if state.config_path: + write_config(normalized, state.config_path) + self._save_checkpoint(state) + return state + def preview(self, params: dict[str, Any], strategy: ExecutionStrategy = "hybrid_node_mcp") -> str: """渲染部署预览,展示参数和 action 路由。""" normalized = self.normalize_params(params) @@ -177,6 +209,9 @@ class PamDeployAgent: def run_global_flow(self, state: AgentState) -> AgentState: """执行全局部署阶段,并跳过 checkpoint 中已完成的步骤。""" + if state.paused: + self._save_checkpoint(state) + return state while True: action = self.next_global_action(state) if action is None: @@ -185,6 +220,8 @@ class PamDeployAgent: def next_global_action(self, state: AgentState) -> str | None: """返回下一个未完成的全局 action。""" + if state.paused: + return None for action in GLOBAL_ACTION_SEQUENCE: if action in state.completed_global_steps: continue @@ -221,7 +258,7 @@ class PamDeployAgent: "message": result.error_summary or "ok", } ) - self._append_action_analysis(state, action, result) + analysis = self._append_action_analysis(state, action, result) if not result.ok: self._emit_progress( { @@ -232,6 +269,11 @@ class PamDeployAgent: } ) state.last_failed_step = action + self.pause_state( + state, + reason="action_failed", + review_context=self._review_context(action=action, analysis=analysis, result=result), + ) self._save_checkpoint(state) raise RuntimeError(f"{action} 执行失败: {result.error_summary}") missing_values = self._missing_required_values(action, result.values) @@ -246,6 +288,16 @@ class PamDeployAgent: } ) state.last_failed_step = action + self.pause_state( + state, + reason="action_missing_required_values", + review_context={ + "type": "action_review", + "stage": action, + "message": message, + "missing_values": missing_values, + }, + ) self._save_checkpoint(state) raise RuntimeError(message) self._apply_result(state, action, result.values) @@ -259,6 +311,14 @@ class PamDeployAgent: "message": result.values.get("MESSAGE", "ok"), } ) + if analysis is not None and not analysis.should_continue: + state.last_failed_step = action + self.pause_state( + state, + reason="llm_review_blocked", + review_context=self._review_context(action=action, analysis=analysis, result=result), + ) + return state self._save_checkpoint(state) return state @@ -269,7 +329,7 @@ class PamDeployAgent: def run_deploy_flow(self, state: AgentState) -> AgentState: """执行完整部署流程:全局阶段后进入逐 IP 阶段。""" - if state.pending_confirmation: + if state.pending_confirmation or state.paused: self._save_checkpoint(state) return state self.run_global_flow(state) @@ -278,6 +338,9 @@ class PamDeployAgent: def run_ip_flow(self, state: AgentState) -> AgentState: """执行逐 IP 部署流程,失败时停在人工确认点。""" + if state.paused: + self._save_checkpoint(state) + return state while True: work = self.next_ip_action(state) if work is None: @@ -287,7 +350,7 @@ class PamDeployAgent: def next_ip_action(self, state: AgentState) -> tuple[str, str] | None: """返回下一个待执行的单 IP action,并按需初始化 IP 状态。""" - if state.pending_confirmation: + if state.pending_confirmation or state.paused: self._save_checkpoint(state) return None self._resolve_target_ips(state) @@ -358,7 +421,7 @@ class PamDeployAgent: "message": result.error_summary or result.values.get("MESSAGE", "ok"), } ) - self._append_action_analysis(state, action, result, ip=ip) + analysis = self._append_action_analysis(state, action, result, ip=ip) if failed: self._emit_progress( @@ -370,6 +433,11 @@ class PamDeployAgent: "message": result.error_summary or result.values.get("MESSAGE", "action 执行失败"), } ) + self.pause_state( + state, + reason="action_failed", + review_context=self._review_context(action=action, analysis=analysis, result=result, ip=ip), + ) self._record_ip_failure(state, ip, action, result.error_summary or str(result.values)) if action != "download-log": self._download_log_best_effort(state, ip) @@ -388,6 +456,13 @@ class PamDeployAgent: "message": result.values.get("MESSAGE", "ok"), } ) + if analysis is not None and not analysis.should_continue: + self.pause_state( + state, + reason="llm_review_blocked", + review_context=self._review_context(action=action, analysis=analysis, result=result, ip=ip), + ) + return state self._save_checkpoint(state) return state @@ -433,6 +508,9 @@ class PamDeployAgent: } ) state.pending_confirmation = "" + state.paused = False + state.pause_reason = "" + state.review_context = {} self._save_checkpoint(state) return state @@ -474,6 +552,9 @@ class PamDeployAgent: state.pending_confirmation = "" state.last_success_step = "rollback-ip" state.last_failed_step = "" + state.paused = False + state.pause_reason = "" + state.review_context = {} self._emit_progress( { "type": "ACTION_DONE", @@ -486,6 +567,8 @@ class PamDeployAgent: else: state.pending_confirmation = f"rollback-ip:{ip}" state.last_failed_step = "rollback-ip" + state.paused = True + state.pause_reason = "rollback_failed" self._emit_progress( { "type": "ACTION_FAIL", @@ -652,17 +735,23 @@ class PamDeployAgent: result, *, ip: str | None = None, - ) -> None: + ) -> Any: """启用 action 后分析时,把诊断结果追加到 events。""" - if not self.action_analysis_enabled: - return + self._emit_progress( + { + "type": "ACTION_REVIEW_START", + "stage": action, + "ip": ip or "", + "message": "LLM 开始分析 action 结果", + } + ) try: analysis = self.llm_client.analyze_action_result( action=action, result=result, state_summary=self._state_summary_for_llm(state, ip=ip), ) - except Exception as exc: # pragma: no cover - 诊断失败不应影响部署主流程 + except Exception as exc: # pragma: no cover - 审核失败时也要显式暂停,避免黑盒继续执行 state.events.append( { "type": "ACTION_ANALYSIS_FAIL", @@ -671,12 +760,42 @@ class PamDeployAgent: "message": str(exc), } ) - return + self._emit_progress( + { + "type": "ACTION_REVIEW_FAIL", + "stage": action, + "ip": ip or "", + "message": str(exc), + } + ) + return LlmActionAnalysis( + action=action, + has_anomaly=True, + severity="high", + possible_reason=f"LLM 审核失败: {exc}", + suggested_action="请检查 LLM 配置、网络或 action 审核提示词文件后再继续。", + requires_confirmation=True, + should_continue=False, + notes=["action 结果未完成 LLM 审核,流程已自动暂停。"], + ) payload = asdict(analysis) payload.update({"type": "ACTION_ANALYSIS", "stage": action}) if ip: payload["ip"] = ip - state.events.append(payload) + if self.action_analysis_enabled: + state.events.append(payload) + self._emit_progress( + { + "type": "ACTION_REVIEW_DONE", + "stage": action, + "ip": ip or "", + "message": analysis.suggested_action or analysis.possible_reason or "LLM 审核完成", + "has_anomaly": analysis.has_anomaly, + "severity": analysis.severity, + "should_continue": analysis.should_continue, + } + ) + return analysis def _state_summary_for_llm(self, state: AgentState, *, ip: str | None = None) -> dict[str, Any]: """生成给 LLM action 分析使用的脱敏状态摘要。""" @@ -689,10 +808,42 @@ class PamDeployAgent: "current_ip": ip or "", "current_ip_state": state.ip_states.get(ip, {}) if ip else {}, "pending_confirmation": state.pending_confirmation, + "paused": state.paused, + "pause_reason": state.pause_reason, "last_success_step": state.last_success_step, "last_failed_step": state.last_failed_step, } + def _review_context( + self, + *, + action: str, + analysis, + result, + ip: str | None = None, + ) -> dict[str, Any]: + """构造面向用户展示的审核暂停上下文。""" + context = { + "type": "action_review", + "stage": action, + "ip": ip or "", + "backend": result.backend, + "ok": result.ok, + "error_summary": result.error_summary, + } + if analysis is not None: + context.update( + { + "severity": analysis.severity, + "has_anomaly": analysis.has_anomaly, + "possible_reason": analysis.possible_reason, + "suggested_action": analysis.suggested_action, + "should_continue": analysis.should_continue, + "notes": list(analysis.notes), + } + ) + return context + def render_report(self, state: AgentState) -> str: """渲染当前部署状态报告。""" success = sum(1 for item in state.ip_states.values() if item.get("status") == "SUCCESS") @@ -710,6 +861,8 @@ class PamDeployAgent: f"- 成功: {success}", f"- 失败: {failed}", f"- 待确认: {state.pending_confirmation or '-'}", + f"- 暂停状态: {'是' if state.paused else '否'}", + f"- 暂停原因: {state.pause_reason or '-'}", "", "| IP | 状态 | 失败阶段 | 回滚状态 | 日志 |", "| --- | --- | --- | --- | --- |", diff --git a/pam_deploy_graph/cli.py b/pam_deploy_graph/cli.py index af8892d..9e6eafc 100644 --- a/pam_deploy_graph/cli.py +++ b/pam_deploy_graph/cli.py @@ -20,6 +20,7 @@ def add_llm_args(parser: argparse.ArgumentParser) -> None: parser.add_argument("--llm-base-url") parser.add_argument("--llm-api-key") parser.add_argument("--llm-model") + parser.add_argument("--llm-action-analysis-prompt-file") def add_mcp_args(parser: argparse.ArgumentParser) -> None: @@ -93,6 +94,7 @@ def main() -> None: run.add_argument("--strategy", default="fake", choices=["hybrid_node_mcp", "script_only", "fake"]) run.add_argument("--checkpoint") run.add_argument("--confirm", action="store_true") + add_llm_args(run) add_mcp_args(run) add_action_analysis_arg(run) @@ -102,12 +104,14 @@ def main() -> None: deploy.add_argument("--target-ip", action="append", default=[]) deploy.add_argument("--checkpoint") deploy.add_argument("--confirm", action="store_true") + add_llm_args(deploy) add_mcp_args(deploy) add_action_analysis_arg(deploy) resume = sub.add_parser("resume") resume.add_argument("--checkpoint", required=True) resume.add_argument("--confirm", action="store_true") + add_llm_args(resume) add_mcp_args(resume) add_action_analysis_arg(resume) @@ -116,17 +120,19 @@ def main() -> None: confirm.add_argument("--decision", required=True, choices=["approve", "reject"]) confirm.add_argument("--note", default="") confirm.add_argument("--confirm", action="store_true") + add_llm_args(confirm) add_mcp_args(confirm) add_action_analysis_arg(confirm) args = parser.parse_args() params = load_params_file(args.config) if getattr(args, "config", None) else {} llm_client = None - if args.command in ("analyze", "chat"): + if args.command != "preview": llm_client = build_llm_client( - base_url=args.llm_base_url, - api_key=args.llm_api_key, - model=args.llm_model, + base_url=getattr(args, "llm_base_url", None), + api_key=getattr(args, "llm_api_key", None), + model=getattr(args, "llm_model", None), + action_analysis_prompt_path=getattr(args, "llm_action_analysis_prompt_file", None), ) mcp_runner = None if getattr(args, "mcp_config", None): @@ -173,6 +179,8 @@ def main() -> None: if args.command == "resume": state = load_agent_state(args.checkpoint) state.checkpoint_path = state.checkpoint_path or args.checkpoint + if state.paused: + state = agent.resume_state(state) result = run_graph_once(agent, state, flow="deploy") print_graph_result(agent, result) return diff --git a/pam_deploy_graph/interactive.py b/pam_deploy_graph/interactive.py index 87739e1..fa83198 100644 --- a/pam_deploy_graph/interactive.py +++ b/pam_deploy_graph/interactive.py @@ -19,6 +19,7 @@ from .llm import build_llm_client from .llm.rule_based import RuleBasedLlmClient from .mcp_factory import build_mcp_runner_from_config from .models import AgentState, ExecutionStrategy +from .params_loader import load_params_file InputFunc = Callable[[str], str] OutputFunc = Callable[[str], None] @@ -30,9 +31,9 @@ COMMAND_HELP = """可用命令: params 脱敏展示当前会话参数 events [数量] 查看最近 action 事件,默认 10 条 set KEY=VALUE 修改当前会话参数 - llm config KEY=VALUE 配置真实 LLM,支持 base_url/api_key/model + llm config KEY=VALUE 配置真实 LLM,支持 base_url/api_key/model/action_analysis_prompt_file llm fallback 切回本地规则 fallback - llm action-analysis on|off 开关 action 后诊断 + llm action-analysis on|off 开关 action 审核详情写入 events mcp config <路径> 加载 MCP client JSON 配置 run 创建部署任务并执行 status 查看当前运行状态 @@ -40,11 +41,13 @@ COMMAND_HELP = """可用命令: reject [原因] 拒绝待处理回滚 resume 从当前 checkpoint 续跑 list checkpoints 列出 checkpoint 目录下的 JSON 文件 + load params <路径> 加载并热更新参数文件 load checkpoint <路径> 加载指定 checkpoint checkpoint 显示 checkpoint 路径 exit 退出 也可以直接输入自然语言需求,Agent 会先分析并更新会话参数;执行仍需输入 run。 +执行中可按 Ctrl+C 中断,保存 checkpoint 后再用 resume 继续。 """ @@ -85,6 +88,9 @@ class InteractiveCliSession: while True: try: line = self.input("pam-deploy-agent> ") + except KeyboardInterrupt: + self.output("已取消当前输入。输入 exit 退出,或继续输入命令。") + continue except EOFError: self.output("bye") return @@ -148,6 +154,9 @@ class InteractiveCliSession: if normalized == "list" and rest.strip().lower() == "checkpoints": self._list_checkpoints() return True + if normalized == "load" and rest.strip().lower().startswith("params"): + self._load_params(rest.strip()[len("params") :].strip()) + return True if normalized == "load" and rest.strip().lower().startswith("checkpoint"): self._load_checkpoint(rest.strip()[len("checkpoint") :].strip()) return True @@ -184,6 +193,7 @@ class InteractiveCliSession: user_ips = param_result.extracted_control.get("user_specified_ips") if isinstance(user_ips, list): self.target_ips = [str(item) for item in user_ips] + self._sync_params_to_state() safe_payload = redact_mapping({key: asdict(value) for key, value in result.items()}) self.output("已生成结构化理解:") @@ -208,6 +218,7 @@ class InteractiveCliSession: self.output("参数名不能为空。") return self.params[key] = value.strip() + self._sync_params_to_state() self.output(f"已设置 {key}") def _show_params(self) -> None: @@ -230,7 +241,7 @@ class InteractiveCliSession: def _configure_llm(self, text: str) -> None: """热加载 LLM 配置,或开关 action 后诊断。""" if not text: - self.output("格式:llm config base_url=... api_key=... model=... | llm fallback | llm action-analysis on|off") + self.output("格式:llm config base_url=... api_key=... model=... action_analysis_prompt_file=... | llm fallback | llm action-analysis on|off") return parts = shlex.split(text) if parts[0] == "fallback": @@ -243,7 +254,7 @@ class InteractiveCliSession: self.output("格式:llm action-analysis on|off") return self.agent.action_analysis_enabled = parts[1] == "on" - self.output(f"action 后诊断已{'开启' if self.agent.action_analysis_enabled else '关闭'}。") + self.output(f"action 审核详情写入 events 已{'开启' if self.agent.action_analysis_enabled else '关闭'}。") return if parts[0] != "config": self.output("未知 llm 命令。") @@ -255,6 +266,7 @@ class InteractiveCliSession: base_url=self.llm_config.get("base_url"), api_key=self.llm_config.get("api_key"), model=self.llm_config.get("model"), + action_analysis_prompt_path=self.llm_config.get("action_analysis_prompt_file"), ) except Exception as exc: self.output(f"LLM 配置失败: {exc}") @@ -315,6 +327,31 @@ class InteractiveCliSession: self.output(f"已加载 checkpoint: {checkpoint}") if self.state.pending_confirmation: self._print_confirmation() + self._print_pause_context() + + def _load_params(self, path_text: str) -> None: + """从参数文件热更新当前会话参数,并同步到已暂停 state。""" + if not path_text: + self.output("格式:load params <路径>") + return + path = Path(path_text) + if not path.exists(): + self.output(f"参数文件不存在: {path}") + return + try: + updates = load_params_file(path) + except Exception as exc: + self.output(f"参数文件加载失败: {exc}") + return + self.params.update(updates) + try: + self.params = self.agent.normalize_params(self.params) + except ValueError as exc: + self.output(f"参数热更新失败: {exc}") + return + self._sync_params_to_state() + self.output(f"已加载参数文件: {path}") + self.output(_format_redacted_params(redact_mapping(self.params))) def _run_deploy(self) -> None: """在用户确认后创建状态并执行完整部署流程。""" @@ -370,6 +407,8 @@ class InteractiveCliSession: return self.state = load_agent_state(checkpoint) self.state.checkpoint_path = self.state.checkpoint_path or str(checkpoint) + if self.state.paused: + self.state = self.agent.resume_state(self.state) if self.graph_runtime and self.graph_runtime.waiting_confirmation: self._print_confirmation() return @@ -388,6 +427,9 @@ class InteractiveCliSession: self.graph_runtime = None try: self.state = self.agent.run_deploy_flow(self.state) + except KeyboardInterrupt: + self._handle_execution_interrupt() + return except Exception as fallback_exc: self._handle_execution_error(fallback_exc) return @@ -395,6 +437,9 @@ class InteractiveCliSession: return try: result = self.graph_runtime.start(self.state) + except KeyboardInterrupt: + self._handle_execution_interrupt() + return except Exception as exc: self._handle_execution_error(exc) return @@ -432,11 +477,27 @@ class InteractiveCliSession: return if self.state.last_failed_step: self.output(f"最后失败步骤: {self.state.last_failed_step}") + self._print_pause_context() if self.state.pending_confirmation: self._print_confirmation() self.output(f"checkpoint: {self.state.checkpoint_path or self.checkpoint_path}") self.output("请修正参数或外部环境后,使用 load checkpoint <路径> / resume 继续,或重新 run。") + def _handle_execution_interrupt(self) -> None: + """处理执行中的用户中断,并保留断点。""" + if self.state is None: + self.output("执行已中断。") + return + self.graph_runtime = None + self.state = self.agent.pause_state( + self.state, + reason="user_interrupted", + review_context={"type": "user_interrupt", "message": "用户手动中断执行"}, + ) + self.output("执行已由用户中断,当前 checkpoint 已保存。") + self._print_pause_context() + self.output(f"checkpoint: {self.state.checkpoint_path or self.checkpoint_path}") + def _apply_graph_result(self, result: LangGraphRunResult) -> None: """把 LangGraph 运行结果同步回 chat 会话并输出用户可见状态。""" if result.state is not None: @@ -449,6 +510,7 @@ class InteractiveCliSession: self._print_confirmation_request(result.confirmation) elif self.state.pending_confirmation: self._print_confirmation() + self._print_pause_context() self.output(f"checkpoint: {self.state.checkpoint_path or self.checkpoint_path}") def _print_state_report_and_checkpoint(self) -> None: @@ -456,6 +518,7 @@ class InteractiveCliSession: if self.state is None: return self.output(self.agent.render_report(self.state)) + self._print_pause_context() if self.state.pending_confirmation: self._print_confirmation() self.output(f"checkpoint: {self.state.checkpoint_path or self.checkpoint_path}") @@ -467,6 +530,7 @@ class InteractiveCliSession: self.output(f"checkpoint: {self.checkpoint_path}") return self.output(self.agent.render_report(self.state)) + self._print_pause_context() if self.state.pending_confirmation: self._print_confirmation() @@ -495,9 +559,49 @@ class InteractiveCliSession: self.state = self.agent.confirm_pending(self.state, approved=approved, operator_note=note) self.output(self.agent.render_report(self.state)) + self._print_pause_context() if self.state.pending_confirmation: self._print_confirmation() + def _sync_params_to_state(self) -> None: + """若当前已有 state,则把热更新参数同步到 checkpoint/config。""" + if self.state is None: + return + try: + self.state = self.agent.update_state_params(self.state, self.params) + except ValueError as exc: + self.output(f"参数同步到当前任务失败: {exc}") + return + self.params = dict(self.state.params) + if self.target_ips: + self.state.target_ips = list(self.target_ips) + + def _print_pause_context(self) -> None: + """输出暂停原因和审核建议,避免黑盒暂停。""" + if self.state is None or not self.state.paused: + return + context = self.state.review_context or {} + reason = self.state.pause_reason or "unknown" + self.output(f"当前流程已暂停: {reason}") + if context.get("stage"): + self.output(f"- stage: {context.get('stage')}") + if context.get("ip"): + self.output(f"- ip: {context.get('ip')}") + if context.get("possible_reason"): + self.output(f"- reason: {context.get('possible_reason')}") + elif context.get("error_summary"): + self.output(f"- reason: {context.get('error_summary')}") + if context.get("suggested_action"): + self.output(f"- suggestion: {context.get('suggested_action')}") + if context.get("severity"): + self.output(f"- severity: {context.get('severity')}") + if context.get("notes"): + self.output("- notes: " + "; ".join(str(item) for item in context.get("notes", []))) + if reason == "user_interrupted": + self.output("输入 resume 可从当前 checkpoint 继续。") + elif reason == "llm_review_blocked": + self.output("请根据以上建议判断后续;如需继续,输入 resume。") + def _on_progress(self, payload: dict[str, Any]) -> None: """把 Agent action 进度转成 chat 可见输出。""" event_type = str(payload.get("type", "")) @@ -519,6 +623,14 @@ class InteractiveCliSession: elif event_type == "ACTION_FAIL": detail = f": {message}" if message else "" self.output(f"失败 action: {stage}{suffix}{detail}") + elif event_type == "ACTION_REVIEW_START": + self.output(f"开始分析 action 结果: {stage}{suffix}") + elif event_type == "ACTION_REVIEW_DONE": + detail = f": {message}" if message else "" + self.output(f"分析完成: {stage}{suffix}{detail}") + elif event_type == "ACTION_REVIEW_FAIL": + detail = f": {message}" if message else "" + self.output(f"分析失败: {stage}{suffix}{detail}") def _print_confirmation(self) -> None: """输出当前待人工确认事项。""" @@ -559,6 +671,7 @@ class InteractiveCliSession: self.output(f"已加载 checkpoint: {checkpoint}") if self.state.pending_confirmation: self._print_confirmation() + self._print_pause_context() def run_interactive_chat( @@ -704,6 +817,7 @@ def _build_prompt_input(input_func: InputFunc) -> InputFunc: "reject", "resume", "list checkpoints", + "load params", "load checkpoint", "checkpoint", "exit", diff --git a/pam_deploy_graph/llm/base.py b/pam_deploy_graph/llm/base.py index 9d8a31c..eeefe0f 100644 --- a/pam_deploy_graph/llm/base.py +++ b/pam_deploy_graph/llm/base.py @@ -42,5 +42,5 @@ class LlmClient(Protocol): result: ActionResult, state_summary: dict[str, Any], ) -> LlmActionAnalysis: - """分析 action 执行结果,并给出辅助诊断建议。""" + """分析 action 执行结果,并给出是否允许继续执行的建议。""" ... diff --git a/pam_deploy_graph/llm/factory.py b/pam_deploy_graph/llm/factory.py index 189fc9b..f85d676 100644 --- a/pam_deploy_graph/llm/factory.py +++ b/pam_deploy_graph/llm/factory.py @@ -5,7 +5,7 @@ from __future__ import annotations import os from .base import LlmClient -from .openai_compatible import OpenAICompatibleLlmClient +from .openai_compatible import OpenAICompatibleLlmClient, load_prompt_text from .rule_based import RuleBasedLlmClient @@ -14,11 +14,17 @@ def build_llm_client( base_url: str | None = None, api_key: str | None = None, model: str | None = None, + action_analysis_prompt_path: str | None = None, ) -> LlmClient: """根据显式参数或环境变量构造 LLM client。""" - actual_base_url = base_url or os.getenv("PAM_LLM_BASE_URL", "") - actual_api_key = api_key or os.getenv("PAM_LLM_API_KEY", "") - actual_model = model or os.getenv("PAM_LLM_MODEL", "") + actual_base_url = base_url if base_url is not None else os.getenv("PAM_LLM_BASE_URL", "") + actual_api_key = api_key if api_key is not None else os.getenv("PAM_LLM_API_KEY", "") + actual_model = model if model is not None else os.getenv("PAM_LLM_MODEL", "") + actual_action_prompt_path = ( + action_analysis_prompt_path + if action_analysis_prompt_path is not None + else os.getenv("PAM_LLM_ACTION_ANALYSIS_PROMPT_FILE", "") + ) if not actual_base_url and not actual_api_key and not actual_model: return RuleBasedLlmClient() @@ -35,4 +41,5 @@ def build_llm_client( base_url=actual_base_url, api_key=actual_api_key, model=actual_model, + action_analysis_prompt=load_prompt_text(actual_action_prompt_path), ) diff --git a/pam_deploy_graph/llm/openai_compatible.py b/pam_deploy_graph/llm/openai_compatible.py index 76e6ed2..45d1b77 100644 --- a/pam_deploy_graph/llm/openai_compatible.py +++ b/pam_deploy_graph/llm/openai_compatible.py @@ -7,6 +7,7 @@ from __future__ import annotations import json +from pathlib import Path import urllib.request from collections.abc import Callable from typing import Any @@ -36,6 +37,7 @@ class OpenAICompatibleLlmClient: base_url: str, api_key: str, model: str, + action_analysis_prompt: str | None = None, timeout_sec: float = 30, temperature: float = 0, transport: JsonTransport | None = None, @@ -48,6 +50,7 @@ class OpenAICompatibleLlmClient: self.base_url = base_url.rstrip("/") self.api_key = api_key self.model = model + self.action_analysis_prompt = action_analysis_prompt or ACTION_ANALYSIS_PROMPT self.timeout_sec = timeout_sec self.temperature = temperature self.transport = transport or _default_transport @@ -135,7 +138,7 @@ class OpenAICompatibleLlmClient: ) -> LlmActionAnalysis: """调用 LLM 分析 action 结果,返回结构化诊断建议。""" payload = self._complete_json( - ACTION_ANALYSIS_PROMPT, + self.action_analysis_prompt, { "action": action, "result": { @@ -157,6 +160,7 @@ class OpenAICompatibleLlmClient: possible_reason=_string(payload, "possible_reason", ""), suggested_action=_string(payload, "suggested_action", ""), requires_confirmation=bool(payload.get("requires_confirmation", False)), + should_continue=bool(payload.get("should_continue", True)), notes=_string_list(payload.get("notes")), ) @@ -213,6 +217,14 @@ def _default_transport( return decoded +def load_prompt_text(path: str | None) -> str: + """读取自定义提示词文件。""" + if not path: + return ACTION_ANALYSIS_PROMPT + prompt_path = Path(path) + return prompt_path.read_text(encoding="utf-8").strip() or ACTION_ANALYSIS_PROMPT + + def _chat_completions_url(base_url: str) -> str: """把 base_url 规范化为 chat/completions endpoint。""" clean = base_url.rstrip("/") diff --git a/pam_deploy_graph/llm/prompts.py b/pam_deploy_graph/llm/prompts.py index 674eed7..2c59869 100644 --- a/pam_deploy_graph/llm/prompts.py +++ b/pam_deploy_graph/llm/prompts.py @@ -76,11 +76,12 @@ ACTION_ANALYSIS_PROMPT = """分析一次 PAM action 执行结果。 "possible_reason": "...", "suggested_action": "...", "requires_confirmation": false, + "should_continue": true, "notes": ["..."] } 要求: -- 只给诊断建议,不决定继续执行、回滚或修改参数。 +- 必须明确给出 `should_continue`:没有问题时为 true;存在需要人工判断的问题时为 false。 - 如果 exit_code 非 0、ok=false、verify-ip SUCCESS=false、出现 pending_confirmation,应标记异常。 - 不要输出密钥、token、Authorization 或完整日志原文。 """ diff --git a/pam_deploy_graph/llm/rule_based.py b/pam_deploy_graph/llm/rule_based.py index 363371e..0e55dc3 100644 --- a/pam_deploy_graph/llm/rule_based.py +++ b/pam_deploy_graph/llm/rule_based.py @@ -161,12 +161,14 @@ class RuleBasedLlmClient: possible_reason = "" suggested_action = "继续观察。" requires_confirmation = False + should_continue = True if not result.ok: severity = "medium" possible_reason = result.error_summary or "action 返回失败状态。" suggested_action = "查看 action stderr/raw_output,确认参数、网络和目标服务状态。" notes.append("硬规则检测到 action 执行失败。") + should_continue = False if action == "verify-ip": success = result.values.get("SUCCESS") @@ -177,12 +179,14 @@ class RuleBasedLlmClient: suggested_action = "先下载日志并人工确认是否执行回滚。" requires_confirmation = True notes.append("verify-ip SUCCESS 非成功值。") + should_continue = False if action == "rollback-ip" and not result.ok: severity = "high" suggested_action = "保持待确认状态,人工排查回滚失败原因后重试或转人工处理。" requires_confirmation = True notes.append("rollback-ip 失败需要人工处理。") + should_continue = False if result.values.get("PENDING_AGENT_CONFIRMATION"): has_anomaly = True @@ -191,6 +195,7 @@ class RuleBasedLlmClient: suggested_action = "暂停自动流程,等待人工确认。" requires_confirmation = True notes.append("action 返回待人工确认标记。") + should_continue = False return LlmActionAnalysis( action=action, @@ -199,6 +204,7 @@ class RuleBasedLlmClient: possible_reason=possible_reason, suggested_action=suggested_action, requires_confirmation=requires_confirmation, + should_continue=should_continue, notes=notes, ) diff --git a/pam_deploy_graph/models.py b/pam_deploy_graph/models.py index 90c8e91..2f645bd 100644 --- a/pam_deploy_graph/models.py +++ b/pam_deploy_graph/models.py @@ -99,6 +99,7 @@ class LlmActionAnalysis: possible_reason: str = "" suggested_action: str = "" requires_confirmation: bool = False + should_continue: bool = True notes: list[str] = field(default_factory=list) @@ -126,4 +127,7 @@ class AgentState: last_success_step: str = "" last_failed_step: str = "" checkpoint_path: str = "" + paused: bool = False + pause_reason: str = "" + review_context: dict[str, Any] = field(default_factory=dict) events: list[dict[str, Any]] = field(default_factory=list) diff --git a/pam_deploy_graph/tool_catalog.py b/pam_deploy_graph/tool_catalog.py new file mode 100644 index 0000000..62ef370 --- /dev/null +++ b/pam_deploy_graph/tool_catalog.py @@ -0,0 +1,214 @@ +"""统一维护面向 LLM 和 runtime 的 PAM action tool schema。""" + +from __future__ import annotations + +from pam_deploy_graph.action_router import build_action_backends +from pam_deploy_graph.constants import GLOBAL_ACTION_SEQUENCE, IP_ACTION_SEQUENCE +from pam_deploy_graph.models import AgentExecutionMode, ExecutionStrategy +from pam_deploy_graph.models import ActionToolSpec, SkillPolicy + + +ACTION_TOOL_SPECS: dict[str, ActionToolSpec] = { + "get-token": ActionToolSpec( + name="get_token", + action="get-token", + scope="global", + description="获取 PAM HOME OAuth token。", + risk_level="low", + required_param_fields=("HOME_BASE_URL", "CLIENT_ID", "CLIENT_SECRET"), + preferred_backend="script", + ), + "create-version": ActionToolSpec( + name="create_version", + action="create-version", + scope="global", + description="创建版本记录。", + risk_level="medium", + preferred_backend="script", + ), + "upload-package": ActionToolSpec( + name="upload_package", + action="upload-package", + scope="global", + description="上传软件包并返回 HASH_CODE。", + risk_level="high", + preferred_backend="script", + ), + "publish-version": ActionToolSpec( + name="publish_version", + action="publish-version", + scope="global", + description="发布版本,需要已有 HASH_CODE。", + risk_level="high", + requires_confirmation=True, + required_runtime_fields=("hash_code",), + preferred_backend="script", + ), + "get-node-url": ActionToolSpec( + name="get_node_url", + action="get-node-url", + scope="global", + description="获取目标 PAM NODE 地址。", + risk_level="low", + preferred_backend="script", + ), + "get-online-ips": ActionToolSpec( + name="get_online_ips", + action="get-online-ips", + scope="global", + description="获取当前在线工作站 IP 列表。", + risk_level="low", + ), + "create-download-task": ActionToolSpec( + name="create_download_task", + action="create-download-task", + scope="global", + description="创建云下载任务。", + risk_level="high", + ), + "poll-download-progress": ActionToolSpec( + name="poll_download_progress", + action="poll-download-progress", + scope="global", + description="轮询云下载任务进度。", + risk_level="medium", + ), + "upgrade-ip": ActionToolSpec( + name="upgrade_ip", + action="upgrade-ip", + scope="ip", + description="对单个工作站创建升级任务。", + risk_level="high", + requires_confirmation=True, + ), + "poll-upgrade-progress": ActionToolSpec( + name="poll_upgrade_progress", + action="poll-upgrade-progress", + scope="ip", + description="轮询单个工作站升级进度。", + risk_level="medium", + ), + "start-ip": ActionToolSpec( + name="start_ip", + action="start-ip", + scope="ip", + description="启动单个工作站应用。", + risk_level="high", + requires_confirmation=True, + ), + "stop-ip": ActionToolSpec( + name="stop_ip", + action="stop-ip", + scope="ip", + description="停止单个工作站应用。", + risk_level="high", + requires_confirmation=True, + ), + "verify-ip": ActionToolSpec( + name="verify_ip", + action="verify-ip", + scope="ip", + description="校验单个工作站版本和健康状态。", + risk_level="medium", + ), + "download-log": ActionToolSpec( + name="download_log", + action="download-log", + scope="ip", + description="下载单个工作站日志。", + risk_level="low", + ), + "rollback-ip": ActionToolSpec( + name="rollback_ip", + action="rollback-ip", + scope="ip", + description="对单个工作站执行回滚。", + risk_level="high", + requires_confirmation=True, + ), +} + + +def ordered_actions_for_skill(policy: SkillPolicy) -> list[str]: + """根据 skill 策略返回默认 action 顺序。""" + global_actions = list(policy.action_sequence or GLOBAL_ACTION_SEQUENCE) + ip_actions = list(policy.ip_action_sequence or IP_ACTION_SEQUENCE) + return [*global_actions, *ip_actions] + + +ACTION_DEPENDENCIES: dict[str, tuple[str, ...]] = { + "create-version": ("get-token",), + "upload-package": ("get-token", "create-version"), + "publish-version": ("get-token", "create-version", "upload-package"), + "get-node-url": ("get-token",), + "get-online-ips": ("get-token", "get-node-url"), + "create-download-task": ("get-token", "get-node-url", "get-online-ips"), + "poll-download-progress": ("get-token", "get-node-url", "get-online-ips", "create-download-task"), +} +for _ip_action in IP_ACTION_SEQUENCE: + ACTION_DEPENDENCIES[_ip_action] = tuple(GLOBAL_ACTION_SEQUENCE) + + +def allowed_tool_specs(policy: SkillPolicy) -> list[ActionToolSpec]: + """按 skill 限制过滤并排序 tool specs。""" + ordered_actions = ordered_actions_for_skill(policy) + specs: list[ActionToolSpec] = [] + for action in ordered_actions: + if action not in policy.allowed_actions: + continue + if action in policy.forbidden_actions: + continue + spec = ACTION_TOOL_SPECS.get(action) + if spec is not None: + specs.append(spec) + return specs + + +def tool_summaries(policy: SkillPolicy, strategy: ExecutionStrategy) -> list[dict[str, str]]: + """生成给 LLM 使用的受控 tool 摘要。""" + routes = build_action_backends(strategy) + summaries: list[dict[str, str]] = [] + for spec in allowed_tool_specs(policy): + summaries.append( + { + "name": spec.name, + "action": spec.action, + "scope": spec.scope, + "description": spec.description, + "risk_level": spec.risk_level, + "backend": routes.get(spec.action, spec.preferred_backend or ""), + "requires_confirmation": "true" if spec.requires_confirmation else "false", + } + ) + return summaries + + +def normalize_planned_actions( + planned_actions: list[str], + *, + policy: SkillPolicy, + mode: AgentExecutionMode, +) -> list[str]: + """按 skill 限制和依赖关系归一化 planned actions。""" + allowed = set(policy.allowed_actions) + forbidden = set(policy.forbidden_actions) + ordered = ordered_actions_for_skill(policy) + if not planned_actions: + return [action for action in ordered if action in allowed and action not in forbidden] + + normalized: list[str] = [] + for action in planned_actions: + if action in allowed and action not in forbidden and action not in normalized: + normalized.append(action) + + expanded: list[str] = [] + for action in normalized: + for dependency in ACTION_DEPENDENCIES.get(action, ()): + if dependency in allowed and dependency not in forbidden and dependency not in expanded: + expanded.append(dependency) + if action not in expanded: + expanded.append(action) + + global_order = [action for action in ordered if action in GLOBAL_ACTION_SEQUENCE and action in expanded] + ip_order = [action for action in ordered if action in IP_ACTION_SEQUENCE and action in expanded] + return [*global_order, *ip_order] diff --git a/prompts/action_review.txt b/prompts/action_review.txt new file mode 100644 index 0000000..a9a15fc --- /dev/null +++ b/prompts/action_review.txt @@ -0,0 +1,18 @@ +分析一次 PAM action 执行结果。 + +输出 JSON schema: +{ + "action": "...", + "has_anomaly": false, + "severity": "info|low|medium|high", + "possible_reason": "...", + "suggested_action": "...", + "requires_confirmation": false, + "should_continue": true, + "notes": ["..."] +} + +要求: +- 必须明确给出 `should_continue`:没有问题时为 true;存在需要人工判断的问题时为 false。 +- 如果 exit_code 非 0、ok=false、verify-ip SUCCESS=false、出现 pending_confirmation,应标记异常。 +- 不要输出密钥、token、Authorization 或完整日志原文。 diff --git a/tests/test_agent_flow.py b/tests/test_agent_flow.py index 66a8dd1..b8073b3 100644 --- a/tests/test_agent_flow.py +++ b/tests/test_agent_flow.py @@ -6,6 +6,7 @@ from pam_deploy_graph.agent import PamDeployAgent from pam_deploy_graph.checkpoint_store import load_agent_state from pam_deploy_graph.constants import GLOBAL_ACTION_SEQUENCE from pam_deploy_graph.fake_runner import FakeActionRunner +from pam_deploy_graph.models import LlmActionAnalysis PARAMS = { @@ -20,6 +21,25 @@ PARAMS = { } +class BlockingReviewLlmClient: + def analyze_action_result(self, *, action, result, state_summary): + return LlmActionAnalysis( + action=action, + has_anomaly=True, + severity="high", + possible_reason="review blocked", + suggested_action="stop and inspect", + requires_confirmation=True, + should_continue=False, + notes=["blocked by test llm"], + ) + + +class BrokenReviewLlmClient: + def analyze_action_result(self, *, action, result, state_summary): + raise RuntimeError("review transport failed") + + def test_run_deploy_flow_success(tmp_path: Path): agent = PamDeployAgent(fake_runner=FakeActionRunner()) state = agent.create_state( @@ -124,6 +144,49 @@ def test_action_analysis_event_is_recorded_when_enabled(tmp_path: Path): assert verify_analysis["requires_confirmation"] is True +def test_successful_action_can_be_blocked_by_llm_review(tmp_path: Path): + agent = PamDeployAgent( + fake_runner=FakeActionRunner(), + llm_client=BlockingReviewLlmClient(), + ) + state = agent.create_state( + params=PARAMS, + execution_strategy="fake", + config_path=str(tmp_path / "config.txt"), + checkpoint_path=str(tmp_path / "checkpoint.json"), + ) + + agent.run_deploy_flow(state) + + assert state.paused is True + assert state.pause_reason == "llm_review_blocked" + assert state.last_failed_step == "get-token" + assert state.completed_global_steps == ["get-token"] + assert state.review_context["stage"] == "get-token" + assert state.review_context["suggested_action"] == "stop and inspect" + + +def test_action_review_failure_pauses_flow(tmp_path: Path): + agent = PamDeployAgent( + fake_runner=FakeActionRunner(), + llm_client=BrokenReviewLlmClient(), + ) + state = agent.create_state( + params=PARAMS, + execution_strategy="fake", + config_path=str(tmp_path / "config.txt"), + checkpoint_path=str(tmp_path / "checkpoint.json"), + ) + + agent.run_deploy_flow(state) + + assert state.paused is True + assert state.pause_reason == "llm_review_blocked" + assert state.review_context["stage"] == "get-token" + assert "LLM 审核失败" in state.review_context["possible_reason"] + assert any(event["type"] == "ACTION_ANALYSIS_FAIL" for event in state.events) + + def test_confirm_pending_rollback_runs_rollback_and_resume_continues(tmp_path: Path): fake = FakeActionRunner( { @@ -215,3 +278,54 @@ def test_checkpoint_resume_skips_completed_global_and_success_ip(tmp_path: Path) assert "get-token" not in called_actions assert all(call[1].get("ip") != "192.168.1.10" for call in fake.calls) assert loaded.ip_states["192.168.1.11"]["status"] == "SUCCESS" + + +def test_update_state_params_rewrites_config_and_checkpoint(tmp_path: Path): + initial_package = tmp_path / "pkg-a.zip" + updated_package = tmp_path / "pkg-b.zip" + checkpoint = tmp_path / "checkpoint.json" + config_path = tmp_path / "config.txt" + agent = PamDeployAgent(fake_runner=FakeActionRunner()) + state = agent.create_state( + params={**PARAMS, "ZIP_FILE_PATH": str(initial_package)}, + execution_strategy="fake", + config_path=str(config_path), + checkpoint_path=str(checkpoint), + ) + + agent.update_state_params( + state, + { + "APP_NAME": "PAM-NEW", + "ZIP_FILE_PATH": str(updated_package), + }, + ) + loaded = load_agent_state(checkpoint) + config_text = config_path.read_text(encoding="utf-8") + + assert state.params["APP_NAME"] == "PAM-NEW" + assert state.params["ZIP_FILE_PATH"] == str(updated_package.resolve()) + assert loaded.params["APP_NAME"] == "PAM-NEW" + assert loaded.params["ZIP_FILE_PATH"] == str(updated_package.resolve()) + assert "APP_NAME=PAM-NEW" in config_text + assert f"ZIP_FILE_PATH={updated_package.resolve()}" in config_text + + +def test_resume_state_clears_pause_fields(tmp_path: Path): + checkpoint = tmp_path / "checkpoint.json" + agent = PamDeployAgent(fake_runner=FakeActionRunner()) + state = agent.create_state( + params=PARAMS, + execution_strategy="fake", + checkpoint_path=str(checkpoint), + ) + + agent.pause_state(state, reason="manual_test", review_context={"stage": "get-token"}) + resumed = agent.resume_state(state) + loaded = load_agent_state(checkpoint) + + assert resumed.paused is False + assert resumed.pause_reason == "" + assert resumed.review_context == {} + assert loaded.paused is False + assert loaded.pause_reason == "" diff --git a/tests/test_interactive_cli.py b/tests/test_interactive_cli.py index bccebfa..17b0d39 100644 --- a/tests/test_interactive_cli.py +++ b/tests/test_interactive_cli.py @@ -6,6 +6,7 @@ import pytest from pam_deploy_graph.agent import PamDeployAgent from pam_deploy_graph.fake_runner import FakeActionRunner from pam_deploy_graph.interactive import InteractiveCliSession, _build_prompt_input +from pam_deploy_graph.models import LlmActionAnalysis PARAMS = { @@ -20,6 +21,20 @@ PARAMS = { } +class BlockingReviewLlmClient: + def analyze_action_result(self, *, action, result, state_summary): + return LlmActionAnalysis( + action=action, + has_anomaly=True, + severity="high", + possible_reason="review blocked", + suggested_action="stop and inspect", + requires_confirmation=True, + should_continue=False, + notes=["blocked by test llm"], + ) + + def run_session(session: InteractiveCliSession, inputs: list[str]) -> list[str]: output: list[str] = [] iterator = iter(inputs) @@ -74,6 +89,8 @@ def test_chat_run_prints_action_progress(tmp_path: Path): assert any("开始执行 action: get-token" in item for item in output) assert any("完成 action: verify-ip" in item for item in output) + assert any("开始分析 action 结果: get-token" in item for item in output) + assert any("分析完成: verify-ip" in item for item in output) def test_chat_greeting_does_not_trigger_structured_analysis(tmp_path: Path): @@ -181,6 +198,68 @@ def test_chat_params_events_and_checkpoint_commands(tmp_path: Path): assert any("checkpoint 列表" in item for item in output) +def test_chat_load_params_hot_updates_running_state_and_config(tmp_path: Path): + checkpoint = tmp_path / "checkpoint.json" + params_file = tmp_path / "params.txt" + params_file.write_text( + "\n".join( + [ + "APP_NAME=PAM-HOT", + f"ZIP_FILE_PATH={tmp_path / 'updated.zip'}", + ] + ) + + "\n", + encoding="utf-8", + ) + session = InteractiveCliSession( + agent=PamDeployAgent(fake_runner=FakeActionRunner()), + params=PARAMS, + strategy="fake", + checkpoint_path=str(checkpoint), + ) + + run_session( + session, + [ + "run", + "yes", + "yes", + "yes", + "load params " + str(params_file), + "exit", + ], + ) + + assert session.state is not None + assert session.state.params["APP_NAME"] == "PAM-HOT" + assert session.state.params["ZIP_FILE_PATH"] == str((tmp_path / "updated.zip").resolve()) + config_text = Path(session.state.config_path).read_text(encoding="utf-8") + assert "APP_NAME=PAM-HOT" in config_text + assert f"ZIP_FILE_PATH={(tmp_path / 'updated.zip').resolve()}" in config_text + + +def test_chat_llm_review_block_message_is_visible(tmp_path: Path): + checkpoint = tmp_path / "checkpoint.json" + session = InteractiveCliSession( + agent=PamDeployAgent( + fake_runner=FakeActionRunner(), + llm_client=BlockingReviewLlmClient(), + ), + params=PARAMS, + strategy="fake", + checkpoint_path=str(checkpoint), + ) + + output = run_session(session, ["run", "yes", "yes", "yes", "exit"]) + + assert session.state is not None + assert session.state.paused is True + assert session.state.pause_reason == "llm_review_blocked" + assert any("当前流程已暂停: llm_review_blocked" in item for item in output) + assert any("- suggestion: stop and inspect" in item for item in output) + assert any("如需继续,输入 resume" in item for item in output) + + def test_chat_can_hot_load_mcp_config(tmp_path: Path): mcp_config = tmp_path / "mcp.json" mcp_config.write_text('{"transport": "stdio", "command": "python"}', encoding="utf-8")