"""PAM_NODE MCP 工具的 action runner 封装。""" from __future__ import annotations from typing import Any, Protocol from .models import ActionResult from .output_parser import parse_mcp_result class McpToolClient(Protocol): """MCP 工具客户端需要实现的最小同步接口。""" def call_tool(self, tool_name: str, arguments: dict[str, Any]) -> Any: """调用指定 MCP tool,并返回工具原始输出。""" ... def list_tools(self) -> list[str]: """返回 MCP server 暴露的 tool 名称列表。""" ... DEFAULT_NODE_MCP_TOOLS = { "get-online-ips": "pam_get_online_ips", "create-download-task": "pam_create_download_task", "poll-download-progress": "pam_poll_download_progress", "upgrade-ip": "pam_upgrade_ip", "poll-upgrade-progress": "pam_poll_upgrade_progress", "start-ip": "pam_start_ip", "stop-ip": "pam_stop_ip", "verify-ip": "pam_verify_ip", "download-log": "pam_download_log", "rollback-ip": "pam_rollback_ip", } class McpActionRunner: """把 Agent action 转换为 MCP tool 调用。""" def __init__( self, client: McpToolClient | None = None, tool_names: dict[str, str] | None = None, ) -> None: """保存 MCP client 和 action 到 tool name 的映射。""" self.client = client self.tool_names = tool_names or {} self._discovered_tools: list[str] | None = None def run( self, action: str, *, params: dict[str, Any], ip: str | None = None, hash_code: str | None = None, stop_first: bool = False, **_: Any, ) -> ActionResult: """执行一个 PAM_NODE action,并归一化为 ActionResult。""" if self.client is None: raise RuntimeError("尚未配置 MCP client") tool_name = self._resolve_tool_name(action) arguments = self._build_arguments( action, params=params, ip=ip, hash_code=hash_code, stop_first=stop_first, ) try: payload = self.client.call_tool(tool_name, arguments) except Exception as exc: # pragma: no cover - 防御性异常包装 return parse_mcp_result(action, {}, ok=False, tool_name=tool_name, error=str(exc)) return parse_mcp_result(action, payload, ok=True, tool_name=tool_name) def _resolve_tool_name(self, action: str) -> str: """根据显式映射、server tools 自动发现和默认约定解析 tool name。""" explicit = self.tool_names.get(action) if explicit: return explicit discovered = self._list_discovered_tools() if discovered: candidates = _tool_name_candidates(action) by_lower = {name.lower(): name for name in discovered} for candidate in candidates: matched = by_lower.get(candidate.lower()) if matched: return matched available = ", ".join(discovered) raise ValueError(f"MCP server 未发现 action 对应 tool: {action}; 已发现: {available}") fallback = DEFAULT_NODE_MCP_TOOLS.get(action) if fallback: return fallback raise ValueError(f"action 未映射 MCP tool: {action}") def _list_discovered_tools(self) -> list[str]: """读取并缓存 MCP server 暴露的 tool 名称。""" if self._discovered_tools is not None: return self._discovered_tools if self.client is None or not hasattr(self.client, "list_tools"): self._discovered_tools = [] return self._discovered_tools try: self._discovered_tools = list(self.client.list_tools()) except Exception: self._discovered_tools = [] return self._discovered_tools def _build_arguments( self, action: str, *, params: dict[str, Any], ip: str | None, hash_code: str | None, stop_first: bool, ) -> dict[str, Any]: """把 Agent 参数转换为 MCP tool 所需的入参。""" arguments = { "homeBaseUrl": params.get("HOME_BASE_URL"), "airportCode": params.get("AIRPORT_CODE"), "applicationName": params.get("APP_NAME"), "moduleName": params.get("MODULE_NAME"), "versionNumber": params.get("VERSION_NUMBER"), "actionType": params.get("ACTION_TYPE"), "timeOut": params.get("TIMEOUT"), "logName": params.get("LOG_NAME"), } if ip: arguments["targetIp"] = ip if hash_code: arguments["hashCode"] = hash_code if action == "rollback-ip": arguments["stopFirst"] = stop_first return {key: value for key, value in arguments.items() if value not in (None, "")} def _tool_name_candidates(action: str) -> list[str]: """生成 action 自动匹配 MCP tool 的候选名称。""" snake = action.replace("-", "_") return [ action, snake, f"pam_{snake}", f"pam_node_{snake}", f"pam.node.{snake}", f"pam-node.{action}", ]