Compare commits

..

12 Commits

Author SHA1 Message Date
dark
1b4c07fec6 增强过滤 2026-06-05 13:18:52 +08:00
dark
87c48a74a5 新增 <think>...</think> 过滤器,支持完整标签、跨流式 chunk 标签、未闭合 <think>。
OpenAICompatibleLlmClient 新增 chat_stream(),使用 OpenAI-compatible /chat/completions 的 stream: true。
chat 普通对话现在优先流式分段输出;流式不可用或服务端不返回 SSE 时,会提示并自动 fallback 到非流式 chat()。
普通 chat 和 log analyze 都会过滤 think 内容,并且日志只记录过滤后的摘要。
更新了 chat/log 分析提示词,明确禁止输出 think/内部思考。
同步 README、打包 README、run.sh --help。
补充了过滤器、OpenAI 流式、CLI fallback、日志分析过滤等测试。
2026-06-05 12:32:58 +08:00
dark
85afabcd94 增强 chat LLM 交互与单 action 执行能力
- 扩展 LLM client 协议,支持普通对话、日志分析和单 action 解析
- chat 非内置输入默认进入 LLM 普通对话,不再本地拦截问候
- 新增 ask、log analyze、action propose、action run 等交互命令
- 单 action 执行前强制人工确认,并复用现有 ActionRouter、审核、事件和 checkpoint
- 日志分析默认读取尾部内容并脱敏后再提交给 LLM
- 更新 README、发布包 README 和 run.sh help
- 补充 LLM 与 chat 交互相关测试
2026-06-05 11:49:13 +08:00
dark
33065f6c09 更新日志维护策略 2026-06-05 10:41:24 +08:00
dark
039a3e1bdc 支持云下载继承版本参数并调整回滚请求格式
- 新增 PARENT_VERSION_NUMBER 可选配置,默认空值不传
- create-download-task 非空时透传 parentVersionNumber
- 支持 LLM/规则从自然语言和 key=value 中抽取继承版本参数
- 将 rollback 接口参数从表单 body 改为 URL query 拼接
- 同步 README、打包说明和 Skill 文档
- 增加 MCP 参数透传、配置写入和 rollback query 调用测试
2026-06-05 10:33:53 +08:00
dark
4250a7b221 LLM action 结果分析不再传 state_summary
调整了 agent.py 和 LLM client 协议/实现。
现在只传当前 action 的结构化结果和必要诊断日志,避免历史运行态影响判断。
提示词和文档也已同步说明。

verify-ip 增加健康检查重试
默认 VERIFY_INTERVAL_SEC=10、VERIFY_MAX_ATTEMPTS=12,约 2 分钟。
verify-ip 未通过但未达到最大次数时,会播报进度、保存 checkpoint,并继续从当前 verify-ip 重试,不会进入 download-log。
参数已加入 config.txt.example、脚本配置读取、README、打包 README、Skill 文档和流程图。
2026-06-04 16:57:16 +08:00
dark
e572a26e6f pam_deploy_graph/agent.py:progress action 未完成不标记 completed,超时暂停在当前 action,支持断点继续。
llm 提示词和规则:新增 progress_complete 判断字段。
deploy.sh / deploy.ps1:poll-* action 入口改为单次查询。
interactive.py:chat 会播报进度更新。
config.txt.example / README / packaging 文档 / Skill 文档:同步进度查询参数和新 workflow 语义。
测试补充了进度重复查询、超时暂停、chat 进度播报。
2026-06-04 16:28:18 +08:00
dark
1cb1b42395 重试bug修复 2026-06-04 14:49:15 +08:00
dark
30c6532f23 处理prompt_toolkit 2026-06-04 13:59:55 +08:00
dark
badcce5d2d verify-ip 等逐 IP action 失败后不再进入自动回滚确认,改为保存 failed_stage 并暂停。
用户修复外部问题后输入 resume,会从失败 action 重新执行,而不是结束整个流程。
回滚从 workflow 中拆出,新增显式命令:
chat:rollback [IP]
CLI:rollback --checkpoint ... [--ip ...] [--stop-first|--no-stop-first]
旧 confirm approve/reject 只保留为旧 checkpoint 兼容入口,新流程不再推荐使用。
LangGraph workflow 已移除回滚确认 interrupt 节点,失败暂停和续跑走业务 checkpoint。
README、打包 README、run.sh --help、流程图、todo、提示词基线和测试都已同步。
2026-06-04 13:46:19 +08:00
dark
9e10bf11cf 优化llm分析逻辑 2026-06-04 11:55:38 +08:00
dark
d3f5c82d98 feat: 补充 Agent 运行日志并增加 LLM 测试命令
- 新增统一日志工具,支持日志文件路径和级别配置
- 记录 CLI/chat、Agent、LLM、action、MCP、LangGraph、checkpoint 等关键流程
- 对日志中的 token、secret、api_key、Authorization 等敏感信息做脱敏
- chat 新增 llm test 命令,用于验证当前 LLM client 是否正常加载
- 同步 README、打包文档和 run.sh 帮助说明
- 补充日志脱敏和 llm test 相关测试
2026-06-04 10:51:59 +08:00
45 changed files with 4847 additions and 1205 deletions

View File

