diff --git a/README.md b/README.md index 4888779..c6ba8e0 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,90 @@ # opagent +## PAM Deploy Agent + +本仓库正在把原有 `PAM_AUTO_DEPLY_SKILL.md` + `deploy.sh` / `deploy.ps1` 的组合,重构为一个 LangGraph-style 的 PAM 智能部署 Agent。 + +当前已加入 `pam_deploy_graph` Python 包,用于先落地 Agent Runtime 的核心骨架: + +- PAM_HOME action 固定通过 `deploy.sh` / `deploy.ps1` 调用。 +- PAM_NODE action 可通过 MCP runner 调用。 +- 默认执行策略为 `hybrid_node_mcp`,即 HOME 脚本 action + NODE MCP。 +- 离线策略为 `script_only`,全部 action 走脚本 action。 +- `langgraph` 当前作为可选依赖;本地未安装时,核心 Agent、runner、router 和 parser 仍可独立测试。 + +## 当前代码骨架 + +```text +pam_deploy_graph/ + agent.py # Agent runtime,参数归一化、预演、fake 全局流程 + action_router.py # 按 action 路由到脚本、MCP 或 fake runner + script_runner.py # deploy.sh / deploy.ps1 action 调用封装 + mcp_runner.py # PAM_NODE MCP runner 协议与 action -> tool 映射 + fake_runner.py # 测试用 runner,不访问真实环境 + output_parser.py # 解析 key=value、MCP JSON、待确认回滚标记 + skill_policy.py # 从 PAM_AUTO_DEPLY_SKILL.md 加载 Skill 策略 + config_writer.py # 生成脚本 action 所需 config 文件 + checkpoint_store.py # 业务 checkpoint JSON 读写 + params_loader.py # 读取 JSON 或 config.txt 风格参数文件 + graph.py # 可选 LangGraph 集成入口 + cli.py # CLI 入口 + +tests/ + test_action_router.py + test_output_parser.py + test_params_loader.py + test_script_runner.py + test_skill_policy.py +``` + +## 当前进度 + +已完成: + +- 建立 Python 工程骨架和 `pyproject.toml`。 +- 实现 `hybrid_node_mcp` 路由规则:PAM_HOME 走脚本 action,PAM_NODE 走 MCP。 +- 实现 `script_only` 路由规则:所有 action 走脚本 action。 +- 实现脚本 action 命令构造,避免调用脚本主流程。 +- 实现 MCP runner 抽象和 PAM_NODE action 到 MCP tool 的默认映射。 +- 实现脚本/MCP/fake action 结果统一为 `ActionResult`。 +- 实现 `config.txt.example` 风格和 JSON 风格参数读取。 +- 实现 fake 全局流程,便于不触碰真实环境地验证 Agent 路由。 +- 添加基础测试,当前 `10 passed`。 + +未完成: + +- 尚未接入真实 MCP client。 +- 尚未安装并接入真实 LangGraph `StateGraph` 主图。 +- 尚未实现 LLM 结构化意图识别、参数抽取和计划生成。 +- 尚未实现人工确认 interrupt、断点续跑完整图流程和单 IP 子流程。 +- 尚未执行真实脚本 action 或真实 PAM_NODE MCP 调用。 + +## 使用方式 + +预演: + +```bash +python -m pam_deploy_graph.cli preview --config doc_scripts/config.txt.example --strategy fake +``` + +fake 全局流程验证: + +```bash +python -m pam_deploy_graph.cli run-global --config doc_scripts/config.txt.example --strategy fake --confirm +``` + +测试: + +```bash +pytest -q +``` + +## 下一步建议 + +1. 接入真实 PAM_NODE MCP client,实现 `McpToolClient.call_tool()`。 +2. 用 fake runner 补齐完整部署主流程和单 IP 子流程测试。 +3. 引入 LangGraph,把当前 Agent 节点接入 `StateGraph`。 +4. 增加人工确认节点:参数确认、IP 范围确认、回滚确认。 +5. 增加 LLM structured output:意图识别、参数抽取、部署计划、失败解释。 +6. 完善 checkpoint 恢复:全局步骤跳过、成功 IP 跳过、pending rollback 恢复。 +7. 在测试环境中做 smoke:HOME 脚本 `get-token/get-node-url` + NODE MCP `get-online-ips`。 diff --git a/pam_deploy_graph/__init__.py b/pam_deploy_graph/__init__.py new file mode 100644 index 0000000..71b73f5 --- /dev/null +++ b/pam_deploy_graph/__init__.py @@ -0,0 +1,6 @@ +"""PAM deploy agent package.""" + +from .agent import PamDeployAgent + +__all__ = ["PamDeployAgent"] + diff --git a/pam_deploy_graph/action_router.py b/pam_deploy_graph/action_router.py new file mode 100644 index 0000000..c86d195 --- /dev/null +++ b/pam_deploy_graph/action_router.py @@ -0,0 +1,47 @@ +"""Action routing for HOME script actions and NODE MCP actions.""" + +from __future__ import annotations + +from .constants import ALLOWED_ACTIONS, HOME_ACTIONS, NODE_ACTIONS +from .models import AgentState, BackendName, ExecutionStrategy, ActionResult + + +def build_action_backends(strategy: ExecutionStrategy) -> dict[str, BackendName]: + if strategy == "fake": + return {action: "fake" for action in ALLOWED_ACTIONS} + if strategy == "script_only": + return {action: "script" for action in ALLOWED_ACTIONS} + if strategy == "hybrid_node_mcp": + routes: dict[str, BackendName] = {action: "script" for action in HOME_ACTIONS} + routes.update({action: "mcp" for action in NODE_ACTIONS}) + return routes + raise ValueError(f"Unknown execution strategy: {strategy}") + + +class ActionRouter: + def __init__(self, *, script_runner, mcp_runner=None, fake_runner=None) -> None: + self.script_runner = script_runner + self.mcp_runner = mcp_runner + self.fake_runner = fake_runner + + def run_action(self, state: AgentState, action: str, **kwargs) -> ActionResult: + backend = state.action_backends.get(action) + if not backend: + raise ValueError(f"Action is not routed: {action}") + if backend == "script": + return self.script_runner.run( + action, + params=state.params, + script_entry=state.script_entry, + config_path=state.config_path, + trace_file_path=state.trace_file_path, + **kwargs, + ) + if backend == "mcp": + if self.mcp_runner is None: + raise RuntimeError(f"MCP runner is required for action: {action}") + return self.mcp_runner.run(action, params=state.params, **kwargs) + if self.fake_runner is None: + raise RuntimeError(f"Fake runner is required for action: {action}") + return self.fake_runner.run(action, params=state.params, **kwargs) + diff --git a/pam_deploy_graph/agent.py b/pam_deploy_graph/agent.py new file mode 100644 index 0000000..4f48c93 --- /dev/null +++ b/pam_deploy_graph/agent.py @@ -0,0 +1,140 @@ +"""PAM deploy Agent runtime. + +This is intentionally runnable without langgraph installed. The same nodes can +be wired into LangGraph later via pam_deploy_graph.graph. +""" + +from __future__ import annotations + +import time +from pathlib import Path +from typing import Any + +from .action_router import ActionRouter, build_action_backends +from .config_writer import write_config +from .constants import DEFAULT_PARAMS, GLOBAL_ACTION_SEQUENCE, REQUIRED_PARAMS +from .fake_runner import FakeActionRunner +from .mcp_runner import McpActionRunner +from .models import AgentState, ExecutionStrategy +from .script_runner import ScriptActionRunner, select_script_entry +from .skill_policy import load_skill_policy + + +class PamDeployAgent: + def __init__( + self, + *, + skill_path: str | Path = "doc_scripts/PAM_AUTO_DEPLY_SKILL.md", + script_base_dir: str | Path = "doc_scripts", + mcp_runner: McpActionRunner | None = None, + fake_runner: FakeActionRunner | None = None, + ) -> None: + self.skill_policy = load_skill_policy(skill_path) + self.script_base_dir = Path(script_base_dir) + self.script_runner = ScriptActionRunner(self.script_base_dir) + self.fake_runner = fake_runner or FakeActionRunner() + self.mcp_runner = mcp_runner + self.router = ActionRouter( + script_runner=self.script_runner, + mcp_runner=mcp_runner, + fake_runner=self.fake_runner, + ) + + def normalize_params(self, params: dict[str, Any]) -> dict[str, Any]: + normalized = {**DEFAULT_PARAMS, **params} + missing = [key for key in REQUIRED_PARAMS if not normalized.get(key)] + if missing: + raise ValueError(f"Missing required params: {', '.join(missing)}") + return normalized + + def create_state( + self, + *, + params: dict[str, Any], + execution_strategy: ExecutionStrategy = "hybrid_node_mcp", + run_id: str | None = None, + script_entry: str | None = None, + config_path: str | None = None, + trace_file_path: str | None = None, + ) -> AgentState: + normalized = self.normalize_params(params) + actual_run_id = run_id or time.strftime("%Y%m%d_%H%M%S") + actual_script_entry = script_entry or select_script_entry() + runtime_dir = Path("runtime") + actual_config_path = config_path or str(runtime_dir / f"config_{actual_run_id}.txt") + actual_trace_path = trace_file_path or str(Path("logs") / f"api_trace_{actual_run_id}.log") + write_config(normalized, actual_config_path) + return AgentState( + run_id=actual_run_id, + params=normalized, + execution_strategy=execution_strategy, + action_backends=build_action_backends(execution_strategy), + script_entry=actual_script_entry, + script_base_dir=str(self.script_base_dir), + config_path=actual_config_path, + trace_file_path=actual_trace_path, + ) + + def preview(self, params: dict[str, Any], strategy: ExecutionStrategy = "hybrid_node_mcp") -> str: + normalized = self.normalize_params(params) + routes = build_action_backends(strategy) + if strategy == "hybrid_node_mcp": + home_backend = "脚本 action" + node_backend = "MCP" + elif strategy == "script_only": + home_backend = "脚本 action" + node_backend = "脚本 action" + else: + home_backend = "fake" + node_backend = "fake" + lines = [ + "## PAM 部署预演", + "", + f"- 执行策略: {strategy}", + f"- PAM_HOME: {home_backend}", + f"- PAM_NODE: {node_backend}", + f"- 机场: {normalized['AIRPORT_CODE']}", + f"- 应用: {normalized['APP_NAME']}", + f"- 模块: {normalized['MODULE_NAME']}", + f"- 版本: {normalized['VERSION_NUMBER']}", + "", + "| action | backend |", + "| --- | --- |", + ] + for action in GLOBAL_ACTION_SEQUENCE: + lines.append(f"| `{action}` | `{routes[action]}` |") + return "\n".join(lines) + + def run_global_flow(self, state: AgentState) -> AgentState: + for action in GLOBAL_ACTION_SEQUENCE: + kwargs: dict[str, Any] = {} + if action == "publish-version": + kwargs["hash_code"] = state.hash_code + result = self.router.run_action(state, action, **kwargs) + state.events.append( + { + "type": "ACTION_DONE" if result.ok else "ACTION_FAIL", + "stage": action, + "backend": result.backend, + "message": result.error_summary or "ok", + } + ) + if not result.ok: + state.last_failed_step = action + raise RuntimeError(f"{action} failed: {result.error_summary}") + self._apply_result(state, action, result.values) + state.completed_global_steps.append(action) + state.last_success_step = action + return state + + def _apply_result(self, state: AgentState, action: str, values: dict[str, Any]) -> None: + if "HASH_CODE" in values: + state.hash_code = str(values["HASH_CODE"]) + if "NODE_URL" in values: + state.node_url = str(values["NODE_URL"]) + if action == "get-online-ips": + ips = values.get("IP", []) + if isinstance(ips, str): + ips = [ips] + state.online_ips = list(ips) + state.target_ips = state.target_ips or state.online_ips.copy() diff --git a/pam_deploy_graph/checkpoint_store.py b/pam_deploy_graph/checkpoint_store.py new file mode 100644 index 0000000..aa540a0 --- /dev/null +++ b/pam_deploy_graph/checkpoint_store.py @@ -0,0 +1,40 @@ +"""Business checkpoint JSON storage.""" + +from __future__ import annotations + +import json +from dataclasses import asdict, is_dataclass +from pathlib import Path +from typing import Any + +from .constants import SENSITIVE_KEYS + + +def redact_mapping(value: Any) -> Any: + if isinstance(value, dict): + result = {} + for key, item in value.items(): + if str(key) in SENSITIVE_KEYS: + result[key] = "***" + else: + result[key] = redact_mapping(item) + return result + if isinstance(value, list): + return [redact_mapping(item) for item in value] + return value + + +def save_checkpoint(state: Any, path: str | Path) -> Path: + checkpoint_path = Path(path) + checkpoint_path.parent.mkdir(parents=True, exist_ok=True) + payload = asdict(state) if is_dataclass(state) else state + checkpoint_path.write_text( + json.dumps(redact_mapping(payload), ensure_ascii=False, indent=2), + encoding="utf-8", + ) + return checkpoint_path + + +def load_checkpoint(path: str | Path) -> dict[str, Any]: + return json.loads(Path(path).read_text(encoding="utf-8")) + diff --git a/pam_deploy_graph/cli.py b/pam_deploy_graph/cli.py new file mode 100644 index 0000000..e6fb7b0 --- /dev/null +++ b/pam_deploy_graph/cli.py @@ -0,0 +1,41 @@ +"""Command line interface for the PAM deploy agent.""" + +from __future__ import annotations + +import argparse +import json + +from .agent import PamDeployAgent +from .params_loader import load_params_file + + +def main() -> None: + parser = argparse.ArgumentParser(prog="pam-deploy-agent") + sub = parser.add_subparsers(dest="command", required=True) + + preview = sub.add_parser("preview") + preview.add_argument("--config", required=True) + preview.add_argument("--strategy", default="hybrid_node_mcp", choices=["hybrid_node_mcp", "script_only", "fake"]) + + run = sub.add_parser("run-global") + run.add_argument("--config", required=True) + run.add_argument("--strategy", default="fake", choices=["hybrid_node_mcp", "script_only", "fake"]) + run.add_argument("--confirm", action="store_true") + + args = parser.parse_args() + params = load_params_file(args.config) + agent = PamDeployAgent() + + if args.command == "preview": + print(agent.preview(params, args.strategy)) + return + + if not args.confirm: + raise SystemExit("Refusing to execute actions without --confirm.") + state = agent.create_state(params=params, execution_strategy=args.strategy) + state = agent.run_global_flow(state) + print(json.dumps({"events": state.events}, ensure_ascii=False, indent=2)) + + +if __name__ == "__main__": + main() diff --git a/pam_deploy_graph/config_writer.py b/pam_deploy_graph/config_writer.py new file mode 100644 index 0000000..38de0fe --- /dev/null +++ b/pam_deploy_graph/config_writer.py @@ -0,0 +1,29 @@ +"""Write script config files for PAM HOME action calls.""" + +from __future__ import annotations + +from pathlib import Path +from typing import Any + +CONFIG_KEYS = ( + "HOME_BASE_URL", + "CLIENT_ID", + "CLIENT_SECRET", + "AIRPORT_CODE", + "APP_NAME", + "MODULE_NAME", + "VERSION_NUMBER", + "ZIP_FILE_PATH", + "ACTION_TYPE", + "TIMEOUT", + "LOG_NAME", +) + + +def write_config(params: dict[str, Any], path: str | Path) -> Path: + config_path = Path(path) + config_path.parent.mkdir(parents=True, exist_ok=True) + lines = [f"{key}={params.get(key, '')}" for key in CONFIG_KEYS] + config_path.write_text("\n".join(lines) + "\n", encoding="utf-8") + return config_path + diff --git a/pam_deploy_graph/constants.py b/pam_deploy_graph/constants.py new file mode 100644 index 0000000..4d2b0ef --- /dev/null +++ b/pam_deploy_graph/constants.py @@ -0,0 +1,69 @@ +"""Constants for PAM deploy action routing.""" + +HOME_ACTIONS = ( + "get-token", + "create-version", + "upload-package", + "publish-version", + "get-node-url", +) + +NODE_ACTIONS = ( + "get-online-ips", + "create-download-task", + "poll-download-progress", + "upgrade-ip", + "poll-upgrade-progress", + "start-ip", + "stop-ip", + "verify-ip", + "download-log", + "rollback-ip", +) + +GLOBAL_ACTION_SEQUENCE = ( + "get-token", + "create-version", + "upload-package", + "publish-version", + "get-node-url", + "get-online-ips", + "create-download-task", + "poll-download-progress", +) + +IP_ACTION_SEQUENCE = ( + "upgrade-ip", + "poll-upgrade-progress", + "start-ip", + "verify-ip", + "download-log", +) + +ALLOWED_ACTIONS = HOME_ACTIONS + NODE_ACTIONS + +REQUIRED_PARAMS = ( + "HOME_BASE_URL", + "CLIENT_ID", + "CLIENT_SECRET", + "AIRPORT_CODE", + "APP_NAME", + "MODULE_NAME", + "VERSION_NUMBER", + "ZIP_FILE_PATH", +) + +DEFAULT_PARAMS = { + "ACTION_TYPE": "FULL", + "TIMEOUT": 120, + "LOG_NAME": "app.log", +} + +SENSITIVE_KEYS = { + "CLIENT_SECRET", + "TOKEN", + "Authorization", + "access_token", + "ACCESS_TOKEN", +} + diff --git a/pam_deploy_graph/fake_runner.py b/pam_deploy_graph/fake_runner.py new file mode 100644 index 0000000..7088bd1 --- /dev/null +++ b/pam_deploy_graph/fake_runner.py @@ -0,0 +1,44 @@ +"""Fake action runner for graph and agent tests.""" + +from __future__ import annotations + +from typing import Any + +from .models import ActionResult + + +class FakeActionRunner: + def __init__(self, fixtures: dict[str, dict[str, Any]] | None = None) -> None: + self.fixtures = fixtures or {} + self.calls: list[tuple[str, dict[str, Any]]] = [] + + def run(self, action: str, *, params: dict[str, Any], **kwargs: Any) -> ActionResult: + self.calls.append((action, kwargs)) + values = self.fixtures.get(action, {}).copy() + if not values: + values = self._default_values(action, kwargs) + ok = not values.pop("_fail", False) + return ActionResult( + action=action, + backend="fake", + tool_name=f"fake:{action}", + ok=ok, + values=values, + exit_code=0 if ok else 1, + raw_output=str(values), + error_summary="" if ok else str(values.get("MESSAGE", "Fake action failed")), + ) + + def _default_values(self, action: str, kwargs: dict[str, Any]) -> dict[str, Any]: + if action == "get-token": + return {"ACTION": action, "TOKEN": "***"} + if action == "upload-package": + return {"ACTION": action, "HASH_CODE": "fake-hash"} + if action == "get-node-url": + return {"ACTION": action, "NODE_URL": "https://fake-node.local"} + if action == "get-online-ips": + return {"ACTION": action, "COUNT": "2", "IP": ["192.168.1.10", "192.168.1.11"]} + if action == "download-log": + return {"ACTION": action, "IP": kwargs.get("ip", ""), "LOG_FILE": "logs/fake.zip"} + return {"ACTION": action, "RESULT": "OK"} + diff --git a/pam_deploy_graph/graph.py b/pam_deploy_graph/graph.py new file mode 100644 index 0000000..607b272 --- /dev/null +++ b/pam_deploy_graph/graph.py @@ -0,0 +1,24 @@ +"""Optional LangGraph integration. + +The runtime works without LangGraph installed. This module exposes a factory for +projects that install the optional dependency. +""" + +from __future__ import annotations + + +def build_langgraph(): + try: + from langgraph.graph import END, START, StateGraph + except ImportError as exc: # pragma: no cover - depends on optional package + raise RuntimeError( + "langgraph is not installed. Install the optional dependency with " + "`pip install -e .[langgraph]`." + ) from exc + + graph = StateGraph(dict) + graph.add_node("start", lambda state: state) + graph.add_edge(START, "start") + graph.add_edge("start", END) + return graph.compile() + diff --git a/pam_deploy_graph/mcp_runner.py b/pam_deploy_graph/mcp_runner.py new file mode 100644 index 0000000..8cff615 --- /dev/null +++ b/pam_deploy_graph/mcp_runner.py @@ -0,0 +1,93 @@ +"""Runner wrapper for PAM_NODE MCP tools.""" + +from __future__ import annotations + +from typing import Any, Protocol + +from .models import ActionResult +from .output_parser import parse_mcp_result + + +class McpToolClient(Protocol): + def call_tool(self, tool_name: str, arguments: dict[str, Any]) -> Any: + ... + + +DEFAULT_NODE_MCP_TOOLS = { + "get-online-ips": "pam_get_online_ips", + "create-download-task": "pam_create_download_task", + "poll-download-progress": "pam_poll_download_progress", + "upgrade-ip": "pam_upgrade_ip", + "poll-upgrade-progress": "pam_poll_upgrade_progress", + "start-ip": "pam_start_ip", + "stop-ip": "pam_stop_ip", + "verify-ip": "pam_verify_ip", + "download-log": "pam_download_log", + "rollback-ip": "pam_rollback_ip", +} + + +class McpActionRunner: + def __init__( + self, + client: McpToolClient | None = None, + tool_names: dict[str, str] | None = None, + ) -> None: + self.client = client + self.tool_names = tool_names or DEFAULT_NODE_MCP_TOOLS.copy() + + def run( + self, + action: str, + *, + params: dict[str, Any], + ip: str | None = None, + hash_code: str | None = None, + stop_first: bool = False, + **_: Any, + ) -> ActionResult: + if self.client is None: + raise RuntimeError("MCP client is not configured") + tool_name = self.tool_names.get(action) + if not tool_name: + raise ValueError(f"No MCP tool mapped for action: {action}") + arguments = self._build_arguments( + action, + params=params, + ip=ip, + hash_code=hash_code, + stop_first=stop_first, + ) + try: + payload = self.client.call_tool(tool_name, arguments) + except Exception as exc: # pragma: no cover - defensive wrapper + return parse_mcp_result(action, {}, ok=False, tool_name=tool_name, error=str(exc)) + return parse_mcp_result(action, payload, ok=True, tool_name=tool_name) + + def _build_arguments( + self, + action: str, + *, + params: dict[str, Any], + ip: str | None, + hash_code: str | None, + stop_first: bool, + ) -> dict[str, Any]: + arguments = { + "homeBaseUrl": params.get("HOME_BASE_URL"), + "airportCode": params.get("AIRPORT_CODE"), + "applicationName": params.get("APP_NAME"), + "moduleName": params.get("MODULE_NAME"), + "versionNumber": params.get("VERSION_NUMBER"), + "actionType": params.get("ACTION_TYPE"), + "timeOut": params.get("TIMEOUT"), + "logName": params.get("LOG_NAME"), + } + if ip: + arguments["targetIp"] = ip + if hash_code: + arguments["hashCode"] = hash_code + if action == "rollback-ip": + arguments["stopFirst"] = stop_first + return {key: value for key, value in arguments.items() if value not in (None, "")} + diff --git a/pam_deploy_graph/models.py b/pam_deploy_graph/models.py new file mode 100644 index 0000000..fd67e6f --- /dev/null +++ b/pam_deploy_graph/models.py @@ -0,0 +1,71 @@ +"""Shared dataclasses for the PAM deploy agent.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any, Literal + +BackendName = Literal["mcp", "script", "fake"] +ExecutionStrategy = Literal["hybrid_node_mcp", "script_only", "fake"] + + +@dataclass(slots=True) +class ActionResult: + action: str + backend: BackendName + ok: bool + values: dict[str, Any] = field(default_factory=dict) + exit_code: int = 0 + tool_name: str = "" + stdout: str = "" + stderr: str = "" + raw_output: str = "" + error_summary: str = "" + + +@dataclass(slots=True) +class SkillPolicy: + name: str + source_path: str + description: str = "" + allowed_modes: tuple[str, ...] = ("MCP", "API脚本") + allowed_actions: tuple[str, ...] = () + required_confirmations: tuple[str, ...] = ( + "params", + "target_scope", + "rollback", + ) + required_params: tuple[str, ...] = () + optional_params: dict[str, Any] = field(default_factory=dict) + action_sequence: tuple[str, ...] = () + ip_action_sequence: tuple[str, ...] = () + forbidden_actions: tuple[str, ...] = ( + "script-main-flow", + "auto-rollback", + "modify-deploy-scripts", + ) + + +@dataclass(slots=True) +class AgentState: + run_id: str + params: dict[str, Any] + execution_strategy: ExecutionStrategy + action_backends: dict[str, BackendName] + script_entry: str = "" + script_base_dir: str = "." + config_path: str = "" + trace_file_path: str = "" + node_mcp_server_name: str = "" + node_mcp_tool_names: dict[str, str] = field(default_factory=dict) + completed_global_steps: list[str] = field(default_factory=list) + hash_code: str = "" + node_url: str = "" + online_ips: list[str] = field(default_factory=list) + target_ips: list[str] = field(default_factory=list) + ip_states: dict[str, dict[str, Any]] = field(default_factory=dict) + pending_confirmation: str = "" + last_success_step: str = "" + last_failed_step: str = "" + events: list[dict[str, Any]] = field(default_factory=list) + diff --git a/pam_deploy_graph/output_parser.py b/pam_deploy_graph/output_parser.py new file mode 100644 index 0000000..d3a48d7 --- /dev/null +++ b/pam_deploy_graph/output_parser.py @@ -0,0 +1,133 @@ +"""Normalize script stdout and MCP tool returns into ActionResult objects.""" + +from __future__ import annotations + +import json +import re +from typing import Any + +from .constants import SENSITIVE_KEYS +from .models import ActionResult, BackendName + +PENDING_CONFIRMATION_RE = re.compile(r"PENDING_AGENT_CONFIRMATION\((?P
[^)]*)\)") +KEY_VALUE_RE = re.compile(r"^(?P