agent_deply/pam_deploy_graph/mcp_factory.py
dark 05ece1bffc feat: 标准化 LangGraph 运行链路并完善 MCP 接入
- 将 CLI/chat 部署执行切换为 action 级 LangGraph runtime
- 接入 LangGraph interrupt/checkpointer 处理人工确认与恢复
- 保留业务 checkpoint JSON 用于跨进程断点续跑
- 增加 MCP HTTP/SSE server_url 配置支持
- 增加 MCP 独立 OAuth token 鉴权,复用 HOME 的 client_credentials 方式
- 支持从 MCP server list_tools 自动发现 tools,action_tools 仅作为可选覆盖
- 更新 MCP 配置示例、README、打包说明和整体流程图
- 补充 MCP 配置、鉴权和 tool 自动发现测试
2026-06-02 10:44:42 +08:00

49 lines
1.6 KiB
Python

"""根据配置文件构造 MCP runner。"""
from __future__ import annotations
from pathlib import Path
from .mcp_client import (
HttpMcpToolClient,
McpClientConfig,
OAuthTokenProvider,
StdioMcpToolClient,
load_mcp_client_config,
)
from .mcp_runner import McpActionRunner
def build_mcp_runner_from_config(path: str | Path) -> McpActionRunner:
"""读取 MCP 配置文件,并构造可直接给 Agent 使用的 runner。"""
config = load_mcp_client_config(path)
client = build_mcp_client(config)
return McpActionRunner(client=client, tool_names=config.tool_names or None)
def build_mcp_client(config: McpClientConfig):
"""根据 transport 类型创建 MCP client。"""
if config.transport == "stdio":
return StdioMcpToolClient(
command=config.command,
args=config.args,
env=config.env,
cwd=config.cwd or None,
timeout_seconds=config.timeout_seconds,
)
if config.transport in ("streamable_http", "sse"):
auth_provider = (
OAuthTokenProvider(config.auth, timeout_seconds=config.timeout_seconds)
if config.auth is not None
else None
)
return HttpMcpToolClient(
url=config.server_url,
transport=config.transport,
headers=config.headers,
auth_provider=auth_provider,
timeout_seconds=config.timeout_seconds,
sse_read_timeout_seconds=config.sse_read_timeout_seconds,
)
raise ValueError(f"不支持的 MCP transport: {config.transport}")