@ -23,13 +23,12 @@ pam_deploy_graph/
fake_runner.py # 测试用 runner不访问真实环境
output_parser.py # 解析 key=value、MCP JSON、待确认回滚标记
skill_policy.py # 从 PAM_AUTO_DEPLY_SKILL.md 加载 Skill 策略
tool_catalog.py # 统一 action tool schema、tool 摘要和计划动作归一化
config_writer.py # 生成脚本 action 所需 config 文件
checkpoint_store.py # 业务 checkpoint JSON 读写
params_loader.py # 读取 JSON 或 config.txt 风格参数文件
llm/ # LLM structured output 接口、真实 HTTP client、提示词、规则 fallback 和 guardrails
graph.py # LangGraph StateGraph 集成入口
langgraph_runtime.py # chat 人工确认点的 LangGraph interrupt 运行器
langgraph_runtime.py # action 级 LangGraph 运行器
mcp_client.py # MCP stdio/HTTP/SSE client、鉴权 token 和配置读取
interactive.py # 常驻式 CLI 对话框,会话命令、确认和续跑
cli.py # CLI 入口
@ -65,37 +64,36 @@ packaging/
- 实现 `config.txt.example` 风格和 JSON 风格参数读取。
- 实现 fake 全局流程和完整部署流程,便于不触碰真实环境地验证 Agent 路由。
- 实现逐 IP 处理骨架:升级、轮询、启动、校验、日志下载。
- 实现单 IP 失败后的待回滚确认状态,不自动执行回滚
- 实现人工确认入口:`confirm --decision approve|reject` 只处理待确认回滚
- 实现 action 失败或审核阻断后暂停并保留当前 action修复后 `resume` 会从当前 action 重试
- 回滚已从主 workflow 中拆出,改为 chat/CLI 的显式 `rollback` 命令;旧 `confirm` 入口仅作为兼容保留
- 实现 checkpoint 自动保存和 `resume` 续跑:全局步骤、成功 IP、单 IP 已完成 action 会跳过。
- 实现 LLM structured output 骨架:意图识别、参数抽取、部署计划生成。
- 增加 LLM 双模式决策:`fixed_runtime``agentic_skill`,并把模式决策结构化输出纳入 analyze/chat 主链路。
- 实现 OpenAI-compatible 真实 LLM client支持 `base_url` / `model` 配置,`api_key` 可为空。
- 固化真实 LLM 提示词:意图识别、参数抽取、部署计划生成均要求 JSON structured output。
- 增加规则 fallback `RuleBasedLlmClient`,用于本地开发和测试。
- 增加 LLM 输出 guardrails禁止计划中出现可执行脚本命令和非法 action。
- 引入 `langgraph` 依赖CLI/chat 执行流程统一通过 action 级 LangGraph runtime 调度。
- chat/CLI 人工确认点已接入 LangGraph interrupt/checkpointer运行到待回滚确认时暂停`approve/reject` 通过 `Command(resume=...)` 恢复
- CLI/chat 执行流程统一通过 action 级 LangGraph runtime 调度;失败暂停状态写入业务 checkpoint`resume` 会重新进入图并从断点继续
- 引入 MCP client adapter可包装 SDK session、普通 callable、stdio server、HTTP/SSE server并提供 JSON client 配置读取。
- CLI/chat 支持 `--mcp-config` 直接加载 MCP server URL、鉴权和可选 tool 覆盖配置。
- 本地已安装 `langgraph``mcp`,并完成 LangGraph fake 全局流程 smoke。
- CLI `analyze` 输出已做敏感字段脱敏。
- 增加 `chat` 常驻式 CLI 对话框,支持自然语言分析、参数设置、执行确认、回滚确认、状态查看、事件查看、checkpoint 选择和续跑。
- chat 在开发环境可选启用 `rich` / `prompt_toolkit`PyInstaller 打包环境默认使用普通文本输入,避免交互兼容问题
- 增加 `chat` 常驻式 CLI 对话框,支持自然语言分析、参数设置、执行确认、显式回滚、状态查看、事件查看、checkpoint 选择和续跑。
- chat 在开发环境和默认发布包中都会优先启用 `rich` / `prompt_toolkit`;如果增强输入初始化失败,会自动降级到普通 `input()`
- chat 执行前会归一化参数并展示实际写入脚本配置的值;`script_only` / `hybrid_node_mcp` 会提前检查 `ZIP_FILE_PATH` 是否存在。
- chat 执行中会播报每个 action 的开始、完成或失败action 执行失败会停在当前 checkpoint不再误报 LangGraph 不可用。
- `SkillPolicy` 不再只是加载元数据;已接入 LLM prompt、planned actions 归一化和 runtime 动作裁剪。
- 新增统一 action tool schema脚本 action 与 MCP action 通过同一套受控 tool 描述暴露给 LLM。
- `AgentState` 已引入 `execution_mode``planned_actions``mode_reason` 等字段,支持“按计划动作子集执行”。
- 增加 action 后 LLM/规则诊断,可通过 `--analyze-actions``llm action-analysis on` 显式开启。
- 添加基础测试,当前本地结果为 `54 passed, 2 skipped`
- 每个 action 完成后都会进入一次 LLM/规则审核;如果审核建议停止,流程会暂停并给出建议,等待用户 `resume`
- 每个 action 完成后都会进入一次 LLM/规则审核;只有审核通过才会把 action 记为 completed如果审核建议停止流程会暂停并等待用户 `resume` 重试当前 action。
- `poll-download-progress``poll-upgrade-progress` 已改为单次进度查询workflow 负责按配置重复调用,每次查询结果都会交给 LLM/规则审核判断是否完成,并通过 chat 播报进度。
- `--analyze-actions``llm action-analysis on` 改为只控制是否把详细审核结果写入 `events`,不再控制审核是否执行。
- chat 会播报 action 审核开始、审核完成和审核失败,避免黑盒执行。
- chat 支持执行中按 `Ctrl+C` 中断,保存 checkpoint 后再 `resume`
- chat 支持普通 LLM 对话、日志尾部分析和单 action 执行:`ask <问题>``log analyze <路径>``action propose <需求>``action run ...`
- chat 普通对话会优先使用 OpenAI-compatible streaming 输出;如果服务端不支持流式,会自动退回普通请求。`<think>...</think>` 思考内容会被过滤,不展示也不写入运行日志。
- chat 支持 `set KEY=VALUE``load params <路径>` 热更新当前运行参数,并同步回写运行中的 `config.txt` 与 checkpoint。
- 支持通过 `--llm-action-analysis-prompt-file``PAM_LLM_ACTION_ANALYSIS_PROMPT_FILE` 或 chat 内 `llm config action_analysis_prompt_file=...` 自定义 action 审核提示词。
- 添加基础测试,当前本地结果为 `57 passed, 2 skipped`
- 增加统一运行日志,默认写入 `logs/pam_deploy_agent.log`,覆盖 CLI/chat、LLM 调用、action 路由、脚本/MCP 调用、LangGraph、checkpoint 等关键流程,并按天切分、默认保留 14 个历史日切文件。
- chat 支持 `llm test [文本]`,可用当前 LLM client 做一次轻量调用,确认真实 LLM 或规则 fallback 是否正常加载。
- 添加基础测试,当前本地结果为 `83 passed, 3 skipped`
未完成:
@ -138,7 +136,13 @@ python -m pam_deploy_graph.cli analyze \
仓库内已提供 [prompts/action_review.txt](/e:/AIcoding/agent_deply/prompts/action_review.txt) 作为“当前默认 action 审核提示词”的落地副本,后续自定义时可以先复制它再改,便于和内置默认行为对照。
真实 LLM 调用位置在 `pam_deploy_graph/llm/openai_compatible.py`,提示词在 `pam_deploy_graph/llm/prompts.py`。发送给 LLM 的 `base_params` 会脱敏,`CLIENT_SECRET` 不会进入 prompt本地生成计划后仍会执行 guardrails 校验。
真实 LLM 调用位置在 `pam_deploy_graph/llm/openai_compatible.py`,提示词在 `pam_deploy_graph/llm/prompts.py`。发送给 LLM 的 `base_params` 会脱敏,`CLIENT_SECRET` 不会进入 prompt本地生成计划后仍会执行 guardrails 校验。chat 普通对话优先使用 `/chat/completions` streaming服务端不支持时会自动退回非流式请求。普通对话和日志分析会过滤 `<think>...</think>`、未闭合 `<think>` 及内部思考内容。
chat 内可以用当前 client 做一次轻量测试,确认真实 LLM 或规则 fallback 是否正常加载:
```text
PAM> llm test 请返回一次连通性测试结果
```
如果服务需要鉴权,再补充:
@ -276,9 +280,6 @@ python -m pam_deploy_graph.cli chat --config doc_scripts/config.txt.example --st
```text
PAM> 请用 MCP 预演部署 HET PAM Node 版本 2.0.5,不要动环境
PAM> analyze 请按 skill 自主编排并自动选择工具,帮我排查 HET PAM Node 部署异常
- mode: agentic_skill
- mode_reason: 用户明确要求按 skill 自主编排,或任务更偏探索/诊断。
PAM> preview
PAM> set VERSION_NUMBER=2.0.6
PAM> load params runtime/override.txt
@ -290,18 +291,49 @@ PAM> run
PAM> status
PAM> params
PAM> events 5
PAM> ask 这个 agent 能做什么
PAM> log analyze logs/pam_deploy_agent.log 请帮我看最近异常 --tail 400
PAM> action propose 请单独执行 verify-ip 192.168.1.10
PAM> action run verify-ip ip=192.168.1.10
PAM> action run llm 请单独执行 get-online-ips
PAM> llm test
PAM> llm action-analysis on
PAM> llm config action_analysis_prompt_file=prompts/action_review.txt
PAM> mcp config mcp_client.example.json
PAM> list checkpoints
PAM> load checkpoint runtime/checkpoints/chat-demo.json
PAM> approve
PAM> rollback
PAM> resume
PAM> exit
```
`chat` 默认仍要求在会话内显式输入 `run`,并确认参数、目标 IP 范围和最终执行后才会执行 action。输入 `你好``hello` 这类问候不会触发 LLM/结构化分析;需要分析部署需求时可直接描述部署任务,或显式使用 `analyze <需求>`。分析阶段现在会额外输出 `mode``mode_reason``planned_actions`,用于区分 `fixed_runtime``agentic_skill`。如果某个 IP 失败,会通过 LangGraph interrupt 暂停并提示输入 `approve``reject [原因]`,确认后恢复同一个图线程继续执行。`chat` 也支持 `--llm-base-url` / `--llm-api-key` / `--llm-model``--mcp-config``--analyze-actions`
`chat` 默认仍要求在会话内显式输入 `run`,并确认参数、目标 IP 范围和最终执行后才会执行 action。输入 `你好``hello` 这类问候不会触发 LLM/结构化分析;需要分析部署需求时可直接描述部署任务,或显式使用 `analyze <需求>`。每个 action 完成后都会自动进入一次 LLM/规则审核,并播报审核开始/结束;如果审核建议停止或审核本身失败,流程会暂停并输出建议,等待用户决定是否 `resume``--analyze-actions` 仅控制详细审核结果是否写入 `events`。执行中可按 `Ctrl+C` 中断chat 会保存当前 checkpoint 并把流程标记为 `user_interrupted``set KEY=VALUE``load params <路径>` 会把更新同步到当前运行 state、`config.txt` 和 checkpoint。`chat` 也支持 `--llm-base-url` / `--llm-api-key` / `--llm-model` / `--llm-action-analysis-prompt-file``--mcp-config``--analyze-actions`
`chat` 默认把非内置命令交给当前 LLM 做普通对话,不会自动触发部署 workflow普通对话优先流式展示`<think>...</think>` 思考内容会被过滤。需要结构化分析部署需求时请显式使用 `analyze <需求>`,完整部署仍要求输入 `run`,并确认参数、目标 IP 范围和最终执行后才会执行 action。`log analyze <路径> [问题] [--tail N] [--max-bytes N]` 默认只读取日志尾部并脱敏后交给 LLM`action propose <需求>` 只让 LLM 解析单 action 计划,不执行;`action run <action> [ip=...] [KEY=VALUE...]``action run llm <需求>` 会展示 action、backend、ip、风险和参数用户输入 `yes` 后才会复用现有 ActionRouter 执行单 action。每个 workflow action 和单 action 完成后都会自动进入一次 LLM/规则审核,并播报审核开始/结束;审核输入只包含当前 action 的结构化结果和必要诊断日志,不会把完整运行态 `state_summary` 交给大模型,避免跨步骤状态干扰判断;只有 workflow 审核通过才会把 action 记为 completed如果审核建议停止或审核本身失败流程会暂停并输出建议等待用户决定是否 `resume` 重试当前 action。`create-download-task` 支持可选 `PARENT_VERSION_NUMBER`,非空时会在云下载接口中传入 `parentVersionNumber`,用于指定继承哪个版本的规则;默认空值不传,沿用正在使用的版本规则。`poll-download-progress``poll-upgrade-progress` 每次只查询一次进度workflow 会按 `POLL_INTERVAL_SEC``DOWNLOAD_POLL_MAX_ATTEMPTS``UPGRADE_POLL_MAX_ATTEMPTS` 重复调用,并在每次返回后让 LLM/规则判断是否完成、播报进度;未完成时不会跳到下一个 action。`verify-ip` 用于应用启动后的健康检查,失败时 workflow 会按 `VERIFY_INTERVAL_SEC` 重试,最多 `VERIFY_MAX_ATTEMPTS` 次;默认约每 10 秒一次、最多 12 次,仍未通过才暂停。逐 IP action 失败时也会暂停,修复外部环境后输入 `resume` 会从当前 action 重试;如果确实需要回滚,使用 `rollback [IP]` 显式执行。`llm test [文本]` 可测试当前 LLM client 是否可用。`--analyze-actions` 仅控制详细审核结果是否写入 `events`。执行中可按 `Ctrl+C` 中断chat 会保存当前 checkpoint 并把流程标记为 `user_interrupted``set KEY=VALUE``load params <路径>` 会把更新同步到当前运行 state、`config.txt` 和 checkpoint。`chat` 也支持 `--llm-base-url` / `--llm-api-key` / `--llm-model` / `--llm-action-analysis-prompt-file``--mcp-config``--analyze-actions`
云下载相关参数:
- `PARENT_VERSION_NUMBER`:可选,创建云下载任务时映射为接口参数 `parentVersionNumber`;默认空值不发送,表示继承正在使用的版本规则。
重试和进度查询相关参数:
- `POLL_INTERVAL_SEC`:两次进度查询之间的等待秒数,默认 `2`
- `DOWNLOAD_POLL_MAX_ATTEMPTS`:云下载进度最大查询次数,默认 `60`
- `UPGRADE_POLL_MAX_ATTEMPTS`:单 IP 推送进度最大查询次数,默认 `600`
- `VERIFY_INTERVAL_SEC``verify-ip` 健康检查失败后的重试间隔秒数,默认 `10`
- `VERIFY_MAX_ATTEMPTS``verify-ip` 健康检查最大尝试次数,默认 `12`
## 日志
Agent 默认写入运行日志到 `logs/pam_deploy_agent.log`。日志覆盖 CLI/chat 输入、LLM 请求和响应摘要、action 路由、脚本/MCP 调用、LangGraph 节点、checkpoint 保存、暂停/续跑等关键流程。日志会在本地时间每日 0 点后首次写入时自动切分,历史文件形如 `pam_deploy_agent.log.YYYY-MM-DD`,默认保留 14 个历史日切文件。日志会递归脱敏 `CLIENT_SECRET``MCP_CLIENT_SECRET`、token、Authorization、api_key、password 等字段并截断长文本chat 普通对话和日志分析的 `<think>` 内容会先过滤,不记录原始思考过程。
可通过环境变量调整日志位置、级别和保留策略:
```bash
export PAM_AGENT_LOG_FILE=logs/pam_deploy_agent.log
export PAM_AGENT_LOG_LEVEL=INFO
export PAM_AGENT_LOG_RETENTION_DAYS=14
```
调试 LLM 或 MCP 调用时可临时把 `PAM_AGENT_LOG_LEVEL` 设为 `DEBUG``PAM_AGENT_LOG_RETENTION_DAYS` 表示保留的历史日切文件数量,设为 `0` 时不自动清理历史切分文件;仍建议把日志目录放在受控位置。
预演:
@ -321,18 +353,23 @@ fake 完整部署流程验证:
python -m pam_deploy_graph.cli run-deploy --config doc_scripts/config.txt.example --strategy fake --checkpoint runtime/checkpoints/demo.json --confirm
```
如果某个 IP 失败并进入待回滚确认,先查看输出中的 `confirmation`,再人工决定
如果 action 失败或审核阻断,流程会保存 checkpoint 并暂停修复外部环境后可直接续跑Agent 会从当前 action 重试
```bash
python -m pam_deploy_graph.cli confirm --checkpoint runtime/checkpoints/demo.json --decision approve --confirm
python -m pam_deploy_graph.cli resume --checkpoint runtime/checkpoints/demo.json --confirm
```
`confirm` 会通过 LangGraph interrupt resume 处理确认,并在确认后继续执行后续图节点;如果流程此前处于 `paused` 状态,`resume` 会先清理暂停标记,再从 checkpoint 继续执行。
拒绝回滚:
如果需要回滚失败 IP请显式执行 rollback。未传 `--ip` 时会使用当前失败 IP执行完成后再用 `resume` 继续主流程。
```bash
python -m pam_deploy_graph.cli confirm --checkpoint runtime/checkpoints/demo.json --decision reject --note "人工决定暂不回滚" --confirm
python -m pam_deploy_graph.cli rollback --checkpoint runtime/checkpoints/demo.json --confirm
python -m pam_deploy_graph.cli resume --checkpoint runtime/checkpoints/demo.json --confirm
```
也可以指定 IP 和停机策略:
```bash
python -m pam_deploy_graph.cli rollback --checkpoint runtime/checkpoints/demo.json --ip 192.168.1.10 --stop-first --note "人工决定回滚该 IP" --confirm
```
checkpoint 用于断点续跑会保存完整运行状态和参数。为了支持真实续跑Agent 写入 checkpoint 时不会脱敏参数;请把 checkpoint 放在受控目录中。如果不传 `--checkpoint`,流程仍可运行,但不能跨进程 `resume`
@ -353,5 +390,5 @@ pytest -q
1. 接入真实 PAM_NODE MCP session并用 `SessionMcpToolClient` 包装。
2. 在测试环境中做 smokeHOME 脚本 `get-token/get-node-url` + NODE MCP `get-online-ips`
3. 在测试环境验证真实脚本 action 的失败、回滚确认和续跑链路。
3. 在测试环境验证真实脚本 action 的失败重试显式回滚和续跑链路。
4. 继续细化参数确认、IP 范围确认的交互式 UI 或上层编排。

View File

@ -1,6 +1,6 @@
---
name: pam-auto-deply
description: 面向 PAM HOME/NODE 的智能部署 Skill。由 Skill 负责理解用户需求、收集并确认参数、选择执行模式、编排主流程、控制回滚确认与最终汇总;由现有 deploy.sh / deploy.ps1 提供 action 能力执行建版、上传、发布、节点发现、云下载、升级、启停、校验、日志下载和手动回滚。禁止自动生成或修改脚本,禁止使用脚本主流程做部署。
description: 面向 PAM HOME/NODE 的智能部署 Skill。由 Skill 负责理解用户需求、收集并确认参数、选择执行模式、编排主流程、控制进度查询与最终汇总;由现有 deploy.sh / deploy.ps1 提供 action 能力执行建版、上传、发布、节点发现、云下载、升级、启停、校验、日志下载和手动回滚。禁止自动生成或修改脚本,禁止使用脚本主流程做部署。
---
# PAM_AUTO_DEPLY Skill
@ -22,7 +22,7 @@ description: 面向 PAM HOME/NODE 的智能部署 Skill。由 Skill 负责理解
- 禁止自动生成、重建、覆盖或修改 `deploy.sh``deploy.ps1``deploy.bat``test_deploy.sh``test_deploy.ps1``test_deploy.bat`
- 在任何真实调用前,必须先向用户展示归一化后的参数并得到确认。
- 在真实部署执行过程中,必须持续向用户展示当前阶段、下一步动作和阶段结果,禁止长时间静默执行。
- 回滚不得自动执行。脚本只能输出 `PENDING_AGENT_CONFIRMATION(...)`,必须由 Agent 先向用户确认
- 回滚不得自动执行;主 workflow 失败后只暂停在当前 action。需要回滚时必须由用户显式输入 `rollback [IP]` 或直接调用 `rollback-ip` action
## 2. 执行模式选择
@ -68,6 +68,12 @@ description: 面向 PAM HOME/NODE 的智能部署 Skill。由 Skill 负责理解
| `actionType` | `ACTION_TYPE` | 否 | 升级类型,默认 `FULL` |
| `timeOut` | `TIMEOUT` | 否 | 接口级超时参数,默认 `120` |
| `logName` | `LOG_NAME` | 否 | 日志文件名,默认 `app.log` |
| `parentVersionNumber` | `PARENT_VERSION_NUMBER` | 否 | 云下载时指定继承哪个版本的规则;默认空值不传,继承正在使用的版本规则 |
| `pollIntervalSec` | `POLL_INTERVAL_SEC` | 否 | 两次进度查询间隔,默认 `2` 秒 |
| `downloadPollMaxAttempts` | `DOWNLOAD_POLL_MAX_ATTEMPTS` | 否 | 云下载进度最大查询次数,默认 `60` |
| `upgradePollMaxAttempts` | `UPGRADE_POLL_MAX_ATTEMPTS` | 否 | 单 IP 推送进度最大查询次数,默认 `600` |
| `verifyIntervalSec` | `VERIFY_INTERVAL_SEC` | 否 | `verify-ip` 健康检查失败后的重试间隔,默认 `10` 秒 |
| `verifyMaxAttempts` | `VERIFY_MAX_ATTEMPTS` | 否 | `verify-ip` 健康检查最大尝试次数,默认 `12` |
### 3.2 运行控制参数
@ -77,13 +83,12 @@ description: 面向 PAM HOME/NODE 的智能部署 Skill。由 Skill 负责理解
- `showUsageOnly`: 是否只说明现有脚本用法而不执行
- `userSpecifiedIps`: 用户指定的目标 IP 子集
- `allOrNothing`: 是否要求全有或全无
- `rollbackApproved`: 用户是否已确认回滚
- `rollbackApproved`: 用户是否已明确要求执行回滚
- `osTarget`: 目标脚本入口环境
- `checkpointPath`: 检查点文件路径
- `resumeFromCheckpoint`: 是否按已有检查点断点续试
- `traceFilePath`: 当前部署统一复用的接口跟踪日志文件路径
- `stepIntervalSec`: 全局 action 与 action 之间的执行间隔
- `firstPollDelaySec`: 创建下载任务后,到首次轮询下载进度前的等待间隔
- `perIpStepIntervalSec`: 同一台 IP 内部步骤之间的执行间隔
- `perIpIntervalSec`: 一台 IP 完成后到下一台 IP 开始前的间隔
- `failurePauseSec`: 某步骤失败后进入下一分支前的等待间隔
@ -91,7 +96,6 @@ description: 面向 PAM HOME/NODE 的智能部署 Skill。由 Skill 负责理解
推荐默认值:
- `stepIntervalSec = 2`
- `firstPollDelaySec = 2`
- `perIpStepIntervalSec = 1`
- `perIpIntervalSec = 3`
- `failurePauseSec = 0`
@ -111,6 +115,7 @@ description: 面向 PAM HOME/NODE 的智能部署 Skill。由 Skill 负责理解
- `actionType`
- `timeOut`
- `logName`
- `parentVersionNumber`(可选;空值表示不传)
- 用户指定 IP 子集(如有)
确认规则:
@ -135,6 +140,7 @@ description: 面向 PAM HOME/NODE 的智能部署 Skill。由 Skill 负责理解
- ACTION_TYPE: FULL
- TIMEOUT: 120
- LOG_NAME: app.log
- PARENT_VERSION_NUMBER: -
- 指定IP: 192.168.1.10, 192.168.1.11
- CLIENT_ID: 已提供
- CLIENT_SECRET: 已提供
@ -160,6 +166,12 @@ description: 面向 PAM HOME/NODE 的智能部署 Skill。由 Skill 负责理解
- `ACTION_TYPE`
- `TIMEOUT`
- `LOG_NAME`
- `PARENT_VERSION_NUMBER`
- `POLL_INTERVAL_SEC`
- `DOWNLOAD_POLL_MAX_ATTEMPTS`
- `UPGRADE_POLL_MAX_ATTEMPTS`
- `VERIFY_INTERVAL_SEC`
- `VERIFY_MAX_ATTEMPTS`
- 命令行只传 action 级控制参数:
- `--action` / `-Action`
- `--ip` / `-Ip`
@ -168,7 +180,9 @@ description: 面向 PAM HOME/NODE 的智能部署 Skill。由 Skill 负责理解
- 不要把整套业务参数直接拼接到命令行。
- `client_secret` 等敏感字段不得通过命令行透传。
- 如果用户明确要求“不落地配置文件”,则本 Skill 不执行真实部署,只说明限制和原因。
- `traceFilePath` 与间隔控制参数不写入 `config.txt`,由 Agent 在运行时持有并应用。
- `traceFilePath` 不写入 `config.txt`,由 Agent 在运行时持有并应用。
- `PARENT_VERSION_NUMBER` 写入 `config.txt` 但默认可为空;只有非空时,`create-download-task` 才把它作为云下载接口参数 `parentVersionNumber` 发送。
- 进度查询和健康检查重试参数写入 `config.txt`,由 Agent workflow 和脚本调试流程共同读取。
## 4. 主流程(硬约束)
@ -193,39 +207,42 @@ description: 面向 PAM HOME/NODE 的智能部署 Skill。由 Skill 负责理解
11. 调用 `get-node-url`
12. 调用 `get-online-ips`
13. 若用户指定了目标 IP则基于在线 IP 列表做过滤。
14. 调用 `create-download-task`
15. 调用 `poll-download-progress`,直到下载完成、失败或超时
14. 调用 `create-download-task`;如 `PARENT_VERSION_NUMBER` 非空,则云下载接口携带 `parentVersionNumber`,否则不传该参数
15. 重复调用 `poll-download-progress` 单次查询进度;每次返回后交给 LLM/规则判断,直到下载完成、失败或达到最大查询次数
16. 按在线 IP 或过滤后的目标 IP 列表逐台执行:
- `upgrade-ip`
- `poll-upgrade-progress`
- 重复调用 `poll-upgrade-progress` 单次查询进度;每次返回后交给 LLM/规则判断,直到推送完成、失败或达到最大查询次数
- `start-ip`
- `verify-ip`
- 重复调用 `verify-ip` 健康检查;`SUCCESS=false` 时按 `VERIFY_INTERVAL_SEC` 等待后重试,直到成功或达到 `VERIFY_MAX_ATTEMPTS`
- `download-log`
17. 汇总每台 IP 的结果。
18. 若出现 `PENDING_AGENT_CONFIRMATION(...)`,立即中止自动后续动作,转入回滚确认分支
19. 输出最终报告。
18. 若 action 失败、LLM/规则审核要求停止,或出现 legacy `PENDING_AGENT_CONFIRMATION(...)`,暂停在当前 action 并输出建议
19. 输出最终报告;需要回滚时,等待用户显式执行 `rollback [IP]`
主流程补充规则:
1. 一次完整部署中的所有 action 调用,应复用同一个 `traceFilePath`,禁止每个 action 各自新建独立 trace 文件。
2. 全局 action 与下一 action 之间,按 `stepIntervalSec` 等待。
3. `create-download-task` 成功后,到首次 `poll-download-progress` 前,按 `firstPollDelaySec` 等待。
4. 同一台 IP 内部:
3. `create-download-task` 成功后,直接进入 `poll-download-progress`;未完成时按 `POLL_INTERVAL_SEC` 等待后再次查询当前 action。
4. `PARENT_VERSION_NUMBER` 只影响 `create-download-task` / `download-cloud`,不得透传到推送、启动、校验或日志下载 action。
5. 同一台 IP 内部:
- `upgrade-ip -> poll-upgrade-progress`
- `poll-upgrade-progress -> start-ip`
- `start-ip -> verify-ip`
- `verify-ip -> download-log`
之间按 `perIpStepIntervalSec` 等待。
5. 当前一台 IP 处理完成后,到下一台 IP 开始前,按 `perIpIntervalSec` 等待。
6. 若某步骤失败后需要进入提示、确认或分支流程,可按 `failurePauseSec` 等待。
7. 若某个间隔值为 `0`,表示该层级不等待,直接进入下一动作。
6. 当前一台 IP 处理完成后,到下一台 IP 开始前,按 `perIpIntervalSec` 等待。
7. 若某步骤失败后需要进入提示、确认或分支流程,可按 `failurePauseSec` 等待。
8. 若某个间隔值为 `0`,表示该层级不等待,直接进入下一动作。
9. `poll-download-progress``poll-upgrade-progress` 的脚本 action 只执行一次进度查询;正式 workflow 的循环、checkpoint、LLM 判断和进度播报由 Agent Runtime 负责。
10. `verify-ip` 失败但未达到 `VERIFY_MAX_ATTEMPTS` 时,不进入 `download-log`,也不把当前 action 记为 completed正式 workflow 会播报健康检查进度、保存 checkpoint并按 `VERIFY_INTERVAL_SEC` 重试当前 action。
### 4.2 主流程中的强制确认点
以下节点必须等待用户确认,不能自动越过:
1. 参数确认单确认前。
2. 出现回滚条件时
2. 执行 `rollback [IP]``rollback-ip`
3. 用户指定 IP 与在线 IP 过滤结果不一致,且会影响部署范围时。
4. 用户显式要求修改默认间隔策略时。
@ -238,14 +255,15 @@ description: 面向 PAM HOME/NODE 的智能部署 Skill。由 Skill 负责理解
3. 在每个全局步骤成功后,告知用户该步骤已完成,并说明关键结果。
4. 在每个全局步骤失败后,立即告知用户失败阶段、失败原因和后续处理。
5. 在逐台 IP 处理时,必须告知当前正在处理哪一台 IP。
6. 在云下载进度轮询阶段,必须持续汇报当前进度,不能静默等待完成。
7. 若执行耗时较长,必须按阶段持续播报,不能等全部结束后一次性汇总。
8. 若进入回滚确认状态,必须明确告诉用户:
6. 在云下载和单 IP 推送进度查询阶段,每次 `poll-*` 返回后都必须汇报当前进度,不能静默等待完成。
7. 在 `verify-ip` 健康检查阶段,每次未通过都必须播报当前检查次数、最大次数和返回信息,不能静默等待应用启动。
8. 若执行耗时较长,必须按阶段持续播报,不能等全部结束后一次性汇总。
9. 若失败后建议回滚,必须明确告诉用户:
- 哪一台 IP 失败
- 失败阶段
- 建议是否回滚
- 是否需要 `stopFirst`
9. 若当前处于 action 间隔等待中,也必须告诉用户等待时长和下一步动作。
10. 若当前处于 action 间隔等待中,也必须告诉用户等待时长和下一步动作。
建议的阶段播报格式:
@ -349,9 +367,10 @@ description: 面向 PAM HOME/NODE 的智能部署 Skill。由 Skill 负责理解
4. 若部分 IP 已成功完成:
- 默认跳过成功 IP
- 只继续未完成或失败的 IP
5. 若存在 `PENDING_AGENT_CONFIRMATION(...)`
- 检查点中必须保留该状态
- 未得到用户确认前,不得自动继续后续动作
5. 若存在失败暂停或 legacy `PENDING_AGENT_CONFIRMATION(...)`
- 检查点中必须保留失败阶段、失败原因和审核建议
- 修复后 `resume` 默认从当前失败 action 重试
- 需要回滚时必须由用户显式执行 `rollback [IP]`
6. 若用户要求“从头重新开始”:
- 先明确说明将忽略现有检查点
- 再从第 1 步重新执行
@ -430,14 +449,14 @@ description: 面向 PAM HOME/NODE 的智能部署 Skill。由 Skill 负责理解
| 12 | 获取在线 IP | `get-online-ips` | 返回 `COUNT>0` 且有 `IP=...` 行 | 停止并报告 `GET_ONLINE_IPS` 失败 |
| 13 | 过滤目标 IP | 按用户指定 IP 与在线 IP 交集过滤 | 过滤结果明确 | 过滤后为空时停止;范围变化需确认 |
| 14 | 创建云下载任务 | `create-download-task` | 返回 `RESULT=TASK_CREATED` | 停止并报告 `CREATE_DOWNLOAD_TASK` 失败 |
| 15 | 轮询下载进度 | `poll-download-progress` | `STEP=DONE` `MSG=success``RATE_OF_PROGRESS=100` | 停止并报告 `POLL_DOWNLOAD_PROGRESS` 失败或超时 |
| 16.1 | 创建单 IP 推送任务 | `upgrade-ip --ip ...` | 返回 `RESULT=TASK_CREATED` | 记录失败,标记 `PENDING_AGENT_CONFIRMATION(stopFirst=false)` |
| 16.2 | 轮询单 IP 推送进度 | `poll-upgrade-progress --ip ...` | `STEP=DONE``FINISH=true``MSG=success``RATE_OF_PROGRESS=100` | 记录失败,标记 `PENDING_AGENT_CONFIRMATION(stopFirst=false)` |
| 16.3 | 启动单 IP | `start-ip --ip ...` | action 成功返回 | 记录失败,标记 `PENDING_AGENT_CONFIRMATION(stopFirst=true)` |
| 16.4 | 校验单 IP | `verify-ip --ip ...` | 返回 `SUCCESS=true` | 记录失败,标记 `PENDING_AGENT_CONFIRMATION(stopFirst=true)` |
| 15 | 查询下载进度 | 重复调用单次 `poll-download-progress` | LLM/规则判断 `progress_complete=true`;或 `STEP=DONE` / `MSG=success``RATE_OF_PROGRESS=100` | 停止并报告 `POLL_DOWNLOAD_PROGRESS` 失败或超时 |
| 16.1 | 创建单 IP 推送任务 | `upgrade-ip --ip ...` | 返回 `RESULT=TASK_CREATED` | 暂停在当前 action修复后 `resume` 重试;需要回滚时显式执行 rollback |
| 16.2 | 查询单 IP 推送进度 | 重复调用单次 `poll-upgrade-progress --ip ...` | LLM/规则判断 `progress_complete=true`;或 `STEP=DONE` / `FINISH=true` / `MSG=success``RATE_OF_PROGRESS=100` | 暂停在当前 action修复后 `resume` 重试;需要回滚时显式执行 rollback |
| 16.3 | 启动单 IP | `start-ip --ip ...` | action 成功返回 | 暂停在当前 action修复后 `resume` 重试;需要回滚时显式执行 rollback |
| 16.4 | 校验单 IP | 重复调用单次 `verify-ip --ip ...` | 返回 `SUCCESS=true` | `VERIFY_INTERVAL_SEC` 重试,达到 `VERIFY_MAX_ATTEMPTS` 后仍失败才暂停在当前 action需要回滚时显式执行 rollback |
| 16.5 | 下载日志 | `download-log --ip ...` | 返回 `LOG_FILE=...` | 记录日志下载失败,但不覆盖原主失败原因 |
| 17 | 汇总结果 | 汇总每台 IP 的阶段、失败原因、回滚状态、日志路径 | 报告内容完整 | 若汇总失败,至少保留原始 action 输出 |
| 18 | 回滚确认分支 | 发现 `PENDING_AGENT_CONFIRMATION(...)` 时进入回滚确认 | 用户明确是否回滚 | 未确认时停止,不自动回滚 |
| 18 | 失败暂停或显式回滚 | 失败后默认停在当前 action用户输入 `rollback [IP]` 后才执行回滚 | 用户明确要求回滚或修复后 `resume` | 未显式要求回滚时不自动回滚 |
| 19 | 最终报告 | 输出最终报告 | 报告包含模式、入口、阶段结果、日志、回滚状态 | 不省略失败细节 |
## 5. 通用执行原则
@ -456,7 +475,7 @@ description: 面向 PAM HOME/NODE 的智能部署 Skill。由 Skill 负责理解
- `[FLOW][FAIL]`
10. 只允许调用脚本 `action` 入口,禁止调用脚本主流程。
11. 脚本 action 输出以 `key=value` 为主Agent 应优先读取这些结果行。
12. 遇到需要回滚的场景,脚本只返回 `PENDING_AGENT_CONFIRMATION(stopFirst=...)`Agent 必须先确认
12. 遇到需要回滚的场景,Agent 只能提示风险和建议;不得自动回滚,必须等待用户显式执行 rollback
## 6. 脚本 action 能力
@ -485,15 +504,15 @@ powershell -File .\deploy.ps1 -ConfigPath .\config.txt -Action <ActionName> [-Ip
| `get-node-url` | 获取目标 Node 地址 | 无 |
| `get-online-ips` | 获取在线工作站 IP 列表 | 无 |
| `create-download-task` | 创建云下载任务 | 无 |
| `poll-download-progress` | 轮询下载进度 | 无 |
| `poll-download-progress` | 单次查询下载进度;是否继续查询由 Agent workflow 和 LLM/规则决定 | 无 |
| `download-cloud-to-node` | 创建下载任务并轮询至完成,仅调试使用,不得进入正式主流程 | 无 |
| `upgrade-ip` | 为指定 IP 创建推送任务,固定使用 `timeOut=0` | `--ip` / `-Ip` |
| `poll-upgrade-progress` | 轮询指定 IP 的推送进度 | `--ip` / `-Ip` |
| `poll-upgrade-progress` | 单次查询指定 IP 的推送进度;是否继续查询由 Agent workflow 和 LLM/规则决定 | `--ip` / `-Ip` |
| `start-ip` | 启动指定 IP 应用 | `--ip` / `-Ip` |
| `stop-ip` | 停止指定 IP 应用 | `--ip` / `-Ip` |
| `verify-ip` | 校验指定 IP | `--ip` / `-Ip` |
| `download-log` | 下载指定 IP 日志压缩包,返回 zip 文件路径 | `--ip` / `-Ip` |
| `rollback-ip` | 执行指定 IP 回滚 | `--ip` / `-Ip`,可带 `--stop-first` / `-RollbackStopFirst` |
| `rollback-ip` | 执行指定 IP 回滚;接口参数使用 URL query不使用表单 body | `--ip` / `-Ip`,可带 `--stop-first` / `-RollbackStopFirst` |
### 6.4 action 输出约定
@ -559,9 +578,9 @@ Agent 读取时:
- `create-download-task`
- `upgrade-ip`
### 7.4 手动回滚分支
### 7.4 显式回滚命令
部署结果出现 `PENDING_AGENT_CONFIRMATION(...)` 且用户明确同意回滚时:
用户明确输入 `rollback [IP]` 或直接要求对指定 IP 回滚时:
1. 再次向用户确认目标 IP 和 `stopFirst` 值。
2. 调用 `rollback-ip` action。
@ -613,19 +632,16 @@ Agent 读取时:
### 8.3 回滚规则
回滚只允许在 Agent 与用户确认后执行。
回滚只允许在用户显式要求后执行。
回滚状态有三类
回滚状态包括
- `ROLLBACK_NOT_RUN`
- `PENDING_AGENT_CONFIRMATION(stopFirst=true|false)`
- 真正执行后的结果:
- `ROLLBACK_SUCCESS`
- `ROLLBACK_FAILED`
- `ROLLBACK_REQUEST_FAILED`
- `ROLLBACK_VERIFY_FAILED`
- `ROLLBACK_DONE`
- `ROLLBACK_FAILED`
- `REJECTED_BY_OPERATOR`
默认确认逻辑
默认建议:
- 升级失败:建议回滚,`stopFirst=false`
- 启动失败:建议回滚,`stopFirst=true`
@ -674,7 +690,9 @@ powershell -File .\deploy.ps1 -ConfigPath .\config.txt -Action rollback-ip -Ip 1
- 失败: 1
- 间隔控制:
- stepIntervalSec: 2
- firstPollDelaySec: 2
- pollIntervalSec: 2
- downloadPollMaxAttempts: 60
- upgradePollMaxAttempts: 600
- perIpStepIntervalSec: 1
- perIpIntervalSec: 3
- failurePauseSec: 0
@ -684,7 +702,7 @@ powershell -File .\deploy.ps1 -ConfigPath .\config.txt -Action rollback-ip -Ip 1
| --- | --- | --- | --- | --- |
| 192.168.1.10 | SUCCESS | - | - | logs/deploy_192.168.1.10.zip |
| 192.168.1.11 | SUCCESS | - | - | logs/deploy_192.168.1.11.zip |
| 192.168.1.12 | FAILED | VERIFY | PENDING_AGENT_CONFIRMATION(stopFirst=true) | logs/deploy_192.168.1.12.zip |
| 192.168.1.12 | FAILED | VERIFY | ROLLBACK_NOT_RUN | logs/deploy_192.168.1.12.zip |
```
更完整的最终报告模板:
@ -709,7 +727,7 @@ powershell -File .\deploy.ps1 -ConfigPath .\config.txt -Action rollback-ip -Ip 1
| IP | 状态 | 失败阶段 | 失败原因 | 回滚状态 | 日志 |
| --- | --- | --- | --- | --- | --- |
| 192.168.1.10 | SUCCESS | - | - | - | logs/deploy_192.168.1.10.log |
| 192.168.1.12 | FAILED | VERIFY | Health check failed | PENDING_AGENT_CONFIRMATION(stopFirst=true) | logs/deploy_192.168.1.12.log |
| 192.168.1.12 | FAILED | VERIFY | Health check failed | ROLLBACK_NOT_RUN | logs/deploy_192.168.1.12.log |
## 检查点摘要
@ -724,9 +742,10 @@ powershell -File .\deploy.ps1 -ConfigPath .\config.txt -Action rollback-ip -Ip 1
- get-online-ips
- create-download-task
## 待确认事项
## 后续建议
- 是否对 192.168.1.12 执行回滚
- 192.168.1.12 停在 verify-ip修复后可 resume 重试当前 action
- 如确认需要回滚,可执行 rollback 192.168.1.12
```
## 10. Agent 执行建议
@ -740,7 +759,7 @@ powershell -File .\deploy.ps1 -ConfigPath .\config.txt -Action rollback-ip -Ip 1
- 回滚需要确认
4. 参数未确认前,不触发任何真实部署 action。
5. 用户只要求“生成脚本不执行”时,由于本 Skill 禁止自动生成或修改脚本,应直接说明限制,而不是自动产出脚本文件。
6. 如果 action 输出中出现 `PENDING_AGENT_CONFIRMATION(...)`,立即中止自动后续动作并请求确认
6. 如果 action 输出中出现 legacy `PENDING_AGENT_CONFIRMATION(...)`,立即暂停当前 workflow输出建议需要回滚时等待用户显式执行 rollback
7. 如果存在检查点,优先评估能否从断点续试,而不是默认从头执行。
8. 任何长耗时阶段都要主动播报进度,尤其是:
- `create-download-task`

View File

@ -9,3 +9,9 @@ ZIP_FILE_PATH=C:\path\to\pam-2.0.5.zip
ACTION_TYPE=FULL
TIMEOUT=120
LOG_NAME=app.log
PARENT_VERSION_NUMBER=
POLL_INTERVAL_SEC=2
DOWNLOAD_POLL_MAX_ATTEMPTS=60
UPGRADE_POLL_MAX_ATTEMPTS=600
VERIFY_INTERVAL_SEC=10
VERIFY_MAX_ATTEMPTS=12

View File

@ -23,6 +23,8 @@ Notes:
- deploy.bat is only a wrapper for this script.
- The wrapper avoids cmd.exe delayed-expansion issues with CLIENT_SECRET values
containing exclamation marks.
- poll-download-progress and poll-upgrade-progress only query progress once.
The Agent workflow repeats them and asks LLM/rules to judge completion.
'@ | Write-Host
}
@ -366,6 +368,12 @@ function Get-PamConfig {
'ACTION_TYPE' { $config[$key] = $value }
'TIMEOUT' { $config[$key] = $value }
'LOG_NAME' { $config[$key] = $value }
'PARENT_VERSION_NUMBER' { $config[$key] = $value }
'POLL_INTERVAL_SEC' { $config[$key] = $value }
'DOWNLOAD_POLL_MAX_ATTEMPTS' { $config[$key] = $value }
'UPGRADE_POLL_MAX_ATTEMPTS' { $config[$key] = $value }
'VERIFY_INTERVAL_SEC' { $config[$key] = $value }
'VERIFY_MAX_ATTEMPTS' { $config[$key] = $value }
}
}
} else {
@ -384,6 +392,12 @@ function Get-PamConfig {
ACTION_TYPE = 'FULL'
TIMEOUT = '120'
LOG_NAME = 'app.log'
PARENT_VERSION_NUMBER = ''
POLL_INTERVAL_SEC = '2'
DOWNLOAD_POLL_MAX_ATTEMPTS = '60'
UPGRADE_POLL_MAX_ATTEMPTS = '600'
VERIFY_INTERVAL_SEC = '10'
VERIFY_MAX_ATTEMPTS = '12'
}
foreach ($name in $defaults.Keys) {
@ -647,8 +661,14 @@ function Wait-DownloadProgress {
RateOfProgress = ''
RawResponse = ''
}
$maxAttempts = 60
[int]::TryParse([string]$Config.DOWNLOAD_POLL_MAX_ATTEMPTS, [ref]$maxAttempts) | Out-Null
if ($maxAttempts -lt 1) { $maxAttempts = 60 }
$pollIntervalSec = 2
[int]::TryParse([string]$Config.POLL_INTERVAL_SEC, [ref]$pollIntervalSec) | Out-Null
if ($pollIntervalSec -lt 0) { $pollIntervalSec = 2 }
for ($attempt = 0; $attempt -lt 60; $attempt++) {
for ($attempt = 0; $attempt -lt $maxAttempts; $attempt++) {
$response = Invoke-PamWebRequest -Method GET -Url $progressUrl -Token $Token -Headers @{
'Target-Node' = $NodeUrl
}
@ -681,7 +701,7 @@ function Wait-DownloadProgress {
if ($progressParts.Count -gt 0) {
Write-Info ("Step 3.3b: async download progress -> {0}" -f ($progressParts -join ', '))
} else {
Write-Info ("Step 3.3b: async download progress polling... ({0}/60)" -f ($attempt + 1))
Write-Info ("Step 3.3b: async download progress polling... ({0}/{1})" -f ($attempt + 1), $maxAttempts)
}
if ($step -eq 'DONE' -or $status -eq 'completed' -or $successFlag -eq 'true' -or (($msg -eq 'success') -and ($progressValue -eq '100'))) {
@ -694,12 +714,64 @@ function Wait-DownloadProgress {
throw "Node download failed: $message"
}
Start-Sleep -Seconds 2
Start-Sleep -Seconds $pollIntervalSec
}
throw 'Node download timed out.'
}
function Read-DownloadProgress {
param($Config, [string]$Token, [string]$NodeUrl)
$query = Join-RequestPairs ([ordered]@{
applicationName = $Config.APP_NAME
moduleName = $Config.MODULE_NAME
airportCode = $Config.AIRPORT_CODE
versionNumber = $Config.VERSION_NUMBER
})
$progressUrl = "$($Config.HOME_BASE_URL)/node-proxy/$($Config.AIRPORT_CODE)/api/mcp/version/upgrade/download-cloud/progress?$query"
$response = Invoke-PamWebRequest -Method GET -Url $progressUrl -Token $Token -Headers @{
'Target-Node' = $NodeUrl
}
$status = Get-ResponseValue -Response $response -Candidates @('status')
$successFlag = Get-ResponseValue -Response $response -Candidates @('success')
$step = Get-ResponseValue -Response $response -Candidates @('step')
$msg = Get-ResponseValue -Response $response -Candidates @('msg')
$progressValue = Get-ResponseValue -Response $response -Candidates @('rateOfProgress', 'progress', 'percent', 'data.rateOfProgress', 'data.progress', 'data.percent')
$message = Get-ResponseValue -Response $response -Candidates @('message')
if (-not $message) { $message = $msg }
$script:DownloadProgressState = [ordered]@{
Status = [string]$status
Success = [string]$successFlag
Step = [string]$step
Msg = [string]$msg
Message = [string]$message
RateOfProgress = [string]$progressValue
RawResponse = [string]$response
}
$progressParts = [System.Collections.Generic.List[string]]::new()
if ($msg) { $progressParts.Add("msg=$msg") }
if ($step) { $progressParts.Add("step=$step") }
if ($progressValue) { $progressParts.Add("rateOfProgress=$progressValue") }
if ($status) { $progressParts.Add("status=$status") }
if ($successFlag) { $progressParts.Add("success=$successFlag") }
if ($message -and $message -ne $msg) { $progressParts.Add("message=$message") }
if ($progressParts.Count -gt 0) {
Write-Info ("Step 3.3b: async download progress single query -> {0}" -f ($progressParts -join ', '))
} else {
Write-Info 'Step 3.3b: async download progress single query returned no explicit progress fields.'
}
if ((@($step, $message, $msg, $status) -join ' ') -match '(?i)fail|error') {
if (-not $message) { $message = $step }
if (-not $message) { $message = $msg }
throw "Node download failed: $message"
}
}
function Create-DownloadTask {
param($Config, [string]$Token, [string]$NodeUrl)
@ -710,6 +782,11 @@ function Create-DownloadTask {
moduleName = $Config.MODULE_NAME
timeOut = '0'
})
if ($Config.PARENT_VERSION_NUMBER) {
$query += '&' + (Join-RequestPairs ([ordered]@{
parentVersionNumber = $Config.PARENT_VERSION_NUMBER
}))
}
[void](Invoke-PamWebRequest -Method GET -Url "$($Config.HOME_BASE_URL)/node-proxy/$($Config.AIRPORT_CODE)/api/mcp/version/upgrade/download-cloud?$query" -Token $Token -Headers @{
'Target-Node' = $NodeUrl
@ -751,8 +828,14 @@ function Wait-UpgradeProgress {
LastModify = ''
RawResponse = ''
}
$maxAttempts = 600
[int]::TryParse([string]$Config.UPGRADE_POLL_MAX_ATTEMPTS, [ref]$maxAttempts) | Out-Null
if ($maxAttempts -lt 1) { $maxAttempts = 600 }
$pollIntervalSec = 2
[int]::TryParse([string]$Config.POLL_INTERVAL_SEC, [ref]$pollIntervalSec) | Out-Null
if ($pollIntervalSec -lt 0) { $pollIntervalSec = 2 }
for ($attempt = 0; $attempt -lt 60; $attempt++) {
for ($attempt = 0; $attempt -lt $maxAttempts; $attempt++) {
$response = Invoke-PamWebRequest -Method GET -Url $progressUrl -Token $Token -Headers @{
'Target-Node' = $NodeUrl
}
@ -797,7 +880,7 @@ function Wait-UpgradeProgress {
if ($progressParts.Count -gt 1) {
Write-Info ("Step 3.4a: async upgrade progress -> {0}" -f ($progressParts -join ', '))
} else {
Write-Info ("Step 3.4a: async upgrade progress polling... ip={0} ({1}/60)" -f $Ip, ($attempt + 1))
Write-Info ("Step 3.4a: async upgrade progress polling... ip={0} ({1}/{2})" -f $Ip, ($attempt + 1), $maxAttempts)
}
if ($step -eq 'DONE' -or $finish -eq 'true' -or $status -eq 'completed' -or $successFlag -eq 'true') {
@ -821,12 +904,88 @@ function Wait-UpgradeProgress {
throw "Node upgrade failed: ip=$Ip, message=$message"
}
Start-Sleep -Seconds 2
Start-Sleep -Seconds $pollIntervalSec
}
throw "Node upgrade timed out: ip=$Ip"
}
function Read-UpgradeProgress {
param(
$Config,
[string]$Token,
[string]$NodeUrl,
[string]$Ip
)
$query = Join-RequestPairs ([ordered]@{
applicationName = $Config.APP_NAME
moduleName = $Config.MODULE_NAME
airportCode = $Config.AIRPORT_CODE
versionNumber = $Config.VERSION_NUMBER
})
$progressUrl = "$($Config.HOME_BASE_URL)/node-proxy/$($Config.AIRPORT_CODE)/api/mcp/version/upgrade/progress?$query"
$response = Invoke-PamWebRequest -Method GET -Url $progressUrl -Token $Token -Headers @{
'Target-Node' = $NodeUrl
}
$progressResponse = Get-ScopedResponseObject -Response $response -ScopeKey $Ip
$status = Get-ResponseValue -Response $progressResponse -Candidates @('status')
$successFlag = Get-ResponseValue -Response $progressResponse -Candidates @('success')
$step = Get-ResponseValue -Response $progressResponse -Candidates @('step')
$msg = Get-ResponseValue -Response $progressResponse -Candidates @('msg')
$progressValue = Get-ResponseValue -Response $progressResponse -Candidates @('rateOfProgress', 'progress', 'percent', 'data.rateOfProgress', 'data.progress', 'data.percent')
$message = Get-ResponseValue -Response $progressResponse -Candidates @('message')
$code = Get-ResponseValue -Response $progressResponse -Candidates @('code')
$finish = Get-ResponseValue -Response $progressResponse -Candidates @('finish')
$lastModify = Get-ResponseValue -Response $progressResponse -Candidates @('lastModify')
if (-not $message) { $message = $msg }
$script:UpgradeProgressState = [ordered]@{
Status = [string]$status
Success = [string]$successFlag
Step = [string]$step
Msg = [string]$msg
Message = [string]$message
RateOfProgress = [string]$progressValue
Code = [string]$code
Finish = [string]$finish
LastModify = [string]$lastModify
RawResponse = [string]$response
}
$progressParts = [System.Collections.Generic.List[string]]::new()
$progressParts.Add("ip=$Ip")
if ($msg) { $progressParts.Add("msg=$msg") }
if ($step) { $progressParts.Add("step=$step") }
if ($progressValue) { $progressParts.Add("rateOfProgress=$progressValue") }
if ($code) { $progressParts.Add("code=$code") }
if ($finish) { $progressParts.Add("finish=$finish") }
if ($status) { $progressParts.Add("status=$status") }
if ($successFlag) { $progressParts.Add("success=$successFlag") }
if ($lastModify) { $progressParts.Add("lastModify=$lastModify") }
if ($message -and $message -ne $msg) { $progressParts.Add("message=$message") }
if ($progressParts.Count -gt 1) {
Write-Info ("Step 3.4a: async upgrade progress single query -> {0}" -f ($progressParts -join ', '))
} else {
Write-Info ("Step 3.4a: async upgrade progress single query returned no explicit progress fields: ip={0}" -f $Ip)
}
if ($code -and $code -ne '0') {
if (-not $message) { $message = $msg }
if (-not $message) { $message = $step }
if (-not $message) { $message = "code=$code" }
throw "Node upgrade failed: ip=$Ip, message=$message"
}
if ((@($step, $message, $msg, $status) -join ' ') -match '(?i)fail|error') {
if (-not $message) { $message = $step }
if (-not $message) { $message = $msg }
throw "Node upgrade failed: ip=$Ip, message=$message"
}
}
function Invoke-UpgradeRequest {
param($Config, [string]$Token, [string]$NodeUrl, [string]$Ip)
@ -945,16 +1104,16 @@ function Invoke-Rollback {
}
try {
$body = Join-RequestPairs ([ordered]@{
$query = Join-RequestPairs ([ordered]@{
airportCode = $Config.AIRPORT_CODE
targetIp = $Ip
applicationName = $Config.APP_NAME
moduleName = $Config.MODULE_NAME
timeOut = $Config.TIMEOUT
})
$response = Invoke-PamWebRequest -Method POST -Url "$($Config.HOME_BASE_URL)/node-proxy/$($Config.AIRPORT_CODE)/api/mcp/version/upgrade/rollback" -Token $Token -Headers @{
$response = Invoke-PamWebRequest -Method POST -Url "$($Config.HOME_BASE_URL)/node-proxy/$($Config.AIRPORT_CODE)/api/mcp/version/upgrade/rollback?$query" -Token $Token -Headers @{
'Target-Node' = $NodeUrl
} -Body $body -ContentType 'application/x-www-form-urlencoded'
}
$rollbackSuccess = Get-ResponseValue -Response $response -Candidates @('success')
if ($rollbackSuccess -and $rollbackSuccess -ne 'true') {
@ -1273,7 +1432,7 @@ function Invoke-PamAction {
'poll-download-progress' {
$token = Invoke-FlowStep -Name 'Get-Token' -Action { Get-Token -Config $config }
$nodeUrl = Invoke-FlowStep -Name 'Get-NodeUrl' -Action { Get-NodeUrl -Config $config -Token $token }
Invoke-FlowStep -Name 'Wait-DownloadProgress' -Action { Wait-DownloadProgress -Config $config -Token $token -NodeUrl $nodeUrl } | Out-Null
Invoke-FlowStep -Name 'Read-DownloadProgress' -Action { Read-DownloadProgress -Config $config -Token $token -NodeUrl $nodeUrl } | Out-Null
Write-DownloadProgressResult
}
'download-cloud-to-node' {
@ -1287,7 +1446,7 @@ function Invoke-PamAction {
Require-IpArgument -TargetIp $Ip
$token = Invoke-FlowStep -Name 'Get-Token' -Action { Get-Token -Config $config }
$nodeUrl = Invoke-FlowStep -Name 'Get-NodeUrl' -Action { Get-NodeUrl -Config $config -Token $token }
Invoke-FlowStep -Name "Wait-UpgradeProgress[$Ip]" -Action { Wait-UpgradeProgress -Config $config -Token $token -NodeUrl $nodeUrl -Ip $Ip } | Out-Null
Invoke-FlowStep -Name "Read-UpgradeProgress[$Ip]" -Action { Read-UpgradeProgress -Config $config -Token $token -NodeUrl $nodeUrl -Ip $Ip } | Out-Null
Write-UpgradeProgressResult -Ip $Ip
}
'upgrade-ip' {

View File

@ -57,6 +57,16 @@ usage() {
ACTION_TYPE
TIMEOUT
LOG_NAME
PARENT_VERSION_NUMBER
POLL_INTERVAL_SEC
DOWNLOAD_POLL_MAX_ATTEMPTS
UPGRADE_POLL_MAX_ATTEMPTS
VERIFY_INTERVAL_SEC
VERIFY_MAX_ATTEMPTS
说明:
--action poll-download-progress 和 poll-upgrade-progress 只执行一次进度查询。
Agent workflow 会重复调用单次进度查询,并在每次返回后交给 LLM/规则审核判断是否完成。
EOF
}
@ -342,6 +352,12 @@ set_defaults() {
: "${ACTION_TYPE:=FULL}"
: "${TIMEOUT:=120}"
: "${LOG_NAME:=app.log}"
: "${PARENT_VERSION_NUMBER:=}"
: "${POLL_INTERVAL_SEC:=2}"
: "${DOWNLOAD_POLL_MAX_ATTEMPTS:=60}"
: "${UPGRADE_POLL_MAX_ATTEMPTS:=600}"
: "${VERIFY_INTERVAL_SEC:=10}"
: "${VERIFY_MAX_ATTEMPTS:=12}"
}
load_config() {
@ -366,7 +382,7 @@ load_config() {
value="$(strip_inline_comment "$value")"
case "$key" in
HOME_BASE_URL|CLIENT_ID|CLIENT_SECRET|AIRPORT_CODE|APP_NAME|MODULE_NAME|VERSION_NUMBER|ZIP_FILE_PATH|ACTION_TYPE|TIMEOUT|LOG_NAME)
HOME_BASE_URL|CLIENT_ID|CLIENT_SECRET|AIRPORT_CODE|APP_NAME|MODULE_NAME|VERSION_NUMBER|ZIP_FILE_PATH|ACTION_TYPE|TIMEOUT|LOG_NAME|PARENT_VERSION_NUMBER|POLL_INTERVAL_SEC|DOWNLOAD_POLL_MAX_ATTEMPTS|UPGRADE_POLL_MAX_ATTEMPTS|VERIFY_INTERVAL_SEC|VERIFY_MAX_ATTEMPTS)
printf -v "$key" '%s' "$value"
;;
esac
@ -961,8 +977,6 @@ get_online_ips() {
poll_download_progress() {
local progress_url="${HOME_BASE_URL}/node-proxy/${AIRPORT_CODE}/api/mcp/version/upgrade/download-cloud/progress?applicationName=${APP_NAME}&moduleName=${MODULE_NAME}&airportCode=${AIRPORT_CODE}&versionNumber=${VERSION_NUMBER}"
local attempt=0
local max_attempts=60
local error_regex='[Ff]ail|[Ee]rror'
DOWNLOAD_PROGRESS_STATUS=""
@ -973,7 +987,6 @@ poll_download_progress() {
DOWNLOAD_PROGRESS_RATE=""
DOWNLOAD_PROGRESS_RESPONSE=""
while (( attempt < max_attempts )); do
local response
response=$(http_request "GET" "$progress_url" "" "" "Target-Node: ${NODE_URL}") || return 1
@ -1010,28 +1023,42 @@ poll_download_progress() {
[[ -n "$status" ]] && progress_parts+=("status=${status}")
[[ -n "$success_flag" ]] && progress_parts+=("success=${success_flag}")
[[ -n "$message" && "$message" != "$msg_value" ]] && progress_parts+=("message=${message}")
log_info "Step 3.3b: 异步下载进度 -> ${progress_parts[*]}"
log_info "Step 3.3b: 异步下载进度单次查询 -> ${progress_parts[*]}"
else
log_info "Step 3.3b: 异步下载进度轮询中... ($((attempt + 1))/${max_attempts})"
log_info "Step 3.3b: 异步下载进度单次查询未返回明确进度字段。"
fi
if [[ "$step_value" == "DONE" || "$status" == "completed" || "$success_flag" == "true" ]]; then
return 0
fi
if [[ "$msg_value" == "success" && "$progress_value" == "100" ]]; then
return 0
fi
if [[ "${step_value} ${message} ${msg_value}" =~ $error_regex ]]; then
if [[ "${step_value} ${message} ${msg_value} ${status}" =~ $error_regex ]]; then
[[ -z "$message" ]] && message="$step_value"
[[ -z "$message" ]] && message="$msg_value"
log_error "Node 下载失败: $message"
return 1
fi
return 0
}
download_progress_complete() {
[[ "$DOWNLOAD_PROGRESS_STEP" == "DONE" || "$DOWNLOAD_PROGRESS_STATUS" == "completed" || "$DOWNLOAD_PROGRESS_SUCCESS" == "true" ]] && return 0
[[ "$DOWNLOAD_PROGRESS_MSG" == "success" && "$DOWNLOAD_PROGRESS_RATE" == "100" ]] && return 0
return 1
}
wait_download_progress() {
local attempt=0
local max_attempts="${DOWNLOAD_POLL_MAX_ATTEMPTS:-60}"
local interval_sec="${POLL_INTERVAL_SEC:-2}"
[[ "$max_attempts" =~ ^[0-9]+$ ]] || max_attempts=60
[[ -n "$interval_sec" ]] || interval_sec=2
while (( attempt < max_attempts )); do
poll_download_progress || return 1
if download_progress_complete; then
return 0
fi
attempt=$((attempt + 1))
sleep 2
log_info "Step 3.3b: 异步下载进度未完成,等待下一次查询... (${attempt}/${max_attempts})"
sleep "$interval_sec"
done
log_error "Node 下载超时。"
@ -1040,8 +1067,13 @@ poll_download_progress() {
create_download_task() {
log_info "Step 3.3: 下载软件包到 Node..."
local download_query="versionNumber=${VERSION_NUMBER}&applicationName=${APP_NAME}&moduleName=${MODULE_NAME}&timeOut=0"
if [[ -n "${PARENT_VERSION_NUMBER:-}" ]]; then
download_query="${download_query}&parentVersionNumber=${PARENT_VERSION_NUMBER}"
fi
http_request "GET" \
"${HOME_BASE_URL}/node-proxy/${AIRPORT_CODE}/api/mcp/version/upgrade/download-cloud?versionNumber=${VERSION_NUMBER}&applicationName=${APP_NAME}&moduleName=${MODULE_NAME}&timeOut=0" \
"${HOME_BASE_URL}/node-proxy/${AIRPORT_CODE}/api/mcp/version/upgrade/download-cloud?${download_query}" \
"" \
"" \
"Target-Node: ${NODE_URL}" \
@ -1050,14 +1082,12 @@ create_download_task() {
download_cloud_to_node() {
create_download_task || return 1
poll_download_progress
wait_download_progress
}
poll_upgrade_progress() {
local ip="$1"
local progress_url="${HOME_BASE_URL}/node-proxy/${AIRPORT_CODE}/api/mcp/version/upgrade/progress?applicationName=${APP_NAME}&moduleName=${MODULE_NAME}&airportCode=${AIRPORT_CODE}&versionNumber=${VERSION_NUMBER}"
local attempt=0
local max_attempts=600
local error_regex='[Ff]ail|[Ee]rror'
UPGRADE_PROGRESS_STATUS=""
@ -1071,7 +1101,6 @@ poll_upgrade_progress() {
UPGRADE_PROGRESS_LAST_MODIFY=""
UPGRADE_PROGRESS_RESPONSE=""
while (( attempt < max_attempts )); do
local response
response=$(http_request "GET" "$progress_url" "" "" "Target-Node: ${NODE_URL}") || return 1
@ -1120,17 +1149,9 @@ poll_upgrade_progress() {
[[ -n "$success_flag" ]] && progress_parts+=("success=${success_flag}")
[[ -n "$last_modify_value" ]] && progress_parts+=("lastModify=${last_modify_value}")
[[ -n "$message" && "$message" != "$msg_value" ]] && progress_parts+=("message=${message}")
log_info "Step 3.4a: async push progress -> ${progress_parts[*]}"
log_info "Step 3.4a: async push progress single query -> ${progress_parts[*]}"
else
log_info "Step 3.4a: async push progress polling... ip=${ip} ($((attempt + 1))/${max_attempts})"
fi
if [[ "$step_value" == "DONE" || "$finish_value" == "true" || "$status" == "completed" || "$success_flag" == "true" ]]; then
return 0
fi
if [[ "$msg_value" == "success" && "$progress_value" == "100" ]] && [[ -z "$code_value" || "$code_value" == "0" ]]; then
return 0
log_info "Step 3.4a: async push progress single query returned no explicit progress fields: ip=${ip}"
fi
if [[ -n "$code_value" && "$code_value" != "0" ]]; then
@ -1148,8 +1169,31 @@ poll_upgrade_progress() {
return 1
fi
return 0
}
upgrade_progress_complete() {
[[ "$UPGRADE_PROGRESS_STEP" == "DONE" || "$UPGRADE_PROGRESS_FINISH" == "true" || "$UPGRADE_PROGRESS_STATUS" == "completed" || "$UPGRADE_PROGRESS_SUCCESS" == "true" ]] && return 0
[[ "$UPGRADE_PROGRESS_MSG" == "success" && "$UPGRADE_PROGRESS_RATE" == "100" ]] && [[ -z "$UPGRADE_PROGRESS_CODE" || "$UPGRADE_PROGRESS_CODE" == "0" ]] && return 0
return 1
}
wait_upgrade_progress() {
local ip="$1"
local attempt=0
local max_attempts="${UPGRADE_POLL_MAX_ATTEMPTS:-600}"
local interval_sec="${POLL_INTERVAL_SEC:-2}"
[[ "$max_attempts" =~ ^[0-9]+$ ]] || max_attempts=600
[[ -n "$interval_sec" ]] || interval_sec=2
while (( attempt < max_attempts )); do
poll_upgrade_progress "$ip" || return 1
if upgrade_progress_complete; then
return 0
fi
attempt=$((attempt + 1))
sleep 2
log_info "Step 3.4a: async push progress not complete, waiting for next query... ip=${ip} (${attempt}/${max_attempts})"
sleep "$interval_sec"
done
log_error "Node push timed out: ip=${ip}"
@ -1257,10 +1301,11 @@ rollback_ip() {
fi
local response
local rollback_query="airportCode=${AIRPORT_CODE}&targetIp=${ip}&applicationName=${APP_NAME}&moduleName=${MODULE_NAME}&timeOut=${TIMEOUT}"
if ! response=$(http_request "POST" \
"${HOME_BASE_URL}/node-proxy/${AIRPORT_CODE}/api/mcp/version/upgrade/rollback" \
"airportCode=${AIRPORT_CODE}&targetIp=${ip}&applicationName=${APP_NAME}&moduleName=${MODULE_NAME}&timeOut=${TIMEOUT}" \
"application/x-www-form-urlencoded" \
"${HOME_BASE_URL}/node-proxy/${AIRPORT_CODE}/api/mcp/version/upgrade/rollback?${rollback_query}" \
"" \
"" \
"Target-Node: ${NODE_URL}"); then
printf '%s' "ROLLBACK_REQUEST_FAILED"
return 0
@ -1522,7 +1567,7 @@ deploy_one_ip() {
return
fi
if ! run_flow_step "poll_upgrade_progress[${ip}]" poll_upgrade_progress "$ip"; then
if ! run_flow_step "wait_upgrade_progress[${ip}]" wait_upgrade_progress "$ip"; then
local message
message="$UPGRADE_PROGRESS_MESSAGE"
[[ -z "$message" ]] && message="$UPGRADE_PROGRESS_MSG"

View File

@ -1,6 +1,6 @@
# 当前整体逻辑结构流程图
本文描述当前 PAM 部署 Agent 的主要模块、运行路径、LLM 审核、人工确认点、热更新和断点续跑逻辑。
本文描述当前 PAM 部署 Agent 的主要模块、运行路径、LLM 审核、失败重试、显式回滚、热更新和断点续跑逻辑。
## 模块结构
@ -27,7 +27,7 @@ flowchart TD
REAL --> AGENT
LGR --> AGENT
LGR --> LGCHECK[LangGraph InMemorySaver checkpointer/interrupt]
LGR --> LGCHECK[LangGraph InMemorySaver checkpointer]
AGENT --> ROUTER[ActionRouter]
ROUTER --> SCRIPT[ScriptActionRunner]
ROUTER --> MCP[McpActionRunner]
@ -68,8 +68,8 @@ flowchart TD
C --> D[build_action_backends 生成 action 路由表]
D --> E[LangGraph entry 节点]
E --> F{是否存在 pending_confirmation}
F -- 是 --> P[confirm interrupt 节点]
E --> F{是否已暂停}
F -- 是 --> R[render_report 输出报告]
F -- 否 --> G[global_action 节点循环]
G --> G1[get-token]
@ -79,15 +79,24 @@ flowchart TD
G4 --> G5[get-node-url]
G5 --> G6[get-online-ips]
G6 --> G7[create-download-task]
G7 --> G8[poll-download-progress]
G8 --> H[prepare_ip 节点选择下一个 IP action]
G7 --> G8[poll-download-progress 单次查询]
G8 --> G9{LLM/规则判断下载完成}
G9 -- 未完成且正常 --> G8
G9 -- 已完成 --> H[prepare_ip 节点选择下一个 IP action]
G9 -- 异常或超时 --> R
H --> I[resolve_target_ips 计算目标 IP]
I --> J[ip_action 节点执行 upgrade-ip]
J --> K[ip_action 节点执行 poll-upgrade-progress]
K --> L[ip_action 节点执行 start-ip]
J --> K[ip_action 节点执行 poll-upgrade-progress 单次查询]
K --> K1{LLM/规则判断推送完成}
K1 -- 未完成且正常 --> K
K1 -- 已完成 --> L[ip_action 节点执行 start-ip]
K1 -- 异常或超时 --> R
L --> M[ip_action 节点执行 verify-ip]
M --> N[ip_action 节点执行 download-log]
M --> M1{健康检查通过或达到最大次数}
M1 -- 未通过且未超时 --> M
M1 -- 已通过 --> N[ip_action 节点执行 download-log]
M1 -- 仍未通过且超时 --> R
N --> O{还有下一个 IP}
O -- 是 --> J
O -- 否 --> R[render_report 输出报告]
@ -109,15 +118,15 @@ flowchart LR
```mermaid
flowchart TD
A[action 执行完成] --> C[整理 ActionResult 和 AgentState 摘要]
C --> D[敏感字段脱敏并截断长日志]
A[action 执行完成] --> C[整理当前 ActionResult]
C --> D[敏感字段脱敏;仅在异常时附带必要诊断日志]
D --> E{真实 LLM 是否配置}
E -- 是 --> F[OpenAICompatibleLlmClient 输出结构化审核]
E -- 否 --> G[RuleBasedLlmClient 本地规则审核]
F --> H{should_continue}
G --> H
H -- true --> I[继续后续 action]
H -- false --> J[暂停流程并写入 review_context]
H -- true --> I[标记 action completed 并继续后续 action]
H -- false --> J[不写 completed暂停流程并写入 review_context]
J --> K[chat/CLI 播报审核建议并等待 resume]
F --> L{是否开启 analyze-actions}
G --> L
@ -128,37 +137,78 @@ flowchart TD
说明:
- 每个 action 完成后都会进入一次审核,不再依赖 `--analyze-actions` 开关。
- 审核输入只包含当前 action 的结构化结果和必要诊断日志,不再传入完整运行态 `state_summary`,避免历史状态干扰大模型判断。
- `--analyze-actions``llm action-analysis on` 只控制是否把详细审核结果写入 `events`
- 只有 action 执行成功且审核允许继续时,才会写入 `completed_global_steps``ip_states[ip].completed_steps`
- 如果审核建议停止或审核本身失败,当前 action 不会计入 completed`resume` 会重试当前 action。
- 如果审核本身失败,也会生成“停止继续”的审核结果并暂停流程,避免黑盒继续执行。
## 失败、人工确认和续跑
## verify-ip 健康检查重试
```mermaid
flowchart TD
A[执行 verify-ip] --> B[LLM/规则审核单次返回]
B --> C{SUCCESS 是否为 true}
C -- 是 --> D[清理重试计数,标记 verify-ip completed]
C -- 否 --> E{是否达到 VERIFY_MAX_ATTEMPTS}
E -- 否 --> F[播报 ACTION_PROGRESS 并保存 checkpoint]
F --> G[等待 VERIFY_INTERVAL_SEC]
G --> A
E -- 是 --> H[暂停在 verify-ip写入 review_context]
```
说明:
- `verify-ip` 用于应用启动后的健康检查,失败时默认每 `10` 秒重试一次,最多 `12` 次,约两分钟。
- 重试参数来自 `VERIFY_INTERVAL_SEC``VERIFY_MAX_ATTEMPTS`,支持通过 `config.txt`、chat `set``load params` 热更新。
- 未达到最大次数时不会把 `verify-ip` 写入 completed也不会进入 `download-log`;中断或失败后 `resume` 仍从 `verify-ip` 继续。
## 进度查询 action 语义
```mermaid
flowchart TD
A[poll-download-progress / poll-upgrade-progress] --> B[执行一次进度查询]
B --> C[ActionResult 返回结构化进度字段]
C --> D[LLM/规则审核 progress_complete]
D --> E{是否完成}
E -- 是 --> F[写入 completed进入下一个 action]
E -- 否但正常 --> G[追加 ACTION_PROGRESS保存 checkpoint]
G --> H[按 POLL_INTERVAL_SEC 等待]
H --> A
E -- 异常 --> I[暂停在当前 progress action]
G --> J{达到最大查询次数}
J -- 是 --> I
J -- 否 --> H
```
- `poll-download-progress``poll-upgrade-progress` 不再在脚本内部长时间循环;脚本/MCP/fake 每次只返回一次进度查询结果。
- LLM/规则通过 `progress_complete` 判断进度是否完成。未完成但正常时,`should_continue=true``progress_complete=false`workflow 会保留当前 action 并再次查询。
- 查询间隔由 `POLL_INTERVAL_SEC` 控制,下载最大次数由 `DOWNLOAD_POLL_MAX_ATTEMPTS` 控制,单 IP 推送最大次数由 `UPGRADE_POLL_MAX_ATTEMPTS` 控制。
- 每次进度查询都会播报 `ACTION_PROGRESS` 并保存 checkpoint中断或失败后 `resume` 会从同一个 progress action 继续。
## 失败、显式回滚和续跑
```mermaid
flowchart TD
A[逐 IP action 执行] --> B{action 失败或业务校验失败}
B -- 否 --> C[记录 completed_steps 并保存 checkpoint]
C --> C1{LLM 审核是否允许继续}
C1 -- 是 --> C2[继续后续 action]
C1 -- 否 --> G[保存 checkpoint 并暂停]
B -- 否 --> C{LLM 审核是否允许继续}
C -- 是 --> C1[记录 completed_steps 并保存 checkpoint]
C1 --> C2[继续后续 action]
C -- 否 --> G[不记录 completed_steps保存 checkpoint 并暂停]
B -- 是 --> D[记录 ip_state 为 FAILED]
D --> E[download-log 尽力下载日志]
E --> F[设置 pending_confirmation=rollback-ip:IP]
D --> F[保存 failed_stage 和 failure_reason]
F --> G[保存 checkpoint 并暂停]
G --> LG{是否来自 CLI/chat 图运行}
LG -- 是 --> LGI[LangGraph interrupt 输出确认请求]
LGI --> LGRS[approve/reject 通过 Command resume 恢复]
LGRS --> H{用户决定}
LG -- 否 --> H{用户决定}
H -- approve --> I[confirm_pending 执行 rollback-ip]
I --> J{rollback 是否成功}
J -- 是 --> K[清空 pending_confirmation]
J -- 否 --> L[保持 pending_confirmation等待再次处理]
H -- reject --> M[标记 REJECTED_BY_OPERATOR 并清空 pending_confirmation]
K --> N[resume 续跑]
M --> N
N --> O[跳过已完成全局步骤、成功 IP 和单 IP 已完成 action]
G --> H{用户决定}
H -- 修复后继续 --> I[resume 清理 paused]
I --> J[next_ip_action 返回 failed_stage]
J --> K[重试当前 action]
H -- 需要回滚 --> L[rollback IP 显式执行 rollback-ip]
L --> M{rollback 是否成功}
M -- 是 --> N[标记 ROLLBACK_DONE]
M -- 否 --> O[暂停为 rollback_failed]
N --> P[resume 续跑]
P --> Q[跳过已完成全局步骤、成功 IP、已回滚 IP 和单 IP 已完成 action]
```
## 用户中断与热更新
@ -183,13 +233,16 @@ flowchart TD
## checkpoint 续跑语义
- `completed_global_steps`:全局阶段已经完成的 action 会跳过。
- `completed_global_steps` 只记录“执行成功且审核通过”的全局 action审核阻断时不会提前写入`resume` 会重试该 action。
- `ip_states[ip].status == SUCCESS`:成功 IP 会跳过。
- `ip_states[ip].completed_steps`:同一个 IP 已完成的 action 会跳过。
- `pending_confirmation`:存在待确认事项时,部署流程不继续执行,必须先 `approve``reject`
- `paused` / `pause_reason`:流程可能因 LLM 审核阻断、用户中断、回滚失败等原因暂停;`resume` 会先清理暂停标记,再继续执行。
- `ip_states[ip].rollback_status == ROLLBACK_DONE`:已显式回滚的失败 IP 会跳过,继续后续目标。
- `ip_states[ip].failed_stage`:失败 IP 未回滚时,`resume` 会从该 action 重试。
- `ip_states[ip].completed_steps`:同一个 IP 已完成且审核通过的 action 会跳过;审核阻断时不会提前写入,`resume` 会重试当前 action。
- `pending_confirmation`:仅保留为旧 checkpoint/旧 confirm 入口的兼容字段,新失败流程不再自动设置。
- `paused` / `pause_reason`:流程可能因 action 失败、LLM 审核阻断、用户中断、回滚失败等原因暂停;`resume` 会先清理暂停标记,再继续执行。
- `review_context`保存最近一次暂停时的审核建议、失败原因、IP 和阶段,供 chat/CLI 输出给用户。
- CLI/chat 的运行调度由 `langgraph_runtime.py` 通过 action 级 LangGraph 节点执行chat 和 CLI confirm 的确认点使用 LangGraph interrupt 和 InMemorySaver。
- 跨进程续跑读取业务 checkpoint JSONLangGraph checkpointer 负责单进程图恢复和 interrupt resume
- CLI/chat 的运行调度由 `langgraph_runtime.py` 通过 action 级 LangGraph 节点执行;失败暂停和续跑依赖业务 checkpoint JSON
- 跨进程续跑读取业务 checkpoint JSONLangGraph checkpointer 负责单进程图状态保存
- checkpoint 为了真实续跑会保存完整参数,请放在受控目录中。
## 真实外部能力接入点

