增强 chat LLM 交互与单 action 执行能力

- 扩展 LLM client 协议,支持普通对话、日志分析和单 action 解析
- chat 非内置输入默认进入 LLM 普通对话,不再本地拦截问候
- 新增 ask、log analyze、action propose、action run 等交互命令
- 单 action 执行前强制人工确认,并复用现有 ActionRouter、审核、事件和 checkpoint
- 日志分析默认读取尾部内容并脱敏后再提交给 LLM
- 更新 README、发布包 README 和 run.sh help
- 补充 LLM 与 chat 交互相关测试
This commit is contained in:
dark 2026-06-05 11:49:13 +08:00
parent 33065f6c09
commit 85afabcd94
13 changed files with 1088 additions and 79 deletions

View File

@ -87,11 +87,12 @@ packaging/
- `--analyze-actions``llm action-analysis on` 改为只控制是否把详细审核结果写入 `events`,不再控制审核是否执行。 - `--analyze-actions``llm action-analysis on` 改为只控制是否把详细审核结果写入 `events`,不再控制审核是否执行。
- chat 会播报 action 审核开始、审核完成和审核失败,避免黑盒执行。 - chat 会播报 action 审核开始、审核完成和审核失败,避免黑盒执行。
- chat 支持执行中按 `Ctrl+C` 中断,保存 checkpoint 后再 `resume` - chat 支持执行中按 `Ctrl+C` 中断,保存 checkpoint 后再 `resume`
- chat 支持普通 LLM 对话、日志尾部分析和单 action 执行:`ask <问题>``log analyze <路径>``action propose <需求>``action run ...`
- chat 支持 `set KEY=VALUE``load params <路径>` 热更新当前运行参数,并同步回写运行中的 `config.txt` 与 checkpoint。 - chat 支持 `set KEY=VALUE``load params <路径>` 热更新当前运行参数,并同步回写运行中的 `config.txt` 与 checkpoint。
- 支持通过 `--llm-action-analysis-prompt-file``PAM_LLM_ACTION_ANALYSIS_PROMPT_FILE` 或 chat 内 `llm config action_analysis_prompt_file=...` 自定义 action 审核提示词。 - 支持通过 `--llm-action-analysis-prompt-file``PAM_LLM_ACTION_ANALYSIS_PROMPT_FILE` 或 chat 内 `llm config action_analysis_prompt_file=...` 自定义 action 审核提示词。
- 增加统一运行日志,默认写入 `logs/pam_deploy_agent.log`,覆盖 CLI/chat、LLM 调用、action 路由、脚本/MCP 调用、LangGraph、checkpoint 等关键流程,并按天切分、默认保留 14 个历史日切文件。 - 增加统一运行日志,默认写入 `logs/pam_deploy_agent.log`,覆盖 CLI/chat、LLM 调用、action 路由、脚本/MCP 调用、LangGraph、checkpoint 等关键流程,并按天切分、默认保留 14 个历史日切文件。
- chat 支持 `llm test [文本]`,可用当前 LLM client 做一次轻量调用,确认真实 LLM 或规则 fallback 是否正常加载。 - chat 支持 `llm test [文本]`,可用当前 LLM client 做一次轻量调用,确认真实 LLM 或规则 fallback 是否正常加载。
- 添加基础测试,当前本地结果为 `73 passed, 3 skipped`。 - 添加基础测试,当前本地结果为 `83 passed, 3 skipped`。
未完成: 未完成:
@ -289,6 +290,11 @@ PAM> run
PAM> status PAM> status
PAM> params PAM> params
PAM> events 5 PAM> events 5
PAM> ask 这个 agent 能做什么
PAM> log analyze logs/pam_deploy_agent.log 请帮我看最近异常 --tail 400
PAM> action propose 请单独执行 verify-ip 192.168.1.10
PAM> action run verify-ip ip=192.168.1.10
PAM> action run llm 请单独执行 get-online-ips
PAM> llm test PAM> llm test
PAM> llm action-analysis on PAM> llm action-analysis on
PAM> llm config action_analysis_prompt_file=prompts/action_review.txt PAM> llm config action_analysis_prompt_file=prompts/action_review.txt
@ -300,7 +306,7 @@ PAM> resume
PAM> exit PAM> exit
``` ```
`chat` 默认仍要求在会话内显式输入 `run`,并确认参数、目标 IP 范围和最终执行后才会执行 action。输入 `你好``hello` 这类问候不会触发 LLM/结构化分析;需要分析部署需求时可直接描述部署任务,或显式使用 `analyze <需求>`。每个 action 完成后都会自动进入一次 LLM/规则审核,并播报审核开始/结束;审核输入只包含当前 action 的结构化结果和必要诊断日志,不会把完整运行态 `state_summary` 交给大模型,避免跨步骤状态干扰判断;只有审核通过才会把 action 记为 completed如果审核建议停止或审核本身失败流程会暂停并输出建议等待用户决定是否 `resume` 重试当前 action。`create-download-task` 支持可选 `PARENT_VERSION_NUMBER`,非空时会在云下载接口中传入 `parentVersionNumber`,用于指定继承哪个版本的规则;默认空值不传,沿用正在使用的版本规则。`poll-download-progress``poll-upgrade-progress` 每次只查询一次进度workflow 会按 `POLL_INTERVAL_SEC``DOWNLOAD_POLL_MAX_ATTEMPTS``UPGRADE_POLL_MAX_ATTEMPTS` 重复调用,并在每次返回后让 LLM/规则判断是否完成、播报进度;未完成时不会跳到下一个 action。`verify-ip` 用于应用启动后的健康检查,失败时 workflow 会按 `VERIFY_INTERVAL_SEC` 重试,最多 `VERIFY_MAX_ATTEMPTS` 次;默认约每 10 秒一次、最多 12 次,仍未通过才暂停。逐 IP action 失败时也会暂停,修复外部环境后输入 `resume` 会从当前 action 重试;如果确实需要回滚,使用 `rollback [IP]` 显式执行。`llm test [文本]` 可测试当前 LLM client 是否可用。`--analyze-actions` 仅控制详细审核结果是否写入 `events`。执行中可按 `Ctrl+C` 中断chat 会保存当前 checkpoint 并把流程标记为 `user_interrupted``set KEY=VALUE``load params <路径>` 会把更新同步到当前运行 state、`config.txt` 和 checkpoint。`chat` 也支持 `--llm-base-url` / `--llm-api-key` / `--llm-model` / `--llm-action-analysis-prompt-file``--mcp-config``--analyze-actions` `chat` 默认把非内置命令交给当前 LLM 做普通对话,不会自动触发部署 workflow需要结构化分析部署需求时请显式使用 `analyze <需求>`,完整部署仍要求输入 `run`,并确认参数、目标 IP 范围和最终执行后才会执行 action。`log analyze <路径> [问题] [--tail N] [--max-bytes N]` 默认只读取日志尾部并脱敏后交给 LLM`action propose <需求>` 只让 LLM 解析单 action 计划,不执行;`action run <action> [ip=...] [KEY=VALUE...]``action run llm <需求>` 会展示 action、backend、ip、风险和参数用户输入 `yes` 后才会复用现有 ActionRouter 执行单 action。每个 workflow action 和单 action 完成后都会自动进入一次 LLM/规则审核,并播报审核开始/结束;审核输入只包含当前 action 的结构化结果和必要诊断日志,不会把完整运行态 `state_summary` 交给大模型,避免跨步骤状态干扰判断;只有 workflow 审核通过才会把 action 记为 completed如果审核建议停止或审核本身失败流程会暂停并输出建议等待用户决定是否 `resume` 重试当前 action。`create-download-task` 支持可选 `PARENT_VERSION_NUMBER`,非空时会在云下载接口中传入 `parentVersionNumber`,用于指定继承哪个版本的规则;默认空值不传,沿用正在使用的版本规则。`poll-download-progress``poll-upgrade-progress` 每次只查询一次进度workflow 会按 `POLL_INTERVAL_SEC``DOWNLOAD_POLL_MAX_ATTEMPTS``UPGRADE_POLL_MAX_ATTEMPTS` 重复调用,并在每次返回后让 LLM/规则判断是否完成、播报进度;未完成时不会跳到下一个 action。`verify-ip` 用于应用启动后的健康检查,失败时 workflow 会按 `VERIFY_INTERVAL_SEC` 重试,最多 `VERIFY_MAX_ATTEMPTS` 次;默认约每 10 秒一次、最多 12 次,仍未通过才暂停。逐 IP action 失败时也会暂停,修复外部环境后输入 `resume` 会从当前 action 重试;如果确实需要回滚,使用 `rollback [IP]` 显式执行。`llm test [文本]` 可测试当前 LLM client 是否可用。`--analyze-actions` 仅控制详细审核结果是否写入 `events`。执行中可按 `Ctrl+C` 中断chat 会保存当前 checkpoint 并把流程标记为 `user_interrupted``set KEY=VALUE``load params <路径>` 会把更新同步到当前运行 state、`config.txt` 和 checkpoint。`chat` 也支持 `--llm-base-url` / `--llm-api-key` / `--llm-model` / `--llm-action-analysis-prompt-file``--mcp-config``--analyze-actions`
云下载相关参数: 云下载相关参数:

View File

