From 85afabcd9467680d31f5747c0f81af68cccc9ecf Mon Sep 17 00:00:00 2001 From: dark Date: Fri, 5 Jun 2026 11:49:13 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=BC=BA=20chat=20LLM=20=E4=BA=A4?= =?UTF-8?q?=E4=BA=92=E4=B8=8E=E5=8D=95=20action=20=E6=89=A7=E8=A1=8C?= =?UTF-8?q?=E8=83=BD=E5=8A=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 扩展 LLM client 协议,支持普通对话、日志分析和单 action 解析 - chat 非内置输入默认进入 LLM 普通对话,不再本地拦截问候 - 新增 ask、log analyze、action propose、action run 等交互命令 - 单 action 执行前强制人工确认,并复用现有 ActionRouter、审核、事件和 checkpoint - 日志分析默认读取尾部内容并脱敏后再提交给 LLM - 更新 README、发布包 README 和 run.sh help - 补充 LLM 与 chat 交互相关测试 --- README.md | 10 +- packaging/README_linux_package.md | 2 + packaging/README_packaged_agent.md | 9 +- packaging/build_linux_self_contained.sh | 4 +- pam_deploy_graph/agent.py | 117 ++++- pam_deploy_graph/interactive.py | 500 +++++++++++++++++++--- pam_deploy_graph/llm/base.py | 19 + pam_deploy_graph/llm/openai_compatible.py | 121 +++++- pam_deploy_graph/llm/prompts.py | 40 ++ pam_deploy_graph/llm/rule_based.py | 81 +++- pam_deploy_graph/models.py | 13 + tests/test_interactive_cli.py | 152 ++++++- tests/test_llm_structured.py | 99 +++++ 13 files changed, 1088 insertions(+), 79 deletions(-) diff --git a/README.md b/README.md index d13e78c..a7ac9f6 100644 --- a/README.md +++ b/README.md @@ -87,11 +87,12 @@ packaging/ - `--analyze-actions` 和 `llm action-analysis on` 改为只控制是否把详细审核结果写入 `events`,不再控制审核是否执行。 - chat 会播报 action 审核开始、审核完成和审核失败,避免黑盒执行。 - chat 支持执行中按 `Ctrl+C` 中断,保存 checkpoint 后再 `resume`。 +- chat 支持普通 LLM 对话、日志尾部分析和单 action 执行:`ask <问题>`、`log analyze <路径>`、`action propose <需求>`、`action run ...`。 - chat 支持 `set KEY=VALUE` 和 `load params <路径>` 热更新当前运行参数,并同步回写运行中的 `config.txt` 与 checkpoint。 - 支持通过 `--llm-action-analysis-prompt-file`、`PAM_LLM_ACTION_ANALYSIS_PROMPT_FILE` 或 chat 内 `llm config action_analysis_prompt_file=...` 自定义 action 审核提示词。 - 增加统一运行日志,默认写入 `logs/pam_deploy_agent.log`,覆盖 CLI/chat、LLM 调用、action 路由、脚本/MCP 调用、LangGraph、checkpoint 等关键流程,并按天切分、默认保留 14 个历史日切文件。 - chat 支持 `llm test [文本]`,可用当前 LLM client 做一次轻量调用,确认真实 LLM 或规则 fallback 是否正常加载。 -- 添加基础测试,当前本地结果为 `73 passed, 3 skipped`。 +- 添加基础测试,当前本地结果为 `83 passed, 3 skipped`。 未完成: @@ -289,6 +290,11 @@ PAM> run PAM> status PAM> params PAM> events 5 +PAM> ask 这个 agent 能做什么 +PAM> log analyze logs/pam_deploy_agent.log 请帮我看最近异常 --tail 400 +PAM> action propose 请单独执行 verify-ip 192.168.1.10 +PAM> action run verify-ip ip=192.168.1.10 +PAM> action run llm 请单独执行 get-online-ips PAM> llm test PAM> llm action-analysis on PAM> llm config action_analysis_prompt_file=prompts/action_review.txt @@ -300,7 +306,7 @@ PAM> resume PAM> exit ``` -`chat` 默认仍要求在会话内显式输入 `run`,并确认参数、目标 IP 范围和最终执行后才会执行 action。输入 `你好`、`hello` 这类问候不会触发 LLM/结构化分析;需要分析部署需求时可直接描述部署任务,或显式使用 `analyze <需求>`。每个 action 完成后都会自动进入一次 LLM/规则审核,并播报审核开始/结束;审核输入只包含当前 action 的结构化结果和必要诊断日志,不会把完整运行态 `state_summary` 交给大模型,避免跨步骤状态干扰判断;只有审核通过才会把 action 记为 completed;如果审核建议停止或审核本身失败,流程会暂停并输出建议,等待用户决定是否 `resume` 重试当前 action。`create-download-task` 支持可选 `PARENT_VERSION_NUMBER`,非空时会在云下载接口中传入 `parentVersionNumber`,用于指定继承哪个版本的规则;默认空值不传,沿用正在使用的版本规则。`poll-download-progress` 和 `poll-upgrade-progress` 每次只查询一次进度,workflow 会按 `POLL_INTERVAL_SEC`、`DOWNLOAD_POLL_MAX_ATTEMPTS`、`UPGRADE_POLL_MAX_ATTEMPTS` 重复调用,并在每次返回后让 LLM/规则判断是否完成、播报进度;未完成时不会跳到下一个 action。`verify-ip` 用于应用启动后的健康检查,失败时 workflow 会按 `VERIFY_INTERVAL_SEC` 重试,最多 `VERIFY_MAX_ATTEMPTS` 次;默认约每 10 秒一次、最多 12 次,仍未通过才暂停。逐 IP action 失败时也会暂停,修复外部环境后输入 `resume` 会从当前 action 重试;如果确实需要回滚,使用 `rollback [IP]` 显式执行。`llm test [文本]` 可测试当前 LLM client 是否可用。`--analyze-actions` 仅控制详细审核结果是否写入 `events`。执行中可按 `Ctrl+C` 中断,chat 会保存当前 checkpoint 并把流程标记为 `user_interrupted`。`set KEY=VALUE` 和 `load params <路径>` 会把更新同步到当前运行 state、`config.txt` 和 checkpoint。`chat` 也支持 `--llm-base-url` / `--llm-api-key` / `--llm-model` / `--llm-action-analysis-prompt-file`、`--mcp-config` 和 `--analyze-actions`。 +`chat` 默认把非内置命令交给当前 LLM 做普通对话,不会自动触发部署 workflow;需要结构化分析部署需求时请显式使用 `analyze <需求>`,完整部署仍要求输入 `run`,并确认参数、目标 IP 范围和最终执行后才会执行 action。`log analyze <路径> [问题] [--tail N] [--max-bytes N]` 默认只读取日志尾部并脱敏后交给 LLM;`action propose <需求>` 只让 LLM 解析单 action 计划,不执行;`action run [ip=...] [KEY=VALUE...]` 或 `action run llm <需求>` 会展示 action、backend、ip、风险和参数,用户输入 `yes` 后才会复用现有 ActionRouter 执行单 action。每个 workflow action 和单 action 完成后都会自动进入一次 LLM/规则审核,并播报审核开始/结束;审核输入只包含当前 action 的结构化结果和必要诊断日志,不会把完整运行态 `state_summary` 交给大模型,避免跨步骤状态干扰判断;只有 workflow 审核通过才会把 action 记为 completed;如果审核建议停止或审核本身失败,流程会暂停并输出建议,等待用户决定是否 `resume` 重试当前 action。`create-download-task` 支持可选 `PARENT_VERSION_NUMBER`,非空时会在云下载接口中传入 `parentVersionNumber`,用于指定继承哪个版本的规则;默认空值不传,沿用正在使用的版本规则。`poll-download-progress` 和 `poll-upgrade-progress` 每次只查询一次进度,workflow 会按 `POLL_INTERVAL_SEC`、`DOWNLOAD_POLL_MAX_ATTEMPTS`、`UPGRADE_POLL_MAX_ATTEMPTS` 重复调用,并在每次返回后让 LLM/规则判断是否完成、播报进度;未完成时不会跳到下一个 action。`verify-ip` 用于应用启动后的健康检查,失败时 workflow 会按 `VERIFY_INTERVAL_SEC` 重试,最多 `VERIFY_MAX_ATTEMPTS` 次;默认约每 10 秒一次、最多 12 次,仍未通过才暂停。逐 IP action 失败时也会暂停,修复外部环境后输入 `resume` 会从当前 action 重试;如果确实需要回滚,使用 `rollback [IP]` 显式执行。`llm test [文本]` 可测试当前 LLM client 是否可用。`--analyze-actions` 仅控制详细审核结果是否写入 `events`。执行中可按 `Ctrl+C` 中断,chat 会保存当前 checkpoint 并把流程标记为 `user_interrupted`。`set KEY=VALUE` 和 `load params <路径>` 会把更新同步到当前运行 state、`config.txt` 和 checkpoint。`chat` 也支持 `--llm-base-url` / `--llm-api-key` / `--llm-model` / `--llm-action-analysis-prompt-file`、`--mcp-config` 和 `--analyze-actions`。 云下载相关参数: diff --git a/packaging/README_linux_package.md b/packaging/README_linux_package.md index 26976f2..d860d84 100644 --- a/packaging/README_linux_package.md +++ b/packaging/README_linux_package.md @@ -78,6 +78,8 @@ cd pam-deploy-agent-linux-x86_64 - `--analyze-actions` 只控制是否把详细审核结果写入 `events`。 - action 失败或审核阻断后会保存 checkpoint 并暂停;修复外部环境后通过 `resume` 从当前 action 重试。 - 回滚不再属于主 workflow 自动分支;需要时使用 chat 内 `rollback [IP]` 或 CLI `rollback --checkpoint ...` 显式执行。 +- chat 中非内置命令默认交给当前 LLM 普通对话,不会自动触发部署 workflow;完整部署仍需 `analyze` / `run` 并人工确认。 +- chat 支持 `ask <问题>`、`log analyze <路径>`、`action propose <需求>`、`action run ...`,可用于普通问答、日志尾部分析和确认后执行单 action。 - chat 支持执行中 `Ctrl+C` 中断后保存 checkpoint,再通过 `resume` 重试当前 action。 - chat 支持 `set KEY=VALUE` 和 `load params <路径>` 热更新当前运行任务参数。 - 进度查询和健康检查重试参数可通过 `POLL_INTERVAL_SEC`、`DOWNLOAD_POLL_MAX_ATTEMPTS`、`UPGRADE_POLL_MAX_ATTEMPTS`、`VERIFY_INTERVAL_SEC`、`VERIFY_MAX_ATTEMPTS` 配置。 diff --git a/packaging/README_packaged_agent.md b/packaging/README_packaged_agent.md index a36273c..7df546b 100644 --- a/packaging/README_packaged_agent.md +++ b/packaging/README_packaged_agent.md @@ -72,6 +72,11 @@ PAM> run PAM> status PAM> params PAM> events 5 +PAM> ask 这个 agent 能做什么 +PAM> log analyze logs/pam_deploy_agent.log 请帮我看最近异常 --tail 400 +PAM> action propose 请单独执行 verify-ip 192.168.1.10 +PAM> action run verify-ip ip=192.168.1.10 +PAM> action run llm 请单独执行 get-online-ips PAM> llm test PAM> llm action-analysis on PAM> llm config action_analysis_prompt_file=prompts/action_review.txt @@ -244,7 +249,9 @@ MCP token 获取方式与 HOME 一致,默认按 `client_credentials` POST 到 - 执行真实 action 前请确认配置文件中的 `HOME_BASE_URL`、`CLIENT_ID`、`CLIENT_SECRET`、`AIRPORT_CODE`、`APP_NAME`、`MODULE_NAME`、`VERSION_NUMBER`、`ZIP_FILE_PATH`。 - `PARENT_VERSION_NUMBER` 是云下载可选参数;非空时会传给 `download-cloud` 的 `parentVersionNumber`,空值不会发送。 -- `chat` 中输入 `你好`、`hello` 这类问候不会触发 LLM/结构化分析;需要分析部署需求时请直接描述部署任务,或显式使用 `analyze <需求>`。 +- `chat` 中非内置命令默认交给当前 LLM 做普通对话,不会自动触发部署 workflow;需要分析部署需求时请显式使用 `analyze <需求>`,完整部署仍需 `run` 并逐步确认。 +- `ask <问题>` 可显式普通对话;`log analyze <路径> [问题] [--tail N] [--max-bytes N]` 默认只读取日志尾部并脱敏后交给 LLM。 +- `action propose <需求>` 只展示 LLM 解析出的单 action 计划;`action run [ip=...] [KEY=VALUE...]` 和 `action run llm <需求>` 会在用户输入 `yes` 后才执行单 action。 - 每个 action 完成后都会自动执行一次 LLM/规则审核;`--analyze-actions` 和 `llm action-analysis on` 只控制是否把详细审核结果写入 `events`。 - action 审核输入不包含完整运行态 `state_summary`,只包含当前 action 的结构化结果和必要诊断日志。 - `poll-download-progress` 和 `poll-upgrade-progress` 是单次进度查询 action,未完成时不会进入下一个 action;最大查询次数和间隔可通过 `config.txt` 或 chat `set` 热更新。 diff --git a/packaging/build_linux_self_contained.sh b/packaging/build_linux_self_contained.sh index 9335236..0c44dab 100644 --- a/packaging/build_linux_self_contained.sh +++ b/packaging/build_linux_self_contained.sh @@ -198,8 +198,8 @@ LLM 环境变量: 5. action 失败或审核阻断后会暂停;修复后用 resume 从当前 action 重试,需要回滚时用 rollback 显式执行。 6. chat 会在执行前归一化并展示实际写入脚本配置的参数;script_only / hybrid_node_mcp 会先检查 ZIP_FILE_PATH 是否存在。 7. PARENT_VERSION_NUMBER 是云下载可选参数;空值不发送,非空时传给 parentVersionNumber。 - 8. chat 执行过程中会播报每个 action 的开始、完成或失败;普通问候不会触发 LLM/结构化分析。 - 9. chat 内可使用 params、events、rollback、list checkpoints、load checkpoint、load params、llm config、llm test、mcp config 等命令。 + 8. chat 执行过程中会播报每个 action 的开始、完成或失败;非内置输入默认交给 LLM 普通对话,不会自动触发部署 workflow。 + 9. chat 内可使用 ask、log analyze、action propose、action run、params、events、rollback、list checkpoints、load checkpoint、load params、llm config、llm test、mcp config 等命令。 10. 日志默认写入 logs/pam_deploy_agent.log,按天切分并默认保留 14 个历史日切文件;日志会脱敏 token、secret、api_key、Authorization 等字段。 11. checkpoint 会保存完整运行参数,请放在受控目录。 HELP_TEXT diff --git a/pam_deploy_graph/agent.py b/pam_deploy_graph/agent.py index 3c6e48d..c0406e8 100644 --- a/pam_deploy_graph/agent.py +++ b/pam_deploy_graph/agent.py @@ -16,7 +16,7 @@ from typing import Any, Callable from .action_router import ActionRouter, build_action_backends from .checkpoint_store import save_checkpoint from .config_writer import write_config -from .constants import DEFAULT_PARAMS, GLOBAL_ACTION_SEQUENCE, IP_ACTION_SEQUENCE, REQUIRED_PARAMS +from .constants import ALLOWED_ACTIONS, DEFAULT_PARAMS, GLOBAL_ACTION_SEQUENCE, IP_ACTION_SEQUENCE, NODE_ACTIONS, REQUIRED_PARAMS from .fake_runner import FakeActionRunner from .llm import LlmClient, RuleBasedLlmClient, validate_deploy_plan, validate_intent_result from .logging_utils import configure_logging, json_for_log @@ -34,6 +34,8 @@ REQUIRED_ACTION_VALUES = { PROGRESS_ACTIONS = {"poll-download-progress", "poll-upgrade-progress"} VERIFY_ACTION = "verify-ip" +IP_REQUIRED_ACTIONS = set(IP_ACTION_SEQUENCE) | {"stop-ip", "rollback-ip"} +SINGLE_ACTION_KWARGS = {"hash_code", "node_url", "stop_first", "timeout_sec"} class PamDeployAgent: @@ -513,6 +515,119 @@ class PamDeployAgent: logger.info("IP 部署完成 run_id=%s ip=%s", state.run_id, ip) return None + def run_single_action( + self, + state: AgentState, + action: str, + *, + ip: str = "", + kwargs: dict[str, Any] | None = None, + ) -> ActionResult: + """执行一次独立 action,并复用路由、审核、事件和 checkpoint。""" + kwargs = dict(kwargs or {}) + self._validate_single_action_context(state, action, ip=ip, kwargs=kwargs) + route_kwargs = {key: value for key, value in kwargs.items() if key in SINGLE_ACTION_KWARGS} + if action == "publish-version": + route_kwargs["hash_code"] = route_kwargs.get("hash_code") or state.hash_code + if route_kwargs.get("node_url"): + state.node_url = str(route_kwargs["node_url"]) + backend = state.action_backends.get(action, "script") + logger.info( + "单 action 开始 run_id=%s action=%s backend=%s ip=%s kwargs=%s", + state.run_id, + action, + backend, + ip, + json_for_log(route_kwargs), + ) + self._emit_progress({"type": "ACTION_START", "stage": action, "backend": backend, "ip": ip}) + try: + result = self.router.run_action(state, action, ip=ip or None, **route_kwargs) + except Exception as exc: + logger.exception("单 action 调用异常 run_id=%s action=%s backend=%s ip=%s", state.run_id, action, backend, ip) + result = ActionResult( + action=action, + backend=backend, + ok=False, + error_summary=str(exc), + ) + logger.info("单 action 返回 run_id=%s action=%s result=%s", state.run_id, action, _action_result_for_log(result)) + analysis = self._append_action_analysis(state, action, result, ip=ip or None) + failed = (not result.ok) or self._business_failed(action, result.values) + if failed: + message = result.error_summary or result.values.get("MESSAGE", "action 执行失败") + fail_event = { + "type": "SINGLE_ACTION_FAIL", + "stage": action, + "backend": result.backend, + "ip": ip, + "message": message, + } + state.events.append(fail_event) + self._emit_progress({"type": "ACTION_FAIL", "stage": action, "backend": result.backend, "ip": ip, "message": message}) + state.last_failed_step = action + state.paused = True + state.pause_reason = "single_action_failed" + state.review_context = self._review_context(action=action, analysis=analysis, result=result, ip=ip or None) + self._save_checkpoint(state) + return result + if analysis is not None and not analysis.should_continue: + message = analysis.suggested_action or analysis.possible_reason or "LLM 审核要求暂停" + state.events.append( + { + "type": "SINGLE_ACTION_BLOCKED", + "stage": action, + "backend": result.backend, + "ip": ip, + "message": message, + } + ) + state.last_failed_step = action + state.paused = True + state.pause_reason = "single_action_review_blocked" + state.review_context = self._review_context(action=action, analysis=analysis, result=result, ip=ip or None) + self._save_checkpoint(state) + return result + self._apply_result(state, action, result.values) + if ip and ip in state.ip_states: + self._apply_ip_result(state.ip_states[ip], action, result.values) + state.last_success_step = action + if state.last_failed_step == action: + state.last_failed_step = "" + done_message = self._progress_message(action, result, ip=ip or None) if action in PROGRESS_ACTIONS else result.values.get("MESSAGE", "ok") + done_event = { + "type": "SINGLE_ACTION_DONE", + "stage": action, + "backend": result.backend, + "ip": ip, + "message": done_message, + } + state.events.append(done_event) + self._emit_progress({"type": "ACTION_DONE", "stage": action, "backend": result.backend, "ip": ip, "message": done_message}) + self._save_checkpoint(state) + logger.info("单 action 完成 run_id=%s action=%s ip=%s", state.run_id, action, ip) + return result + + def _validate_single_action_context( + self, + state: AgentState, + action: str, + *, + ip: str = "", + kwargs: dict[str, Any] | None = None, + ) -> None: + """校验单 action 是否具备必要上下文。""" + kwargs = kwargs or {} + if action not in ALLOWED_ACTIONS: + raise ValueError(f"不支持的 action: {action}") + if action in IP_REQUIRED_ACTIONS and not ip: + raise ValueError(f"{action} 需要提供 ip") + if action == "publish-version" and not (kwargs.get("hash_code") or state.hash_code): + raise ValueError("publish-version 缺少 HASH_CODE,请先执行 upload-package 或显式提供 hash_code=...") + backend = state.action_backends.get(action, "script") + if backend == "mcp" and action != "get-online-ips" and not (kwargs.get("node_url") or state.node_url): + raise ValueError(f"{action} 使用 MCP 时需要 NODE_URL,请先执行 get-node-url 或显式提供 node_url=...") + def run_ip_action(self, state: AgentState, ip: str, action: str) -> AgentState: """执行一个单 IP action;失败时暂停并保留该 action 供 resume 重试。""" ip_state = state.ip_states[ip] diff --git a/pam_deploy_graph/interactive.py b/pam_deploy_graph/interactive.py index eecf66f..28cde0c 100644 --- a/pam_deploy_graph/interactive.py +++ b/pam_deploy_graph/interactive.py @@ -19,17 +19,28 @@ from .llm import build_llm_client from .llm.rule_based import RuleBasedLlmClient from .logging_utils import configure_logging, json_for_log, redact_for_log from .mcp_factory import build_mcp_runner_from_config -from .models import AgentState, ExecutionStrategy +from .models import ActionResult, AgentState, ExecutionStrategy, LlmSingleActionProposal from .params_loader import load_params_file +from .constants import ALLOWED_ACTIONS, IP_ACTION_SEQUENCE +from .action_router import build_action_backends InputFunc = Callable[[str], str] OutputFunc = Callable[[str], None] logger = logging.getLogger(__name__) +DEFAULT_LOG_ANALYSIS_TAIL_LINES = 400 +DEFAULT_LOG_ANALYSIS_MAX_BYTES = 64 * 1024 COMMAND_HELP = """可用命令: help 显示帮助 preview 查看当前参数和执行策略 + ask <问题> 和当前 LLM 普通对话,不触发部署执行 analyze <需求> 只做理解和计划,不执行 + log analyze <路径> [问题] [--tail N] [--max-bytes N] + 读取日志尾部并交给 LLM 分析 + action propose <需求> 让 LLM 解析单个 action 执行建议,只展示不执行 + action run [ip=...] [KEY=VALUE...] + 手工指定并确认后执行单个 action + action run llm <需求> 让 LLM 解析并确认后执行单个 action params 脱敏展示当前会话参数 events [数量] 查看最近 action 事件,默认 10 条 set KEY=VALUE 修改当前会话参数 @@ -48,7 +59,7 @@ COMMAND_HELP = """可用命令: checkpoint 显示 checkpoint 路径 exit 退出 -也可以直接输入自然语言需求,Agent 会先分析并更新会话参数;执行仍需输入 run。 +非内置命令会默认交给 LLM 普通对话;完整部署仍需使用 analyze/run 并人工确认。 执行中可按 Ctrl+C 中断,保存 checkpoint 后再用 resume 继续。 """ @@ -136,6 +147,9 @@ class InteractiveCliSession: if normalized == "preview": self.output(self.agent.preview(self.params, self.strategy)) return True + if normalized == "ask": + self._ask_llm(rest.strip()) + return True if normalized == "params": self._show_params() return True @@ -154,6 +168,12 @@ class InteractiveCliSession: if normalized == "mcp": self._configure_mcp(rest.strip()) return True + if normalized == "log": + self._handle_log_command(rest.strip()) + return True + if normalized == "action": + self._handle_action_command(rest.strip()) + return True if normalized in ("run", "deploy", "execute"): self._run_deploy() return True @@ -185,17 +205,7 @@ class InteractiveCliSession: self._load_checkpoint(rest.strip()[len("checkpoint") :].strip()) return True - if _is_small_talk(text): - logger.info("chat 输入识别为寒暄,跳过结构化分析") - self.output("你好。可以输入 help 查看命令,或直接描述部署需求;执行前仍需输入 run 并确认。") - return True - if not _looks_like_deploy_request(text): - logger.info("chat 输入未命中部署需求粗筛,跳过结构化分析") - self.output("我没有识别到明确的部署需求。可以输入 help 查看命令,或用 analyze <需求> 明确触发需求分析。") - return True - - self.output("正在分析需求...") - self._analyze(text) + self._ask_llm(text) return True def _analyze(self, text: str) -> None: @@ -274,13 +284,260 @@ class InteractiveCliSession: events = self.state.events[-max(count, 1) :] self.output(json.dumps(redact_mapping(events), ensure_ascii=False, indent=2, default=str)) + def _ask_llm(self, text: str) -> None: + """把普通自然语言输入交给 LLM,不触发部署 workflow。""" + if not text: + self.output("格式:ask <问题>") + return + client_name = type(self.agent.llm_client).__name__ + self.output(f"正在询问 LLM: {client_name}") + logger.info("chat 普通 LLM 对话开始 client=%s text=%s", client_name, redact_for_log(text, max_text_len=800)) + try: + answer = self.agent.llm_client.chat(text, context=self._llm_chat_context()) + except Exception as exc: + logger.exception("chat 普通 LLM 对话失败 client=%s", client_name) + self.output(f"LLM 对话失败: {exc}") + return + self.output(answer or "LLM 未返回内容。") + logger.info("chat 普通 LLM 对话完成 client=%s answer=%s", client_name, redact_for_log(answer, max_text_len=1200)) + + def _handle_log_command(self, text: str) -> None: + """处理日志分析命令。""" + try: + parts = _split_command_args(text) + except ValueError as exc: + self.output(f"log 命令解析失败: {exc}") + return + if not parts or parts[0] != "analyze": + self.output("格式:log analyze <路径> [问题] [--tail N] [--max-bytes N]") + return + try: + path, question, tail_lines, max_bytes = _parse_log_analyze_args(parts[1:]) + except ValueError as exc: + self.output(f"log analyze 参数错误: {exc}") + return + if not path: + self.output("格式:log analyze <路径> [问题] [--tail N] [--max-bytes N]") + return + try: + log_text = _read_log_tail(path, tail_lines=tail_lines, max_bytes=max_bytes) + except OSError as exc: + logger.exception("chat 日志读取失败 path=%s", path) + self.output(f"日志读取失败: {exc}") + return + if not log_text.strip(): + self.output(f"日志文件没有可分析内容: {path}") + return + client_name = type(self.agent.llm_client).__name__ + self.output(f"正在分析日志: {path}") + logger.info( + "chat 日志分析开始 client=%s path=%s tail=%s max_bytes=%s question=%s text_len=%s", + client_name, + path, + tail_lines, + max_bytes, + redact_for_log(question, max_text_len=300), + len(log_text), + ) + try: + answer = self.agent.llm_client.analyze_log(log_text, question=question or None, source_path=str(path)) + except Exception as exc: + logger.exception("chat 日志分析失败 client=%s path=%s", client_name, path) + self.output(f"日志分析失败: {exc}") + return + self.output(answer or "LLM 未返回日志分析结果。") + logger.info("chat 日志分析完成 path=%s answer=%s", path, redact_for_log(answer, max_text_len=1200)) + + def _handle_action_command(self, text: str) -> None: + """处理单 action proposal 和执行命令。""" + try: + parts = _split_command_args(text) + except ValueError as exc: + self.output(f"action 命令解析失败: {exc}") + return + if not parts: + self.output("格式:action propose <需求> | action run [ip=...] [KEY=VALUE...] | action run llm <需求>") + return + command = parts[0] + if command == "propose": + request = text.partition("propose")[2].strip() + self._propose_action(request, execute=False) + return + if command == "run": + request = text.partition("run")[2].strip() + if not request: + self.output("格式:action run [ip=...] [KEY=VALUE...] | action run llm <需求>") + return + if request.lower().startswith("llm "): + proposal = self._propose_action(request[4:].strip(), execute=True) + if proposal and proposal.action: + self._run_single_action_with_confirmation(proposal) + return + try: + proposal = _parse_manual_action_run(request) + except ValueError as exc: + self.output(f"action run 参数错误: {exc}") + return + self._run_single_action_with_confirmation(proposal) + return + self.output("未知 action 命令。格式:action propose <需求> | action run ...") + + def _propose_action(self, text: str, *, execute: bool) -> LlmSingleActionProposal | None: + """调用 LLM 解析单 action 建议,并展示结果。""" + if not text: + self.output("请输入要解析的单 action 需求。") + return None + client_name = type(self.agent.llm_client).__name__ + self.output(f"正在解析单 action: {client_name}") + logger.info("chat 单 action proposal 开始 execute=%s text=%s", execute, redact_for_log(text, max_text_len=800)) + try: + proposal = self.agent.llm_client.propose_action( + text, + list(ALLOWED_ACTIONS), + self.params, + state_summary=self._state_summary_for_llm(), + ) + except Exception as exc: + logger.exception("chat 单 action proposal 失败") + self.output(f"单 action 解析失败: {exc}") + return None + if not proposal.action: + self.output("LLM 未识别到明确 action。请写出具体 action 名,或使用 action run 手工指定。") + logger.info("chat 单 action proposal 未识别 action proposal=%s", json_for_log(asdict(proposal))) + return proposal + if not execute: + self.output(_format_action_proposal(proposal, self._backend_for_action(proposal.action))) + logger.info("chat 单 action proposal 完成 execute=%s proposal=%s", execute, json_for_log(asdict(proposal))) + return proposal + + def _run_single_action_with_confirmation(self, proposal: LlmSingleActionProposal) -> None: + """展示单 action 计划,确认后执行。""" + if proposal.action not in ALLOWED_ACTIONS: + self.output(f"不支持的 action: {proposal.action}") + return + if not self._prepare_params_for_run(): + return + problems = self._validate_single_action_prerequisites(proposal) + if problems: + self.output("单 action 执行前检查未通过:") + for problem in problems: + self.output(f"- {problem}") + return + self.output(_format_action_proposal(proposal, self._backend_for_action(proposal.action))) + if not self._ask_yes_no("确认执行此单 action 请输入 yes: "): + self.output("已取消单 action 执行。") + logger.info("chat 单 action 执行被用户取消 proposal=%s", json_for_log(asdict(proposal))) + return + if not self._ensure_state_for_single_action(proposal): + return + if self.state is None: + self.output("当前没有运行状态。") + return + try: + result = self.agent.run_single_action( + self.state, + proposal.action, + ip=proposal.ip, + kwargs=proposal.kwargs, + ) + except Exception as exc: + logger.exception("chat 单 action 执行失败 proposal=%s", json_for_log(asdict(proposal))) + self.output(f"单 action 执行失败: {exc}") + self._print_pause_context() + return + self.output("单 action 执行完成" if result.ok and not self.state.paused else "单 action 执行已停止") + self.output(_format_action_result(result)) + self._print_pause_context() + self.output(f"checkpoint: {self.state.checkpoint_path or self.checkpoint_path}") + + def _ensure_state_for_single_action(self, proposal: LlmSingleActionProposal) -> bool: + """确保单 action 有可用 state;没有时用当前参数创建临时 state。""" + if self.state is None: + target_ips = list(self.target_ips) + if proposal.ip and proposal.ip not in target_ips: + target_ips.append(proposal.ip) + self.state = self.agent.create_state( + params=self.params, + execution_strategy=self.strategy, + checkpoint_path=self.checkpoint_path, + target_ips=target_ips, + ) + self.graph_runtime = None + logger.info("chat 单 action 创建临时 state run_id=%s checkpoint=%s", self.state.run_id, self.state.checkpoint_path) + if proposal.ip and proposal.ip not in self.state.target_ips: + self.state.target_ips.append(proposal.ip) + if proposal.ip and proposal.ip not in self.state.ip_states: + self.state.ip_states[proposal.ip] = { + "status": "RUNNING", + "completed_steps": [], + "failed_stage": "", + "failure_reason": "", + "rollback_status": "ROLLBACK_NOT_RUN", + "rollback_stop_first": False, + "log_file": "", + } + return True + + def _validate_single_action_prerequisites(self, proposal: LlmSingleActionProposal) -> list[str]: + """在执行单 action 前做 chat 层友好检查。""" + problems: list[str] = [] + action = proposal.action + backend = self._backend_for_action(action) + if action in (set(IP_ACTION_SEQUENCE) | {"stop-ip", "rollback-ip"}) and not proposal.ip: + problems.append(f"{action} 需要提供 ip,例如:action run {action} ip=192.168.1.10") + if backend == "mcp" and self.agent.mcp_runner is None: + problems.append("当前 action 会路由到 MCP,但尚未配置 MCP runner。请启动时传 --mcp-config 或执行 mcp config <路径>。") + if backend == "script": + script_entry = self.agent.script_base_dir / "deploy.sh" + ps_entry = self.agent.script_base_dir / "deploy.ps1" + if not script_entry.exists() and not ps_entry.exists(): + problems.append(f"脚本入口不存在: {script_entry} 或 {ps_entry}") + if action == "upload-package" and self.strategy != "fake": + zip_path = str(self.params.get("ZIP_FILE_PATH", "")).strip() + if not _path_exists(zip_path): + problems.append(f"ZIP_FILE_PATH 不存在: {zip_path}") + return problems + + def _backend_for_action(self, action: str) -> str: + """读取当前 action 后端,优先使用已有 state。""" + if self.state is not None: + return self.state.action_backends.get(action, "script") + return build_action_backends(self.strategy).get(action, "script") + + def _llm_chat_context(self) -> dict[str, Any]: + """构造普通对话上下文,敏感参数先脱敏。""" + return { + "strategy": self.strategy, + "params": redact_mapping(self.params), + "target_ips": list(self.target_ips), + "checkpoint_path": self.checkpoint_path, + "state": self._state_summary_for_llm(), + } + + def _state_summary_for_llm(self) -> dict[str, Any]: + """构造给 LLM 的最小 state 摘要。""" + if self.state is None: + return {} + return { + "run_id": self.state.run_id, + "strategy": self.state.execution_strategy, + "paused": self.state.paused, + "pause_reason": self.state.pause_reason, + "pending_confirmation": self.state.pending_confirmation, + "last_success_step": self.state.last_success_step, + "last_failed_step": self.state.last_failed_step, + "hash_code_present": bool(self.state.hash_code), + "node_url_present": bool(self.state.node_url), + "target_ips": list(self.state.target_ips), + } + def _configure_llm(self, text: str) -> None: """热加载 LLM 配置,或开关 action 后诊断。""" if not text: self.output("格式:llm config base_url=... api_key=... model=... action_analysis_prompt_file=... | llm test [文本] | llm fallback | llm action-analysis on|off") return try: - parts = shlex.split(text) + parts = _split_command_args(text) except ValueError as exc: logger.exception("chat LLM 命令解析失败 text=%s", redact_for_log(text, max_text_len=500)) self.output(f"LLM 命令解析失败: {exc}") @@ -958,14 +1215,170 @@ def _parse_key_values(parts: list[str]) -> dict[str, str]: continue key, value = part.split("=", 1) if key: - values[key] = value + values[key] = _strip_outer_quotes(value) return values +def _parse_log_analyze_args(parts: list[str]) -> tuple[Path, str, int, int]: + """解析 `log analyze` 参数。""" + if not parts: + raise ValueError("缺少日志路径") + path = Path(parts[0]).expanduser() + tail_lines = DEFAULT_LOG_ANALYSIS_TAIL_LINES + max_bytes = DEFAULT_LOG_ANALYSIS_MAX_BYTES + question_parts: list[str] = [] + index = 1 + while index < len(parts): + part = parts[index] + if part == "--tail": + index += 1 + if index >= len(parts): + raise ValueError("--tail 需要提供行数") + tail_lines = _positive_int(parts[index], "--tail") + elif part == "--max-bytes": + index += 1 + if index >= len(parts): + raise ValueError("--max-bytes 需要提供字节数") + max_bytes = _positive_int(parts[index], "--max-bytes") + else: + question_parts.append(part) + index += 1 + return path, " ".join(question_parts).strip(), tail_lines, max_bytes + + +def _read_log_tail(path: Path, *, tail_lines: int, max_bytes: int) -> str: + """读取日志尾部文本,并先做脱敏和截断。""" + if not path.exists(): + raise OSError(f"日志文件不存在: {path}") + if not path.is_file(): + raise OSError(f"不是普通日志文件: {path}") + size = path.stat().st_size + with path.open("rb") as handle: + if size > max_bytes: + handle.seek(max(size - max_bytes, 0)) + raw = handle.read(max_bytes) + marker = f"[只读取尾部 {max_bytes} 字节,文件总大小 {size} 字节]\n" + else: + raw = handle.read() + marker = "" + text = raw.decode("utf-8", errors="replace") + lines = text.splitlines() + if len(lines) > tail_lines: + lines = lines[-tail_lines:] + marker += f"[只保留尾部 {tail_lines} 行]\n" + return marker + str(redact_for_log("\n".join(lines), max_text_len=max_bytes)) + + +def _parse_manual_action_run(text: str) -> LlmSingleActionProposal: + """解析 `action run [ip=...] [KEY=VALUE...]`。""" + parts = _split_command_args(text) + if not parts: + raise ValueError("缺少 action 名") + action = parts[0] + if action not in ALLOWED_ACTIONS: + raise ValueError(f"不支持的 action: {action}") + values = _parse_key_values(parts[1:]) + ip = values.pop("ip", values.pop("IP", "")) + kwargs: dict[str, Any] = {} + for key, value in values.items(): + normalized_key = _single_action_kwarg_name(key) + kwargs[normalized_key] = _single_action_value(normalized_key, value) + risk = "high" if action in ("publish-version", "create-download-task", "upgrade-ip", "start-ip", "stop-ip", "rollback-ip") else "medium" + return LlmSingleActionProposal( + action=action, + ip=ip, + kwargs=kwargs, + reason="用户手工指定单 action。", + risk_level=risk, # type: ignore[arg-type] + requires_confirmation=True, + ) + + +def _single_action_kwarg_name(key: str) -> str: + """把命令行参数名转换为 Agent 单 action kwargs。""" + aliases = { + "HASH_CODE": "hash_code", + "hashCode": "hash_code", + "hash-code": "hash_code", + "NODE_URL": "node_url", + "nodeUrl": "node_url", + "node-url": "node_url", + "stop-first": "stop_first", + "STOP_FIRST": "stop_first", + "timeout": "timeout_sec", + "TIMEOUT": "timeout_sec", + } + return aliases.get(key, key) + + +def _single_action_value(key: str, value: str) -> Any: + """解析单 action kwargs 的基础类型。""" + if key == "stop_first": + return value.strip().lower() in ("1", "true", "yes", "y", "on") + if key == "timeout_sec": + return int(value) + return value + + +def _format_action_proposal(proposal: LlmSingleActionProposal, backend: str) -> str: + """格式化单 action 执行建议。""" + safe_kwargs = redact_mapping(proposal.kwargs) + lines = [ + "单 action 计划:", + f"- action: {proposal.action or '-'}", + f"- backend: {backend or '-'}", + f"- ip: {proposal.ip or '-'}", + f"- risk: {proposal.risk_level}", + f"- confirmation: {'required' if proposal.requires_confirmation else 'not-required'}", + ] + if proposal.reason: + lines.append(f"- reason: {proposal.reason}") + if safe_kwargs: + lines.append("- kwargs: " + json.dumps(safe_kwargs, ensure_ascii=False, sort_keys=True)) + return "\n".join(lines) + + +def _format_action_result(result: ActionResult) -> str: + """格式化单 action 执行结果摘要。""" + payload = { + "action": result.action, + "backend": result.backend, + "ok": result.ok, + "exit_code": result.exit_code, + "tool_name": result.tool_name, + "values": result.values, + "error_summary": result.error_summary, + } + return json.dumps(redact_mapping(payload), ensure_ascii=False, indent=2, default=str) + + +def _positive_int(value: str, name: str) -> int: + """解析正整数参数。""" + try: + number = int(value) + except ValueError as exc: + raise ValueError(f"{name} 必须是整数") from exc + if number <= 0: + raise ValueError(f"{name} 必须大于 0") + return number + + +def _split_command_args(text: str) -> list[str]: + """按 chat 命令语义拆分参数,并保留 Windows 路径中的反斜杠。""" + return [_strip_outer_quotes(part) for part in shlex.split(text, posix=False)] + + +def _strip_outer_quotes(value: str) -> str: + """去掉 shlex(posix=False) 保留下来的成对引号。""" + if len(value) >= 2 and value[0] == value[-1] and value[0] in ("'", '"'): + return value[1:-1] + return value + + def _parse_rollback_args(text: str) -> tuple[str, bool | None, str]: """解析 chat rollback 命令参数,返回 IP、停机覆盖值和备注。""" try: - parts = shlex.split(text) + parts = _split_command_args(text) except ValueError as exc: raise ValueError(str(exc)) from exc ip = "" @@ -1002,56 +1415,6 @@ def _find_current_failed_ip(state: AgentState) -> str: return "" -def _is_small_talk(text: str) -> bool: - """识别不应触发 LLM/结构化分析的简单寒暄。""" - normalized = text.strip().lower() - return normalized in { - "你好", - "您好", - "hello", - "hi", - "hey", - "在吗", - "谢谢", - "thanks", - "thank you", - } - - -def _looks_like_deploy_request(text: str) -> bool: - """粗筛自然语言部署需求,避免任意闲聊都触发耗时分析。""" - lowered = text.lower() - deploy_keywords = ( - "部署", - "发布", - "升级", - "回滚", - "预演", - "执行", - "pam", - "mcp", - "node", - "版本", - "机场", - "deploy", - "release", - "upgrade", - "rollback", - "preview", - ) - param_markers = ( - "HOME_BASE_URL", - "CLIENT_ID", - "AIRPORT_CODE", - "APP_NAME", - "MODULE_NAME", - "VERSION_NUMBER", - "PARENT_VERSION_NUMBER", - "ZIP_FILE_PATH", - ) - return any(keyword in lowered for keyword in deploy_keywords) or any(marker in text for marker in param_markers) - - def _path_exists(path: str) -> bool: """检查本地路径是否存在,兼容打包到 Linux 后的绝对路径。""" if not path: @@ -1073,7 +1436,12 @@ def _build_prompt_input(input_func: InputFunc) -> InputFunc: commands = [ "help", "preview", + "ask", "analyze", + "log analyze", + "action propose", + "action run", + "action run llm", "params", "events", "set", diff --git a/pam_deploy_graph/llm/base.py b/pam_deploy_graph/llm/base.py index 5aa9f8c..c8524c9 100644 --- a/pam_deploy_graph/llm/base.py +++ b/pam_deploy_graph/llm/base.py @@ -11,6 +11,7 @@ from pam_deploy_graph.models import ( LlmDeployPlan, LlmIntentResult, LlmParamResult, + LlmSingleActionProposal, ) @@ -43,3 +44,21 @@ class LlmClient(Protocol): ) -> LlmActionAnalysis: """分析 action 执行结果,并给出是否允许继续执行的建议。""" ... + + def chat(self, text: str, context: dict[str, Any] | None = None) -> str: + """进行普通自然语言对话,不触发部署 workflow。""" + ... + + def analyze_log(self, log_text: str, question: str | None = None, source_path: str = "") -> str: + """分析日志文本并给出异常摘要、原因和建议。""" + ... + + def propose_action( + self, + text: str, + allowed_actions: list[str], + params: dict[str, Any], + state_summary: dict[str, Any] | None = None, + ) -> LlmSingleActionProposal: + """把自然语言解析为单次 action 调用建议。""" + ... diff --git a/pam_deploy_graph/llm/openai_compatible.py b/pam_deploy_graph/llm/openai_compatible.py index d570260..e330ce5 100644 --- a/pam_deploy_graph/llm/openai_compatible.py +++ b/pam_deploy_graph/llm/openai_compatible.py @@ -23,10 +23,19 @@ from pam_deploy_graph.constants import ( SENSITIVE_KEYS, ) from pam_deploy_graph.logging_utils import json_for_log, redact_for_log -from pam_deploy_graph.models import ExecutionStrategy, LlmDeployPlan, LlmIntentResult, LlmParamResult +from pam_deploy_graph.models import ExecutionStrategy, LlmDeployPlan, LlmIntentResult, LlmParamResult, LlmSingleActionProposal from pam_deploy_graph.models import ActionResult, LlmActionAnalysis -from .prompts import ACTION_ANALYSIS_PROMPT, INTENT_PROMPT, PARAM_PROMPT, PLAN_PROMPT, SYSTEM_PROMPT +from .prompts import ( + ACTION_ANALYSIS_PROMPT, + CHAT_PROMPT, + INTENT_PROMPT, + LOG_ANALYSIS_PROMPT, + PARAM_PROMPT, + PLAN_PROMPT, + SINGLE_ACTION_PROMPT, + SYSTEM_PROMPT, +) JsonTransport = Callable[[str, dict[str, str], dict[str, Any], float], dict[str, Any]] logger = logging.getLogger(__name__) @@ -172,6 +181,59 @@ class OpenAICompatibleLlmClient: notes=_string_list(payload.get("notes")), ) + def chat(self, text: str, context: dict[str, Any] | None = None) -> str: + """调用 LLM 做普通对话,不要求 JSON 响应。""" + return self._complete_text( + "chat", + CHAT_PROMPT, + { + "user_text": text, + "context": _redact_sensitive(context or {}), + }, + ) + + def analyze_log(self, log_text: str, question: str | None = None, source_path: str = "") -> str: + """调用 LLM 分析日志尾部摘要。""" + return self._complete_text( + "analyze_log", + LOG_ANALYSIS_PROMPT, + { + "source_path": source_path, + "question": question or "请分析日志中的异常、可能原因和下一步建议。", + "log_tail": redact_for_log(log_text, max_text_len=64000), + }, + ) + + def propose_action( + self, + text: str, + allowed_actions: list[str], + params: dict[str, Any], + state_summary: dict[str, Any] | None = None, + ) -> LlmSingleActionProposal: + """调用 LLM 把自然语言解析为单 action 调用建议。""" + payload = self._complete_json( + "propose_action", + SINGLE_ACTION_PROMPT, + { + "user_text": text, + "allowed_actions": allowed_actions, + "params": _redact_sensitive(params), + "state_summary": _redact_sensitive(state_summary or {}), + }, + ) + action = _string(payload, "action", "") + if action not in allowed_actions: + action = "" + return LlmSingleActionProposal( + action=action, + ip=_string(payload, "ip", ""), + kwargs=_dict(payload.get("kwargs")), + reason=_string(payload, "reason", ""), + risk_level=_risk_level(payload.get("risk_level")), + requires_confirmation=True, + ) + def _complete_json(self, operation: str, instruction: str, input_payload: dict[str, Any]) -> dict[str, Any]: """发送 chat/completions 请求,并解析 JSON 对象响应。""" started_at = time.perf_counter() @@ -237,6 +299,53 @@ class OpenAICompatibleLlmClient: ) return parsed + def _complete_text(self, operation: str, instruction: str, input_payload: dict[str, Any]) -> str: + """发送 chat/completions 请求,并返回普通文本响应。""" + started_at = time.perf_counter() + endpoint = _chat_completions_url(self.base_url) + request_payload = { + "model": self.model, + "temperature": self.temperature, + "messages": [ + {"role": "system", "content": instruction}, + { + "role": "user", + "content": "输入 JSON:\n" + json.dumps(input_payload, ensure_ascii=False, sort_keys=True), + }, + ], + } + headers = {"Content-Type": "application/json"} + if self.api_key: + headers["Authorization"] = f"Bearer {self.api_key}" + logger.info( + "LLM 文本请求开始 operation=%s endpoint=%s model=%s timeout=%s has_api_key=%s input=%s", + operation, + endpoint, + self.model, + self.timeout_sec, + bool(self.api_key), + json_for_log(input_payload, max_text_len=1600), + ) + try: + response = self.transport(endpoint, headers, request_payload, self.timeout_sec) + content = str(_message_content(response)) + except Exception: + logger.exception( + "LLM 文本请求失败 operation=%s endpoint=%s duration_ms=%s input=%s", + operation, + endpoint, + int((time.perf_counter() - started_at) * 1000), + json_for_log(input_payload, max_text_len=1600), + ) + raise + logger.info( + "LLM 文本请求完成 operation=%s duration_ms=%s content=%s", + operation, + int((time.perf_counter() - started_at) * 1000), + redact_for_log(content, max_text_len=1600), + ) + return content.strip() + def _default_transport( url: str, @@ -401,6 +510,14 @@ def _optional_bool(value: Any) -> bool | None: return bool(value) +def _risk_level(value: Any) -> str: + """解析单 action 风险等级,非法值降级为 medium。""" + text = str(value or "").strip().lower() + if text in ("low", "medium", "high"): + return text + return "medium" + + def _dict(value: Any) -> dict[str, Any]: """确保返回 dict,非法值降级为空 dict。""" return value if isinstance(value, dict) else {} diff --git a/pam_deploy_graph/llm/prompts.py b/pam_deploy_graph/llm/prompts.py index 87a95d1..d4e988c 100644 --- a/pam_deploy_graph/llm/prompts.py +++ b/pam_deploy_graph/llm/prompts.py @@ -100,3 +100,43 @@ ACTION_ANALYSIS_PROMPT = """分析一次 PAM action 执行结果。 - 脚本正常过程日志不会作为错误依据,不能因为日志来自 stderr 就判定异常。 - 不要输出密钥、token、Authorization 或完整日志原文。 """ + +CHAT_PROMPT = """你是 PAM 部署 Agent 的交互助手。 + +要求: +- 可以回答普通问题、解释当前 Agent 的命令和部署流程。 +- 不要自动触发部署、回滚、升级、脚本执行或 MCP 调用。 +- 如果用户想执行完整部署,提示使用 `analyze <需求>` 先分析,确认后再输入 `run`。 +- 如果用户想单独执行 action,提示使用 `action propose <需求>` 或 `action run ...`,执行前仍需要人工确认。 +- 不要输出密钥、token、Authorization、CLIENT_SECRET 或 api_key。 +""" + +LOG_ANALYSIS_PROMPT = """分析 PAM Agent 或部署脚本日志。 + +要求: +- 优先总结异常现象、可能原因和建议下一步。 +- 不要输出密钥、token、Authorization、CLIENT_SECRET 或 api_key。 +- 输入通常是日志尾部摘要,不代表完整文件。 +- 不要因为日志来自 stderr 就直接判定失败,要结合 ERROR、Exception、fail、状态码和上下文判断。 +""" + +SINGLE_ACTION_PROMPT = """把用户自然语言解析成一次 PAM action 调用建议。 + +输出 JSON schema: +{ + "action": "get-token", + "ip": "", + "kwargs": {}, + "reason": "...", + "risk_level": "low|medium|high", + "requires_confirmation": true +} + +要求: +- `action` 必须来自输入的 allowed_actions;不能识别明确 action 时返回空字符串。 +- 不要猜测危险 action,不要自动规划多个 action。 +- 逐 IP action 必须尽量提取 `ip`。 +- 除 `ip` 外的额外参数放入 `kwargs`。 +- 所有 action 都必须 `requires_confirmation=true`。 +- 不要输出密钥、token、Authorization、CLIENT_SECRET 或 api_key。 +""" diff --git a/pam_deploy_graph/llm/rule_based.py b/pam_deploy_graph/llm/rule_based.py index e875f25..0f919d4 100644 --- a/pam_deploy_graph/llm/rule_based.py +++ b/pam_deploy_graph/llm/rule_based.py @@ -11,7 +11,7 @@ import re from dataclasses import asdict from typing import Any -from pam_deploy_graph.constants import GLOBAL_ACTION_SEQUENCE, REQUIRED_PARAMS +from pam_deploy_graph.constants import GLOBAL_ACTION_SEQUENCE, REQUIRED_PARAMS, SENSITIVE_KEYS from pam_deploy_graph.logging_utils import json_for_log, redact_for_log from pam_deploy_graph.models import ( ActionResult, @@ -20,6 +20,7 @@ from pam_deploy_graph.models import ( LlmDeployPlan, LlmIntentResult, LlmParamResult, + LlmSingleActionProposal, ) logger = logging.getLogger(__name__) @@ -56,6 +57,79 @@ KEY_ALIASES = { class RuleBasedLlmClient: """基于规则的轻量 LLM client fallback。""" + def chat(self, text: str, context: dict[str, Any] | None = None) -> str: + """规则 fallback 的普通对话说明。""" + logger.info("规则 LLM 普通对话 text=%s context=%s", redact_for_log(text, max_text_len=800), json_for_log(context or {})) + lowered = text.lower() + if any(word in lowered for word in ("help", "帮助", "怎么用", "命令")): + return ( + "当前是本地规则 LLM fallback。可用 `analyze <需求>` 分析部署需求,`run` 执行完整 workflow," + "`action propose <需求>` 解析单个 action,`action run ...` 确认后执行单个 action," + "`log analyze <路径>` 分析日志尾部。" + ) + return ( + "当前未配置真实 LLM,已使用本地规则 fallback。普通闲聊只能给出有限说明;" + "如需自然语言问答、日志深度分析或更准确的 action 解析,请配置真实 LLM。" + ) + + def analyze_log(self, log_text: str, question: str | None = None, source_path: str = "") -> str: + """用本地规则分析日志尾部。""" + logger.info("规则 LLM 日志分析 source=%s question=%s text_len=%s", source_path, redact_for_log(question or "", max_text_len=300), len(log_text)) + lines = log_text.splitlines() + problem_lines = [ + line + for line in lines + if re.search(r"error|exception|fail|traceback|timeout|refused|denied|失败|异常|错误|超时", line, flags=re.IGNORECASE) + ] + summary = [ + f"日志来源: {source_path or '-'}", + f"已分析尾部 {len(lines)} 行。", + ] + if question: + summary.append(f"关注问题: {question}") + if problem_lines: + summary.append(f"发现 {len(problem_lines)} 行疑似异常,最近几条:") + summary.extend(f"- {redact_for_log(line, max_text_len=240)}" for line in problem_lines[-5:]) + summary.append("建议:优先检查以上异常附近的接口返回、网络连通性、认证信息和目标服务状态。") + else: + summary.append("未在日志尾部发现明显 ERROR/Exception/fail/timeout 关键字。") + summary.append("建议:如问题仍存在,请扩大 `--tail` 或提供更具体的问题描述。") + return "\n".join(summary) + + def propose_action( + self, + text: str, + allowed_actions: list[str], + params: dict[str, Any], + state_summary: dict[str, Any] | None = None, + ) -> LlmSingleActionProposal: + """只在用户明确写出 action 名时生成单 action 建议。""" + logger.info( + "规则 LLM 单 action 解析开始 text=%s allowed=%s state=%s", + redact_for_log(text, max_text_len=800), + allowed_actions, + json_for_log(state_summary or {}), + ) + action = "" + lowered = text.lower() + for candidate in allowed_actions: + if candidate.lower() in lowered: + action = candidate + break + ip_match = re.search(r"\b(?:\d{1,3}\.){3}\d{1,3}\b", text) + kwargs = _safe_action_kwargs(self._extract_key_values(text)) + risk = "high" if action in ("publish-version", "create-download-task", "upgrade-ip", "start-ip", "stop-ip", "rollback-ip") else "medium" + proposal = LlmSingleActionProposal( + action=action, + ip=ip_match.group(0) if ip_match else "", + kwargs=kwargs, + reason="规则 fallback 仅在输入中出现明确 action 名时生成建议。" if action else "未识别到明确 action 名。", + risk_level=risk, # type: ignore[arg-type] + requires_confirmation=True, + ) + logger.info("规则 LLM 单 action 解析完成 proposal=%s", json_for_log(asdict(proposal))) + return proposal + def understand_request(self, text: str) -> LlmIntentResult: """用关键词规则识别用户意图和执行策略偏好。""" logger.info("规则 LLM 意图识别开始 text=%s", redact_for_log(text, max_text_len=800)) @@ -334,3 +408,8 @@ def _progress_note(values: dict[str, Any]) -> str: def _lower_value(value: Any) -> str: """把字段值转成小写字符串。""" return str(value).strip().lower() if value is not None else "" + + +def _safe_action_kwargs(values: dict[str, str]) -> dict[str, str]: + """过滤单 action 额外参数,避免把敏感字段放入执行建议。""" + return {key: value for key, value in values.items() if key not in SENSITIVE_KEYS} diff --git a/pam_deploy_graph/models.py b/pam_deploy_graph/models.py index dacae5d..bdd5301 100644 --- a/pam_deploy_graph/models.py +++ b/pam_deploy_graph/models.py @@ -11,6 +11,7 @@ IntentName = Literal["deploy", "show_usage", "preview", "query_node_ips", "rollb ModePreference = Literal["MCP", "API脚本", "未指定"] StrategyPreference = Literal["hybrid_node_mcp", "script_only", "fake", "未指定"] ActionAnalysisSeverity = Literal["info", "low", "medium", "high"] +ActionRiskLevel = Literal["low", "medium", "high"] @dataclass(slots=True) @@ -104,6 +105,18 @@ class LlmActionAnalysis: notes: list[str] = field(default_factory=list) +@dataclass(slots=True) +class LlmSingleActionProposal: + """LLM 对单次 action 调用的结构化建议。""" + + action: str + ip: str = "" + kwargs: dict[str, Any] = field(default_factory=dict) + reason: str = "" + risk_level: ActionRiskLevel = "medium" + requires_confirmation: bool = True + + @dataclass(slots=True) class AgentState: """一次部署运行的完整状态,可序列化到 checkpoint。""" diff --git a/tests/test_interactive_cli.py b/tests/test_interactive_cli.py index 332ac9b..30a4f61 100644 --- a/tests/test_interactive_cli.py +++ b/tests/test_interactive_cli.py @@ -7,7 +7,7 @@ import pytest from pam_deploy_graph.agent import PamDeployAgent from pam_deploy_graph.fake_runner import FakeActionRunner from pam_deploy_graph.interactive import InteractiveCliSession, _build_prompt_input -from pam_deploy_graph.models import LlmActionAnalysis, LlmIntentResult +from pam_deploy_graph.models import LlmActionAnalysis, LlmIntentResult, LlmSingleActionProposal PARAMS = { @@ -41,6 +41,9 @@ class BlockingReviewLlmClient: class FakeTestableLlmClient: def __init__(self) -> None: self.requests: list[str] = [] + self.chat_requests: list[tuple[str, dict]] = [] + self.log_requests: list[tuple[str, str | None, str]] = [] + self.proposal_requests: list[str] = [] def understand_request(self, text: str) -> LlmIntentResult: self.requests.append(text) @@ -61,6 +64,24 @@ class FakeTestableLlmClient: def analyze_action_result(self, *, action, result): return LlmActionAnalysis(action=action) + def chat(self, text, context=None): + self.chat_requests.append((text, context or {})) + return f"chat answer: {text}" + + def analyze_log(self, log_text, question=None, source_path=""): + self.log_requests.append((log_text, question, source_path)) + return "log analysis answer" + + def propose_action(self, text, allowed_actions, params, state_summary=None): + self.proposal_requests.append(text) + return LlmSingleActionProposal( + action="verify-ip" if "verify" in text else "get-online-ips", + ip="192.168.1.10" if "192.168.1.10" in text else "", + reason="test proposal", + risk_level="medium", + requires_confirmation=True, + ) + class FlakyVerifyRunner(FakeActionRunner): """第一次 verify-ip 失败,后续恢复成功,用于覆盖断点重试。""" @@ -177,9 +198,10 @@ def test_chat_run_prints_progress_poll_updates(tmp_path: Path): assert "poll-download-progress" in session.state.completed_global_steps -def test_chat_greeting_does_not_trigger_structured_analysis(tmp_path: Path): +def test_chat_greeting_goes_to_llm_chat_without_structured_analysis(tmp_path: Path): + llm = FakeTestableLlmClient() session = InteractiveCliSession( - agent=PamDeployAgent(), + agent=PamDeployAgent(llm_client=llm), params=PARAMS, strategy="fake", checkpoint_path=str(tmp_path / "checkpoint.json"), @@ -188,10 +210,132 @@ def test_chat_greeting_does_not_trigger_structured_analysis(tmp_path: Path): output = run_session(session, ["你好", "exit"]) assert session.last_analysis is None - assert any("可以输入 help 查看命令" in item for item in output) + assert llm.chat_requests[0][0] == "你好" + assert any("正在询问 LLM: FakeTestableLlmClient" in item for item in output) + assert any("chat answer: 你好" in item for item in output) assert not any("已生成结构化理解" in item for item in output) +def test_chat_ask_command_uses_llm_chat(tmp_path: Path): + llm = FakeTestableLlmClient() + session = InteractiveCliSession( + agent=PamDeployAgent(llm_client=llm), + params=PARAMS, + strategy="fake", + checkpoint_path=str(tmp_path / "checkpoint.json"), + ) + + output = run_session(session, ["ask 这个 agent 能做什么", "exit"]) + + assert llm.chat_requests[0][0] == "这个 agent 能做什么" + assert llm.chat_requests[0][1]["params"]["CLIENT_SECRET"] == "***" + assert any("chat answer: 这个 agent 能做什么" in item for item in output) + + +def test_chat_log_analyze_reads_tail_and_redacts(tmp_path: Path): + llm = FakeTestableLlmClient() + log_path = tmp_path / "agent.log" + log_path.write_text( + "\n".join( + [ + "line 1 CLIENT_SECRET=real-secret", + "line 2 ok", + "line 3 ERROR failed", + ] + ), + encoding="utf-8", + ) + session = InteractiveCliSession( + agent=PamDeployAgent(llm_client=llm), + params=PARAMS, + strategy="fake", + checkpoint_path=str(tmp_path / "checkpoint.json"), + ) + + output = run_session(session, [f"log analyze {log_path} 请找异常 --tail 2", "exit"]) + + log_text, question, source_path = llm.log_requests[0] + assert "line 1" not in log_text + assert "real-secret" not in log_text + assert "line 3 ERROR failed" in log_text + assert question == "请找异常" + assert source_path == str(log_path) + assert any("log analysis answer" in item for item in output) + + +def test_chat_action_propose_only_shows_plan_without_execution(tmp_path: Path): + llm = FakeTestableLlmClient() + fake = FakeActionRunner() + session = InteractiveCliSession( + agent=PamDeployAgent(fake_runner=fake, llm_client=llm), + params=PARAMS, + strategy="fake", + checkpoint_path=str(tmp_path / "checkpoint.json"), + ) + + output = run_session(session, ["action propose 请 verify-ip 192.168.1.10", "exit"]) + + assert llm.proposal_requests == ["请 verify-ip 192.168.1.10"] + assert fake.calls == [] + assert any("单 action 计划" in item for item in output) + assert any("- action: verify-ip" in item for item in output) + + +def test_chat_action_run_llm_requires_confirmation_before_execution(tmp_path: Path): + llm = FakeTestableLlmClient() + fake = FakeActionRunner() + session = InteractiveCliSession( + agent=PamDeployAgent(fake_runner=fake, llm_client=llm), + params=PARAMS, + strategy="fake", + checkpoint_path=str(tmp_path / "checkpoint.json"), + ) + + output = run_session(session, ["action run llm 请 verify-ip 192.168.1.10", "no", "exit"]) + + assert fake.calls == [] + assert any("已取消单 action 执行" in item for item in output) + + output = run_session(session, ["action run llm 请 verify-ip 192.168.1.10", "yes", "exit"]) + + assert ("verify-ip", {"ip": "192.168.1.10"}) in fake.calls + assert session.state is not None + assert any(event["type"] == "SINGLE_ACTION_DONE" for event in session.state.events) + assert any("单 action 执行完成" in item for item in output) + + +def test_chat_action_run_missing_ip_is_friendly(tmp_path: Path): + fake = FakeActionRunner() + session = InteractiveCliSession( + agent=PamDeployAgent(fake_runner=fake), + params=PARAMS, + strategy="fake", + checkpoint_path=str(tmp_path / "checkpoint.json"), + ) + + output = run_session(session, ["action run verify-ip", "exit"]) + + assert fake.calls == [] + assert any("需要提供 ip" in item for item in output) + + +def test_chat_action_run_manual_executes_fake_action(tmp_path: Path): + fake = FakeActionRunner() + session = InteractiveCliSession( + agent=PamDeployAgent(fake_runner=fake), + params=PARAMS, + strategy="fake", + checkpoint_path=str(tmp_path / "checkpoint.json"), + ) + + output = run_session(session, ["action run get-online-ips", "yes", "exit"]) + + assert ("get-online-ips", {"ip": None}) in fake.calls + assert session.state is not None + assert session.state.online_ips == ["192.168.1.10", "192.168.1.11"] + assert any("单 action 执行完成" in item for item in output) + + def test_chat_preflight_blocks_missing_zip_path_before_confirm(tmp_path: Path): missing_package = tmp_path / "missing.zip" session = InteractiveCliSession( diff --git a/tests/test_llm_structured.py b/tests/test_llm_structured.py index 6db8cbc..4c445e6 100644 --- a/tests/test_llm_structured.py +++ b/tests/test_llm_structured.py @@ -287,6 +287,105 @@ def test_openai_compatible_client_omits_success_script_logs_from_action_review() assert "[FLOW][START]" not in json.dumps(input_payload, ensure_ascii=False) +def test_openai_compatible_client_supports_plain_chat(): + calls = [] + + def transport(url, headers, payload, timeout_sec): + calls.append(payload) + return {"choices": [{"message": {"content": "普通回答"}}]} + + client = OpenAICompatibleLlmClient( + base_url="https://llm.example/v1", + api_key="secret-key", + model="model-a", + transport=transport, + ) + + answer = client.chat("你好", context={"CLIENT_SECRET": "real-secret"}) + + serialized_prompt = str(calls[0]) + assert answer == "普通回答" + assert "response_format" not in calls[0] + assert "real-secret" not in serialized_prompt + assert "不要自动触发部署" in calls[0]["messages"][0]["content"] + + +def test_openai_compatible_client_analyzes_log_with_redaction(): + calls = [] + + def transport(url, headers, payload, timeout_sec): + calls.append(payload) + return {"choices": [{"message": {"content": "日志分析"}}]} + + client = OpenAICompatibleLlmClient( + base_url="https://llm.example/v1", + api_key="secret-key", + model="model-a", + transport=transport, + ) + + answer = client.analyze_log("ERROR CLIENT_SECRET=real-secret", question="为什么失败", source_path="agent.log") + + input_payload = _llm_input_payload(calls[0]) + assert answer == "日志分析" + assert input_payload["source_path"] == "agent.log" + assert input_payload["question"] == "为什么失败" + assert "real-secret" not in json.dumps(input_payload, ensure_ascii=False) + assert "不要因为日志来自 stderr" in calls[0]["messages"][0]["content"] + + +def test_openai_compatible_client_proposes_single_action(): + calls = [] + + def transport(url, headers, payload, timeout_sec): + calls.append(payload) + return { + "choices": [ + { + "message": { + "content": ( + '{"action":"verify-ip","ip":"192.168.1.10","kwargs":{"timeout_sec":10},' + '"reason":"用户要求健康检查","risk_level":"low","requires_confirmation":false}' + ) + } + } + ] + } + + client = OpenAICompatibleLlmClient( + base_url="https://llm.example/v1", + api_key="secret-key", + model="model-a", + transport=transport, + ) + + proposal = client.propose_action( + "检查 192.168.1.10", + ["verify-ip", "get-online-ips"], + {"CLIENT_SECRET": "real-secret"}, + state_summary={"node_url_present": True}, + ) + + input_payload = _llm_input_payload(calls[0]) + assert proposal.action == "verify-ip" + assert proposal.ip == "192.168.1.10" + assert proposal.kwargs == {"timeout_sec": 10} + assert proposal.risk_level == "low" + assert proposal.requires_confirmation is True + assert "real-secret" not in json.dumps(input_payload, ensure_ascii=False) + + +def test_rule_based_client_proposes_only_explicit_action(): + client = RuleBasedLlmClient() + + proposal = client.propose_action("请 verify-ip 192.168.1.10", ["verify-ip"], {}, {}) + unknown = client.propose_action("帮我检查一下", ["verify-ip"], {}, {}) + + assert proposal.action == "verify-ip" + assert proposal.ip == "192.168.1.10" + assert unknown.action == "" + + def _llm_input_payload(request_payload): content = request_payload["messages"][1]["content"] _, _, raw_json = content.partition("输入 JSON:\n")