diff --git a/README.md b/README.md index c1c8ee3..27ab02b 100644 --- a/README.md +++ b/README.md @@ -79,9 +79,11 @@ packaging/ - 本地已安装 `langgraph` 和 `mcp`,并完成 LangGraph fake 全局流程 smoke。 - CLI `analyze` 输出已做敏感字段脱敏。 - 增加 `chat` 常驻式 CLI 对话框,支持自然语言分析、参数设置、执行确认、回滚确认、状态查看、事件查看、checkpoint 选择和续跑。 -- chat 可选启用 `rich` / `prompt_toolkit`,支持更清晰输出、命令补全和输入历史。 +- chat 在开发环境可选启用 `rich` / `prompt_toolkit`;PyInstaller 打包环境默认使用普通文本输入,避免交互兼容问题。 +- chat 执行前会归一化参数并展示实际写入脚本配置的值;`script_only` / `hybrid_node_mcp` 会提前检查 `ZIP_FILE_PATH` 是否存在。 +- chat 执行中会播报每个 action 的开始、完成或失败;action 执行失败会停在当前 checkpoint,不再误报 LangGraph 不可用。 - 增加 action 后 LLM/规则诊断,可通过 `--analyze-actions` 或 `llm action-analysis on` 显式开启。 -- 添加基础测试,当前本地结果为 `42 passed, 1 skipped`。 +- 添加基础测试,当前本地结果为 `51 passed, 2 skipped`。 未完成: @@ -253,6 +255,8 @@ PAM> preview PAM> set VERSION_NUMBER=2.0.6 PAM> run 即将执行真实 action;确认执行请输入 yes: yes +开始执行 action: get-token [backend=fake] +完成 action: get-token [backend=fake] PAM> status PAM> params PAM> events 5 @@ -265,7 +269,7 @@ PAM> resume PAM> exit ``` -`chat` 默认仍要求在会话内显式输入 `run`,并确认参数、目标 IP 范围和最终执行后才会执行 action;如果某个 IP 失败,会通过 LangGraph interrupt 暂停并提示输入 `approve` 或 `reject [原因]`,确认后恢复同一个图线程继续执行。`chat` 也支持 `--llm-base-url` / `--llm-api-key` / `--llm-model`、`--mcp-config` 和 `--analyze-actions`。 +`chat` 默认仍要求在会话内显式输入 `run`,并确认参数、目标 IP 范围和最终执行后才会执行 action。输入 `你好`、`hello` 这类问候不会触发 LLM/结构化分析;需要分析部署需求时可直接描述部署任务,或显式使用 `analyze <需求>`。如果某个 IP 失败,会通过 LangGraph interrupt 暂停并提示输入 `approve` 或 `reject [原因]`,确认后恢复同一个图线程继续执行。`chat` 也支持 `--llm-base-url` / `--llm-api-key` / `--llm-model`、`--mcp-config` 和 `--analyze-actions`。 预演: diff --git a/packaging/README_packaged_agent.md b/packaging/README_packaged_agent.md index f9c0d9f..e23cbfb 100644 --- a/packaging/README_packaged_agent.md +++ b/packaging/README_packaged_agent.md @@ -32,8 +32,9 @@ pam-deploy-agent-linux-x86_64/ ./run.sh run-deploy --help ``` -发布包默认包含 `rich` 和 `prompt_toolkit`。如果终端支持,chat 会自动启用更清晰的输出、命令补全和输入历史;不可用时会自动降级为普通文本输入输出。 +发布包默认使用普通文本输入,避免 PyInstaller 环境下 `prompt_toolkit` 兼容性问题;输出仍会在可用时使用 `rich` 做更清晰的文本展示。 chat 内的失败回滚确认由 LangGraph interrupt 托管;执行停在确认点后,输入 `approve` 或 `reject [原因]` 会恢复同一个图线程继续处理。 +chat 会在执行前归一化并展示实际写入脚本配置的参数;`script_only` / `hybrid_node_mcp` 会先检查 `ZIP_FILE_PATH` 是否存在,避免脚本运行后才用默认路径失败。执行过程中每个 action 都会输出开始、完成或失败状态。 ## 交互式使用 @@ -61,6 +62,8 @@ PAM> preview PAM> set VERSION_NUMBER=2.0.6 PAM> run 即将执行真实 action;确认执行请输入 yes: yes +开始执行 action: get-token [backend=fake] +完成 action: get-token [backend=fake] PAM> status PAM> params PAM> events 5 @@ -199,5 +202,6 @@ MCP token 获取方式与 HOME 一致,默认按 `client_credentials` POST 到 ## 注意事项 - 执行真实 action 前请确认配置文件中的 `HOME_BASE_URL`、`CLIENT_ID`、`CLIENT_SECRET`、`AIRPORT_CODE`、`APP_NAME`、`MODULE_NAME`、`VERSION_NUMBER`、`ZIP_FILE_PATH`。 +- `chat` 中输入 `你好`、`hello` 这类问候不会触发 LLM/结构化分析;需要分析部署需求时请直接描述部署任务,或显式使用 `analyze <需求>`。 - `checkpoint` 会保存完整运行参数,请放在受控目录。 - `hybrid_node_mcp`、`resume`、`confirm` 如果需要执行 MCP action,请同时传入 `--mcp-config`。 diff --git a/packaging/build_linux_self_contained.sh b/packaging/build_linux_self_contained.sh index 550d0da..118ff81 100644 --- a/packaging/build_linux_self_contained.sh +++ b/packaging/build_linux_self_contained.sh @@ -121,7 +121,7 @@ PAM 部署 Agent 解压即用包 --confirm 非交互命令执行真实 action 前必须显式传入。 - chat 模式会在会话中要求输入 run 和 yes。 + chat 模式会在会话中要求输入 run,并分别确认参数、目标范围和最终执行。 --analyze-actions 每个 action 完成后追加 LLM/规则诊断建议。诊断只作为辅助建议, @@ -164,8 +164,10 @@ LLM 环境变量: 2. doc_scripts 只包含运行必需文件:deploy.sh、config.txt.example、PAM_AUTO_DEPLY_SKILL.md。 3. mcp_client.example.json 是 MCP server URL + 独立鉴权配置示例,需要按真实 MCP server 修改。 4. confirm 会通过 LangGraph interrupt resume 处理确认,并继续后续图节点;进程中断时再使用 resume。 - 5. chat 内可使用 params、events、list checkpoints、load checkpoint、llm config、mcp config 等命令。 - 6. checkpoint 会保存完整运行参数,请放在受控目录。 + 5. chat 会在执行前归一化并展示实际写入脚本配置的参数;script_only / hybrid_node_mcp 会先检查 ZIP_FILE_PATH 是否存在。 + 6. chat 执行过程中会播报每个 action 的开始、完成或失败;普通问候不会触发 LLM/结构化分析。 + 7. chat 内可使用 params、events、list checkpoints、load checkpoint、llm config、mcp config 等命令。 + 8. checkpoint 会保存完整运行参数,请放在受控目录。 HELP_TEXT } diff --git a/pam_deploy_graph/action_router.py b/pam_deploy_graph/action_router.py index 22a7f33..48622cb 100644 --- a/pam_deploy_graph/action_router.py +++ b/pam_deploy_graph/action_router.py @@ -45,7 +45,16 @@ class ActionRouter: if backend == "mcp": if self.mcp_runner is None: raise RuntimeError(f"action 需要 MCP runner: {action}") - return self.mcp_runner.run(action, params=state.params, **kwargs) + mcp_kwargs = dict(kwargs) + hash_code = mcp_kwargs.pop("hash_code", None) or state.hash_code + node_url = mcp_kwargs.pop("node_url", None) or state.node_url + return self.mcp_runner.run( + action, + params=state.params, + hash_code=hash_code, + node_url=node_url, + **mcp_kwargs, + ) if self.fake_runner is None: raise RuntimeError(f"action 需要 fake runner: {action}") return self.fake_runner.run(action, params=state.params, **kwargs) diff --git a/pam_deploy_graph/agent.py b/pam_deploy_graph/agent.py index 3ef45a8..5b4b8ea 100644 --- a/pam_deploy_graph/agent.py +++ b/pam_deploy_graph/agent.py @@ -6,10 +6,11 @@ from __future__ import annotations +import re import time from dataclasses import asdict from pathlib import Path -from typing import Any +from typing import Any, Callable from .action_router import ActionRouter, build_action_backends from .checkpoint_store import save_checkpoint @@ -18,10 +19,15 @@ from .constants import DEFAULT_PARAMS, GLOBAL_ACTION_SEQUENCE, IP_ACTION_SEQUENC from .fake_runner import FakeActionRunner from .llm import LlmClient, RuleBasedLlmClient, validate_deploy_plan, validate_intent_result from .mcp_runner import McpActionRunner -from .models import AgentState, ExecutionStrategy, LlmDeployPlan, LlmIntentResult, LlmParamResult +from .models import ActionResult, AgentState, ExecutionStrategy, LlmDeployPlan, LlmIntentResult, LlmParamResult from .script_runner import ScriptActionRunner, select_script_entry from .skill_policy import load_skill_policy +REQUIRED_ACTION_VALUES = { + "upload-package": ("HASH_CODE",), + "get-node-url": ("NODE_URL",), +} + class PamDeployAgent: """PAM 部署主 Agent,串联 LLM、action 路由、确认和续跑状态。""" @@ -35,6 +41,7 @@ class PamDeployAgent: fake_runner: FakeActionRunner | None = None, llm_client: LlmClient | None = None, action_analysis_enabled: bool = False, + progress_callback: Callable[[dict[str, Any]], None] | None = None, ) -> None: """初始化策略、脚本 runner、MCP runner、fake runner 和 LLM client。""" self.skill_policy = load_skill_policy(skill_path) @@ -44,6 +51,7 @@ class PamDeployAgent: self.mcp_runner = mcp_runner self.llm_client = llm_client or RuleBasedLlmClient() self.action_analysis_enabled = action_analysis_enabled + self.progress_callback = progress_callback self.router = ActionRouter( script_runner=self.script_runner, mcp_runner=mcp_runner, @@ -94,6 +102,7 @@ class PamDeployAgent: missing = [key for key in REQUIRED_PARAMS if not normalized.get(key)] if missing: raise ValueError(f"缺少必填参数: {', '.join(missing)}") + normalized["ZIP_FILE_PATH"] = _normalize_local_file_path(str(normalized["ZIP_FILE_PATH"]).strip()) return normalized def _choose_strategy(self, preference: str) -> ExecutionStrategy: @@ -119,8 +128,8 @@ class PamDeployAgent: actual_run_id = run_id or time.strftime("%Y%m%d_%H%M%S") actual_script_entry = script_entry or select_script_entry() runtime_dir = Path("runtime") - actual_config_path = config_path or str(runtime_dir / f"config_{actual_run_id}.txt") - actual_trace_path = trace_file_path or str(Path("logs") / f"api_trace_{actual_run_id}.log") + actual_config_path = _absolute_path(config_path or runtime_dir / f"config_{actual_run_id}.txt") + actual_trace_path = _absolute_path(trace_file_path or Path("logs") / f"api_trace_{actual_run_id}.log") write_config(normalized, actual_config_path) return AgentState( run_id=actual_run_id, @@ -129,8 +138,8 @@ class PamDeployAgent: action_backends=build_action_backends(execution_strategy), script_entry=actual_script_entry, script_base_dir=str(self.script_base_dir), - config_path=actual_config_path, - trace_file_path=actual_trace_path, + config_path=str(actual_config_path), + trace_file_path=str(actual_trace_path), checkpoint_path=checkpoint_path or "", target_ips=target_ips or [], ) @@ -188,8 +197,22 @@ class PamDeployAgent: return state kwargs: dict[str, Any] = {} if action == "publish-version": + if not state.hash_code: + state.last_failed_step = action + self._save_checkpoint(state) + raise RuntimeError("publish-version 缺少 HASH_CODE,请确认 upload-package 是否成功返回 HASH_CODE") kwargs["hash_code"] = state.hash_code - result = self.router.run_action(state, action, **kwargs) + backend = state.action_backends.get(action, "script") + self._emit_progress({"type": "ACTION_START", "stage": action, "backend": backend}) + try: + result = self.router.run_action(state, action, **kwargs) + except Exception as exc: + result = ActionResult( + action=action, + backend=backend, + ok=False, + error_summary=str(exc), + ) state.events.append( { "type": "ACTION_DONE" if result.ok else "ACTION_FAIL", @@ -200,15 +223,50 @@ class PamDeployAgent: ) self._append_action_analysis(state, action, result) if not result.ok: + self._emit_progress( + { + "type": "ACTION_FAIL", + "stage": action, + "backend": result.backend, + "message": result.error_summary or "action 执行失败", + } + ) state.last_failed_step = action self._save_checkpoint(state) raise RuntimeError(f"{action} 执行失败: {result.error_summary}") + missing_values = self._missing_required_values(action, result.values) + if missing_values: + message = f"{action} 返回缺少必要字段: {', '.join(missing_values)}" + self._emit_progress( + { + "type": "ACTION_FAIL", + "stage": action, + "backend": result.backend, + "message": message, + } + ) + state.last_failed_step = action + self._save_checkpoint(state) + raise RuntimeError(message) self._apply_result(state, action, result.values) state.completed_global_steps.append(action) state.last_success_step = action + self._emit_progress( + { + "type": "ACTION_DONE", + "stage": action, + "backend": result.backend, + "message": result.values.get("MESSAGE", "ok"), + } + ) self._save_checkpoint(state) return state + def _missing_required_values(self, action: str, values: dict[str, Any]) -> list[str]: + """检查 action 成功返回后是否带回后续步骤必需字段。""" + required = REQUIRED_ACTION_VALUES.get(action, ()) + return [key for key in required if not values.get(key)] + def run_deploy_flow(self, state: AgentState) -> AgentState: """执行完整部署流程:全局阶段后进入逐 IP 阶段。""" if state.pending_confirmation: @@ -272,7 +330,24 @@ class PamDeployAgent: completed_steps = ip_state.setdefault("completed_steps", []) if action in completed_steps: return state - result = self.router.run_action(state, action, ip=ip) + self._emit_progress( + { + "type": "ACTION_START", + "stage": action, + "backend": state.action_backends.get(action, ""), + "ip": ip, + } + ) + backend = state.action_backends.get(action, "script") + try: + result = self.router.run_action(state, action, ip=ip) + except Exception as exc: + result = ActionResult( + action=action, + backend=backend, + ok=False, + error_summary=str(exc), + ) failed = (not result.ok) or self._business_failed(action, result.values) state.events.append( { @@ -286,6 +361,15 @@ class PamDeployAgent: self._append_action_analysis(state, action, result, ip=ip) if failed: + self._emit_progress( + { + "type": "ACTION_FAIL", + "stage": action, + "backend": result.backend, + "ip": ip, + "message": result.error_summary or result.values.get("MESSAGE", "action 执行失败"), + } + ) self._record_ip_failure(state, ip, action, result.error_summary or str(result.values)) if action != "download-log": self._download_log_best_effort(state, ip) @@ -295,6 +379,15 @@ class PamDeployAgent: self._apply_ip_result(ip_state, action, result.values) completed_steps.append(action) + self._emit_progress( + { + "type": "ACTION_DONE", + "stage": action, + "backend": result.backend, + "ip": ip, + "message": result.values.get("MESSAGE", "ok"), + } + ) self._save_checkpoint(state) return state @@ -343,12 +436,29 @@ class PamDeployAgent: self._save_checkpoint(state) return state - result = self.router.run_action( - state, - "rollback-ip", - ip=ip, - stop_first=bool(ip_state.get("rollback_stop_first", False)), + backend = state.action_backends.get("rollback-ip", "script") + self._emit_progress( + { + "type": "ACTION_START", + "stage": "rollback-ip", + "backend": backend, + "ip": ip, + } ) + try: + result = self.router.run_action( + state, + "rollback-ip", + ip=ip, + stop_first=bool(ip_state.get("rollback_stop_first", False)), + ) + except Exception as exc: + result = ActionResult( + action="rollback-ip", + backend=backend, + ok=False, + error_summary=str(exc), + ) ip_state["rollback_status"] = "ROLLBACK_DONE" if result.ok else "ROLLBACK_FAILED" state.events.append( { @@ -364,12 +474,39 @@ class PamDeployAgent: state.pending_confirmation = "" state.last_success_step = "rollback-ip" state.last_failed_step = "" + self._emit_progress( + { + "type": "ACTION_DONE", + "stage": "rollback-ip", + "backend": result.backend, + "ip": ip, + "message": result.values.get("MESSAGE", "ok"), + } + ) else: state.pending_confirmation = f"rollback-ip:{ip}" state.last_failed_step = "rollback-ip" + self._emit_progress( + { + "type": "ACTION_FAIL", + "stage": "rollback-ip", + "backend": result.backend, + "ip": ip, + "message": result.error_summary or result.values.get("MESSAGE", "rollback 执行失败"), + } + ) self._save_checkpoint(state) return state + def _emit_progress(self, payload: dict[str, Any]) -> None: + """向 CLI/chat 回调 action 执行进度,回调失败不影响主流程。""" + if self.progress_callback is None: + return + try: + self.progress_callback(payload) + except Exception: + return + def _apply_result(self, state: AgentState, action: str, values: dict[str, Any]) -> None: """把全局 action 返回值写回 AgentState。""" if "HASH_CODE" in values: @@ -442,7 +579,25 @@ class PamDeployAgent: def _download_log_best_effort(self, state: AgentState, ip: str) -> None: """失败后尽力下载日志,日志失败不覆盖原失败原因。""" - result = self.router.run_action(state, "download-log", ip=ip) + backend = state.action_backends.get("download-log", "script") + self._emit_progress( + { + "type": "ACTION_START", + "stage": "download-log", + "backend": backend, + "ip": ip, + "message": "失败后尝试下载日志", + } + ) + try: + result = self.router.run_action(state, "download-log", ip=ip) + except Exception as exc: + result = ActionResult( + action="download-log", + backend=backend, + ok=False, + error_summary=str(exc), + ) ip_state = state.ip_states[ip] if result.ok: ip_state["log_file"] = str(result.values.get("LOG_FILE", "")) @@ -455,6 +610,15 @@ class PamDeployAgent: "message": "已尽力下载日志", } ) + self._emit_progress( + { + "type": "ACTION_DONE", + "stage": "download-log", + "backend": result.backend, + "ip": ip, + "message": result.values.get("MESSAGE", "已尽力下载日志"), + } + ) else: state.events.append( { @@ -465,6 +629,15 @@ class PamDeployAgent: "message": result.error_summary or "尽力下载日志失败", } ) + self._emit_progress( + { + "type": "ACTION_FAIL", + "stage": "download-log", + "backend": result.backend, + "ip": ip, + "message": result.error_summary or "尽力下载日志失败", + } + ) self._append_action_analysis(state, "download-log", result, ip=ip) def _save_checkpoint(self, state: AgentState) -> None: @@ -552,3 +725,20 @@ class PamDeployAgent: ) ) return "\n".join(lines) + + +def _absolute_path(path: str | Path) -> Path: + """把传给脚本的文件路径转换为绝对路径,避免 cwd 切换后读错文件。""" + return Path(path).expanduser().resolve() + + +def _normalize_local_file_path(path: str) -> str: + """把本地相对文件路径转换为绝对路径,Windows/Unix 绝对路径保持不变。""" + if re.match(r"^[A-Za-z]:[\\/]", path): + return path + if path.startswith("/"): + return path + value = Path(path).expanduser() + if value.is_absolute(): + return str(value) + return str(value.resolve()) diff --git a/pam_deploy_graph/interactive.py b/pam_deploy_graph/interactive.py index 31c2c30..87739e1 100644 --- a/pam_deploy_graph/interactive.py +++ b/pam_deploy_graph/interactive.py @@ -75,6 +75,7 @@ class InteractiveCliSession: self.llm_config: dict[str, str] = {} self.mcp_config_path: str = "" self.graph_runtime: LangGraphDeploymentRuntime | None = None + self.agent.progress_callback = self._on_progress def run(self) -> None: """启动 REPL 循环,直到用户 exit 或输入流结束。""" @@ -151,6 +152,14 @@ class InteractiveCliSession: self._load_checkpoint(rest.strip()[len("checkpoint") :].strip()) return True + if _is_small_talk(text): + self.output("你好。可以输入 help 查看命令,或直接描述部署需求;执行前仍需输入 run 并确认。") + return True + if not _looks_like_deploy_request(text): + self.output("我没有识别到明确的部署需求。可以输入 help 查看命令,或用 analyze <需求> 明确触发需求分析。") + return True + + self.output("正在分析需求...") self._analyze(text) return True @@ -160,7 +169,11 @@ class InteractiveCliSession: self.output("请输入要分析的自然语言需求,例如:analyze 请用 MCP 预演部署 HET。") return - result = self.agent.analyze_request(text, self.params) + try: + result = self.agent.analyze_request(text, self.params) + except Exception as exc: + self.output(f"需求分析失败: {exc}") + return self.last_analysis = result param_result = result["params"] intent_result = result["intent"] @@ -309,6 +322,17 @@ class InteractiveCliSession: self._print_confirmation() return + if not self._prepare_params_for_run(): + return + + problems = self._validate_run_prerequisites(self.params) + if problems: + self.output("执行前检查未通过:") + for problem in problems: + self.output(f"- {problem}") + self.output("请修正参数或配置后再输入 run。") + return + if not self._confirm_params_and_scope(): self.output("已取消执行。") return @@ -356,18 +380,63 @@ class InteractiveCliSession: if self.state is None: self.output("当前没有运行状态。") return - try: - if self.graph_runtime is None or not self.graph_runtime.waiting_confirmation: + if self.graph_runtime is None or not self.graph_runtime.waiting_confirmation: + try: self.graph_runtime = LangGraphDeploymentRuntime(agent=self.agent) + except RuntimeError as exc: + self.output(f"LangGraph 确认运行器不可用,降级为本地执行: {exc}") + self.graph_runtime = None + try: + self.state = self.agent.run_deploy_flow(self.state) + except Exception as fallback_exc: + self._handle_execution_error(fallback_exc) + return + self._print_state_report_and_checkpoint() + return + try: result = self.graph_runtime.start(self.state) - except RuntimeError as exc: - self.output(f"LangGraph 确认运行器不可用,降级为本地执行: {exc}") - self.graph_runtime = None - self.state = self.agent.run_deploy_flow(self.state) - self._print_state_report_and_checkpoint() + except Exception as exc: + self._handle_execution_error(exc) return self._apply_graph_result(result) + def _prepare_params_for_run(self) -> bool: + """执行前归一化参数,确保确认值和实际写入脚本配置一致。""" + try: + self.params = self.agent.normalize_params(self.params) + except ValueError as exc: + self.output(f"参数检查失败: {exc}") + return False + return True + + def _validate_run_prerequisites(self, params: dict[str, Any]) -> list[str]: + """在创建 state 前检查可提前发现的运行问题。""" + problems: list[str] = [] + if self.strategy != "fake": + zip_path = str(params.get("ZIP_FILE_PATH", "")).strip() + if not _path_exists(zip_path): + problems.append(f"ZIP_FILE_PATH 不存在: {zip_path}") + if self.strategy in ("script_only", "hybrid_node_mcp"): + script_entry = self.agent.script_base_dir / "deploy.sh" + ps_entry = self.agent.script_base_dir / "deploy.ps1" + if not script_entry.exists() and not ps_entry.exists(): + problems.append(f"脚本入口不存在: {script_entry} 或 {ps_entry}") + if self.strategy == "hybrid_node_mcp" and self.agent.mcp_runner is None: + problems.append("当前策略需要 MCP runner,请启动时传 --mcp-config 或在 chat 内执行 mcp config <路径>。") + return problems + + def _handle_execution_error(self, exc: Exception) -> None: + """输出 action 执行失败后的可恢复提示,不再误报 LangGraph 不可用。""" + self.output(f"执行已停止: {exc}") + if self.state is None: + return + if self.state.last_failed_step: + self.output(f"最后失败步骤: {self.state.last_failed_step}") + if self.state.pending_confirmation: + self._print_confirmation() + self.output(f"checkpoint: {self.state.checkpoint_path or self.checkpoint_path}") + self.output("请修正参数或外部环境后,使用 load checkpoint <路径> / resume 继续,或重新 run。") + def _apply_graph_result(self, result: LangGraphRunResult) -> None: """把 LangGraph 运行结果同步回 chat 会话并输出用户可见状态。""" if result.state is not None: @@ -429,6 +498,28 @@ class InteractiveCliSession: if self.state.pending_confirmation: self._print_confirmation() + def _on_progress(self, payload: dict[str, Any]) -> None: + """把 Agent action 进度转成 chat 可见输出。""" + event_type = str(payload.get("type", "")) + stage = str(payload.get("stage", "")) + backend = str(payload.get("backend", "")) + ip = str(payload.get("ip", "")) + message = str(payload.get("message", "")) + suffix_parts = [] + if backend: + suffix_parts.append(f"backend={backend}") + if ip: + suffix_parts.append(f"ip={ip}") + suffix = f" [{', '.join(suffix_parts)}]" if suffix_parts else "" + if event_type == "ACTION_START": + self.output(f"开始执行 action: {stage}{suffix}") + elif event_type == "ACTION_DONE": + detail = f": {message}" if message and message != "ok" else "" + self.output(f"完成 action: {stage}{suffix}{detail}") + elif event_type == "ACTION_FAIL": + detail = f": {message}" if message else "" + self.output(f"失败 action: {stage}{suffix}{detail}") + def _print_confirmation(self) -> None: """输出当前待人工确认事项。""" if self.state is None: @@ -526,10 +617,68 @@ def _parse_key_values(parts: list[str]) -> dict[str, str]: return values +def _is_small_talk(text: str) -> bool: + """识别不应触发 LLM/结构化分析的简单寒暄。""" + normalized = text.strip().lower() + return normalized in { + "你好", + "您好", + "hello", + "hi", + "hey", + "在吗", + "谢谢", + "thanks", + "thank you", + } + + +def _looks_like_deploy_request(text: str) -> bool: + """粗筛自然语言部署需求,避免任意闲聊都触发耗时分析。""" + lowered = text.lower() + deploy_keywords = ( + "部署", + "发布", + "升级", + "回滚", + "预演", + "执行", + "pam", + "mcp", + "node", + "版本", + "机场", + "deploy", + "release", + "upgrade", + "rollback", + "preview", + ) + param_markers = ( + "HOME_BASE_URL", + "CLIENT_ID", + "AIRPORT_CODE", + "APP_NAME", + "MODULE_NAME", + "VERSION_NUMBER", + "ZIP_FILE_PATH", + ) + return any(keyword in lowered for keyword in deploy_keywords) or any(marker in text for marker in param_markers) + + +def _path_exists(path: str) -> bool: + """检查本地路径是否存在,兼容打包到 Linux 后的绝对路径。""" + if not path: + return False + return Path(path).expanduser().exists() + + def _build_prompt_input(input_func: InputFunc) -> InputFunc: """如果安装了 prompt_toolkit,则启用历史记录和命令补全。""" if input_func is not builtins.input: return input_func + if getattr(sys, "frozen", False): + return input_func try: from prompt_toolkit import PromptSession from prompt_toolkit.completion import WordCompleter diff --git a/pam_deploy_graph/mcp_runner.py b/pam_deploy_graph/mcp_runner.py index 65df751..700ad59 100644 --- a/pam_deploy_graph/mcp_runner.py +++ b/pam_deploy_graph/mcp_runner.py @@ -54,6 +54,7 @@ class McpActionRunner: params: dict[str, Any], ip: str | None = None, hash_code: str | None = None, + node_url: str | None = None, stop_first: bool = False, **_: Any, ) -> ActionResult: @@ -66,6 +67,7 @@ class McpActionRunner: params=params, ip=ip, hash_code=hash_code, + node_url=node_url, stop_first=stop_first, ) try: @@ -116,6 +118,7 @@ class McpActionRunner: params: dict[str, Any], ip: str | None, hash_code: str | None, + node_url: str | None, stop_first: bool, ) -> dict[str, Any]: """把 Agent 参数转换为 MCP tool 所需的入参。""" @@ -133,6 +136,8 @@ class McpActionRunner: arguments["targetIp"] = ip if hash_code: arguments["hashCode"] = hash_code + if node_url: + arguments["nodeUrl"] = node_url if action == "rollback-ip": arguments["stopFirst"] = stop_first return {key: value for key, value in arguments.items() if value not in (None, "")} diff --git a/tests/test_agent_flow.py b/tests/test_agent_flow.py index 05a196f..66a8dd1 100644 --- a/tests/test_agent_flow.py +++ b/tests/test_agent_flow.py @@ -1,5 +1,7 @@ from pathlib import Path +import pytest + from pam_deploy_graph.agent import PamDeployAgent from pam_deploy_graph.checkpoint_store import load_agent_state from pam_deploy_graph.constants import GLOBAL_ACTION_SEQUENCE @@ -33,6 +35,41 @@ def test_run_deploy_flow_success(tmp_path: Path): assert all(item["status"] == "SUCCESS" for item in state.ip_states.values()) +def test_create_state_writes_absolute_script_config_path_and_normalized_zip(tmp_path: Path): + package_path = tmp_path / "pkg.zip" + params = {**PARAMS, "ZIP_FILE_PATH": str(package_path)} + agent = PamDeployAgent(fake_runner=FakeActionRunner()) + + state = agent.create_state( + params=params, + execution_strategy="fake", + config_path=str(tmp_path / "runtime" / "config.txt"), + trace_file_path=str(tmp_path / "logs" / "trace.log"), + ) + + assert Path(state.config_path).is_absolute() + assert Path(state.trace_file_path).is_absolute() + config_text = Path(state.config_path).read_text(encoding="utf-8") + assert f"ZIP_FILE_PATH={package_path.resolve()}" in config_text + + +def test_global_action_requires_hash_code_from_upload_package(tmp_path: Path): + fake = FakeActionRunner({"upload-package": {"ACTION": "upload-package"}}) + agent = PamDeployAgent(fake_runner=fake) + state = agent.create_state( + params=PARAMS, + execution_strategy="fake", + config_path=str(tmp_path / "config.txt"), + checkpoint_path=str(tmp_path / "checkpoint.json"), + ) + + with pytest.raises(RuntimeError, match="缺少必要字段: HASH_CODE"): + agent.run_deploy_flow(state) + + assert state.last_failed_step == "upload-package" + assert "upload-package" not in state.completed_global_steps + + def test_run_deploy_flow_stops_on_verify_failure(tmp_path: Path): fake = FakeActionRunner( { diff --git a/tests/test_interactive_cli.py b/tests/test_interactive_cli.py index 9e4a55b..bccebfa 100644 --- a/tests/test_interactive_cli.py +++ b/tests/test_interactive_cli.py @@ -61,6 +61,69 @@ def test_chat_run_executes_fake_deploy_and_writes_checkpoint(tmp_path: Path): assert all(item["status"] == "SUCCESS" for item in session.state.ip_states.values()) +def test_chat_run_prints_action_progress(tmp_path: Path): + checkpoint = tmp_path / "checkpoint.json" + session = InteractiveCliSession( + agent=PamDeployAgent(fake_runner=FakeActionRunner()), + params=PARAMS, + strategy="fake", + checkpoint_path=str(checkpoint), + ) + + output = run_session(session, ["run", "yes", "yes", "yes", "exit"]) + + assert any("开始执行 action: get-token" in item for item in output) + assert any("完成 action: verify-ip" in item for item in output) + + +def test_chat_greeting_does_not_trigger_structured_analysis(tmp_path: Path): + session = InteractiveCliSession( + agent=PamDeployAgent(), + params=PARAMS, + strategy="fake", + checkpoint_path=str(tmp_path / "checkpoint.json"), + ) + + output = run_session(session, ["你好", "exit"]) + + assert session.last_analysis is None + assert any("可以输入 help 查看命令" in item for item in output) + assert not any("已生成结构化理解" in item for item in output) + + +def test_chat_preflight_blocks_missing_zip_path_before_confirm(tmp_path: Path): + missing_package = tmp_path / "missing.zip" + session = InteractiveCliSession( + agent=PamDeployAgent(), + params={**PARAMS, "ZIP_FILE_PATH": str(missing_package)}, + strategy="script_only", + checkpoint_path=str(tmp_path / "checkpoint.json"), + ) + + output = run_session(session, ["run", "exit"]) + + assert session.state is None + assert any("执行前检查未通过" in item for item in output) + assert any("ZIP_FILE_PATH 不存在" in item for item in output) + + +def test_chat_action_failure_does_not_report_langgraph_unavailable(tmp_path: Path): + fake = FakeActionRunner({"upload-package": {"ACTION": "upload-package"}}) + session = InteractiveCliSession( + agent=PamDeployAgent(fake_runner=fake), + params=PARAMS, + strategy="fake", + checkpoint_path=str(tmp_path / "checkpoint.json"), + ) + + output = run_session(session, ["run", "yes", "yes", "yes", "exit"]) + + assert session.state is not None + assert session.state.last_failed_step == "upload-package" + assert any("执行已停止" in item for item in output) + assert not any("LangGraph 确认运行器不可用" in item for item in output) + + def test_chat_approve_then_resume_continues_after_failed_ip(tmp_path: Path): fake = FakeActionRunner( { @@ -143,4 +206,3 @@ def test_prompt_history_creates_runtime_dir(tmp_path: Path, monkeypatch): assert callable(prompt) assert (tmp_path / "runtime").is_dir() - diff --git a/tests/test_mcp_client.py b/tests/test_mcp_client.py index bb88cc5..1f424c0 100644 --- a/tests/test_mcp_client.py +++ b/tests/test_mcp_client.py @@ -191,6 +191,30 @@ def test_mcp_runner_auto_discovers_tool_name(): assert result.tool_name == "pam_get_online_ips" +def test_mcp_runner_passes_hash_code_and_node_url(): + calls = [] + + class Client: + def call_tool(self, tool_name, arguments): + calls.append((tool_name, arguments)) + return {"ACTION": "upgrade-ip", "SUCCESS": "true"} + + runner = McpActionRunner(client=Client()) + + result = runner.run( + "upgrade-ip", + params={"HOME_BASE_URL": "https://pam.home", "AIRPORT_CODE": "HET"}, + ip="192.168.1.10", + hash_code="hash-1", + node_url="https://pam.node", + ) + + assert result.ok is True + assert calls[0][1]["targetIp"] == "192.168.1.10" + assert calls[0][1]["hashCode"] == "hash-1" + assert calls[0][1]["nodeUrl"] == "https://pam.node" + + def _write_json_config(tmpdir, payload): path = tmpdir / "mcp.json" path.write_text(__import__("json").dumps(payload), encoding="utf-8")