@ -78,6 +78,8 @@ cd pam-deploy-agent-linux-x86_64
- `--analyze-actions` 只控制是否把详细审核结果写入 `events` - `--analyze-actions` 只控制是否把详细审核结果写入 `events`
- action 失败或审核阻断后会保存 checkpoint 并暂停;修复外部环境后通过 `resume` 从当前 action 重试。 - action 失败或审核阻断后会保存 checkpoint 并暂停;修复外部环境后通过 `resume` 从当前 action 重试。
- 回滚不再属于主 workflow 自动分支;需要时使用 chat 内 `rollback [IP]` 或 CLI `rollback --checkpoint ...` 显式执行。 - 回滚不再属于主 workflow 自动分支;需要时使用 chat 内 `rollback [IP]` 或 CLI `rollback --checkpoint ...` 显式执行。
- chat 中非内置命令默认交给当前 LLM 普通对话,不会自动触发部署 workflow完整部署仍需 `analyze` / `run` 并人工确认。
- chat 支持 `ask <问题>``log analyze <路径>``action propose <需求>``action run ...`,可用于普通问答、日志尾部分析和确认后执行单 action。
- chat 支持执行中 `Ctrl+C` 中断后保存 checkpoint再通过 `resume` 重试当前 action。 - chat 支持执行中 `Ctrl+C` 中断后保存 checkpoint再通过 `resume` 重试当前 action。
- chat 支持 `set KEY=VALUE``load params <路径>` 热更新当前运行任务参数。 - chat 支持 `set KEY=VALUE``load params <路径>` 热更新当前运行任务参数。
- 进度查询和健康检查重试参数可通过 `POLL_INTERVAL_SEC``DOWNLOAD_POLL_MAX_ATTEMPTS``UPGRADE_POLL_MAX_ATTEMPTS``VERIFY_INTERVAL_SEC``VERIFY_MAX_ATTEMPTS` 配置。 - 进度查询和健康检查重试参数可通过 `POLL_INTERVAL_SEC``DOWNLOAD_POLL_MAX_ATTEMPTS``UPGRADE_POLL_MAX_ATTEMPTS``VERIFY_INTERVAL_SEC``VERIFY_MAX_ATTEMPTS` 配置。

View File

@ -72,6 +72,11 @@ PAM> run
PAM> status PAM> status
PAM> params PAM> params
PAM> events 5 PAM> events 5
PAM> ask 这个 agent 能做什么
PAM> log analyze logs/pam_deploy_agent.log 请帮我看最近异常 --tail 400
PAM> action propose 请单独执行 verify-ip 192.168.1.10
PAM> action run verify-ip ip=192.168.1.10
PAM> action run llm 请单独执行 get-online-ips
PAM> llm test PAM> llm test
PAM> llm action-analysis on PAM> llm action-analysis on
PAM> llm config action_analysis_prompt_file=prompts/action_review.txt PAM> llm config action_analysis_prompt_file=prompts/action_review.txt
@ -244,7 +249,9 @@ MCP token 获取方式与 HOME 一致,默认按 `client_credentials` POST 到
- 执行真实 action 前请确认配置文件中的 `HOME_BASE_URL``CLIENT_ID``CLIENT_SECRET``AIRPORT_CODE``APP_NAME``MODULE_NAME``VERSION_NUMBER``ZIP_FILE_PATH` - 执行真实 action 前请确认配置文件中的 `HOME_BASE_URL``CLIENT_ID``CLIENT_SECRET``AIRPORT_CODE``APP_NAME``MODULE_NAME``VERSION_NUMBER``ZIP_FILE_PATH`
- `PARENT_VERSION_NUMBER` 是云下载可选参数;非空时会传给 `download-cloud``parentVersionNumber`,空值不会发送。 - `PARENT_VERSION_NUMBER` 是云下载可选参数;非空时会传给 `download-cloud``parentVersionNumber`,空值不会发送。
- `chat` 中输入 `你好``hello` 这类问候不会触发 LLM/结构化分析;需要分析部署需求时请直接描述部署任务,或显式使用 `analyze <需求>` - `chat` 中非内置命令默认交给当前 LLM 做普通对话,不会自动触发部署 workflow需要分析部署需求时请显式使用 `analyze <需求>`,完整部署仍需 `run` 并逐步确认。
- `ask <问题>` 可显式普通对话;`log analyze <路径> [问题] [--tail N] [--max-bytes N]` 默认只读取日志尾部并脱敏后交给 LLM。
- `action propose <需求>` 只展示 LLM 解析出的单 action 计划;`action run <action> [ip=...] [KEY=VALUE...]``action run llm <需求>` 会在用户输入 `yes` 后才执行单 action。
- 每个 action 完成后都会自动执行一次 LLM/规则审核;`--analyze-actions``llm action-analysis on` 只控制是否把详细审核结果写入 `events` - 每个 action 完成后都会自动执行一次 LLM/规则审核;`--analyze-actions``llm action-analysis on` 只控制是否把详细审核结果写入 `events`
- action 审核输入不包含完整运行态 `state_summary`,只包含当前 action 的结构化结果和必要诊断日志。 - action 审核输入不包含完整运行态 `state_summary`,只包含当前 action 的结构化结果和必要诊断日志。
- `poll-download-progress``poll-upgrade-progress` 是单次进度查询 action未完成时不会进入下一个 action最大查询次数和间隔可通过 `config.txt` 或 chat `set` 热更新。 - `poll-download-progress``poll-upgrade-progress` 是单次进度查询 action未完成时不会进入下一个 action最大查询次数和间隔可通过 `config.txt` 或 chat `set` 热更新。

View File

@ -198,8 +198,8 @@ LLM 环境变量:
5. action 失败或审核阻断后会暂停;修复后用 resume 从当前 action 重试,需要回滚时用 rollback 显式执行。 5. action 失败或审核阻断后会暂停;修复后用 resume 从当前 action 重试,需要回滚时用 rollback 显式执行。
6. chat 会在执行前归一化并展示实际写入脚本配置的参数script_only / hybrid_node_mcp 会先检查 ZIP_FILE_PATH 是否存在。 6. chat 会在执行前归一化并展示实际写入脚本配置的参数script_only / hybrid_node_mcp 会先检查 ZIP_FILE_PATH 是否存在。
7. PARENT_VERSION_NUMBER 是云下载可选参数;空值不发送,非空时传给 parentVersionNumber。 7. PARENT_VERSION_NUMBER 是云下载可选参数;空值不发送,非空时传给 parentVersionNumber。
8. chat 执行过程中会播报每个 action 的开始、完成或失败;普通问候不会触发 LLM/结构化分析 8. chat 执行过程中会播报每个 action 的开始、完成或失败;非内置输入默认交给 LLM 普通对话,不会自动触发部署 workflow
9. chat 内可使用 params、events、rollback、list checkpoints、load checkpoint、load params、llm config、llm test、mcp config 等命令。 9. chat 内可使用 ask、log analyze、action propose、action run、params、events、rollback、list checkpoints、load checkpoint、load params、llm config、llm test、mcp config 等命令。
10. 日志默认写入 logs/pam_deploy_agent.log按天切分并默认保留 14 个历史日切文件;日志会脱敏 token、secret、api_key、Authorization 等字段。 10. 日志默认写入 logs/pam_deploy_agent.log按天切分并默认保留 14 个历史日切文件;日志会脱敏 token、secret、api_key、Authorization 等字段。
11. checkpoint 会保存完整运行参数,请放在受控目录。 11. checkpoint 会保存完整运行参数,请放在受控目录。
HELP_TEXT HELP_TEXT

View File

