From d3f5c82d984132924cd7e601203d6669fffc24c0 Mon Sep 17 00:00:00 2001 From: dark Date: Thu, 4 Jun 2026 10:51:59 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E8=A1=A5=E5=85=85=20Agent=20=E8=BF=90?= =?UTF-8?q?=E8=A1=8C=E6=97=A5=E5=BF=97=E5=B9=B6=E5=A2=9E=E5=8A=A0=20LLM=20?= =?UTF-8?q?=E6=B5=8B=E8=AF=95=E5=91=BD=E4=BB=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增统一日志工具,支持日志文件路径和级别配置 - 记录 CLI/chat、Agent、LLM、action、MCP、LangGraph、checkpoint 等关键流程 - 对日志中的 token、secret、api_key、Authorization 等敏感信息做脱敏 - chat 新增 llm test 命令,用于验证当前 LLM client 是否正常加载 - 同步 README、打包文档和 run.sh 帮助说明 - 补充日志脱敏和 llm test 相关测试 --- README.md | 26 ++- packaging/README_linux_package.md | 3 + packaging/README_packaged_agent.md | 18 ++ packaging/build_linux_self_contained.sh | 21 +- pam_deploy_graph/action_router.py | 19 ++ pam_deploy_graph/agent.py | 226 +++++++++++++++++++++- pam_deploy_graph/cli.py | 18 ++ pam_deploy_graph/constants.py | 6 + pam_deploy_graph/interactive.py | 170 +++++++++++++++- pam_deploy_graph/langgraph_runtime.py | 53 ++++- pam_deploy_graph/llm/factory.py | 25 ++- pam_deploy_graph/llm/openai_compatible.py | 71 ++++++- pam_deploy_graph/llm/rule_based.py | 41 +++- pam_deploy_graph/logging_utils.py | 116 +++++++++++ pam_deploy_graph/mcp_client.py | 120 +++++++++++- pam_deploy_graph/mcp_factory.py | 24 ++- pam_deploy_graph/mcp_runner.py | 33 +++- pam_deploy_graph/script_runner.py | 53 ++++- tests/test_interactive_cli.py | 44 ++++- tests/test_logging_utils.py | 28 +++ 20 files changed, 1066 insertions(+), 49 deletions(-) create mode 100644 pam_deploy_graph/logging_utils.py create mode 100644 tests/test_logging_utils.py diff --git a/README.md b/README.md index 831f268..e286086 100644 --- a/README.md +++ b/README.md @@ -88,7 +88,9 @@ packaging/ - chat 支持执行中按 `Ctrl+C` 中断,保存 checkpoint 后再 `resume`。 - chat 支持 `set KEY=VALUE` 和 `load params <路径>` 热更新当前运行参数,并同步回写运行中的 `config.txt` 与 checkpoint。 - 支持通过 `--llm-action-analysis-prompt-file`、`PAM_LLM_ACTION_ANALYSIS_PROMPT_FILE` 或 chat 内 `llm config action_analysis_prompt_file=...` 自定义 action 审核提示词。 -- 添加基础测试,当前本地结果为 `57 passed, 2 skipped`。 +- 增加统一运行日志,默认写入 `logs/pam_deploy_agent.log`,覆盖 CLI/chat、LLM 调用、action 路由、脚本/MCP 调用、LangGraph、checkpoint 等关键流程。 +- chat 支持 `llm test [文本]`,可用当前 LLM client 做一次轻量调用,确认真实 LLM 或规则 fallback 是否正常加载。 +- 添加基础测试,当前本地结果为 `59 passed, 2 skipped`。 未完成: @@ -133,6 +135,12 @@ python -m pam_deploy_graph.cli analyze \ 真实 LLM 调用位置在 `pam_deploy_graph/llm/openai_compatible.py`,提示词在 `pam_deploy_graph/llm/prompts.py`。发送给 LLM 的 `base_params` 会脱敏,`CLIENT_SECRET` 不会进入 prompt;本地生成计划后仍会执行 guardrails 校验。 +chat 内可以用当前 client 做一次轻量测试,确认真实 LLM 或规则 fallback 是否正常加载: + +```text +PAM> llm test 请返回一次连通性测试结果 +``` + 如果服务需要鉴权,再补充: ```bash @@ -280,6 +288,7 @@ PAM> run PAM> status PAM> params PAM> events 5 +PAM> llm test PAM> llm action-analysis on PAM> llm config action_analysis_prompt_file=prompts/action_review.txt PAM> mcp config mcp_client.example.json @@ -290,7 +299,20 @@ PAM> resume PAM> exit ``` -`chat` 默认仍要求在会话内显式输入 `run`,并确认参数、目标 IP 范围和最终执行后才会执行 action。输入 `你好`、`hello` 这类问候不会触发 LLM/结构化分析;需要分析部署需求时可直接描述部署任务,或显式使用 `analyze <需求>`。每个 action 完成后都会自动进入一次 LLM/规则审核,并播报审核开始/结束;如果审核建议停止或审核本身失败,流程会暂停并输出建议,等待用户决定是否 `resume`。`--analyze-actions` 仅控制详细审核结果是否写入 `events`。执行中可按 `Ctrl+C` 中断,chat 会保存当前 checkpoint 并把流程标记为 `user_interrupted`。`set KEY=VALUE` 和 `load params <路径>` 会把更新同步到当前运行 state、`config.txt` 和 checkpoint。`chat` 也支持 `--llm-base-url` / `--llm-api-key` / `--llm-model` / `--llm-action-analysis-prompt-file`、`--mcp-config` 和 `--analyze-actions`。 +`chat` 默认仍要求在会话内显式输入 `run`,并确认参数、目标 IP 范围和最终执行后才会执行 action。输入 `你好`、`hello` 这类问候不会触发 LLM/结构化分析;需要分析部署需求时可直接描述部署任务,或显式使用 `analyze <需求>`。每个 action 完成后都会自动进入一次 LLM/规则审核,并播报审核开始/结束;如果审核建议停止或审核本身失败,流程会暂停并输出建议,等待用户决定是否 `resume`。`llm test [文本]` 可测试当前 LLM client 是否可用。`--analyze-actions` 仅控制详细审核结果是否写入 `events`。执行中可按 `Ctrl+C` 中断,chat 会保存当前 checkpoint 并把流程标记为 `user_interrupted`。`set KEY=VALUE` 和 `load params <路径>` 会把更新同步到当前运行 state、`config.txt` 和 checkpoint。`chat` 也支持 `--llm-base-url` / `--llm-api-key` / `--llm-model` / `--llm-action-analysis-prompt-file`、`--mcp-config` 和 `--analyze-actions`。 + +## 日志 + +Agent 默认写入运行日志到 `logs/pam_deploy_agent.log`。日志覆盖 CLI/chat 输入、LLM 请求和响应摘要、action 路由、脚本/MCP 调用、LangGraph 节点、checkpoint 保存、暂停/续跑等关键流程。日志会递归脱敏 `CLIENT_SECRET`、`MCP_CLIENT_SECRET`、token、Authorization、api_key、password 等字段,并截断长文本。 + +可通过环境变量调整日志位置和级别: + +```bash +export PAM_AGENT_LOG_FILE=logs/pam_deploy_agent.log +export PAM_AGENT_LOG_LEVEL=INFO +``` + +调试 LLM 或 MCP 调用时可临时设为 `DEBUG`,但仍建议把日志目录放在受控位置。 预演: diff --git a/packaging/README_linux_package.md b/packaging/README_linux_package.md index 6a56c92..fcc6ea6 100644 --- a/packaging/README_linux_package.md +++ b/packaging/README_linux_package.md @@ -75,6 +75,9 @@ cd pam-deploy-agent-linux-x86_64 - chat 支持执行中 `Ctrl+C` 中断后保存 checkpoint,再通过 `resume` 继续。 - chat 支持 `set KEY=VALUE` 和 `load params <路径>` 热更新当前运行任务参数。 - 支持通过 `--llm-action-analysis-prompt-file` 或 chat 内 `llm config action_analysis_prompt_file=...` 自定义 action 审核提示词。 +- chat 支持 `llm test [文本]` 测试当前 LLM client 是否正常加载。 +- 默认运行日志写入 `logs/pam_deploy_agent.log`,可通过 `PAM_AGENT_LOG_FILE` 和 `PAM_AGENT_LOG_LEVEL` 调整。 +- 日志会脱敏 token、secret、api_key、Authorization 等字段;checkpoint 仍保存完整运行参数,请放在受控目录。 ## 包大小评估 diff --git a/packaging/README_packaged_agent.md b/packaging/README_packaged_agent.md index 630acf6..810a2d7 100644 --- a/packaging/README_packaged_agent.md +++ b/packaging/README_packaged_agent.md @@ -71,6 +71,7 @@ PAM> run PAM> status PAM> params PAM> events 5 +PAM> llm test PAM> llm action-analysis on PAM> llm config action_analysis_prompt_file=prompts/action_review.txt PAM> mcp config mcp_client.example.json @@ -176,10 +177,26 @@ chat 内也可以热加载 LLM: ```text PAM> llm config base_url=https://your-llm.example.com/v1 api_key=your-api-key model=your-model-name PAM> llm config action_analysis_prompt_file=prompts/action_review.txt +PAM> llm test 请返回一次连通性测试结果 PAM> llm action-analysis on PAM> llm fallback ``` +`llm test [文本]` 会使用当前 LLM client 做一次轻量意图识别调用,并输出 client 类型、intent、strategy 和 confidence,便于确认真实 LLM 或规则 fallback 是否正常加载。 + +## 日志 + +Agent 默认写入运行日志到 `logs/pam_deploy_agent.log`。日志覆盖 chat/CLI 输入、LLM 请求和响应摘要、action 路由、脚本/MCP 调用、LangGraph 节点、checkpoint 保存、暂停/续跑等关键流程。 + +可通过环境变量调整日志位置和级别: + +```bash +export PAM_AGENT_LOG_FILE=logs/pam_deploy_agent.log +export PAM_AGENT_LOG_LEVEL=INFO +``` + +日志会递归脱敏 `CLIENT_SECRET`、`MCP_CLIENT_SECRET`、token、Authorization、api_key、password 等字段,并截断长文本。checkpoint 仍会保存完整运行参数,请放在受控目录。 + ## 策略说明 - `fake`:全部使用 fake runner,不访问真实环境。 @@ -221,6 +238,7 @@ MCP token 获取方式与 HOME 一致,默认按 `client_credentials` POST 到 - 执行真实 action 前请确认配置文件中的 `HOME_BASE_URL`、`CLIENT_ID`、`CLIENT_SECRET`、`AIRPORT_CODE`、`APP_NAME`、`MODULE_NAME`、`VERSION_NUMBER`、`ZIP_FILE_PATH`。 - `chat` 中输入 `你好`、`hello` 这类问候不会触发 LLM/结构化分析;需要分析部署需求时请直接描述部署任务,或显式使用 `analyze <需求>`。 - 每个 action 完成后都会自动执行一次 LLM/规则审核;`--analyze-actions` 和 `llm action-analysis on` 只控制是否把详细审核结果写入 `events`。 +- `llm test [文本]` 可测试当前 LLM client 是否可用。 - 如果审核建议停止、审核本身失败,或用户在执行中按下 `Ctrl+C`,流程都会保存 checkpoint 并进入暂停状态;后续可使用 `resume` 继续。 - `set KEY=VALUE` 和 `load params <路径>` 会热更新当前运行任务的参数,并回写运行中的 `config.txt` 和 checkpoint。 - `checkpoint` 会保存完整运行参数,请放在受控目录。 diff --git a/packaging/build_linux_self_contained.sh b/packaging/build_linux_self_contained.sh index 83da61e..1db16fb 100644 --- a/packaging/build_linux_self_contained.sh +++ b/packaging/build_linux_self_contained.sh @@ -127,8 +127,8 @@ PAM 部署 Agent 解压即用包 chat 模式会在会话中要求输入 run,并分别确认参数、目标范围和最终执行。 --analyze-actions - 每个 action 完成后追加 LLM/规则诊断建议。诊断只作为辅助建议, - 不会自动决定继续、回滚或修改参数。 + 每个 action 完成后的 LLM/规则审核默认都会执行;该参数只控制 + 是否把详细审核结果写入 events。审核建议停止时流程会暂停。 LLM 参数: --llm-base-url @@ -140,10 +140,22 @@ LLM 参数: --llm-model <模型名> LLM 模型名称。也可通过环境变量 PAM_LLM_MODEL 提供。 + --llm-action-analysis-prompt-file <路径> + 自定义 action 审核提示词文件。打包内置基线: + prompts/action_review.txt + LLM 环境变量: PAM_LLM_BASE_URL PAM_LLM_API_KEY PAM_LLM_MODEL + PAM_LLM_ACTION_ANALYSIS_PROMPT_FILE + +日志环境变量: + PAM_AGENT_LOG_FILE + 运行日志路径,默认 logs/pam_deploy_agent.log。 + + PAM_AGENT_LOG_LEVEL + 日志级别,默认 INFO。排查 LLM/MCP 时可临时设为 DEBUG。 示例: ./run.sh chat --config doc_scripts/config.txt.example --strategy fake --checkpoint runtime/checkpoints/demo.json @@ -170,8 +182,9 @@ LLM 环境变量: 5. confirm 会通过 LangGraph interrupt resume 处理确认,并继续后续图节点;进程中断时再使用 resume。 6. chat 会在执行前归一化并展示实际写入脚本配置的参数;script_only / hybrid_node_mcp 会先检查 ZIP_FILE_PATH 是否存在。 7. chat 执行过程中会播报每个 action 的开始、完成或失败;普通问候不会触发 LLM/结构化分析。 - 8. chat 内可使用 params、events、list checkpoints、load checkpoint、load params、llm config、mcp config 等命令。 - 9. checkpoint 会保存完整运行参数,请放在受控目录。 + 8. chat 内可使用 params、events、list checkpoints、load checkpoint、load params、llm config、llm test、mcp config 等命令。 + 9. 日志默认写入 logs/pam_deploy_agent.log,并会脱敏 token、secret、api_key、Authorization 等字段。 + 10. checkpoint 会保存完整运行参数,请放在受控目录。 HELP_TEXT } diff --git a/pam_deploy_graph/action_router.py b/pam_deploy_graph/action_router.py index 48622cb..11afd35 100644 --- a/pam_deploy_graph/action_router.py +++ b/pam_deploy_graph/action_router.py @@ -2,9 +2,14 @@ from __future__ import annotations +import logging + from .constants import ALLOWED_ACTIONS, HOME_ACTIONS, NODE_ACTIONS +from .logging_utils import json_for_log from .models import AgentState, BackendName, ExecutionStrategy, ActionResult +logger = logging.getLogger(__name__) + def build_action_backends(strategy: ExecutionStrategy) -> dict[str, BackendName]: """根据执行策略生成每个 action 对应的后端类型。""" @@ -33,6 +38,13 @@ class ActionRouter: backend = state.action_backends.get(action) if not backend: raise ValueError(f"action 未配置路由: {action}") + logger.info( + "ActionRouter 路由 action run_id=%s action=%s backend=%s kwargs=%s", + state.run_id, + action, + backend, + json_for_log(kwargs), + ) if backend == "script": return self.script_runner.run( action, @@ -48,6 +60,13 @@ class ActionRouter: mcp_kwargs = dict(kwargs) hash_code = mcp_kwargs.pop("hash_code", None) or state.hash_code node_url = mcp_kwargs.pop("node_url", None) or state.node_url + logger.info( + "ActionRouter 调用 MCP action run_id=%s action=%s hash_code_present=%s node_url_present=%s", + state.run_id, + action, + bool(hash_code), + bool(node_url), + ) return self.mcp_runner.run( action, params=state.params, diff --git a/pam_deploy_graph/agent.py b/pam_deploy_graph/agent.py index 4684bfa..d7cabfa 100644 --- a/pam_deploy_graph/agent.py +++ b/pam_deploy_graph/agent.py @@ -8,6 +8,7 @@ from __future__ import annotations import re import time +import logging from dataclasses import asdict from pathlib import Path from typing import Any, Callable @@ -18,11 +19,14 @@ from .config_writer import write_config from .constants import DEFAULT_PARAMS, GLOBAL_ACTION_SEQUENCE, IP_ACTION_SEQUENCE, REQUIRED_PARAMS from .fake_runner import FakeActionRunner from .llm import LlmClient, RuleBasedLlmClient, validate_deploy_plan, validate_intent_result +from .logging_utils import configure_logging, json_for_log from .mcp_runner import McpActionRunner from .models import ActionResult, AgentState, ExecutionStrategy, LlmActionAnalysis, LlmDeployPlan, LlmIntentResult, LlmParamResult from .script_runner import ScriptActionRunner, select_script_entry from .skill_policy import load_skill_policy +logger = logging.getLogger(__name__) + REQUIRED_ACTION_VALUES = { "upload-package": ("HASH_CODE",), "get-node-url": ("NODE_URL",), @@ -44,6 +48,7 @@ class PamDeployAgent: progress_callback: Callable[[dict[str, Any]], None] | None = None, ) -> None: """初始化策略、脚本 runner、MCP runner、fake runner 和 LLM client。""" + self.log_path = configure_logging() self.skill_policy = load_skill_policy(skill_path) self.script_base_dir = Path(script_base_dir) self.script_runner = ScriptActionRunner(self.script_base_dir) @@ -57,16 +62,42 @@ class PamDeployAgent: mcp_runner=mcp_runner, fake_runner=self.fake_runner, ) + logger.info( + "Agent 初始化完成 skill=%s script_base_dir=%s llm_client=%s mcp_runner=%s action_analysis_events=%s", + skill_path, + self.script_base_dir, + type(self.llm_client).__name__, + type(self.mcp_runner).__name__ if self.mcp_runner else "", + self.action_analysis_enabled, + ) def understand_request(self, text: str) -> LlmIntentResult: """调用 LLM 识别用户意图,并执行基础校验。""" - result = self.llm_client.understand_request(text) + logger.info("LLM 意图识别开始 client=%s text_len=%s", type(self.llm_client).__name__, len(text)) + try: + result = self.llm_client.understand_request(text) + except Exception: + logger.exception("LLM 意图识别失败 client=%s", type(self.llm_client).__name__) + raise validate_intent_result(result) + logger.info("LLM 意图识别完成 result=%s", json_for_log(asdict(result))) return result def extract_params(self, text: str, base_params: dict[str, Any] | None = None) -> LlmParamResult: """从自然语言中抽取部署参数和控制参数。""" - return self.llm_client.extract_params(text, base_params) + logger.info( + "LLM 参数抽取开始 client=%s text_len=%s base_params=%s", + type(self.llm_client).__name__, + len(text), + json_for_log(base_params or {}), + ) + try: + result = self.llm_client.extract_params(text, base_params) + except Exception: + logger.exception("LLM 参数抽取失败 client=%s", type(self.llm_client).__name__) + raise + logger.info("LLM 参数抽取完成 result=%s", json_for_log(asdict(result))) + return result def generate_plan( self, @@ -76,12 +107,25 @@ class PamDeployAgent: strategy: ExecutionStrategy, ) -> LlmDeployPlan: """根据参数、意图和执行策略生成部署计划。""" - plan = self.llm_client.generate_plan(params=params, intent=intent, strategy=strategy) + logger.info( + "LLM 计划生成开始 client=%s intent=%s strategy=%s params=%s", + type(self.llm_client).__name__, + intent, + strategy, + json_for_log(params), + ) + try: + plan = self.llm_client.generate_plan(params=params, intent=intent, strategy=strategy) + except Exception: + logger.exception("LLM 计划生成失败 client=%s intent=%s strategy=%s", type(self.llm_client).__name__, intent, strategy) + raise validate_deploy_plan(plan) + logger.info("LLM 计划生成完成 result=%s", json_for_log(asdict(plan))) return plan def analyze_request(self, text: str, base_params: dict[str, Any] | None = None) -> dict[str, Any]: """完成意图识别、参数抽取和计划生成,供 analyze/chat 使用。""" + logger.info("需求分析开始 text_len=%s base_params=%s", len(text), json_for_log(base_params or {})) intent = self.understand_request(text) params = self.extract_params(text, base_params) strategy = self._choose_strategy(intent.strategy_preference) @@ -90,11 +134,19 @@ class PamDeployAgent: intent=intent.intent, strategy=strategy, ) - return { + result = { "intent": intent, "params": params, "plan": plan, } + logger.info( + "需求分析完成 intent=%s strategy=%s missing=%s plan_actions=%s", + intent.intent, + strategy, + params.missing_required_params, + plan.planned_actions, + ) + return result def normalize_params(self, params: dict[str, Any]) -> dict[str, Any]: """合并默认参数并校验必填参数是否齐全。""" @@ -124,6 +176,13 @@ class PamDeployAgent: target_ips: list[str] | None = None, ) -> AgentState: """创建一次运行所需的 AgentState,并写入脚本配置文件。""" + logger.info( + "创建 AgentState 开始 strategy=%s checkpoint=%s target_ips=%s params=%s", + execution_strategy, + checkpoint_path or "", + target_ips or [], + json_for_log(params), + ) normalized = self.normalize_params(params) actual_run_id = run_id or time.strftime("%Y%m%d_%H%M%S") actual_script_entry = script_entry or select_script_entry() @@ -131,7 +190,7 @@ class PamDeployAgent: actual_config_path = _absolute_path(config_path or runtime_dir / f"config_{actual_run_id}.txt") actual_trace_path = _absolute_path(trace_file_path or Path("logs") / f"api_trace_{actual_run_id}.log") write_config(normalized, actual_config_path) - return AgentState( + state = AgentState( run_id=actual_run_id, params=normalized, execution_strategy=execution_strategy, @@ -143,6 +202,15 @@ class PamDeployAgent: checkpoint_path=checkpoint_path or "", target_ips=target_ips or [], ) + logger.info( + "创建 AgentState 完成 run_id=%s config=%s trace=%s script_entry=%s backends=%s", + state.run_id, + state.config_path, + state.trace_file_path, + state.script_entry, + json_for_log(state.action_backends), + ) + return state def pause_state( self, @@ -152,6 +220,13 @@ class PamDeployAgent: review_context: dict[str, Any] | None = None, ) -> AgentState: """将当前 state 标记为暂停,并持久化 checkpoint。""" + logger.info( + "暂停 state run_id=%s reason=%s checkpoint=%s context=%s", + state.run_id, + reason, + state.checkpoint_path, + json_for_log(review_context or {}), + ) state.paused = True state.pause_reason = reason state.review_context = dict(review_context or {}) @@ -160,6 +235,7 @@ class PamDeployAgent: def resume_state(self, state: AgentState) -> AgentState: """清理暂停标记,允许后续继续执行。""" + logger.info("恢复 state run_id=%s previous_reason=%s checkpoint=%s", state.run_id, state.pause_reason, state.checkpoint_path) state.paused = False state.pause_reason = "" state.review_context = {} @@ -168,12 +244,14 @@ class PamDeployAgent: def update_state_params(self, state: AgentState, updates: dict[str, Any]) -> AgentState: """热更新 state 中的参数,并回写 config 文件。""" + logger.info("热更新 state 参数开始 run_id=%s updates=%s", state.run_id, json_for_log(updates)) merged = {**state.params, **updates} normalized = self.normalize_params(merged) state.params = normalized if state.config_path: write_config(normalized, state.config_path) self._save_checkpoint(state) + logger.info("热更新 state 参数完成 run_id=%s config=%s params=%s", state.run_id, state.config_path, json_for_log(state.params)) return state def preview(self, params: dict[str, Any], strategy: ExecutionStrategy = "hybrid_node_mcp") -> str: @@ -209,12 +287,19 @@ class PamDeployAgent: def run_global_flow(self, state: AgentState) -> AgentState: """执行全局部署阶段,并跳过 checkpoint 中已完成的步骤。""" + logger.info( + "全局流程开始 run_id=%s paused=%s completed=%s", + state.run_id, + state.paused, + state.completed_global_steps, + ) if state.paused: self._save_checkpoint(state) return state while True: action = self.next_global_action(state) if action is None: + logger.info("全局流程完成 run_id=%s completed=%s", state.run_id, state.completed_global_steps) return state self.run_global_action(state, action) @@ -231,6 +316,7 @@ class PamDeployAgent: def run_global_action(self, state: AgentState, action: str) -> AgentState: """执行一个全局 action,并把结果写回 AgentState。""" if action in state.completed_global_steps: + logger.info("跳过已完成全局 action run_id=%s action=%s", state.run_id, action) return state kwargs: dict[str, Any] = {} if action == "publish-version": @@ -240,16 +326,25 @@ class PamDeployAgent: raise RuntimeError("publish-version 缺少 HASH_CODE,请确认 upload-package 是否成功返回 HASH_CODE") kwargs["hash_code"] = state.hash_code backend = state.action_backends.get(action, "script") + logger.info( + "全局 action 开始 run_id=%s action=%s backend=%s kwargs=%s", + state.run_id, + action, + backend, + json_for_log(kwargs), + ) self._emit_progress({"type": "ACTION_START", "stage": action, "backend": backend}) try: result = self.router.run_action(state, action, **kwargs) except Exception as exc: + logger.exception("全局 action 调用异常 run_id=%s action=%s backend=%s", state.run_id, action, backend) result = ActionResult( action=action, backend=backend, ok=False, error_summary=str(exc), ) + logger.info("全局 action 返回 run_id=%s result=%s", state.run_id, _action_result_for_log(result)) state.events.append( { "type": "ACTION_DONE" if result.ok else "ACTION_FAIL", @@ -318,8 +413,10 @@ class PamDeployAgent: reason="llm_review_blocked", review_context=self._review_context(action=action, analysis=analysis, result=result), ) + logger.info("全局 action 被 LLM 审核拦截 run_id=%s action=%s analysis=%s", state.run_id, action, json_for_log(asdict(analysis))) return state self._save_checkpoint(state) + logger.info("全局 action 完成 run_id=%s action=%s completed=%s", state.run_id, action, state.completed_global_steps) return state def _missing_required_values(self, action: str, values: dict[str, Any]) -> list[str]: @@ -329,21 +426,37 @@ class PamDeployAgent: def run_deploy_flow(self, state: AgentState) -> AgentState: """执行完整部署流程:全局阶段后进入逐 IP 阶段。""" + logger.info( + "部署流程开始 run_id=%s paused=%s pending=%s strategy=%s", + state.run_id, + state.paused, + state.pending_confirmation, + state.execution_strategy, + ) if state.pending_confirmation or state.paused: self._save_checkpoint(state) return state self.run_global_flow(state) self.run_ip_flow(state) + logger.info("部署流程结束 run_id=%s paused=%s pending=%s", state.run_id, state.paused, state.pending_confirmation) return state def run_ip_flow(self, state: AgentState) -> AgentState: """执行逐 IP 部署流程,失败时停在人工确认点。""" + logger.info( + "逐 IP 流程开始 run_id=%s paused=%s target_ips=%s online_ips=%s", + state.run_id, + state.paused, + state.target_ips, + state.online_ips, + ) if state.paused: self._save_checkpoint(state) return state while True: work = self.next_ip_action(state) if work is None: + logger.info("逐 IP 流程完成或等待确认 run_id=%s pending=%s ip_states=%s", state.run_id, state.pending_confirmation, json_for_log(state.ip_states)) return state ip, action = work self.run_ip_action(state, ip, action) @@ -354,6 +467,7 @@ class PamDeployAgent: self._save_checkpoint(state) return None self._resolve_target_ips(state) + logger.info("计算下一个 IP action run_id=%s target_ips=%s", state.run_id, state.target_ips) for ip in state.target_ips: ip_state = state.ip_states.get(ip) if ip_state and ip_state.get("status") == "SUCCESS": @@ -365,6 +479,7 @@ class PamDeployAgent: return None continue if not ip_state: + logger.info("初始化 IP 状态 run_id=%s ip=%s", state.run_id, ip) state.events.append({"type": "IP_START", "ip": ip, "message": "start"}) ip_state = { "status": "RUNNING", @@ -385,6 +500,7 @@ class PamDeployAgent: ip_state["status"] = "SUCCESS" state.events.append({"type": "IP_DONE", "ip": ip, "message": "success"}) self._save_checkpoint(state) + logger.info("IP 部署完成 run_id=%s ip=%s", state.run_id, ip) return None def run_ip_action(self, state: AgentState, ip: str, action: str) -> AgentState: @@ -392,7 +508,16 @@ class PamDeployAgent: ip_state = state.ip_states[ip] completed_steps = ip_state.setdefault("completed_steps", []) if action in completed_steps: + logger.info("跳过已完成 IP action run_id=%s ip=%s action=%s", state.run_id, ip, action) return state + logger.info( + "IP action 开始 run_id=%s ip=%s action=%s backend=%s ip_state=%s", + state.run_id, + ip, + action, + state.action_backends.get(action, ""), + json_for_log(ip_state), + ) self._emit_progress( { "type": "ACTION_START", @@ -405,6 +530,7 @@ class PamDeployAgent: try: result = self.router.run_action(state, action, ip=ip) except Exception as exc: + logger.exception("IP action 调用异常 run_id=%s ip=%s action=%s backend=%s", state.run_id, ip, action, backend) result = ActionResult( action=action, backend=backend, @@ -412,6 +538,14 @@ class PamDeployAgent: error_summary=str(exc), ) failed = (not result.ok) or self._business_failed(action, result.values) + logger.info( + "IP action 返回 run_id=%s ip=%s action=%s failed=%s result=%s", + state.run_id, + ip, + action, + failed, + _action_result_for_log(result), + ) state.events.append( { "type": "ACTION_FAIL" if failed else "ACTION_DONE", @@ -443,6 +577,7 @@ class PamDeployAgent: self._download_log_best_effort(state, ip) state.pending_confirmation = f"rollback-ip:{ip}" self._save_checkpoint(state) + logger.info("IP action 失败并进入确认 run_id=%s ip=%s action=%s pending=%s", state.run_id, ip, action, state.pending_confirmation) return state self._apply_ip_result(ip_state, action, result.values) @@ -462,8 +597,10 @@ class PamDeployAgent: reason="llm_review_blocked", review_context=self._review_context(action=action, analysis=analysis, result=result, ip=ip), ) + logger.info("IP action 被 LLM 审核拦截 run_id=%s ip=%s action=%s analysis=%s", state.run_id, ip, action, json_for_log(asdict(analysis))) return state self._save_checkpoint(state) + logger.info("IP action 完成 run_id=%s ip=%s action=%s completed=%s", state.run_id, ip, action, completed_steps) return state def build_confirmation_request(self, state: AgentState) -> dict[str, Any]: @@ -497,6 +634,13 @@ class PamDeployAgent: ip = request["ip"] ip_state = state.ip_states[ip] + logger.info( + "人工确认开始 run_id=%s approved=%s request=%s note_len=%s", + state.run_id, + approved, + json_for_log(request), + len(operator_note), + ) if not approved: ip_state["rollback_status"] = "REJECTED_BY_OPERATOR" state.events.append( @@ -512,6 +656,7 @@ class PamDeployAgent: state.pause_reason = "" state.review_context = {} self._save_checkpoint(state) + logger.info("人工确认拒绝回滚 run_id=%s ip=%s", state.run_id, ip) return state backend = state.action_backends.get("rollback-ip", "script") @@ -531,12 +676,14 @@ class PamDeployAgent: stop_first=bool(ip_state.get("rollback_stop_first", False)), ) except Exception as exc: + logger.exception("rollback action 调用异常 run_id=%s ip=%s backend=%s", state.run_id, ip, backend) result = ActionResult( action="rollback-ip", backend=backend, ok=False, error_summary=str(exc), ) + logger.info("rollback action 返回 run_id=%s ip=%s result=%s", state.run_id, ip, _action_result_for_log(result)) ip_state["rollback_status"] = "ROLLBACK_DONE" if result.ok else "ROLLBACK_FAILED" state.events.append( { @@ -579,6 +726,14 @@ class PamDeployAgent: } ) self._save_checkpoint(state) + logger.info( + "人工确认处理完成 run_id=%s ip=%s rollback_status=%s pending=%s paused=%s", + state.run_id, + ip, + ip_state.get("rollback_status"), + state.pending_confirmation, + state.paused, + ) return state def _emit_progress(self, payload: dict[str, Any]) -> None: @@ -607,6 +762,7 @@ class PamDeployAgent: """根据在线 IP 和用户指定 IP 计算最终目标 IP。""" if not state.target_ips: state.target_ips = state.online_ips.copy() + logger.info("目标 IP 未指定,使用全部在线 IP run_id=%s target_ips=%s", state.run_id, state.target_ips) return online = set(state.online_ips) requested = state.target_ips @@ -621,6 +777,13 @@ class PamDeployAgent: "target_ips": state.target_ips, } ) + logger.info( + "目标 IP 范围调整 run_id=%s requested=%s target_ips=%s missing=%s", + state.run_id, + requested, + state.target_ips, + missing, + ) def _business_failed(self, action: str, values: dict[str, Any]) -> bool: """识别 exit code 之外的业务失败条件。""" @@ -640,6 +803,14 @@ class PamDeployAgent: """记录单 IP 失败,并设置待回滚确认状态。""" ip_state = state.ip_states[ip] stop_first = action in ("start-ip", "verify-ip") + logger.info( + "记录 IP 失败 run_id=%s ip=%s action=%s reason=%s stop_first=%s", + state.run_id, + ip, + action, + reason, + stop_first, + ) ip_state.update( { "status": "FAILED", @@ -663,6 +834,7 @@ class PamDeployAgent: def _download_log_best_effort(self, state: AgentState, ip: str) -> None: """失败后尽力下载日志,日志失败不覆盖原失败原因。""" backend = state.action_backends.get("download-log", "script") + logger.info("失败后尝试下载日志 run_id=%s ip=%s backend=%s", state.run_id, ip, backend) self._emit_progress( { "type": "ACTION_START", @@ -675,6 +847,7 @@ class PamDeployAgent: try: result = self.router.run_action(state, "download-log", ip=ip) except Exception as exc: + logger.exception("失败后下载日志 action 调用异常 run_id=%s ip=%s backend=%s", state.run_id, ip, backend) result = ActionResult( action="download-log", backend=backend, @@ -702,6 +875,7 @@ class PamDeployAgent: "message": result.values.get("MESSAGE", "已尽力下载日志"), } ) + logger.info("失败后下载日志完成 run_id=%s ip=%s result=%s", state.run_id, ip, _action_result_for_log(result)) else: state.events.append( { @@ -721,11 +895,21 @@ class PamDeployAgent: "message": result.error_summary or "尽力下载日志失败", } ) + logger.info("失败后下载日志失败 run_id=%s ip=%s result=%s", state.run_id, ip, _action_result_for_log(result)) self._append_action_analysis(state, "download-log", result, ip=ip) def _save_checkpoint(self, state: AgentState) -> None: """如果配置了 checkpoint 路径,则保存完整运行状态。""" if state.checkpoint_path: + logger.info( + "保存 checkpoint run_id=%s path=%s paused=%s pending=%s last_success=%s last_failed=%s", + state.run_id, + state.checkpoint_path, + state.paused, + state.pending_confirmation, + state.last_success_step, + state.last_failed_step, + ) save_checkpoint(state, state.checkpoint_path, redact=False) def _append_action_analysis( @@ -737,6 +921,13 @@ class PamDeployAgent: ip: str | None = None, ) -> Any: """启用 action 后分析时,把诊断结果追加到 events。""" + logger.info( + "LLM action 审核开始 run_id=%s action=%s ip=%s result=%s", + state.run_id, + action, + ip or "", + _action_result_for_log(result), + ) self._emit_progress( { "type": "ACTION_REVIEW_START", @@ -752,6 +943,7 @@ class PamDeployAgent: state_summary=self._state_summary_for_llm(state, ip=ip), ) except Exception as exc: # pragma: no cover - 审核失败时也要显式暂停,避免黑盒继续执行 + logger.exception("LLM action 审核失败 run_id=%s action=%s ip=%s", state.run_id, action, ip or "") state.events.append( { "type": "ACTION_ANALYSIS_FAIL", @@ -795,6 +987,13 @@ class PamDeployAgent: "should_continue": analysis.should_continue, } ) + logger.info( + "LLM action 审核完成 run_id=%s action=%s ip=%s analysis=%s", + state.run_id, + action, + ip or "", + json_for_log(payload), + ) return analysis def _state_summary_for_llm(self, state: AgentState, *, ip: str | None = None) -> dict[str, Any]: @@ -895,3 +1094,20 @@ def _normalize_local_file_path(path: str) -> str: if value.is_absolute(): return str(value) return str(value.resolve()) + + +def _action_result_for_log(result: ActionResult) -> str: + """生成 action 结果日志摘要,避免写入完整 stdout/stderr。""" + return json_for_log( + { + "action": result.action, + "backend": result.backend, + "ok": result.ok, + "exit_code": result.exit_code, + "tool_name": result.tool_name, + "values": result.values, + "stderr": result.stderr, + "error_summary": result.error_summary, + }, + max_text_len=1000, + ) diff --git a/pam_deploy_graph/cli.py b/pam_deploy_graph/cli.py index 9e6eafc..b28ba33 100644 --- a/pam_deploy_graph/cli.py +++ b/pam_deploy_graph/cli.py @@ -4,6 +4,7 @@ from __future__ import annotations import argparse import json +import logging from dataclasses import asdict from .agent import PamDeployAgent @@ -11,9 +12,12 @@ from .checkpoint_store import load_agent_state, redact_mapping from .interactive import run_interactive_chat from .langgraph_runtime import LangGraphDeploymentRuntime, LangGraphRunResult from .llm import build_llm_client +from .logging_utils import configure_logging, json_for_log from .mcp_factory import build_mcp_runner_from_config from .params_loader import load_params_file +logger = logging.getLogger(__name__) + def add_llm_args(parser: argparse.ArgumentParser) -> None: """为子命令追加真实 LLM 配置参数。""" @@ -125,7 +129,11 @@ def main() -> None: add_action_analysis_arg(confirm) args = parser.parse_args() + log_path = configure_logging() + logger.info("CLI 启动 command=%s args=%s log_path=%s", args.command, json_for_log(vars(args)), log_path) params = load_params_file(args.config) if getattr(args, "config", None) else {} + if getattr(args, "config", None): + logger.info("参数文件已加载 command=%s config=%s params=%s", args.command, args.config, json_for_log(params)) llm_client = None if args.command != "preview": llm_client = build_llm_client( @@ -136,7 +144,9 @@ def main() -> None: ) mcp_runner = None if getattr(args, "mcp_config", None): + logger.info("开始加载 MCP 配置 path=%s", args.mcp_config) mcp_runner = build_mcp_runner_from_config(args.mcp_config) + logger.info("MCP 配置加载完成 path=%s runner=%s", args.mcp_config, type(mcp_runner).__name__) agent = PamDeployAgent( llm_client=llm_client, mcp_runner=mcp_runner, @@ -144,12 +154,15 @@ def main() -> None: ) if args.command == "analyze": + logger.info("开始执行 analyze text_len=%s", len(args.text)) result = agent.analyze_request(args.text, params) payload = redact_mapping({key: asdict(value) for key, value in result.items()}) + logger.info("analyze 完成 result=%s", json_for_log(payload)) print(json.dumps(payload, ensure_ascii=False, indent=2)) return if args.command == "chat": + logger.info("进入 chat 模式 strategy=%s checkpoint=%s target_ips=%s", args.strategy, args.checkpoint, args.target_ip) run_interactive_chat( agent=agent, params=params, @@ -160,11 +173,13 @@ def main() -> None: return if args.command == "preview": + logger.info("执行 preview strategy=%s", args.strategy) print(agent.preview(params, args.strategy)) return require_confirm(args) if args.command == "run-global": + logger.info("开始 run-global strategy=%s checkpoint=%s", args.strategy, args.checkpoint) state = agent.create_state( params=params, execution_strategy=args.strategy, @@ -177,6 +192,7 @@ def main() -> None: return if args.command == "resume": + logger.info("开始 resume checkpoint=%s", args.checkpoint) state = load_agent_state(args.checkpoint) state.checkpoint_path = state.checkpoint_path or args.checkpoint if state.paused: @@ -186,6 +202,7 @@ def main() -> None: return if args.command == "confirm": + logger.info("开始 confirm checkpoint=%s decision=%s note_len=%s", args.checkpoint, args.decision, len(args.note)) state = load_agent_state(args.checkpoint) state.checkpoint_path = state.checkpoint_path or args.checkpoint runtime = LangGraphDeploymentRuntime(agent=agent, flow="deploy") @@ -197,6 +214,7 @@ def main() -> None: print_graph_result(agent, first) return + logger.info("开始 run-deploy strategy=%s checkpoint=%s target_ips=%s", args.strategy, args.checkpoint, args.target_ip) state = agent.create_state( params=params, execution_strategy=args.strategy, diff --git a/pam_deploy_graph/constants.py b/pam_deploy_graph/constants.py index 8eba2d8..1dcebf0 100644 --- a/pam_deploy_graph/constants.py +++ b/pam_deploy_graph/constants.py @@ -73,6 +73,12 @@ SENSITIVE_KEYS = { "MCP_TOKEN", "TOKEN", "Authorization", + "authorization", "access_token", "ACCESS_TOKEN", + "api_key", + "API_KEY", + "PAM_LLM_API_KEY", + "password", + "PASSWORD", } diff --git a/pam_deploy_graph/interactive.py b/pam_deploy_graph/interactive.py index fa83198..ce94f02 100644 --- a/pam_deploy_graph/interactive.py +++ b/pam_deploy_graph/interactive.py @@ -6,6 +6,7 @@ import time import json import shlex import builtins +import logging import os import sys from dataclasses import asdict @@ -17,12 +18,14 @@ from .checkpoint_store import load_agent_state, redact_mapping from .langgraph_runtime import LangGraphDeploymentRuntime, LangGraphRunResult from .llm import build_llm_client from .llm.rule_based import RuleBasedLlmClient +from .logging_utils import configure_logging, json_for_log, redact_for_log from .mcp_factory import build_mcp_runner_from_config from .models import AgentState, ExecutionStrategy from .params_loader import load_params_file InputFunc = Callable[[str], str] OutputFunc = Callable[[str], None] +logger = logging.getLogger(__name__) COMMAND_HELP = """可用命令: help 显示帮助 @@ -32,6 +35,7 @@ COMMAND_HELP = """可用命令: events [数量] 查看最近 action 事件,默认 10 条 set KEY=VALUE 修改当前会话参数 llm config KEY=VALUE 配置真实 LLM,支持 base_url/api_key/model/action_analysis_prompt_file + llm test [文本] 测试当前 LLM client 是否可正常调用 llm fallback 切回本地规则 fallback llm action-analysis on|off 开关 action 审核详情写入 events mcp config <路径> 加载 MCP client JSON 配置 @@ -79,9 +83,20 @@ class InteractiveCliSession: self.mcp_config_path: str = "" self.graph_runtime: LangGraphDeploymentRuntime | None = None self.agent.progress_callback = self._on_progress + self.log_path = configure_logging() + logger.info( + "chat 会话已初始化 strategy=%s checkpoint=%s target_ips=%s llm_client=%s log_path=%s params=%s", + self.strategy, + self.checkpoint_path, + self.target_ips, + type(self.agent.llm_client).__name__, + self.log_path, + json_for_log(self.params), + ) def run(self) -> None: """启动 REPL 循环,直到用户 exit 或输入流结束。""" + logger.info("chat REPL 启动 checkpoint=%s", self.checkpoint_path) self.output("PAM 部署 Agent 交互式会话") self.output("输入 help 查看命令,输入 exit 退出。") self._load_existing_checkpoint_if_any() @@ -89,9 +104,11 @@ class InteractiveCliSession: try: line = self.input("pam-deploy-agent> ") except KeyboardInterrupt: + logger.info("chat 输入被用户中断") self.output("已取消当前输入。输入 exit 退出,或继续输入命令。") continue except EOFError: + logger.info("chat 输入流结束") self.output("bye") return if not self.handle_line(line): @@ -105,8 +122,14 @@ class InteractiveCliSession: command, _, rest = text.partition(" ") normalized = command.lower() + logger.info( + "chat 收到输入 command=%s text=%s", + normalized, + redact_for_log(text, max_text_len=500), + ) if normalized in ("exit", "quit", "q"): + logger.info("chat 会话退出") self.output("bye") return False if normalized in ("help", "?"): @@ -162,9 +185,11 @@ class InteractiveCliSession: return True if _is_small_talk(text): + logger.info("chat 输入识别为寒暄,跳过结构化分析") self.output("你好。可以输入 help 查看命令,或直接描述部署需求;执行前仍需输入 run 并确认。") return True if not _looks_like_deploy_request(text): + logger.info("chat 输入未命中部署需求粗筛,跳过结构化分析") self.output("我没有识别到明确的部署需求。可以输入 help 查看命令,或用 analyze <需求> 明确触发需求分析。") return True @@ -179,8 +204,10 @@ class InteractiveCliSession: return try: + logger.info("chat 开始需求分析 text_len=%s base_params=%s", len(text), json_for_log(self.params)) result = self.agent.analyze_request(text, self.params) except Exception as exc: + logger.exception("chat 需求分析失败 text=%s", redact_for_log(text, max_text_len=500)) self.output(f"需求分析失败: {exc}") return self.last_analysis = result @@ -206,6 +233,13 @@ class InteractiveCliSession: self.output("- target_ips: " + ", ".join(self.target_ips)) self.output("执行请输 run;查看完整 JSON 可用一次性 analyze 命令。") self.output(_format_redacted_params(safe_payload["params"]["extracted_params"])) + logger.info( + "chat 需求分析完成 intent=%s strategy=%s target_ips=%s result=%s", + intent_result.intent, + self.strategy, + self.target_ips, + json_for_log(safe_payload), + ) def _set_param(self, assignment: str) -> None: """处理 `set KEY=VALUE` 命令,更新当前会话参数。""" @@ -220,6 +254,7 @@ class InteractiveCliSession: self.params[key] = value.strip() self._sync_params_to_state() self.output(f"已设置 {key}") + logger.info("chat 参数已设置 key=%s params=%s", key, json_for_log(self.params)) def _show_params(self) -> None: """脱敏展示当前会话参数。""" @@ -241,13 +276,23 @@ class InteractiveCliSession: def _configure_llm(self, text: str) -> None: """热加载 LLM 配置,或开关 action 后诊断。""" if not text: - self.output("格式:llm config base_url=... api_key=... model=... action_analysis_prompt_file=... | llm fallback | llm action-analysis on|off") + self.output("格式:llm config base_url=... api_key=... model=... action_analysis_prompt_file=... | llm test [文本] | llm fallback | llm action-analysis on|off") + return + try: + parts = shlex.split(text) + except ValueError as exc: + logger.exception("chat LLM 命令解析失败 text=%s", redact_for_log(text, max_text_len=500)) + self.output(f"LLM 命令解析失败: {exc}") return - parts = shlex.split(text) if parts[0] == "fallback": self.agent.llm_client = RuleBasedLlmClient() self.llm_config = {} self.output("已切回本地规则 LLM fallback。") + logger.info("chat LLM 已切回 fallback") + return + if parts[0] == "test": + test_text = text.partition("test")[2].strip() + self._test_llm(test_text) return if parts[0] == "action-analysis": if len(parts) < 2 or parts[1] not in ("on", "off"): @@ -255,6 +300,7 @@ class InteractiveCliSession: return self.agent.action_analysis_enabled = parts[1] == "on" self.output(f"action 审核详情写入 events 已{'开启' if self.agent.action_analysis_enabled else '关闭'}。") + logger.info("chat action 审核事件写入开关=%s", self.agent.action_analysis_enabled) return if parts[0] != "config": self.output("未知 llm 命令。") @@ -262,6 +308,7 @@ class InteractiveCliSession: updates = _parse_key_values(parts[1:]) self.llm_config.update(updates) try: + logger.info("chat 开始加载 LLM 配置 updates=%s merged=%s", json_for_log(updates), json_for_log(self.llm_config)) self.agent.llm_client = build_llm_client( base_url=self.llm_config.get("base_url"), api_key=self.llm_config.get("api_key"), @@ -269,12 +316,35 @@ class InteractiveCliSession: action_analysis_prompt_path=self.llm_config.get("action_analysis_prompt_file"), ) except Exception as exc: + logger.exception("chat LLM 配置失败 config=%s", json_for_log(self.llm_config)) self.output(f"LLM 配置失败: {exc}") return safe = {**self.llm_config} if safe.get("api_key"): safe["api_key"] = "***" self.output("LLM 配置已加载: " + json.dumps(safe, ensure_ascii=False)) + logger.info("chat LLM 配置已加载 client=%s config=%s", type(self.agent.llm_client).__name__, json_for_log(self.llm_config)) + + def _test_llm(self, text: str) -> None: + """使用当前 LLM client 做一次轻量调用,便于确认配置是否生效。""" + prompt = text or "请判断这是一次 LLM 连通性测试,并返回结构化意图。" + client_name = type(self.agent.llm_client).__name__ + self.output(f"正在测试 LLM: {client_name}") + logger.info("chat LLM 测试开始 client=%s prompt=%s", client_name, redact_for_log(prompt, max_text_len=500)) + try: + result = self.agent.understand_request(prompt) + except Exception as exc: + logger.exception("chat LLM 测试失败 client=%s", client_name) + self.output(f"LLM 测试失败: {exc}") + return + self.output("LLM 测试通过") + self.output(f"- client: {client_name}") + self.output(f"- intent: {result.intent}") + self.output(f"- strategy: {result.strategy_preference}") + self.output(f"- confidence: {result.confidence}") + if result.reasons: + self.output("- reasons: " + "; ".join(result.reasons)) + logger.info("chat LLM 测试通过 client=%s result=%s", client_name, json_for_log(asdict(result))) def _configure_mcp(self, text: str) -> None: """热加载 MCP client 配置。""" @@ -284,18 +354,22 @@ class InteractiveCliSession: return path = path.strip().strip('"') try: + logger.info("chat 开始加载 MCP 配置 path=%s", path) runner = build_mcp_runner_from_config(path) except Exception as exc: + logger.exception("chat MCP 配置失败 path=%s", path) self.output(f"MCP 配置失败: {exc}") return self.agent.mcp_runner = runner self.agent.router.mcp_runner = runner self.mcp_config_path = path self.output(f"MCP 配置已加载: {path}") + logger.info("chat MCP 配置已加载 path=%s runner=%s", path, type(runner).__name__) def _list_checkpoints(self) -> None: """列出当前 checkpoint 目录下的 JSON 文件。""" checkpoint_dir = Path(self.checkpoint_path).parent + logger.info("chat 查询 checkpoint 列表 dir=%s", checkpoint_dir) if not checkpoint_dir.exists(): self.output(f"checkpoint 目录不存在: {checkpoint_dir}") return @@ -314,6 +388,7 @@ class InteractiveCliSession: self.output("格式:load checkpoint <路径>") return checkpoint = Path(path_text) + logger.info("chat 开始加载 checkpoint path=%s", checkpoint) if not checkpoint.exists(): self.output(f"checkpoint 不存在: {checkpoint}") return @@ -325,6 +400,14 @@ class InteractiveCliSession: self.target_ips = list(self.state.target_ips) self.graph_runtime = None self.output(f"已加载 checkpoint: {checkpoint}") + logger.info( + "chat checkpoint 已加载 path=%s run_id=%s strategy=%s paused=%s pending=%s", + checkpoint, + self.state.run_id, + self.strategy, + self.state.paused, + self.state.pending_confirmation, + ) if self.state.pending_confirmation: self._print_confirmation() self._print_pause_context() @@ -335,35 +418,43 @@ class InteractiveCliSession: self.output("格式:load params <路径>") return path = Path(path_text) + logger.info("chat 开始加载参数文件 path=%s", path) if not path.exists(): self.output(f"参数文件不存在: {path}") return try: updates = load_params_file(path) except Exception as exc: + logger.exception("chat 参数文件加载失败 path=%s", path) self.output(f"参数文件加载失败: {exc}") return self.params.update(updates) try: self.params = self.agent.normalize_params(self.params) except ValueError as exc: + logger.exception("chat 参数热更新归一化失败 path=%s updates=%s", path, json_for_log(updates)) self.output(f"参数热更新失败: {exc}") return self._sync_params_to_state() self.output(f"已加载参数文件: {path}") self.output(_format_redacted_params(redact_mapping(self.params))) + logger.info("chat 参数文件已加载 path=%s updates=%s params=%s", path, json_for_log(updates), json_for_log(self.params)) def _run_deploy(self) -> None: """在用户确认后创建状态并执行完整部署流程。""" + logger.info("chat run 请求开始 strategy=%s checkpoint=%s target_ips=%s", self.strategy, self.checkpoint_path, self.target_ips) if self.state and self.state.pending_confirmation: + logger.info("chat run 命中待确认事项 pending=%s", self.state.pending_confirmation) self._print_confirmation() return if not self._prepare_params_for_run(): + logger.info("chat run 参数准备失败") return problems = self._validate_run_prerequisites(self.params) if problems: + logger.info("chat run 前置检查失败 problems=%s", problems) self.output("执行前检查未通过:") for problem in problems: self.output(f"- {problem}") @@ -371,10 +462,12 @@ class InteractiveCliSession: return if not self._confirm_params_and_scope(): + logger.info("chat run 用户取消参数或目标范围确认") self.output("已取消执行。") return if not self._ask_yes_no("即将执行真实 action;确认执行请输入 yes: "): + logger.info("chat run 用户取消最终执行确认") self.output("已取消执行。") return @@ -385,6 +478,14 @@ class InteractiveCliSession: target_ips=self.target_ips, ) self.graph_runtime = None + logger.info( + "chat run state 已创建 run_id=%s strategy=%s checkpoint=%s config=%s target_ips=%s", + self.state.run_id, + self.state.execution_strategy, + self.state.checkpoint_path, + self.state.config_path, + self.state.target_ips, + ) self._execute_current_state() def _confirm_params_and_scope(self) -> bool: @@ -404,14 +505,19 @@ class InteractiveCliSession: checkpoint = Path(self.checkpoint_path) if not checkpoint.exists(): self.output("当前没有可续跑的 checkpoint。") + logger.info("chat resume 未找到 checkpoint path=%s", checkpoint) return + logger.info("chat resume 从 checkpoint 加载 path=%s", checkpoint) self.state = load_agent_state(checkpoint) self.state.checkpoint_path = self.state.checkpoint_path or str(checkpoint) if self.state.paused: + logger.info("chat resume 清理暂停状态 run_id=%s reason=%s", self.state.run_id, self.state.pause_reason) self.state = self.agent.resume_state(self.state) if self.graph_runtime and self.graph_runtime.waiting_confirmation: + logger.info("chat resume 停在 LangGraph 确认点 pending=%s", self.state.pending_confirmation if self.state else "") self._print_confirmation() return + logger.info("chat resume 开始执行 run_id=%s checkpoint=%s", self.state.run_id if self.state else "", self.checkpoint_path) self._execute_current_state() def _execute_current_state(self) -> None: @@ -419,10 +525,19 @@ class InteractiveCliSession: if self.state is None: self.output("当前没有运行状态。") return + logger.info( + "chat 开始执行当前 state run_id=%s strategy=%s checkpoint=%s graph_runtime=%s waiting_confirmation=%s", + self.state.run_id, + self.state.execution_strategy, + self.state.checkpoint_path, + type(self.graph_runtime).__name__ if self.graph_runtime else "", + self.graph_runtime.waiting_confirmation if self.graph_runtime else False, + ) if self.graph_runtime is None or not self.graph_runtime.waiting_confirmation: try: self.graph_runtime = LangGraphDeploymentRuntime(agent=self.agent) except RuntimeError as exc: + logger.exception("chat LangGraph runtime 不可用,降级本地执行 run_id=%s", self.state.run_id) self.output(f"LangGraph 确认运行器不可用,降级为本地执行: {exc}") self.graph_runtime = None try: @@ -434,6 +549,7 @@ class InteractiveCliSession: self._handle_execution_error(fallback_exc) return self._print_state_report_and_checkpoint() + logger.info("chat 本地执行完成 run_id=%s checkpoint=%s", self.state.run_id, self.state.checkpoint_path) return try: result = self.graph_runtime.start(self.state) @@ -444,14 +560,22 @@ class InteractiveCliSession: self._handle_execution_error(exc) return self._apply_graph_result(result) + logger.info( + "chat LangGraph 执行返回 interrupted=%s pending=%s checkpoint=%s", + result.interrupted, + self.state.pending_confirmation if self.state else "", + self.state.checkpoint_path if self.state else self.checkpoint_path, + ) def _prepare_params_for_run(self) -> bool: """执行前归一化参数,确保确认值和实际写入脚本配置一致。""" try: self.params = self.agent.normalize_params(self.params) except ValueError as exc: + logger.exception("chat 参数检查失败 params=%s", json_for_log(self.params)) self.output(f"参数检查失败: {exc}") return False + logger.info("chat 参数检查通过 params=%s", json_for_log(self.params)) return True def _validate_run_prerequisites(self, params: dict[str, Any]) -> list[str]: @@ -473,6 +597,11 @@ class InteractiveCliSession: def _handle_execution_error(self, exc: Exception) -> None: """输出 action 执行失败后的可恢复提示,不再误报 LangGraph 不可用。""" self.output(f"执行已停止: {exc}") + logger.exception( + "chat 执行停止 run_id=%s checkpoint=%s", + self.state.run_id if self.state else "", + self.state.checkpoint_path if self.state else self.checkpoint_path, + ) if self.state is None: return if self.state.last_failed_step: @@ -487,8 +616,10 @@ class InteractiveCliSession: """处理执行中的用户中断,并保留断点。""" if self.state is None: self.output("执行已中断。") + logger.info("chat 执行中断时没有 state") return self.graph_runtime = None + logger.info("chat 执行被用户中断 run_id=%s checkpoint=%s", self.state.run_id, self.state.checkpoint_path) self.state = self.agent.pause_state( self.state, reason="user_interrupted", @@ -505,6 +636,13 @@ class InteractiveCliSession: if self.state is None: self.output("当前没有运行状态。") return + logger.info( + "chat 应用 LangGraph 结果 run_id=%s interrupted=%s pending=%s paused=%s", + self.state.run_id, + result.interrupted, + self.state.pending_confirmation, + self.state.paused, + ) self.output(result.report or self.agent.render_report(self.state)) if result.interrupted and result.confirmation: self._print_confirmation_request(result.confirmation) @@ -539,25 +677,42 @@ class InteractiveCliSession: if self.state is None: checkpoint = Path(self.checkpoint_path) if checkpoint.exists(): + logger.info("chat confirm 从 checkpoint 加载 path=%s", checkpoint) self.state = load_agent_state(checkpoint) self.state.checkpoint_path = self.state.checkpoint_path or str(checkpoint) else: self.output("当前没有待确认任务。") + logger.info("chat confirm 无 state 且 checkpoint 不存在 path=%s", checkpoint) return if not self.state.pending_confirmation: self.output("当前没有待确认任务。") + logger.info("chat confirm 无待确认事项 run_id=%s", self.state.run_id) return + logger.info( + "chat confirm 开始 run_id=%s approved=%s pending=%s note_len=%s", + self.state.run_id, + approved, + self.state.pending_confirmation, + len(note), + ) if self.graph_runtime and self.graph_runtime.waiting_confirmation: try: result = self.graph_runtime.resume(approved=approved, note=note) except RuntimeError as exc: + logger.exception("chat LangGraph 确认恢复失败,降级本地确认 run_id=%s", self.state.run_id) self.output(f"LangGraph 确认恢复失败,降级为本地确认: {exc}") else: self._apply_graph_result(result) return self.state = self.agent.confirm_pending(self.state, approved=approved, operator_note=note) + logger.info( + "chat confirm 完成 run_id=%s pending=%s paused=%s", + self.state.run_id, + self.state.pending_confirmation, + self.state.paused, + ) self.output(self.agent.render_report(self.state)) self._print_pause_context() if self.state.pending_confirmation: @@ -570,11 +725,19 @@ class InteractiveCliSession: try: self.state = self.agent.update_state_params(self.state, self.params) except ValueError as exc: + logger.exception("chat 参数同步到 state 失败 run_id=%s params=%s", self.state.run_id, json_for_log(self.params)) self.output(f"参数同步到当前任务失败: {exc}") return self.params = dict(self.state.params) if self.target_ips: self.state.target_ips = list(self.target_ips) + logger.info( + "chat 参数已同步到 state run_id=%s checkpoint=%s params=%s target_ips=%s", + self.state.run_id, + self.state.checkpoint_path, + json_for_log(self.params), + self.target_ips, + ) def _print_pause_context(self) -> None: """输出暂停原因和审核建议,避免黑盒暂停。""" @@ -631,6 +794,7 @@ class InteractiveCliSession: elif event_type == "ACTION_REVIEW_FAIL": detail = f": {message}" if message else "" self.output(f"分析失败: {stage}{suffix}{detail}") + logger.info("chat progress event=%s", json_for_log(payload)) def _print_confirmation(self) -> None: """输出当前待人工确认事项。""" @@ -666,6 +830,7 @@ class InteractiveCliSession: checkpoint = Path(self.checkpoint_path) if not checkpoint.exists(): return + logger.info("chat 启动时自动加载已有 checkpoint path=%s", checkpoint) self.state = load_agent_state(checkpoint) self.state.checkpoint_path = self.state.checkpoint_path or str(checkpoint) self.output(f"已加载 checkpoint: {checkpoint}") @@ -807,6 +972,7 @@ def _build_prompt_input(input_func: InputFunc) -> InputFunc: "events", "set", "llm config", + "llm test", "llm fallback", "llm action-analysis on", "llm action-analysis off", diff --git a/pam_deploy_graph/langgraph_runtime.py b/pam_deploy_graph/langgraph_runtime.py index e5736d7..e6d1240 100644 --- a/pam_deploy_graph/langgraph_runtime.py +++ b/pam_deploy_graph/langgraph_runtime.py @@ -2,14 +2,17 @@ from __future__ import annotations +import logging from dataclasses import dataclass, field from typing import Any, Literal from uuid import uuid4 from .agent import PamDeployAgent +from .logging_utils import json_for_log from .models import AgentState GraphFlow = Literal["global", "deploy"] +logger = logging.getLogger(__name__) @dataclass(slots=True) @@ -39,6 +42,12 @@ class LangGraphDeploymentRuntime: self.flow = flow self._waiting_confirmation = False self._graph = build_deployment_graph(agent=self.agent, flow=self.flow) + logger.info( + "LangGraph runtime 初始化 thread_id=%s flow=%s agent=%s", + self.thread_id, + self.flow, + type(self.agent).__name__, + ) @property def waiting_confirmation(self) -> bool: @@ -48,6 +57,14 @@ class LangGraphDeploymentRuntime: def start(self, state: AgentState) -> LangGraphRunResult: """从给定 AgentState 开始执行,直到结束或遇到人工确认点。""" self._waiting_confirmation = False + logger.info( + "LangGraph start run_id=%s thread_id=%s flow=%s paused=%s pending=%s", + state.run_id, + self.thread_id, + self.flow, + state.paused, + state.pending_confirmation, + ) return self._consume(self._graph.stream({"agent_state": state}, self._config())) def resume(self, *, approved: bool, note: str = "") -> LangGraphRunResult: @@ -58,6 +75,7 @@ class LangGraphDeploymentRuntime: raise RuntimeError("未安装 langgraph,无法恢复 interrupt。") from exc decision = {"approved": approved, "note": note} + logger.info("LangGraph resume thread_id=%s decision=%s note_len=%s", self.thread_id, approved, len(note)) return self._consume(self._graph.stream(Command(resume=decision), self._config())) def _config(self) -> dict[str, Any]: @@ -69,9 +87,11 @@ class LangGraphDeploymentRuntime: result = LangGraphRunResult() for chunk in chunks: result.chunks.append(chunk) + logger.info("LangGraph chunk=%s", json_for_log(chunk, max_text_len=1600)) if "__interrupt__" in chunk: result.interrupted = True result.confirmation = _extract_interrupt_value(chunk["__interrupt__"]) + logger.info("LangGraph interrupt thread_id=%s confirmation=%s", self.thread_id, json_for_log(result.confirmation)) continue for value in chunk.values(): @@ -83,11 +103,20 @@ class LangGraphDeploymentRuntime: result.report = value["report"] self._waiting_confirmation = result.interrupted + logger.info( + "LangGraph consume 完成 thread_id=%s interrupted=%s waiting=%s state_run_id=%s report_len=%s", + self.thread_id, + result.interrupted, + self._waiting_confirmation, + result.state.run_id if result.state else "", + len(result.report), + ) return result def build_deployment_graph(*, agent: PamDeployAgent, flow: GraphFlow = "deploy"): """构建 action 级别的 LangGraph 部署图。""" + logger.info("开始构建 LangGraph 部署图 flow=%s", flow) try: from langgraph.checkpoint.memory import InMemorySaver from langgraph.graph import END, START, StateGraph @@ -97,6 +126,8 @@ def build_deployment_graph(*, agent: PamDeployAgent, flow: GraphFlow = "deploy") def entry_node(state: dict[str, Any]) -> dict[str, Any]: """保留入口节点,便于统一路由已有 state 或恢复 state。""" + agent_state = state["agent_state"] + logger.info("LangGraph entry_node run_id=%s pending=%s paused=%s", agent_state.run_id, agent_state.pending_confirmation, agent_state.paused) return {"agent_state": state["agent_state"]} def global_action_node(state: dict[str, Any]) -> dict[str, Any]: @@ -104,6 +135,7 @@ def build_deployment_graph(*, agent: PamDeployAgent, flow: GraphFlow = "deploy") agent_state = state["agent_state"] action = agent.next_global_action(agent_state) if action: + logger.info("LangGraph global_action_node run_id=%s action=%s", agent_state.run_id, action) agent.run_global_action(agent_state, action) return {"agent_state": agent_state} @@ -112,8 +144,10 @@ def build_deployment_graph(*, agent: PamDeployAgent, flow: GraphFlow = "deploy") agent_state = state["agent_state"] work = agent.next_ip_action(agent_state) if work is None: + logger.info("LangGraph prepare_ip_node 无待执行 IP action run_id=%s", agent_state.run_id) return {"agent_state": agent_state, "current_ip": "", "current_ip_action": ""} ip, action = work + logger.info("LangGraph prepare_ip_node run_id=%s ip=%s action=%s", agent_state.run_id, ip, action) return {"agent_state": agent_state, "current_ip": ip, "current_ip_action": action} def ip_action_node(state: dict[str, Any]) -> dict[str, Any]: @@ -122,6 +156,7 @@ def build_deployment_graph(*, agent: PamDeployAgent, flow: GraphFlow = "deploy") ip = str(state.get("current_ip", "")) action = str(state.get("current_ip_action", "")) if ip and action: + logger.info("LangGraph ip_action_node run_id=%s ip=%s action=%s", agent_state.run_id, ip, action) agent.run_ip_action(agent_state, ip, action) return {"agent_state": agent_state, "current_ip": "", "current_ip_action": ""} @@ -129,8 +164,10 @@ def build_deployment_graph(*, agent: PamDeployAgent, flow: GraphFlow = "deploy") """把确认请求交给 LangGraph interrupt,并在恢复后执行确认动作。""" agent_state = state["agent_state"] request = agent.build_confirmation_request(agent_state) + logger.info("LangGraph confirm_node interrupt run_id=%s request=%s", agent_state.run_id, json_for_log(request)) decision = interrupt(request) approved, note = _parse_confirmation_decision(decision) + logger.info("LangGraph confirm_node resume run_id=%s approved=%s note_len=%s", agent_state.run_id, approved, len(note)) agent_state = agent.confirm_pending( agent_state, approved=approved, @@ -140,6 +177,8 @@ def build_deployment_graph(*, agent: PamDeployAgent, flow: GraphFlow = "deploy") def report_node(state: dict[str, Any]) -> dict[str, Any]: """渲染当前状态报告。""" + agent_state = state["agent_state"] + logger.info("LangGraph report_node run_id=%s pending=%s paused=%s", agent_state.run_id, agent_state.pending_confirmation, agent_state.paused) return { "agent_state": state["agent_state"], "report": agent.render_report(state["agent_state"]), @@ -149,29 +188,39 @@ def build_deployment_graph(*, agent: PamDeployAgent, flow: GraphFlow = "deploy") """从入口决定进入全局、IP、确认或报告节点。""" agent_state = state["agent_state"] if agent_state.pending_confirmation: + logger.info("LangGraph route_entry -> confirm run_id=%s", agent_state.run_id) return "confirm" if agent.next_global_action(agent_state): + logger.info("LangGraph route_entry -> global_action run_id=%s", agent_state.run_id) return "global_action" if flow == "global": + logger.info("LangGraph route_entry -> report run_id=%s", agent_state.run_id) return "report" + logger.info("LangGraph route_entry -> prepare_ip run_id=%s", agent_state.run_id) return "prepare_ip" def route_after_global(state: dict[str, Any]) -> str: """全局 action 后继续全局循环或进入 IP 阶段。""" agent_state = state["agent_state"] if agent.next_global_action(agent_state): + logger.info("LangGraph route_after_global -> global_action run_id=%s", agent_state.run_id) return "global_action" if flow == "global": + logger.info("LangGraph route_after_global -> report run_id=%s", agent_state.run_id) return "report" + logger.info("LangGraph route_after_global -> prepare_ip run_id=%s", agent_state.run_id) return "prepare_ip" def route_after_prepare_ip(state: dict[str, Any]) -> str: """IP 准备节点后进入确认、单 IP action 或报告。""" agent_state = state["agent_state"] if agent_state.pending_confirmation: + logger.info("LangGraph route_after_prepare_ip -> confirm run_id=%s", agent_state.run_id) return "confirm" if state.get("current_ip_action"): + logger.info("LangGraph route_after_prepare_ip -> ip_action run_id=%s ip=%s action=%s", agent_state.run_id, state.get("current_ip"), state.get("current_ip_action")) return "ip_action" + logger.info("LangGraph route_after_prepare_ip -> report run_id=%s", agent_state.run_id) return "report" graph = StateGraph(dict) @@ -210,7 +259,9 @@ def build_deployment_graph(*, agent: PamDeployAgent, flow: GraphFlow = "deploy") graph.add_edge("ip_action", "prepare_ip") graph.add_edge("confirm", "entry") graph.add_edge("report", END) - return graph.compile(checkpointer=InMemorySaver()) + compiled = graph.compile(checkpointer=InMemorySaver()) + logger.info("LangGraph 部署图构建完成 flow=%s", flow) + return compiled def _extract_interrupt_value(interrupts: Any) -> dict[str, Any]: diff --git a/pam_deploy_graph/llm/factory.py b/pam_deploy_graph/llm/factory.py index f85d676..25a5c3b 100644 --- a/pam_deploy_graph/llm/factory.py +++ b/pam_deploy_graph/llm/factory.py @@ -3,11 +3,15 @@ from __future__ import annotations import os +import logging +from pam_deploy_graph.logging_utils import json_for_log from .base import LlmClient from .openai_compatible import OpenAICompatibleLlmClient, load_prompt_text from .rule_based import RuleBasedLlmClient +logger = logging.getLogger(__name__) + def build_llm_client( *, @@ -25,8 +29,24 @@ def build_llm_client( if action_analysis_prompt_path is not None else os.getenv("PAM_LLM_ACTION_ANALYSIS_PROMPT_FILE", "") ) + logger.info( + "构建 LLM client base_url=%s model=%s has_api_key=%s action_prompt_path=%s explicit=%s", + actual_base_url, + actual_model, + bool(actual_api_key), + actual_action_prompt_path, + json_for_log( + { + "base_url": base_url, + "api_key": api_key, + "model": model, + "action_analysis_prompt_path": action_analysis_prompt_path, + } + ), + ) if not actual_base_url and not actual_api_key and not actual_model: + logger.info("未配置真实 LLM,使用 RuleBasedLlmClient fallback") return RuleBasedLlmClient() missing = [] @@ -35,11 +55,14 @@ def build_llm_client( if not actual_model: missing.append("model") if missing: + logger.info("LLM 配置不完整 missing=%s", missing) raise ValueError(f"LLM 配置不完整,缺少: {', '.join(missing)}") - return OpenAICompatibleLlmClient( + client = OpenAICompatibleLlmClient( base_url=actual_base_url, api_key=actual_api_key, model=actual_model, action_analysis_prompt=load_prompt_text(actual_action_prompt_path), ) + logger.info("真实 LLM client 构建完成 client=%s model=%s has_api_key=%s", type(client).__name__, actual_model, bool(actual_api_key)) + return client diff --git a/pam_deploy_graph/llm/openai_compatible.py b/pam_deploy_graph/llm/openai_compatible.py index 45d1b77..8098ee7 100644 --- a/pam_deploy_graph/llm/openai_compatible.py +++ b/pam_deploy_graph/llm/openai_compatible.py @@ -7,6 +7,8 @@ from __future__ import annotations import json +import logging +import time from pathlib import Path import urllib.request from collections.abc import Callable @@ -20,12 +22,14 @@ from pam_deploy_graph.constants import ( REQUIRED_PARAMS, SENSITIVE_KEYS, ) +from pam_deploy_graph.logging_utils import json_for_log, redact_for_log from pam_deploy_graph.models import ExecutionStrategy, LlmDeployPlan, LlmIntentResult, LlmParamResult from pam_deploy_graph.models import ActionResult, LlmActionAnalysis from .prompts import ACTION_ANALYSIS_PROMPT, INTENT_PROMPT, PARAM_PROMPT, PLAN_PROMPT, SYSTEM_PROMPT JsonTransport = Callable[[str, dict[str, str], dict[str, Any], float], dict[str, Any]] +logger = logging.getLogger(__name__) class OpenAICompatibleLlmClient: @@ -54,10 +58,20 @@ class OpenAICompatibleLlmClient: self.timeout_sec = timeout_sec self.temperature = temperature self.transport = transport or _default_transport + logger.info( + "OpenAI-compatible LLM client 初始化 base_url=%s endpoint=%s model=%s has_api_key=%s timeout=%s temperature=%s custom_transport=%s", + self.base_url, + _chat_completions_url(self.base_url), + self.model, + bool(self.api_key), + self.timeout_sec, + self.temperature, + transport is not None, + ) def understand_request(self, text: str) -> LlmIntentResult: """调用 LLM 识别用户意图。""" - payload = self._complete_json(INTENT_PROMPT, {"user_text": text}) + payload = self._complete_json("understand_request", INTENT_PROMPT, {"user_text": text}) return LlmIntentResult( intent=_string(payload, "intent", "deploy"), # type: ignore[arg-type] mode_preference=_string(payload, "mode_preference", "未指定"), # type: ignore[arg-type] @@ -73,6 +87,7 @@ class OpenAICompatibleLlmClient: original_base = dict(base_params or {}) safe_base = _redact_sensitive(original_base) payload = self._complete_json( + "extract_params", PARAM_PROMPT, { "user_text": text, @@ -110,6 +125,7 @@ class OpenAICompatibleLlmClient: ) -> LlmDeployPlan: """调用 LLM 生成部署计划。""" payload = self._complete_json( + "generate_plan", PLAN_PROMPT, { "params": _redact_sensitive(params), @@ -138,6 +154,7 @@ class OpenAICompatibleLlmClient: ) -> LlmActionAnalysis: """调用 LLM 分析 action 结果,返回结构化诊断建议。""" payload = self._complete_json( + "analyze_action_result", self.action_analysis_prompt, { "action": action, @@ -164,8 +181,10 @@ class OpenAICompatibleLlmClient: notes=_string_list(payload.get("notes")), ) - def _complete_json(self, instruction: str, input_payload: dict[str, Any]) -> dict[str, Any]: + def _complete_json(self, operation: str, instruction: str, input_payload: dict[str, Any]) -> dict[str, Any]: """发送 chat/completions 请求,并解析 JSON 对象响应。""" + started_at = time.perf_counter() + endpoint = _chat_completions_url(self.base_url) request_payload = { "model": self.model, "temperature": self.temperature, @@ -183,16 +202,48 @@ class OpenAICompatibleLlmClient: headers = {"Content-Type": "application/json"} if self.api_key: headers["Authorization"] = f"Bearer {self.api_key}" - response = self.transport( - _chat_completions_url(self.base_url), - headers, - request_payload, + logger.info( + "LLM 请求开始 operation=%s endpoint=%s model=%s timeout=%s has_api_key=%s input=%s", + operation, + endpoint, + self.model, self.timeout_sec, + bool(self.api_key), + json_for_log(input_payload, max_text_len=1600), + ) + try: + response = self.transport( + endpoint, + headers, + request_payload, + self.timeout_sec, + ) + content = _message_content(response) + logger.info( + "LLM 原始响应 operation=%s duration_ms=%s content=%s", + operation, + int((time.perf_counter() - started_at) * 1000), + redact_for_log(content, max_text_len=1600), + ) + parsed = _loads_json_object(content) + if not isinstance(parsed, dict): + raise ValueError("LLM 响应必须是 JSON object") + except Exception: + logger.exception( + "LLM 请求失败 operation=%s endpoint=%s duration_ms=%s input=%s", + operation, + endpoint, + int((time.perf_counter() - started_at) * 1000), + json_for_log(input_payload, max_text_len=1600), + ) + raise + logger.info( + "LLM 请求完成 operation=%s duration_ms=%s response_keys=%s response=%s", + operation, + int((time.perf_counter() - started_at) * 1000), + sorted(parsed.keys()), + json_for_log(parsed, max_text_len=1600), ) - content = _message_content(response) - parsed = _loads_json_object(content) - if not isinstance(parsed, dict): - raise ValueError("LLM 响应必须是 JSON object") return parsed diff --git a/pam_deploy_graph/llm/rule_based.py b/pam_deploy_graph/llm/rule_based.py index 0e55dc3..59a7e5b 100644 --- a/pam_deploy_graph/llm/rule_based.py +++ b/pam_deploy_graph/llm/rule_based.py @@ -6,10 +6,13 @@ from __future__ import annotations +import logging import re +from dataclasses import asdict from typing import Any from pam_deploy_graph.constants import GLOBAL_ACTION_SEQUENCE, REQUIRED_PARAMS +from pam_deploy_graph.logging_utils import json_for_log, redact_for_log from pam_deploy_graph.models import ( ActionResult, ExecutionStrategy, @@ -19,6 +22,8 @@ from pam_deploy_graph.models import ( LlmParamResult, ) +logger = logging.getLogger(__name__) + KEY_ALIASES = { "home_base_url": "HOME_BASE_URL", "HOME_BASE_URL": "HOME_BASE_URL", @@ -50,6 +55,7 @@ class RuleBasedLlmClient: def understand_request(self, text: str) -> LlmIntentResult: """用关键词规则识别用户意图和执行策略偏好。""" + logger.info("规则 LLM 意图识别开始 text=%s", redact_for_log(text, max_text_len=800)) lowered = text.lower() reasons: list[str] = [] intent = "deploy" @@ -82,16 +88,19 @@ class RuleBasedLlmClient: if intent == "preview": strategy_preference = strategy_preference if strategy_preference != "未指定" else "hybrid_node_mcp" - return LlmIntentResult( + result = LlmIntentResult( intent=intent, # type: ignore[arg-type] mode_preference=mode_preference, # type: ignore[arg-type] strategy_preference=strategy_preference, # type: ignore[arg-type] confidence=0.72 if intent != "deploy" else 0.6, reasons=reasons, ) + logger.info("规则 LLM 意图识别完成 result=%s", json_for_log(asdict(result))) + return result def extract_params(self, text: str, base_params: dict[str, Any] | None = None) -> LlmParamResult: """从 key=value、中文短语和 IP 地址中抽取参数。""" + logger.info("规则 LLM 参数抽取开始 text=%s base_params=%s", redact_for_log(text, max_text_len=800), json_for_log(base_params or {})) params = dict(base_params or {}) params.update(self._extract_key_values(text)) params.update(self._extract_chinese_patterns(text)) @@ -103,12 +112,14 @@ class RuleBasedLlmClient: missing = [key for key in REQUIRED_PARAMS if not params.get(key)] sensitive = [key for key in ("CLIENT_SECRET", "CLIENT_ID") if params.get(key)] - return LlmParamResult( + result = LlmParamResult( extracted_params=params, extracted_control=control, missing_required_params=missing, sensitive_fields_present=sensitive, ) + logger.info("规则 LLM 参数抽取完成 result=%s", json_for_log(asdict(result))) + return result def generate_plan( self, @@ -118,6 +129,7 @@ class RuleBasedLlmClient: strategy: ExecutionStrategy, ) -> LlmDeployPlan: """生成确定性的部署计划和风险提示。""" + logger.info("规则 LLM 计划生成开始 intent=%s strategy=%s params=%s", intent, strategy, json_for_log(params)) if strategy == "hybrid_node_mcp": strategy_text = "PAM_HOME 使用脚本 action,PAM_NODE 使用 MCP" elif strategy == "script_only": @@ -139,13 +151,15 @@ class RuleBasedLlmClient: if strategy == "hybrid_node_mcp": risk_notes.append("PAM_HOME 当前没有 MCP 能力,HOME 阶段仍会调用脚本 action。") - return LlmDeployPlan( + result = LlmDeployPlan( summary=summary, risk_notes=risk_notes, planned_actions=list(GLOBAL_ACTION_SEQUENCE), requires_confirmation=intent in ("deploy", "query_node_ips", "rollback"), execution_strategy=strategy, ) + logger.info("规则 LLM 计划生成完成 result=%s", json_for_log(asdict(result))) + return result def analyze_action_result( self, @@ -155,6 +169,23 @@ class RuleBasedLlmClient: state_summary: dict[str, Any], ) -> LlmActionAnalysis: """用本地规则分析 action 结果,作为真实 LLM 不可用时的兜底。""" + logger.info( + "规则 LLM action 审核开始 action=%s result=%s state_summary=%s", + action, + json_for_log( + { + "backend": result.backend, + "ok": result.ok, + "exit_code": result.exit_code, + "tool_name": result.tool_name, + "values": result.values, + "stderr": result.stderr, + "error_summary": result.error_summary, + }, + max_text_len=1000, + ), + json_for_log(state_summary), + ) notes: list[str] = [] has_anomaly = not result.ok severity = "info" @@ -197,7 +228,7 @@ class RuleBasedLlmClient: notes.append("action 返回待人工确认标记。") should_continue = False - return LlmActionAnalysis( + analysis = LlmActionAnalysis( action=action, has_anomaly=has_anomaly, severity=severity, # type: ignore[arg-type] @@ -207,6 +238,8 @@ class RuleBasedLlmClient: should_continue=should_continue, notes=notes, ) + logger.info("规则 LLM action 审核完成 analysis=%s", json_for_log(asdict(analysis))) + return analysis def _extract_key_values(self, text: str) -> dict[str, str]: """抽取 KEY=VALUE 形式的参数。""" diff --git a/pam_deploy_graph/logging_utils.py b/pam_deploy_graph/logging_utils.py new file mode 100644 index 0000000..73f1eaa --- /dev/null +++ b/pam_deploy_graph/logging_utils.py @@ -0,0 +1,116 @@ +"""Agent 运行日志配置和脱敏工具。""" + +from __future__ import annotations + +import json +import logging +import os +import re +from dataclasses import asdict, is_dataclass +from pathlib import Path +from typing import Any + +from .constants import SENSITIVE_KEYS + +DEFAULT_LOG_FILE = Path("logs") / "pam_deploy_agent.log" +LOG_FILE_ENV = "PAM_AGENT_LOG_FILE" +LOG_LEVEL_ENV = "PAM_AGENT_LOG_LEVEL" +_HANDLER_MARKER = "_pam_deploy_agent_handler" +_SENSITIVE_NAME_PARTS = ("secret", "token", "authorization", "api_key", "apikey", "password") +_ASSIGNMENT_PATTERN = re.compile( + r"(?i)\b(client_secret|mcp_client_secret|api_key|pam_llm_api_key|token|access_token|authorization|password)\b" + r"\s*([:=])\s*([^\s,;]+)" +) +_AUTH_BEARER_ASSIGNMENT_PATTERN = re.compile(r"(?i)\b(authorization)\b\s*([:=])\s*bearer\s+[^\s,;]+") +_BEARER_PATTERN = re.compile(r"(?i)(bearer\s+)[A-Za-z0-9._~+\-/=]+") + + +def configure_logging( + log_file: str | Path | None = None, + level: str | int | None = None, +) -> Path: + """配置 Agent 文件日志;重复调用不会重复添加 handler。""" + actual_path = Path(log_file or os.getenv(LOG_FILE_ENV) or DEFAULT_LOG_FILE) + actual_path.parent.mkdir(parents=True, exist_ok=True) + actual_level = _resolve_level(level or os.getenv(LOG_LEVEL_ENV) or "INFO") + + package_logger = logging.getLogger("pam_deploy_graph") + package_logger.setLevel(actual_level) + package_logger.propagate = False + + marker = str(actual_path.resolve()) + for handler in package_logger.handlers: + if getattr(handler, _HANDLER_MARKER, "") == marker: + handler.setLevel(actual_level) + return actual_path + + handler = logging.FileHandler(actual_path, encoding="utf-8") + setattr(handler, _HANDLER_MARKER, marker) + handler.setLevel(actual_level) + handler.setFormatter( + logging.Formatter( + fmt="%(asctime)s %(levelname)s [%(name)s] %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + ) + ) + package_logger.addHandler(handler) + package_logger.info("日志已初始化 path=%s level=%s", actual_path, logging.getLevelName(actual_level)) + return actual_path + + +def redact_for_log(value: Any, *, max_text_len: int = 1200) -> Any: + """递归脱敏并截断日志对象,避免把密钥、token 或完整长文本写入日志。""" + if is_dataclass(value) and not isinstance(value, type): + return redact_for_log(asdict(value), max_text_len=max_text_len) + if isinstance(value, dict): + redacted: dict[str, Any] = {} + for key, item in value.items(): + text_key = str(key) + if _is_sensitive_key(text_key): + redacted[text_key] = "***" + else: + redacted[text_key] = redact_for_log(item, max_text_len=max_text_len) + return redacted + if isinstance(value, (list, tuple, set)): + return [redact_for_log(item, max_text_len=max_text_len) for item in value] + if isinstance(value, str): + return _truncate(_redact_string(value), max_text_len) + if value is None or isinstance(value, (bool, int, float)): + return value + return _truncate(_redact_string(str(value)), max_text_len) + + +def json_for_log(value: Any, *, max_text_len: int = 1200) -> str: + """把对象脱敏后序列化成适合单行日志的 JSON 文本。""" + redacted = redact_for_log(value, max_text_len=max_text_len) + return json.dumps(redacted, ensure_ascii=False, default=str, sort_keys=True) + + +def _resolve_level(value: str | int) -> int: + """解析日志级别字符串,非法值降级为 INFO。""" + if isinstance(value, int): + return value + resolved = getattr(logging, str(value).upper(), logging.INFO) + return resolved if isinstance(resolved, int) else logging.INFO + + +def _is_sensitive_key(key: str) -> bool: + """判断字段名是否应脱敏。""" + if key in SENSITIVE_KEYS: + return True + normalized = key.lower().replace("-", "_") + return any(part in normalized for part in _SENSITIVE_NAME_PARTS) + + +def _truncate(value: str, limit: int) -> str: + """截断过长字符串。""" + if len(value) <= limit: + return value + return value[:limit] + "...[已截断]" + + +def _redact_string(value: str) -> str: + """脱敏字符串中的常见 KEY=VALUE 和 Bearer token 片段。""" + value = _AUTH_BEARER_ASSIGNMENT_PATTERN.sub(lambda match: f"{match.group(1)}{match.group(2)}***", value) + value = _ASSIGNMENT_PATTERN.sub(lambda match: f"{match.group(1)}{match.group(2)}***", value) + return _BEARER_PATTERN.sub(lambda match: f"{match.group(1)}***", value) diff --git a/pam_deploy_graph/mcp_client.py b/pam_deploy_graph/mcp_client.py index 04710e1..55d6a87 100644 --- a/pam_deploy_graph/mcp_client.py +++ b/pam_deploy_graph/mcp_client.py @@ -7,6 +7,7 @@ callable 或 SDK session 适配成这个接口,避免业务代码绑定具体 from __future__ import annotations import json +import logging import time import urllib.parse import urllib.request @@ -16,6 +17,10 @@ from dataclasses import dataclass, field from pathlib import Path from typing import Any +from .logging_utils import json_for_log + +logger = logging.getLogger(__name__) + @dataclass(frozen=True) class McpAuthConfig: @@ -111,10 +116,21 @@ class McpClientConfig: def load_mcp_client_config(path: str | Path) -> McpClientConfig: """读取 MCP client JSON 配置文件。""" + logger.info("读取 MCP client 配置 path=%s", path) payload = json.loads(Path(path).read_text(encoding="utf-8")) if not isinstance(payload, dict): raise ValueError("MCP client 配置必须是 JSON object") - return McpClientConfig.from_mapping(payload) + config = McpClientConfig.from_mapping(payload) + logger.info( + "MCP client 配置读取完成 path=%s transport=%s server_url=%s command=%s has_auth=%s tool_names=%s", + path, + config.transport, + config.server_url, + config.command, + config.auth is not None, + json_for_log(config.tool_names), + ) + return config class FunctionMcpToolClient: @@ -126,6 +142,7 @@ class FunctionMcpToolClient: def call_tool(self, tool_name: str, arguments: dict[str, Any]) -> Any: """调用底层函数并返回原始结果。""" + logger.info("Function MCP tool 调用 tool=%s arguments=%s", tool_name, json_for_log(arguments)) return self.caller(tool_name, arguments) @@ -147,13 +164,19 @@ class SessionMcpToolClient: def call_tool(self, tool_name: str, arguments: dict[str, Any]) -> Any: """调用 SDK session,并把 SDK 返回值归一化。""" + logger.info("Session MCP tool 调用开始 tool=%s arguments=%s", tool_name, json_for_log(arguments)) result = self.session.call_tool(tool_name, arguments) - return normalize_mcp_sdk_result(result) + normalized = normalize_mcp_sdk_result(result) + logger.info("Session MCP tool 调用完成 tool=%s result=%s", tool_name, json_for_log(normalized, max_text_len=1600)) + return normalized def list_tools(self) -> list[str]: """从 SDK session 获取 tool 名称列表。""" + logger.info("Session MCP list_tools 开始") result = self.session.list_tools() - return normalize_mcp_tool_list(result) + tools = normalize_mcp_tool_list(result) + logger.info("Session MCP list_tools 完成 tools=%s", tools) + return tools class StdioMcpToolClient: @@ -176,9 +199,19 @@ class StdioMcpToolClient: self.env = env self.cwd = cwd or None self.timeout_seconds = timeout_seconds + logger.info( + "stdio MCP client 初始化 command=%s args=%s cwd=%s env_keys=%s timeout=%s", + self.command, + self.args, + self.cwd or "", + sorted((self.env or {}).keys()), + self.timeout_seconds, + ) def call_tool(self, tool_name: str, arguments: dict[str, Any]) -> Any: """创建一次 MCP stdio session,调用 tool 后关闭 session。""" + started_at = time.perf_counter() + logger.info("stdio MCP tool 调用开始 tool=%s arguments=%s", tool_name, json_for_log(arguments)) try: import anyio from mcp import ClientSession @@ -203,10 +236,23 @@ class StdioMcpToolClient: ) return normalize_mcp_sdk_result(result) - return anyio.run(call_once) + try: + result = anyio.run(call_once) + except Exception: + logger.exception("stdio MCP tool 调用失败 tool=%s duration_ms=%s", tool_name, int((time.perf_counter() - started_at) * 1000)) + raise + logger.info( + "stdio MCP tool 调用完成 tool=%s duration_ms=%s result=%s", + tool_name, + int((time.perf_counter() - started_at) * 1000), + json_for_log(result, max_text_len=1600), + ) + return result def list_tools(self) -> list[str]: """创建一次 MCP stdio session,读取 server 暴露的 tool 列表。""" + started_at = time.perf_counter() + logger.info("stdio MCP list_tools 开始") try: import anyio from mcp import ClientSession @@ -227,7 +273,13 @@ class StdioMcpToolClient: result = await session.list_tools() return normalize_mcp_tool_list(result) - return anyio.run(list_once) + try: + tools = anyio.run(list_once) + except Exception: + logger.exception("stdio MCP list_tools 失败 duration_ms=%s", int((time.perf_counter() - started_at) * 1000)) + raise + logger.info("stdio MCP list_tools 完成 duration_ms=%s tools=%s", int((time.perf_counter() - started_at) * 1000), tools) + return tools class OAuthTokenProvider: @@ -243,6 +295,12 @@ class OAuthTokenProvider: self.timeout_seconds = timeout_seconds self._token = "" self._expires_at = 0.0 + logger.info( + "MCP OAuth token provider 初始化 token_url=%s client_id=%s timeout=%s", + self.config.token_url, + self.config.client_id, + self.timeout_seconds, + ) def authorization_headers(self) -> dict[str, str]: """返回带 token 的请求头。""" @@ -255,7 +313,9 @@ class OAuthTokenProvider: """获取可用 token,未过期时复用缓存。""" now = time.time() if self._token and now < self._expires_at: + logger.info("MCP auth token 使用缓存 expires_in_sec=%s", int(self._expires_at - now)) return self._token + logger.info("MCP auth token 开始刷新 token_url=%s client_id=%s", self.config.token_url, self.config.client_id) payload = { "grant_type": self.config.grant_type, "client_id": self.config.client_id, @@ -278,6 +338,7 @@ class OAuthTokenProvider: expires_in = _safe_float(result.get(self.config.expires_in_field), 3600) self._token = token self._expires_at = now + max(expires_in - 60, 1) + logger.info("MCP auth token 刷新完成 expires_in=%s cached_until=%s", expires_in, int(self._expires_at)) return token @@ -305,14 +366,23 @@ class HttpMcpToolClient: self.auth_provider = auth_provider self.timeout_seconds = timeout_seconds self.sse_read_timeout_seconds = sse_read_timeout_seconds + logger.info( + "HTTP MCP client 初始化 url=%s transport=%s has_auth=%s headers=%s timeout=%s sse_read_timeout=%s", + self.url, + self.transport, + self.auth_provider is not None, + json_for_log(self.headers), + self.timeout_seconds, + self.sse_read_timeout_seconds, + ) def call_tool(self, tool_name: str, arguments: dict[str, Any]) -> Any: """连接 MCP server,调用 tool 后关闭 session。""" - return self._run_session(lambda session: session.call_tool(tool_name, arguments)) + return self._run_session(lambda session: session.call_tool(tool_name, arguments), operation_name=f"call_tool:{tool_name}", arguments=arguments) def list_tools(self) -> list[str]: """连接 MCP server,读取 server 暴露的 tool 名称。""" - return self._run_session(lambda session: session.list_tools(), normalize_tools=True) + return self._run_session(lambda session: session.list_tools(), normalize_tools=True, operation_name="list_tools") def _build_headers(self) -> dict[str, str]: """合并静态 headers 和动态鉴权 token。""" @@ -321,8 +391,23 @@ class HttpMcpToolClient: headers.update(self.auth_provider.authorization_headers()) return headers - def _run_session(self, operation: Callable[[Any], Any], *, normalize_tools: bool = False) -> Any: + def _run_session( + self, + operation: Callable[[Any], Any], + *, + normalize_tools: bool = False, + operation_name: str = "operation", + arguments: dict[str, Any] | None = None, + ) -> Any: """创建一次 HTTP/SSE MCP session 并执行指定操作。""" + started_at = time.perf_counter() + logger.info( + "HTTP MCP session 开始 operation=%s url=%s transport=%s arguments=%s", + operation_name, + self.url, + self.transport, + json_for_log(arguments or {}), + ) try: import anyio from mcp import ClientSession @@ -357,7 +442,24 @@ class HttpMcpToolClient: result = await operation(session) return normalize_mcp_tool_list(result) if normalize_tools else normalize_mcp_sdk_result(result) - return anyio.run(call_once) + try: + result = anyio.run(call_once) + except Exception: + logger.exception( + "HTTP MCP session 失败 operation=%s url=%s transport=%s duration_ms=%s", + operation_name, + self.url, + self.transport, + int((time.perf_counter() - started_at) * 1000), + ) + raise + logger.info( + "HTTP MCP session 完成 operation=%s duration_ms=%s result=%s", + operation_name, + int((time.perf_counter() - started_at) * 1000), + json_for_log(result, max_text_len=1600), + ) + return result def normalize_mcp_sdk_result(result: Any) -> Any: diff --git a/pam_deploy_graph/mcp_factory.py b/pam_deploy_graph/mcp_factory.py index dc29ba6..8d7de34 100644 --- a/pam_deploy_graph/mcp_factory.py +++ b/pam_deploy_graph/mcp_factory.py @@ -2,8 +2,10 @@ from __future__ import annotations +import logging from pathlib import Path +from .logging_utils import json_for_log from .mcp_client import ( HttpMcpToolClient, McpClientConfig, @@ -13,16 +15,36 @@ from .mcp_client import ( ) from .mcp_runner import McpActionRunner +logger = logging.getLogger(__name__) + def build_mcp_runner_from_config(path: str | Path) -> McpActionRunner: """读取 MCP 配置文件,并构造可直接给 Agent 使用的 runner。""" + logger.info("开始构建 MCP runner config_path=%s", path) config = load_mcp_client_config(path) client = build_mcp_client(config) - return McpActionRunner(client=client, tool_names=config.tool_names or None) + runner = McpActionRunner(client=client, tool_names=config.tool_names or None) + logger.info( + "MCP runner 构建完成 config_path=%s transport=%s server_url=%s client=%s tool_names=%s", + path, + config.transport, + config.server_url, + type(client).__name__, + json_for_log(config.tool_names), + ) + return runner def build_mcp_client(config: McpClientConfig): """根据 transport 类型创建 MCP client。""" + logger.info( + "开始构建 MCP client transport=%s server_url=%s command=%s has_auth=%s headers=%s", + config.transport, + config.server_url, + config.command, + config.auth is not None, + json_for_log(config.headers), + ) if config.transport == "stdio": return StdioMcpToolClient( command=config.command, diff --git a/pam_deploy_graph/mcp_runner.py b/pam_deploy_graph/mcp_runner.py index 700ad59..6a00882 100644 --- a/pam_deploy_graph/mcp_runner.py +++ b/pam_deploy_graph/mcp_runner.py @@ -2,11 +2,15 @@ from __future__ import annotations +import logging from typing import Any, Protocol +from .logging_utils import json_for_log from .models import ActionResult from .output_parser import parse_mcp_result +logger = logging.getLogger(__name__) + class McpToolClient(Protocol): """MCP 工具客户端需要实现的最小同步接口。""" @@ -46,6 +50,11 @@ class McpActionRunner: self.client = client self.tool_names = tool_names or {} self._discovered_tools: list[str] | None = None + logger.info( + "MCP action runner 初始化 client=%s explicit_tool_names=%s", + type(client).__name__ if client else "", + json_for_log(self.tool_names), + ) def run( self, @@ -70,16 +79,34 @@ class McpActionRunner: node_url=node_url, stop_first=stop_first, ) + logger.info( + "MCP action 调用开始 action=%s tool=%s arguments=%s", + action, + tool_name, + json_for_log(arguments), + ) try: payload = self.client.call_tool(tool_name, arguments) except Exception as exc: # pragma: no cover - 防御性异常包装 + logger.exception("MCP action 调用异常 action=%s tool=%s", action, tool_name) return parse_mcp_result(action, {}, ok=False, tool_name=tool_name, error=str(exc)) - return parse_mcp_result(action, payload, ok=True, tool_name=tool_name) + logger.info("MCP action 原始返回 action=%s tool=%s payload=%s", action, tool_name, json_for_log(payload, max_text_len=1600)) + result = parse_mcp_result(action, payload, ok=True, tool_name=tool_name) + logger.info( + "MCP action 解析完成 action=%s tool=%s ok=%s values=%s error=%s", + action, + tool_name, + result.ok, + json_for_log(result.values), + result.error_summary, + ) + return result def _resolve_tool_name(self, action: str) -> str: """根据显式映射、server tools 自动发现和默认约定解析 tool name。""" explicit = self.tool_names.get(action) if explicit: + logger.info("MCP tool 使用显式映射 action=%s tool=%s", action, explicit) return explicit discovered = self._list_discovered_tools() @@ -89,12 +116,14 @@ class McpActionRunner: for candidate in candidates: matched = by_lower.get(candidate.lower()) if matched: + logger.info("MCP tool 自动匹配 action=%s tool=%s candidates=%s", action, matched, candidates) return matched available = ", ".join(discovered) raise ValueError(f"MCP server 未发现 action 对应 tool: {action}; 已发现: {available}") fallback = DEFAULT_NODE_MCP_TOOLS.get(action) if fallback: + logger.info("MCP tool 使用默认约定 action=%s tool=%s", action, fallback) return fallback raise ValueError(f"action 未映射 MCP tool: {action}") @@ -108,7 +137,9 @@ class McpActionRunner: try: self._discovered_tools = list(self.client.list_tools()) except Exception: + logger.exception("MCP tool 自动发现失败,使用默认 tool name 约定") self._discovered_tools = [] + logger.info("MCP tool 自动发现完成 tools=%s", self._discovered_tools) return self._discovered_tools def _build_arguments( diff --git a/pam_deploy_graph/script_runner.py b/pam_deploy_graph/script_runner.py index 5f6d27c..0349059 100644 --- a/pam_deploy_graph/script_runner.py +++ b/pam_deploy_graph/script_runner.py @@ -2,13 +2,18 @@ from __future__ import annotations +import logging import subprocess +import time from pathlib import Path from typing import Any +from .logging_utils import json_for_log from .models import ActionResult from .output_parser import parse_script_result +logger = logging.getLogger(__name__) + class ScriptActionRunner: """脚本 action runner,负责构造命令、执行脚本并解析结果。""" @@ -40,15 +45,39 @@ class ScriptActionRunner: stop_first=stop_first, trace_file_path=trace_file_path, ) - completed = subprocess.run( - command, - cwd=str(self.script_base_dir), - capture_output=True, - text=True, - timeout=timeout_sec, - check=False, + started_at = time.perf_counter() + logger.info( + "脚本 action 开始 action=%s command=%s cwd=%s config=%s ip=%s trace=%s timeout=%s", + action, + json_for_log(command), + self.script_base_dir, + config_path, + ip or "", + trace_file_path or "", + timeout_sec, ) - return parse_script_result( + try: + completed = subprocess.run( + command, + cwd=str(self.script_base_dir), + capture_output=True, + text=True, + timeout=timeout_sec, + check=False, + ) + except Exception: + logger.exception("脚本 action 执行异常 action=%s command=%s cwd=%s", action, json_for_log(command), self.script_base_dir) + raise + duration_ms = int((time.perf_counter() - started_at) * 1000) + logger.info( + "脚本 action 结束 action=%s exit_code=%s duration_ms=%s stdout=%s stderr=%s", + action, + completed.returncode, + duration_ms, + json_for_log(completed.stdout, max_text_len=1200), + json_for_log(completed.stderr, max_text_len=1200), + ) + result = parse_script_result( action=action, stdout=completed.stdout, stderr=completed.stderr, @@ -56,6 +85,14 @@ class ScriptActionRunner: backend="script", tool_name=script_entry, ) + logger.info( + "脚本 action 解析完成 action=%s ok=%s values=%s error=%s", + action, + result.ok, + json_for_log(result.values), + result.error_summary, + ) + return result def build_command( self, diff --git a/tests/test_interactive_cli.py b/tests/test_interactive_cli.py index 17b0d39..d243195 100644 --- a/tests/test_interactive_cli.py +++ b/tests/test_interactive_cli.py @@ -6,7 +6,7 @@ import pytest from pam_deploy_graph.agent import PamDeployAgent from pam_deploy_graph.fake_runner import FakeActionRunner from pam_deploy_graph.interactive import InteractiveCliSession, _build_prompt_input -from pam_deploy_graph.models import LlmActionAnalysis +from pam_deploy_graph.models import LlmActionAnalysis, LlmIntentResult PARAMS = { @@ -35,6 +35,30 @@ class BlockingReviewLlmClient: ) +class FakeTestableLlmClient: + def __init__(self) -> None: + self.requests: list[str] = [] + + def understand_request(self, text: str) -> LlmIntentResult: + self.requests.append(text) + return LlmIntentResult( + intent="deploy", + mode_preference="MCP", + strategy_preference="hybrid_node_mcp", + confidence=0.91, + reasons=["test ok"], + ) + + def extract_params(self, text, base_params=None): + raise AssertionError("llm test should only call understand_request") + + def generate_plan(self, *, params, intent, strategy): + raise AssertionError("llm test should only call understand_request") + + def analyze_action_result(self, *, action, result, state_summary): + return LlmActionAnalysis(action=action) + + def run_session(session: InteractiveCliSession, inputs: list[str]) -> list[str]: output: list[str] = [] iterator = iter(inputs) @@ -260,6 +284,24 @@ def test_chat_llm_review_block_message_is_visible(tmp_path: Path): assert any("如需继续,输入 resume" in item for item in output) +def test_chat_llm_test_command_uses_current_client(tmp_path: Path): + llm = FakeTestableLlmClient() + session = InteractiveCliSession( + agent=PamDeployAgent(llm_client=llm), + params=PARAMS, + strategy="fake", + checkpoint_path=str(tmp_path / "checkpoint.json"), + ) + + output = run_session(session, ["llm test 检查模型", "exit"]) + + assert llm.requests == ["检查模型"] + assert any("正在测试 LLM: FakeTestableLlmClient" in item for item in output) + assert any("LLM 测试通过" in item for item in output) + assert any("- intent: deploy" in item for item in output) + assert any("- strategy: hybrid_node_mcp" in item for item in output) + + def test_chat_can_hot_load_mcp_config(tmp_path: Path): mcp_config = tmp_path / "mcp.json" mcp_config.write_text('{"transport": "stdio", "command": "python"}', encoding="utf-8") diff --git a/tests/test_logging_utils.py b/tests/test_logging_utils.py new file mode 100644 index 0000000..3c9c400 --- /dev/null +++ b/tests/test_logging_utils.py @@ -0,0 +1,28 @@ +from pam_deploy_graph.logging_utils import json_for_log, redact_for_log + + +def test_redact_for_log_masks_sensitive_keys_and_inline_assignments(): + payload = { + "CLIENT_SECRET": "home-secret", + "api_key": "llm-key", + "nested": { + "Authorization": "Bearer token-value", + "message": "CLIENT_SECRET=abc api_key:xyz Authorization=Bearer raw-token header Bearer plain-token", + }, + } + + redacted = redact_for_log(payload) + serialized = json_for_log(payload) + + assert redacted["CLIENT_SECRET"] == "***" + assert redacted["api_key"] == "***" + assert redacted["nested"]["Authorization"] == "***" + assert "home-secret" not in serialized + assert "llm-key" not in serialized + assert "token-value" not in serialized + assert "CLIENT_SECRET=***" in serialized + assert "api_key:***" in serialized + assert "Authorization=***" in serialized + assert "Bearer ***" in serialized + assert "raw-token" not in serialized + assert "plain-token" not in serialized