View File

@ -11,75 +11,14 @@
- [x] 增加参数确认和目标 IP 范围确认,不只在回滚阶段确认。
- [x] 增加 LLM/MCP 配置热加载,例如 `llm config``mcp config`
- [x] 增加执行中 `Ctrl+C` 中断处理:保存 checkpoint、标记 `user_interrupted`,再由 `resume` 继续。
- [x] 将 chat 的人工确认点接入 LangGraph interrupt/checkpointer`run` 执行到回滚确认点后由 interrupt 暂停,`approve/reject` 通过 `Command(resume=...)` 恢复同一图线程。跨进程续跑仍保留业务 checkpoint JSON
- [x] 将 chat 执行接入 action 级 LangGraph runtimeaction 失败或审核阻断后保存 checkpoint 并暂停,`resume` 从当前 action 重试,`rollback [IP]` 作为显式命令单独执行
## LLM action 后分析
- [x] 每次 action 完成后,可把 `action``backend``ok``values``stderr``error_summary` 和当前 `AgentState` 摘要交给 LLM 分析。
- [x] LLM 输出结构化结果:是否异常、异常等级、可能原因、建议动作、是否需要人工确认。
- [x] LLM 分析结果会影响流程是否继续:`should_continue=false` 时自动暂停,并把建议输出给用户。
- [x] 本地保留规则兜底exit code、`verify-ip SUCCESS=false`、pending confirmation 等硬规则优先于 LLM。
- [x] 本地保留规则兜底exit code、`verify-ip SUCCESS=false`旧版 pending confirmation 等硬规则优先于 LLM。
- [x] 对 LLM 输入做脱敏,禁止把 `CLIENT_SECRET`、token、Authorization、完整日志原文发送给模型。
- [x] 每个 action 都会执行审核;`--analyze-actions``llm action-analysis on` 只控制是否把详细审核结果写入 `events`
- [x] 支持通过 `--llm-action-analysis-prompt-file`、环境变量或 chat 命令热加载自定义 action 审核提示词。
- [x] 通过 `--analyze-actions``llm action-analysis on` 显式开启,真实部署默认不启用。
## 双模式 Agent 改造
- [x] 明确双模式入口:`fixed_runtime``agentic_skill`,由 LLM 基于用户意图、风险和任务类型先做模式决策。
- [x] 为 LLM 增加模式决策 structured output schema至少输出`mode``reason``risk_level``requires_confirmation`
- [x] 保留固定 runtime 为默认主链路:标准部署、高风险动作、回滚和批量升级优先走确定性流程。
- [x] 仅在诊断类、探索类、半结构化任务,或用户明确要求“按 skill 自主编排”时进入 `agentic_skill` 模式。
## Skill 驱动执行
- [x] 将 `PAM_AUTO_DEPLY_SKILL.md` 从“只加载元数据”升级为真正驱动执行的规则源。
- [x] 让 `SkillPolicy` 进入 LLM prompt明确 allowed actions、required params、required confirmations、forbidden actions。
- [x] 让 `SkillPolicy` 进入 runtime/graph 路由,用于裁剪不可执行 action而不是只挂在 `PamDeployAgent.skill_policy` 上。
- [ ] 支持把 skill 中的执行顺序、确认点、回滚约束映射到 LangGraph 节点和边。
- [ ] 评估是否需要把 markdown skill 拆成“机器可读配置 + 人类可读说明”双文件结构,降低解析歧义。
## Tool Schema 收口
- [x] 不给 LLM 原始 shell 或 powershell 执行权限,只允许调用受控 typed tools。
- [x] 把现有脚本 action 统一抽象为标准 tool schema例如 `create_version``upload_package``publish_version``get_online_ips``upgrade_ip``verify_ip``rollback_ip`
- [x] 把 MCP action 统一抽象为同一套 tool schema避免“大模型看见的是脚本”和“大模型看见的是 MCP tool”两套接口。
- [ ] 为每个 tool 明确入参、出参、是否幂等、是否高风险、是否需要人工确认。
- [ ] 在 tool 层增加白名单、步数限制、超时限制和调用次数限制,避免 agentic 模式失控。
## MCP 与 LLM 协同
- [ ] 明确 MCP 在 agentic 模式中的角色:作为 LLM 可调用 tool而不是仅作为固定 runtime 的后端。
- [ ] 为 MCP tool 增加面向 LLM 的描述信息,至少包含用途、必填参数、成功返回字段、失败语义。
- [ ] 支持在 agentic 模式下先 `list_tools` 再做工具匹配,但最终仍映射回受控 action/tool schema。
- [ ] 评估是否需要增加 MCP tool 结果摘要层,避免把原始复杂返回直接喂回 LLM。
## LangGraph Workflow 收敛
- [ ] 合并当前 `graph.py``langgraph_runtime.py` 的职责,避免维护两套相近的部署图工厂。
- [ ] 在 LangGraph 中补齐完整工作流:`analyze -> clarify -> decide_mode -> confirm -> execute -> diagnose -> resume`
- [ ] 将人工确认点从“只覆盖 rollback”扩展到模式切换、高风险 tool 调用、关键参数缺失修正等场景。
- [ ] 为 `agentic_skill` 模式增加 LangGraph tool loop 节点,而不是仅复用当前固定 action 图。
- [ ] 明确单进程 LangGraph checkpointer 和跨进程业务 checkpoint 的职责边界,避免双状态源混乱。
## Chat 交互升级
- [x] 在 chat 中增加“当前模式”展示,让用户知道当前是固定 runtime 还是 agentic skill。
- [ ] 在 chat 中增加模式切换确认,例如从固定 runtime 切到 agentic 模式前提示风险和限制。
- [ ] 在 chat 中展示大模型最近一次 tool 决策摘要,包括原因、目标 action/tool 和关键参数。
- [ ] 为 agentic 模式增加中途打断、继续、回退到固定 runtime 的命令能力。
## 安全与可观测性
- [ ] 为 agentic 模式增加完整审计日志用户输入、LLM 决策、tool 调用、tool 返回、确认记录。
- [ ] 在高风险 tool 调用前增加统一确认策略,不允许 LLM 自行越过人工确认直接执行。
- [ ] 对发送给 LLM 的 tool 返回做脱敏和截断,避免把脚本原始日志、敏感配置和 token 直接暴露给模型。
- [ ] 增加失败保护:当 LLM structured output 非法、模式决策异常或 tool loop 超限时自动降级回固定 runtime。
## 测试与验收
- [x] 增加双模式决策测试,覆盖固定 runtime 与 agentic skill 的分流条件。
- [x] 增加 skill 约束测试,确认 forbidden action、required confirmation、required params 会真正生效。
- [ ] 增加 agentic tool loop 测试,验证 LLM 只能调用白名单工具,不能执行任意命令。
- [ ] 增加 MCP + LLM 协同测试,验证 MCP tools 可以被统一 schema 包装并参与自主编排。
- [ ] 补充文档,明确当前项目是“固定 runtime 为主agentic skill 为辅”的演进路线,而不是直接替代现有部署流。

View File

@ -70,11 +70,23 @@ cd pam-deploy-agent-linux-x86_64
本次发布包对应的运行时行为也已同步到包内 `README.md`
- 每个 action 完成后都会自动执行一次 LLM/规则审核。
- 每个 action 完成后都会自动执行一次 LLM/规则审核,只有审核通过才会把 action 记为 completed。
- action 审核输入不包含完整运行态 `state_summary`,只包含当前 action 的结构化结果和必要诊断日志,避免历史状态干扰大模型判断。
- `create-download-task` 支持可选 `PARENT_VERSION_NUMBER`,非空时会传给云下载接口的 `parentVersionNumber`;默认空值不发送,继承正在使用的版本规则。
- `poll-download-progress``poll-upgrade-progress` 是单次进度查询 actionAgent workflow 会按配置重复调用,每次返回后交给 LLM/规则判断是否完成并播报进度。
- `verify-ip` 会按 `VERIFY_INTERVAL_SEC` / `VERIFY_MAX_ATTEMPTS` 做应用健康检查重试,默认每 10 秒一次、最多 12 次,仍未通过才暂停。
- `--analyze-actions` 只控制是否把详细审核结果写入 `events`
- chat 支持执行中 `Ctrl+C` 中断后保存 checkpoint再通过 `resume` 继续。
- action 失败或审核阻断后会保存 checkpoint 并暂停;修复外部环境后通过 `resume` 从当前 action 重试。
- 回滚不再属于主 workflow 自动分支;需要时使用 chat 内 `rollback [IP]` 或 CLI `rollback --checkpoint ...` 显式执行。
- chat 中非内置命令默认交给当前 LLM 普通对话,不会自动触发部署 workflow普通对话优先流式展示`<think>...</think>` 思考内容会被过滤;完整部署仍需 `analyze` / `run` 并人工确认。
- chat 支持 `ask <问题>``log analyze <路径>``action propose <需求>``action run ...`,可用于普通问答、日志尾部分析和确认后执行单 action日志分析输出同样会过滤 `<think>` 内容。
- chat 支持执行中 `Ctrl+C` 中断后保存 checkpoint再通过 `resume` 重试当前 action。
- chat 支持 `set KEY=VALUE``load params <路径>` 热更新当前运行任务参数。
- 进度查询和健康检查重试参数可通过 `POLL_INTERVAL_SEC``DOWNLOAD_POLL_MAX_ATTEMPTS``UPGRADE_POLL_MAX_ATTEMPTS``VERIFY_INTERVAL_SEC``VERIFY_MAX_ATTEMPTS` 配置。
- 支持通过 `--llm-action-analysis-prompt-file` 或 chat 内 `llm config action_analysis_prompt_file=...` 自定义 action 审核提示词。
- chat 支持 `llm test [文本]` 测试当前 LLM client 是否正常加载。
- 默认运行日志写入 `logs/pam_deploy_agent.log`,按天切分并默认保留 14 个历史日切文件,可通过 `PAM_AGENT_LOG_FILE``PAM_AGENT_LOG_LEVEL``PAM_AGENT_LOG_RETENTION_DAYS` 调整。
- 日志会脱敏 token、secret、api_key、Authorization 等字段chat 普通对话和日志分析不会记录原始 `<think>` 内容checkpoint 仍保存完整运行参数,请放在受控目录。
## 包大小评估

View File

