From ab7b839bc678df6082606f149ffde7bdb739465f Mon Sep 17 00:00:00 2001 From: dark Date: Fri, 29 May 2026 14:49:41 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=96=B0=E5=A2=9E=20PAM=20=E6=99=BA?= =?UTF-8?q?=E8=83=BD=E9=83=A8=E7=BD=B2=20Agent=20=E8=BF=90=E8=A1=8C?= =?UTF-8?q?=E6=97=B6=E9=AA=A8=E6=9E=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 pam_deploy_graph 包,包含 agent、action router、runner、parser 和配置加载能力 - 支持 hybrid_node_mcp 路由策略:PAM_HOME 走脚本 action,PAM_NODE 走 MCP - 新增 fake runner 和 CLI 预演/全局流程验证入口 - 新增路由、输出解析、配置加载、脚本命令构造、Skill 策略加载测试 - 在 README 中记录当前代码骨架、实现进度、使用方式和下一步建议 --- README.md | 88 +++++++++++++++++ pam_deploy_graph/__init__.py | 6 ++ pam_deploy_graph/action_router.py | 47 +++++++++ pam_deploy_graph/agent.py | 140 +++++++++++++++++++++++++++ pam_deploy_graph/checkpoint_store.py | 40 ++++++++ pam_deploy_graph/cli.py | 41 ++++++++ pam_deploy_graph/config_writer.py | 29 ++++++ pam_deploy_graph/constants.py | 69 +++++++++++++ pam_deploy_graph/fake_runner.py | 44 +++++++++ pam_deploy_graph/graph.py | 24 +++++ pam_deploy_graph/mcp_runner.py | 93 ++++++++++++++++++ pam_deploy_graph/models.py | 71 ++++++++++++++ pam_deploy_graph/output_parser.py | 133 +++++++++++++++++++++++++ pam_deploy_graph/params_loader.py | 27 ++++++ pam_deploy_graph/script_runner.py | 114 ++++++++++++++++++++++ pam_deploy_graph/skill_policy.py | 42 ++++++++ pyproject.toml | 15 +++ tests/test_action_router.py | 15 +++ tests/test_output_parser.py | 24 +++++ tests/test_params_loader.py | 18 ++++ tests/test_script_runner.py | 48 +++++++++ tests/test_skill_policy.py | 11 +++ 22 files changed, 1139 insertions(+) create mode 100644 pam_deploy_graph/__init__.py create mode 100644 pam_deploy_graph/action_router.py create mode 100644 pam_deploy_graph/agent.py create mode 100644 pam_deploy_graph/checkpoint_store.py create mode 100644 pam_deploy_graph/cli.py create mode 100644 pam_deploy_graph/config_writer.py create mode 100644 pam_deploy_graph/constants.py create mode 100644 pam_deploy_graph/fake_runner.py create mode 100644 pam_deploy_graph/graph.py create mode 100644 pam_deploy_graph/mcp_runner.py create mode 100644 pam_deploy_graph/models.py create mode 100644 pam_deploy_graph/output_parser.py create mode 100644 pam_deploy_graph/params_loader.py create mode 100644 pam_deploy_graph/script_runner.py create mode 100644 pam_deploy_graph/skill_policy.py create mode 100644 pyproject.toml create mode 100644 tests/test_action_router.py create mode 100644 tests/test_output_parser.py create mode 100644 tests/test_params_loader.py create mode 100644 tests/test_script_runner.py create mode 100644 tests/test_skill_policy.py diff --git a/README.md b/README.md index 4888779..c6ba8e0 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,90 @@ # opagent +## PAM Deploy Agent + +本仓库正在把原有 `PAM_AUTO_DEPLY_SKILL.md` + `deploy.sh` / `deploy.ps1` 的组合,重构为一个 LangGraph-style 的 PAM 智能部署 Agent。 + +当前已加入 `pam_deploy_graph` Python 包,用于先落地 Agent Runtime 的核心骨架: + +- PAM_HOME action 固定通过 `deploy.sh` / `deploy.ps1` 调用。 +- PAM_NODE action 可通过 MCP runner 调用。 +- 默认执行策略为 `hybrid_node_mcp`,即 HOME 脚本 action + NODE MCP。 +- 离线策略为 `script_only`,全部 action 走脚本 action。 +- `langgraph` 当前作为可选依赖;本地未安装时,核心 Agent、runner、router 和 parser 仍可独立测试。 + +## 当前代码骨架 + +```text +pam_deploy_graph/ + agent.py # Agent runtime,参数归一化、预演、fake 全局流程 + action_router.py # 按 action 路由到脚本、MCP 或 fake runner + script_runner.py # deploy.sh / deploy.ps1 action 调用封装 + mcp_runner.py # PAM_NODE MCP runner 协议与 action -> tool 映射 + fake_runner.py # 测试用 runner,不访问真实环境 + output_parser.py # 解析 key=value、MCP JSON、待确认回滚标记 + skill_policy.py # 从 PAM_AUTO_DEPLY_SKILL.md 加载 Skill 策略 + config_writer.py # 生成脚本 action 所需 config 文件 + checkpoint_store.py # 业务 checkpoint JSON 读写 + params_loader.py # 读取 JSON 或 config.txt 风格参数文件 + graph.py # 可选 LangGraph 集成入口 + cli.py # CLI 入口 + +tests/ + test_action_router.py + test_output_parser.py + test_params_loader.py + test_script_runner.py + test_skill_policy.py +``` + +## 当前进度 + +已完成: + +- 建立 Python 工程骨架和 `pyproject.toml`。 +- 实现 `hybrid_node_mcp` 路由规则:PAM_HOME 走脚本 action,PAM_NODE 走 MCP。 +- 实现 `script_only` 路由规则:所有 action 走脚本 action。 +- 实现脚本 action 命令构造,避免调用脚本主流程。 +- 实现 MCP runner 抽象和 PAM_NODE action 到 MCP tool 的默认映射。 +- 实现脚本/MCP/fake action 结果统一为 `ActionResult`。 +- 实现 `config.txt.example` 风格和 JSON 风格参数读取。 +- 实现 fake 全局流程,便于不触碰真实环境地验证 Agent 路由。 +- 添加基础测试,当前 `10 passed`。 + +未完成: + +- 尚未接入真实 MCP client。 +- 尚未安装并接入真实 LangGraph `StateGraph` 主图。 +- 尚未实现 LLM 结构化意图识别、参数抽取和计划生成。 +- 尚未实现人工确认 interrupt、断点续跑完整图流程和单 IP 子流程。 +- 尚未执行真实脚本 action 或真实 PAM_NODE MCP 调用。 + +## 使用方式 + +预演: + +```bash +python -m pam_deploy_graph.cli preview --config doc_scripts/config.txt.example --strategy fake +``` + +fake 全局流程验证: + +```bash +python -m pam_deploy_graph.cli run-global --config doc_scripts/config.txt.example --strategy fake --confirm +``` + +测试: + +```bash +pytest -q +``` + +## 下一步建议 + +1. 接入真实 PAM_NODE MCP client,实现 `McpToolClient.call_tool()`。 +2. 用 fake runner 补齐完整部署主流程和单 IP 子流程测试。 +3. 引入 LangGraph,把当前 Agent 节点接入 `StateGraph`。 +4. 增加人工确认节点:参数确认、IP 范围确认、回滚确认。 +5. 增加 LLM structured output:意图识别、参数抽取、部署计划、失败解释。 +6. 完善 checkpoint 恢复:全局步骤跳过、成功 IP 跳过、pending rollback 恢复。 +7. 在测试环境中做 smoke:HOME 脚本 `get-token/get-node-url` + NODE MCP `get-online-ips`。 diff --git a/pam_deploy_graph/__init__.py b/pam_deploy_graph/__init__.py new file mode 100644 index 0000000..71b73f5 --- /dev/null +++ b/pam_deploy_graph/__init__.py @@ -0,0 +1,6 @@ +"""PAM deploy agent package.""" + +from .agent import PamDeployAgent + +__all__ = ["PamDeployAgent"] + diff --git a/pam_deploy_graph/action_router.py b/pam_deploy_graph/action_router.py new file mode 100644 index 0000000..c86d195 --- /dev/null +++ b/pam_deploy_graph/action_router.py @@ -0,0 +1,47 @@ +"""Action routing for HOME script actions and NODE MCP actions.""" + +from __future__ import annotations + +from .constants import ALLOWED_ACTIONS, HOME_ACTIONS, NODE_ACTIONS +from .models import AgentState, BackendName, ExecutionStrategy, ActionResult + + +def build_action_backends(strategy: ExecutionStrategy) -> dict[str, BackendName]: + if strategy == "fake": + return {action: "fake" for action in ALLOWED_ACTIONS} + if strategy == "script_only": + return {action: "script" for action in ALLOWED_ACTIONS} + if strategy == "hybrid_node_mcp": + routes: dict[str, BackendName] = {action: "script" for action in HOME_ACTIONS} + routes.update({action: "mcp" for action in NODE_ACTIONS}) + return routes + raise ValueError(f"Unknown execution strategy: {strategy}") + + +class ActionRouter: + def __init__(self, *, script_runner, mcp_runner=None, fake_runner=None) -> None: + self.script_runner = script_runner + self.mcp_runner = mcp_runner + self.fake_runner = fake_runner + + def run_action(self, state: AgentState, action: str, **kwargs) -> ActionResult: + backend = state.action_backends.get(action) + if not backend: + raise ValueError(f"Action is not routed: {action}") + if backend == "script": + return self.script_runner.run( + action, + params=state.params, + script_entry=state.script_entry, + config_path=state.config_path, + trace_file_path=state.trace_file_path, + **kwargs, + ) + if backend == "mcp": + if self.mcp_runner is None: + raise RuntimeError(f"MCP runner is required for action: {action}") + return self.mcp_runner.run(action, params=state.params, **kwargs) + if self.fake_runner is None: + raise RuntimeError(f"Fake runner is required for action: {action}") + return self.fake_runner.run(action, params=state.params, **kwargs) + diff --git a/pam_deploy_graph/agent.py b/pam_deploy_graph/agent.py new file mode 100644 index 0000000..4f48c93 --- /dev/null +++ b/pam_deploy_graph/agent.py @@ -0,0 +1,140 @@ +"""PAM deploy Agent runtime. + +This is intentionally runnable without langgraph installed. The same nodes can +be wired into LangGraph later via pam_deploy_graph.graph. +""" + +from __future__ import annotations + +import time +from pathlib import Path +from typing import Any + +from .action_router import ActionRouter, build_action_backends +from .config_writer import write_config +from .constants import DEFAULT_PARAMS, GLOBAL_ACTION_SEQUENCE, REQUIRED_PARAMS +from .fake_runner import FakeActionRunner +from .mcp_runner import McpActionRunner +from .models import AgentState, ExecutionStrategy +from .script_runner import ScriptActionRunner, select_script_entry +from .skill_policy import load_skill_policy + + +class PamDeployAgent: + def __init__( + self, + *, + skill_path: str | Path = "doc_scripts/PAM_AUTO_DEPLY_SKILL.md", + script_base_dir: str | Path = "doc_scripts", + mcp_runner: McpActionRunner | None = None, + fake_runner: FakeActionRunner | None = None, + ) -> None: + self.skill_policy = load_skill_policy(skill_path) + self.script_base_dir = Path(script_base_dir) + self.script_runner = ScriptActionRunner(self.script_base_dir) + self.fake_runner = fake_runner or FakeActionRunner() + self.mcp_runner = mcp_runner + self.router = ActionRouter( + script_runner=self.script_runner, + mcp_runner=mcp_runner, + fake_runner=self.fake_runner, + ) + + def normalize_params(self, params: dict[str, Any]) -> dict[str, Any]: + normalized = {**DEFAULT_PARAMS, **params} + missing = [key for key in REQUIRED_PARAMS if not normalized.get(key)] + if missing: + raise ValueError(f"Missing required params: {', '.join(missing)}") + return normalized + + def create_state( + self, + *, + params: dict[str, Any], + execution_strategy: ExecutionStrategy = "hybrid_node_mcp", + run_id: str | None = None, + script_entry: str | None = None, + config_path: str | None = None, + trace_file_path: str | None = None, + ) -> AgentState: + normalized = self.normalize_params(params) + actual_run_id = run_id or time.strftime("%Y%m%d_%H%M%S") + actual_script_entry = script_entry or select_script_entry() + runtime_dir = Path("runtime") + actual_config_path = config_path or str(runtime_dir / f"config_{actual_run_id}.txt") + actual_trace_path = trace_file_path or str(Path("logs") / f"api_trace_{actual_run_id}.log") + write_config(normalized, actual_config_path) + return AgentState( + run_id=actual_run_id, + params=normalized, + execution_strategy=execution_strategy, + action_backends=build_action_backends(execution_strategy), + script_entry=actual_script_entry, + script_base_dir=str(self.script_base_dir), + config_path=actual_config_path, + trace_file_path=actual_trace_path, + ) + + def preview(self, params: dict[str, Any], strategy: ExecutionStrategy = "hybrid_node_mcp") -> str: + normalized = self.normalize_params(params) + routes = build_action_backends(strategy) + if strategy == "hybrid_node_mcp": + home_backend = "脚本 action" + node_backend = "MCP" + elif strategy == "script_only": + home_backend = "脚本 action" + node_backend = "脚本 action" + else: + home_backend = "fake" + node_backend = "fake" + lines = [ + "## PAM 部署预演", + "", + f"- 执行策略: {strategy}", + f"- PAM_HOME: {home_backend}", + f"- PAM_NODE: {node_backend}", + f"- 机场: {normalized['AIRPORT_CODE']}", + f"- 应用: {normalized['APP_NAME']}", + f"- 模块: {normalized['MODULE_NAME']}", + f"- 版本: {normalized['VERSION_NUMBER']}", + "", + "| action | backend |", + "| --- | --- |", + ] + for action in GLOBAL_ACTION_SEQUENCE: + lines.append(f"| `{action}` | `{routes[action]}` |") + return "\n".join(lines) + + def run_global_flow(self, state: AgentState) -> AgentState: + for action in GLOBAL_ACTION_SEQUENCE: + kwargs: dict[str, Any] = {} + if action == "publish-version": + kwargs["hash_code"] = state.hash_code + result = self.router.run_action(state, action, **kwargs) + state.events.append( + { + "type": "ACTION_DONE" if result.ok else "ACTION_FAIL", + "stage": action, + "backend": result.backend, + "message": result.error_summary or "ok", + } + ) + if not result.ok: + state.last_failed_step = action + raise RuntimeError(f"{action} failed: {result.error_summary}") + self._apply_result(state, action, result.values) + state.completed_global_steps.append(action) + state.last_success_step = action + return state + + def _apply_result(self, state: AgentState, action: str, values: dict[str, Any]) -> None: + if "HASH_CODE" in values: + state.hash_code = str(values["HASH_CODE"]) + if "NODE_URL" in values: + state.node_url = str(values["NODE_URL"]) + if action == "get-online-ips": + ips = values.get("IP", []) + if isinstance(ips, str): + ips = [ips] + state.online_ips = list(ips) + state.target_ips = state.target_ips or state.online_ips.copy() diff --git a/pam_deploy_graph/checkpoint_store.py b/pam_deploy_graph/checkpoint_store.py new file mode 100644 index 0000000..aa540a0 --- /dev/null +++ b/pam_deploy_graph/checkpoint_store.py @@ -0,0 +1,40 @@ +"""Business checkpoint JSON storage.""" + +from __future__ import annotations + +import json +from dataclasses import asdict, is_dataclass +from pathlib import Path +from typing import Any + +from .constants import SENSITIVE_KEYS + + +def redact_mapping(value: Any) -> Any: + if isinstance(value, dict): + result = {} + for key, item in value.items(): + if str(key) in SENSITIVE_KEYS: + result[key] = "***" + else: + result[key] = redact_mapping(item) + return result + if isinstance(value, list): + return [redact_mapping(item) for item in value] + return value + + +def save_checkpoint(state: Any, path: str | Path) -> Path: + checkpoint_path = Path(path) + checkpoint_path.parent.mkdir(parents=True, exist_ok=True) + payload = asdict(state) if is_dataclass(state) else state + checkpoint_path.write_text( + json.dumps(redact_mapping(payload), ensure_ascii=False, indent=2), + encoding="utf-8", + ) + return checkpoint_path + + +def load_checkpoint(path: str | Path) -> dict[str, Any]: + return json.loads(Path(path).read_text(encoding="utf-8")) + diff --git a/pam_deploy_graph/cli.py b/pam_deploy_graph/cli.py new file mode 100644 index 0000000..e6fb7b0 --- /dev/null +++ b/pam_deploy_graph/cli.py @@ -0,0 +1,41 @@ +"""Command line interface for the PAM deploy agent.""" + +from __future__ import annotations + +import argparse +import json + +from .agent import PamDeployAgent +from .params_loader import load_params_file + + +def main() -> None: + parser = argparse.ArgumentParser(prog="pam-deploy-agent") + sub = parser.add_subparsers(dest="command", required=True) + + preview = sub.add_parser("preview") + preview.add_argument("--config", required=True) + preview.add_argument("--strategy", default="hybrid_node_mcp", choices=["hybrid_node_mcp", "script_only", "fake"]) + + run = sub.add_parser("run-global") + run.add_argument("--config", required=True) + run.add_argument("--strategy", default="fake", choices=["hybrid_node_mcp", "script_only", "fake"]) + run.add_argument("--confirm", action="store_true") + + args = parser.parse_args() + params = load_params_file(args.config) + agent = PamDeployAgent() + + if args.command == "preview": + print(agent.preview(params, args.strategy)) + return + + if not args.confirm: + raise SystemExit("Refusing to execute actions without --confirm.") + state = agent.create_state(params=params, execution_strategy=args.strategy) + state = agent.run_global_flow(state) + print(json.dumps({"events": state.events}, ensure_ascii=False, indent=2)) + + +if __name__ == "__main__": + main() diff --git a/pam_deploy_graph/config_writer.py b/pam_deploy_graph/config_writer.py new file mode 100644 index 0000000..38de0fe --- /dev/null +++ b/pam_deploy_graph/config_writer.py @@ -0,0 +1,29 @@ +"""Write script config files for PAM HOME action calls.""" + +from __future__ import annotations + +from pathlib import Path +from typing import Any + +CONFIG_KEYS = ( + "HOME_BASE_URL", + "CLIENT_ID", + "CLIENT_SECRET", + "AIRPORT_CODE", + "APP_NAME", + "MODULE_NAME", + "VERSION_NUMBER", + "ZIP_FILE_PATH", + "ACTION_TYPE", + "TIMEOUT", + "LOG_NAME", +) + + +def write_config(params: dict[str, Any], path: str | Path) -> Path: + config_path = Path(path) + config_path.parent.mkdir(parents=True, exist_ok=True) + lines = [f"{key}={params.get(key, '')}" for key in CONFIG_KEYS] + config_path.write_text("\n".join(lines) + "\n", encoding="utf-8") + return config_path + diff --git a/pam_deploy_graph/constants.py b/pam_deploy_graph/constants.py new file mode 100644 index 0000000..4d2b0ef --- /dev/null +++ b/pam_deploy_graph/constants.py @@ -0,0 +1,69 @@ +"""Constants for PAM deploy action routing.""" + +HOME_ACTIONS = ( + "get-token", + "create-version", + "upload-package", + "publish-version", + "get-node-url", +) + +NODE_ACTIONS = ( + "get-online-ips", + "create-download-task", + "poll-download-progress", + "upgrade-ip", + "poll-upgrade-progress", + "start-ip", + "stop-ip", + "verify-ip", + "download-log", + "rollback-ip", +) + +GLOBAL_ACTION_SEQUENCE = ( + "get-token", + "create-version", + "upload-package", + "publish-version", + "get-node-url", + "get-online-ips", + "create-download-task", + "poll-download-progress", +) + +IP_ACTION_SEQUENCE = ( + "upgrade-ip", + "poll-upgrade-progress", + "start-ip", + "verify-ip", + "download-log", +) + +ALLOWED_ACTIONS = HOME_ACTIONS + NODE_ACTIONS + +REQUIRED_PARAMS = ( + "HOME_BASE_URL", + "CLIENT_ID", + "CLIENT_SECRET", + "AIRPORT_CODE", + "APP_NAME", + "MODULE_NAME", + "VERSION_NUMBER", + "ZIP_FILE_PATH", +) + +DEFAULT_PARAMS = { + "ACTION_TYPE": "FULL", + "TIMEOUT": 120, + "LOG_NAME": "app.log", +} + +SENSITIVE_KEYS = { + "CLIENT_SECRET", + "TOKEN", + "Authorization", + "access_token", + "ACCESS_TOKEN", +} + diff --git a/pam_deploy_graph/fake_runner.py b/pam_deploy_graph/fake_runner.py new file mode 100644 index 0000000..7088bd1 --- /dev/null +++ b/pam_deploy_graph/fake_runner.py @@ -0,0 +1,44 @@ +"""Fake action runner for graph and agent tests.""" + +from __future__ import annotations + +from typing import Any + +from .models import ActionResult + + +class FakeActionRunner: + def __init__(self, fixtures: dict[str, dict[str, Any]] | None = None) -> None: + self.fixtures = fixtures or {} + self.calls: list[tuple[str, dict[str, Any]]] = [] + + def run(self, action: str, *, params: dict[str, Any], **kwargs: Any) -> ActionResult: + self.calls.append((action, kwargs)) + values = self.fixtures.get(action, {}).copy() + if not values: + values = self._default_values(action, kwargs) + ok = not values.pop("_fail", False) + return ActionResult( + action=action, + backend="fake", + tool_name=f"fake:{action}", + ok=ok, + values=values, + exit_code=0 if ok else 1, + raw_output=str(values), + error_summary="" if ok else str(values.get("MESSAGE", "Fake action failed")), + ) + + def _default_values(self, action: str, kwargs: dict[str, Any]) -> dict[str, Any]: + if action == "get-token": + return {"ACTION": action, "TOKEN": "***"} + if action == "upload-package": + return {"ACTION": action, "HASH_CODE": "fake-hash"} + if action == "get-node-url": + return {"ACTION": action, "NODE_URL": "https://fake-node.local"} + if action == "get-online-ips": + return {"ACTION": action, "COUNT": "2", "IP": ["192.168.1.10", "192.168.1.11"]} + if action == "download-log": + return {"ACTION": action, "IP": kwargs.get("ip", ""), "LOG_FILE": "logs/fake.zip"} + return {"ACTION": action, "RESULT": "OK"} + diff --git a/pam_deploy_graph/graph.py b/pam_deploy_graph/graph.py new file mode 100644 index 0000000..607b272 --- /dev/null +++ b/pam_deploy_graph/graph.py @@ -0,0 +1,24 @@ +"""Optional LangGraph integration. + +The runtime works without LangGraph installed. This module exposes a factory for +projects that install the optional dependency. +""" + +from __future__ import annotations + + +def build_langgraph(): + try: + from langgraph.graph import END, START, StateGraph + except ImportError as exc: # pragma: no cover - depends on optional package + raise RuntimeError( + "langgraph is not installed. Install the optional dependency with " + "`pip install -e .[langgraph]`." + ) from exc + + graph = StateGraph(dict) + graph.add_node("start", lambda state: state) + graph.add_edge(START, "start") + graph.add_edge("start", END) + return graph.compile() + diff --git a/pam_deploy_graph/mcp_runner.py b/pam_deploy_graph/mcp_runner.py new file mode 100644 index 0000000..8cff615 --- /dev/null +++ b/pam_deploy_graph/mcp_runner.py @@ -0,0 +1,93 @@ +"""Runner wrapper for PAM_NODE MCP tools.""" + +from __future__ import annotations + +from typing import Any, Protocol + +from .models import ActionResult +from .output_parser import parse_mcp_result + + +class McpToolClient(Protocol): + def call_tool(self, tool_name: str, arguments: dict[str, Any]) -> Any: + ... + + +DEFAULT_NODE_MCP_TOOLS = { + "get-online-ips": "pam_get_online_ips", + "create-download-task": "pam_create_download_task", + "poll-download-progress": "pam_poll_download_progress", + "upgrade-ip": "pam_upgrade_ip", + "poll-upgrade-progress": "pam_poll_upgrade_progress", + "start-ip": "pam_start_ip", + "stop-ip": "pam_stop_ip", + "verify-ip": "pam_verify_ip", + "download-log": "pam_download_log", + "rollback-ip": "pam_rollback_ip", +} + + +class McpActionRunner: + def __init__( + self, + client: McpToolClient | None = None, + tool_names: dict[str, str] | None = None, + ) -> None: + self.client = client + self.tool_names = tool_names or DEFAULT_NODE_MCP_TOOLS.copy() + + def run( + self, + action: str, + *, + params: dict[str, Any], + ip: str | None = None, + hash_code: str | None = None, + stop_first: bool = False, + **_: Any, + ) -> ActionResult: + if self.client is None: + raise RuntimeError("MCP client is not configured") + tool_name = self.tool_names.get(action) + if not tool_name: + raise ValueError(f"No MCP tool mapped for action: {action}") + arguments = self._build_arguments( + action, + params=params, + ip=ip, + hash_code=hash_code, + stop_first=stop_first, + ) + try: + payload = self.client.call_tool(tool_name, arguments) + except Exception as exc: # pragma: no cover - defensive wrapper + return parse_mcp_result(action, {}, ok=False, tool_name=tool_name, error=str(exc)) + return parse_mcp_result(action, payload, ok=True, tool_name=tool_name) + + def _build_arguments( + self, + action: str, + *, + params: dict[str, Any], + ip: str | None, + hash_code: str | None, + stop_first: bool, + ) -> dict[str, Any]: + arguments = { + "homeBaseUrl": params.get("HOME_BASE_URL"), + "airportCode": params.get("AIRPORT_CODE"), + "applicationName": params.get("APP_NAME"), + "moduleName": params.get("MODULE_NAME"), + "versionNumber": params.get("VERSION_NUMBER"), + "actionType": params.get("ACTION_TYPE"), + "timeOut": params.get("TIMEOUT"), + "logName": params.get("LOG_NAME"), + } + if ip: + arguments["targetIp"] = ip + if hash_code: + arguments["hashCode"] = hash_code + if action == "rollback-ip": + arguments["stopFirst"] = stop_first + return {key: value for key, value in arguments.items() if value not in (None, "")} + diff --git a/pam_deploy_graph/models.py b/pam_deploy_graph/models.py new file mode 100644 index 0000000..fd67e6f --- /dev/null +++ b/pam_deploy_graph/models.py @@ -0,0 +1,71 @@ +"""Shared dataclasses for the PAM deploy agent.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any, Literal + +BackendName = Literal["mcp", "script", "fake"] +ExecutionStrategy = Literal["hybrid_node_mcp", "script_only", "fake"] + + +@dataclass(slots=True) +class ActionResult: + action: str + backend: BackendName + ok: bool + values: dict[str, Any] = field(default_factory=dict) + exit_code: int = 0 + tool_name: str = "" + stdout: str = "" + stderr: str = "" + raw_output: str = "" + error_summary: str = "" + + +@dataclass(slots=True) +class SkillPolicy: + name: str + source_path: str + description: str = "" + allowed_modes: tuple[str, ...] = ("MCP", "API脚本") + allowed_actions: tuple[str, ...] = () + required_confirmations: tuple[str, ...] = ( + "params", + "target_scope", + "rollback", + ) + required_params: tuple[str, ...] = () + optional_params: dict[str, Any] = field(default_factory=dict) + action_sequence: tuple[str, ...] = () + ip_action_sequence: tuple[str, ...] = () + forbidden_actions: tuple[str, ...] = ( + "script-main-flow", + "auto-rollback", + "modify-deploy-scripts", + ) + + +@dataclass(slots=True) +class AgentState: + run_id: str + params: dict[str, Any] + execution_strategy: ExecutionStrategy + action_backends: dict[str, BackendName] + script_entry: str = "" + script_base_dir: str = "." + config_path: str = "" + trace_file_path: str = "" + node_mcp_server_name: str = "" + node_mcp_tool_names: dict[str, str] = field(default_factory=dict) + completed_global_steps: list[str] = field(default_factory=list) + hash_code: str = "" + node_url: str = "" + online_ips: list[str] = field(default_factory=list) + target_ips: list[str] = field(default_factory=list) + ip_states: dict[str, dict[str, Any]] = field(default_factory=dict) + pending_confirmation: str = "" + last_success_step: str = "" + last_failed_step: str = "" + events: list[dict[str, Any]] = field(default_factory=list) + diff --git a/pam_deploy_graph/output_parser.py b/pam_deploy_graph/output_parser.py new file mode 100644 index 0000000..d3a48d7 --- /dev/null +++ b/pam_deploy_graph/output_parser.py @@ -0,0 +1,133 @@ +"""Normalize script stdout and MCP tool returns into ActionResult objects.""" + +from __future__ import annotations + +import json +import re +from typing import Any + +from .constants import SENSITIVE_KEYS +from .models import ActionResult, BackendName + +PENDING_CONFIRMATION_RE = re.compile(r"PENDING_AGENT_CONFIRMATION\((?P[^)]*)\)") +KEY_VALUE_RE = re.compile(r"^(?P[A-Za-z_][A-Za-z0-9_]*)=(?P.*)$") + + +def redact_text(text: str) -> str: + redacted = text + for key in SENSITIVE_KEYS: + redacted = re.sub( + rf"({re.escape(key)}\s*[:=]\s*)([^\s&]+)", + rf"\1***", + redacted, + flags=re.IGNORECASE, + ) + return redacted + + +def parse_key_values(text: str) -> dict[str, Any]: + values: dict[str, Any] = {} + for raw_line in text.splitlines(): + line = raw_line.strip() + match = KEY_VALUE_RE.match(line) + if not match: + continue + key = match.group("key") + value = match.group("value") + if key == "IP": + values.setdefault("IP", []).append(value) + else: + values[key] = value + return values + + +def normalize_mcp_values(payload: Any) -> dict[str, Any]: + if isinstance(payload, str): + try: + payload = json.loads(payload) + except json.JSONDecodeError: + return parse_key_values(payload) + if not isinstance(payload, dict): + return {"RESULT": payload} + + values: dict[str, Any] = dict(payload) + aliases = { + "hashCode": "HASH_CODE", + "nodeUrl": "NODE_URL", + "rateOfProgress": "RATE_OF_PROGRESS", + "logFile": "LOG_FILE", + "action": "ACTION", + "result": "RESULT", + "success": "SUCCESS", + "message": "MESSAGE", + } + for source, target in aliases.items(): + if source in values and target not in values: + values[target] = values[source] + return values + + +def parse_script_result( + action: str, + stdout: str, + stderr: str, + exit_code: int, + backend: BackendName = "script", + tool_name: str = "", +) -> ActionResult: + raw_output = redact_text("\n".join(part for part in (stdout, stderr) if part)) + values = parse_key_values(stdout) + pending = PENDING_CONFIRMATION_RE.search(stdout) or PENDING_CONFIRMATION_RE.search(stderr) + if pending: + values["PENDING_AGENT_CONFIRMATION"] = pending.group(0) + + ok = exit_code == 0 and not pending + error_summary = "" if ok else _summarize_error(stderr, stdout, pending.group(0) if pending else "") + return ActionResult( + action=action, + backend=backend, + tool_name=tool_name or action, + ok=ok, + values=values, + exit_code=exit_code, + stdout=redact_text(stdout), + stderr=redact_text(stderr), + raw_output=raw_output, + error_summary=error_summary, + ) + + +def parse_mcp_result( + action: str, + payload: Any, + *, + ok: bool = True, + tool_name: str = "", + error: str = "", +) -> ActionResult: + values = normalize_mcp_values(payload) + raw_output = redact_text(json.dumps(payload, ensure_ascii=False, default=str)) + return ActionResult( + action=action, + backend="mcp", + tool_name=tool_name or action, + ok=ok, + values=values, + exit_code=0 if ok else 1, + stdout=raw_output if ok else "", + stderr=redact_text(error), + raw_output=raw_output, + error_summary="" if ok else redact_text(error or "MCP tool failed"), + ) + + +def _summarize_error(stderr: str, stdout: str, pending: str) -> str: + if pending: + return pending + for text in (stderr, stdout): + for line in reversed(text.splitlines()): + stripped = line.strip() + if stripped: + return redact_text(stripped) + return "Action failed" + diff --git a/pam_deploy_graph/params_loader.py b/pam_deploy_graph/params_loader.py new file mode 100644 index 0000000..759c8c9 --- /dev/null +++ b/pam_deploy_graph/params_loader.py @@ -0,0 +1,27 @@ +"""Load deploy parameters from JSON or config.txt style files.""" + +from __future__ import annotations + +import json +from pathlib import Path +from typing import Any + + +def load_params_file(path: str | Path) -> dict[str, Any]: + config_path = Path(path) + text = config_path.read_text(encoding="utf-8") + stripped = text.lstrip() + if stripped.startswith("{"): + return json.loads(text) + + values: dict[str, Any] = {} + for raw_line in text.splitlines(): + line = raw_line.strip() + if not line or line.startswith("#"): + continue + if "=" not in line: + continue + key, value = line.split("=", 1) + values[key.strip()] = value.strip() + return values + diff --git a/pam_deploy_graph/script_runner.py b/pam_deploy_graph/script_runner.py new file mode 100644 index 0000000..3f87872 --- /dev/null +++ b/pam_deploy_graph/script_runner.py @@ -0,0 +1,114 @@ +"""Subprocess runner for deploy.sh and deploy.ps1 action calls.""" + +from __future__ import annotations + +import subprocess +from pathlib import Path +from typing import Any + +from .models import ActionResult +from .output_parser import parse_script_result + + +class ScriptActionRunner: + def __init__(self, script_base_dir: str | Path = "doc_scripts") -> None: + self.script_base_dir = Path(script_base_dir) + + def run( + self, + action: str, + *, + params: dict[str, Any], + script_entry: str, + config_path: str, + ip: str | None = None, + hash_code: str | None = None, + stop_first: bool = False, + trace_file_path: str | None = None, + timeout_sec: int | None = None, + ) -> ActionResult: + command = self.build_command( + action, + script_entry=script_entry, + config_path=config_path, + ip=ip, + hash_code=hash_code, + stop_first=stop_first, + trace_file_path=trace_file_path, + ) + completed = subprocess.run( + command, + cwd=str(self.script_base_dir), + capture_output=True, + text=True, + timeout=timeout_sec, + check=False, + ) + return parse_script_result( + action=action, + stdout=completed.stdout, + stderr=completed.stderr, + exit_code=completed.returncode, + backend="script", + tool_name=script_entry, + ) + + def build_command( + self, + action: str, + *, + script_entry: str, + config_path: str, + ip: str | None = None, + hash_code: str | None = None, + stop_first: bool = False, + trace_file_path: str | None = None, + ) -> list[str]: + if script_entry == "deploy.sh": + command = [ + "bash", + "./deploy.sh", + "--config", + config_path, + "--action", + action, + ] + if ip: + command.extend(["--ip", ip]) + if hash_code: + command.extend(["--hash-code", hash_code]) + if stop_first: + command.append("--stop-first") + if trace_file_path: + command.extend(["--trace-file", trace_file_path]) + return command + + if script_entry == "deploy.ps1": + command = [ + "powershell", + "-File", + ".\\deploy.ps1", + "-ConfigPath", + config_path, + "-Action", + action, + ] + if ip: + command.extend(["-Ip", ip]) + if hash_code: + command.extend(["-HashCode", hash_code]) + if stop_first: + command.append("-RollbackStopFirst") + return command + + raise ValueError(f"Unsupported script entry: {script_entry}") + + +def select_script_entry(os_name: str | None = None) -> str: + import platform + + name = (os_name or platform.system()).lower() + if "windows" in name: + return "deploy.ps1" + return "deploy.sh" + diff --git a/pam_deploy_graph/skill_policy.py b/pam_deploy_graph/skill_policy.py new file mode 100644 index 0000000..5ce3d19 --- /dev/null +++ b/pam_deploy_graph/skill_policy.py @@ -0,0 +1,42 @@ +"""Load the PAM deploy Skill document into a compact policy object.""" + +from __future__ import annotations + +from pathlib import Path + +from .constants import ( + ALLOWED_ACTIONS, + DEFAULT_PARAMS, + GLOBAL_ACTION_SEQUENCE, + IP_ACTION_SEQUENCE, + REQUIRED_PARAMS, +) +from .models import SkillPolicy + + +def load_skill_policy(path: str | Path) -> SkillPolicy: + skill_path = Path(path) + text = skill_path.read_text(encoding="utf-8") + name = "pam-auto-deply" + description = "" + + if text.startswith("---"): + parts = text.split("---", 2) + if len(parts) >= 3: + for line in parts[1].splitlines(): + if line.startswith("name:"): + name = line.split(":", 1)[1].strip() + elif line.startswith("description:"): + description = line.split(":", 1)[1].strip() + + return SkillPolicy( + name=name, + source_path=str(skill_path), + description=description, + allowed_actions=ALLOWED_ACTIONS, + required_params=REQUIRED_PARAMS, + optional_params=DEFAULT_PARAMS.copy(), + action_sequence=GLOBAL_ACTION_SEQUENCE, + ip_action_sequence=IP_ACTION_SEQUENCE, + ) + diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..84ffe84 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,15 @@ +[project] +name = "pam-deploy-graph" +version = "0.1.0" +description = "LangGraph-style PAM deploy agent with Skill policy, mixed HOME script actions, and NODE MCP routing." +requires-python = ">=3.11" +dependencies = [] + +[project.optional-dependencies] +langgraph = ["langgraph"] +test = ["pytest"] + +[tool.pytest.ini_options] +testpaths = ["tests"] +pythonpath = ["."] + diff --git a/tests/test_action_router.py b/tests/test_action_router.py new file mode 100644 index 0000000..26e361b --- /dev/null +++ b/tests/test_action_router.py @@ -0,0 +1,15 @@ +from pam_deploy_graph.action_router import build_action_backends + + +def test_hybrid_routes_home_to_script_and_node_to_mcp(): + routes = build_action_backends("hybrid_node_mcp") + assert routes["get-token"] == "script" + assert routes["publish-version"] == "script" + assert routes["get-online-ips"] == "mcp" + assert routes["verify-ip"] == "mcp" + + +def test_script_only_routes_everything_to_script(): + routes = build_action_backends("script_only") + assert set(routes.values()) == {"script"} + diff --git a/tests/test_output_parser.py b/tests/test_output_parser.py new file mode 100644 index 0000000..c4954d7 --- /dev/null +++ b/tests/test_output_parser.py @@ -0,0 +1,24 @@ +from pam_deploy_graph.output_parser import parse_key_values, parse_script_result, parse_mcp_result + + +def test_parse_key_values_collects_repeated_ips(): + values = parse_key_values("ACTION=get-online-ips\nCOUNT=2\nIP=1.1.1.1\nIP=2.2.2.2\n") + assert values["ACTION"] == "get-online-ips" + assert values["IP"] == ["1.1.1.1", "2.2.2.2"] + + +def test_parse_script_result_detects_pending_confirmation(): + result = parse_script_result( + "verify-ip", + "PENDING_AGENT_CONFIRMATION(stopFirst=true)\n", + "", + 0, + ) + assert not result.ok + assert result.values["PENDING_AGENT_CONFIRMATION"] == "PENDING_AGENT_CONFIRMATION(stopFirst=true)" + + +def test_parse_mcp_result_normalizes_aliases(): + result = parse_mcp_result("upload-package", {"hashCode": "abc"}) + assert result.values["HASH_CODE"] == "abc" + diff --git a/tests/test_params_loader.py b/tests/test_params_loader.py new file mode 100644 index 0000000..d5cc027 --- /dev/null +++ b/tests/test_params_loader.py @@ -0,0 +1,18 @@ +from pathlib import Path + +from pam_deploy_graph.params_loader import load_params_file + + +def test_load_key_value_config(tmp_path: Path): + path = tmp_path / "config.txt" + path.write_text("HOME_BASE_URL=https://x\nTIMEOUT=120\n", encoding="utf-8") + params = load_params_file(path) + assert params["HOME_BASE_URL"] == "https://x" + assert params["TIMEOUT"] == "120" + + +def test_load_json_config(tmp_path: Path): + path = tmp_path / "params.json" + path.write_text('{"HOME_BASE_URL": "https://x"}', encoding="utf-8") + params = load_params_file(path) + assert params["HOME_BASE_URL"] == "https://x" diff --git a/tests/test_script_runner.py b/tests/test_script_runner.py new file mode 100644 index 0000000..f438897 --- /dev/null +++ b/tests/test_script_runner.py @@ -0,0 +1,48 @@ +from pam_deploy_graph.script_runner import ScriptActionRunner + + +def test_build_shell_action_command(): + runner = ScriptActionRunner() + command = runner.build_command( + "publish-version", + script_entry="deploy.sh", + config_path="./config.txt", + hash_code="abc", + trace_file_path="./logs/trace.log", + ) + assert command == [ + "bash", + "./deploy.sh", + "--config", + "./config.txt", + "--action", + "publish-version", + "--hash-code", + "abc", + "--trace-file", + "./logs/trace.log", + ] + + +def test_build_powershell_action_command(): + runner = ScriptActionRunner() + command = runner.build_command( + "rollback-ip", + script_entry="deploy.ps1", + config_path=".\\config.txt", + ip="192.168.1.10", + stop_first=True, + ) + assert command == [ + "powershell", + "-File", + ".\\deploy.ps1", + "-ConfigPath", + ".\\config.txt", + "-Action", + "rollback-ip", + "-Ip", + "192.168.1.10", + "-RollbackStopFirst", + ] + diff --git a/tests/test_skill_policy.py b/tests/test_skill_policy.py new file mode 100644 index 0000000..11201bf --- /dev/null +++ b/tests/test_skill_policy.py @@ -0,0 +1,11 @@ +from pathlib import Path + +from pam_deploy_graph.skill_policy import load_skill_policy + + +def test_load_skill_policy_from_doc(): + policy = load_skill_policy(Path("doc_scripts/PAM_AUTO_DEPLY_SKILL.md")) + assert policy.name == "pam-auto-deply" + assert "get-token" in policy.allowed_actions + assert "CLIENT_SECRET" in policy.required_params +