dark d3f5c82d98 feat: 补充 Agent 运行日志并增加 LLM 测试命令
- 新增统一日志工具,支持日志文件路径和级别配置
- 记录 CLI/chat、Agent、LLM、action、MCP、LangGraph、checkpoint 等关键流程
- 对日志中的 token、secret、api_key、Authorization 等敏感信息做脱敏
- chat 新增 llm test 命令,用于验证当前 LLM client 是否正常加载
- 同步 README、打包文档和 run.sh 帮助说明
- 补充日志脱敏和 llm test 相关测试
2026-06-04 10:51:59 +08:00

1114 lines
45 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""PAM 部署 Agent 运行时。
本模块不强依赖 LangGraph可独立运行同一组节点也可在
`pam_deploy_graph.graph` 中接入 LangGraph。
"""
from __future__ import annotations
import re
import time
import logging
from dataclasses import asdict
from pathlib import Path
from typing import Any, Callable
from .action_router import ActionRouter, build_action_backends
from .checkpoint_store import save_checkpoint
from .config_writer import write_config
from .constants import DEFAULT_PARAMS, GLOBAL_ACTION_SEQUENCE, IP_ACTION_SEQUENCE, REQUIRED_PARAMS
from .fake_runner import FakeActionRunner
from .llm import LlmClient, RuleBasedLlmClient, validate_deploy_plan, validate_intent_result
from .logging_utils import configure_logging, json_for_log
from .mcp_runner import McpActionRunner
from .models import ActionResult, AgentState, ExecutionStrategy, LlmActionAnalysis, LlmDeployPlan, LlmIntentResult, LlmParamResult
from .script_runner import ScriptActionRunner, select_script_entry
from .skill_policy import load_skill_policy
logger = logging.getLogger(__name__)
REQUIRED_ACTION_VALUES = {
"upload-package": ("HASH_CODE",),
"get-node-url": ("NODE_URL",),
}
class PamDeployAgent:
"""PAM 部署主 Agent串联 LLM、action 路由、确认和续跑状态。"""
def __init__(
self,
*,
skill_path: str | Path = "doc_scripts/PAM_AUTO_DEPLY_SKILL.md",
script_base_dir: str | Path = "doc_scripts",
mcp_runner: McpActionRunner | None = None,
fake_runner: FakeActionRunner | None = None,
llm_client: LlmClient | None = None,
action_analysis_enabled: bool = False,
progress_callback: Callable[[dict[str, Any]], None] | None = None,
) -> None:
"""初始化策略、脚本 runner、MCP runner、fake runner 和 LLM client。"""
self.log_path = configure_logging()
self.skill_policy = load_skill_policy(skill_path)
self.script_base_dir = Path(script_base_dir)
self.script_runner = ScriptActionRunner(self.script_base_dir)
self.fake_runner = fake_runner or FakeActionRunner()
self.mcp_runner = mcp_runner
self.llm_client = llm_client or RuleBasedLlmClient()
self.action_analysis_enabled = action_analysis_enabled
self.progress_callback = progress_callback
self.router = ActionRouter(
script_runner=self.script_runner,
mcp_runner=mcp_runner,
fake_runner=self.fake_runner,
)
logger.info(
"Agent 初始化完成 skill=%s script_base_dir=%s llm_client=%s mcp_runner=%s action_analysis_events=%s",
skill_path,
self.script_base_dir,
type(self.llm_client).__name__,
type(self.mcp_runner).__name__ if self.mcp_runner else "",
self.action_analysis_enabled,
)
def understand_request(self, text: str) -> LlmIntentResult:
"""调用 LLM 识别用户意图,并执行基础校验。"""
logger.info("LLM 意图识别开始 client=%s text_len=%s", type(self.llm_client).__name__, len(text))
try:
result = self.llm_client.understand_request(text)
except Exception:
logger.exception("LLM 意图识别失败 client=%s", type(self.llm_client).__name__)
raise
validate_intent_result(result)
logger.info("LLM 意图识别完成 result=%s", json_for_log(asdict(result)))
return result
def extract_params(self, text: str, base_params: dict[str, Any] | None = None) -> LlmParamResult:
"""从自然语言中抽取部署参数和控制参数。"""
logger.info(
"LLM 参数抽取开始 client=%s text_len=%s base_params=%s",
type(self.llm_client).__name__,
len(text),
json_for_log(base_params or {}),
)
try:
result = self.llm_client.extract_params(text, base_params)
except Exception:
logger.exception("LLM 参数抽取失败 client=%s", type(self.llm_client).__name__)
raise
logger.info("LLM 参数抽取完成 result=%s", json_for_log(asdict(result)))
return result
def generate_plan(
self,
*,
params: dict[str, Any],
intent: str,
strategy: ExecutionStrategy,
) -> LlmDeployPlan:
"""根据参数、意图和执行策略生成部署计划。"""
logger.info(
"LLM 计划生成开始 client=%s intent=%s strategy=%s params=%s",
type(self.llm_client).__name__,
intent,
strategy,
json_for_log(params),
)
try:
plan = self.llm_client.generate_plan(params=params, intent=intent, strategy=strategy)
except Exception:
logger.exception("LLM 计划生成失败 client=%s intent=%s strategy=%s", type(self.llm_client).__name__, intent, strategy)
raise
validate_deploy_plan(plan)
logger.info("LLM 计划生成完成 result=%s", json_for_log(asdict(plan)))
return plan
def analyze_request(self, text: str, base_params: dict[str, Any] | None = None) -> dict[str, Any]:
"""完成意图识别、参数抽取和计划生成,供 analyze/chat 使用。"""
logger.info("需求分析开始 text_len=%s base_params=%s", len(text), json_for_log(base_params or {}))
intent = self.understand_request(text)
params = self.extract_params(text, base_params)
strategy = self._choose_strategy(intent.strategy_preference)
plan = self.generate_plan(
params={**DEFAULT_PARAMS, **params.extracted_params},
intent=intent.intent,
strategy=strategy,
)
result = {
"intent": intent,
"params": params,
"plan": plan,
}
logger.info(
"需求分析完成 intent=%s strategy=%s missing=%s plan_actions=%s",
intent.intent,
strategy,
params.missing_required_params,
plan.planned_actions,
)
return result
def normalize_params(self, params: dict[str, Any]) -> dict[str, Any]:
"""合并默认参数并校验必填参数是否齐全。"""
normalized = {**DEFAULT_PARAMS, **params}
missing = [key for key in REQUIRED_PARAMS if not normalized.get(key)]
if missing:
raise ValueError(f"缺少必填参数: {', '.join(missing)}")
normalized["ZIP_FILE_PATH"] = _normalize_local_file_path(str(normalized["ZIP_FILE_PATH"]).strip())
return normalized
def _choose_strategy(self, preference: str) -> ExecutionStrategy:
"""把 LLM 给出的策略偏好转换为内部执行策略。"""
if preference in ("hybrid_node_mcp", "script_only", "fake"):
return preference # type: ignore[return-value]
return "hybrid_node_mcp"
def create_state(
self,
*,
params: dict[str, Any],
execution_strategy: ExecutionStrategy = "hybrid_node_mcp",
run_id: str | None = None,
script_entry: str | None = None,
config_path: str | None = None,
trace_file_path: str | None = None,
checkpoint_path: str | None = None,
target_ips: list[str] | None = None,
) -> AgentState:
"""创建一次运行所需的 AgentState并写入脚本配置文件。"""
logger.info(
"创建 AgentState 开始 strategy=%s checkpoint=%s target_ips=%s params=%s",
execution_strategy,
checkpoint_path or "",
target_ips or [],
json_for_log(params),
)
normalized = self.normalize_params(params)
actual_run_id = run_id or time.strftime("%Y%m%d_%H%M%S")
actual_script_entry = script_entry or select_script_entry()
runtime_dir = Path("runtime")
actual_config_path = _absolute_path(config_path or runtime_dir / f"config_{actual_run_id}.txt")
actual_trace_path = _absolute_path(trace_file_path or Path("logs") / f"api_trace_{actual_run_id}.log")
write_config(normalized, actual_config_path)
state = AgentState(
run_id=actual_run_id,
params=normalized,
execution_strategy=execution_strategy,
action_backends=build_action_backends(execution_strategy),
script_entry=actual_script_entry,
script_base_dir=str(self.script_base_dir),
config_path=str(actual_config_path),
trace_file_path=str(actual_trace_path),
checkpoint_path=checkpoint_path or "",
target_ips=target_ips or [],
)
logger.info(
"创建 AgentState 完成 run_id=%s config=%s trace=%s script_entry=%s backends=%s",
state.run_id,
state.config_path,
state.trace_file_path,
state.script_entry,
json_for_log(state.action_backends),
)
return state
def pause_state(
self,
state: AgentState,
*,
reason: str,
review_context: dict[str, Any] | None = None,
) -> AgentState:
"""将当前 state 标记为暂停,并持久化 checkpoint。"""
logger.info(
"暂停 state run_id=%s reason=%s checkpoint=%s context=%s",
state.run_id,
reason,
state.checkpoint_path,
json_for_log(review_context or {}),
)
state.paused = True
state.pause_reason = reason
state.review_context = dict(review_context or {})
self._save_checkpoint(state)
return state
def resume_state(self, state: AgentState) -> AgentState:
"""清理暂停标记,允许后续继续执行。"""
logger.info("恢复 state run_id=%s previous_reason=%s checkpoint=%s", state.run_id, state.pause_reason, state.checkpoint_path)
state.paused = False
state.pause_reason = ""
state.review_context = {}
self._save_checkpoint(state)
return state
def update_state_params(self, state: AgentState, updates: dict[str, Any]) -> AgentState:
"""热更新 state 中的参数,并回写 config 文件。"""
logger.info("热更新 state 参数开始 run_id=%s updates=%s", state.run_id, json_for_log(updates))
merged = {**state.params, **updates}
normalized = self.normalize_params(merged)
state.params = normalized
if state.config_path:
write_config(normalized, state.config_path)
self._save_checkpoint(state)
logger.info("热更新 state 参数完成 run_id=%s config=%s params=%s", state.run_id, state.config_path, json_for_log(state.params))
return state
def preview(self, params: dict[str, Any], strategy: ExecutionStrategy = "hybrid_node_mcp") -> str:
"""渲染部署预览,展示参数和 action 路由。"""
normalized = self.normalize_params(params)
routes = build_action_backends(strategy)
if strategy == "hybrid_node_mcp":
home_backend = "脚本 action"
node_backend = "MCP"
elif strategy == "script_only":
home_backend = "脚本 action"
node_backend = "脚本 action"
else:
home_backend = "fake"
node_backend = "fake"
lines = [
"## PAM 部署预览",
"",
f"- 执行策略: {strategy}",
f"- PAM_HOME: {home_backend}",
f"- PAM_NODE: {node_backend}",
f"- 机场: {normalized['AIRPORT_CODE']}",
f"- 应用: {normalized['APP_NAME']}",
f"- 模块: {normalized['MODULE_NAME']}",
f"- 版本: {normalized['VERSION_NUMBER']}",
"",
"| action | backend |",
"| --- | --- |",
]
for action in GLOBAL_ACTION_SEQUENCE:
lines.append(f"| `{action}` | `{routes[action]}` |")
return "\n".join(lines)
def run_global_flow(self, state: AgentState) -> AgentState:
"""执行全局部署阶段,并跳过 checkpoint 中已完成的步骤。"""
logger.info(
"全局流程开始 run_id=%s paused=%s completed=%s",
state.run_id,
state.paused,
state.completed_global_steps,
)
if state.paused:
self._save_checkpoint(state)
return state
while True:
action = self.next_global_action(state)
if action is None:
logger.info("全局流程完成 run_id=%s completed=%s", state.run_id, state.completed_global_steps)
return state
self.run_global_action(state, action)
def next_global_action(self, state: AgentState) -> str | None:
"""返回下一个未完成的全局 action。"""
if state.paused:
return None
for action in GLOBAL_ACTION_SEQUENCE:
if action in state.completed_global_steps:
continue
return action
return None
def run_global_action(self, state: AgentState, action: str) -> AgentState:
"""执行一个全局 action并把结果写回 AgentState。"""
if action in state.completed_global_steps:
logger.info("跳过已完成全局 action run_id=%s action=%s", state.run_id, action)
return state
kwargs: dict[str, Any] = {}
if action == "publish-version":
if not state.hash_code:
state.last_failed_step = action
self._save_checkpoint(state)
raise RuntimeError("publish-version 缺少 HASH_CODE请确认 upload-package 是否成功返回 HASH_CODE")
kwargs["hash_code"] = state.hash_code
backend = state.action_backends.get(action, "script")
logger.info(
"全局 action 开始 run_id=%s action=%s backend=%s kwargs=%s",
state.run_id,
action,
backend,
json_for_log(kwargs),
)
self._emit_progress({"type": "ACTION_START", "stage": action, "backend": backend})
try:
result = self.router.run_action(state, action, **kwargs)
except Exception as exc:
logger.exception("全局 action 调用异常 run_id=%s action=%s backend=%s", state.run_id, action, backend)
result = ActionResult(
action=action,
backend=backend,
ok=False,
error_summary=str(exc),
)
logger.info("全局 action 返回 run_id=%s result=%s", state.run_id, _action_result_for_log(result))
state.events.append(
{
"type": "ACTION_DONE" if result.ok else "ACTION_FAIL",
"stage": action,
"backend": result.backend,
"message": result.error_summary or "ok",
}
)
analysis = self._append_action_analysis(state, action, result)
if not result.ok:
self._emit_progress(
{
"type": "ACTION_FAIL",
"stage": action,
"backend": result.backend,
"message": result.error_summary or "action 执行失败",
}
)
state.last_failed_step = action
self.pause_state(
state,
reason="action_failed",
review_context=self._review_context(action=action, analysis=analysis, result=result),
)
self._save_checkpoint(state)
raise RuntimeError(f"{action} 执行失败: {result.error_summary}")
missing_values = self._missing_required_values(action, result.values)
if missing_values:
message = f"{action} 返回缺少必要字段: {', '.join(missing_values)}"
self._emit_progress(
{
"type": "ACTION_FAIL",
"stage": action,
"backend": result.backend,
"message": message,
}
)
state.last_failed_step = action
self.pause_state(
state,
reason="action_missing_required_values",
review_context={
"type": "action_review",
"stage": action,
"message": message,
"missing_values": missing_values,
},
)
self._save_checkpoint(state)
raise RuntimeError(message)
self._apply_result(state, action, result.values)
state.completed_global_steps.append(action)
state.last_success_step = action
self._emit_progress(
{
"type": "ACTION_DONE",
"stage": action,
"backend": result.backend,
"message": result.values.get("MESSAGE", "ok"),
}
)
if analysis is not None and not analysis.should_continue:
state.last_failed_step = action
self.pause_state(
state,
reason="llm_review_blocked",
review_context=self._review_context(action=action, analysis=analysis, result=result),
)
logger.info("全局 action 被 LLM 审核拦截 run_id=%s action=%s analysis=%s", state.run_id, action, json_for_log(asdict(analysis)))
return state
self._save_checkpoint(state)
logger.info("全局 action 完成 run_id=%s action=%s completed=%s", state.run_id, action, state.completed_global_steps)
return state
def _missing_required_values(self, action: str, values: dict[str, Any]) -> list[str]:
"""检查 action 成功返回后是否带回后续步骤必需字段。"""
required = REQUIRED_ACTION_VALUES.get(action, ())
return [key for key in required if not values.get(key)]
def run_deploy_flow(self, state: AgentState) -> AgentState:
"""执行完整部署流程:全局阶段后进入逐 IP 阶段。"""
logger.info(
"部署流程开始 run_id=%s paused=%s pending=%s strategy=%s",
state.run_id,
state.paused,
state.pending_confirmation,
state.execution_strategy,
)
if state.pending_confirmation or state.paused:
self._save_checkpoint(state)
return state
self.run_global_flow(state)
self.run_ip_flow(state)
logger.info("部署流程结束 run_id=%s paused=%s pending=%s", state.run_id, state.paused, state.pending_confirmation)
return state
def run_ip_flow(self, state: AgentState) -> AgentState:
"""执行逐 IP 部署流程,失败时停在人工确认点。"""
logger.info(
"逐 IP 流程开始 run_id=%s paused=%s target_ips=%s online_ips=%s",
state.run_id,
state.paused,
state.target_ips,
state.online_ips,
)
if state.paused:
self._save_checkpoint(state)
return state
while True:
work = self.next_ip_action(state)
if work is None:
logger.info("逐 IP 流程完成或等待确认 run_id=%s pending=%s ip_states=%s", state.run_id, state.pending_confirmation, json_for_log(state.ip_states))
return state
ip, action = work
self.run_ip_action(state, ip, action)
def next_ip_action(self, state: AgentState) -> tuple[str, str] | None:
"""返回下一个待执行的单 IP action并按需初始化 IP 状态。"""
if state.pending_confirmation or state.paused:
self._save_checkpoint(state)
return None
self._resolve_target_ips(state)
logger.info("计算下一个 IP action run_id=%s target_ips=%s", state.run_id, state.target_ips)
for ip in state.target_ips:
ip_state = state.ip_states.get(ip)
if ip_state and ip_state.get("status") == "SUCCESS":
continue
if ip_state and ip_state.get("status") == "FAILED":
if ip_state.get("rollback_status") == "PENDING_AGENT_CONFIRMATION":
state.pending_confirmation = f"rollback-ip:{ip}"
self._save_checkpoint(state)
return None
continue
if not ip_state:
logger.info("初始化 IP 状态 run_id=%s ip=%s", state.run_id, ip)
state.events.append({"type": "IP_START", "ip": ip, "message": "start"})
ip_state = {
"status": "RUNNING",
"completed_steps": [],
"failed_stage": "",
"failure_reason": "",
"rollback_status": "ROLLBACK_NOT_RUN",
"rollback_stop_first": False,
"log_file": "",
}
state.ip_states[ip] = ip_state
completed_steps = ip_state.setdefault("completed_steps", [])
for action in IP_ACTION_SEQUENCE:
if action not in completed_steps:
return ip, action
ip_state["status"] = "SUCCESS"
state.events.append({"type": "IP_DONE", "ip": ip, "message": "success"})
self._save_checkpoint(state)
logger.info("IP 部署完成 run_id=%s ip=%s", state.run_id, ip)
return None
def run_ip_action(self, state: AgentState, ip: str, action: str) -> AgentState:
"""执行一个单 IP action并在失败时设置人工确认点。"""
ip_state = state.ip_states[ip]
completed_steps = ip_state.setdefault("completed_steps", [])
if action in completed_steps:
logger.info("跳过已完成 IP action run_id=%s ip=%s action=%s", state.run_id, ip, action)
return state
logger.info(
"IP action 开始 run_id=%s ip=%s action=%s backend=%s ip_state=%s",
state.run_id,
ip,
action,
state.action_backends.get(action, ""),
json_for_log(ip_state),
)
self._emit_progress(
{
"type": "ACTION_START",
"stage": action,
"backend": state.action_backends.get(action, ""),
"ip": ip,
}
)
backend = state.action_backends.get(action, "script")
try:
result = self.router.run_action(state, action, ip=ip)
except Exception as exc:
logger.exception("IP action 调用异常 run_id=%s ip=%s action=%s backend=%s", state.run_id, ip, action, backend)
result = ActionResult(
action=action,
backend=backend,
ok=False,
error_summary=str(exc),
)
failed = (not result.ok) or self._business_failed(action, result.values)
logger.info(
"IP action 返回 run_id=%s ip=%s action=%s failed=%s result=%s",
state.run_id,
ip,
action,
failed,
_action_result_for_log(result),
)
state.events.append(
{
"type": "ACTION_FAIL" if failed else "ACTION_DONE",
"stage": action,
"backend": result.backend,
"ip": ip,
"message": result.error_summary or result.values.get("MESSAGE", "ok"),
}
)
analysis = self._append_action_analysis(state, action, result, ip=ip)
if failed:
self._emit_progress(
{
"type": "ACTION_FAIL",
"stage": action,
"backend": result.backend,
"ip": ip,
"message": result.error_summary or result.values.get("MESSAGE", "action 执行失败"),
}
)
self.pause_state(
state,
reason="action_failed",
review_context=self._review_context(action=action, analysis=analysis, result=result, ip=ip),
)
self._record_ip_failure(state, ip, action, result.error_summary or str(result.values))
if action != "download-log":
self._download_log_best_effort(state, ip)
state.pending_confirmation = f"rollback-ip:{ip}"
self._save_checkpoint(state)
logger.info("IP action 失败并进入确认 run_id=%s ip=%s action=%s pending=%s", state.run_id, ip, action, state.pending_confirmation)
return state
self._apply_ip_result(ip_state, action, result.values)
completed_steps.append(action)
self._emit_progress(
{
"type": "ACTION_DONE",
"stage": action,
"backend": result.backend,
"ip": ip,
"message": result.values.get("MESSAGE", "ok"),
}
)
if analysis is not None and not analysis.should_continue:
self.pause_state(
state,
reason="llm_review_blocked",
review_context=self._review_context(action=action, analysis=analysis, result=result, ip=ip),
)
logger.info("IP action 被 LLM 审核拦截 run_id=%s ip=%s action=%s analysis=%s", state.run_id, ip, action, json_for_log(asdict(analysis)))
return state
self._save_checkpoint(state)
logger.info("IP action 完成 run_id=%s ip=%s action=%s completed=%s", state.run_id, ip, action, completed_steps)
return state
def build_confirmation_request(self, state: AgentState) -> dict[str, Any]:
"""把 pending_confirmation 转换为面向用户的确认请求。"""
if not state.pending_confirmation:
return {}
kind, _, value = state.pending_confirmation.partition(":")
if kind == "rollback-ip":
ip_state = state.ip_states.get(value, {})
return {
"type": "rollback-ip",
"ip": value,
"failed_stage": ip_state.get("failed_stage", ""),
"failure_reason": ip_state.get("failure_reason", ""),
"rollback_stop_first": bool(ip_state.get("rollback_stop_first", False)),
"allowed_decisions": ["approve", "reject"],
}
return {
"type": kind,
"value": value,
"allowed_decisions": ["approve", "reject"],
}
def confirm_pending(self, state: AgentState, *, approved: bool, operator_note: str = "") -> AgentState:
"""处理人工确认结果;当前支持失败 IP 的回滚确认。"""
request = self.build_confirmation_request(state)
if not request:
raise ValueError("当前没有待确认事项")
if request["type"] != "rollback-ip":
raise ValueError(f"不支持的确认类型: {request['type']}")
ip = request["ip"]
ip_state = state.ip_states[ip]
logger.info(
"人工确认开始 run_id=%s approved=%s request=%s note_len=%s",
state.run_id,
approved,
json_for_log(request),
len(operator_note),
)
if not approved:
ip_state["rollback_status"] = "REJECTED_BY_OPERATOR"
state.events.append(
{
"type": "CONFIRMATION_REJECTED",
"stage": "rollback-ip",
"ip": ip,
"message": operator_note or "rollback rejected by operator",
}
)
state.pending_confirmation = ""
state.paused = False
state.pause_reason = ""
state.review_context = {}
self._save_checkpoint(state)
logger.info("人工确认拒绝回滚 run_id=%s ip=%s", state.run_id, ip)
return state
backend = state.action_backends.get("rollback-ip", "script")
self._emit_progress(
{
"type": "ACTION_START",
"stage": "rollback-ip",
"backend": backend,
"ip": ip,
}
)
try:
result = self.router.run_action(
state,
"rollback-ip",
ip=ip,
stop_first=bool(ip_state.get("rollback_stop_first", False)),
)
except Exception as exc:
logger.exception("rollback action 调用异常 run_id=%s ip=%s backend=%s", state.run_id, ip, backend)
result = ActionResult(
action="rollback-ip",
backend=backend,
ok=False,
error_summary=str(exc),
)
logger.info("rollback action 返回 run_id=%s ip=%s result=%s", state.run_id, ip, _action_result_for_log(result))
ip_state["rollback_status"] = "ROLLBACK_DONE" if result.ok else "ROLLBACK_FAILED"
state.events.append(
{
"type": "ACTION_DONE" if result.ok else "ACTION_FAIL",
"stage": "rollback-ip",
"backend": result.backend,
"ip": ip,
"message": result.error_summary or result.values.get("MESSAGE", "ok"),
}
)
self._append_action_analysis(state, "rollback-ip", result, ip=ip)
if result.ok:
state.pending_confirmation = ""
state.last_success_step = "rollback-ip"
state.last_failed_step = ""
state.paused = False
state.pause_reason = ""
state.review_context = {}
self._emit_progress(
{
"type": "ACTION_DONE",
"stage": "rollback-ip",
"backend": result.backend,
"ip": ip,
"message": result.values.get("MESSAGE", "ok"),
}
)
else:
state.pending_confirmation = f"rollback-ip:{ip}"
state.last_failed_step = "rollback-ip"
state.paused = True
state.pause_reason = "rollback_failed"
self._emit_progress(
{
"type": "ACTION_FAIL",
"stage": "rollback-ip",
"backend": result.backend,
"ip": ip,
"message": result.error_summary or result.values.get("MESSAGE", "rollback 执行失败"),
}
)
self._save_checkpoint(state)
logger.info(
"人工确认处理完成 run_id=%s ip=%s rollback_status=%s pending=%s paused=%s",
state.run_id,
ip,
ip_state.get("rollback_status"),
state.pending_confirmation,
state.paused,
)
return state
def _emit_progress(self, payload: dict[str, Any]) -> None:
"""向 CLI/chat 回调 action 执行进度,回调失败不影响主流程。"""
if self.progress_callback is None:
return
try:
self.progress_callback(payload)
except Exception:
return
def _apply_result(self, state: AgentState, action: str, values: dict[str, Any]) -> None:
"""把全局 action 返回值写回 AgentState。"""
if "HASH_CODE" in values:
state.hash_code = str(values["HASH_CODE"])
if "NODE_URL" in values:
state.node_url = str(values["NODE_URL"])
if action == "get-online-ips":
ips = values.get("IP", [])
if isinstance(ips, str):
ips = [ips]
state.online_ips = list(ips)
state.target_ips = state.target_ips or state.online_ips.copy()
def _resolve_target_ips(self, state: AgentState) -> None:
"""根据在线 IP 和用户指定 IP 计算最终目标 IP。"""
if not state.target_ips:
state.target_ips = state.online_ips.copy()
logger.info("目标 IP 未指定,使用全部在线 IP run_id=%s target_ips=%s", state.run_id, state.target_ips)
return
online = set(state.online_ips)
requested = state.target_ips
state.target_ips = [ip for ip in requested if ip in online]
missing = [ip for ip in requested if ip not in online]
if missing:
state.events.append(
{
"type": "TARGET_SCOPE_CHANGED",
"message": "some requested IPs are not online",
"missing_ips": missing,
"target_ips": state.target_ips,
}
)
logger.info(
"目标 IP 范围调整 run_id=%s requested=%s target_ips=%s missing=%s",
state.run_id,
requested,
state.target_ips,
missing,
)
def _business_failed(self, action: str, values: dict[str, Any]) -> bool:
"""识别 exit code 之外的业务失败条件。"""
if action == "verify-ip":
success = values.get("SUCCESS")
if success is None:
return False
return str(success).lower() not in ("true", "1", "yes")
return False
def _apply_ip_result(self, ip_state: dict[str, Any], action: str, values: dict[str, Any]) -> None:
"""把逐 IP action 返回值写回单 IP 状态。"""
if action == "download-log":
ip_state["log_file"] = str(values.get("LOG_FILE", ""))
def _record_ip_failure(self, state: AgentState, ip: str, action: str, reason: str) -> None:
"""记录单 IP 失败,并设置待回滚确认状态。"""
ip_state = state.ip_states[ip]
stop_first = action in ("start-ip", "verify-ip")
logger.info(
"记录 IP 失败 run_id=%s ip=%s action=%s reason=%s stop_first=%s",
state.run_id,
ip,
action,
reason,
stop_first,
)
ip_state.update(
{
"status": "FAILED",
"failed_stage": action,
"failure_reason": reason,
"rollback_status": "PENDING_AGENT_CONFIRMATION",
"rollback_stop_first": stop_first,
}
)
state.last_failed_step = action
state.events.append(
{
"type": "CONFIRMATION_REQUIRED",
"stage": "rollback-ip",
"ip": ip,
"stop_first": stop_first,
"message": f"{action} 执行失败,需要确认是否回滚",
}
)
def _download_log_best_effort(self, state: AgentState, ip: str) -> None:
"""失败后尽力下载日志,日志失败不覆盖原失败原因。"""
backend = state.action_backends.get("download-log", "script")
logger.info("失败后尝试下载日志 run_id=%s ip=%s backend=%s", state.run_id, ip, backend)
self._emit_progress(
{
"type": "ACTION_START",
"stage": "download-log",
"backend": backend,
"ip": ip,
"message": "失败后尝试下载日志",
}
)
try:
result = self.router.run_action(state, "download-log", ip=ip)
except Exception as exc:
logger.exception("失败后下载日志 action 调用异常 run_id=%s ip=%s backend=%s", state.run_id, ip, backend)
result = ActionResult(
action="download-log",
backend=backend,
ok=False,
error_summary=str(exc),
)
ip_state = state.ip_states[ip]
if result.ok:
ip_state["log_file"] = str(result.values.get("LOG_FILE", ""))
state.events.append(
{
"type": "ACTION_DONE",
"stage": "download-log",
"backend": result.backend,
"ip": ip,
"message": "已尽力下载日志",
}
)
self._emit_progress(
{
"type": "ACTION_DONE",
"stage": "download-log",
"backend": result.backend,
"ip": ip,
"message": result.values.get("MESSAGE", "已尽力下载日志"),
}
)
logger.info("失败后下载日志完成 run_id=%s ip=%s result=%s", state.run_id, ip, _action_result_for_log(result))
else:
state.events.append(
{
"type": "ACTION_FAIL",
"stage": "download-log",
"backend": result.backend,
"ip": ip,
"message": result.error_summary or "尽力下载日志失败",
}
)
self._emit_progress(
{
"type": "ACTION_FAIL",
"stage": "download-log",
"backend": result.backend,
"ip": ip,
"message": result.error_summary or "尽力下载日志失败",
}
)
logger.info("失败后下载日志失败 run_id=%s ip=%s result=%s", state.run_id, ip, _action_result_for_log(result))
self._append_action_analysis(state, "download-log", result, ip=ip)
def _save_checkpoint(self, state: AgentState) -> None:
"""如果配置了 checkpoint 路径,则保存完整运行状态。"""
if state.checkpoint_path:
logger.info(
"保存 checkpoint run_id=%s path=%s paused=%s pending=%s last_success=%s last_failed=%s",
state.run_id,
state.checkpoint_path,
state.paused,
state.pending_confirmation,
state.last_success_step,
state.last_failed_step,
)
save_checkpoint(state, state.checkpoint_path, redact=False)
def _append_action_analysis(
self,
state: AgentState,
action: str,
result,
*,
ip: str | None = None,
) -> Any:
"""启用 action 后分析时,把诊断结果追加到 events。"""
logger.info(
"LLM action 审核开始 run_id=%s action=%s ip=%s result=%s",
state.run_id,
action,
ip or "",
_action_result_for_log(result),
)
self._emit_progress(
{
"type": "ACTION_REVIEW_START",
"stage": action,
"ip": ip or "",
"message": "LLM 开始分析 action 结果",
}
)
try:
analysis = self.llm_client.analyze_action_result(
action=action,
result=result,
state_summary=self._state_summary_for_llm(state, ip=ip),
)
except Exception as exc: # pragma: no cover - 审核失败时也要显式暂停,避免黑盒继续执行
logger.exception("LLM action 审核失败 run_id=%s action=%s ip=%s", state.run_id, action, ip or "")
state.events.append(
{
"type": "ACTION_ANALYSIS_FAIL",
"stage": action,
"ip": ip or "",
"message": str(exc),
}
)
self._emit_progress(
{
"type": "ACTION_REVIEW_FAIL",
"stage": action,
"ip": ip or "",
"message": str(exc),
}
)
return LlmActionAnalysis(
action=action,
has_anomaly=True,
severity="high",
possible_reason=f"LLM 审核失败: {exc}",
suggested_action="请检查 LLM 配置、网络或 action 审核提示词文件后再继续。",
requires_confirmation=True,
should_continue=False,
notes=["action 结果未完成 LLM 审核,流程已自动暂停。"],
)
payload = asdict(analysis)
payload.update({"type": "ACTION_ANALYSIS", "stage": action})
if ip:
payload["ip"] = ip
if self.action_analysis_enabled:
state.events.append(payload)
self._emit_progress(
{
"type": "ACTION_REVIEW_DONE",
"stage": action,
"ip": ip or "",
"message": analysis.suggested_action or analysis.possible_reason or "LLM 审核完成",
"has_anomaly": analysis.has_anomaly,
"severity": analysis.severity,
"should_continue": analysis.should_continue,
}
)
logger.info(
"LLM action 审核完成 run_id=%s action=%s ip=%s analysis=%s",
state.run_id,
action,
ip or "",
json_for_log(payload),
)
return analysis
def _state_summary_for_llm(self, state: AgentState, *, ip: str | None = None) -> dict[str, Any]:
"""生成给 LLM action 分析使用的脱敏状态摘要。"""
return {
"run_id": state.run_id,
"execution_strategy": state.execution_strategy,
"completed_global_steps": state.completed_global_steps,
"online_ip_count": len(state.online_ips),
"target_ips": state.target_ips,
"current_ip": ip or "",
"current_ip_state": state.ip_states.get(ip, {}) if ip else {},
"pending_confirmation": state.pending_confirmation,
"paused": state.paused,
"pause_reason": state.pause_reason,
"last_success_step": state.last_success_step,
"last_failed_step": state.last_failed_step,
}
def _review_context(
self,
*,
action: str,
analysis,
result,
ip: str | None = None,
) -> dict[str, Any]:
"""构造面向用户展示的审核暂停上下文。"""
context = {
"type": "action_review",
"stage": action,
"ip": ip or "",
"backend": result.backend,
"ok": result.ok,
"error_summary": result.error_summary,
}
if analysis is not None:
context.update(
{
"severity": analysis.severity,
"has_anomaly": analysis.has_anomaly,
"possible_reason": analysis.possible_reason,
"suggested_action": analysis.suggested_action,
"should_continue": analysis.should_continue,
"notes": list(analysis.notes),
}
)
return context
def render_report(self, state: AgentState) -> str:
"""渲染当前部署状态报告。"""
success = sum(1 for item in state.ip_states.values() if item.get("status") == "SUCCESS")
failed = sum(1 for item in state.ip_states.values() if item.get("status") == "FAILED")
lines = [
"## PAM 智能部署报告",
"",
f"- 执行策略: {state.execution_strategy}",
f"- 机场: {state.params['AIRPORT_CODE']}",
f"- 应用: {state.params['APP_NAME']}",
f"- 模块: {state.params['MODULE_NAME']}",
f"- 版本: {state.params['VERSION_NUMBER']}",
f"- 在线工作站数: {len(state.online_ips)}",
f"- 目标工作站数: {len(state.target_ips)}",
f"- 成功: {success}",
f"- 失败: {failed}",
f"- 待确认: {state.pending_confirmation or '-'}",
f"- 暂停状态: {'' if state.paused else ''}",
f"- 暂停原因: {state.pause_reason or '-'}",
"",
"| IP | 状态 | 失败阶段 | 回滚状态 | 日志 |",
"| --- | --- | --- | --- | --- |",
]
for ip, ip_state in state.ip_states.items():
lines.append(
"| {ip} | {status} | {failed_stage} | {rollback_status} | {log_file} |".format(
ip=ip,
status=ip_state.get("status", ""),
failed_stage=ip_state.get("failed_stage") or "-",
rollback_status=ip_state.get("rollback_status") or "-",
log_file=ip_state.get("log_file") or "-",
)
)
return "\n".join(lines)
def _absolute_path(path: str | Path) -> Path:
"""把传给脚本的文件路径转换为绝对路径,避免 cwd 切换后读错文件。"""
return Path(path).expanduser().resolve()
def _normalize_local_file_path(path: str) -> str:
"""把本地相对文件路径转换为绝对路径Windows/Unix 绝对路径保持不变。"""
if re.match(r"^[A-Za-z]:[\\/]", path):
return path
if path.startswith("/"):
return path
value = Path(path).expanduser()
if value.is_absolute():
return str(value)
return str(value.resolve())
def _action_result_for_log(result: ActionResult) -> str:
"""生成 action 结果日志摘要,避免写入完整 stdout/stderr。"""
return json_for_log(
{
"action": result.action,
"backend": result.backend,
"ok": result.ok,
"exit_code": result.exit_code,
"tool_name": result.tool_name,
"values": result.values,
"stderr": result.stderr,
"error_summary": result.error_summary,
},
max_text_len=1000,
)