@ -16,7 +16,7 @@ from typing import Any, Callable
from .action_router import ActionRouter, build_action_backends from .action_router import ActionRouter, build_action_backends
from .checkpoint_store import save_checkpoint from .checkpoint_store import save_checkpoint
from .config_writer import write_config from .config_writer import write_config
from .constants import DEFAULT_PARAMS, GLOBAL_ACTION_SEQUENCE, IP_ACTION_SEQUENCE, REQUIRED_PARAMS from .constants import ALLOWED_ACTIONS, DEFAULT_PARAMS, GLOBAL_ACTION_SEQUENCE, IP_ACTION_SEQUENCE, NODE_ACTIONS, REQUIRED_PARAMS
from .fake_runner import FakeActionRunner from .fake_runner import FakeActionRunner
from .llm import LlmClient, RuleBasedLlmClient, validate_deploy_plan, validate_intent_result from .llm import LlmClient, RuleBasedLlmClient, validate_deploy_plan, validate_intent_result
from .logging_utils import configure_logging, json_for_log from .logging_utils import configure_logging, json_for_log
@ -34,6 +34,8 @@ REQUIRED_ACTION_VALUES = {
PROGRESS_ACTIONS = {"poll-download-progress", "poll-upgrade-progress"} PROGRESS_ACTIONS = {"poll-download-progress", "poll-upgrade-progress"}
VERIFY_ACTION = "verify-ip" VERIFY_ACTION = "verify-ip"
IP_REQUIRED_ACTIONS = set(IP_ACTION_SEQUENCE) | {"stop-ip", "rollback-ip"}
SINGLE_ACTION_KWARGS = {"hash_code", "node_url", "stop_first", "timeout_sec"}
class PamDeployAgent: class PamDeployAgent:
@ -513,6 +515,119 @@ class PamDeployAgent:
logger.info("IP 部署完成 run_id=%s ip=%s", state.run_id, ip) logger.info("IP 部署完成 run_id=%s ip=%s", state.run_id, ip)
return None return None
def run_single_action(
self,
state: AgentState,
action: str,
*,
ip: str = "",
kwargs: dict[str, Any] | None = None,
) -> ActionResult:
"""执行一次独立 action并复用路由、审核、事件和 checkpoint。"""
kwargs = dict(kwargs or {})
self._validate_single_action_context(state, action, ip=ip, kwargs=kwargs)
route_kwargs = {key: value for key, value in kwargs.items() if key in SINGLE_ACTION_KWARGS}
if action == "publish-version":
route_kwargs["hash_code"] = route_kwargs.get("hash_code") or state.hash_code
if route_kwargs.get("node_url"):
state.node_url = str(route_kwargs["node_url"])
backend = state.action_backends.get(action, "script")
logger.info(
"单 action 开始 run_id=%s action=%s backend=%s ip=%s kwargs=%s",
state.run_id,
action,
backend,
ip,
json_for_log(route_kwargs),
)
self._emit_progress({"type": "ACTION_START", "stage": action, "backend": backend, "ip": ip})
try:
result = self.router.run_action(state, action, ip=ip or None, **route_kwargs)
except Exception as exc:
logger.exception("单 action 调用异常 run_id=%s action=%s backend=%s ip=%s", state.run_id, action, backend, ip)
result = ActionResult(
action=action,
backend=backend,
ok=False,
error_summary=str(exc),
)
logger.info("单 action 返回 run_id=%s action=%s result=%s", state.run_id, action, _action_result_for_log(result))
analysis = self._append_action_analysis(state, action, result, ip=ip or None)
failed = (not result.ok) or self._business_failed(action, result.values)
if failed:
message = result.error_summary or result.values.get("MESSAGE", "action 执行失败")
fail_event = {
"type": "SINGLE_ACTION_FAIL",
"stage": action,
"backend": result.backend,
"ip": ip,
"message": message,
}
state.events.append(fail_event)
self._emit_progress({"type": "ACTION_FAIL", "stage": action, "backend": result.backend, "ip": ip, "message": message})
state.last_failed_step = action
state.paused = True
state.pause_reason = "single_action_failed"
state.review_context = self._review_context(action=action, analysis=analysis, result=result, ip=ip or None)
self._save_checkpoint(state)
return result
if analysis is not None and not analysis.should_continue:
message = analysis.suggested_action or analysis.possible_reason or "LLM 审核要求暂停"
state.events.append(
{
"type": "SINGLE_ACTION_BLOCKED",
"stage": action,
"backend": result.backend,
"ip": ip,
"message": message,
}
)
state.last_failed_step = action
state.paused = True
state.pause_reason = "single_action_review_blocked"
state.review_context = self._review_context(action=action, analysis=analysis, result=result, ip=ip or None)
self._save_checkpoint(state)
return result
self._apply_result(state, action, result.values)
if ip and ip in state.ip_states:
self._apply_ip_result(state.ip_states[ip], action, result.values)
state.last_success_step = action
if state.last_failed_step == action:
state.last_failed_step = ""
done_message = self._progress_message(action, result, ip=ip or None) if action in PROGRESS_ACTIONS else result.values.get("MESSAGE", "ok")
done_event = {
"type": "SINGLE_ACTION_DONE",
"stage": action,
"backend": result.backend,
"ip": ip,
"message": done_message,
}
state.events.append(done_event)
self._emit_progress({"type": "ACTION_DONE", "stage": action, "backend": result.backend, "ip": ip, "message": done_message})
self._save_checkpoint(state)
logger.info("单 action 完成 run_id=%s action=%s ip=%s", state.run_id, action, ip)
return result
def _validate_single_action_context(
self,
state: AgentState,
action: str,
*,
ip: str = "",
kwargs: dict[str, Any] | None = None,
) -> None:
"""校验单 action 是否具备必要上下文。"""
kwargs = kwargs or {}
if action not in ALLOWED_ACTIONS:
raise ValueError(f"不支持的 action: {action}")
if action in IP_REQUIRED_ACTIONS and not ip:
raise ValueError(f"{action} 需要提供 ip")
if action == "publish-version" and not (kwargs.get("hash_code") or state.hash_code):
raise ValueError("publish-version 缺少 HASH_CODE请先执行 upload-package 或显式提供 hash_code=...")
backend = state.action_backends.get(action, "script")
if backend == "mcp" and action != "get-online-ips" and not (kwargs.get("node_url") or state.node_url):
raise ValueError(f"{action} 使用 MCP 时需要 NODE_URL请先执行 get-node-url 或显式提供 node_url=...")
def run_ip_action(self, state: AgentState, ip: str, action: str) -> AgentState: def run_ip_action(self, state: AgentState, ip: str, action: str) -> AgentState:
"""执行一个单 IP action失败时暂停并保留该 action 供 resume 重试。""" """执行一个单 IP action失败时暂停并保留该 action 供 resume 重试。"""
ip_state = state.ip_states[ip] ip_state = state.ip_states[ip]

View File

@ -19,17 +19,28 @@ from .llm import build_llm_client
from .llm.rule_based import RuleBasedLlmClient from .llm.rule_based import RuleBasedLlmClient
from .logging_utils import configure_logging, json_for_log, redact_for_log from .logging_utils import configure_logging, json_for_log, redact_for_log
from .mcp_factory import build_mcp_runner_from_config from .mcp_factory import build_mcp_runner_from_config
from .models import AgentState, ExecutionStrategy from .models import ActionResult, AgentState, ExecutionStrategy, LlmSingleActionProposal
from .params_loader import load_params_file from .params_loader import load_params_file
from .constants import ALLOWED_ACTIONS, IP_ACTION_SEQUENCE
from .action_router import build_action_backends
InputFunc = Callable[[str], str] InputFunc = Callable[[str], str]
OutputFunc = Callable[[str], None] OutputFunc = Callable[[str], None]
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
DEFAULT_LOG_ANALYSIS_TAIL_LINES = 400
DEFAULT_LOG_ANALYSIS_MAX_BYTES = 64 * 1024
COMMAND_HELP = """可用命令: COMMAND_HELP = """可用命令:
help 显示帮助 help 显示帮助
preview 查看当前参数和执行策略 preview 查看当前参数和执行策略
ask <问题> 和当前 LLM 普通对话不触发部署执行
analyze <需求> 只做理解和计划不执行 analyze <需求> 只做理解和计划不执行
log analyze <路径> [问题] [--tail N] [--max-bytes N]
读取日志尾部并交给 LLM 分析
action propose <需求> LLM 解析单个 action 执行建议只展示不执行
action run <action> [ip=...] [KEY=VALUE...]
手工指定并确认后执行单个 action
action run llm <需求> LLM 解析并确认后执行单个 action
params 脱敏展示当前会话参数 params 脱敏展示当前会话参数
events [数量] 查看最近 action 事件默认 10 events [数量] 查看最近 action 事件默认 10
set KEY=VALUE 修改当前会话参数 set KEY=VALUE 修改当前会话参数
@ -48,7 +59,7 @@ COMMAND_HELP = """可用命令:
checkpoint 显示 checkpoint 路径 checkpoint 显示 checkpoint 路径
exit 退出 exit 退出
也可以直接输入自然语言需求Agent 会先分析并更新会话参数执行仍需输入 run 非内置命令会默认交给 LLM 普通对话完整部署仍需使用 analyze/run 并人工确认
执行中可按 Ctrl+C 中断保存 checkpoint 后再用 resume 继续 执行中可按 Ctrl+C 中断保存 checkpoint 后再用 resume 继续
""" """
@ -136,6 +147,9 @@ class InteractiveCliSession:
if normalized == "preview": if normalized == "preview":
self.output(self.agent.preview(self.params, self.strategy)) self.output(self.agent.preview(self.params, self.strategy))
return True return True
if normalized == "ask":
self._ask_llm(rest.strip())
return True
if normalized == "params": if normalized == "params":
self._show_params() self._show_params()
return True return True
@ -154,6 +168,12 @@ class InteractiveCliSession:
if normalized == "mcp": if normalized == "mcp":
self._configure_mcp(rest.strip()) self._configure_mcp(rest.strip())
return True return True
if normalized == "log":
self._handle_log_command(rest.strip())
return True
if normalized == "action":
self._handle_action_command(rest.strip())
return True
if normalized in ("run", "deploy", "execute"): if normalized in ("run", "deploy", "execute"):
self._run_deploy() self._run_deploy()
return True return True
@ -185,17 +205,7 @@ class InteractiveCliSession:
self._load_checkpoint(rest.strip()[len("checkpoint") :].strip()) self._load_checkpoint(rest.strip()[len("checkpoint") :].strip())
return True return True
if _is_small_talk(text): self._ask_llm(text)
logger.info("chat 输入识别为寒暄,跳过结构化分析")
self.output("你好。可以输入 help 查看命令,或直接描述部署需求;执行前仍需输入 run 并确认。")
return True
if not _looks_like_deploy_request(text):
logger.info("chat 输入未命中部署需求粗筛,跳过结构化分析")
self.output("我没有识别到明确的部署需求。可以输入 help 查看命令,或用 analyze <需求> 明确触发需求分析。")
return True
self.output("正在分析需求...")
self._analyze(text)
return True return True
def _analyze(self, text: str) -> None: def _analyze(self, text: str) -> None:
@ -274,13 +284,260 @@ class InteractiveCliSession:
events = self.state.events[-max(count, 1) :] events = self.state.events[-max(count, 1) :]
self.output(json.dumps(redact_mapping(events), ensure_ascii=False, indent=2, default=str)) self.output(json.dumps(redact_mapping(events), ensure_ascii=False, indent=2, default=str))
def _ask_llm(self, text: str) -> None:
"""把普通自然语言输入交给 LLM不触发部署 workflow。"""
if not text:
self.output("格式ask <问题>")
return
client_name = type(self.agent.llm_client).__name__
self.output(f"正在询问 LLM: {client_name}")
logger.info("chat 普通 LLM 对话开始 client=%s text=%s", client_name, redact_for_log(text, max_text_len=800))
try:
answer = self.agent.llm_client.chat(text, context=self._llm_chat_context())
except Exception as exc:
logger.exception("chat 普通 LLM 对话失败 client=%s", client_name)
self.output(f"LLM 对话失败: {exc}")
return
self.output(answer or "LLM 未返回内容。")
logger.info("chat 普通 LLM 对话完成 client=%s answer=%s", client_name, redact_for_log(answer, max_text_len=1200))
def _handle_log_command(self, text: str) -> None:
"""处理日志分析命令。"""
try:
parts = _split_command_args(text)
except ValueError as exc:
self.output(f"log 命令解析失败: {exc}")
return
if not parts or parts[0] != "analyze":
self.output("格式log analyze <路径> [问题] [--tail N] [--max-bytes N]")
return
try:
path, question, tail_lines, max_bytes = _parse_log_analyze_args(parts[1:])
except ValueError as exc:
self.output(f"log analyze 参数错误: {exc}")
return
if not path:
self.output("格式log analyze <路径> [问题] [--tail N] [--max-bytes N]")
return
try:
log_text = _read_log_tail(path, tail_lines=tail_lines, max_bytes=max_bytes)
except OSError as exc:
logger.exception("chat 日志读取失败 path=%s", path)
self.output(f"日志读取失败: {exc}")
return
if not log_text.strip():
self.output(f"日志文件没有可分析内容: {path}")
return
client_name = type(self.agent.llm_client).__name__
self.output(f"正在分析日志: {path}")
logger.info(
"chat 日志分析开始 client=%s path=%s tail=%s max_bytes=%s question=%s text_len=%s",
client_name,
path,
tail_lines,
max_bytes,
redact_for_log(question, max_text_len=300),
len(log_text),
)
try:
answer = self.agent.llm_client.analyze_log(log_text, question=question or None, source_path=str(path))
except Exception as exc:
logger.exception("chat 日志分析失败 client=%s path=%s", client_name, path)
self.output(f"日志分析失败: {exc}")
return
self.output(answer or "LLM 未返回日志分析结果。")
logger.info("chat 日志分析完成 path=%s answer=%s", path, redact_for_log(answer, max_text_len=1200))
def _handle_action_command(self, text: str) -> None:
"""处理单 action proposal 和执行命令。"""
try:
parts = _split_command_args(text)
except ValueError as exc:
self.output(f"action 命令解析失败: {exc}")
return
if not parts:
self.output("格式action propose <需求> | action run <action> [ip=...] [KEY=VALUE...] | action run llm <需求>")
return
command = parts[0]
if command == "propose":
request = text.partition("propose")[2].strip()
self._propose_action(request, execute=False)
return
if command == "run":
request = text.partition("run")[2].strip()
if not request:
self.output("格式action run <action> [ip=...] [KEY=VALUE...] | action run llm <需求>")
return
if request.lower().startswith("llm "):
proposal = self._propose_action(request[4:].strip(), execute=True)
if proposal and proposal.action:
self._run_single_action_with_confirmation(proposal)
return
try:
proposal = _parse_manual_action_run(request)
except ValueError as exc:
self.output(f"action run 参数错误: {exc}")
return
self._run_single_action_with_confirmation(proposal)
return
self.output("未知 action 命令。格式action propose <需求> | action run <action> ...")
def _propose_action(self, text: str, *, execute: bool) -> LlmSingleActionProposal | None:
"""调用 LLM 解析单 action 建议,并展示结果。"""
if not text:
self.output("请输入要解析的单 action 需求。")
return None
client_name = type(self.agent.llm_client).__name__
self.output(f"正在解析单 action: {client_name}")
logger.info("chat 单 action proposal 开始 execute=%s text=%s", execute, redact_for_log(text, max_text_len=800))
try:
proposal = self.agent.llm_client.propose_action(
text,
list(ALLOWED_ACTIONS),
self.params,
state_summary=self._state_summary_for_llm(),
)
except Exception as exc:
logger.exception("chat 单 action proposal 失败")
self.output(f"单 action 解析失败: {exc}")
return None
if not proposal.action:
self.output("LLM 未识别到明确 action。请写出具体 action 名,或使用 action run <action> 手工指定。")
logger.info("chat 单 action proposal 未识别 action proposal=%s", json_for_log(asdict(proposal)))
return proposal
if not execute:
self.output(_format_action_proposal(proposal, self._backend_for_action(proposal.action)))
logger.info("chat 单 action proposal 完成 execute=%s proposal=%s", execute, json_for_log(asdict(proposal)))
return proposal
def _run_single_action_with_confirmation(self, proposal: LlmSingleActionProposal) -> None:
"""展示单 action 计划,确认后执行。"""
if proposal.action not in ALLOWED_ACTIONS:
self.output(f"不支持的 action: {proposal.action}")
return
if not self._prepare_params_for_run():
return
problems = self._validate_single_action_prerequisites(proposal)
if problems:
self.output("单 action 执行前检查未通过:")
for problem in problems:
self.output(f"- {problem}")
return
self.output(_format_action_proposal(proposal, self._backend_for_action(proposal.action)))
if not self._ask_yes_no("确认执行此单 action 请输入 yes: "):
self.output("已取消单 action 执行。")
logger.info("chat 单 action 执行被用户取消 proposal=%s", json_for_log(asdict(proposal)))
return
if not self._ensure_state_for_single_action(proposal):
return
if self.state is None:
self.output("当前没有运行状态。")
return
try:
result = self.agent.run_single_action(
self.state,
proposal.action,
ip=proposal.ip,
kwargs=proposal.kwargs,
)
except Exception as exc:
logger.exception("chat 单 action 执行失败 proposal=%s", json_for_log(asdict(proposal)))
self.output(f"单 action 执行失败: {exc}")
self._print_pause_context()
return
self.output("单 action 执行完成" if result.ok and not self.state.paused else "单 action 执行已停止")
self.output(_format_action_result(result))
self._print_pause_context()
self.output(f"checkpoint: {self.state.checkpoint_path or self.checkpoint_path}")
def _ensure_state_for_single_action(self, proposal: LlmSingleActionProposal) -> bool:
"""确保单 action 有可用 state没有时用当前参数创建临时 state。"""
if self.state is None:
target_ips = list(self.target_ips)
if proposal.ip and proposal.ip not in target_ips:
target_ips.append(proposal.ip)
self.state = self.agent.create_state(
params=self.params,
execution_strategy=self.strategy,
checkpoint_path=self.checkpoint_path,
target_ips=target_ips,
)
self.graph_runtime = None
logger.info("chat 单 action 创建临时 state run_id=%s checkpoint=%s", self.state.run_id, self.state.checkpoint_path)
if proposal.ip and proposal.ip not in self.state.target_ips:
self.state.target_ips.append(proposal.ip)
if proposal.ip and proposal.ip not in self.state.ip_states:
self.state.ip_states[proposal.ip] = {
"status": "RUNNING",
"completed_steps": [],
"failed_stage": "",
"failure_reason": "",
"rollback_status": "ROLLBACK_NOT_RUN",
"rollback_stop_first": False,
"log_file": "",
}
return True
def _validate_single_action_prerequisites(self, proposal: LlmSingleActionProposal) -> list[str]:
"""在执行单 action 前做 chat 层友好检查。"""
problems: list[str] = []
action = proposal.action
backend = self._backend_for_action(action)
if action in (set(IP_ACTION_SEQUENCE) | {"stop-ip", "rollback-ip"}) and not proposal.ip:
problems.append(f"{action} 需要提供 ip例如action run {action} ip=192.168.1.10")
if backend == "mcp" and self.agent.mcp_runner is None:
problems.append("当前 action 会路由到 MCP但尚未配置 MCP runner。请启动时传 --mcp-config 或执行 mcp config <路径>。")
if backend == "script":
script_entry = self.agent.script_base_dir / "deploy.sh"
ps_entry = self.agent.script_base_dir / "deploy.ps1"
if not script_entry.exists() and not ps_entry.exists():
problems.append(f"脚本入口不存在: {script_entry}{ps_entry}")
if action == "upload-package" and self.strategy != "fake":
zip_path = str(self.params.get("ZIP_FILE_PATH", "")).strip()
if not _path_exists(zip_path):
problems.append(f"ZIP_FILE_PATH 不存在: {zip_path}")
return problems
def _backend_for_action(self, action: str) -> str:
"""读取当前 action 后端,优先使用已有 state。"""
if self.state is not None:
return self.state.action_backends.get(action, "script")
return build_action_backends(self.strategy).get(action, "script")
def _llm_chat_context(self) -> dict[str, Any]:
"""构造普通对话上下文,敏感参数先脱敏。"""
return {
"strategy": self.strategy,
"params": redact_mapping(self.params),
"target_ips": list(self.target_ips),
"checkpoint_path": self.checkpoint_path,
"state": self._state_summary_for_llm(),
}
def _state_summary_for_llm(self) -> dict[str, Any]:
"""构造给 LLM 的最小 state 摘要。"""
if self.state is None:
return {}
return {
"run_id": self.state.run_id,
"strategy": self.state.execution_strategy,
"paused": self.state.paused,
"pause_reason": self.state.pause_reason,
"pending_confirmation": self.state.pending_confirmation,
"last_success_step": self.state.last_success_step,
"last_failed_step": self.state.last_failed_step,
"hash_code_present": bool(self.state.hash_code),
"node_url_present": bool(self.state.node_url),
"target_ips": list(self.state.target_ips),
}
def _configure_llm(self, text: str) -> None: def _configure_llm(self, text: str) -> None:
"""热加载 LLM 配置,或开关 action 后诊断。""" """热加载 LLM 配置,或开关 action 后诊断。"""
if not text: if not text:
self.output("格式llm config base_url=... api_key=... model=... action_analysis_prompt_file=... | llm test [文本] | llm fallback | llm action-analysis on|off") self.output("格式llm config base_url=... api_key=... model=... action_analysis_prompt_file=... | llm test [文本] | llm fallback | llm action-analysis on|off")
return return
try: try:
parts = shlex.split(text) parts = _split_command_args(text)
except ValueError as exc: except ValueError as exc:
logger.exception("chat LLM 命令解析失败 text=%s", redact_for_log(text, max_text_len=500)) logger.exception("chat LLM 命令解析失败 text=%s", redact_for_log(text, max_text_len=500))
self.output(f"LLM 命令解析失败: {exc}") self.output(f"LLM 命令解析失败: {exc}")
@ -958,14 +1215,170 @@ def _parse_key_values(parts: list[str]) -> dict[str, str]:
continue continue
key, value = part.split("=", 1) key, value = part.split("=", 1)
if key: if key:
values[key] = value values[key] = _strip_outer_quotes(value)
return values return values
def _parse_log_analyze_args(parts: list[str]) -> tuple[Path, str, int, int]:
"""解析 `log analyze` 参数。"""
if not parts:
raise ValueError("缺少日志路径")
path = Path(parts[0]).expanduser()
tail_lines = DEFAULT_LOG_ANALYSIS_TAIL_LINES
max_bytes = DEFAULT_LOG_ANALYSIS_MAX_BYTES
question_parts: list[str] = []
index = 1
while index < len(parts):
part = parts[index]
if part == "--tail":
index += 1
if index >= len(parts):
raise ValueError("--tail 需要提供行数")
tail_lines = _positive_int(parts[index], "--tail")
elif part == "--max-bytes":
index += 1
if index >= len(parts):
raise ValueError("--max-bytes 需要提供字节数")
max_bytes = _positive_int(parts[index], "--max-bytes")
else:
question_parts.append(part)
index += 1
return path, " ".join(question_parts).strip(), tail_lines, max_bytes
def _read_log_tail(path: Path, *, tail_lines: int, max_bytes: int) -> str:
"""读取日志尾部文本,并先做脱敏和截断。"""
if not path.exists():
raise OSError(f"日志文件不存在: {path}")
if not path.is_file():
raise OSError(f"不是普通日志文件: {path}")
size = path.stat().st_size
with path.open("rb") as handle:
if size > max_bytes:
handle.seek(max(size - max_bytes, 0))
raw = handle.read(max_bytes)
marker = f"[只读取尾部 {max_bytes} 字节,文件总大小 {size} 字节]\n"
else:
raw = handle.read()
marker = ""
text = raw.decode("utf-8", errors="replace")
lines = text.splitlines()
if len(lines) > tail_lines:
lines = lines[-tail_lines:]
marker += f"[只保留尾部 {tail_lines} 行]\n"
return marker + str(redact_for_log("\n".join(lines), max_text_len=max_bytes))
def _parse_manual_action_run(text: str) -> LlmSingleActionProposal:
"""解析 `action run <action> [ip=...] [KEY=VALUE...]`。"""
parts = _split_command_args(text)
if not parts:
raise ValueError("缺少 action 名")
action = parts[0]
if action not in ALLOWED_ACTIONS:
raise ValueError(f"不支持的 action: {action}")
values = _parse_key_values(parts[1:])
ip = values.pop("ip", values.pop("IP", ""))
kwargs: dict[str, Any] = {}
for key, value in values.items():
normalized_key = _single_action_kwarg_name(key)
kwargs[normalized_key] = _single_action_value(normalized_key, value)
risk = "high" if action in ("publish-version", "create-download-task", "upgrade-ip", "start-ip", "stop-ip", "rollback-ip") else "medium"
return LlmSingleActionProposal(
action=action,
ip=ip,
kwargs=kwargs,
reason="用户手工指定单 action。",
risk_level=risk, # type: ignore[arg-type]
requires_confirmation=True,
)
def _single_action_kwarg_name(key: str) -> str:
"""把命令行参数名转换为 Agent 单 action kwargs。"""
aliases = {
"HASH_CODE": "hash_code",
"hashCode": "hash_code",
"hash-code": "hash_code",
"NODE_URL": "node_url",
"nodeUrl": "node_url",
"node-url": "node_url",
"stop-first": "stop_first",
"STOP_FIRST": "stop_first",
"timeout": "timeout_sec",
"TIMEOUT": "timeout_sec",
}
return aliases.get(key, key)
def _single_action_value(key: str, value: str) -> Any:
"""解析单 action kwargs 的基础类型。"""
if key == "stop_first":
return value.strip().lower() in ("1", "true", "yes", "y", "on")
if key == "timeout_sec":
return int(value)
return value
def _format_action_proposal(proposal: LlmSingleActionProposal, backend: str) -> str:
"""格式化单 action 执行建议。"""
safe_kwargs = redact_mapping(proposal.kwargs)
lines = [
"单 action 计划:",
f"- action: {proposal.action or '-'}",
f"- backend: {backend or '-'}",
f"- ip: {proposal.ip or '-'}",
f"- risk: {proposal.risk_level}",
f"- confirmation: {'required' if proposal.requires_confirmation else 'not-required'}",
]
if proposal.reason:
lines.append(f"- reason: {proposal.reason}")
if safe_kwargs:
lines.append("- kwargs: " + json.dumps(safe_kwargs, ensure_ascii=False, sort_keys=True))
return "\n".join(lines)
def _format_action_result(result: ActionResult) -> str:
"""格式化单 action 执行结果摘要。"""
payload = {
"action": result.action,
"backend": result.backend,
"ok": result.ok,
"exit_code": result.exit_code,
"tool_name": result.tool_name,
"values": result.values,
"error_summary": result.error_summary,
}
return json.dumps(redact_mapping(payload), ensure_ascii=False, indent=2, default=str)
def _positive_int(value: str, name: str) -> int:
"""解析正整数参数。"""
try:
number = int(value)
except ValueError as exc:
raise ValueError(f"{name} 必须是整数") from exc
if number <= 0:
raise ValueError(f"{name} 必须大于 0")
return number
def _split_command_args(text: str) -> list[str]:
"""按 chat 命令语义拆分参数,并保留 Windows 路径中的反斜杠。"""
return [_strip_outer_quotes(part) for part in shlex.split(text, posix=False)]
def _strip_outer_quotes(value: str) -> str:
"""去掉 shlex(posix=False) 保留下来的成对引号。"""
if len(value) >= 2 and value[0] == value[-1] and value[0] in ("'", '"'):
return value[1:-1]
return value
def _parse_rollback_args(text: str) -> tuple[str, bool | None, str]: def _parse_rollback_args(text: str) -> tuple[str, bool | None, str]:
"""解析 chat rollback 命令参数,返回 IP、停机覆盖值和备注。""" """解析 chat rollback 命令参数,返回 IP、停机覆盖值和备注。"""
try: try:
parts = shlex.split(text) parts = _split_command_args(text)
except ValueError as exc: except ValueError as exc:
raise ValueError(str(exc)) from exc raise ValueError(str(exc)) from exc
ip = "" ip = ""
@ -1002,56 +1415,6 @@ def _find_current_failed_ip(state: AgentState) -> str:
return "" return ""
def _is_small_talk(text: str) -> bool:
"""识别不应触发 LLM/结构化分析的简单寒暄。"""
normalized = text.strip().lower()
return normalized in {
"你好",
"您好",
"hello",
"hi",
"hey",
"在吗",
"谢谢",
"thanks",
"thank you",
}
def _looks_like_deploy_request(text: str) -> bool:
"""粗筛自然语言部署需求,避免任意闲聊都触发耗时分析。"""
lowered = text.lower()
deploy_keywords = (
"部署",
"发布",
"升级",
"回滚",
"预演",
"执行",
"pam",
"mcp",
"node",
"版本",
"机场",
"deploy",
"release",
"upgrade",
"rollback",
"preview",
)
param_markers = (
"HOME_BASE_URL",
"CLIENT_ID",
"AIRPORT_CODE",
"APP_NAME",
"MODULE_NAME",
"VERSION_NUMBER",
"PARENT_VERSION_NUMBER",
"ZIP_FILE_PATH",
)
return any(keyword in lowered for keyword in deploy_keywords) or any(marker in text for marker in param_markers)
def _path_exists(path: str) -> bool: def _path_exists(path: str) -> bool:
"""检查本地路径是否存在,兼容打包到 Linux 后的绝对路径。""" """检查本地路径是否存在,兼容打包到 Linux 后的绝对路径。"""
if not path: if not path:
@ -1073,7 +1436,12 @@ def _build_prompt_input(input_func: InputFunc) -> InputFunc:
commands = [ commands = [
"help", "help",
"preview", "preview",
"ask",
"analyze", "analyze",
"log analyze",
"action propose",
"action run",
"action run llm",
"params", "params",
"events", "events",
"set", "set",

View File

@ -11,6 +11,7 @@ from pam_deploy_graph.models import (
LlmDeployPlan, LlmDeployPlan,
LlmIntentResult, LlmIntentResult,
LlmParamResult, LlmParamResult,
LlmSingleActionProposal,
) )
@ -43,3 +44,21 @@ class LlmClient(Protocol):
) -> LlmActionAnalysis: ) -> LlmActionAnalysis:
"""分析 action 执行结果,并给出是否允许继续执行的建议。""" """分析 action 执行结果,并给出是否允许继续执行的建议。"""
... ...
def chat(self, text: str, context: dict[str, Any] | None = None) -> str:
"""进行普通自然语言对话,不触发部署 workflow。"""
...
def analyze_log(self, log_text: str, question: str | None = None, source_path: str = "") -> str:
"""分析日志文本并给出异常摘要、原因和建议。"""
...
def propose_action(
self,
text: str,
allowed_actions: list[str],
params: dict[str, Any],
state_summary: dict[str, Any] | None = None,
) -> LlmSingleActionProposal:
"""把自然语言解析为单次 action 调用建议。"""
...

View File

@ -23,10 +23,19 @@ from pam_deploy_graph.constants import (
SENSITIVE_KEYS, SENSITIVE_KEYS,
) )
from pam_deploy_graph.logging_utils import json_for_log, redact_for_log from pam_deploy_graph.logging_utils import json_for_log, redact_for_log
from pam_deploy_graph.models import ExecutionStrategy, LlmDeployPlan, LlmIntentResult, LlmParamResult from pam_deploy_graph.models import ExecutionStrategy, LlmDeployPlan, LlmIntentResult, LlmParamResult, LlmSingleActionProposal
from pam_deploy_graph.models import ActionResult, LlmActionAnalysis from pam_deploy_graph.models import ActionResult, LlmActionAnalysis
from .prompts import ACTION_ANALYSIS_PROMPT, INTENT_PROMPT, PARAM_PROMPT, PLAN_PROMPT, SYSTEM_PROMPT from .prompts import (
ACTION_ANALYSIS_PROMPT,
CHAT_PROMPT,
INTENT_PROMPT,
LOG_ANALYSIS_PROMPT,
PARAM_PROMPT,
PLAN_PROMPT,
SINGLE_ACTION_PROMPT,
SYSTEM_PROMPT,
)
JsonTransport = Callable[[str, dict[str, str], dict[str, Any], float], dict[str, Any]] JsonTransport = Callable[[str, dict[str, str], dict[str, Any], float], dict[str, Any]]
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -172,6 +181,59 @@ class OpenAICompatibleLlmClient:
notes=_string_list(payload.get("notes")), notes=_string_list(payload.get("notes")),
) )
def chat(self, text: str, context: dict[str, Any] | None = None) -> str:
"""调用 LLM 做普通对话,不要求 JSON 响应。"""
return self._complete_text(
"chat",
CHAT_PROMPT,
{
"user_text": text,
"context": _redact_sensitive(context or {}),
},
)
def analyze_log(self, log_text: str, question: str | None = None, source_path: str = "") -> str:
"""调用 LLM 分析日志尾部摘要。"""
return self._complete_text(
"analyze_log",
LOG_ANALYSIS_PROMPT,
{
"source_path": source_path,
"question": question or "请分析日志中的异常、可能原因和下一步建议。",
"log_tail": redact_for_log(log_text, max_text_len=64000),
},
)
def propose_action(
self,
text: str,
allowed_actions: list[str],
params: dict[str, Any],
state_summary: dict[str, Any] | None = None,
) -> LlmSingleActionProposal:
"""调用 LLM 把自然语言解析为单 action 调用建议。"""
payload = self._complete_json(
"propose_action",
SINGLE_ACTION_PROMPT,
{
"user_text": text,
"allowed_actions": allowed_actions,
"params": _redact_sensitive(params),
"state_summary": _redact_sensitive(state_summary or {}),
},
)
action = _string(payload, "action", "")
if action not in allowed_actions:
action = ""
return LlmSingleActionProposal(
action=action,
ip=_string(payload, "ip", ""),
kwargs=_dict(payload.get("kwargs")),
reason=_string(payload, "reason", ""),
risk_level=_risk_level(payload.get("risk_level")),
requires_confirmation=True,
)
def _complete_json(self, operation: str, instruction: str, input_payload: dict[str, Any]) -> dict[str, Any]: def _complete_json(self, operation: str, instruction: str, input_payload: dict[str, Any]) -> dict[str, Any]:
"""发送 chat/completions 请求,并解析 JSON 对象响应。""" """发送 chat/completions 请求,并解析 JSON 对象响应。"""
started_at = time.perf_counter() started_at = time.perf_counter()
@ -237,6 +299,53 @@ class OpenAICompatibleLlmClient:
) )
return parsed return parsed
def _complete_text(self, operation: str, instruction: str, input_payload: dict[str, Any]) -> str:
"""发送 chat/completions 请求,并返回普通文本响应。"""
started_at = time.perf_counter()
endpoint = _chat_completions_url(self.base_url)
request_payload = {
"model": self.model,
"temperature": self.temperature,
"messages": [
{"role": "system", "content": instruction},
{
"role": "user",
"content": "输入 JSON:\n" + json.dumps(input_payload, ensure_ascii=False, sort_keys=True),
},
],
}
headers = {"Content-Type": "application/json"}
if self.api_key:
headers["Authorization"] = f"Bearer {self.api_key}"
logger.info(
"LLM 文本请求开始 operation=%s endpoint=%s model=%s timeout=%s has_api_key=%s input=%s",
operation,
endpoint,
self.model,
self.timeout_sec,
bool(self.api_key),
json_for_log(input_payload, max_text_len=1600),
)
try:
response = self.transport(endpoint, headers, request_payload, self.timeout_sec)
content = str(_message_content(response))
except Exception:
logger.exception(
"LLM 文本请求失败 operation=%s endpoint=%s duration_ms=%s input=%s",
operation,
endpoint,
int((time.perf_counter() - started_at) * 1000),
json_for_log(input_payload, max_text_len=1600),
)
raise
logger.info(
"LLM 文本请求完成 operation=%s duration_ms=%s content=%s",
operation,
int((time.perf_counter() - started_at) * 1000),
redact_for_log(content, max_text_len=1600),
)
return content.strip()
def _default_transport( def _default_transport(
url: str, url: str,
@ -401,6 +510,14 @@ def _optional_bool(value: Any) -> bool | None:
return bool(value) return bool(value)
def _risk_level(value: Any) -> str:
"""解析单 action 风险等级,非法值降级为 medium。"""
text = str(value or "").strip().lower()
if text in ("low", "medium", "high"):
return text
return "medium"
def _dict(value: Any) -> dict[str, Any]: def _dict(value: Any) -> dict[str, Any]:
"""确保返回 dict非法值降级为空 dict。""" """确保返回 dict非法值降级为空 dict。"""
return value if isinstance(value, dict) else {} return value if isinstance(value, dict) else {}

View File

@ -100,3 +100,43 @@ ACTION_ANALYSIS_PROMPT = """分析一次 PAM action 执行结果。
- 脚本正常过程日志不会作为错误依据不能因为日志来自 stderr 就判定异常 - 脚本正常过程日志不会作为错误依据不能因为日志来自 stderr 就判定异常
- 不要输出密钥tokenAuthorization 或完整日志原文 - 不要输出密钥tokenAuthorization 或完整日志原文
""" """
CHAT_PROMPT = """你是 PAM 部署 Agent 的交互助手。
要求
- 可以回答普通问题解释当前 Agent 的命令和部署流程
- 不要自动触发部署回滚升级脚本执行或 MCP 调用
- 如果用户想执行完整部署提示使用 `analyze <需求>` 先分析确认后再输入 `run`
- 如果用户想单独执行 action提示使用 `action propose <需求>` `action run ...`执行前仍需要人工确认
- 不要输出密钥tokenAuthorizationCLIENT_SECRET api_key
"""
LOG_ANALYSIS_PROMPT = """分析 PAM Agent 或部署脚本日志。
要求
- 优先总结异常现象可能原因和建议下一步
- 不要输出密钥tokenAuthorizationCLIENT_SECRET api_key
- 输入通常是日志尾部摘要不代表完整文件
- 不要因为日志来自 stderr 就直接判定失败要结合 ERRORExceptionfail状态码和上下文判断
"""
SINGLE_ACTION_PROMPT = """把用户自然语言解析成一次 PAM action 调用建议。
输出 JSON schema
{
"action": "get-token",
"ip": "",
"kwargs": {},
"reason": "...",
"risk_level": "low|medium|high",
"requires_confirmation": true
}
要求
- `action` 必须来自输入的 allowed_actions不能识别明确 action 时返回空字符串
- 不要猜测危险 action不要自动规划多个 action
- IP action 必须尽量提取 `ip`
- `ip` 外的额外参数放入 `kwargs`
- 所有 action 都必须 `requires_confirmation=true`
- 不要输出密钥tokenAuthorizationCLIENT_SECRET api_key
"""

View File

@ -11,7 +11,7 @@ import re
from dataclasses import asdict from dataclasses import asdict
from typing import Any from typing import Any
from pam_deploy_graph.constants import GLOBAL_ACTION_SEQUENCE, REQUIRED_PARAMS from pam_deploy_graph.constants import GLOBAL_ACTION_SEQUENCE, REQUIRED_PARAMS, SENSITIVE_KEYS
from pam_deploy_graph.logging_utils import json_for_log, redact_for_log from pam_deploy_graph.logging_utils import json_for_log, redact_for_log
from pam_deploy_graph.models import ( from pam_deploy_graph.models import (
ActionResult, ActionResult,
@ -20,6 +20,7 @@ from pam_deploy_graph.models import (
LlmDeployPlan, LlmDeployPlan,
LlmIntentResult, LlmIntentResult,
LlmParamResult, LlmParamResult,
LlmSingleActionProposal,
) )
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -56,6 +57,79 @@ KEY_ALIASES = {
class RuleBasedLlmClient: class RuleBasedLlmClient:
"""基于规则的轻量 LLM client fallback。""" """基于规则的轻量 LLM client fallback。"""
def chat(self, text: str, context: dict[str, Any] | None = None) -> str:
"""规则 fallback 的普通对话说明。"""
logger.info("规则 LLM 普通对话 text=%s context=%s", redact_for_log(text, max_text_len=800), json_for_log(context or {}))
lowered = text.lower()
if any(word in lowered for word in ("help", "帮助", "怎么用", "命令")):
return (
"当前是本地规则 LLM fallback。可用 `analyze <需求>` 分析部署需求,`run` 执行完整 workflow"
"`action propose <需求>` 解析单个 action`action run ...` 确认后执行单个 action"
"`log analyze <路径>` 分析日志尾部。"
)
return (
"当前未配置真实 LLM已使用本地规则 fallback。普通闲聊只能给出有限说明"
"如需自然语言问答、日志深度分析或更准确的 action 解析,请配置真实 LLM。"
)
def analyze_log(self, log_text: str, question: str | None = None, source_path: str = "") -> str:
"""用本地规则分析日志尾部。"""
logger.info("规则 LLM 日志分析 source=%s question=%s text_len=%s", source_path, redact_for_log(question or "", max_text_len=300), len(log_text))
lines = log_text.splitlines()
problem_lines = [
line
for line in lines
if re.search(r"error|exception|fail|traceback|timeout|refused|denied|失败|异常|错误|超时", line, flags=re.IGNORECASE)
]
summary = [
f"日志来源: {source_path or '-'}",
f"已分析尾部 {len(lines)} 行。",
]
if question:
summary.append(f"关注问题: {question}")
if problem_lines:
summary.append(f"发现 {len(problem_lines)} 行疑似异常,最近几条:")
summary.extend(f"- {redact_for_log(line, max_text_len=240)}" for line in problem_lines[-5:])
summary.append("建议:优先检查以上异常附近的接口返回、网络连通性、认证信息和目标服务状态。")
else:
summary.append("未在日志尾部发现明显 ERROR/Exception/fail/timeout 关键字。")
summary.append("建议:如问题仍存在,请扩大 `--tail` 或提供更具体的问题描述。")
return "\n".join(summary)
def propose_action(
self,
text: str,
allowed_actions: list[str],
params: dict[str, Any],
state_summary: dict[str, Any] | None = None,
) -> LlmSingleActionProposal:
"""只在用户明确写出 action 名时生成单 action 建议。"""
logger.info(
"规则 LLM 单 action 解析开始 text=%s allowed=%s state=%s",
redact_for_log(text, max_text_len=800),
allowed_actions,
json_for_log(state_summary or {}),
)
action = ""
lowered = text.lower()
for candidate in allowed_actions:
if candidate.lower() in lowered:
action = candidate
break
ip_match = re.search(r"\b(?:\d{1,3}\.){3}\d{1,3}\b", text)
kwargs = _safe_action_kwargs(self._extract_key_values(text))
risk = "high" if action in ("publish-version", "create-download-task", "upgrade-ip", "start-ip", "stop-ip", "rollback-ip") else "medium"
proposal = LlmSingleActionProposal(
action=action,
ip=ip_match.group(0) if ip_match else "",
kwargs=kwargs,
reason="规则 fallback 仅在输入中出现明确 action 名时生成建议。" if action else "未识别到明确 action 名。",
risk_level=risk, # type: ignore[arg-type]
requires_confirmation=True,
)
logger.info("规则 LLM 单 action 解析完成 proposal=%s", json_for_log(asdict(proposal)))
return proposal
def understand_request(self, text: str) -> LlmIntentResult: def understand_request(self, text: str) -> LlmIntentResult:
"""用关键词规则识别用户意图和执行策略偏好。""" """用关键词规则识别用户意图和执行策略偏好。"""
logger.info("规则 LLM 意图识别开始 text=%s", redact_for_log(text, max_text_len=800)) logger.info("规则 LLM 意图识别开始 text=%s", redact_for_log(text, max_text_len=800))
@ -334,3 +408,8 @@ def _progress_note(values: dict[str, Any]) -> str:
def _lower_value(value: Any) -> str: def _lower_value(value: Any) -> str:
"""把字段值转成小写字符串。""" """把字段值转成小写字符串。"""
return str(value).strip().lower() if value is not None else "" return str(value).strip().lower() if value is not None else ""
def _safe_action_kwargs(values: dict[str, str]) -> dict[str, str]:
"""过滤单 action 额外参数,避免把敏感字段放入执行建议。"""
return {key: value for key, value in values.items() if key not in SENSITIVE_KEYS}

View File

@ -11,6 +11,7 @@ IntentName = Literal["deploy", "show_usage", "preview", "query_node_ips", "rollb
ModePreference = Literal["MCP", "API脚本", "未指定"] ModePreference = Literal["MCP", "API脚本", "未指定"]
StrategyPreference = Literal["hybrid_node_mcp", "script_only", "fake", "未指定"] StrategyPreference = Literal["hybrid_node_mcp", "script_only", "fake", "未指定"]
ActionAnalysisSeverity = Literal["info", "low", "medium", "high"] ActionAnalysisSeverity = Literal["info", "low", "medium", "high"]
ActionRiskLevel = Literal["low", "medium", "high"]
@dataclass(slots=True) @dataclass(slots=True)
@ -104,6 +105,18 @@ class LlmActionAnalysis:
notes: list[str] = field(default_factory=list) notes: list[str] = field(default_factory=list)
@dataclass(slots=True)
class LlmSingleActionProposal:
"""LLM 对单次 action 调用的结构化建议。"""
action: str
ip: str = ""
kwargs: dict[str, Any] = field(default_factory=dict)
reason: str = ""
risk_level: ActionRiskLevel = "medium"
requires_confirmation: bool = True
@dataclass(slots=True) @dataclass(slots=True)
class AgentState: class AgentState:
"""一次部署运行的完整状态,可序列化到 checkpoint。""" """一次部署运行的完整状态,可序列化到 checkpoint。"""

View File

@ -7,7 +7,7 @@ import pytest
from pam_deploy_graph.agent import PamDeployAgent from pam_deploy_graph.agent import PamDeployAgent
from pam_deploy_graph.fake_runner import FakeActionRunner from pam_deploy_graph.fake_runner import FakeActionRunner
from pam_deploy_graph.interactive import InteractiveCliSession, _build_prompt_input from pam_deploy_graph.interactive import InteractiveCliSession, _build_prompt_input
from pam_deploy_graph.models import LlmActionAnalysis, LlmIntentResult from pam_deploy_graph.models import LlmActionAnalysis, LlmIntentResult, LlmSingleActionProposal
PARAMS = { PARAMS = {
@ -41,6 +41,9 @@ class BlockingReviewLlmClient:
class FakeTestableLlmClient: class FakeTestableLlmClient:
def __init__(self) -> None: def __init__(self) -> None:
self.requests: list[str] = [] self.requests: list[str] = []
self.chat_requests: list[tuple[str, dict]] = []
self.log_requests: list[tuple[str, str | None, str]] = []
self.proposal_requests: list[str] = []
def understand_request(self, text: str) -> LlmIntentResult: def understand_request(self, text: str) -> LlmIntentResult:
self.requests.append(text) self.requests.append(text)
@ -61,6 +64,24 @@ class FakeTestableLlmClient:
def analyze_action_result(self, *, action, result): def analyze_action_result(self, *, action, result):
return LlmActionAnalysis(action=action) return LlmActionAnalysis(action=action)
def chat(self, text, context=None):
self.chat_requests.append((text, context or {}))
return f"chat answer: {text}"
def analyze_log(self, log_text, question=None, source_path=""):
self.log_requests.append((log_text, question, source_path))
return "log analysis answer"
def propose_action(self, text, allowed_actions, params, state_summary=None):
self.proposal_requests.append(text)
return LlmSingleActionProposal(
action="verify-ip" if "verify" in text else "get-online-ips",
ip="192.168.1.10" if "192.168.1.10" in text else "",
reason="test proposal",
risk_level="medium",
requires_confirmation=True,
)
class FlakyVerifyRunner(FakeActionRunner): class FlakyVerifyRunner(FakeActionRunner):
"""第一次 verify-ip 失败,后续恢复成功,用于覆盖断点重试。""" """第一次 verify-ip 失败,后续恢复成功,用于覆盖断点重试。"""
@ -177,9 +198,10 @@ def test_chat_run_prints_progress_poll_updates(tmp_path: Path):
assert "poll-download-progress" in session.state.completed_global_steps assert "poll-download-progress" in session.state.completed_global_steps
def test_chat_greeting_does_not_trigger_structured_analysis(tmp_path: Path): def test_chat_greeting_goes_to_llm_chat_without_structured_analysis(tmp_path: Path):
llm = FakeTestableLlmClient()
session = InteractiveCliSession( session = InteractiveCliSession(
agent=PamDeployAgent(), agent=PamDeployAgent(llm_client=llm),
params=PARAMS, params=PARAMS,
strategy="fake", strategy="fake",
checkpoint_path=str(tmp_path / "checkpoint.json"), checkpoint_path=str(tmp_path / "checkpoint.json"),
@ -188,10 +210,132 @@ def test_chat_greeting_does_not_trigger_structured_analysis(tmp_path: Path):
output = run_session(session, ["你好", "exit"]) output = run_session(session, ["你好", "exit"])
assert session.last_analysis is None assert session.last_analysis is None
assert any("可以输入 help 查看命令" in item for item in output) assert llm.chat_requests[0][0] == "你好"
assert any("正在询问 LLM: FakeTestableLlmClient" in item for item in output)
assert any("chat answer: 你好" in item for item in output)
assert not any("已生成结构化理解" in item for item in output) assert not any("已生成结构化理解" in item for item in output)
def test_chat_ask_command_uses_llm_chat(tmp_path: Path):
llm = FakeTestableLlmClient()
session = InteractiveCliSession(
agent=PamDeployAgent(llm_client=llm),
params=PARAMS,
strategy="fake",
checkpoint_path=str(tmp_path / "checkpoint.json"),
)
output = run_session(session, ["ask 这个 agent 能做什么", "exit"])
assert llm.chat_requests[0][0] == "这个 agent 能做什么"
assert llm.chat_requests[0][1]["params"]["CLIENT_SECRET"] == "***"
assert any("chat answer: 这个 agent 能做什么" in item for item in output)
def test_chat_log_analyze_reads_tail_and_redacts(tmp_path: Path):
llm = FakeTestableLlmClient()
log_path = tmp_path / "agent.log"
log_path.write_text(
"\n".join(
[
"line 1 CLIENT_SECRET=real-secret",
"line 2 ok",
"line 3 ERROR failed",
]
),
encoding="utf-8",
)
session = InteractiveCliSession(
agent=PamDeployAgent(llm_client=llm),
params=PARAMS,
strategy="fake",
checkpoint_path=str(tmp_path / "checkpoint.json"),
)
output = run_session(session, [f"log analyze {log_path} 请找异常 --tail 2", "exit"])
log_text, question, source_path = llm.log_requests[0]
assert "line 1" not in log_text
assert "real-secret" not in log_text
assert "line 3 ERROR failed" in log_text
assert question == "请找异常"
assert source_path == str(log_path)
assert any("log analysis answer" in item for item in output)
def test_chat_action_propose_only_shows_plan_without_execution(tmp_path: Path):
llm = FakeTestableLlmClient()
fake = FakeActionRunner()
session = InteractiveCliSession(
agent=PamDeployAgent(fake_runner=fake, llm_client=llm),
params=PARAMS,
strategy="fake",
checkpoint_path=str(tmp_path / "checkpoint.json"),
)
output = run_session(session, ["action propose 请 verify-ip 192.168.1.10", "exit"])
assert llm.proposal_requests == ["请 verify-ip 192.168.1.10"]
assert fake.calls == []
assert any("单 action 计划" in item for item in output)
assert any("- action: verify-ip" in item for item in output)
def test_chat_action_run_llm_requires_confirmation_before_execution(tmp_path: Path):
llm = FakeTestableLlmClient()
fake = FakeActionRunner()
session = InteractiveCliSession(
agent=PamDeployAgent(fake_runner=fake, llm_client=llm),
params=PARAMS,
strategy="fake",
checkpoint_path=str(tmp_path / "checkpoint.json"),
)
output = run_session(session, ["action run llm 请 verify-ip 192.168.1.10", "no", "exit"])
assert fake.calls == []
assert any("已取消单 action 执行" in item for item in output)
output = run_session(session, ["action run llm 请 verify-ip 192.168.1.10", "yes", "exit"])
assert ("verify-ip", {"ip": "192.168.1.10"}) in fake.calls
assert session.state is not None
assert any(event["type"] == "SINGLE_ACTION_DONE" for event in session.state.events)
assert any("单 action 执行完成" in item for item in output)
def test_chat_action_run_missing_ip_is_friendly(tmp_path: Path):
fake = FakeActionRunner()
session = InteractiveCliSession(
agent=PamDeployAgent(fake_runner=fake),
params=PARAMS,
strategy="fake",
checkpoint_path=str(tmp_path / "checkpoint.json"),
)
output = run_session(session, ["action run verify-ip", "exit"])
assert fake.calls == []
assert any("需要提供 ip" in item for item in output)
def test_chat_action_run_manual_executes_fake_action(tmp_path: Path):
fake = FakeActionRunner()
session = InteractiveCliSession(
agent=PamDeployAgent(fake_runner=fake),
params=PARAMS,
strategy="fake",
checkpoint_path=str(tmp_path / "checkpoint.json"),
)
output = run_session(session, ["action run get-online-ips", "yes", "exit"])
assert ("get-online-ips", {"ip": None}) in fake.calls
assert session.state is not None
assert session.state.online_ips == ["192.168.1.10", "192.168.1.11"]
assert any("单 action 执行完成" in item for item in output)
def test_chat_preflight_blocks_missing_zip_path_before_confirm(tmp_path: Path): def test_chat_preflight_blocks_missing_zip_path_before_confirm(tmp_path: Path):
missing_package = tmp_path / "missing.zip" missing_package = tmp_path / "missing.zip"
session = InteractiveCliSession( session = InteractiveCliSession(

View File

@ -287,6 +287,105 @@ def test_openai_compatible_client_omits_success_script_logs_from_action_review()
assert "[FLOW][START]" not in json.dumps(input_payload, ensure_ascii=False) assert "[FLOW][START]" not in json.dumps(input_payload, ensure_ascii=False)
def test_openai_compatible_client_supports_plain_chat():
calls = []
def transport(url, headers, payload, timeout_sec):
calls.append(payload)
return {"choices": [{"message": {"content": "普通回答"}}]}
client = OpenAICompatibleLlmClient(
base_url="https://llm.example/v1",
api_key="secret-key",
model="model-a",
transport=transport,
)
answer = client.chat("你好", context={"CLIENT_SECRET": "real-secret"})
serialized_prompt = str(calls[0])
assert answer == "普通回答"
assert "response_format" not in calls[0]
assert "real-secret" not in serialized_prompt
assert "不要自动触发部署" in calls[0]["messages"][0]["content"]
def test_openai_compatible_client_analyzes_log_with_redaction():
calls = []
def transport(url, headers, payload, timeout_sec):
calls.append(payload)
return {"choices": [{"message": {"content": "日志分析"}}]}
client = OpenAICompatibleLlmClient(
base_url="https://llm.example/v1",
api_key="secret-key",
model="model-a",
transport=transport,
)
answer = client.analyze_log("ERROR CLIENT_SECRET=real-secret", question="为什么失败", source_path="agent.log")
input_payload = _llm_input_payload(calls[0])
assert answer == "日志分析"
assert input_payload["source_path"] == "agent.log"
assert input_payload["question"] == "为什么失败"
assert "real-secret" not in json.dumps(input_payload, ensure_ascii=False)
assert "不要因为日志来自 stderr" in calls[0]["messages"][0]["content"]
def test_openai_compatible_client_proposes_single_action():
calls = []
def transport(url, headers, payload, timeout_sec):
calls.append(payload)
return {
"choices": [
{
"message": {
"content": (
'{"action":"verify-ip","ip":"192.168.1.10","kwargs":{"timeout_sec":10},'
'"reason":"用户要求健康检查","risk_level":"low","requires_confirmation":false}'
)
}
}
]
}
client = OpenAICompatibleLlmClient(
base_url="https://llm.example/v1",
api_key="secret-key",
model="model-a",
transport=transport,
)
proposal = client.propose_action(
"检查 192.168.1.10",
["verify-ip", "get-online-ips"],
{"CLIENT_SECRET": "real-secret"},
state_summary={"node_url_present": True},
)
input_payload = _llm_input_payload(calls[0])
assert proposal.action == "verify-ip"
assert proposal.ip == "192.168.1.10"
assert proposal.kwargs == {"timeout_sec": 10}
assert proposal.risk_level == "low"
assert proposal.requires_confirmation is True
assert "real-secret" not in json.dumps(input_payload, ensure_ascii=False)
def test_rule_based_client_proposes_only_explicit_action():
client = RuleBasedLlmClient()
proposal = client.propose_action("请 verify-ip 192.168.1.10", ["verify-ip"], {}, {})
unknown = client.propose_action("帮我检查一下", ["verify-ip"], {}, {})
assert proposal.action == "verify-ip"
assert proposal.ip == "192.168.1.10"
assert unknown.action == ""
def _llm_input_payload(request_payload): def _llm_input_payload(request_payload):
content = request_payload["messages"][1]["content"] content = request_payload["messages"][1]["content"]
_, _, raw_json = content.partition("输入 JSON:\n") _, _, raw_json = content.partition("输入 JSON:\n")