@ -34,9 +34,10 @@ pam-deploy-agent-linux-x86_64/
./run.sh run-deploy --help
```
发布包默认使用普通文本输入,避免 PyInstaller 环境下 `prompt_toolkit` 兼容性问题;输出仍会在可用时使用 `rich` 做更清晰的文本展示。
chat 内的失败回滚确认由 LangGraph interrupt 托管;执行停在确认点后,输入 `approve``reject [原因]` 会恢复同一个图线程继续处理。
chat 会在执行前归一化并展示实际写入脚本配置的参数;`script_only` / `hybrid_node_mcp` 会先检查 `ZIP_FILE_PATH` 是否存在,避免脚本运行后才用默认路径失败。执行过程中每个 action 都会输出开始、完成或失败状态;每个 action 完成后还会自动进入一次 LLM/规则审核,并播报审核开始和审核结果。
发布包默认会优先使用 `prompt_toolkit` 增强输入,支持更稳定的退格、历史记录和补全;如果增强输入初始化失败,会自动降级到普通 `input()`。输出仍会在可用时使用 `rich` 做更清晰的文本展示。
action 失败或审核阻断后会保存 checkpoint 并暂停;修复外部环境后输入 `resume` 会从当前 action 重试。回滚不再属于主 workflow 自动分支,需要时在 chat 内输入 `rollback [IP]` 显式执行。
chat 会在执行前归一化并展示实际写入脚本配置的参数;`script_only` / `hybrid_node_mcp` 会先检查 `ZIP_FILE_PATH` 是否存在,避免脚本运行后才用默认路径失败。执行过程中每个 action 都会输出开始、完成或失败状态;每个 action 完成后还会自动进入一次 LLM/规则审核,并播报审核开始和审核结果;审核输入只包含当前 action 的结构化结果和必要诊断日志,不会把完整运行态 `state_summary` 交给大模型;只有审核通过才会把 action 记为 completed。`create-download-task` 支持可选 `PARENT_VERSION_NUMBER`,非空时会作为云下载接口参数 `parentVersionNumber` 传入;默认空值不发送,表示继承正在使用的版本规则。
`poll-download-progress``poll-upgrade-progress` 每次只查询一次进度Agent workflow 会按 `POLL_INTERVAL_SEC``DOWNLOAD_POLL_MAX_ATTEMPTS``UPGRADE_POLL_MAX_ATTEMPTS` 重复调用,并在每次返回后交给 LLM/规则判断是否完成、向 chat 播报进度。`verify-ip` 健康检查失败时Agent workflow 会按 `VERIFY_INTERVAL_SEC` 重试,最多 `VERIFY_MAX_ATTEMPTS` 次;默认每 10 秒一次、最多 12 次,仍未通过才暂停。
## 交互式使用
@ -71,12 +72,18 @@ PAM> run
PAM> status
PAM> params
PAM> events 5
PAM> ask 这个 agent 能做什么
PAM> log analyze logs/pam_deploy_agent.log 请帮我看最近异常 --tail 400
PAM> action propose 请单独执行 verify-ip 192.168.1.10
PAM> action run verify-ip ip=192.168.1.10
PAM> action run llm 请单独执行 get-online-ips
PAM> llm test
PAM> llm action-analysis on
PAM> llm config action_analysis_prompt_file=prompts/action_review.txt
PAM> mcp config mcp_client.example.json
PAM> list checkpoints
PAM> load checkpoint runtime/checkpoints/demo.json
PAM> approve
PAM> rollback
PAM> resume
PAM> exit
```
@ -123,18 +130,23 @@ PAM> exit
--confirm
```
处理失败后的回滚确认
失败后从断点重试
```bash
./run.sh confirm --checkpoint runtime/checkpoints/demo.json --decision approve --confirm
./run.sh resume --checkpoint runtime/checkpoints/demo.json --confirm
```
`confirm` 会通过 LangGraph interrupt resume 处理确认,并在确认后继续执行后续图节点;进程中断或需要再次续跑时,再使用 `resume`
拒绝回滚:
需要回滚失败 IP 时显式执行 rollback未指定 `--ip` 时会使用当前失败 IP
```bash
./run.sh confirm --checkpoint runtime/checkpoints/demo.json --decision reject --note "人工决定暂不回滚" --confirm
./run.sh rollback --checkpoint runtime/checkpoints/demo.json --confirm
./run.sh resume --checkpoint runtime/checkpoints/demo.json --confirm
```
也可以指定 IP 和停机策略:
```bash
./run.sh rollback --checkpoint runtime/checkpoints/demo.json --ip 192.168.1.10 --stop-first --note "人工决定回滚该 IP" --confirm
```
## LLM 配置
@ -176,10 +188,27 @@ chat 内也可以热加载 LLM
```text
PAM> llm config base_url=https://your-llm.example.com/v1 api_key=your-api-key model=your-model-name
PAM> llm config action_analysis_prompt_file=prompts/action_review.txt
PAM> llm test 请返回一次连通性测试结果
PAM> llm action-analysis on
PAM> llm fallback
```
`llm test [文本]` 会使用当前 LLM client 做一次轻量意图识别调用,并输出 client 类型、intent、strategy 和 confidence便于确认真实 LLM 或规则 fallback 是否正常加载。
## 日志
Agent 默认写入运行日志到 `logs/pam_deploy_agent.log`。日志覆盖 chat/CLI 输入、LLM 请求和响应摘要、action 路由、脚本/MCP 调用、LangGraph 节点、checkpoint 保存、暂停/续跑等关键流程。日志会在本地时间每日 0 点后首次写入时自动切分,历史文件形如 `pam_deploy_agent.log.YYYY-MM-DD`,默认保留 14 个历史日切文件。
可通过环境变量调整日志位置、级别和保留策略:
```bash
export PAM_AGENT_LOG_FILE=logs/pam_deploy_agent.log
export PAM_AGENT_LOG_LEVEL=INFO
export PAM_AGENT_LOG_RETENTION_DAYS=14
```
日志会递归脱敏 `CLIENT_SECRET``MCP_CLIENT_SECRET`、token、Authorization、api_key、password 等字段,并截断长文本。`PAM_AGENT_LOG_RETENTION_DAYS` 表示保留的历史日切文件数量,设为 `0` 时不自动清理历史切分文件。checkpoint 仍会保存完整运行参数,请放在受控目录。
## 策略说明
- `fake`:全部使用 fake runner不访问真实环境。
@ -219,9 +248,16 @@ MCP token 获取方式与 HOME 一致,默认按 `client_credentials` POST 到
## 注意事项
- 执行真实 action 前请确认配置文件中的 `HOME_BASE_URL``CLIENT_ID``CLIENT_SECRET``AIRPORT_CODE``APP_NAME``MODULE_NAME``VERSION_NUMBER``ZIP_FILE_PATH`
- `chat` 中输入 `你好``hello` 这类问候不会触发 LLM/结构化分析;需要分析部署需求时请直接描述部署任务,或显式使用 `analyze <需求>`
- `PARENT_VERSION_NUMBER` 是云下载可选参数;非空时会传给 `download-cloud``parentVersionNumber`,空值不会发送。
- `chat` 中非内置命令默认交给当前 LLM 做普通对话,不会自动触发部署 workflow普通对话优先流式展示`<think>...</think>` 思考内容会被过滤;需要分析部署需求时请显式使用 `analyze <需求>`,完整部署仍需 `run` 并逐步确认。
- `ask <问题>` 可显式普通对话;`log analyze <路径> [问题] [--tail N] [--max-bytes N]` 默认只读取日志尾部并脱敏后交给 LLM日志分析输出同样会过滤 `<think>` 内容。
- `action propose <需求>` 只展示 LLM 解析出的单 action 计划;`action run <action> [ip=...] [KEY=VALUE...]``action run llm <需求>` 会在用户输入 `yes` 后才执行单 action。
- 每个 action 完成后都会自动执行一次 LLM/规则审核;`--analyze-actions``llm action-analysis on` 只控制是否把详细审核结果写入 `events`
- 如果审核建议停止、审核本身失败,或用户在执行中按下 `Ctrl+C`,流程都会保存 checkpoint 并进入暂停状态;后续可使用 `resume` 继续。
- action 审核输入不包含完整运行态 `state_summary`,只包含当前 action 的结构化结果和必要诊断日志。
- `poll-download-progress``poll-upgrade-progress` 是单次进度查询 action未完成时不会进入下一个 action最大查询次数和间隔可通过 `config.txt` 或 chat `set` 热更新。
- `verify-ip` 会按 `VERIFY_INTERVAL_SEC` / `VERIFY_MAX_ATTEMPTS` 做健康检查重试,默认每 10 秒一次、最多 12 次。
- `llm test [文本]` 可测试当前 LLM client 是否可用。
- 如果审核建议停止、审核本身失败,或用户在执行中按下 `Ctrl+C`,流程都会保存 checkpoint 并进入暂停状态;后续可使用 `resume` 重试当前 action。
- `set KEY=VALUE``load params <路径>` 会热更新当前运行任务的参数,并回写运行中的 `config.txt` 和 checkpoint。
- `checkpoint` 会保存完整运行参数,请放在受控目录。
- `hybrid_node_mcp``resume``confirm` 如果需要执行 MCP action请同时传入 `--mcp-config`
- `hybrid_node_mcp``resume``rollback` 如果需要执行 MCP action请同时传入 `--mcp-config`

View File

@ -44,6 +44,14 @@ else
python -m pip install -e .
fi
PYINSTALLER_EXTRA_ARGS=()
if python -c "import importlib.util; raise SystemExit(0 if importlib.util.find_spec('prompt_toolkit') else 1)"; then
PYINSTALLER_EXTRA_ARGS+=(--collect-submodules prompt_toolkit --collect-data prompt_toolkit)
fi
if python -c "import importlib.util; raise SystemExit(0 if importlib.util.find_spec('rich') else 1)"; then
PYINSTALLER_EXTRA_ARGS+=(--collect-submodules rich)
fi
echo "==> 使用 PyInstaller 生成自带 Python 运行时的可执行目录"
python -m PyInstaller \
--clean \
@ -57,6 +65,7 @@ python -m PyInstaller \
--collect-submodules pam_deploy_graph \
--collect-submodules langgraph \
--hidden-import pam_deploy_graph.cli \
"${PYINSTALLER_EXTRA_ARGS[@]}" \
packaging/pyinstaller_entry.py
echo "==> 组装发布目录"
@ -96,7 +105,8 @@ PAM 部署 Agent 解压即用包
run-global 执行全局阶段token、版本、上传、发布、Node URL、下载任务。
run-deploy 执行完整部署流程:全局阶段 + 逐 IP 阶段。
resume 从 checkpoint 继续执行。
confirm 处理待人工确认事项,目前用于失败 IP 回滚确认。
rollback 显式回滚失败 IP不传 --ip 时使用当前失败 IP。
confirm 兼容旧 checkpoint 的人工确认命令,新流程通常不需要使用。
通用参数:
--config <路径>
@ -110,7 +120,7 @@ PAM 部署 Agent 解压即用包
hybrid_node_mcp PAM_HOME 走脚本PAM_NODE 走 MCP。
--checkpoint <路径>
checkpoint JSON 路径。用于断点续跑和人工确认恢复
checkpoint JSON 路径。用于断点续跑和显式回滚
示例runtime/checkpoints/demo.json
--target-ip <IP>
@ -119,7 +129,7 @@ PAM 部署 Agent 解压即用包
--mcp-config <路径>
MCP client JSON 配置文件。通常配置 server_url 和独立鉴权信息;
Agent 会从 server list_tools 自动发现 tools。hybrid_node_mcp 策略、
resume 或 confirm 需要执行 MCP action 时使用。
resume 或 rollback 需要执行 MCP action 时使用。
示例mcp_client.example.json
--confirm
@ -127,8 +137,9 @@ PAM 部署 Agent 解压即用包
chat 模式会在会话中要求输入 run并分别确认参数、目标范围和最终执行。
--analyze-actions
每个 action 完成后追加 LLM/规则诊断建议。诊断只作为辅助建议,
不会自动决定继续、回滚或修改参数。
每个 action 完成后的 LLM/规则审核默认都会执行;该参数只控制
是否把详细审核结果写入 events。审核建议停止时流程会暂停
resume 会重试当前 action。
LLM 参数:
--llm-base-url <URL>
@ -140,10 +151,25 @@ LLM 参数:
--llm-model <模型名>
LLM 模型名称。也可通过环境变量 PAM_LLM_MODEL 提供。
--llm-action-analysis-prompt-file <路径>
自定义 action 审核提示词文件。打包内置基线:
prompts/action_review.txt
LLM 环境变量:
PAM_LLM_BASE_URL
PAM_LLM_API_KEY
PAM_LLM_MODEL
PAM_LLM_ACTION_ANALYSIS_PROMPT_FILE
日志环境变量:
PAM_AGENT_LOG_FILE
运行日志路径,默认 logs/pam_deploy_agent.log。
PAM_AGENT_LOG_LEVEL
日志级别,默认 INFO。排查 LLM/MCP 时可临时设为 DEBUG。
PAM_AGENT_LOG_RETENTION_DAYS
历史日切日志保留数量,默认 14。设为 0 时不自动清理历史切分文件。
示例:
./run.sh chat --config doc_scripts/config.txt.example --strategy fake --checkpoint runtime/checkpoints/demo.json
@ -154,8 +180,10 @@ LLM 环境变量:
./run.sh run-deploy --config doc_scripts/config.txt.example --strategy fake --checkpoint runtime/checkpoints/demo.json --confirm
./run.sh confirm --checkpoint runtime/checkpoints/demo.json --decision approve --confirm
# 如果进程中断或需要再次续跑:
# 失败或审核阻断暂停后,修复外部环境并从当前 action 重试:
./run.sh resume --checkpoint runtime/checkpoints/demo.json --confirm
# 需要回滚失败 IP 时显式执行:
./run.sh rollback --checkpoint runtime/checkpoints/demo.json --confirm
./run.sh resume --checkpoint runtime/checkpoints/demo.json --confirm
查看子命令原始参数:
@ -167,11 +195,14 @@ LLM 环境变量:
2. doc_scripts 只包含运行必需文件deploy.sh、config.txt.example、PAM_AUTO_DEPLY_SKILL.md。
3. prompts/action_review.txt 是当前默认 action 审核提示词基线,可复制后自行修改。
4. mcp_client.example.json 是 MCP server URL + 独立鉴权配置示例,需要按真实 MCP server 修改。
5. confirm 会通过 LangGraph interrupt resume 处理确认,并继续后续图节点;进程中断时再使用 resume
5. action 失败或审核阻断后会暂停;修复后用 resume 从当前 action 重试,需要回滚时用 rollback 显式执行
6. chat 会在执行前归一化并展示实际写入脚本配置的参数script_only / hybrid_node_mcp 会先检查 ZIP_FILE_PATH 是否存在。
7. chat 执行过程中会播报每个 action 的开始、完成或失败;普通问候不会触发 LLM/结构化分析。
8. chat 内可使用 params、events、list checkpoints、load checkpoint、load params、llm config、mcp config 等命令。
9. checkpoint 会保存完整运行参数,请放在受控目录。
7. PARENT_VERSION_NUMBER 是云下载可选参数;空值不发送,非空时传给 parentVersionNumber。
8. chat 执行过程中会播报每个 action 的开始、完成或失败;非内置输入默认交给 LLM 普通对话,不会自动触发部署 workflow。
9. chat 普通对话优先流式展示;模型返回的 <think>...</think> 思考内容会被过滤,不展示也不写入日志。
10. chat 内可使用 ask、log analyze、action propose、action run、params、events、rollback、list checkpoints、load checkpoint、load params、llm config、llm test、mcp config 等命令。
11. 日志默认写入 logs/pam_deploy_agent.log按天切分并默认保留 14 个历史日切文件;日志会脱敏 token、secret、api_key、Authorization 等字段。
12. checkpoint 会保存完整运行参数,请放在受控目录。
HELP_TEXT
}

View File

@ -2,9 +2,14 @@
from __future__ import annotations
import logging
from .constants import ALLOWED_ACTIONS, HOME_ACTIONS, NODE_ACTIONS
from .logging_utils import json_for_log
from .models import AgentState, BackendName, ExecutionStrategy, ActionResult
logger = logging.getLogger(__name__)
def build_action_backends(strategy: ExecutionStrategy) -> dict[str, BackendName]:
"""根据执行策略生成每个 action 对应的后端类型。"""
@ -33,6 +38,13 @@ class ActionRouter:
backend = state.action_backends.get(action)
if not backend:
raise ValueError(f"action 未配置路由: {action}")
logger.info(
"ActionRouter 路由 action run_id=%s action=%s backend=%s kwargs=%s",
state.run_id,
action,
backend,
json_for_log(kwargs),
)
if backend == "script":
return self.script_runner.run(
action,
@ -48,6 +60,13 @@ class ActionRouter:
mcp_kwargs = dict(kwargs)
hash_code = mcp_kwargs.pop("hash_code", None) or state.hash_code
node_url = mcp_kwargs.pop("node_url", None) or state.node_url
logger.info(
"ActionRouter 调用 MCP action run_id=%s action=%s hash_code_present=%s node_url_present=%s",
state.run_id,
action,
bool(hash_code),
bool(node_url),
)
return self.mcp_runner.run(
action,
params=state.params,

File diff suppressed because it is too large Load Diff

View File

@ -4,6 +4,7 @@ from __future__ import annotations
import argparse
import json
import logging
from dataclasses import asdict
from .agent import PamDeployAgent
@ -11,9 +12,12 @@ from .checkpoint_store import load_agent_state, redact_mapping
from .interactive import run_interactive_chat
from .langgraph_runtime import LangGraphDeploymentRuntime, LangGraphRunResult
from .llm import build_llm_client
from .logging_utils import configure_logging, json_for_log
from .mcp_factory import build_mcp_runner_from_config
from .params_loader import load_params_file
logger = logging.getLogger(__name__)
def add_llm_args(parser: argparse.ArgumentParser) -> None:
"""为子命令追加真实 LLM 配置参数。"""
@ -124,8 +128,23 @@ def main() -> None:
add_mcp_args(confirm)
add_action_analysis_arg(confirm)
rollback = sub.add_parser("rollback")
rollback.add_argument("--checkpoint", required=True)
rollback.add_argument("--ip", help="要回滚的 IP不传时使用当前失败 IP")
rollback.add_argument("--stop-first", dest="stop_first", action="store_true", default=None, help="回滚前先停机")
rollback.add_argument("--no-stop-first", dest="stop_first", action="store_false", default=None, help="回滚前不先停机")
rollback.add_argument("--note", default="")
rollback.add_argument("--confirm", action="store_true")
add_llm_args(rollback)
add_mcp_args(rollback)
add_action_analysis_arg(rollback)
args = parser.parse_args()
log_path = configure_logging()
logger.info("CLI 启动 command=%s args=%s log_path=%s", args.command, json_for_log(vars(args)), log_path)
params = load_params_file(args.config) if getattr(args, "config", None) else {}
if getattr(args, "config", None):
logger.info("参数文件已加载 command=%s config=%s params=%s", args.command, args.config, json_for_log(params))
llm_client = None
if args.command != "preview":
llm_client = build_llm_client(
@ -136,7 +155,9 @@ def main() -> None:
)
mcp_runner = None
if getattr(args, "mcp_config", None):
logger.info("开始加载 MCP 配置 path=%s", args.mcp_config)
mcp_runner = build_mcp_runner_from_config(args.mcp_config)
logger.info("MCP 配置加载完成 path=%s runner=%s", args.mcp_config, type(mcp_runner).__name__)
agent = PamDeployAgent(
llm_client=llm_client,
mcp_runner=mcp_runner,
@ -144,12 +165,15 @@ def main() -> None:
)
if args.command == "analyze":
logger.info("开始执行 analyze text_len=%s", len(args.text))
result = agent.analyze_request(args.text, params)
payload = redact_mapping({key: asdict(value) for key, value in result.items()})
logger.info("analyze 完成 result=%s", json_for_log(payload))
print(json.dumps(payload, ensure_ascii=False, indent=2))
return
if args.command == "chat":
logger.info("进入 chat 模式 strategy=%s checkpoint=%s target_ips=%s", args.strategy, args.checkpoint, args.target_ip)
run_interactive_chat(
agent=agent,
params=params,
@ -160,11 +184,13 @@ def main() -> None:
return
if args.command == "preview":
logger.info("执行 preview strategy=%s", args.strategy)
print(agent.preview(params, args.strategy))
return
require_confirm(args)
if args.command == "run-global":
logger.info("开始 run-global strategy=%s checkpoint=%s", args.strategy, args.checkpoint)
state = agent.create_state(
params=params,
execution_strategy=args.strategy,
@ -177,6 +203,7 @@ def main() -> None:
return
if args.command == "resume":
logger.info("开始 resume checkpoint=%s", args.checkpoint)
state = load_agent_state(args.checkpoint)
state.checkpoint_path = state.checkpoint_path or args.checkpoint
if state.paused:
@ -186,17 +213,29 @@ def main() -> None:
return
if args.command == "confirm":
logger.info("开始 confirm checkpoint=%s decision=%s note_len=%s", args.checkpoint, args.decision, len(args.note))
state = load_agent_state(args.checkpoint)
state.checkpoint_path = state.checkpoint_path or args.checkpoint
runtime = LangGraphDeploymentRuntime(agent=agent, flow="deploy")
first = runtime.start(state)
if first.interrupted:
result = runtime.resume(approved=args.decision == "approve", note=args.note)
print_graph_result(agent, result)
return
print_graph_result(agent, first)
if not state.pending_confirmation:
raise SystemExit("当前 checkpoint 没有待确认事项;新流程请使用 resume 重试,或 rollback 显式回滚。")
state = agent.confirm_pending(state, approved=args.decision == "approve", operator_note=args.note)
print(agent.render_report(state))
print_pause_payload(agent, state)
return
if args.command == "rollback":
logger.info("开始 rollback checkpoint=%s ip=%s stop_first=%s note_len=%s", args.checkpoint, args.ip, args.stop_first, len(args.note))
state = load_agent_state(args.checkpoint)
state.checkpoint_path = state.checkpoint_path or args.checkpoint
ip = args.ip or _find_current_failed_ip(state)
if not ip:
raise SystemExit("未找到当前失败 IP请传入 --ip。")
state = agent.rollback_ip(state, ip, stop_first=args.stop_first, operator_note=args.note)
print(agent.render_report(state))
print_pause_payload(agent, state)
return
logger.info("开始 run-deploy strategy=%s checkpoint=%s target_ips=%s", args.strategy, args.checkpoint, args.target_ip)
state = agent.create_state(
params=params,
execution_strategy=args.strategy,
@ -207,5 +246,16 @@ def main() -> None:
print_graph_result(agent, result)
def _find_current_failed_ip(state) -> str:
"""从 checkpoint state 中找一个适合显式回滚的失败 IP。"""
context_ip = str((state.review_context or {}).get("ip", ""))
if context_ip and context_ip in state.ip_states:
return context_ip
for ip, ip_state in state.ip_states.items():
if ip_state.get("status") == "FAILED":
return ip
return ""
if __name__ == "__main__":
main()

View File

@ -17,6 +17,12 @@ CONFIG_KEYS = (
"ACTION_TYPE",
"TIMEOUT",
"LOG_NAME",
"PARENT_VERSION_NUMBER",
"POLL_INTERVAL_SEC",
"DOWNLOAD_POLL_MAX_ATTEMPTS",
"UPGRADE_POLL_MAX_ATTEMPTS",
"VERIFY_INTERVAL_SEC",
"VERIFY_MAX_ATTEMPTS",
)

View File

@ -64,6 +64,12 @@ DEFAULT_PARAMS = {
"ACTION_TYPE": "FULL",
"TIMEOUT": 120,
"LOG_NAME": "app.log",
"PARENT_VERSION_NUMBER": "",
"POLL_INTERVAL_SEC": 2,
"DOWNLOAD_POLL_MAX_ATTEMPTS": 60,
"UPGRADE_POLL_MAX_ATTEMPTS": 600,
"VERIFY_INTERVAL_SEC": 10,
"VERIFY_MAX_ATTEMPTS": 12,
}
# 日志、报告和 LLM 输入中需要脱敏的字段。
@ -73,6 +79,12 @@ SENSITIVE_KEYS = {
"MCP_TOKEN",
"TOKEN",
"Authorization",
"authorization",
"access_token",
"ACCESS_TOKEN",
"api_key",
"API_KEY",
"PAM_LLM_API_KEY",
"password",
"PASSWORD",
}

View File

@ -43,6 +43,14 @@ class FakeActionRunner:
return {"ACTION": action, "NODE_URL": "https://fake-node.local"}
if action == "get-online-ips":
return {"ACTION": action, "COUNT": "2", "IP": ["192.168.1.10", "192.168.1.11"]}
if action == "poll-download-progress":
return {
"ACTION": action,
"STEP": "DONE",
"RATE_OF_PROGRESS": "100",
"MSG": "success",
"MESSAGE": "success",
}
if action == "upgrade-ip":
return {"ACTION": action, "IP": kwargs.get("ip", ""), "RESULT": "TASK_CREATED"}
if action == "poll-upgrade-progress":
@ -51,6 +59,7 @@ class FakeActionRunner:
"IP": kwargs.get("ip", ""),
"STEP": "DONE",
"RATE_OF_PROGRESS": "100",
"MSG": "success",
"MESSAGE": "success",
}
if action == "start-ip":

View File

@ -13,7 +13,7 @@ def build_langgraph(agent: PamDeployAgent | None = None, flow: GraphFlow = "depl
输入 state 支持直接传 `params`图内会先调用 `create_state`CLI/chat
默认使用 `LangGraphDeploymentRuntime` runtime 直接接收 `AgentState`
支持 interrupt/checkpointer
由业务 checkpoint 支撑断点续跑
"""
try:
from langgraph.graph import END, START, StateGraph
@ -29,17 +29,12 @@ def build_langgraph(agent: PamDeployAgent | None = None, flow: GraphFlow = "depl
agent_state = runtime.create_state(
params=state["params"],
execution_strategy=state.get("execution_strategy", "hybrid_node_mcp"),
execution_mode=state.get("execution_mode", "fixed_runtime"),
run_id=state.get("run_id"),
script_entry=state.get("script_entry"),
config_path=state.get("config_path"),
trace_file_path=state.get("trace_file_path"),
checkpoint_path=state.get("checkpoint_path"),
target_ips=state.get("target_ips"),
planned_actions=state.get("planned_actions"),
mode_reason=state.get("mode_reason", ""),
mode_risk_level=state.get("mode_risk_level", "medium"),
mode_requires_confirmation=state.get("mode_requires_confirmation", True),
)
return {"agent_state": agent_state}

File diff suppressed because it is too large Load Diff

View File

@ -1,15 +1,18 @@
"""chat 人工确认点的 LangGraph interrupt 运行器。"""
"""PAM 部署 Agent 的 action 级 LangGraph 运行器。"""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from typing import Any, Literal
from uuid import uuid4
from .agent import PamDeployAgent
from .logging_utils import json_for_log
from .models import AgentState
GraphFlow = Literal["global", "deploy"]
logger = logging.getLogger(__name__)
@dataclass(slots=True)
@ -24,7 +27,7 @@ class LangGraphRunResult:
class LangGraphDeploymentRuntime:
"""用 LangGraph 节点调度部署 action,并托管人工确认 interrupt"""
"""用 LangGraph 节点调度部署 action"""
def __init__(
self,
@ -39,39 +42,56 @@ class LangGraphDeploymentRuntime:
self.flow = flow
self._waiting_confirmation = False
self._graph = build_deployment_graph(agent=self.agent, flow=self.flow)
logger.info(
"LangGraph runtime 初始化 thread_id=%s flow=%s agent=%s",
self.thread_id,
self.flow,
type(self.agent).__name__,
)
@property
def waiting_confirmation(self) -> bool:
"""返回当前 LangGraph 会话是否停在 interrupt 确认点。"""
"""返回当前 LangGraph 会话是否停在旧版 interrupt 确认点。"""
return self._waiting_confirmation
def start(self, state: AgentState) -> LangGraphRunResult:
"""从给定 AgentState 开始执行,直到结束或遇到人工确认点"""
"""从给定 AgentState 开始执行,直到结束或业务状态暂停"""
self._waiting_confirmation = False
logger.info(
"LangGraph start run_id=%s thread_id=%s flow=%s paused=%s pending=%s",
state.run_id,
self.thread_id,
self.flow,
state.paused,
state.pending_confirmation,
)
return self._consume(self._graph.stream({"agent_state": state}, self._config()))
def resume(self, *, approved: bool, note: str = "") -> LangGraphRunResult:
"""把人工确认结果交回 LangGraph并继续执行。"""
"""兼容旧版 LangGraph interrupt 确认恢复;新流程通常不使用"""
try:
from langgraph.types import Command
except ImportError as exc: # pragma: no cover - 依赖缺失时由调用方降级
raise RuntimeError("未安装 langgraph无法恢复 interrupt。") from exc
decision = {"approved": approved, "note": note}
logger.info("LangGraph resume thread_id=%s decision=%s note_len=%s", self.thread_id, approved, len(note))
return self._consume(self._graph.stream(Command(resume=decision), self._config()))
def _config(self) -> dict[str, Any]:
"""生成 LangGraph checkpointer 使用的线程配置。"""
return {"configurable": {"thread_id": self.thread_id}}
return {"configurable": {"thread_id": self.thread_id}, "recursion_limit": 10000}
def _consume(self, chunks: Any) -> LangGraphRunResult:
"""消费 LangGraph stream 输出,提取状态、报告和 interrupt 请求。"""
"""消费 LangGraph stream 输出,提取状态、报告和旧版 interrupt 请求。"""
result = LangGraphRunResult()
for chunk in chunks:
result.chunks.append(chunk)
logger.info("LangGraph chunk=%s", json_for_log(chunk, max_text_len=1600))
if "__interrupt__" in chunk:
result.interrupted = True
result.confirmation = _extract_interrupt_value(chunk["__interrupt__"])
logger.info("LangGraph interrupt thread_id=%s confirmation=%s", self.thread_id, json_for_log(result.confirmation))
continue
for value in chunk.values():
@ -83,20 +103,30 @@ class LangGraphDeploymentRuntime:
result.report = value["report"]
self._waiting_confirmation = result.interrupted
logger.info(
"LangGraph consume 完成 thread_id=%s interrupted=%s waiting=%s state_run_id=%s report_len=%s",
self.thread_id,
result.interrupted,
self._waiting_confirmation,
result.state.run_id if result.state else "",
len(result.report),
)
return result
def build_deployment_graph(*, agent: PamDeployAgent, flow: GraphFlow = "deploy"):
"""构建 action 级别的 LangGraph 部署图。"""
logger.info("开始构建 LangGraph 部署图 flow=%s", flow)
try:
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import END, START, StateGraph
from langgraph.types import interrupt
except ImportError as exc: # pragma: no cover - 依赖缺失时由调用方降级
raise RuntimeError("未安装 langgraph无法启用部署图。") from exc
def entry_node(state: dict[str, Any]) -> dict[str, Any]:
"""保留入口节点,便于统一路由已有 state 或恢复 state。"""
agent_state = state["agent_state"]
logger.info("LangGraph entry_node run_id=%s pending=%s paused=%s", agent_state.run_id, agent_state.pending_confirmation, agent_state.paused)
return {"agent_state": state["agent_state"]}
def global_action_node(state: dict[str, Any]) -> dict[str, Any]:
@ -104,6 +134,7 @@ def build_deployment_graph(*, agent: PamDeployAgent, flow: GraphFlow = "deploy")
agent_state = state["agent_state"]
action = agent.next_global_action(agent_state)
if action:
logger.info("LangGraph global_action_node run_id=%s action=%s", agent_state.run_id, action)
agent.run_global_action(agent_state, action)
return {"agent_state": agent_state}
@ -112,8 +143,10 @@ def build_deployment_graph(*, agent: PamDeployAgent, flow: GraphFlow = "deploy")
agent_state = state["agent_state"]
work = agent.next_ip_action(agent_state)
if work is None:
logger.info("LangGraph prepare_ip_node 无待执行 IP action run_id=%s", agent_state.run_id)
return {"agent_state": agent_state, "current_ip": "", "current_ip_action": ""}
ip, action = work
logger.info("LangGraph prepare_ip_node run_id=%s ip=%s action=%s", agent_state.run_id, ip, action)
return {"agent_state": agent_state, "current_ip": ip, "current_ip_action": action}
def ip_action_node(state: dict[str, Any]) -> dict[str, Any]:
@ -122,56 +155,56 @@ def build_deployment_graph(*, agent: PamDeployAgent, flow: GraphFlow = "deploy")
ip = str(state.get("current_ip", ""))
action = str(state.get("current_ip_action", ""))
if ip and action:
logger.info("LangGraph ip_action_node run_id=%s ip=%s action=%s", agent_state.run_id, ip, action)
agent.run_ip_action(agent_state, ip, action)
return {"agent_state": agent_state, "current_ip": "", "current_ip_action": ""}
def confirm_node(state: dict[str, Any]) -> dict[str, Any]:
"""把确认请求交给 LangGraph interrupt并在恢复后执行确认动作。"""
agent_state = state["agent_state"]
request = agent.build_confirmation_request(agent_state)
decision = interrupt(request)
approved, note = _parse_confirmation_decision(decision)
agent_state = agent.confirm_pending(
agent_state,
approved=approved,
operator_note=note,
)
return {"agent_state": agent_state}
def report_node(state: dict[str, Any]) -> dict[str, Any]:
"""渲染当前状态报告。"""
agent_state = state["agent_state"]
logger.info("LangGraph report_node run_id=%s pending=%s paused=%s", agent_state.run_id, agent_state.pending_confirmation, agent_state.paused)
return {
"agent_state": state["agent_state"],
"report": agent.render_report(state["agent_state"]),
}
def route_entry(state: dict[str, Any]) -> str:
"""从入口决定进入全局、IP、确认或报告节点。"""
"""从入口决定进入全局、IP 或报告节点。"""
agent_state = state["agent_state"]
if agent_state.pending_confirmation:
return "confirm"
logger.info("LangGraph route_entry -> report legacy_pending run_id=%s", agent_state.run_id)
return "report"
if agent.next_global_action(agent_state):
logger.info("LangGraph route_entry -> global_action run_id=%s", agent_state.run_id)
return "global_action"
if flow == "global":
logger.info("LangGraph route_entry -> report run_id=%s", agent_state.run_id)
return "report"
logger.info("LangGraph route_entry -> prepare_ip run_id=%s", agent_state.run_id)
return "prepare_ip"
def route_after_global(state: dict[str, Any]) -> str:
"""全局 action 后继续全局循环或进入 IP 阶段。"""
agent_state = state["agent_state"]
if agent.next_global_action(agent_state):
logger.info("LangGraph route_after_global -> global_action run_id=%s", agent_state.run_id)
return "global_action"
if flow == "global":
logger.info("LangGraph route_after_global -> report run_id=%s", agent_state.run_id)
return "report"
logger.info("LangGraph route_after_global -> prepare_ip run_id=%s", agent_state.run_id)
return "prepare_ip"
def route_after_prepare_ip(state: dict[str, Any]) -> str:
"""IP 准备节点后进入确认、单 IP action 或报告。"""
"""IP 准备节点后进入单 IP action 或报告。"""
agent_state = state["agent_state"]
if agent_state.pending_confirmation:
return "confirm"
logger.info("LangGraph route_after_prepare_ip -> report legacy_pending run_id=%s", agent_state.run_id)
return "report"
if state.get("current_ip_action"):
logger.info("LangGraph route_after_prepare_ip -> ip_action run_id=%s ip=%s action=%s", agent_state.run_id, state.get("current_ip"), state.get("current_ip_action"))
return "ip_action"
logger.info("LangGraph route_after_prepare_ip -> report run_id=%s", agent_state.run_id)
return "report"
graph = StateGraph(dict)
@ -179,7 +212,6 @@ def build_deployment_graph(*, agent: PamDeployAgent, flow: GraphFlow = "deploy")
graph.add_node("global_action", global_action_node)
graph.add_node("prepare_ip", prepare_ip_node)
graph.add_node("ip_action", ip_action_node)
graph.add_node("confirm", confirm_node)
graph.add_node("report", report_node)
graph.add_edge(START, "entry")
@ -187,7 +219,6 @@ def build_deployment_graph(*, agent: PamDeployAgent, flow: GraphFlow = "deploy")
"entry",
route_entry,
{
"confirm": "confirm",
"global_action": "global_action",
"prepare_ip": "prepare_ip",
"report": "report",
@ -205,12 +236,13 @@ def build_deployment_graph(*, agent: PamDeployAgent, flow: GraphFlow = "deploy")
graph.add_conditional_edges(
"prepare_ip",
route_after_prepare_ip,
{"confirm": "confirm", "ip_action": "ip_action", "report": "report"},
{"ip_action": "ip_action", "report": "report"},
)
graph.add_edge("ip_action", "prepare_ip")
graph.add_edge("confirm", "entry")
graph.add_edge("report", END)
return graph.compile(checkpointer=InMemorySaver())
compiled = graph.compile(checkpointer=InMemorySaver())
logger.info("LangGraph 部署图构建完成 flow=%s", flow)
return compiled
def _extract_interrupt_value(interrupts: Any) -> dict[str, Any]:

View File

@ -4,7 +4,7 @@ from .base import LlmClient
from .factory import build_llm_client
from .openai_compatible import OpenAICompatibleLlmClient
from .rule_based import RuleBasedLlmClient
from .validators import validate_deploy_plan, validate_intent_result, validate_mode_decision
from .validators import validate_deploy_plan, validate_intent_result
__all__ = [
"LlmClient",
@ -13,5 +13,4 @@ __all__ = [
"build_llm_client",
"validate_deploy_plan",
"validate_intent_result",
"validate_mode_decision",
]

View File

@ -2,6 +2,7 @@
from __future__ import annotations
from collections.abc import Iterable
from typing import Any, Protocol
from pam_deploy_graph.models import (
@ -10,8 +11,8 @@ from pam_deploy_graph.models import (
LlmActionAnalysis,
LlmDeployPlan,
LlmIntentResult,
LlmModeDecision,
LlmParamResult,
LlmSingleActionProposal,
)
@ -32,31 +33,37 @@ class LlmClient(Protocol):
params: dict[str, Any],
intent: str,
strategy: ExecutionStrategy,
skill_policy: dict[str, Any],
tool_summaries: list[dict[str, Any]],
) -> LlmDeployPlan:
"""根据参数和意图生成部署计划。"""
...
def decide_execution_mode(
self,
*,
text: str,
params: dict[str, Any],
intent: str,
strategy: ExecutionStrategy,
allowed_modes: list[str],
tool_summaries: list[dict[str, Any]],
) -> LlmModeDecision:
"""决定进入固定 runtime 还是 agentic skill 模式。"""
...
def analyze_action_result(
self,
*,
action: str,
result: ActionResult,
state_summary: dict[str, Any],
) -> LlmActionAnalysis:
"""分析 action 执行结果,并给出是否允许继续执行的建议。"""
...
def chat(self, text: str, context: dict[str, Any] | None = None) -> str:
"""进行普通自然语言对话,不触发部署 workflow。"""
...
def chat_stream(self, text: str, context: dict[str, Any] | None = None) -> Iterable[str]:
"""流式进行普通自然语言对话,不触发部署 workflow。"""
...
def analyze_log(self, log_text: str, question: str | None = None, source_path: str = "") -> str:
"""分析日志文本并给出异常摘要、原因和建议。"""
...
def propose_action(
self,
text: str,
allowed_actions: list[str],
params: dict[str, Any],
state_summary: dict[str, Any] | None = None,
) -> LlmSingleActionProposal:
"""把自然语言解析为单次 action 调用建议。"""
...

View File

@ -3,11 +3,15 @@
from __future__ import annotations
import os
import logging
from pam_deploy_graph.logging_utils import json_for_log
from .base import LlmClient
from .openai_compatible import OpenAICompatibleLlmClient, load_prompt_text
from .rule_based import RuleBasedLlmClient
logger = logging.getLogger(__name__)
def build_llm_client(
*,
@ -25,8 +29,24 @@ def build_llm_client(
if action_analysis_prompt_path is not None
else os.getenv("PAM_LLM_ACTION_ANALYSIS_PROMPT_FILE", "")
)
logger.info(
"构建 LLM client base_url=%s model=%s has_api_key=%s action_prompt_path=%s explicit=%s",
actual_base_url,
actual_model,
bool(actual_api_key),
actual_action_prompt_path,
json_for_log(
{
"base_url": base_url,
"api_key": api_key,
"model": model,
"action_analysis_prompt_path": action_analysis_prompt_path,
}
),
)
if not actual_base_url and not actual_api_key and not actual_model:
logger.info("未配置真实 LLM使用 RuleBasedLlmClient fallback")
return RuleBasedLlmClient()
missing = []
@ -35,11 +55,14 @@ def build_llm_client(
if not actual_model:
missing.append("model")
if missing:
logger.info("LLM 配置不完整 missing=%s", missing)
raise ValueError(f"LLM 配置不完整,缺少: {', '.join(missing)}")
return OpenAICompatibleLlmClient(
client = OpenAICompatibleLlmClient(
base_url=actual_base_url,
api_key=actual_api_key,
model=actual_model,
action_analysis_prompt=load_prompt_text(actual_action_prompt_path),
)
logger.info("真实 LLM client 构建完成 client=%s model=%s has_api_key=%s", type(client).__name__, actual_model, bool(actual_api_key))
return client

View File

@ -7,9 +7,11 @@
from __future__ import annotations
import json
import logging
import time
from pathlib import Path
import urllib.request
from collections.abc import Callable
from collections.abc import Callable, Iterable, Iterator
from typing import Any
from pam_deploy_graph.constants import (
@ -20,12 +22,25 @@ from pam_deploy_graph.constants import (
REQUIRED_PARAMS,
SENSITIVE_KEYS,
)
from pam_deploy_graph.models import ExecutionStrategy, LlmDeployPlan, LlmIntentResult, LlmModeDecision, LlmParamResult
from pam_deploy_graph.logging_utils import json_for_log, redact_for_log
from pam_deploy_graph.models import ExecutionStrategy, LlmDeployPlan, LlmIntentResult, LlmParamResult, LlmSingleActionProposal
from pam_deploy_graph.models import ActionResult, LlmActionAnalysis
from .prompts import ACTION_ANALYSIS_PROMPT, INTENT_PROMPT, MODE_PROMPT, PARAM_PROMPT, PLAN_PROMPT, SYSTEM_PROMPT
from .prompts import (
ACTION_ANALYSIS_PROMPT,
CHAT_PROMPT,
INTENT_PROMPT,
LOG_ANALYSIS_PROMPT,
PARAM_PROMPT,
PLAN_PROMPT,
SINGLE_ACTION_PROMPT,
SYSTEM_PROMPT,
)
from .text_filter import filter_thinking_chunks, strip_thinking_text
JsonTransport = Callable[[str, dict[str, str], dict[str, Any], float], dict[str, Any]]
StreamTransport = Callable[[str, dict[str, str], dict[str, Any], float], Iterable[str]]
logger = logging.getLogger(__name__)
class OpenAICompatibleLlmClient:
@ -41,6 +56,7 @@ class OpenAICompatibleLlmClient:
timeout_sec: float = 30,
temperature: float = 0,
transport: JsonTransport | None = None,
stream_transport: StreamTransport | None = None,
) -> None:
"""保存连接参数、模型参数和可替换的 HTTP transport。"""
if not base_url:
@ -54,10 +70,22 @@ class OpenAICompatibleLlmClient:
self.timeout_sec = timeout_sec
self.temperature = temperature
self.transport = transport or _default_transport
self.stream_transport = stream_transport or _default_stream_transport
logger.info(
"OpenAI-compatible LLM client 初始化 base_url=%s endpoint=%s model=%s has_api_key=%s timeout=%s temperature=%s custom_transport=%s custom_stream_transport=%s",
self.base_url,
_chat_completions_url(self.base_url),
self.model,
bool(self.api_key),
self.timeout_sec,
self.temperature,
transport is not None,
stream_transport is not None,
)
def understand_request(self, text: str) -> LlmIntentResult:
"""调用 LLM 识别用户意图。"""
payload = self._complete_json(INTENT_PROMPT, {"user_text": text})
payload = self._complete_json("understand_request", INTENT_PROMPT, {"user_text": text})
return LlmIntentResult(
intent=_string(payload, "intent", "deploy"), # type: ignore[arg-type]
mode_preference=_string(payload, "mode_preference", "未指定"), # type: ignore[arg-type]
@ -73,6 +101,7 @@ class OpenAICompatibleLlmClient:
original_base = dict(base_params or {})
safe_base = _redact_sensitive(original_base)
payload = self._complete_json(
"extract_params",
PARAM_PROMPT,
{
"user_text": text,
@ -107,18 +136,15 @@ class OpenAICompatibleLlmClient:
params: dict[str, Any],
intent: str,
strategy: ExecutionStrategy,
skill_policy: dict[str, Any],
tool_summaries: list[dict[str, Any]],
) -> LlmDeployPlan:
"""调用 LLM 生成部署计划。"""
payload = self._complete_json(
"generate_plan",
PLAN_PROMPT,
{
"params": _redact_sensitive(params),
"intent": intent,
"execution_strategy": strategy,
"skill_policy": skill_policy,
"tool_summaries": tool_summaries,
"allowed_actions": list(ALLOWED_ACTIONS),
"global_action_sequence": list(GLOBAL_ACTION_SEQUENCE),
"ip_action_sequence": list(IP_ACTION_SEQUENCE),
@ -133,57 +159,19 @@ class OpenAICompatibleLlmClient:
execution_strategy=_string(payload, "execution_strategy", strategy), # type: ignore[arg-type]
)
def decide_execution_mode(
self,
*,
text: str,
params: dict[str, Any],
intent: str,
strategy: ExecutionStrategy,
allowed_modes: list[str],
tool_summaries: list[dict[str, Any]],
) -> LlmModeDecision:
"""调用 LLM 决定本次任务进入固定 runtime 或 agentic skill。"""
payload = self._complete_json(
MODE_PROMPT,
{
"user_text": text,
"params": _redact_sensitive(params),
"intent": intent,
"execution_strategy": strategy,
"allowed_modes": allowed_modes,
"tool_summaries": tool_summaries,
},
)
return LlmModeDecision(
mode=_string(payload, "mode", "fixed_runtime"), # type: ignore[arg-type]
reason=_string(payload, "reason", ""),
risk_level=_string(payload, "risk_level", "medium"), # type: ignore[arg-type]
requires_confirmation=bool(payload.get("requires_confirmation", True)),
)
def analyze_action_result(
self,
*,
action: str,
result: ActionResult,
state_summary: dict[str, Any],
) -> LlmActionAnalysis:
"""调用 LLM 分析 action 结果,返回结构化诊断建议。"""
payload = self._complete_json(
"analyze_action_result",
self.action_analysis_prompt,
{
"action": action,
"result": {
"backend": result.backend,
"ok": result.ok,
"exit_code": result.exit_code,
"tool_name": result.tool_name,
"values": _redact_sensitive(result.values),
"stderr": _truncate_text(result.stderr),
"error_summary": result.error_summary,
},
"state_summary": _redact_sensitive(state_summary),
"result": _action_review_result_payload(action, result),
},
)
return LlmActionAnalysis(
@ -194,11 +182,78 @@ class OpenAICompatibleLlmClient:
suggested_action=_string(payload, "suggested_action", ""),
requires_confirmation=bool(payload.get("requires_confirmation", False)),
should_continue=bool(payload.get("should_continue", True)),
progress_complete=_optional_bool(payload.get("progress_complete")),
notes=_string_list(payload.get("notes")),
)
def _complete_json(self, instruction: str, input_payload: dict[str, Any]) -> dict[str, Any]:
def chat(self, text: str, context: dict[str, Any] | None = None) -> str:
"""调用 LLM 做普通对话,不要求 JSON 响应。"""
return self._complete_text(
"chat",
CHAT_PROMPT,
{
"user_text": text,
"context": _redact_sensitive(context or {}),
},
)
def chat_stream(self, text: str, context: dict[str, Any] | None = None) -> Iterable[str]:
"""调用 LLM 做普通流式对话,不要求 JSON 响应。"""
return self._complete_text_stream(
"chat",
CHAT_PROMPT,
{
"user_text": text,
"context": _redact_sensitive(context or {}),
},
)
def analyze_log(self, log_text: str, question: str | None = None, source_path: str = "") -> str:
"""调用 LLM 分析日志尾部摘要。"""
return self._complete_text(
"analyze_log",
LOG_ANALYSIS_PROMPT,
{
"source_path": source_path,
"question": question or "请分析日志中的异常、可能原因和下一步建议。",
"log_tail": redact_for_log(log_text, max_text_len=64000),
},
)
def propose_action(
self,
text: str,
allowed_actions: list[str],
params: dict[str, Any],
state_summary: dict[str, Any] | None = None,
) -> LlmSingleActionProposal:
"""调用 LLM 把自然语言解析为单 action 调用建议。"""
payload = self._complete_json(
"propose_action",
SINGLE_ACTION_PROMPT,
{
"user_text": text,
"allowed_actions": allowed_actions,
"params": _redact_sensitive(params),
"state_summary": _redact_sensitive(state_summary or {}),
},
)
action = _string(payload, "action", "")
if action not in allowed_actions:
action = ""
return LlmSingleActionProposal(
action=action,
ip=_string(payload, "ip", ""),
kwargs=_dict(payload.get("kwargs")),
reason=_string(payload, "reason", ""),
risk_level=_risk_level(payload.get("risk_level")),
requires_confirmation=True,
)
def _complete_json(self, operation: str, instruction: str, input_payload: dict[str, Any]) -> dict[str, Any]:
"""发送 chat/completions 请求,并解析 JSON 对象响应。"""
started_at = time.perf_counter()
endpoint = _chat_completions_url(self.base_url)
request_payload = {
"model": self.model,
"temperature": self.temperature,
@ -216,18 +271,145 @@ class OpenAICompatibleLlmClient:
headers = {"Content-Type": "application/json"}
if self.api_key:
headers["Authorization"] = f"Bearer {self.api_key}"
logger.info(
"LLM 请求开始 operation=%s endpoint=%s model=%s timeout=%s has_api_key=%s input=%s",
operation,
endpoint,
self.model,
self.timeout_sec,
bool(self.api_key),
json_for_log(input_payload, max_text_len=1600),
)
try:
response = self.transport(
_chat_completions_url(self.base_url),
endpoint,
headers,
request_payload,
self.timeout_sec,
)
content = _message_content(response)
logger.info(
"LLM 原始响应 operation=%s duration_ms=%s content=%s",
operation,
int((time.perf_counter() - started_at) * 1000),
redact_for_log(content, max_text_len=1600),
)
parsed = _loads_json_object(content)
if not isinstance(parsed, dict):
raise ValueError("LLM 响应必须是 JSON object")
except Exception:
logger.exception(
"LLM 请求失败 operation=%s endpoint=%s duration_ms=%s input=%s",
operation,
endpoint,
int((time.perf_counter() - started_at) * 1000),
json_for_log(input_payload, max_text_len=1600),
)
raise
logger.info(
"LLM 请求完成 operation=%s duration_ms=%s response_keys=%s response=%s",
operation,
int((time.perf_counter() - started_at) * 1000),
sorted(parsed.keys()),
json_for_log(parsed, max_text_len=1600),
)
return parsed
def _complete_text(self, operation: str, instruction: str, input_payload: dict[str, Any]) -> str:
"""发送 chat/completions 请求,并返回普通文本响应。"""
started_at = time.perf_counter()
endpoint = _chat_completions_url(self.base_url)
request_payload = {
"model": self.model,
"temperature": self.temperature,
"messages": [
{"role": "system", "content": instruction},
{
"role": "user",
"content": "输入 JSON:\n" + json.dumps(input_payload, ensure_ascii=False, sort_keys=True),
},
],
}
headers = {"Content-Type": "application/json"}
if self.api_key:
headers["Authorization"] = f"Bearer {self.api_key}"
logger.info(
"LLM 文本请求开始 operation=%s endpoint=%s model=%s timeout=%s has_api_key=%s input=%s",
operation,
endpoint,
self.model,
self.timeout_sec,
bool(self.api_key),
json_for_log(input_payload, max_text_len=1600),
)
try:
response = self.transport(endpoint, headers, request_payload, self.timeout_sec)
content = strip_thinking_text(str(_message_content(response)))
except Exception:
logger.exception(
"LLM 文本请求失败 operation=%s endpoint=%s duration_ms=%s input=%s",
operation,
endpoint,
int((time.perf_counter() - started_at) * 1000),
json_for_log(input_payload, max_text_len=1600),
)
raise
logger.info(
"LLM 文本请求完成 operation=%s duration_ms=%s content=%s",
operation,
int((time.perf_counter() - started_at) * 1000),
redact_for_log(content, max_text_len=1600),
)
return content
def _complete_text_stream(self, operation: str, instruction: str, input_payload: dict[str, Any]) -> Iterable[str]:
"""发送 stream chat/completions 请求,并返回过滤后的普通文本分片。"""
started_at = time.perf_counter()
endpoint = _chat_completions_url(self.base_url)
request_payload = {
"model": self.model,
"temperature": self.temperature,
"stream": True,
"messages": [
{"role": "system", "content": instruction},
{
"role": "user",
"content": "输入 JSON:\n" + json.dumps(input_payload, ensure_ascii=False, sort_keys=True),
},
],
}
headers = {"Content-Type": "application/json"}
if self.api_key:
headers["Authorization"] = f"Bearer {self.api_key}"
logger.info(
"LLM 流式文本请求开始 operation=%s endpoint=%s model=%s timeout=%s has_api_key=%s input=%s",
operation,
endpoint,
self.model,
self.timeout_sec,
bool(self.api_key),
json_for_log(input_payload, max_text_len=1600),
)
try:
raw_chunks = self.stream_transport(endpoint, headers, request_payload, self.timeout_sec)
for chunk in filter_thinking_chunks(raw_chunks):
if chunk:
yield chunk
except Exception:
logger.exception(
"LLM 流式文本请求失败 operation=%s endpoint=%s duration_ms=%s input=%s",
operation,
endpoint,
int((time.perf_counter() - started_at) * 1000),
json_for_log(input_payload, max_text_len=1600),
)
raise
logger.info(
"LLM 流式文本请求完成 operation=%s duration_ms=%s",
operation,
int((time.perf_counter() - started_at) * 1000),
)
def _default_transport(
url: str,
@ -250,6 +432,41 @@ def _default_transport(
return decoded
def _default_stream_transport(
url: str,
headers: dict[str, str],
payload: dict[str, Any],
timeout_sec: float,
) -> Iterator[str]:
"""使用标准库 urllib 发送 OpenAI-compatible SSE 流式请求。"""
request = urllib.request.Request(
url,
data=json.dumps(payload).encode("utf-8"),
headers=headers,
method="POST",
)
with urllib.request.urlopen(request, timeout=timeout_sec) as response:
for raw_line in response:
line = raw_line.decode("utf-8", errors="replace").strip()
if not line or line.startswith(":"):
continue
if line.startswith("event:") or line.startswith("id:"):
continue
if not line.startswith("data:"):
raise ValueError("LLM 流式响应不是 SSE data 格式")
data = line[len("data:") :].strip()
if data == "[DONE]":
break
try:
decoded = json.loads(data)
except json.JSONDecodeError:
logger.debug("忽略无法解析的 LLM stream data: %s", redact_for_log(data, max_text_len=300))
continue
chunk = _stream_delta_content(decoded)
if chunk:
yield chunk
def load_prompt_text(path: str | None) -> str:
"""读取自定义提示词文件。"""
if not path:
@ -283,6 +500,35 @@ def _message_content(response: dict[str, Any]) -> Any:
return content
def _stream_delta_content(response: dict[str, Any]) -> str:
"""从 OpenAI-compatible stream chunk 中提取 delta.content。"""
try:
choice = response["choices"][0]
except (KeyError, IndexError, TypeError):
return ""
delta = choice.get("delta") if isinstance(choice, dict) else None
if isinstance(delta, dict) and "content" in delta:
return str(_content_parts_to_text(delta.get("content")))
message = choice.get("message") if isinstance(choice, dict) else None
if isinstance(message, dict) and "content" in message:
return str(_content_parts_to_text(message.get("content")))
text = choice.get("text") if isinstance(choice, dict) else None
return str(text) if text is not None else ""
def _content_parts_to_text(content: Any) -> str:
"""把 OpenAI content parts 或字符串转换为纯文本。"""
if isinstance(content, list):
parts: list[str] = []
for item in content:
if isinstance(item, dict) and item.get("type") == "text":
parts.append(str(item.get("text", "")))
elif isinstance(item, str):
parts.append(item)
return "".join(parts)
return "" if content is None else str(content)
def _loads_json_object(content: Any) -> Any:
"""把 message.content 解析为 JSON 对象。"""
if isinstance(content, dict):
@ -307,6 +553,45 @@ def _redact_sensitive(value: Any) -> Any:
return value
def _action_review_result_payload(action: str, result: ActionResult) -> dict[str, Any]:
"""构造 action 审核输入,避免把正常脚本日志当作错误喂给 LLM。"""
payload: dict[str, Any] = {
"backend": result.backend,
"ok": result.ok,
"exit_code": result.exit_code,
"tool_name": result.tool_name,
"values": _redact_sensitive(result.values),
"error_summary": result.error_summary,
}
if _needs_diagnostic_log(action, result):
diagnostic = _diagnostic_log_text(result)
if diagnostic:
payload["diagnostic_log"] = diagnostic
return payload
def _needs_diagnostic_log(action: str, result: ActionResult) -> bool:
"""仅在失败或业务异常时把少量诊断日志交给 LLM。"""
if not result.ok or result.error_summary or result.values.get("PENDING_AGENT_CONFIRMATION"):
return True
if action == "verify-ip":
success = result.values.get("SUCCESS")
if success is not None and str(success).lower() not in ("true", "1", "yes"):
return True
return False
def _diagnostic_log_text(result: ActionResult) -> str:
"""优先使用错误摘要;必要时取 stderr/stdout/raw_output 的尾部作为诊断上下文。"""
if result.error_summary:
return _truncate_text(result.error_summary)
for text in (result.stderr, result.stdout, result.raw_output):
stripped = text.strip()
if stripped:
return _tail_text(stripped)
return ""
def _truncate_text(value: str, limit: int = 1000) -> str:
"""截断发送给 LLM 的长文本,避免传入完整日志。"""
if len(value) <= limit:
@ -314,6 +599,14 @@ def _truncate_text(value: str, limit: int = 1000) -> str:
return value[:limit] + "...[已截断]"
def _tail_text(value: str, limit: int = 1000) -> str:
"""保留长诊断日志尾部,通常错误原因更靠近末尾。"""
if len(value) <= limit:
return value
marker = "[已截断]..."
return marker + value[-(limit - len(marker)) :]
def _string(payload: dict[str, Any], key: str, default: str) -> str:
"""安全读取字符串字段。"""
value = payload.get(key, default)
@ -328,6 +621,31 @@ def _float(payload: dict[str, Any], key: str, default: float) -> float:
return default
def _optional_bool(value: Any) -> bool | None:
"""解析可选布尔值,字段缺失时保留 None。"""
if value is None:
return None
if isinstance(value, bool):
return value
if isinstance(value, str):
lowered = value.strip().lower()
if lowered in ("", "null", "none"):
return None
if lowered in ("true", "1", "yes", "y"):
return True
if lowered in ("false", "0", "no", "n"):
return False
return bool(value)
def _risk_level(value: Any) -> str:
"""解析单 action 风险等级,非法值降级为 medium。"""
text = str(value or "").strip().lower()
if text in ("low", "medium", "high"):
return text
return "medium"
def _dict(value: Any) -> dict[str, Any]:
"""确保返回 dict非法值降级为空 dict。"""
return value if isinstance(value, dict) else {}

View File

@ -7,7 +7,7 @@ SYSTEM_PROMPT = """你是 PAM 智能部署 Agent 的结构化理解与规划组
- 不生成 shellPowerShellbatcurl 等可执行命令
- 不回显密钥tokenCLIENT_SECRETAuthorization 等敏感值
- 只能在允许的 action 集合中选择部署动作
- 真实执行前必须保留人工确认点参数确认目标 IP 范围确认失败回滚确认
- 真实执行前必须保留人工确认点参数确认目标 IP 范围确认失败后应暂停修复后 resume 重试回滚只能由用户显式触发
"""
INTENT_PROMPT = """根据用户输入识别意图和执行偏好。
@ -24,23 +24,6 @@ INTENT_PROMPT = """根据用户输入识别意图和执行偏好。
}
"""
MODE_PROMPT = """根据用户输入、参数、执行策略和可用工具,决定本次任务应进入固定 runtime 还是 agentic skill 模式。
输出 JSON schema
{
"mode": "fixed_runtime|agentic_skill",
"reason": "...",
"risk_level": "low|medium|high",
"requires_confirmation": true
}
决策原则
- 标准部署批量升级回滚高风险变更默认优先 fixed_runtime
- 诊断类探索类半结构化任务或用户明确要求 skill 自主编排/自主调用工具可选 agentic_skill
- 不能因为用户提到 MCP 就自动选择 agentic_skillMCP 也可以作为 fixed_runtime 的后端
- 只有在允许模式集合中选择 mode
"""
PARAM_PROMPT = """从用户输入中抽取 PAM 部署参数和控制信息。
输出 JSON schema
@ -55,7 +38,13 @@ PARAM_PROMPT = """从用户输入中抽取 PAM 部署参数和控制信息。
"ZIP_FILE_PATH": "...",
"ACTION_TYPE": "...",
"TIMEOUT": "...",
"LOG_NAME": "..."
"LOG_NAME": "...",
"PARENT_VERSION_NUMBER": "...",
"POLL_INTERVAL_SEC": "...",
"DOWNLOAD_POLL_MAX_ATTEMPTS": "...",
"UPGRADE_POLL_MAX_ATTEMPTS": "...",
"VERIFY_INTERVAL_SEC": "...",
"VERIFY_MAX_ATTEMPTS": "..."
},
"extracted_control": {
"user_specified_ips": ["..."]
@ -94,11 +83,64 @@ ACTION_ANALYSIS_PROMPT = """分析一次 PAM action 执行结果。
"suggested_action": "...",
"requires_confirmation": false,
"should_continue": true,
"progress_complete": null,
"notes": ["..."]
}
要求
- 必须明确给出 `should_continue`没有问题时为 true存在需要人工判断的问题时为 false
- 如果 exit_code 0ok=falseverify-ip SUCCESS=false出现 pending_confirmation应标记异常
- 如果 exit_code 0ok=falseverify-ip SUCCESS=false出现 legacy pending_confirmation应标记异常
- `poll-download-progress``poll-upgrade-progress` 必须判断 `progress_complete`已完成为 true未完成但正常为 false非进度 action 可为 null
- 进度 action 未完成但正常时`has_anomaly=false``should_continue=true``progress_complete=false`建议继续查询进度
- 进度 action 完成条件优先看 `STEP=DONE``STATUS=completed/done/success``SUCCESS=true``FINISH=true` `MSG=success` `RATE_OF_PROGRESS=100` `CODE` 为空或 0
- 进度 action 出现 `CODE` 0 `STEP/MSG/STATUS/MESSAGE` fail/error应标记异常并 `should_continue=false`
- 主要依据结构化字段 `ok``exit_code``values``error_summary` 判断不会提供完整运行态摘要避免被历史状态误导
- `verify-ip SUCCESS=false` runtime 按配置重复检查单次审核仍应说明当前健康检查未通过
- 只有输入里存在 `diagnostic_log` 才把它当作异常诊断上下文
- 脚本正常过程日志不会作为错误依据不能因为日志来自 stderr 就判定异常
- 不要输出密钥tokenAuthorization 或完整日志原文
"""
CHAT_PROMPT = """你是 PAM 部署 Agent 的交互助手。
要求
- 可以回答普通问题解释当前 Agent 的命令和部署流程
- 不要自动触发部署回滚升级脚本执行或 MCP 调用
- 如果用户想执行完整部署提示使用 `analyze <需求>` 先分析确认后再输入 `run`
- 如果用户想单独执行 action提示使用 `action propose <需求>` `action run ...`执行前仍需要人工确认
- 不要输出密钥tokenAuthorizationCLIENT_SECRET api_key
- 不要输出 `<think>``</think>``Thinking Process``Reasoning Process``Chain of Thought`推理过程内部思考或隐藏分析内容
- 只输出可以直接展示给用户的最终回答
"""
LOG_ANALYSIS_PROMPT = """分析 PAM Agent 或部署脚本日志。
要求
- 优先总结异常现象可能原因和建议下一步
- 不要输出密钥tokenAuthorizationCLIENT_SECRET api_key
- 输入通常是日志尾部摘要不代表完整文件
- 不要因为日志来自 stderr 就直接判定失败要结合 ERRORExceptionfail状态码和上下文判断
- 不要输出 `<think>``</think>``Thinking Process``Reasoning Process``Chain of Thought`推理过程内部思考或隐藏分析内容
- 只输出可以直接展示给用户的最终分析结果
"""
SINGLE_ACTION_PROMPT = """把用户自然语言解析成一次 PAM action 调用建议。
输出 JSON schema
{
"action": "get-token",
"ip": "",
"kwargs": {},
"reason": "...",
"risk_level": "low|medium|high",
"requires_confirmation": true
}
要求
- `action` 必须来自输入的 allowed_actions不能识别明确 action 时返回空字符串
- 不要猜测危险 action不要自动规划多个 action
- IP action 必须尽量提取 `ip`
- `ip` 外的额外参数放入 `kwargs`
- 所有 action 都必须 `requires_confirmation=true`
- 不要输出密钥tokenAuthorizationCLIENT_SECRET api_key
"""

View File

@ -6,20 +6,28 @@
from __future__ import annotations
from collections.abc import Iterable
import logging
import re
from dataclasses import asdict
from typing import Any
from pam_deploy_graph.constants import GLOBAL_ACTION_SEQUENCE, IP_ACTION_SEQUENCE, REQUIRED_PARAMS
from pam_deploy_graph.constants import GLOBAL_ACTION_SEQUENCE, REQUIRED_PARAMS, SENSITIVE_KEYS
from pam_deploy_graph.logging_utils import json_for_log, redact_for_log
from pam_deploy_graph.models import (
ActionResult,
ExecutionStrategy,
LlmActionAnalysis,
LlmDeployPlan,
LlmIntentResult,
LlmModeDecision,
LlmParamResult,
LlmSingleActionProposal,
)
from .text_filter import strip_thinking_text
logger = logging.getLogger(__name__)
KEY_ALIASES = {
"home_base_url": "HOME_BASE_URL",
"HOME_BASE_URL": "HOME_BASE_URL",
@ -35,6 +43,9 @@ KEY_ALIASES = {
"MODULE_NAME": "MODULE_NAME",
"versionNumber": "VERSION_NUMBER",
"VERSION_NUMBER": "VERSION_NUMBER",
"parentVersionNumber": "PARENT_VERSION_NUMBER",
"PARENT_VERSION_NUMBER": "PARENT_VERSION_NUMBER",
"parent_version_number": "PARENT_VERSION_NUMBER",
"zipFilePath": "ZIP_FILE_PATH",
"ZIP_FILE_PATH": "ZIP_FILE_PATH",
"actionType": "ACTION_TYPE",
@ -49,8 +60,86 @@ KEY_ALIASES = {
class RuleBasedLlmClient:
"""基于规则的轻量 LLM client fallback。"""
def chat(self, text: str, context: dict[str, Any] | None = None) -> str:
"""规则 fallback 的普通对话说明。"""
logger.info("规则 LLM 普通对话 text=%s context=%s", redact_for_log(text, max_text_len=800), json_for_log(context or {}))
lowered = text.lower()
if any(word in lowered for word in ("help", "帮助", "怎么用", "命令")):
return strip_thinking_text(
"当前是本地规则 LLM fallback。可用 `analyze <需求>` 分析部署需求,`run` 执行完整 workflow"
"`action propose <需求>` 解析单个 action`action run ...` 确认后执行单个 action"
"`log analyze <路径>` 分析日志尾部。"
)
return strip_thinking_text(
"当前未配置真实 LLM已使用本地规则 fallback。普通闲聊只能给出有限说明"
"如需自然语言问答、日志深度分析或更准确的 action 解析,请配置真实 LLM。"
)
def chat_stream(self, text: str, context: dict[str, Any] | None = None) -> Iterable[str]:
"""规则 fallback 的流式对话兼容实现。"""
yield self.chat(text, context=context)
def analyze_log(self, log_text: str, question: str | None = None, source_path: str = "") -> str:
"""用本地规则分析日志尾部。"""
logger.info("规则 LLM 日志分析 source=%s question=%s text_len=%s", source_path, redact_for_log(question or "", max_text_len=300), len(log_text))
lines = log_text.splitlines()
problem_lines = [
line
for line in lines
if re.search(r"error|exception|fail|traceback|timeout|refused|denied|失败|异常|错误|超时", line, flags=re.IGNORECASE)
]
summary = [
f"日志来源: {source_path or '-'}",
f"已分析尾部 {len(lines)} 行。",
]
if question:
summary.append(f"关注问题: {question}")
if problem_lines:
summary.append(f"发现 {len(problem_lines)} 行疑似异常,最近几条:")
summary.extend(f"- {redact_for_log(line, max_text_len=240)}" for line in problem_lines[-5:])
summary.append("建议:优先检查以上异常附近的接口返回、网络连通性、认证信息和目标服务状态。")
else:
summary.append("未在日志尾部发现明显 ERROR/Exception/fail/timeout 关键字。")
summary.append("建议:如问题仍存在,请扩大 `--tail` 或提供更具体的问题描述。")
return strip_thinking_text("\n".join(summary))
def propose_action(
self,
text: str,
allowed_actions: list[str],
params: dict[str, Any],
state_summary: dict[str, Any] | None = None,
) -> LlmSingleActionProposal:
"""只在用户明确写出 action 名时生成单 action 建议。"""
logger.info(
"规则 LLM 单 action 解析开始 text=%s allowed=%s state=%s",
redact_for_log(text, max_text_len=800),
allowed_actions,
json_for_log(state_summary or {}),
)
action = ""
lowered = text.lower()
for candidate in allowed_actions:
if candidate.lower() in lowered:
action = candidate
break
ip_match = re.search(r"\b(?:\d{1,3}\.){3}\d{1,3}\b", text)
kwargs = _safe_action_kwargs(self._extract_key_values(text))
risk = "high" if action in ("publish-version", "create-download-task", "upgrade-ip", "start-ip", "stop-ip", "rollback-ip") else "medium"
proposal = LlmSingleActionProposal(
action=action,
ip=ip_match.group(0) if ip_match else "",
kwargs=kwargs,
reason="规则 fallback 仅在输入中出现明确 action 名时生成建议。" if action else "未识别到明确 action 名。",
risk_level=risk, # type: ignore[arg-type]
requires_confirmation=True,
)
logger.info("规则 LLM 单 action 解析完成 proposal=%s", json_for_log(asdict(proposal)))
return proposal
def understand_request(self, text: str) -> LlmIntentResult:
"""用关键词规则识别用户意图和执行策略偏好。"""
logger.info("规则 LLM 意图识别开始 text=%s", redact_for_log(text, max_text_len=800))
lowered = text.lower()
reasons: list[str] = []
intent = "deploy"
@ -83,16 +172,19 @@ class RuleBasedLlmClient:
if intent == "preview":
strategy_preference = strategy_preference if strategy_preference != "未指定" else "hybrid_node_mcp"
return LlmIntentResult(
result = LlmIntentResult(
intent=intent, # type: ignore[arg-type]
mode_preference=mode_preference, # type: ignore[arg-type]
strategy_preference=strategy_preference, # type: ignore[arg-type]
confidence=0.72 if intent != "deploy" else 0.6,
reasons=reasons,
)
logger.info("规则 LLM 意图识别完成 result=%s", json_for_log(asdict(result)))
return result
def extract_params(self, text: str, base_params: dict[str, Any] | None = None) -> LlmParamResult:
"""从 key=value、中文短语和 IP 地址中抽取参数。"""
logger.info("规则 LLM 参数抽取开始 text=%s base_params=%s", redact_for_log(text, max_text_len=800), json_for_log(base_params or {}))
params = dict(base_params or {})
params.update(self._extract_key_values(text))
params.update(self._extract_chinese_patterns(text))
@ -104,12 +196,14 @@ class RuleBasedLlmClient:
missing = [key for key in REQUIRED_PARAMS if not params.get(key)]
sensitive = [key for key in ("CLIENT_SECRET", "CLIENT_ID") if params.get(key)]
return LlmParamResult(
result = LlmParamResult(
extracted_params=params,
extracted_control=control,
missing_required_params=missing,
sensitive_fields_present=sensitive,
)
logger.info("规则 LLM 参数抽取完成 result=%s", json_for_log(asdict(result)))
return result
def generate_plan(
self,
@ -117,10 +211,9 @@ class RuleBasedLlmClient:
params: dict[str, Any],
intent: str,
strategy: ExecutionStrategy,
skill_policy: dict[str, Any],
tool_summaries: list[dict[str, Any]],
) -> LlmDeployPlan:
"""生成确定性的部署计划和风险提示。"""
logger.info("规则 LLM 计划生成开始 intent=%s strategy=%s params=%s", intent, strategy, json_for_log(params))
if strategy == "hybrid_node_mcp":
strategy_text = "PAM_HOME 使用脚本 actionPAM_NODE 使用 MCP"
elif strategy == "script_only":
@ -142,70 +235,38 @@ class RuleBasedLlmClient:
if strategy == "hybrid_node_mcp":
risk_notes.append("PAM_HOME 当前没有 MCP 能力HOME 阶段仍会调用脚本 action。")
if intent == "query_node_ips":
planned_actions = ["get-token", "get-node-url", "get-online-ips"]
elif intent == "rollback":
planned_actions = ["rollback-ip", "verify-ip", "download-log"]
elif intent == "deploy":
planned_actions = [*GLOBAL_ACTION_SEQUENCE, *IP_ACTION_SEQUENCE]
else:
planned_actions = list(GLOBAL_ACTION_SEQUENCE)
return LlmDeployPlan(
result = LlmDeployPlan(
summary=summary,
risk_notes=risk_notes,
planned_actions=planned_actions,
planned_actions=list(GLOBAL_ACTION_SEQUENCE),
requires_confirmation=intent in ("deploy", "query_node_ips", "rollback"),
execution_strategy=strategy,
)
def decide_execution_mode(
self,
*,
text: str,
params: dict[str, Any],
intent: str,
strategy: ExecutionStrategy,
allowed_modes: list[str],
tool_summaries: list[dict[str, Any]],
) -> LlmModeDecision:
"""根据关键词规则决定进入固定 runtime 或 agentic skill。"""
lowered = text.lower()
requested_agentic = any(
word in lowered for word in ("自主编排", "按 skill", "自动选择工具", "自动决策", "toolcall", "agentic")
)
diagnostic_intent = any(word in lowered for word in ("诊断", "排查", "分析异常", "帮我看看", "explore"))
high_risk_intent = intent in ("deploy", "rollback") or any(word in lowered for word in ("批量", "升级", "回滚"))
mode = "fixed_runtime"
reason = "标准部署和高风险动作默认走固定 runtime。"
risk_level = "high" if high_risk_intent else "medium"
requires_confirmation = True
if requested_agentic or diagnostic_intent:
mode = "agentic_skill"
reason = "用户明确要求按 skill 自主编排,或任务更偏探索/诊断。"
risk_level = "medium"
if mode not in allowed_modes:
mode = allowed_modes[0] if allowed_modes else "fixed_runtime"
reason = "原始模式不在 skill 允许集合内,已回退到允许模式。"
return LlmModeDecision(
mode=mode, # type: ignore[arg-type]
reason=reason,
risk_level=risk_level, # type: ignore[arg-type]
requires_confirmation=requires_confirmation,
)
logger.info("规则 LLM 计划生成完成 result=%s", json_for_log(asdict(result)))
return result
def analyze_action_result(
self,
*,
action: str,
result: ActionResult,
state_summary: dict[str, Any],
) -> LlmActionAnalysis:
"""用本地规则分析 action 结果,作为真实 LLM 不可用时的兜底。"""
logger.info(
"规则 LLM action 审核开始 action=%s result=%s",
action,
json_for_log(
{
"backend": result.backend,
"ok": result.ok,
"exit_code": result.exit_code,
"tool_name": result.tool_name,
"values": result.values,
"error_summary": result.error_summary,
},
max_text_len=1000,
),
)
notes: list[str] = []
has_anomaly = not result.ok
severity = "info"
@ -213,11 +274,12 @@ class RuleBasedLlmClient:
suggested_action = "继续观察。"
requires_confirmation = False
should_continue = True
progress_complete: bool | None = None
if not result.ok:
severity = "medium"
possible_reason = result.error_summary or "action 返回失败状态。"
suggested_action = "查看 action stderr/raw_output确认参数、网络和目标服务状态。"
suggested_action = "查看 action 诊断日志、参数、网络和目标服务状态。"
notes.append("硬规则检测到 action 执行失败。")
should_continue = False
@ -239,6 +301,25 @@ class RuleBasedLlmClient:
notes.append("rollback-ip 失败需要人工处理。")
should_continue = False
if action in ("poll-download-progress", "poll-upgrade-progress"):
progress_complete, progress_has_anomaly, progress_reason, progress_note = _analyze_progress_values(action, result.values)
if progress_note:
notes.append(progress_note)
if progress_has_anomaly:
has_anomaly = True
severity = "high"
possible_reason = progress_reason or possible_reason or "进度接口返回失败状态。"
suggested_action = "停止后续 action检查下载/推送任务状态、PAM_HOME/PAM_NODE 日志和接口返回。"
should_continue = False
elif progress_complete:
has_anomaly = has_anomaly or False
suggested_action = "进度已完成,可以继续下一个 action。"
should_continue = should_continue and True
elif result.ok:
severity = severity if has_anomaly else "info"
suggested_action = "进度未完成,继续查询进度。"
should_continue = should_continue and True
if result.values.get("PENDING_AGENT_CONFIRMATION"):
has_anomaly = True
severity = "high"
@ -248,7 +329,7 @@ class RuleBasedLlmClient:
notes.append("action 返回待人工确认标记。")
should_continue = False
return LlmActionAnalysis(
analysis = LlmActionAnalysis(
action=action,
has_anomaly=has_anomaly,
severity=severity, # type: ignore[arg-type]
@ -256,8 +337,11 @@ class RuleBasedLlmClient:
suggested_action=suggested_action,
requires_confirmation=requires_confirmation,
should_continue=should_continue,
progress_complete=progress_complete,
notes=notes,
)
logger.info("规则 LLM action 审核完成 analysis=%s", json_for_log(asdict(analysis)))
return analysis
def _extract_key_values(self, text: str) -> dict[str, str]:
"""抽取 KEY=VALUE 形式的参数。"""
@ -276,6 +360,7 @@ class RuleBasedLlmClient:
"APP_NAME": r"(?:应用|应用名)\s*[:]?\s*([A-Za-z0-9_.-]+)",
"MODULE_NAME": r"(?:模块|模块名)\s*[:]?\s*([A-Za-z0-9_.-]+)",
"VERSION_NUMBER": r"(?:版本|版本号)\s*[:]?\s*([A-Za-z0-9_.-]+)",
"PARENT_VERSION_NUMBER": r"(?:继承版本|父版本|规则版本|继承哪个版本的规则)\s*[:]?\s*([A-Za-z0-9_.-]+)",
"ZIP_FILE_PATH": r"(?:包|软件包|zip)\s*[:]?\s*([A-Za-z]:[\\/][^\s,;]+|/[^\s,;]+)",
}
params: dict[str, str] = {}
@ -284,3 +369,54 @@ class RuleBasedLlmClient:
if match:
params[key] = match.group(1)
return params
def _analyze_progress_values(action: str, values: dict[str, Any]) -> tuple[bool, bool, str, str]:
"""分析进度字段,返回完成状态、异常状态、原因和备注。"""
step = _lower_value(values.get("STEP"))
status = _lower_value(values.get("STATUS"))
msg = _lower_value(values.get("MSG"))
message = _lower_value(values.get("MESSAGE"))
success = _lower_value(values.get("SUCCESS"))
finish = _lower_value(values.get("FINISH"))
code = _lower_value(values.get("CODE"))
rate = _lower_value(values.get("RATE_OF_PROGRESS"))
complete = False
if step == "done":
complete = True
elif status in ("completed", "complete", "done", "success", "succeeded"):
complete = True
elif success in ("true", "1", "yes"):
complete = True
elif action == "poll-upgrade-progress" and finish in ("true", "1", "yes"):
complete = True
elif msg == "success" and rate == "100" and (not code or code == "0"):
complete = True
if code and code != "0":
return complete, True, f"进度接口返回非 0 CODE: {code}", _progress_note(values)
combined = " ".join(item for item in (step, status, msg, message) if item)
if re.search(r"fail|error", combined, flags=re.IGNORECASE):
return complete, True, values.get("MESSAGE") or values.get("MSG") or values.get("STEP") or "进度接口返回失败状态", _progress_note(values)
return complete, False, "", _progress_note(values)
def _progress_note(values: dict[str, Any]) -> str:
"""把进度核心字段整理成一条备注。"""
parts = []
for key in ("RATE_OF_PROGRESS", "STEP", "MSG", "STATUS", "SUCCESS", "CODE", "FINISH", "MESSAGE"):
value = values.get(key)
if value not in (None, ""):
parts.append(f"{key}={value}")
return "当前进度: " + ", ".join(parts) if parts else "进度接口未返回明确进度字段。"
def _lower_value(value: Any) -> str:
"""把字段值转成小写字符串。"""
return str(value).strip().lower() if value is not None else ""
def _safe_action_kwargs(values: dict[str, str]) -> dict[str, str]:
"""过滤单 action 额外参数,避免把敏感字段放入执行建议。"""
return {key: value for key, value in values.items() if key not in SENSITIVE_KEYS}

View File

@ -0,0 +1,260 @@
"""LLM 文本输出过滤工具。"""
from __future__ import annotations
from collections.abc import Iterable, Iterator
import re
OPEN_THINK_TAG = "<think>"
CLOSE_THINK_TAG = "</think>"
REASONING_START_RE = re.compile(
r"^\s*(?:[#>\-*]+\s*)*(?:\*\*)?"
r"(?:thinking process|thought process|reasoning process|chain of thought|internal reasoning|inner monologue|"
r"思考过程|推理过程|内部思考)"
r"(?:\*\*)?\s*(?:[:]|\s*$)",
flags=re.IGNORECASE,
)
FINAL_ANSWER_RE = re.compile(
r"^\s*(?:[#>\-*]+\s*)*(?:\*\*)?"
r"(?:final answer|final response|answer|response|最终答案|最终回答|正式回答|回答|回复|结论)"
r"(?:\*\*)?\s*[:]\s*",
flags=re.IGNORECASE | re.MULTILINE,
)
REASONING_LINE_RE = re.compile(
r"(thinking process|thought process|reasoning process|chain of thought|internal reasoning|inner monologue|"
r"analyze the request|determine the response|drafting the response|refining the response|"
r"user question|input json|role:|constraints:|requirements:|"
r"do not output|do not automatically|hidden analysis|forbidden tags|"
r"i need to|i should|i must|i will|i can|must ensure|should briefly|ensure no|keep it concise|"
r"思考过程|推理过程|内部思考|分析请求|确定回答|起草回答|优化回答|隐藏分析)",
flags=re.IGNORECASE,
)
MAX_REASONING_PREFIX_HOLD = 80
def strip_thinking_text(text: str) -> str:
"""移除 LLM 普通文本输出里的思考标签、显式思考段和内容。"""
filter_ = ThinkingTextStreamFilter()
visible = filter_.feed(text) + filter_.finish()
return visible.strip()
def filter_thinking_chunks(chunks: Iterable[str]) -> Iterator[str]:
"""按流式分片移除思考内容,避免跨分片泄露。"""
filter_ = ThinkingTextStreamFilter()
for chunk in chunks:
visible = filter_.feed(str(chunk))
if visible:
yield visible
tail = filter_.finish()
if tail:
yield tail
class ThinkingTextStreamFilter:
"""支持跨 chunk 识别 think 标签和显式思考段的流式过滤器。"""
def __init__(self) -> None:
"""初始化可见/隐藏状态和待判定缓冲区。"""
self._pending = ""
self._inside_think = False
self._reasoning_filter = ExplicitReasoningStreamFilter()
def feed(self, chunk: str) -> str:
"""输入一个文本分片,返回当前可安全展示的可见文本。"""
if not chunk:
return ""
self._pending += chunk
output: list[str] = []
while self._pending:
lowered = self._pending.lower()
if self._inside_think:
close_index = lowered.find(CLOSE_THINK_TAG)
if close_index >= 0:
self._pending = self._pending[close_index + len(CLOSE_THINK_TAG) :]
self._inside_think = False
continue
keep = _longest_suffix_prefix(lowered, [CLOSE_THINK_TAG])
self._pending = self._pending[-keep:] if keep else ""
break
open_index = lowered.find(OPEN_THINK_TAG)
close_index = lowered.find(CLOSE_THINK_TAG)
if open_index >= 0 and (close_index < 0 or open_index < close_index):
output.append(self._pending[:open_index])
self._pending = self._pending[open_index + len(OPEN_THINK_TAG) :]
self._inside_think = True
continue
if close_index >= 0:
output.append(self._pending[:close_index])
self._pending = self._pending[close_index + len(CLOSE_THINK_TAG) :]
continue
keep = _longest_suffix_prefix(lowered, [OPEN_THINK_TAG, CLOSE_THINK_TAG])
if keep:
output.append(self._pending[:-keep])
self._pending = self._pending[-keep:]
else:
output.append(self._pending)
self._pending = ""
break
return self._reasoning_filter.feed("".join(output))
def finish(self) -> str:
"""结束流式过滤,丢弃未闭合 think 内容和未完成标签。"""
if self._inside_think:
self._pending = ""
self._inside_think = False
return self._reasoning_filter.finish()
lowered = self._pending.lower()
if lowered in _tag_prefixes():
self._pending = ""
return self._reasoning_filter.finish()
tail = self._pending
self._pending = ""
return self._reasoning_filter.feed(tail) + self._reasoning_filter.finish()
class ExplicitReasoningStreamFilter:
"""过滤以 `Thinking Process:` 等形式输出的显式思考段。"""
def __init__(self) -> None:
"""初始化思考段识别状态。"""
self._buffer = ""
self._mode = "undecided"
def feed(self, chunk: str) -> str:
"""输入已去掉 think 标签的文本,返回可展示内容。"""
if not chunk:
return ""
if self._mode == "pass":
return chunk
self._buffer += chunk
if self._mode == "suppress":
final_text = _extract_after_final_answer_marker(self._buffer)
if final_text is not None:
self._buffer = ""
self._mode = "pass"
return final_text
return ""
if _starts_with_reasoning_marker(self._buffer):
self._mode = "suppress"
final_text = _extract_after_final_answer_marker(self._buffer)
if final_text is not None:
self._buffer = ""
self._mode = "pass"
return final_text
return ""
if _could_be_reasoning_marker_prefix(self._buffer):
return ""
self._mode = "pass"
visible = self._buffer
self._buffer = ""
return visible
def finish(self) -> str:
"""结束过滤,输出普通缓冲或清理被压住的显式思考段。"""
if not self._buffer:
return ""
if self._mode == "suppress" or _starts_with_reasoning_marker(self._buffer):
visible = _strip_leading_reasoning_section(self._buffer)
else:
visible = self._buffer
self._buffer = ""
self._mode = "pass"
return visible
def _starts_with_reasoning_marker(text: str) -> bool:
"""判断文本首个非空内容是否是显式思考段标记。"""
return REASONING_START_RE.match(text) is not None
def _could_be_reasoning_marker_prefix(text: str) -> bool:
"""流式初始阶段判断当前缓冲是否可能是思考段标记的一部分。"""
candidate = _normalize_marker_prefix(text)
if not candidate:
return True
markers = (
"thinking process",
"thought process",
"reasoning process",
"chain of thought",
"internal reasoning",
"inner monologue",
"思考过程",
"推理过程",
"内部思考",
)
return len(candidate) < MAX_REASONING_PREFIX_HOLD and any(marker.startswith(candidate) for marker in markers)
def _normalize_marker_prefix(text: str) -> str:
"""把流式开头清理成便于判断的 marker 前缀。"""
stripped = text.lstrip()
stripped = re.sub(r"^(?:[#>\-*]+\s*)+", "", stripped)
stripped = stripped.strip("*").strip()
return stripped.lower()
def _extract_after_final_answer_marker(text: str) -> str | None:
"""如果存在最终回答标记,返回标记后的正文。"""
matches = list(FINAL_ANSWER_RE.finditer(text))
if not matches:
return None
return text[matches[-1].end() :].strip()
def _strip_leading_reasoning_section(text: str) -> str:
"""删除以显式思考标记开头的推理段,保留后续最终正文。"""
final_text = _extract_after_final_answer_marker(text)
if final_text is not None:
return final_text
lines = text.splitlines()
first = _first_non_empty_line_index(lines)
if first is None or not _starts_with_reasoning_marker(lines[first]):
return text.strip()
last_reasoning = first
for index in range(first, len(lines)):
if _looks_like_reasoning_line(lines[index]):
last_reasoning = index
return "\n".join(lines[last_reasoning + 1 :]).strip()
def _first_non_empty_line_index(lines: list[str]) -> int | None:
"""返回首个非空行下标。"""
for index, line in enumerate(lines):
if line.strip():
return index
return None
def _looks_like_reasoning_line(line: str) -> bool:
"""识别常见显式思考过程行。"""
stripped = line.strip()
if not stripped:
return False
if _starts_with_reasoning_marker(stripped):
return True
if REASONING_LINE_RE.search(stripped):
return True
return bool(re.match(r"^\s*\d+\.\s*\*\*[^*]+(?:request|response|answer|constraints|过程|回答)[^*]*\*\*", stripped, flags=re.IGNORECASE))
def _longest_suffix_prefix(text: str, targets: list[str]) -> int:
"""返回 text 末尾与任一目标标签前缀匹配的最长长度。"""
best = 0
for target in targets:
max_len = min(len(text), len(target) - 1)
for length in range(1, max_len + 1):
if text.endswith(target[:length]):
best = max(best, length)
return best
def _tag_prefixes() -> set[str]:
"""生成 think 标签的所有非完整前缀,用于收尾时丢弃半截标签。"""
prefixes = {""}
for tag in (OPEN_THINK_TAG, CLOSE_THINK_TAG):
prefixes.update(tag[:index] for index in range(1, len(tag)))
return prefixes

View File

@ -3,11 +3,9 @@
from __future__ import annotations
from pam_deploy_graph.constants import ALLOWED_ACTIONS
from pam_deploy_graph.models import LlmDeployPlan, LlmIntentResult, LlmModeDecision
from pam_deploy_graph.models import LlmDeployPlan, LlmIntentResult
VALID_INTENTS = {"deploy", "show_usage", "preview", "query_node_ips", "rollback"}
VALID_EXECUTION_MODES = {"fixed_runtime", "agentic_skill"}
VALID_RISK_LEVELS = {"low", "medium", "high"}
FORBIDDEN_TEXT = ("bash ", "powershell ", "deploy.sh", "deploy.ps1", "CLIENT_SECRET=")
@ -29,13 +27,3 @@ def validate_deploy_plan(plan: LlmDeployPlan) -> None:
forbidden = [item for item in FORBIDDEN_TEXT if item.lower() in lowered]
if forbidden:
raise ValueError(f"计划包含禁止出现的可执行文本: {', '.join(forbidden)}")
def validate_mode_decision(result: LlmModeDecision, allowed_modes: list[str] | None = None) -> None:
"""校验模式决策结果是否合法。"""
if result.mode not in VALID_EXECUTION_MODES:
raise ValueError(f"非法执行模式: {result.mode}")
if result.risk_level not in VALID_RISK_LEVELS:
raise ValueError(f"非法风险等级: {result.risk_level}")
if allowed_modes and result.mode not in allowed_modes:
raise ValueError(f"模式 {result.mode} 不在允许集合内")

View File

@ -0,0 +1,150 @@
"""Agent 运行日志配置和脱敏工具。"""
from __future__ import annotations
import json
import logging
import os
import re
from dataclasses import asdict, is_dataclass
from logging.handlers import TimedRotatingFileHandler
from pathlib import Path
from typing import Any
from .constants import SENSITIVE_KEYS
DEFAULT_LOG_FILE = Path("logs") / "pam_deploy_agent.log"
DEFAULT_LOG_RETENTION_DAYS = 14
LOG_FILE_ENV = "PAM_AGENT_LOG_FILE"
LOG_LEVEL_ENV = "PAM_AGENT_LOG_LEVEL"
LOG_RETENTION_DAYS_ENV = "PAM_AGENT_LOG_RETENTION_DAYS"
_HANDLER_MARKER = "_pam_deploy_agent_handler"
_SENSITIVE_NAME_PARTS = ("secret", "token", "authorization", "api_key", "apikey", "password")
_ASSIGNMENT_PATTERN = re.compile(
r"(?i)\b(client_secret|mcp_client_secret|api_key|pam_llm_api_key|token|access_token|authorization|password)\b"
r"\s*([:=])\s*([^\s,;]+)"
)
_AUTH_BEARER_ASSIGNMENT_PATTERN = re.compile(r"(?i)\b(authorization)\b\s*([:=])\s*bearer\s+[^\s,;]+")
_BEARER_PATTERN = re.compile(r"(?i)(bearer\s+)[A-Za-z0-9._~+\-/=]+")
def configure_logging(
log_file: str | Path | None = None,
level: str | int | None = None,
retention_days: int | str | None = None,
) -> Path:
"""配置 Agent 每日滚动文件日志;重复调用不会重复添加 handler。"""
actual_path = Path(log_file or os.getenv(LOG_FILE_ENV) or DEFAULT_LOG_FILE)
actual_path.parent.mkdir(parents=True, exist_ok=True)
actual_level = _resolve_level(level or os.getenv(LOG_LEVEL_ENV) or "INFO")
actual_retention_days = _resolve_retention_days(
retention_days if retention_days is not None else os.getenv(LOG_RETENTION_DAYS_ENV),
)
package_logger = logging.getLogger("pam_deploy_graph")
package_logger.setLevel(actual_level)
package_logger.propagate = False
marker = str(actual_path.resolve())
for handler in list(package_logger.handlers):
if getattr(handler, _HANDLER_MARKER, "") == marker:
if isinstance(handler, TimedRotatingFileHandler):
handler.setLevel(actual_level)
handler.backupCount = actual_retention_days
return actual_path
package_logger.removeHandler(handler)
handler.close()
break
handler = TimedRotatingFileHandler(
actual_path,
when="midnight",
interval=1,
backupCount=actual_retention_days,
encoding="utf-8",
)
setattr(handler, _HANDLER_MARKER, marker)
handler.setLevel(actual_level)
handler.setFormatter(
logging.Formatter(
fmt="%(asctime)s %(levelname)s [%(name)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
)
package_logger.addHandler(handler)
package_logger.info(
"日志已初始化 path=%s level=%s rotation=daily retention_days=%s",
actual_path,
logging.getLevelName(actual_level),
actual_retention_days,
)
return actual_path
def redact_for_log(value: Any, *, max_text_len: int = 1200) -> Any:
"""递归脱敏并截断日志对象避免把密钥、token 或完整长文本写入日志。"""
if is_dataclass(value) and not isinstance(value, type):
return redact_for_log(asdict(value), max_text_len=max_text_len)
if isinstance(value, dict):
redacted: dict[str, Any] = {}
for key, item in value.items():
text_key = str(key)
if _is_sensitive_key(text_key):
redacted[text_key] = "***"
else:
redacted[text_key] = redact_for_log(item, max_text_len=max_text_len)
return redacted
if isinstance(value, (list, tuple, set)):
return [redact_for_log(item, max_text_len=max_text_len) for item in value]
if isinstance(value, str):
return _truncate(_redact_string(value), max_text_len)
if value is None or isinstance(value, (bool, int, float)):
return value
return _truncate(_redact_string(str(value)), max_text_len)
def json_for_log(value: Any, *, max_text_len: int = 1200) -> str:
"""把对象脱敏后序列化成适合单行日志的 JSON 文本。"""
redacted = redact_for_log(value, max_text_len=max_text_len)
return json.dumps(redacted, ensure_ascii=False, default=str, sort_keys=True)
def _resolve_level(value: str | int) -> int:
"""解析日志级别字符串,非法值降级为 INFO。"""
if isinstance(value, int):
return value
resolved = getattr(logging, str(value).upper(), logging.INFO)
return resolved if isinstance(resolved, int) else logging.INFO
def _resolve_retention_days(value: int | str | None) -> int:
"""解析日志保留天数,非法值使用默认值。"""
if value in (None, ""):
return DEFAULT_LOG_RETENTION_DAYS
try:
days = int(str(value).strip())
except (TypeError, ValueError):
return DEFAULT_LOG_RETENTION_DAYS
return max(days, 0)
def _is_sensitive_key(key: str) -> bool:
"""判断字段名是否应脱敏。"""
if key in SENSITIVE_KEYS:
return True
normalized = key.lower().replace("-", "_")
return any(part in normalized for part in _SENSITIVE_NAME_PARTS)
def _truncate(value: str, limit: int) -> str:
"""截断过长字符串。"""
if len(value) <= limit:
return value
return value[:limit] + "...[已截断]"
def _redact_string(value: str) -> str:
"""脱敏字符串中的常见 KEY=VALUE 和 Bearer token 片段。"""
value = _AUTH_BEARER_ASSIGNMENT_PATTERN.sub(lambda match: f"{match.group(1)}{match.group(2)}***", value)
value = _ASSIGNMENT_PATTERN.sub(lambda match: f"{match.group(1)}{match.group(2)}***", value)
return _BEARER_PATTERN.sub(lambda match: f"{match.group(1)}***", value)

View File

@ -7,6 +7,7 @@ callable 或 SDK session 适配成这个接口,避免业务代码绑定具体
from __future__ import annotations
import json
import logging
import time
import urllib.parse
import urllib.request
@ -16,6 +17,10 @@ from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
from .logging_utils import json_for_log
logger = logging.getLogger(__name__)
@dataclass(frozen=True)
class McpAuthConfig:
@ -111,10 +116,21 @@ class McpClientConfig:
def load_mcp_client_config(path: str | Path) -> McpClientConfig:
"""读取 MCP client JSON 配置文件。"""
logger.info("读取 MCP client 配置 path=%s", path)
payload = json.loads(Path(path).read_text(encoding="utf-8"))
if not isinstance(payload, dict):
raise ValueError("MCP client 配置必须是 JSON object")
return McpClientConfig.from_mapping(payload)
config = McpClientConfig.from_mapping(payload)
logger.info(
"MCP client 配置读取完成 path=%s transport=%s server_url=%s command=%s has_auth=%s tool_names=%s",
path,
config.transport,
config.server_url,
config.command,
config.auth is not None,
json_for_log(config.tool_names),
)
return config
class FunctionMcpToolClient:
@ -126,6 +142,7 @@ class FunctionMcpToolClient:
def call_tool(self, tool_name: str, arguments: dict[str, Any]) -> Any:
"""调用底层函数并返回原始结果。"""
logger.info("Function MCP tool 调用 tool=%s arguments=%s", tool_name, json_for_log(arguments))
return self.caller(tool_name, arguments)
@ -147,13 +164,19 @@ class SessionMcpToolClient:
def call_tool(self, tool_name: str, arguments: dict[str, Any]) -> Any:
"""调用 SDK session并把 SDK 返回值归一化。"""
logger.info("Session MCP tool 调用开始 tool=%s arguments=%s", tool_name, json_for_log(arguments))
result = self.session.call_tool(tool_name, arguments)
return normalize_mcp_sdk_result(result)
normalized = normalize_mcp_sdk_result(result)
logger.info("Session MCP tool 调用完成 tool=%s result=%s", tool_name, json_for_log(normalized, max_text_len=1600))
return normalized
def list_tools(self) -> list[str]:
"""从 SDK session 获取 tool 名称列表。"""
logger.info("Session MCP list_tools 开始")
result = self.session.list_tools()
return normalize_mcp_tool_list(result)
tools = normalize_mcp_tool_list(result)
logger.info("Session MCP list_tools 完成 tools=%s", tools)
return tools
class StdioMcpToolClient:
@ -176,9 +199,19 @@ class StdioMcpToolClient:
self.env = env
self.cwd = cwd or None
self.timeout_seconds = timeout_seconds
logger.info(
"stdio MCP client 初始化 command=%s args=%s cwd=%s env_keys=%s timeout=%s",
self.command,
self.args,
self.cwd or "",
sorted((self.env or {}).keys()),
self.timeout_seconds,
)
def call_tool(self, tool_name: str, arguments: dict[str, Any]) -> Any:
"""创建一次 MCP stdio session调用 tool 后关闭 session。"""
started_at = time.perf_counter()
logger.info("stdio MCP tool 调用开始 tool=%s arguments=%s", tool_name, json_for_log(arguments))
try:
import anyio
from mcp import ClientSession
@ -203,10 +236,23 @@ class StdioMcpToolClient:
)
return normalize_mcp_sdk_result(result)
return anyio.run(call_once)
try:
result = anyio.run(call_once)
except Exception:
logger.exception("stdio MCP tool 调用失败 tool=%s duration_ms=%s", tool_name, int((time.perf_counter() - started_at) * 1000))
raise
logger.info(
"stdio MCP tool 调用完成 tool=%s duration_ms=%s result=%s",
tool_name,
int((time.perf_counter() - started_at) * 1000),
json_for_log(result, max_text_len=1600),
)
return result
def list_tools(self) -> list[str]:
"""创建一次 MCP stdio session读取 server 暴露的 tool 列表。"""
started_at = time.perf_counter()
logger.info("stdio MCP list_tools 开始")
try:
import anyio
from mcp import ClientSession
@ -227,7 +273,13 @@ class StdioMcpToolClient:
result = await session.list_tools()
return normalize_mcp_tool_list(result)
return anyio.run(list_once)
try:
tools = anyio.run(list_once)
except Exception:
logger.exception("stdio MCP list_tools 失败 duration_ms=%s", int((time.perf_counter() - started_at) * 1000))
raise
logger.info("stdio MCP list_tools 完成 duration_ms=%s tools=%s", int((time.perf_counter() - started_at) * 1000), tools)
return tools
class OAuthTokenProvider:
@ -243,6 +295,12 @@ class OAuthTokenProvider:
self.timeout_seconds = timeout_seconds
self._token = ""
self._expires_at = 0.0
logger.info(
"MCP OAuth token provider 初始化 token_url=%s client_id=%s timeout=%s",
self.config.token_url,
self.config.client_id,
self.timeout_seconds,
)
def authorization_headers(self) -> dict[str, str]:
"""返回带 token 的请求头。"""
@ -255,7 +313,9 @@ class OAuthTokenProvider:
"""获取可用 token未过期时复用缓存。"""
now = time.time()
if self._token and now < self._expires_at:
logger.info("MCP auth token 使用缓存 expires_in_sec=%s", int(self._expires_at - now))
return self._token
logger.info("MCP auth token 开始刷新 token_url=%s client_id=%s", self.config.token_url, self.config.client_id)
payload = {
"grant_type": self.config.grant_type,
"client_id": self.config.client_id,
@ -278,6 +338,7 @@ class OAuthTokenProvider:
expires_in = _safe_float(result.get(self.config.expires_in_field), 3600)
self._token = token
self._expires_at = now + max(expires_in - 60, 1)
logger.info("MCP auth token 刷新完成 expires_in=%s cached_until=%s", expires_in, int(self._expires_at))
return token
@ -305,14 +366,23 @@ class HttpMcpToolClient:
self.auth_provider = auth_provider
self.timeout_seconds = timeout_seconds
self.sse_read_timeout_seconds = sse_read_timeout_seconds
logger.info(
"HTTP MCP client 初始化 url=%s transport=%s has_auth=%s headers=%s timeout=%s sse_read_timeout=%s",
self.url,
self.transport,
self.auth_provider is not None,
json_for_log(self.headers),
self.timeout_seconds,
self.sse_read_timeout_seconds,
)
def call_tool(self, tool_name: str, arguments: dict[str, Any]) -> Any:
"""连接 MCP server调用 tool 后关闭 session。"""
return self._run_session(lambda session: session.call_tool(tool_name, arguments))
return self._run_session(lambda session: session.call_tool(tool_name, arguments), operation_name=f"call_tool:{tool_name}", arguments=arguments)
def list_tools(self) -> list[str]:
"""连接 MCP server读取 server 暴露的 tool 名称。"""
return self._run_session(lambda session: session.list_tools(), normalize_tools=True)
return self._run_session(lambda session: session.list_tools(), normalize_tools=True, operation_name="list_tools")
def _build_headers(self) -> dict[str, str]:
"""合并静态 headers 和动态鉴权 token。"""
@ -321,8 +391,23 @@ class HttpMcpToolClient:
headers.update(self.auth_provider.authorization_headers())
return headers
def _run_session(self, operation: Callable[[Any], Any], *, normalize_tools: bool = False) -> Any:
def _run_session(
self,
operation: Callable[[Any], Any],
*,
normalize_tools: bool = False,
operation_name: str = "operation",
arguments: dict[str, Any] | None = None,
) -> Any:
"""创建一次 HTTP/SSE MCP session 并执行指定操作。"""
started_at = time.perf_counter()
logger.info(
"HTTP MCP session 开始 operation=%s url=%s transport=%s arguments=%s",
operation_name,
self.url,
self.transport,
json_for_log(arguments or {}),
)
try:
import anyio
from mcp import ClientSession
@ -357,7 +442,24 @@ class HttpMcpToolClient:
result = await operation(session)
return normalize_mcp_tool_list(result) if normalize_tools else normalize_mcp_sdk_result(result)
return anyio.run(call_once)
try:
result = anyio.run(call_once)
except Exception:
logger.exception(
"HTTP MCP session 失败 operation=%s url=%s transport=%s duration_ms=%s",
operation_name,
self.url,
self.transport,
int((time.perf_counter() - started_at) * 1000),
)
raise
logger.info(
"HTTP MCP session 完成 operation=%s duration_ms=%s result=%s",
operation_name,
int((time.perf_counter() - started_at) * 1000),
json_for_log(result, max_text_len=1600),
)
return result
def normalize_mcp_sdk_result(result: Any) -> Any:

View File

@ -2,8 +2,10 @@
from __future__ import annotations
import logging
from pathlib import Path
from .logging_utils import json_for_log
from .mcp_client import (
HttpMcpToolClient,
McpClientConfig,
@ -13,16 +15,36 @@ from .mcp_client import (
)
from .mcp_runner import McpActionRunner
logger = logging.getLogger(__name__)
def build_mcp_runner_from_config(path: str | Path) -> McpActionRunner:
"""读取 MCP 配置文件,并构造可直接给 Agent 使用的 runner。"""
logger.info("开始构建 MCP runner config_path=%s", path)
config = load_mcp_client_config(path)
client = build_mcp_client(config)
return McpActionRunner(client=client, tool_names=config.tool_names or None)
runner = McpActionRunner(client=client, tool_names=config.tool_names or None)
logger.info(
"MCP runner 构建完成 config_path=%s transport=%s server_url=%s client=%s tool_names=%s",
path,
config.transport,
config.server_url,
type(client).__name__,
json_for_log(config.tool_names),
)
return runner
def build_mcp_client(config: McpClientConfig):
"""根据 transport 类型创建 MCP client。"""
logger.info(
"开始构建 MCP client transport=%s server_url=%s command=%s has_auth=%s headers=%s",
config.transport,
config.server_url,
config.command,
config.auth is not None,
json_for_log(config.headers),
)
if config.transport == "stdio":
return StdioMcpToolClient(
command=config.command,

View File

@ -2,11 +2,15 @@
from __future__ import annotations
import logging
from typing import Any, Protocol
from .logging_utils import json_for_log
from .models import ActionResult
from .output_parser import parse_mcp_result
logger = logging.getLogger(__name__)
class McpToolClient(Protocol):
"""MCP 工具客户端需要实现的最小同步接口。"""
@ -46,6 +50,11 @@ class McpActionRunner:
self.client = client
self.tool_names = tool_names or {}
self._discovered_tools: list[str] | None = None
logger.info(
"MCP action runner 初始化 client=%s explicit_tool_names=%s",
type(client).__name__ if client else "",
json_for_log(self.tool_names),
)
def run(
self,
@ -70,16 +79,34 @@ class McpActionRunner:
node_url=node_url,
stop_first=stop_first,
)
logger.info(
"MCP action 调用开始 action=%s tool=%s arguments=%s",
action,
tool_name,
json_for_log(arguments),
)
try:
payload = self.client.call_tool(tool_name, arguments)
except Exception as exc: # pragma: no cover - 防御性异常包装
logger.exception("MCP action 调用异常 action=%s tool=%s", action, tool_name)
return parse_mcp_result(action, {}, ok=False, tool_name=tool_name, error=str(exc))
return parse_mcp_result(action, payload, ok=True, tool_name=tool_name)
logger.info("MCP action 原始返回 action=%s tool=%s payload=%s", action, tool_name, json_for_log(payload, max_text_len=1600))
result = parse_mcp_result(action, payload, ok=True, tool_name=tool_name)
logger.info(
"MCP action 解析完成 action=%s tool=%s ok=%s values=%s error=%s",
action,
tool_name,
result.ok,
json_for_log(result.values),
result.error_summary,
)
return result
def _resolve_tool_name(self, action: str) -> str:
"""根据显式映射、server tools 自动发现和默认约定解析 tool name。"""
explicit = self.tool_names.get(action)
if explicit:
logger.info("MCP tool 使用显式映射 action=%s tool=%s", action, explicit)
return explicit
discovered = self._list_discovered_tools()
@ -89,12 +116,14 @@ class McpActionRunner:
for candidate in candidates:
matched = by_lower.get(candidate.lower())
if matched:
logger.info("MCP tool 自动匹配 action=%s tool=%s candidates=%s", action, matched, candidates)
return matched
available = ", ".join(discovered)
raise ValueError(f"MCP server 未发现 action 对应 tool: {action}; 已发现: {available}")
fallback = DEFAULT_NODE_MCP_TOOLS.get(action)
if fallback:
logger.info("MCP tool 使用默认约定 action=%s tool=%s", action, fallback)
return fallback
raise ValueError(f"action 未映射 MCP tool: {action}")
@ -108,7 +137,9 @@ class McpActionRunner:
try:
self._discovered_tools = list(self.client.list_tools())
except Exception:
logger.exception("MCP tool 自动发现失败,使用默认 tool name 约定")
self._discovered_tools = []
logger.info("MCP tool 自动发现完成 tools=%s", self._discovered_tools)
return self._discovered_tools
def _build_arguments(
@ -138,6 +169,8 @@ class McpActionRunner:
arguments["hashCode"] = hash_code
if node_url:
arguments["nodeUrl"] = node_url
if action == "create-download-task" and params.get("PARENT_VERSION_NUMBER"):
arguments["parentVersionNumber"] = params.get("PARENT_VERSION_NUMBER")
if action == "rollback-ip":
arguments["stopFirst"] = stop_first
return {key: value for key, value in arguments.items() if value not in (None, "")}

View File

@ -7,13 +7,11 @@ from typing import Any, Literal
BackendName = Literal["mcp", "script", "fake"]
ExecutionStrategy = Literal["hybrid_node_mcp", "script_only", "fake"]
AgentExecutionMode = Literal["fixed_runtime", "agentic_skill"]
IntentName = Literal["deploy", "show_usage", "preview", "query_node_ips", "rollback"]
ModePreference = Literal["MCP", "API脚本", "未指定"]
StrategyPreference = Literal["hybrid_node_mcp", "script_only", "fake", "未指定"]
ActionAnalysisSeverity = Literal["info", "low", "medium", "high"]
ModeDecisionRisk = Literal["low", "medium", "high"]
ToolScope = Literal["global", "ip"]
ActionRiskLevel = Literal["low", "medium", "high"]
@dataclass(slots=True)
@ -32,21 +30,6 @@ class ActionResult:
error_summary: str = ""
@dataclass(slots=True)
class ActionToolSpec:
"""面向 LLM 和 runtime 的统一 action tool 描述。"""
name: str
action: str
scope: ToolScope
description: str
risk_level: ModeDecisionRisk = "medium"
requires_confirmation: bool = False
required_runtime_fields: tuple[str, ...] = ()
required_param_fields: tuple[str, ...] = ()
preferred_backend: str = ""
@dataclass(slots=True)
class SkillPolicy:
"""从 Skill 文档提取出的部署策略约束。"""
@ -54,7 +37,6 @@ class SkillPolicy:
name: str
source_path: str
description: str = ""
allowed_execution_modes: tuple[AgentExecutionMode, ...] = ("fixed_runtime", "agentic_skill")
allowed_modes: tuple[str, ...] = ("MCP", "API脚本")
allowed_actions: tuple[str, ...] = ()
required_confirmations: tuple[str, ...] = (
@ -108,16 +90,6 @@ class LlmDeployPlan:
execution_strategy: StrategyPreference = "未指定"
@dataclass(slots=True)
class LlmModeDecision:
"""LLM 给出的执行模式决策。"""
mode: AgentExecutionMode = "fixed_runtime"
reason: str = ""
risk_level: ModeDecisionRisk = "medium"
requires_confirmation: bool = True
@dataclass(slots=True)
class LlmActionAnalysis:
"""LLM 或规则对单次 action 结果的诊断建议。"""
@ -129,9 +101,22 @@ class LlmActionAnalysis:
suggested_action: str = ""
requires_confirmation: bool = False
should_continue: bool = True
progress_complete: bool | None = None
notes: list[str] = field(default_factory=list)
@dataclass(slots=True)
class LlmSingleActionProposal:
"""LLM 对单次 action 调用的结构化建议。"""
action: str
ip: str = ""
kwargs: dict[str, Any] = field(default_factory=dict)
reason: str = ""
risk_level: ActionRiskLevel = "medium"
requires_confirmation: bool = True
@dataclass(slots=True)
class AgentState:
"""一次部署运行的完整状态,可序列化到 checkpoint。"""
@ -146,7 +131,6 @@ class AgentState:
trace_file_path: str = ""
node_mcp_server_name: str = ""
node_mcp_tool_names: dict[str, str] = field(default_factory=dict)
execution_mode: AgentExecutionMode = "fixed_runtime"
completed_global_steps: list[str] = field(default_factory=list)
hash_code: str = ""
node_url: str = ""
@ -157,11 +141,8 @@ class AgentState:
last_success_step: str = ""
last_failed_step: str = ""
checkpoint_path: str = ""
planned_actions: list[str] = field(default_factory=list)
mode_reason: str = ""
mode_risk_level: ModeDecisionRisk = "medium"
mode_requires_confirmation: bool = True
paused: bool = False
pause_reason: str = ""
review_context: dict[str, Any] = field(default_factory=dict)
events: list[dict[str, Any]] = field(default_factory=list)
poll_attempts: dict[str, int] = field(default_factory=dict)

View File

@ -2,13 +2,18 @@
from __future__ import annotations
import logging
import subprocess
import time
from pathlib import Path
from typing import Any
from .logging_utils import json_for_log
from .models import ActionResult
from .output_parser import parse_script_result
logger = logging.getLogger(__name__)
class ScriptActionRunner:
"""脚本 action runner负责构造命令、执行脚本并解析结果。"""
@ -40,6 +45,18 @@ class ScriptActionRunner:
stop_first=stop_first,
trace_file_path=trace_file_path,
)
started_at = time.perf_counter()
logger.info(
"脚本 action 开始 action=%s command=%s cwd=%s config=%s ip=%s trace=%s timeout=%s",
action,
json_for_log(command),
self.script_base_dir,
config_path,
ip or "",
trace_file_path or "",
timeout_sec,
)
try:
completed = subprocess.run(
command,
cwd=str(self.script_base_dir),
@ -48,7 +65,19 @@ class ScriptActionRunner:
timeout=timeout_sec,
check=False,
)
return parse_script_result(
except Exception:
logger.exception("脚本 action 执行异常 action=%s command=%s cwd=%s", action, json_for_log(command), self.script_base_dir)
raise
duration_ms = int((time.perf_counter() - started_at) * 1000)
logger.info(
"脚本 action 结束 action=%s exit_code=%s duration_ms=%s stdout=%s stderr=%s",
action,
completed.returncode,
duration_ms,
json_for_log(completed.stdout, max_text_len=1200),
json_for_log(completed.stderr, max_text_len=1200),
)
result = parse_script_result(
action=action,
stdout=completed.stdout,
stderr=completed.stderr,
@ -56,6 +85,14 @@ class ScriptActionRunner:
backend="script",
tool_name=script_entry,
)
logger.info(
"脚本 action 解析完成 action=%s ok=%s values=%s error=%s",
action,
result.ok,
json_for_log(result.values),
result.error_summary,
)
return result
def build_command(
self,

View File

@ -2,7 +2,6 @@
from __future__ import annotations
import re
from pathlib import Path
from .constants import (
@ -16,7 +15,7 @@ from .models import SkillPolicy
def load_skill_policy(path: str | Path) -> SkillPolicy:
"""读取 Skill markdown,并提取真正参与执行的策略约束"""
"""读取 Skill markdown 头部信息,并填充 action/参数策略"""
skill_path = Path(path)
text = skill_path.read_text(encoding="utf-8")
name = "pam-auto-deply"
@ -31,121 +30,13 @@ def load_skill_policy(path: str | Path) -> SkillPolicy:
elif line.startswith("description:"):
description = line.split(":", 1)[1].strip()
parsed_actions = _parse_allowed_actions(text)
action_sequence = _parse_action_sequence(text)
ip_action_sequence = tuple(action for action in action_sequence if action in IP_ACTION_SEQUENCE) or IP_ACTION_SEQUENCE
global_sequence = tuple(action for action in action_sequence if action in GLOBAL_ACTION_SEQUENCE) or GLOBAL_ACTION_SEQUENCE
required_confirmations = _parse_required_confirmations(text)
forbidden_actions = _parse_forbidden_actions(text)
required_params = _parse_required_params(text)
return SkillPolicy(
name=name,
source_path=str(skill_path),
description=description,
allowed_actions=parsed_actions or ALLOWED_ACTIONS,
required_params=required_params or REQUIRED_PARAMS,
allowed_actions=ALLOWED_ACTIONS,
required_params=REQUIRED_PARAMS,
optional_params=DEFAULT_PARAMS.copy(),
required_confirmations=required_confirmations or ("params", "target_scope", "rollback"),
action_sequence=global_sequence,
ip_action_sequence=ip_action_sequence,
forbidden_actions=forbidden_actions or ("script-main-flow", "auto-rollback", "modify-deploy-scripts"),
action_sequence=GLOBAL_ACTION_SEQUENCE,
ip_action_sequence=IP_ACTION_SEQUENCE,
)
def _parse_allowed_actions(text: str) -> tuple[str, ...]:
"""从 skill 文档的 action 表中提取允许的 action。"""
section = _section_body(text, "### 6.3 可用 action")
if not section:
return ()
actions: list[str] = []
for raw_line in section.splitlines():
line = raw_line.strip()
if not line.startswith("|"):
continue
parts = [item.strip() for item in line.strip("|").split("|")]
if not parts or parts[0] in ("action", "---"):
continue
action = parts[0].strip("` ")
if action in ALLOWED_ACTIONS and action not in actions:
actions.append(action)
return tuple(actions)
def _parse_action_sequence(text: str) -> tuple[str, ...]:
"""从主流程章节提取推荐执行顺序。"""
section = _section_body(text, "### 4.1 正式部署主流程")
if not section:
return ()
found: list[str] = []
for action in [*GLOBAL_ACTION_SEQUENCE, *IP_ACTION_SEQUENCE]:
if re.search(rf"\b{re.escape(action)}\b", section) and action not in found:
found.append(action)
return tuple(found)
def _parse_required_confirmations(text: str) -> tuple[str, ...]:
"""从强制确认点章节提取确认类型。"""
section = _section_body(text, "### 4.2 主流程中的强制确认点")
if not section:
return ()
confirmations: list[str] = []
keyword_map = {
"参数确认": "params",
"目标 ip": "target_scope",
"部署范围": "target_scope",
"回滚": "rollback",
"间隔策略": "interval_policy",
}
lowered = section.lower()
for keyword, name in keyword_map.items():
if keyword in lowered or keyword in section:
confirmations.append(name)
return tuple(dict.fromkeys(confirmations))
def _parse_forbidden_actions(text: str) -> tuple[str, ...]:
"""从禁止事项章节提取禁止项。"""
section = _section_body(text, "### 7.5 明确禁止的做法")
if not section:
return ()
forbidden: list[str] = []
if "脚本主流程" in section:
forbidden.append("script-main-flow")
if "自动执行回滚" in section or "自动回滚" in section:
forbidden.append("auto-rollback")
if "修改脚本" in section or "自动生成" in section:
forbidden.append("modify-deploy-scripts")
return tuple(dict.fromkeys(forbidden))
def _parse_required_params(text: str) -> tuple[str, ...]:
"""从参数表提取必填脚本字段。"""
section = _section_body(text, "### 3.1 必填业务参数")
if not section:
return ()
params: list[str] = []
for raw_line in section.splitlines():
line = raw_line.strip()
if not line.startswith("|"):
continue
parts = [item.strip() for item in line.strip("|").split("|")]
if len(parts) < 3 or parts[0] in ("规范字段", "---"):
continue
script_field = parts[1].strip("` ")
required_flag = parts[2]
if script_field in REQUIRED_PARAMS and required_flag == "":
params.append(script_field)
return tuple(params)
def _section_body(text: str, heading: str) -> str:
"""提取指定 markdown heading 到下一同级 heading 之间的正文。"""
marker = f"{heading}\n"
if marker not in text:
return ""
_, tail = text.split(marker, 1)
matches = list(re.finditer(r"^###\s+", tail, flags=re.MULTILINE))
if matches:
return tail[: matches[0].start()]
return tail

