diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..f90f166 --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +__pycache__/ +*.py[cod] +.pytest_cache/ +*.egg-info/ +runtime/ +logs/ diff --git a/README.md b/README.md index c6ba8e0..b14994b 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ - PAM_NODE action 可通过 MCP runner 调用。 - 默认执行策略为 `hybrid_node_mcp`,即 HOME 脚本 action + NODE MCP。 - 离线策略为 `script_only`,全部 action 走脚本 action。 -- `langgraph` 当前作为可选依赖;本地未安装时,核心 Agent、runner、router 和 parser 仍可独立测试。 +- `langgraph` 已作为正式依赖引入;核心 Agent、runner、router 和 parser 也可独立测试。 ## 当前代码骨架 @@ -26,7 +26,9 @@ pam_deploy_graph/ config_writer.py # 生成脚本 action 所需 config 文件 checkpoint_store.py # 业务 checkpoint JSON 读写 params_loader.py # 读取 JSON 或 config.txt 风格参数文件 - graph.py # 可选 LangGraph 集成入口 + llm/ # LLM structured output 接口、规则 fallback 和 guardrails + graph.py # LangGraph StateGraph 集成入口 + mcp_client.py # MCP session/callable adapter cli.py # CLI 入口 tests/ @@ -48,14 +50,22 @@ tests/ - 实现 MCP runner 抽象和 PAM_NODE action 到 MCP tool 的默认映射。 - 实现脚本/MCP/fake action 结果统一为 `ActionResult`。 - 实现 `config.txt.example` 风格和 JSON 风格参数读取。 -- 实现 fake 全局流程,便于不触碰真实环境地验证 Agent 路由。 -- 添加基础测试,当前 `10 passed`。 +- 实现 fake 全局流程和完整部署流程,便于不触碰真实环境地验证 Agent 路由。 +- 实现逐 IP 处理骨架:升级、轮询、启动、校验、日志下载。 +- 实现单 IP 失败后的待回滚确认状态,不自动执行回滚。 +- 实现 LLM structured output 骨架:意图识别、参数抽取、部署计划生成。 +- 增加规则 fallback `RuleBasedLlmClient`,用于本地开发和测试。 +- 增加 LLM 输出 guardrails,禁止计划中出现可执行脚本命令和非法 action。 +- 引入 `langgraph` 依赖,并提供 `build_langgraph()` 图工厂。 +- 引入 MCP client adapter,可包装 SDK session 或普通 callable。 +- 本地已安装 `langgraph` 和 `mcp`,并完成 LangGraph fake 全局流程 smoke。 +- CLI `analyze` 输出已做敏感字段脱敏。 +- 添加基础测试,当前本地结果为 `22 passed, 1 skipped`。 未完成: - 尚未接入真实 MCP client。 -- 尚未安装并接入真实 LangGraph `StateGraph` 主图。 -- 尚未实现 LLM 结构化意图识别、参数抽取和计划生成。 +- 尚未接入真实 LLM 服务,目前使用规则 fallback。 - 尚未实现人工确认 interrupt、断点续跑完整图流程和单 IP 子流程。 - 尚未执行真实脚本 action 或真实 PAM_NODE MCP 调用。 @@ -73,6 +83,18 @@ fake 全局流程验证: python -m pam_deploy_graph.cli run-global --config doc_scripts/config.txt.example --strategy fake --confirm ``` +fake 完整部署流程验证: + +```bash +python -m pam_deploy_graph.cli run-deploy --config doc_scripts/config.txt.example --strategy fake --confirm +``` + +结构化理解和计划生成: + +```bash +python -m pam_deploy_graph.cli analyze --config doc_scripts/config.txt.example --text "请用 MCP 预演部署 HET PAM Node 版本 2.0.5,不要动环境" +``` + 测试: ```bash @@ -81,10 +103,10 @@ pytest -q ## 下一步建议 -1. 接入真实 PAM_NODE MCP client,实现 `McpToolClient.call_tool()`。 +1. 接入真实 PAM_NODE MCP session,并用 `SessionMcpToolClient` 包装。 2. 用 fake runner 补齐完整部署主流程和单 IP 子流程测试。 3. 引入 LangGraph,把当前 Agent 节点接入 `StateGraph`。 4. 增加人工确认节点:参数确认、IP 范围确认、回滚确认。 -5. 增加 LLM structured output:意图识别、参数抽取、部署计划、失败解释。 +5. 接入真实 LLM 服务,实现 `RuleBasedLlmClient` 同协议替换。 6. 完善 checkpoint 恢复:全局步骤跳过、成功 IP 跳过、pending rollback 恢复。 7. 在测试环境中做 smoke:HOME 脚本 `get-token/get-node-url` + NODE MCP `get-online-ips`。 diff --git a/pam_deploy_graph/agent.py b/pam_deploy_graph/agent.py index 4f48c93..b89912a 100644 --- a/pam_deploy_graph/agent.py +++ b/pam_deploy_graph/agent.py @@ -12,10 +12,11 @@ from typing import Any from .action_router import ActionRouter, build_action_backends from .config_writer import write_config -from .constants import DEFAULT_PARAMS, GLOBAL_ACTION_SEQUENCE, REQUIRED_PARAMS +from .constants import DEFAULT_PARAMS, GLOBAL_ACTION_SEQUENCE, IP_ACTION_SEQUENCE, REQUIRED_PARAMS from .fake_runner import FakeActionRunner +from .llm import RuleBasedLlmClient, validate_deploy_plan, validate_intent_result from .mcp_runner import McpActionRunner -from .models import AgentState, ExecutionStrategy +from .models import AgentState, ExecutionStrategy, LlmDeployPlan, LlmIntentResult, LlmParamResult from .script_runner import ScriptActionRunner, select_script_entry from .skill_policy import load_skill_policy @@ -28,18 +29,54 @@ class PamDeployAgent: script_base_dir: str | Path = "doc_scripts", mcp_runner: McpActionRunner | None = None, fake_runner: FakeActionRunner | None = None, + llm_client: RuleBasedLlmClient | None = None, ) -> None: self.skill_policy = load_skill_policy(skill_path) self.script_base_dir = Path(script_base_dir) self.script_runner = ScriptActionRunner(self.script_base_dir) self.fake_runner = fake_runner or FakeActionRunner() self.mcp_runner = mcp_runner + self.llm_client = llm_client or RuleBasedLlmClient() self.router = ActionRouter( script_runner=self.script_runner, mcp_runner=mcp_runner, fake_runner=self.fake_runner, ) + def understand_request(self, text: str) -> LlmIntentResult: + result = self.llm_client.understand_request(text) + validate_intent_result(result) + return result + + def extract_params(self, text: str, base_params: dict[str, Any] | None = None) -> LlmParamResult: + return self.llm_client.extract_params(text, base_params) + + def generate_plan( + self, + *, + params: dict[str, Any], + intent: str, + strategy: ExecutionStrategy, + ) -> LlmDeployPlan: + plan = self.llm_client.generate_plan(params=params, intent=intent, strategy=strategy) + validate_deploy_plan(plan) + return plan + + def analyze_request(self, text: str, base_params: dict[str, Any] | None = None) -> dict[str, Any]: + intent = self.understand_request(text) + params = self.extract_params(text, base_params) + strategy = self._choose_strategy(intent.strategy_preference) + plan = self.generate_plan( + params={**DEFAULT_PARAMS, **params.extracted_params}, + intent=intent.intent, + strategy=strategy, + ) + return { + "intent": intent, + "params": params, + "plan": plan, + } + def normalize_params(self, params: dict[str, Any]) -> dict[str, Any]: normalized = {**DEFAULT_PARAMS, **params} missing = [key for key in REQUIRED_PARAMS if not normalized.get(key)] @@ -47,6 +84,11 @@ class PamDeployAgent: raise ValueError(f"Missing required params: {', '.join(missing)}") return normalized + def _choose_strategy(self, preference: str) -> ExecutionStrategy: + if preference in ("hybrid_node_mcp", "script_only", "fake"): + return preference # type: ignore[return-value] + return "hybrid_node_mcp" + def create_state( self, *, @@ -56,6 +98,7 @@ class PamDeployAgent: script_entry: str | None = None, config_path: str | None = None, trace_file_path: str | None = None, + target_ips: list[str] | None = None, ) -> AgentState: normalized = self.normalize_params(params) actual_run_id = run_id or time.strftime("%Y%m%d_%H%M%S") @@ -73,6 +116,7 @@ class PamDeployAgent: script_base_dir=str(self.script_base_dir), config_path=actual_config_path, trace_file_path=actual_trace_path, + target_ips=target_ips or [], ) def preview(self, params: dict[str, Any], strategy: ExecutionStrategy = "hybrid_node_mcp") -> str: @@ -127,6 +171,53 @@ class PamDeployAgent: state.last_success_step = action return state + def run_deploy_flow(self, state: AgentState) -> AgentState: + self.run_global_flow(state) + self.run_ip_flow(state) + return state + + def run_ip_flow(self, state: AgentState) -> AgentState: + self._resolve_target_ips(state) + for ip in state.target_ips: + state.events.append({"type": "IP_START", "ip": ip, "message": "start"}) + ip_state = { + "status": "RUNNING", + "completed_steps": [], + "failed_stage": "", + "failure_reason": "", + "rollback_status": "ROLLBACK_NOT_RUN", + "rollback_stop_first": False, + "log_file": "", + } + state.ip_states[ip] = ip_state + + for action in IP_ACTION_SEQUENCE: + result = self.router.run_action(state, action, ip=ip) + failed = (not result.ok) or self._business_failed(action, result.values) + state.events.append( + { + "type": "ACTION_FAIL" if failed else "ACTION_DONE", + "stage": action, + "backend": result.backend, + "ip": ip, + "message": result.error_summary or result.values.get("MESSAGE", "ok"), + } + ) + + if failed: + self._record_ip_failure(state, ip, action, result.error_summary or str(result.values)) + if action != "download-log": + self._download_log_best_effort(state, ip) + state.pending_confirmation = f"rollback-ip:{ip}" + return state + + self._apply_ip_result(ip_state, action, result.values) + ip_state["completed_steps"].append(action) + + ip_state["status"] = "SUCCESS" + state.events.append({"type": "IP_DONE", "ip": ip, "message": "success"}) + return state + def _apply_result(self, state: AgentState, action: str, values: dict[str, Any]) -> None: if "HASH_CODE" in values: state.hash_code = str(values["HASH_CODE"]) @@ -138,3 +229,113 @@ class PamDeployAgent: ips = [ips] state.online_ips = list(ips) state.target_ips = state.target_ips or state.online_ips.copy() + + def _resolve_target_ips(self, state: AgentState) -> None: + if not state.target_ips: + state.target_ips = state.online_ips.copy() + return + online = set(state.online_ips) + requested = state.target_ips + state.target_ips = [ip for ip in requested if ip in online] + missing = [ip for ip in requested if ip not in online] + if missing: + state.events.append( + { + "type": "TARGET_SCOPE_CHANGED", + "message": "some requested IPs are not online", + "missing_ips": missing, + "target_ips": state.target_ips, + } + ) + + def _business_failed(self, action: str, values: dict[str, Any]) -> bool: + if action == "verify-ip": + success = values.get("SUCCESS") + if success is None: + return False + return str(success).lower() not in ("true", "1", "yes") + return False + + def _apply_ip_result(self, ip_state: dict[str, Any], action: str, values: dict[str, Any]) -> None: + if action == "download-log": + ip_state["log_file"] = str(values.get("LOG_FILE", "")) + + def _record_ip_failure(self, state: AgentState, ip: str, action: str, reason: str) -> None: + ip_state = state.ip_states[ip] + stop_first = action in ("start-ip", "verify-ip") + ip_state.update( + { + "status": "FAILED", + "failed_stage": action, + "failure_reason": reason, + "rollback_status": "PENDING_AGENT_CONFIRMATION", + "rollback_stop_first": stop_first, + } + ) + state.last_failed_step = action + state.events.append( + { + "type": "CONFIRMATION_REQUIRED", + "stage": "rollback-ip", + "ip": ip, + "stop_first": stop_first, + "message": f"{action} failed; rollback confirmation required", + } + ) + + def _download_log_best_effort(self, state: AgentState, ip: str) -> None: + result = self.router.run_action(state, "download-log", ip=ip) + ip_state = state.ip_states[ip] + if result.ok: + ip_state["log_file"] = str(result.values.get("LOG_FILE", "")) + state.events.append( + { + "type": "ACTION_DONE", + "stage": "download-log", + "backend": result.backend, + "ip": ip, + "message": "best effort log downloaded", + } + ) + else: + state.events.append( + { + "type": "ACTION_FAIL", + "stage": "download-log", + "backend": result.backend, + "ip": ip, + "message": result.error_summary or "best effort log download failed", + } + ) + + def render_report(self, state: AgentState) -> str: + success = sum(1 for item in state.ip_states.values() if item.get("status") == "SUCCESS") + failed = sum(1 for item in state.ip_states.values() if item.get("status") == "FAILED") + lines = [ + "## PAM 智能部署报告", + "", + f"- 执行策略: {state.execution_strategy}", + f"- 机场: {state.params['AIRPORT_CODE']}", + f"- 应用: {state.params['APP_NAME']}", + f"- 模块: {state.params['MODULE_NAME']}", + f"- 版本: {state.params['VERSION_NUMBER']}", + f"- 在线工作站数: {len(state.online_ips)}", + f"- 目标工作站数: {len(state.target_ips)}", + f"- 成功: {success}", + f"- 失败: {failed}", + f"- 待确认: {state.pending_confirmation or '-'}", + "", + "| IP | 状态 | 失败阶段 | 回滚状态 | 日志 |", + "| --- | --- | --- | --- | --- |", + ] + for ip, ip_state in state.ip_states.items(): + lines.append( + "| {ip} | {status} | {failed_stage} | {rollback_status} | {log_file} |".format( + ip=ip, + status=ip_state.get("status", ""), + failed_stage=ip_state.get("failed_stage") or "-", + rollback_status=ip_state.get("rollback_status") or "-", + log_file=ip_state.get("log_file") or "-", + ) + ) + return "\n".join(lines) diff --git a/pam_deploy_graph/cli.py b/pam_deploy_graph/cli.py index e6fb7b0..2c807f9 100644 --- a/pam_deploy_graph/cli.py +++ b/pam_deploy_graph/cli.py @@ -4,8 +4,10 @@ from __future__ import annotations import argparse import json +from dataclasses import asdict from .agent import PamDeployAgent +from .checkpoint_store import redact_mapping from .params_loader import load_params_file @@ -17,24 +19,50 @@ def main() -> None: preview.add_argument("--config", required=True) preview.add_argument("--strategy", default="hybrid_node_mcp", choices=["hybrid_node_mcp", "script_only", "fake"]) + analyze = sub.add_parser("analyze") + analyze.add_argument("--text", required=True) + analyze.add_argument("--config") + run = sub.add_parser("run-global") run.add_argument("--config", required=True) run.add_argument("--strategy", default="fake", choices=["hybrid_node_mcp", "script_only", "fake"]) run.add_argument("--confirm", action="store_true") + deploy = sub.add_parser("run-deploy") + deploy.add_argument("--config", required=True) + deploy.add_argument("--strategy", default="fake", choices=["hybrid_node_mcp", "script_only", "fake"]) + deploy.add_argument("--target-ip", action="append", default=[]) + deploy.add_argument("--confirm", action="store_true") + args = parser.parse_args() - params = load_params_file(args.config) + params = load_params_file(args.config) if getattr(args, "config", None) else {} agent = PamDeployAgent() + if args.command == "analyze": + result = agent.analyze_request(args.text, params) + payload = redact_mapping({key: asdict(value) for key, value in result.items()}) + print(json.dumps(payload, ensure_ascii=False, indent=2)) + return + if args.command == "preview": print(agent.preview(params, args.strategy)) return if not args.confirm: raise SystemExit("Refusing to execute actions without --confirm.") - state = agent.create_state(params=params, execution_strategy=args.strategy) - state = agent.run_global_flow(state) - print(json.dumps({"events": state.events}, ensure_ascii=False, indent=2)) + if args.command == "run-global": + state = agent.create_state(params=params, execution_strategy=args.strategy) + state = agent.run_global_flow(state) + print(json.dumps({"events": state.events}, ensure_ascii=False, indent=2)) + return + + state = agent.create_state( + params=params, + execution_strategy=args.strategy, + target_ips=args.target_ip, + ) + state = agent.run_deploy_flow(state) + print(agent.render_report(state)) if __name__ == "__main__": diff --git a/pam_deploy_graph/fake_runner.py b/pam_deploy_graph/fake_runner.py index 7088bd1..90bd789 100644 --- a/pam_deploy_graph/fake_runner.py +++ b/pam_deploy_graph/fake_runner.py @@ -14,7 +14,7 @@ class FakeActionRunner: def run(self, action: str, *, params: dict[str, Any], **kwargs: Any) -> ActionResult: self.calls.append((action, kwargs)) - values = self.fixtures.get(action, {}).copy() + values = self._fixture_for(action, kwargs) if not values: values = self._default_values(action, kwargs) ok = not values.pop("_fail", False) @@ -38,7 +38,27 @@ class FakeActionRunner: return {"ACTION": action, "NODE_URL": "https://fake-node.local"} if action == "get-online-ips": return {"ACTION": action, "COUNT": "2", "IP": ["192.168.1.10", "192.168.1.11"]} + if action == "upgrade-ip": + return {"ACTION": action, "IP": kwargs.get("ip", ""), "RESULT": "TASK_CREATED"} + if action == "poll-upgrade-progress": + return { + "ACTION": action, + "IP": kwargs.get("ip", ""), + "STEP": "DONE", + "RATE_OF_PROGRESS": "100", + "MESSAGE": "success", + } + if action == "start-ip": + return {"ACTION": action, "IP": kwargs.get("ip", ""), "RESULT": "OK"} + if action == "verify-ip": + return {"ACTION": action, "IP": kwargs.get("ip", ""), "SUCCESS": "true", "MESSAGE": "ok"} if action == "download-log": return {"ACTION": action, "IP": kwargs.get("ip", ""), "LOG_FILE": "logs/fake.zip"} return {"ACTION": action, "RESULT": "OK"} + def _fixture_for(self, action: str, kwargs: dict[str, Any]) -> dict[str, Any]: + ip = kwargs.get("ip") + ip_key = f"{action}:{ip}" if ip else "" + if ip_key and ip_key in self.fixtures: + return self.fixtures[ip_key].copy() + return self.fixtures.get(action, {}).copy() diff --git a/pam_deploy_graph/graph.py b/pam_deploy_graph/graph.py index 607b272..8642c15 100644 --- a/pam_deploy_graph/graph.py +++ b/pam_deploy_graph/graph.py @@ -1,24 +1,67 @@ -"""Optional LangGraph integration. - -The runtime works without LangGraph installed. This module exposes a factory for -projects that install the optional dependency. -""" +"""LangGraph integration for the PAM deploy Agent.""" from __future__ import annotations +from typing import Any, Literal -def build_langgraph(): +from .agent import PamDeployAgent + +GraphFlow = Literal["global", "deploy"] + + +def build_langgraph(agent: PamDeployAgent | None = None, flow: GraphFlow = "deploy"): try: from langgraph.graph import END, START, StateGraph except ImportError as exc: # pragma: no cover - depends on optional package raise RuntimeError( - "langgraph is not installed. Install the optional dependency with " - "`pip install -e .[langgraph]`." + "langgraph is not installed. Install project dependencies with " + "`pip install -e .`." ) from exc + runtime = agent or PamDeployAgent() + + def create_state_node(state: dict[str, Any]) -> dict[str, Any]: + agent_state = runtime.create_state( + params=state["params"], + execution_strategy=state.get("execution_strategy", "hybrid_node_mcp"), + run_id=state.get("run_id"), + script_entry=state.get("script_entry"), + config_path=state.get("config_path"), + trace_file_path=state.get("trace_file_path"), + target_ips=state.get("target_ips"), + ) + return {"agent_state": agent_state} + + def run_global_node(state: dict[str, Any]) -> dict[str, Any]: + agent_state = runtime.run_global_flow(state["agent_state"]) + return {"agent_state": agent_state} + + def run_ip_node(state: dict[str, Any]) -> dict[str, Any]: + agent_state = runtime.run_ip_flow(state["agent_state"]) + return {"agent_state": agent_state} + + def report_node(state: dict[str, Any]) -> dict[str, Any]: + return {"report": runtime.render_report(state["agent_state"])} + graph = StateGraph(dict) - graph.add_node("start", lambda state: state) - graph.add_edge(START, "start") - graph.add_edge("start", END) + graph.add_node("create_state", create_state_node) + graph.add_node("run_global", run_global_node) + graph.add_node("run_ip", run_ip_node) + graph.add_node("report", report_node) + + graph.add_edge(START, "create_state") + graph.add_edge("create_state", "run_global") + if flow == "global": + graph.add_edge("run_global", END) + else: + graph.add_edge("run_global", "run_ip") + graph.add_edge("run_ip", "report") + graph.add_edge("report", END) return graph.compile() + +def build_graph_or_none(agent: PamDeployAgent | None = None, flow: GraphFlow = "deploy"): + try: + return build_langgraph(agent=agent, flow=flow) + except RuntimeError: + return None diff --git a/pam_deploy_graph/llm/__init__.py b/pam_deploy_graph/llm/__init__.py new file mode 100644 index 0000000..ef479b7 --- /dev/null +++ b/pam_deploy_graph/llm/__init__.py @@ -0,0 +1,7 @@ +"""LLM integration surfaces for PAM deploy Agent.""" + +from .rule_based import RuleBasedLlmClient +from .validators import validate_deploy_plan, validate_intent_result + +__all__ = ["RuleBasedLlmClient", "validate_deploy_plan", "validate_intent_result"] + diff --git a/pam_deploy_graph/llm/rule_based.py b/pam_deploy_graph/llm/rule_based.py new file mode 100644 index 0000000..b78f859 --- /dev/null +++ b/pam_deploy_graph/llm/rule_based.py @@ -0,0 +1,167 @@ +"""Deterministic fallback for LLM structured outputs. + +This class is intentionally not a replacement for a real model. It gives the +Agent stable structured outputs for local development and tests. A real LLM +client should implement the same methods. +""" + +from __future__ import annotations + +import re +from typing import Any + +from pam_deploy_graph.constants import GLOBAL_ACTION_SEQUENCE, REQUIRED_PARAMS +from pam_deploy_graph.models import ( + ExecutionStrategy, + LlmDeployPlan, + LlmIntentResult, + LlmParamResult, +) + +KEY_ALIASES = { + "home_base_url": "HOME_BASE_URL", + "HOME_BASE_URL": "HOME_BASE_URL", + "client_id": "CLIENT_ID", + "CLIENT_ID": "CLIENT_ID", + "client_secret": "CLIENT_SECRET", + "CLIENT_SECRET": "CLIENT_SECRET", + "airportCode": "AIRPORT_CODE", + "AIRPORT_CODE": "AIRPORT_CODE", + "applicationName": "APP_NAME", + "APP_NAME": "APP_NAME", + "moduleName": "MODULE_NAME", + "MODULE_NAME": "MODULE_NAME", + "versionNumber": "VERSION_NUMBER", + "VERSION_NUMBER": "VERSION_NUMBER", + "zipFilePath": "ZIP_FILE_PATH", + "ZIP_FILE_PATH": "ZIP_FILE_PATH", + "actionType": "ACTION_TYPE", + "ACTION_TYPE": "ACTION_TYPE", + "timeOut": "TIMEOUT", + "TIMEOUT": "TIMEOUT", + "logName": "LOG_NAME", + "LOG_NAME": "LOG_NAME", +} + + +class RuleBasedLlmClient: + def understand_request(self, text: str) -> LlmIntentResult: + lowered = text.lower() + reasons: list[str] = [] + intent = "deploy" + + if any(word in lowered for word in ("用法", "怎么用", "生成脚本", "给我脚本", "usage")): + intent = "show_usage" + reasons.append("用户在询问脚本用法或脚本生成") + elif any(word in lowered for word in ("预演", "计划", "不执行", "不要动环境", "dry-run", "preview")): + intent = "preview" + reasons.append("用户要求只预演或不触碰环境") + elif any(word in lowered for word in ("在线ip", "在线 ip", "查询ip", "查询 ip", "node", "工作站")): + intent = "query_node_ips" + reasons.append("用户要求查询 Node 或在线工作站") + elif any(word in lowered for word in ("回滚", "rollback")): + intent = "rollback" + reasons.append("用户要求回滚") + else: + reasons.append("默认识别为部署请求") + + mode_preference = "未指定" + strategy_preference = "未指定" + if any(word in lowered for word in ("mcp", "在线执行", "直接在线")): + mode_preference = "MCP" + strategy_preference = "hybrid_node_mcp" + reasons.append("用户倾向 MCP;PAM_HOME 仍需脚本 action") + if any(word in lowered for word in ("脚本", "离线", "script", "shell", "powershell")): + mode_preference = "API脚本" + strategy_preference = "script_only" + reasons.append("用户倾向脚本或离线执行") + if intent == "preview": + strategy_preference = strategy_preference if strategy_preference != "未指定" else "hybrid_node_mcp" + + return LlmIntentResult( + intent=intent, # type: ignore[arg-type] + mode_preference=mode_preference, # type: ignore[arg-type] + strategy_preference=strategy_preference, # type: ignore[arg-type] + confidence=0.72 if intent != "deploy" else 0.6, + reasons=reasons, + ) + + def extract_params(self, text: str, base_params: dict[str, Any] | None = None) -> LlmParamResult: + params = dict(base_params or {}) + params.update(self._extract_key_values(text)) + params.update(self._extract_chinese_patterns(text)) + + control: dict[str, Any] = {} + ips = re.findall(r"\b(?:\d{1,3}\.){3}\d{1,3}\b", text) + if ips: + control["user_specified_ips"] = ips + + missing = [key for key in REQUIRED_PARAMS if not params.get(key)] + sensitive = [key for key in ("CLIENT_SECRET", "CLIENT_ID") if params.get(key)] + return LlmParamResult( + extracted_params=params, + extracted_control=control, + missing_required_params=missing, + sensitive_fields_present=sensitive, + ) + + def generate_plan( + self, + *, + params: dict[str, Any], + intent: str, + strategy: ExecutionStrategy, + ) -> LlmDeployPlan: + if strategy == "hybrid_node_mcp": + strategy_text = "PAM_HOME 使用脚本 action,PAM_NODE 使用 MCP" + elif strategy == "script_only": + strategy_text = "全部 action 使用脚本 action" + else: + strategy_text = "全部 action 使用 fake runner" + + summary = ( + f"计划处理 {params.get('AIRPORT_CODE', '-')}/" + f"{params.get('APP_NAME', '-')}/" + f"{params.get('MODULE_NAME', '-')}/" + f"{params.get('VERSION_NUMBER', '-')},执行策略为 {strategy_text}。" + ) + risk_notes = [ + "真实部署前必须确认参数。", + "发布版本、创建下载任务、升级和回滚属于高风险动作。", + "回滚只能在用户确认后执行。", + ] + if strategy == "hybrid_node_mcp": + risk_notes.append("PAM_HOME 当前没有 MCP 能力,HOME 阶段仍会调用脚本 action。") + + return LlmDeployPlan( + summary=summary, + risk_notes=risk_notes, + planned_actions=list(GLOBAL_ACTION_SEQUENCE), + requires_confirmation=intent in ("deploy", "query_node_ips", "rollback"), + execution_strategy=strategy, + ) + + def _extract_key_values(self, text: str) -> dict[str, str]: + params: dict[str, str] = {} + for match in re.finditer(r"([A-Za-z_][A-Za-z0-9_]*)\s*=\s*([^\s,;]+)", text): + raw_key, value = match.groups() + key = KEY_ALIASES.get(raw_key) + if key: + params[key] = value.strip() + return params + + def _extract_chinese_patterns(self, text: str) -> dict[str, str]: + patterns = { + "AIRPORT_CODE": r"(?:机场|三字码)\s*[::]?\s*([A-Z]{3})", + "APP_NAME": r"(?:应用|应用名)\s*[::]?\s*([A-Za-z0-9_.-]+)", + "MODULE_NAME": r"(?:模块|模块名)\s*[::]?\s*([A-Za-z0-9_.-]+)", + "VERSION_NUMBER": r"(?:版本|版本号)\s*[::]?\s*([A-Za-z0-9_.-]+)", + "ZIP_FILE_PATH": r"(?:包|软件包|zip)\s*[::]?\s*([A-Za-z]:[\\/][^\s,;]+|/[^\s,;]+)", + } + params: dict[str, str] = {} + for key, pattern in patterns.items(): + match = re.search(pattern, text) + if match: + params[key] = match.group(1) + return params + diff --git a/pam_deploy_graph/llm/validators.py b/pam_deploy_graph/llm/validators.py new file mode 100644 index 0000000..2549e8b --- /dev/null +++ b/pam_deploy_graph/llm/validators.py @@ -0,0 +1,27 @@ +"""Validation and guardrails for LLM structured outputs.""" + +from __future__ import annotations + +from pam_deploy_graph.constants import ALLOWED_ACTIONS +from pam_deploy_graph.models import LlmDeployPlan, LlmIntentResult + +VALID_INTENTS = {"deploy", "show_usage", "preview", "query_node_ips", "rollback"} +FORBIDDEN_TEXT = ("bash ", "powershell ", "deploy.sh", "deploy.ps1", "CLIENT_SECRET=") + + +def validate_intent_result(result: LlmIntentResult) -> None: + if result.intent not in VALID_INTENTS: + raise ValueError(f"Invalid intent: {result.intent}") + if not 0 <= result.confidence <= 1: + raise ValueError("Intent confidence must be between 0 and 1") + + +def validate_deploy_plan(plan: LlmDeployPlan) -> None: + invalid = [action for action in plan.planned_actions if action not in ALLOWED_ACTIONS] + if invalid: + raise ValueError(f"Plan contains invalid actions: {', '.join(invalid)}") + combined_text = "\n".join([plan.summary, *plan.risk_notes]) + lowered = combined_text.lower() + forbidden = [item for item in FORBIDDEN_TEXT if item.lower() in lowered] + if forbidden: + raise ValueError(f"Plan contains forbidden executable text: {', '.join(forbidden)}") diff --git a/pam_deploy_graph/mcp_client.py b/pam_deploy_graph/mcp_client.py new file mode 100644 index 0000000..180bf39 --- /dev/null +++ b/pam_deploy_graph/mcp_client.py @@ -0,0 +1,65 @@ +"""MCP client adapters. + +The Agent only needs a synchronous `call_tool(name, arguments)` surface. This +module adapts simple callables or SDK-like sessions to that surface without +forcing the rest of the codebase to import a concrete MCP SDK. +""" + +from __future__ import annotations + +import json +from collections.abc import Callable +from typing import Any + + +class FunctionMcpToolClient: + """Wrap a plain Python callable as an MCP tool client.""" + + def __init__(self, caller: Callable[[str, dict[str, Any]], Any]) -> None: + self.caller = caller + + def call_tool(self, tool_name: str, arguments: dict[str, Any]) -> Any: + return self.caller(tool_name, arguments) + + +class SessionMcpToolClient: + """Adapt SDK-like sessions exposing `call_tool`. + + The adapter accepts common result shapes: + + - raw dict/list/string + - object with `structuredContent` + - object with `content`, where text content may contain JSON + """ + + def __init__(self, session: Any) -> None: + if not hasattr(session, "call_tool"): + raise TypeError("MCP session must expose call_tool") + self.session = session + + def call_tool(self, tool_name: str, arguments: dict[str, Any]) -> Any: + result = self.session.call_tool(tool_name, arguments) + return normalize_mcp_sdk_result(result) + + +def normalize_mcp_sdk_result(result: Any) -> Any: + if hasattr(result, "structuredContent"): + structured = getattr(result, "structuredContent") + if structured is not None: + return structured + + if hasattr(result, "content"): + content = getattr(result, "content") + text_parts: list[str] = [] + for item in content or []: + text = getattr(item, "text", None) + if text is not None: + text_parts.append(text) + if text_parts: + joined = "\n".join(text_parts) + try: + return json.loads(joined) + except json.JSONDecodeError: + return joined + + return result diff --git a/pam_deploy_graph/models.py b/pam_deploy_graph/models.py index fd67e6f..4e60be8 100644 --- a/pam_deploy_graph/models.py +++ b/pam_deploy_graph/models.py @@ -7,6 +7,9 @@ from typing import Any, Literal BackendName = Literal["mcp", "script", "fake"] ExecutionStrategy = Literal["hybrid_node_mcp", "script_only", "fake"] +IntentName = Literal["deploy", "show_usage", "preview", "query_node_ips", "rollback"] +ModePreference = Literal["MCP", "API脚本", "未指定"] +StrategyPreference = Literal["hybrid_node_mcp", "script_only", "fake", "未指定"] @dataclass(slots=True) @@ -46,6 +49,35 @@ class SkillPolicy: ) +@dataclass(slots=True) +class LlmIntentResult: + intent: IntentName + mode_preference: ModePreference = "未指定" + strategy_preference: StrategyPreference = "未指定" + confidence: float = 0.0 + reasons: list[str] = field(default_factory=list) + needs_clarification: bool = False + clarification_questions: list[str] = field(default_factory=list) + + +@dataclass(slots=True) +class LlmParamResult: + extracted_params: dict[str, Any] = field(default_factory=dict) + extracted_control: dict[str, Any] = field(default_factory=dict) + missing_required_params: list[str] = field(default_factory=list) + ambiguous_fields: list[str] = field(default_factory=list) + sensitive_fields_present: list[str] = field(default_factory=list) + + +@dataclass(slots=True) +class LlmDeployPlan: + summary: str + risk_notes: list[str] = field(default_factory=list) + planned_actions: list[str] = field(default_factory=list) + requires_confirmation: bool = True + execution_strategy: StrategyPreference = "未指定" + + @dataclass(slots=True) class AgentState: run_id: str @@ -68,4 +100,3 @@ class AgentState: last_success_step: str = "" last_failed_step: str = "" events: list[dict[str, Any]] = field(default_factory=list) - diff --git a/pyproject.toml b/pyproject.toml index 84ffe84..8036ab8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -3,13 +3,17 @@ name = "pam-deploy-graph" version = "0.1.0" description = "LangGraph-style PAM deploy agent with Skill policy, mixed HOME script actions, and NODE MCP routing." requires-python = ">=3.11" -dependencies = [] +dependencies = [ + "langgraph>=0.2", +] [project.optional-dependencies] -langgraph = ["langgraph"] +mcp = ["mcp>=1"] test = ["pytest"] [tool.pytest.ini_options] testpaths = ["tests"] pythonpath = ["."] +[tool.setuptools.packages.find] +include = ["pam_deploy_graph*"] diff --git a/tests/test_agent_flow.py b/tests/test_agent_flow.py new file mode 100644 index 0000000..1d0da52 --- /dev/null +++ b/tests/test_agent_flow.py @@ -0,0 +1,58 @@ +from pathlib import Path + +from pam_deploy_graph.agent import PamDeployAgent +from pam_deploy_graph.fake_runner import FakeActionRunner + + +PARAMS = { + "HOME_BASE_URL": "https://pam.home.example.com", + "CLIENT_ID": "client", + "CLIENT_SECRET": "secret", + "AIRPORT_CODE": "HET", + "APP_NAME": "PAM", + "MODULE_NAME": "Node", + "VERSION_NUMBER": "2.0.5", + "ZIP_FILE_PATH": "C:/pkg.zip", +} + + +def test_run_deploy_flow_success(tmp_path: Path): + agent = PamDeployAgent(fake_runner=FakeActionRunner()) + state = agent.create_state( + params=PARAMS, + execution_strategy="fake", + config_path=str(tmp_path / "config.txt"), + ) + + agent.run_deploy_flow(state) + + assert state.pending_confirmation == "" + assert set(state.ip_states) == {"192.168.1.10", "192.168.1.11"} + assert all(item["status"] == "SUCCESS" for item in state.ip_states.values()) + + +def test_run_deploy_flow_stops_on_verify_failure(tmp_path: Path): + fake = FakeActionRunner( + { + "verify-ip:192.168.1.10": { + "ACTION": "verify-ip", + "IP": "192.168.1.10", + "SUCCESS": "false", + "MESSAGE": "health check failed", + } + } + ) + agent = PamDeployAgent(fake_runner=fake) + state = agent.create_state( + params=PARAMS, + execution_strategy="fake", + config_path=str(tmp_path / "config.txt"), + ) + + agent.run_deploy_flow(state) + + assert state.pending_confirmation == "rollback-ip:192.168.1.10" + assert state.ip_states["192.168.1.10"]["status"] == "FAILED" + assert state.ip_states["192.168.1.10"]["rollback_status"] == "PENDING_AGENT_CONFIRMATION" + assert "192.168.1.11" not in state.ip_states + assert any(event["type"] == "CONFIRMATION_REQUIRED" for event in state.events) diff --git a/tests/test_graph.py b/tests/test_graph.py new file mode 100644 index 0000000..c66881c --- /dev/null +++ b/tests/test_graph.py @@ -0,0 +1,37 @@ +import importlib.util + +import pytest + +from pam_deploy_graph.graph import build_graph_or_none, build_langgraph +from pam_deploy_graph.params_loader import load_params_file + + +def test_build_graph_or_none_without_langgraph_is_safe(): + graph = build_graph_or_none() + if importlib.util.find_spec("langgraph"): + assert graph is not None + else: + assert graph is None + + +def test_build_langgraph_error_without_dependency_is_clear(): + if importlib.util.find_spec("langgraph"): + pytest.skip("langgraph installed") + with pytest.raises(RuntimeError, match="langgraph is not installed"): + build_langgraph() + + +def test_langgraph_invokes_global_flow_when_installed(tmp_path): + if not importlib.util.find_spec("langgraph"): + pytest.skip("langgraph not installed") + graph = build_langgraph(flow="global") + result = graph.invoke( + { + "params": load_params_file("doc_scripts/config.txt.example"), + "execution_strategy": "fake", + "config_path": str(tmp_path / "config.txt"), + } + ) + state = result["agent_state"] + assert state.completed_global_steps[-1] == "poll-download-progress" + assert state.action_backends["get-online-ips"] == "fake" diff --git a/tests/test_llm_structured.py b/tests/test_llm_structured.py new file mode 100644 index 0000000..7cb96e8 --- /dev/null +++ b/tests/test_llm_structured.py @@ -0,0 +1,73 @@ +from dataclasses import asdict + +from pam_deploy_graph.agent import PamDeployAgent +from pam_deploy_graph.checkpoint_store import redact_mapping +from pam_deploy_graph.llm.rule_based import RuleBasedLlmClient +from pam_deploy_graph.llm.validators import validate_deploy_plan +from pam_deploy_graph.models import LlmDeployPlan + + +def test_understand_request_prefers_hybrid_for_mcp(): + result = RuleBasedLlmClient().understand_request("请用 MCP 部署 HET") + assert result.intent == "deploy" + assert result.mode_preference == "MCP" + assert result.strategy_preference == "hybrid_node_mcp" + + +def test_extract_params_from_key_value_text(): + result = RuleBasedLlmClient().extract_params( + "HOME_BASE_URL=https://x CLIENT_ID=id CLIENT_SECRET=s AIRPORT_CODE=HET " + "APP_NAME=PAM MODULE_NAME=Node VERSION_NUMBER=2.0.5 ZIP_FILE_PATH=C:/pkg.zip" + ) + assert result.extracted_params["AIRPORT_CODE"] == "HET" + assert result.missing_required_params == [] + assert "CLIENT_SECRET" in result.sensitive_fields_present + + +def test_analyze_request_returns_structured_objects(): + agent = PamDeployAgent() + result = agent.analyze_request( + "不要动环境,预演部署", + { + "HOME_BASE_URL": "https://x", + "CLIENT_ID": "id", + "CLIENT_SECRET": "s", + "AIRPORT_CODE": "HET", + "APP_NAME": "PAM", + "MODULE_NAME": "Node", + "VERSION_NUMBER": "2.0.5", + "ZIP_FILE_PATH": "C:/pkg.zip", + }, + ) + payload = {key: asdict(value) for key, value in result.items()} + assert payload["intent"]["intent"] == "preview" + assert payload["plan"]["execution_strategy"] == "hybrid_node_mcp" + + +def test_analyze_payload_can_be_redacted(): + agent = PamDeployAgent() + result = agent.analyze_request( + "帮我部署", + { + "HOME_BASE_URL": "https://x", + "CLIENT_ID": "id", + "CLIENT_SECRET": "super-secret", + "AIRPORT_CODE": "HET", + "APP_NAME": "PAM", + "MODULE_NAME": "Node", + "VERSION_NUMBER": "2.0.5", + "ZIP_FILE_PATH": "C:/pkg.zip", + }, + ) + payload = redact_mapping({key: asdict(value) for key, value in result.items()}) + assert payload["params"]["extracted_params"]["CLIENT_SECRET"] == "***" + + +def test_plan_guardrails_reject_executable_text(): + plan = LlmDeployPlan(summary="run bash ./deploy.sh", planned_actions=["get-token"]) + try: + validate_deploy_plan(plan) + except ValueError as exc: + assert "forbidden" in str(exc) + else: + raise AssertionError("expected guardrail failure") diff --git a/tests/test_mcp_client.py b/tests/test_mcp_client.py new file mode 100644 index 0000000..7a9f209 --- /dev/null +++ b/tests/test_mcp_client.py @@ -0,0 +1,28 @@ +from pam_deploy_graph.mcp_client import ( + FunctionMcpToolClient, + SessionMcpToolClient, + normalize_mcp_sdk_result, +) + + +def test_function_mcp_client_wraps_callable(): + client = FunctionMcpToolClient(lambda name, args: {"tool": name, "args": args}) + assert client.call_tool("pam_get_online_ips", {"airportCode": "HET"})["tool"] == "pam_get_online_ips" + + +def test_normalize_mcp_sdk_result_structured_content(): + result = type("Result", (), {"structuredContent": {"ok": True}})() + assert normalize_mcp_sdk_result(result) == {"ok": True} + + +def test_session_mcp_client_normalizes_text_json_content(): + content = [type("Text", (), {"text": '{"ok": true}'})()] + result = type("Result", (), {"content": content})() + + class Session: + def call_tool(self, tool_name, arguments): + return result + + client = SessionMcpToolClient(Session()) + assert client.call_tool("tool", {}) == {"ok": True} +