agent_deply/pam_deploy_graph/action_router.py
dark d3f5c82d98 feat: 补充 Agent 运行日志并增加 LLM 测试命令
- 新增统一日志工具,支持日志文件路径和级别配置
- 记录 CLI/chat、Agent、LLM、action、MCP、LangGraph、checkpoint 等关键流程
- 对日志中的 token、secret、api_key、Authorization 等敏感信息做脱敏
- chat 新增 llm test 命令,用于验证当前 LLM client 是否正常加载
- 同步 README、打包文档和 run.sh 帮助说明
- 补充日志脱敏和 llm test 相关测试
2026-06-04 10:51:59 +08:00

80 lines
3.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""按照执行策略把 action 路由到脚本、MCP 或 fake runner。"""
from __future__ import annotations
import logging
from .constants import ALLOWED_ACTIONS, HOME_ACTIONS, NODE_ACTIONS
from .logging_utils import json_for_log
from .models import AgentState, BackendName, ExecutionStrategy, ActionResult
logger = logging.getLogger(__name__)
def build_action_backends(strategy: ExecutionStrategy) -> dict[str, BackendName]:
"""根据执行策略生成每个 action 对应的后端类型。"""
if strategy == "fake":
return {action: "fake" for action in ALLOWED_ACTIONS}
if strategy == "script_only":
return {action: "script" for action in ALLOWED_ACTIONS}
if strategy == "hybrid_node_mcp":
routes: dict[str, BackendName] = {action: "script" for action in HOME_ACTIONS}
routes.update({action: "mcp" for action in NODE_ACTIONS})
return routes
raise ValueError(f"未知执行策略: {strategy}")
class ActionRouter:
"""统一的 action 调度器屏蔽脚本、MCP 和 fake 后端差异。"""
def __init__(self, *, script_runner, mcp_runner=None, fake_runner=None) -> None:
"""保存各类 runner运行时按 state 中的路由表选择后端。"""
self.script_runner = script_runner
self.mcp_runner = mcp_runner
self.fake_runner = fake_runner
def run_action(self, state: AgentState, action: str, **kwargs) -> ActionResult:
"""执行一个 action并返回统一的 ActionResult。"""
backend = state.action_backends.get(action)
if not backend:
raise ValueError(f"action 未配置路由: {action}")
logger.info(
"ActionRouter 路由 action run_id=%s action=%s backend=%s kwargs=%s",
state.run_id,
action,
backend,
json_for_log(kwargs),
)
if backend == "script":
return self.script_runner.run(
action,
params=state.params,
script_entry=state.script_entry,
config_path=state.config_path,
trace_file_path=state.trace_file_path,
**kwargs,
)
if backend == "mcp":
if self.mcp_runner is None:
raise RuntimeError(f"action 需要 MCP runner: {action}")
mcp_kwargs = dict(kwargs)
hash_code = mcp_kwargs.pop("hash_code", None) or state.hash_code
node_url = mcp_kwargs.pop("node_url", None) or state.node_url
logger.info(
"ActionRouter 调用 MCP action run_id=%s action=%s hash_code_present=%s node_url_present=%s",
state.run_id,
action,
bool(hash_code),
bool(node_url),
)
return self.mcp_runner.run(
action,
params=state.params,
hash_code=hash_code,
node_url=node_url,
**mcp_kwargs,
)
if self.fake_runner is None:
raise RuntimeError(f"action 需要 fake runner: {action}")
return self.fake_runner.run(action, params=state.params, **kwargs)