View File

@ -70,7 +70,7 @@ ACTION_TOOL_SPECS: dict[str, ActionToolSpec] = {
name="poll_download_progress",
action="poll-download-progress",
scope="global",
description="轮询云下载任务进度",
description="单次查询云下载任务进度;是否继续查询由 Agent workflow 和 LLM 审核决定",
risk_level="medium",
),
"upgrade-ip": ActionToolSpec(
@ -85,7 +85,7 @@ ACTION_TOOL_SPECS: dict[str, ActionToolSpec] = {
name="poll_upgrade_progress",
action="poll-upgrade-progress",
scope="ip",
description="轮询单个工作站升级进度",
description="单次查询单个工作站升级进度;是否继续查询由 Agent workflow 和 LLM 审核决定",
risk_level="medium",
),
"start-ip": ActionToolSpec(

View File

@ -9,10 +9,19 @@
"suggested_action": "...",
"requires_confirmation": false,
"should_continue": true,
"progress_complete": null,
"notes": ["..."]
}
要求:
- 必须明确给出 `should_continue`:没有问题时为 true存在需要人工判断的问题时为 false。
- 如果 exit_code 非 0、ok=false、verify-ip SUCCESS=false、出现 pending_confirmation应标记异常。
- 如果 exit_code 非 0、ok=false、verify-ip SUCCESS=false、出现旧版 pending_confirmation应标记异常。
- 对 `poll-download-progress`、`poll-upgrade-progress` 必须判断 `progress_complete`:已完成为 true未完成但正常为 false非进度 action 可为 null。
- 进度 action 未完成但正常时,`has_anomaly=false`、`should_continue=true`、`progress_complete=false`,建议继续查询进度。
- 进度 action 完成条件优先看 `STEP=DONE`、`STATUS=completed/done/success`、`SUCCESS=true`、`FINISH=true`,或 `MSG=success` 且 `RATE_OF_PROGRESS=100` 且 `CODE` 为空或 0。
- 进度 action 出现 `CODE` 非 0或 `STEP/MSG/STATUS/MESSAGE` 含 fail/error应标记异常并 `should_continue=false`。
- 主要依据结构化字段 `ok`、`exit_code`、`values`、`error_summary` 判断;不会提供完整运行态摘要,避免被历史状态误导。
- `verify-ip SUCCESS=false` 由 runtime 按配置重复检查;单次审核仍应说明当前健康检查未通过。
- 只有输入里存在 `diagnostic_log` 时,才把它当作异常诊断上下文。
- 脚本正常过程日志不会作为错误依据,不能因为日志来自 stderr 就判定异常。
- 不要输出密钥、token、Authorization 或完整日志原文。

View File

@ -18,11 +18,13 @@ PARAMS = {
"MODULE_NAME": "Node",
"VERSION_NUMBER": "2.0.5",
"ZIP_FILE_PATH": "C:/pkg.zip",
"VERIFY_INTERVAL_SEC": 0,
"VERIFY_MAX_ATTEMPTS": 2,
}
class BlockingReviewLlmClient:
def analyze_action_result(self, *, action, result, state_summary):
def analyze_action_result(self, *, action, result):
return LlmActionAnalysis(
action=action,
has_anomaly=True,
@ -35,11 +37,84 @@ class BlockingReviewLlmClient:
)
class BlockingOnceReviewLlmClient:
def __init__(self, blocked_action: str = "get-token") -> None:
self.blocked_action = blocked_action
self.blocked = False
def analyze_action_result(self, *, action, result):
if action == self.blocked_action and not self.blocked:
self.blocked = True
return LlmActionAnalysis(
action=action,
has_anomaly=True,
severity="high",
possible_reason="review blocked once",
suggested_action="fix then retry current action",
requires_confirmation=True,
should_continue=False,
)
return LlmActionAnalysis(action=action)
class BrokenReviewLlmClient:
def analyze_action_result(self, *, action, result, state_summary):
def analyze_action_result(self, *, action, result):
raise RuntimeError("review transport failed")
class ProgressivePollRunner(FakeActionRunner):
"""模拟下载和推送进度多次查询后才完成。"""
def __init__(self) -> None:
super().__init__()
self.download_progress = ["10", "55", "100"]
self.upgrade_progress: dict[str, list[str]] = {}
def _fixture_for(self, action, kwargs):
if action == "poll-download-progress":
rate = self.download_progress.pop(0) if self.download_progress else "100"
return {
"ACTION": action,
"STEP": "DONE" if rate == "100" else "RUNNING",
"RATE_OF_PROGRESS": rate,
"MSG": "success" if rate == "100" else "running",
"MESSAGE": f"download {rate}%",
}
if action == "poll-upgrade-progress":
ip = kwargs.get("ip", "")
values = self.upgrade_progress.setdefault(str(ip), ["30", "100"])
rate = values.pop(0) if values else "100"
return {
"ACTION": action,
"IP": ip,
"STEP": "DONE" if rate == "100" else "RUNNING",
"RATE_OF_PROGRESS": rate,
"MSG": "success" if rate == "100" else "running",
"MESSAGE": f"upgrade {rate}%",
}
return super()._fixture_for(action, kwargs)
class FlakyVerifyRunner(FakeActionRunner):
"""模拟应用启动后第二次健康检查通过。"""
def __init__(self) -> None:
super().__init__()
self.verify_calls = 0
def _fixture_for(self, action, kwargs):
if action == "verify-ip" and kwargs.get("ip") == "192.168.1.10":
self.verify_calls += 1
if self.verify_calls == 1:
return {
"ACTION": action,
"IP": "192.168.1.10",
"SUCCESS": "false",
"MESSAGE": "application is starting",
}
return super()._fixture_for(action, kwargs)
def test_run_deploy_flow_success(tmp_path: Path):
agent = PamDeployAgent(fake_runner=FakeActionRunner())
state = agent.create_state(
@ -55,39 +130,81 @@ def test_run_deploy_flow_success(tmp_path: Path):
assert all(item["status"] == "SUCCESS" for item in state.ip_states.values())
def test_agentic_state_uses_planned_actions_subset(tmp_path: Path):
agent = PamDeployAgent(fake_runner=FakeActionRunner())
def test_progress_actions_repeat_until_llm_marks_complete(tmp_path: Path):
fake = ProgressivePollRunner()
agent = PamDeployAgent(fake_runner=fake)
state = agent.create_state(
params=PARAMS,
params={**PARAMS, "POLL_INTERVAL_SEC": 0},
execution_strategy="fake",
execution_mode="agentic_skill",
planned_actions=["get-token", "get-node-url", "get-online-ips"],
config_path=str(tmp_path / "config.txt"),
checkpoint_path=str(tmp_path / "checkpoint.json"),
)
agent.run_deploy_flow(state)
assert state.execution_mode == "agentic_skill"
assert state.planned_actions == ["get-token", "get-node-url", "get-online-ips"]
assert state.completed_global_steps == ["get-token", "get-node-url", "get-online-ips"]
assert state.ip_states == {}
calls = [call[0] for call in fake.calls]
assert calls.count("poll-download-progress") == 3
assert calls.count("poll-upgrade-progress") == 4
assert "poll-download-progress" in state.completed_global_steps
assert state.poll_attempts == {}
assert all(item["status"] == "SUCCESS" for item in state.ip_states.values())
progress_events = [event for event in state.events if event["type"] == "ACTION_PROGRESS"]
assert any(event["stage"] == "poll-download-progress" and "RATE_OF_PROGRESS=10" in event["message"] for event in progress_events)
assert any(event["stage"] == "poll-upgrade-progress" and event["ip"] == "192.168.1.10" for event in progress_events)
def test_fixed_runtime_state_also_respects_planned_actions_subset(tmp_path: Path):
agent = PamDeployAgent(fake_runner=FakeActionRunner())
def test_progress_timeout_pauses_on_current_action(tmp_path: Path):
fake = FakeActionRunner(
{
"poll-download-progress": {
"ACTION": "poll-download-progress",
"STEP": "RUNNING",
"RATE_OF_PROGRESS": "20",
"MSG": "running",
"MESSAGE": "download 20%",
}
}
)
agent = PamDeployAgent(fake_runner=fake)
state = agent.create_state(
params=PARAMS,
params={**PARAMS, "POLL_INTERVAL_SEC": 0, "DOWNLOAD_POLL_MAX_ATTEMPTS": 2},
execution_strategy="fake",
execution_mode="fixed_runtime",
planned_actions=["get-token", "get-node-url", "get-online-ips"],
config_path=str(tmp_path / "config.txt"),
checkpoint_path=str(tmp_path / "checkpoint.json"),
)
agent.run_deploy_flow(state)
assert state.execution_mode == "fixed_runtime"
assert state.completed_global_steps == ["get-token", "get-node-url", "get-online-ips"]
assert state.ip_states == {}
assert state.paused is True
assert state.pause_reason == "progress_timeout"
assert state.last_failed_step == "poll-download-progress"
assert "poll-download-progress" not in state.completed_global_steps
assert state.review_context["stage"] == "poll-download-progress"
assert state.poll_attempts["global:poll-download-progress"] == 2
def test_verify_ip_retries_until_success_before_marking_failed(tmp_path: Path):
fake = FlakyVerifyRunner()
agent = PamDeployAgent(fake_runner=fake)
state = agent.create_state(
params=PARAMS,
execution_strategy="fake",
config_path=str(tmp_path / "config.txt"),
checkpoint_path=str(tmp_path / "checkpoint.json"),
)
agent.run_deploy_flow(state)
assert fake.verify_calls == 2
assert state.paused is False
assert state.poll_attempts == {}
assert state.ip_states["192.168.1.10"]["status"] == "SUCCESS"
assert any(
event["type"] == "ACTION_PROGRESS"
and event["stage"] == "verify-ip"
and event["ip"] == "192.168.1.10"
for event in state.events
)
def test_create_state_writes_absolute_script_config_path_and_normalized_zip(tmp_path: Path):
@ -106,6 +223,7 @@ def test_create_state_writes_absolute_script_config_path_and_normalized_zip(tmp_
assert Path(state.trace_file_path).is_absolute()
config_text = Path(state.config_path).read_text(encoding="utf-8")
assert f"ZIP_FILE_PATH={package_path.resolve()}" in config_text
assert "PARENT_VERSION_NUMBER=\n" in config_text
def test_global_action_requires_hash_code_from_upload_package(tmp_path: Path):
@ -145,11 +263,49 @@ def test_run_deploy_flow_stops_on_verify_failure(tmp_path: Path):
agent.run_deploy_flow(state)
assert state.pending_confirmation == "rollback-ip:192.168.1.10"
verify_calls = [call for call in fake.calls if call[0] == "verify-ip" and call[1].get("ip") == "192.168.1.10"]
assert len(verify_calls) == 2
assert state.pending_confirmation == ""
assert state.paused is True
assert state.pause_reason == "action_failed"
assert state.ip_states["192.168.1.10"]["status"] == "FAILED"
assert state.ip_states["192.168.1.10"]["rollback_status"] == "PENDING_AGENT_CONFIRMATION"
assert state.ip_states["192.168.1.10"]["failed_stage"] == "verify-ip"
assert state.ip_states["192.168.1.10"]["rollback_status"] == "ROLLBACK_NOT_RUN"
assert "192.168.1.11" not in state.ip_states
assert any(event["type"] == "CONFIRMATION_REQUIRED" for event in state.events)
assert any(event["type"] == "ACTION_RETRY_REQUIRED" for event in state.events)
assert not any(call[0] == "download-log" for call in fake.calls)
def test_resume_retries_failed_ip_action_without_rollback(tmp_path: Path):
fake = FakeActionRunner(
{
"verify-ip:192.168.1.10": {
"ACTION": "verify-ip",
"IP": "192.168.1.10",
"SUCCESS": "false",
"MESSAGE": "health check failed",
}
}
)
agent = PamDeployAgent(fake_runner=fake)
state = agent.create_state(
params=PARAMS,
execution_strategy="fake",
config_path=str(tmp_path / "config.txt"),
)
agent.run_deploy_flow(state)
fake.fixtures = {}
agent.resume_state(state)
agent.run_deploy_flow(state)
assert state.pending_confirmation == ""
assert state.paused is False
assert state.last_failed_step == ""
assert state.ip_states["192.168.1.10"]["status"] == "SUCCESS"
assert state.ip_states["192.168.1.10"]["rollback_status"] == "ROLLBACK_NOT_RUN"
assert state.ip_states["192.168.1.11"]["status"] == "SUCCESS"
assert not any(call[0] == "rollback-ip" for call in fake.calls)
def test_action_analysis_event_is_recorded_when_enabled(tmp_path: Path):
@ -196,11 +352,35 @@ def test_successful_action_can_be_blocked_by_llm_review(tmp_path: Path):
assert state.paused is True
assert state.pause_reason == "llm_review_blocked"
assert state.last_failed_step == "get-token"
assert state.completed_global_steps == ["get-token"]
assert state.completed_global_steps == []
assert state.review_context["stage"] == "get-token"
assert state.review_context["suggested_action"] == "stop and inspect"
def test_resume_retries_llm_blocked_global_action(tmp_path: Path):
fake = FakeActionRunner()
agent = PamDeployAgent(
fake_runner=fake,
llm_client=BlockingOnceReviewLlmClient(),
)
state = agent.create_state(
params=PARAMS,
execution_strategy="fake",
config_path=str(tmp_path / "config.txt"),
checkpoint_path=str(tmp_path / "checkpoint.json"),
)
agent.run_deploy_flow(state)
agent.resume_state(state)
agent.run_deploy_flow(state)
called_actions = [call[0] for call in fake.calls]
assert called_actions[:2] == ["get-token", "get-token"]
assert called_actions.count("get-token") == 2
assert state.paused is False
assert state.completed_global_steps[0] == "get-token"
def test_action_review_failure_pauses_flow(tmp_path: Path):
agent = PamDeployAgent(
fake_runner=FakeActionRunner(),
@ -219,10 +399,11 @@ def test_action_review_failure_pauses_flow(tmp_path: Path):
assert state.pause_reason == "llm_review_blocked"
assert state.review_context["stage"] == "get-token"
assert "LLM 审核失败" in state.review_context["possible_reason"]
assert state.completed_global_steps == []
assert any(event["type"] == "ACTION_ANALYSIS_FAIL" for event in state.events)
def test_confirm_pending_rollback_runs_rollback_and_resume_continues(tmp_path: Path):
def test_explicit_rollback_runs_rollback_and_resume_continues(tmp_path: Path):
fake = FakeActionRunner(
{
"verify-ip:192.168.1.10": {
@ -241,18 +422,16 @@ def test_confirm_pending_rollback_runs_rollback_and_resume_continues(tmp_path: P
)
agent.run_deploy_flow(state)
request = agent.build_confirmation_request(state)
agent.confirm_pending(state, approved=True)
agent.rollback_ip(state, "192.168.1.10")
agent.run_deploy_flow(state)
assert request["type"] == "rollback-ip"
assert state.pending_confirmation == ""
assert state.ip_states["192.168.1.10"]["rollback_status"] == "ROLLBACK_DONE"
assert state.ip_states["192.168.1.11"]["status"] == "SUCCESS"
assert any(call[0] == "rollback-ip" for call in fake.calls)
def test_failed_rollback_keeps_confirmation_pending(tmp_path: Path):
def test_failed_explicit_rollback_pauses_without_confirmation(tmp_path: Path):
fake = FakeActionRunner(
{
"verify-ip:192.168.1.10": {
@ -277,9 +456,11 @@ def test_failed_rollback_keeps_confirmation_pending(tmp_path: Path):
)
agent.run_deploy_flow(state)
agent.confirm_pending(state, approved=True)
agent.rollback_ip(state, "192.168.1.10")
assert state.pending_confirmation == "rollback-ip:192.168.1.10"
assert state.pending_confirmation == ""
assert state.paused is True
assert state.pause_reason == "rollback_failed"
assert state.ip_states["192.168.1.10"]["rollback_status"] == "ROLLBACK_FAILED"

View File

@ -1,4 +1,5 @@
import builtins
import sys
from pathlib import Path
import pytest
@ -6,7 +7,7 @@ import pytest
from pam_deploy_graph.agent import PamDeployAgent
from pam_deploy_graph.fake_runner import FakeActionRunner
from pam_deploy_graph.interactive import InteractiveCliSession, _build_prompt_input
from pam_deploy_graph.models import LlmActionAnalysis
from pam_deploy_graph.models import LlmActionAnalysis, LlmIntentResult, LlmSingleActionProposal
PARAMS = {
@ -18,11 +19,13 @@ PARAMS = {
"MODULE_NAME": "Node",
"VERSION_NUMBER": "2.0.5",
"ZIP_FILE_PATH": "C:/pkg.zip",
"VERIFY_INTERVAL_SEC": 0,
"VERIFY_MAX_ATTEMPTS": 2,
}
class BlockingReviewLlmClient:
def analyze_action_result(self, *, action, result, state_summary):
def analyze_action_result(self, *, action, result):
return LlmActionAnalysis(
action=action,
has_anomaly=True,
@ -35,6 +38,108 @@ class BlockingReviewLlmClient:
)
class FakeTestableLlmClient:
def __init__(self) -> None:
self.requests: list[str] = []
self.chat_requests: list[tuple[str, dict]] = []
self.log_requests: list[tuple[str, str | None, str]] = []
self.proposal_requests: list[str] = []
def understand_request(self, text: str) -> LlmIntentResult:
self.requests.append(text)
return LlmIntentResult(
intent="deploy",
mode_preference="MCP",
strategy_preference="hybrid_node_mcp",
confidence=0.91,
reasons=["test ok"],
)
def extract_params(self, text, base_params=None):
raise AssertionError("llm test should only call understand_request")
def generate_plan(self, *, params, intent, strategy):
raise AssertionError("llm test should only call understand_request")
def analyze_action_result(self, *, action, result):
return LlmActionAnalysis(action=action)
def chat(self, text, context=None):
self.chat_requests.append((text, context or {}))
return f"chat answer: {text}"
def analyze_log(self, log_text, question=None, source_path=""):
self.log_requests.append((log_text, question, source_path))
return "log analysis answer"
def propose_action(self, text, allowed_actions, params, state_summary=None):
self.proposal_requests.append(text)
return LlmSingleActionProposal(
action="verify-ip" if "verify" in text else "get-online-ips",
ip="192.168.1.10" if "192.168.1.10" in text else "",
reason="test proposal",
risk_level="medium",
requires_confirmation=True,
)
class StreamingChatLlmClient(FakeTestableLlmClient):
def __init__(self) -> None:
super().__init__()
self.stream_requests: list[tuple[str, dict]] = []
def chat_stream(self, text, context=None):
self.stream_requests.append((text, context or {}))
yield "第一句。"
yield "<think>隐藏思考</think>"
yield "第二句。"
class BrokenStreamingChatLlmClient(FakeTestableLlmClient):
def chat_stream(self, text, context=None):
raise RuntimeError("stream unavailable")
class FlakyVerifyRunner(FakeActionRunner):
"""第一次 verify-ip 失败,后续恢复成功,用于覆盖断点重试。"""
def __init__(self) -> None:
super().__init__()
self.verify_calls = 0
def _fixture_for(self, action, kwargs):
if action == "verify-ip" and kwargs.get("ip") == "192.168.1.10":
self.verify_calls += 1
if self.verify_calls == 1:
return {
"ACTION": "verify-ip",
"IP": "192.168.1.10",
"SUCCESS": "false",
"MESSAGE": "health check failed",
}
return super()._fixture_for(action, kwargs)
class ChatProgressRunner(FakeActionRunner):
"""让 chat fake 部署产生一次可见的进度更新。"""
def __init__(self) -> None:
super().__init__()
self.download_progress = ["40", "100"]
def _fixture_for(self, action, kwargs):
if action == "poll-download-progress":
rate = self.download_progress.pop(0) if self.download_progress else "100"
return {
"ACTION": action,
"STEP": "DONE" if rate == "100" else "RUNNING",
"RATE_OF_PROGRESS": rate,
"MSG": "success" if rate == "100" else "running",
"MESSAGE": f"download {rate}%",
}
return super()._fixture_for(action, kwargs)
def run_session(session: InteractiveCliSession, inputs: list[str]) -> list[str]:
output: list[str] = []
iterator = iter(inputs)
@ -93,9 +198,27 @@ def test_chat_run_prints_action_progress(tmp_path: Path):
assert any("分析完成: verify-ip" in item for item in output)
def test_chat_greeting_does_not_trigger_structured_analysis(tmp_path: Path):
def test_chat_run_prints_progress_poll_updates(tmp_path: Path):
checkpoint = tmp_path / "checkpoint.json"
session = InteractiveCliSession(
agent=PamDeployAgent(),
agent=PamDeployAgent(fake_runner=ChatProgressRunner()),
params={**PARAMS, "POLL_INTERVAL_SEC": 0},
strategy="fake",
checkpoint_path=str(checkpoint),
)
output = run_session(session, ["run", "yes", "yes", "yes", "exit"])
assert any("进度更新: poll-download-progress" in item for item in output)
assert any("RATE_OF_PROGRESS=40" in item for item in output)
assert session.state is not None
assert "poll-download-progress" in session.state.completed_global_steps
def test_chat_greeting_goes_to_llm_chat_without_structured_analysis(tmp_path: Path):
llm = FakeTestableLlmClient()
session = InteractiveCliSession(
agent=PamDeployAgent(llm_client=llm),
params=PARAMS,
strategy="fake",
checkpoint_path=str(tmp_path / "checkpoint.json"),
@ -104,10 +227,208 @@ def test_chat_greeting_does_not_trigger_structured_analysis(tmp_path: Path):
output = run_session(session, ["你好", "exit"])
assert session.last_analysis is None
assert any("可以输入 help 查看命令" in item for item in output)
assert llm.chat_requests[0][0] == "你好"
assert any("正在询问 LLM: FakeTestableLlmClient" in item for item in output)
assert any("chat answer: 你好" in item for item in output)
assert not any("已生成结构化理解" in item for item in output)
def test_chat_ask_command_uses_llm_chat(tmp_path: Path):
llm = FakeTestableLlmClient()
session = InteractiveCliSession(
agent=PamDeployAgent(llm_client=llm),
params=PARAMS,
strategy="fake",
checkpoint_path=str(tmp_path / "checkpoint.json"),
)
output = run_session(session, ["ask 这个 agent 能做什么", "exit"])
assert llm.chat_requests[0][0] == "这个 agent 能做什么"
assert llm.chat_requests[0][1]["params"]["CLIENT_SECRET"] == "***"
assert any("chat answer: 这个 agent 能做什么" in item for item in output)
def test_chat_ask_uses_streaming_chat_when_available(tmp_path: Path):
llm = StreamingChatLlmClient()
session = InteractiveCliSession(
agent=PamDeployAgent(llm_client=llm),
params=PARAMS,
strategy="fake",
checkpoint_path=str(tmp_path / "checkpoint.json"),
)
output = run_session(session, ["ask 你好", "exit"])
assert llm.stream_requests[0][0] == "你好"
assert llm.chat_requests == []
assert any("第一句。" in item for item in output)
assert any("第二句。" in item for item in output)
assert not any("隐藏思考" in item or "<think>" in item for item in output)
def test_chat_ask_falls_back_when_streaming_fails(tmp_path: Path):
llm = BrokenStreamingChatLlmClient()
session = InteractiveCliSession(
agent=PamDeployAgent(llm_client=llm),
params=PARAMS,
strategy="fake",
checkpoint_path=str(tmp_path / "checkpoint.json"),
)
output = run_session(session, ["ask 你好", "exit"])
assert llm.chat_requests[0][0] == "你好"
assert any("LLM 流式输出失败,改用普通请求" in item for item in output)
assert any("chat answer: 你好" in item for item in output)
def test_chat_ask_strips_think_from_non_streaming_chat(tmp_path: Path):
class ThinkChatLlmClient(FakeTestableLlmClient):
def chat(self, text, context=None):
self.chat_requests.append((text, context or {}))
return "可见<think>隐藏思考</think>结论"
llm = ThinkChatLlmClient()
session = InteractiveCliSession(
agent=PamDeployAgent(llm_client=llm),
params=PARAMS,
strategy="fake",
checkpoint_path=str(tmp_path / "checkpoint.json"),
)
output = run_session(session, ["ask 你好", "exit"])
assert any("可见结论" in item for item in output)
assert not any("隐藏思考" in item or "<think>" in item for item in output)
def test_chat_log_analyze_reads_tail_and_redacts(tmp_path: Path):
llm = FakeTestableLlmClient()
log_path = tmp_path / "agent.log"
log_path.write_text(
"\n".join(
[
"line 1 CLIENT_SECRET=real-secret",
"line 2 ok",
"line 3 ERROR failed",
]
),
encoding="utf-8",
)
session = InteractiveCliSession(
agent=PamDeployAgent(llm_client=llm),
params=PARAMS,
strategy="fake",
checkpoint_path=str(tmp_path / "checkpoint.json"),
)
output = run_session(session, [f"log analyze {log_path} 请找异常 --tail 2", "exit"])
log_text, question, source_path = llm.log_requests[0]
assert "line 1" not in log_text
assert "real-secret" not in log_text
assert "line 3 ERROR failed" in log_text
assert question == "请找异常"
assert source_path == str(log_path)
assert any("log analysis answer" in item for item in output)
def test_chat_log_analyze_strips_think_from_answer(tmp_path: Path):
class ThinkLogLlmClient(FakeTestableLlmClient):
def analyze_log(self, log_text, question=None, source_path=""):
self.log_requests.append((log_text, question, source_path))
return "<think>隐藏日志分析</think>日志结论"
llm = ThinkLogLlmClient()
log_path = tmp_path / "agent.log"
log_path.write_text("ERROR failed", encoding="utf-8")
session = InteractiveCliSession(
agent=PamDeployAgent(llm_client=llm),
params=PARAMS,
strategy="fake",
checkpoint_path=str(tmp_path / "checkpoint.json"),
)
output = run_session(session, [f"log analyze {log_path}", "exit"])
assert any("日志结论" in item for item in output)
assert not any("隐藏日志分析" in item or "<think>" in item for item in output)
def test_chat_action_propose_only_shows_plan_without_execution(tmp_path: Path):
llm = FakeTestableLlmClient()
fake = FakeActionRunner()
session = InteractiveCliSession(
agent=PamDeployAgent(fake_runner=fake, llm_client=llm),
params=PARAMS,
strategy="fake",
checkpoint_path=str(tmp_path / "checkpoint.json"),
)
output = run_session(session, ["action propose 请 verify-ip 192.168.1.10", "exit"])
assert llm.proposal_requests == ["请 verify-ip 192.168.1.10"]
assert fake.calls == []
assert any("单 action 计划" in item for item in output)
assert any("- action: verify-ip" in item for item in output)
def test_chat_action_run_llm_requires_confirmation_before_execution(tmp_path: Path):
llm = FakeTestableLlmClient()
fake = FakeActionRunner()
session = InteractiveCliSession(
agent=PamDeployAgent(fake_runner=fake, llm_client=llm),
params=PARAMS,
strategy="fake",
checkpoint_path=str(tmp_path / "checkpoint.json"),
)
output = run_session(session, ["action run llm 请 verify-ip 192.168.1.10", "no", "exit"])
assert fake.calls == []
assert any("已取消单 action 执行" in item for item in output)
output = run_session(session, ["action run llm 请 verify-ip 192.168.1.10", "yes", "exit"])
assert ("verify-ip", {"ip": "192.168.1.10"}) in fake.calls
assert session.state is not None
assert any(event["type"] == "SINGLE_ACTION_DONE" for event in session.state.events)
assert any("单 action 执行完成" in item for item in output)
def test_chat_action_run_missing_ip_is_friendly(tmp_path: Path):
fake = FakeActionRunner()
session = InteractiveCliSession(
agent=PamDeployAgent(fake_runner=fake),
params=PARAMS,
strategy="fake",
checkpoint_path=str(tmp_path / "checkpoint.json"),
)
output = run_session(session, ["action run verify-ip", "exit"])
assert fake.calls == []
assert any("需要提供 ip" in item for item in output)
def test_chat_action_run_manual_executes_fake_action(tmp_path: Path):
fake = FakeActionRunner()
session = InteractiveCliSession(
agent=PamDeployAgent(fake_runner=fake),
params=PARAMS,
strategy="fake",
checkpoint_path=str(tmp_path / "checkpoint.json"),
)
output = run_session(session, ["action run get-online-ips", "yes", "exit"])
assert ("get-online-ips", {"ip": None}) in fake.calls
assert session.state is not None
assert session.state.online_ips == ["192.168.1.10", "192.168.1.11"]
assert any("单 action 执行完成" in item for item in output)
def test_chat_preflight_blocks_missing_zip_path_before_confirm(tmp_path: Path):
missing_package = tmp_path / "missing.zip"
session = InteractiveCliSession(
@ -141,7 +462,28 @@ def test_chat_action_failure_does_not_report_langgraph_unavailable(tmp_path: Pat
assert not any("LangGraph 确认运行器不可用" in item for item in output)
def test_chat_approve_then_resume_continues_after_failed_ip(tmp_path: Path):
def test_chat_resume_retries_failed_ip_without_rollback(tmp_path: Path):
fake = FlakyVerifyRunner()
session = InteractiveCliSession(
agent=PamDeployAgent(fake_runner=fake),
params=PARAMS,
strategy="fake",
checkpoint_path=str(tmp_path / "checkpoint.json"),
)
output = run_session(session, ["run", "yes", "yes", "yes", "exit"])
assert session.state is not None
assert session.state.pending_confirmation == ""
assert session.state.paused is False
assert session.state.ip_states["192.168.1.10"]["rollback_status"] == "ROLLBACK_NOT_RUN"
assert session.state.ip_states["192.168.1.10"]["status"] == "SUCCESS"
assert session.state.ip_states["192.168.1.11"]["status"] == "SUCCESS"
assert not any(call[0] == "rollback-ip" for call in fake.calls)
assert any("进度更新: verify-ip" in item for item in output)
def test_chat_explicit_rollback_command_rolls_back_failed_ip(tmp_path: Path):
fake = FakeActionRunner(
{
"verify-ip:192.168.1.10": {
@ -159,12 +501,14 @@ def test_chat_approve_then_resume_continues_after_failed_ip(tmp_path: Path):
checkpoint_path=str(tmp_path / "checkpoint.json"),
)
run_session(session, ["run", "yes", "yes", "yes", "approve", "resume", "exit"])
output = run_session(session, ["run", "yes", "yes", "yes", "rollback", "resume", "exit"])
assert session.state is not None
assert session.state.pending_confirmation == ""
assert session.state.ip_states["192.168.1.10"]["rollback_status"] == "ROLLBACK_DONE"
assert session.state.ip_states["192.168.1.11"]["status"] == "SUCCESS"
assert any(call[0] == "rollback-ip" for call in fake.calls)
assert any("回滚已完成;如需继续主流程,输入 resume" in item for item in output)
def test_chat_params_events_and_checkpoint_commands(tmp_path: Path):
@ -185,7 +529,7 @@ def test_chat_params_events_and_checkpoint_commands(tmp_path: Path):
"yes",
"yes",
"yes",
"events 2",
"events 20",
"list checkpoints",
"load checkpoint " + str(checkpoint),
"exit",
@ -260,6 +604,24 @@ def test_chat_llm_review_block_message_is_visible(tmp_path: Path):
assert any("如需继续,输入 resume" in item for item in output)
def test_chat_llm_test_command_uses_current_client(tmp_path: Path):
llm = FakeTestableLlmClient()
session = InteractiveCliSession(
agent=PamDeployAgent(llm_client=llm),
params=PARAMS,
strategy="fake",
checkpoint_path=str(tmp_path / "checkpoint.json"),
)
output = run_session(session, ["llm test 检查模型", "exit"])
assert llm.requests == ["检查模型"]
assert any("正在测试 LLM: FakeTestableLlmClient" in item for item in output)
assert any("LLM 测试通过" in item for item in output)
assert any("- intent: deploy" in item for item in output)
assert any("- strategy: hybrid_node_mcp" in item for item in output)
def test_chat_can_hot_load_mcp_config(tmp_path: Path):
mcp_config = tmp_path / "mcp.json"
mcp_config.write_text('{"transport": "stdio", "command": "python"}', encoding="utf-8")
@ -285,3 +647,15 @@ def test_prompt_history_creates_runtime_dir(tmp_path: Path, monkeypatch):
assert callable(prompt)
assert (tmp_path / "runtime").is_dir()
def test_prompt_toolkit_enabled_when_frozen(tmp_path: Path, monkeypatch):
pytest.importorskip("prompt_toolkit")
monkeypatch.chdir(tmp_path)
monkeypatch.setattr(sys, "frozen", True, raising=False)
prompt = _build_prompt_input(builtins.input)
assert callable(prompt)
assert prompt is not builtins.input
assert (tmp_path / "runtime").is_dir()

View File

@ -14,10 +14,12 @@ PARAMS = {
"MODULE_NAME": "Node",
"VERSION_NUMBER": "2.0.5",
"ZIP_FILE_PATH": "C:/pkg.zip",
"VERIFY_INTERVAL_SEC": 0,
"VERIFY_MAX_ATTEMPTS": 2,
}
def test_langgraph_runtime_interrupts_and_resumes_confirmation(tmp_path: Path):
def test_langgraph_runtime_pauses_failure_and_resume_retries(tmp_path: Path):
fake = FakeActionRunner(
{
"verify-ip:192.168.1.10": {
@ -39,16 +41,22 @@ def test_langgraph_runtime_interrupts_and_resumes_confirmation(tmp_path: Path):
first = runtime.start(state)
assert first.interrupted is True
assert runtime.waiting_confirmation is True
assert first.confirmation["type"] == "rollback-ip"
assert first.confirmation["ip"] == "192.168.1.10"
assert first.interrupted is False
assert runtime.waiting_confirmation is False
assert first.confirmation == {}
assert first.state is not None
assert first.state.paused is True
assert first.state.pending_confirmation == ""
assert first.state.ip_states["192.168.1.10"]["failed_stage"] == "verify-ip"
second = runtime.resume(approved=True)
fake.fixtures = {}
agent.resume_state(first.state)
second = runtime.start(first.state)
assert second.interrupted is False
assert runtime.waiting_confirmation is False
assert second.state is not None
assert second.state.pending_confirmation == ""
assert second.state.ip_states["192.168.1.10"]["rollback_status"] == "ROLLBACK_DONE"
assert second.state.paused is False
assert second.state.ip_states["192.168.1.10"]["rollback_status"] == "ROLLBACK_NOT_RUN"
assert second.state.ip_states["192.168.1.11"]["status"] == "SUCCESS"

View File

@ -1,4 +1,5 @@
from dataclasses import asdict
import json
from pam_deploy_graph.agent import PamDeployAgent
from pam_deploy_graph.checkpoint_store import redact_mapping
@ -20,13 +21,22 @@ def test_understand_request_prefers_hybrid_for_mcp():
def test_extract_params_from_key_value_text():
result = RuleBasedLlmClient().extract_params(
"HOME_BASE_URL=https://x CLIENT_ID=id CLIENT_SECRET=s AIRPORT_CODE=HET "
"APP_NAME=PAM MODULE_NAME=Node VERSION_NUMBER=2.0.5 ZIP_FILE_PATH=C:/pkg.zip"
"APP_NAME=PAM MODULE_NAME=Node VERSION_NUMBER=2.0.5 "
"parentVersionNumber=2.0.4 ZIP_FILE_PATH=C:/pkg.zip"
)
assert result.extracted_params["AIRPORT_CODE"] == "HET"
assert result.extracted_params["PARENT_VERSION_NUMBER"] == "2.0.4"
assert result.missing_required_params == []
assert "CLIENT_SECRET" in result.sensitive_fields_present
def test_extract_parent_version_from_chinese_text():
result = RuleBasedLlmClient().extract_params("请部署版本 2.0.5,云下载继承版本 2.0.4 的规则")
assert result.extracted_params["VERSION_NUMBER"] == "2.0.5"
assert result.extracted_params["PARENT_VERSION_NUMBER"] == "2.0.4"
def test_analyze_request_returns_structured_objects():
agent = PamDeployAgent()
result = agent.analyze_request(
@ -44,29 +54,7 @@ def test_analyze_request_returns_structured_objects():
)
payload = {key: asdict(value) for key, value in result.items()}
assert payload["intent"]["intent"] == "preview"
assert payload["mode_decision"]["mode"] == "fixed_runtime"
assert payload["plan"]["execution_strategy"] == "hybrid_node_mcp"
assert payload["plan"]["planned_actions"][:3] == ["get-token", "create-version", "upload-package"]
def test_rule_based_mode_decision_can_choose_agentic_skill():
agent = PamDeployAgent()
result = agent.analyze_request(
"请按 skill 自主编排并自动选择工具,帮我排查 HET PAM Node 部署异常",
{
"HOME_BASE_URL": "https://x",
"CLIENT_ID": "id",
"CLIENT_SECRET": "s",
"AIRPORT_CODE": "HET",
"APP_NAME": "PAM",
"MODULE_NAME": "Node",
"VERSION_NUMBER": "2.0.5",
"ZIP_FILE_PATH": "C:/pkg.zip",
},
)
assert result["mode_decision"].mode == "agentic_skill"
assert "自主编排" in result["mode_decision"].reason
def test_analyze_payload_can_be_redacted():
@ -241,13 +229,206 @@ def test_openai_compatible_client_analyzes_action_result_with_redaction():
ok=False,
values={"CLIENT_SECRET": "real-secret", "SUCCESS": "false"},
stderr="x" * 1200,
error_summary="failed",
),
state_summary={"params": {"CLIENT_SECRET": "real-secret"}},
)
serialized_prompt = str(calls[0])
input_payload = _llm_input_payload(calls[0])
assert analysis.has_anomaly is True
assert analysis.severity == "high"
assert "real-secret" not in serialized_prompt
assert "[已截断]" in serialized_prompt
assert "state_summary" not in input_payload
assert input_payload["result"]["diagnostic_log"].startswith("[已截断]...")
def test_openai_compatible_client_omits_success_script_logs_from_action_review():
calls = []
def transport(url, headers, payload, timeout_sec):
calls.append(payload)
return {
"choices": [
{
"message": {
"content": (
'{"action":"get-online-ips","has_anomaly":false,"severity":"info",'
'"possible_reason":"","suggested_action":"continue",'
'"requires_confirmation":false,"should_continue":true}'
)
}
}
]
}
client = OpenAICompatibleLlmClient(
base_url="https://llm.example/v1",
api_key="secret-key",
model="model-a",
transport=transport,
)
client.analyze_action_result(
action="get-online-ips",
result=ActionResult(
action="get-online-ips",
backend="script",
ok=True,
values={"ACTION": "get-online-ips", "COUNT": "1", "IP": ["10.4.1.1"]},
stdout="ACTION=get-online-ips\nCOUNT=1\nIP=10.4.1.1\n",
stderr="[INFO] [FLOW][START] get_token\n[INFO] [FLOW][DONE] get_online_ips\n",
),
)
input_payload = _llm_input_payload(calls[0])
result_payload = input_payload["result"]
assert "diagnostic_log" not in result_payload
assert "stdout" not in result_payload
assert "stderr" not in result_payload
assert "[FLOW][START]" not in json.dumps(input_payload, ensure_ascii=False)
def test_openai_compatible_client_supports_plain_chat():
calls = []
def transport(url, headers, payload, timeout_sec):
calls.append(payload)
return {"choices": [{"message": {"content": "普通回答"}}]}
client = OpenAICompatibleLlmClient(
base_url="https://llm.example/v1",
api_key="secret-key",
model="model-a",
transport=transport,
)
answer = client.chat("你好", context={"CLIENT_SECRET": "real-secret"})
serialized_prompt = str(calls[0])
assert answer == "普通回答"
assert "response_format" not in calls[0]
assert "real-secret" not in serialized_prompt
assert "不要自动触发部署" in calls[0]["messages"][0]["content"]
assert "不要输出 `<think>`" in calls[0]["messages"][0]["content"]
def test_openai_compatible_client_strips_think_from_plain_chat():
def transport(url, headers, payload, timeout_sec):
return {"choices": [{"message": {"content": "开头<think>内部思考</think>结论"}}]}
client = OpenAICompatibleLlmClient(
base_url="https://llm.example/v1",
api_key="secret-key",
model="model-a",
transport=transport,
)
answer = client.chat("你好")
assert answer == "开头结论"
def test_openai_compatible_client_streams_plain_chat_and_filters_think():
calls = []
def stream_transport(url, headers, payload, timeout_sec):
calls.append((url, headers, payload, timeout_sec))
return iter(["开头", "<thi", "nk>内部思考", "</think>结论", ""])
client = OpenAICompatibleLlmClient(
base_url="https://llm.example/v1",
api_key="secret-key",
model="model-a",
stream_transport=stream_transport,
)
answer = "".join(client.chat_stream("你好", context={"CLIENT_SECRET": "real-secret"}))
assert answer == "开头结论。"
assert calls[0][0] == "https://llm.example/v1/chat/completions"
assert calls[0][1]["Authorization"] == "Bearer secret-key"
assert calls[0][2]["stream"] is True
assert "response_format" not in calls[0][2]
assert "real-secret" not in json.dumps(calls[0][2], ensure_ascii=False)
def test_openai_compatible_client_analyzes_log_with_redaction():
calls = []
def transport(url, headers, payload, timeout_sec):
calls.append(payload)
return {"choices": [{"message": {"content": "<think>隐藏分析</think>日志分析"}}]}
client = OpenAICompatibleLlmClient(
base_url="https://llm.example/v1",
api_key="secret-key",
model="model-a",
transport=transport,
)
answer = client.analyze_log("ERROR CLIENT_SECRET=real-secret", question="为什么失败", source_path="agent.log")
input_payload = _llm_input_payload(calls[0])
assert answer == "日志分析"
assert input_payload["source_path"] == "agent.log"
assert input_payload["question"] == "为什么失败"
assert "real-secret" not in json.dumps(input_payload, ensure_ascii=False)
assert "不要因为日志来自 stderr" in calls[0]["messages"][0]["content"]
assert "不要输出 `<think>`" in calls[0]["messages"][0]["content"]
def test_openai_compatible_client_proposes_single_action():
calls = []
def transport(url, headers, payload, timeout_sec):
calls.append(payload)
return {
"choices": [
{
"message": {
"content": (
'{"action":"verify-ip","ip":"192.168.1.10","kwargs":{"timeout_sec":10},'
'"reason":"用户要求健康检查","risk_level":"low","requires_confirmation":false}'
)
}
}
]
}
client = OpenAICompatibleLlmClient(
base_url="https://llm.example/v1",
api_key="secret-key",
model="model-a",
transport=transport,
)
proposal = client.propose_action(
"检查 192.168.1.10",
["verify-ip", "get-online-ips"],
{"CLIENT_SECRET": "real-secret"},
state_summary={"node_url_present": True},
)
input_payload = _llm_input_payload(calls[0])
assert proposal.action == "verify-ip"
assert proposal.ip == "192.168.1.10"
assert proposal.kwargs == {"timeout_sec": 10}
assert proposal.risk_level == "low"
assert proposal.requires_confirmation is True
assert "real-secret" not in json.dumps(input_payload, ensure_ascii=False)
def test_rule_based_client_proposes_only_explicit_action():
client = RuleBasedLlmClient()
proposal = client.propose_action("请 verify-ip 192.168.1.10", ["verify-ip"], {}, {})
unknown = client.propose_action("帮我检查一下", ["verify-ip"], {}, {})
assert proposal.action == "verify-ip"
assert proposal.ip == "192.168.1.10"
assert unknown.action == ""
def _llm_input_payload(request_payload):
content = request_payload["messages"][1]["content"]
_, _, raw_json = content.partition("输入 JSON:\n")
return json.loads(raw_json)

View File

@ -0,0 +1,83 @@
from pam_deploy_graph.llm.text_filter import filter_thinking_chunks, strip_thinking_text
def test_strip_thinking_text_removes_complete_block():
text = "开头<think>这里是很长的内部思考\n不应该展示</think>结论"
assert strip_thinking_text(text) == "开头结论"
def test_strip_thinking_text_removes_unclosed_block():
text = "可见内容\n<THINK>未闭合的内部思考不应该展示"
assert strip_thinking_text(text) == "可见内容"
def test_filter_thinking_chunks_handles_split_tags():
chunks = ["回答", "<thi", "nk>隐藏", "内容</th", "ink>继续。"]
visible = list(filter_thinking_chunks(chunks))
assert "".join(visible) == "回答继续。"
def test_filter_thinking_chunks_drops_unclosed_think_tail():
chunks = ["回答", "<think>", "隐藏内容"]
visible = list(filter_thinking_chunks(chunks))
assert "".join(visible) == "回答"
def test_strip_thinking_text_removes_explicit_thinking_process_without_tags():
text = """Thinking Process:
1. **Analyze the Request:**
* Input: JSON object containing context and user_text ("你是谁?").
* Role: PAM Deployment Agent Interaction Assistant.
* Constraints:
* Do NOT automatically trigger deployment, rollback, upgrade, script execution, or MCP calls.
* Do NOT output secrets.
2. **Determine the Response:**
* The user is asking about my identity.
* I need to introduce myself briefly.
3. **Drafting the Response:**
* Greeting/Identity: 我是 PAM 部署 Agent 的交互助手
* Function: 我可以回答普通问题解释命令和部署流程
4. **Refining the Response:**
* Keep it concise and friendly.
我是 PAM 部署 Agent 的交互助手
我可以回答普通问题解释当前 Agent 的命令和部署流程
"""
visible = strip_thinking_text(text)
assert "Thinking Process" not in visible
assert "Analyze the Request" not in visible
assert "Determine the Response" not in visible
assert "Drafting the Response" not in visible
assert "我是 PAM 部署 Agent 的交互助手。" in visible
assert visible.startswith("我是 PAM 部署 Agent")
def test_strip_thinking_text_keeps_content_after_final_answer_marker():
text = """Reasoning Process:
I should not expose this.
Final Answer: 可以我只展示最终回答
"""
assert strip_thinking_text(text) == "可以,我只展示最终回答。"
def test_filter_thinking_chunks_suppresses_explicit_reasoning_until_finish():
chunks = [
"Think",
"ing Process:\n",
"I should hide this reasoning.\n",
"Final Answer: ",
"这是最终回答。",
]
visible = list(filter_thinking_chunks(chunks))
assert "".join(visible) == "这是最终回答。"

View File

@ -0,0 +1,66 @@
import logging
from logging.handlers import TimedRotatingFileHandler
from pathlib import Path
from pam_deploy_graph.logging_utils import configure_logging, json_for_log, redact_for_log
def test_redact_for_log_masks_sensitive_keys_and_inline_assignments():
payload = {
"CLIENT_SECRET": "home-secret",
"api_key": "llm-key",
"nested": {
"Authorization": "Bearer token-value",
"message": "CLIENT_SECRET=abc api_key:xyz Authorization=Bearer raw-token header Bearer plain-token",
},
}
redacted = redact_for_log(payload)
serialized = json_for_log(payload)
assert redacted["CLIENT_SECRET"] == "***"
assert redacted["api_key"] == "***"
assert redacted["nested"]["Authorization"] == "***"
assert "home-secret" not in serialized
assert "llm-key" not in serialized
assert "token-value" not in serialized
assert "CLIENT_SECRET=***" in serialized
assert "api_key:***" in serialized
assert "Authorization=***" in serialized
assert "Bearer ***" in serialized
assert "raw-token" not in serialized
assert "plain-token" not in serialized
def test_configure_logging_uses_daily_rotation_and_retention(tmp_path: Path):
log_path = tmp_path / "pam_deploy_agent.log"
package_logger = logging.getLogger("pam_deploy_graph")
previous_handlers = list(package_logger.handlers)
for handler in previous_handlers:
package_logger.removeHandler(handler)
try:
result = configure_logging(log_file=log_path, level="DEBUG", retention_days=3)
assert result == log_path
handlers = [handler for handler in package_logger.handlers if isinstance(handler, TimedRotatingFileHandler)]
assert len(handlers) == 1
handler = handlers[0]
assert Path(handler.baseFilename) == log_path.resolve()
assert handler.when == "MIDNIGHT"
assert handler.backupCount == 3
assert package_logger.level == logging.DEBUG
configure_logging(log_file=log_path, level="INFO", retention_days=5)
handlers = [handler for handler in package_logger.handlers if isinstance(handler, TimedRotatingFileHandler)]
assert len(handlers) == 1
assert handlers[0] is handler
assert handler.backupCount == 5
assert package_logger.level == logging.INFO
finally:
for handler in list(package_logger.handlers):
package_logger.removeHandler(handler)
handler.close()
for handler in previous_handlers:
package_logger.addHandler(handler)

View File

@ -215,6 +215,51 @@ def test_mcp_runner_passes_hash_code_and_node_url():
assert calls[0][1]["nodeUrl"] == "https://pam.node"
def test_mcp_runner_passes_parent_version_only_for_download_task():
calls = []
class Client:
def call_tool(self, tool_name, arguments):
calls.append((tool_name, arguments))
return {"ACTION": "create-download-task", "SUCCESS": "true"}
runner = McpActionRunner(client=Client())
result = runner.run(
"create-download-task",
params={
"VERSION_NUMBER": "2.0.5",
"PARENT_VERSION_NUMBER": "2.0.4",
},
)
assert result.ok is True
assert calls[0][1]["versionNumber"] == "2.0.5"
assert calls[0][1]["parentVersionNumber"] == "2.0.4"
def test_mcp_runner_omits_blank_parent_version():
calls = []
class Client:
def call_tool(self, tool_name, arguments):
calls.append((tool_name, arguments))
return {"ACTION": "create-download-task", "SUCCESS": "true"}
runner = McpActionRunner(client=Client())
result = runner.run(
"create-download-task",
params={
"VERSION_NUMBER": "2.0.5",
"PARENT_VERSION_NUMBER": "",
},
)
assert result.ok is True
assert "parentVersionNumber" not in calls[0][1]
def _write_json_config(tmpdir, payload):
path = tmpdir / "mcp.json"
path.write_text(__import__("json").dumps(payload), encoding="utf-8")

View File

@ -1,3 +1,5 @@
from pathlib import Path
from pam_deploy_graph.script_runner import ScriptActionRunner
@ -46,3 +48,23 @@ def test_build_powershell_action_command():
"-RollbackStopFirst",
]
def test_shell_rollback_uses_query_parameters_not_form_body():
text = Path("doc_scripts/deploy.sh").read_text(encoding="utf-8")
start = text.index("rollback_ip()")
end = text.index("run_manual_rollback()", start)
rollback_block = text[start:end]
assert "/api/mcp/version/upgrade/rollback?${rollback_query}" in rollback_block
assert "application/x-www-form-urlencoded" not in rollback_block
def test_powershell_rollback_uses_query_parameters_not_form_body():
text = Path("doc_scripts/deploy.ps1").read_text(encoding="utf-8")
start = text.index("function Invoke-Rollback")
end = text.index("function Invoke-IpDeploy", start)
rollback_block = text[start:end]
assert "/api/mcp/version/upgrade/rollback?$query" in rollback_block
assert "-Body $body" not in rollback_block
assert "application/x-www-form-urlencoded" not in rollback_block

View File

@ -6,11 +6,6 @@ from pam_deploy_graph.skill_policy import load_skill_policy
def test_load_skill_policy_from_doc():
policy = load_skill_policy(Path("doc_scripts/PAM_AUTO_DEPLY_SKILL.md"))
assert policy.name == "pam-auto-deply"
assert "fixed_runtime" in policy.allowed_execution_modes
assert "get-token" in policy.allowed_actions
assert "CLIENT_SECRET" in policy.required_params
assert "params" in policy.required_confirmations
assert "rollback" in policy.required_confirmations
assert "script-main-flow" in policy.forbidden_actions
assert policy.action_sequence[0] == "get-token"
assert "upgrade-ip" in policy.ip_action_sequence