diff --git a/README.md b/README.md
index a7ac9f6..f63fe1c 100644
--- a/README.md
+++ b/README.md
@@ -88,6 +88,7 @@ packaging/
- chat 会播报 action 审核开始、审核完成和审核失败,避免黑盒执行。
- chat 支持执行中按 `Ctrl+C` 中断,保存 checkpoint 后再 `resume`。
- chat 支持普通 LLM 对话、日志尾部分析和单 action 执行:`ask <问题>`、`log analyze <路径>`、`action propose <需求>`、`action run ...`。
+- chat 普通对话会优先使用 OpenAI-compatible streaming 输出;如果服务端不支持流式,会自动退回普通请求。`...` 思考内容会被过滤,不展示也不写入运行日志。
- chat 支持 `set KEY=VALUE` 和 `load params <路径>` 热更新当前运行参数,并同步回写运行中的 `config.txt` 与 checkpoint。
- 支持通过 `--llm-action-analysis-prompt-file`、`PAM_LLM_ACTION_ANALYSIS_PROMPT_FILE` 或 chat 内 `llm config action_analysis_prompt_file=...` 自定义 action 审核提示词。
- 增加统一运行日志,默认写入 `logs/pam_deploy_agent.log`,覆盖 CLI/chat、LLM 调用、action 路由、脚本/MCP 调用、LangGraph、checkpoint 等关键流程,并按天切分、默认保留 14 个历史日切文件。
@@ -135,7 +136,7 @@ python -m pam_deploy_graph.cli analyze \
仓库内已提供 [prompts/action_review.txt](/e:/AIcoding/agent_deply/prompts/action_review.txt) 作为“当前默认 action 审核提示词”的落地副本,后续自定义时可以先复制它再改,便于和内置默认行为对照。
-真实 LLM 调用位置在 `pam_deploy_graph/llm/openai_compatible.py`,提示词在 `pam_deploy_graph/llm/prompts.py`。发送给 LLM 的 `base_params` 会脱敏,`CLIENT_SECRET` 不会进入 prompt;本地生成计划后仍会执行 guardrails 校验。
+真实 LLM 调用位置在 `pam_deploy_graph/llm/openai_compatible.py`,提示词在 `pam_deploy_graph/llm/prompts.py`。发送给 LLM 的 `base_params` 会脱敏,`CLIENT_SECRET` 不会进入 prompt;本地生成计划后仍会执行 guardrails 校验。chat 普通对话优先使用 `/chat/completions` streaming;服务端不支持时会自动退回非流式请求。普通对话和日志分析会过滤 `...`、未闭合 `` 及内部思考内容。
chat 内可以用当前 client 做一次轻量测试,确认真实 LLM 或规则 fallback 是否正常加载:
@@ -306,7 +307,7 @@ PAM> resume
PAM> exit
```
-`chat` 默认把非内置命令交给当前 LLM 做普通对话,不会自动触发部署 workflow;需要结构化分析部署需求时请显式使用 `analyze <需求>`,完整部署仍要求输入 `run`,并确认参数、目标 IP 范围和最终执行后才会执行 action。`log analyze <路径> [问题] [--tail N] [--max-bytes N]` 默认只读取日志尾部并脱敏后交给 LLM;`action propose <需求>` 只让 LLM 解析单 action 计划,不执行;`action run [ip=...] [KEY=VALUE...]` 或 `action run llm <需求>` 会展示 action、backend、ip、风险和参数,用户输入 `yes` 后才会复用现有 ActionRouter 执行单 action。每个 workflow action 和单 action 完成后都会自动进入一次 LLM/规则审核,并播报审核开始/结束;审核输入只包含当前 action 的结构化结果和必要诊断日志,不会把完整运行态 `state_summary` 交给大模型,避免跨步骤状态干扰判断;只有 workflow 审核通过才会把 action 记为 completed;如果审核建议停止或审核本身失败,流程会暂停并输出建议,等待用户决定是否 `resume` 重试当前 action。`create-download-task` 支持可选 `PARENT_VERSION_NUMBER`,非空时会在云下载接口中传入 `parentVersionNumber`,用于指定继承哪个版本的规则;默认空值不传,沿用正在使用的版本规则。`poll-download-progress` 和 `poll-upgrade-progress` 每次只查询一次进度,workflow 会按 `POLL_INTERVAL_SEC`、`DOWNLOAD_POLL_MAX_ATTEMPTS`、`UPGRADE_POLL_MAX_ATTEMPTS` 重复调用,并在每次返回后让 LLM/规则判断是否完成、播报进度;未完成时不会跳到下一个 action。`verify-ip` 用于应用启动后的健康检查,失败时 workflow 会按 `VERIFY_INTERVAL_SEC` 重试,最多 `VERIFY_MAX_ATTEMPTS` 次;默认约每 10 秒一次、最多 12 次,仍未通过才暂停。逐 IP action 失败时也会暂停,修复外部环境后输入 `resume` 会从当前 action 重试;如果确实需要回滚,使用 `rollback [IP]` 显式执行。`llm test [文本]` 可测试当前 LLM client 是否可用。`--analyze-actions` 仅控制详细审核结果是否写入 `events`。执行中可按 `Ctrl+C` 中断,chat 会保存当前 checkpoint 并把流程标记为 `user_interrupted`。`set KEY=VALUE` 和 `load params <路径>` 会把更新同步到当前运行 state、`config.txt` 和 checkpoint。`chat` 也支持 `--llm-base-url` / `--llm-api-key` / `--llm-model` / `--llm-action-analysis-prompt-file`、`--mcp-config` 和 `--analyze-actions`。
+`chat` 默认把非内置命令交给当前 LLM 做普通对话,不会自动触发部署 workflow;普通对话优先流式展示,`...` 思考内容会被过滤。需要结构化分析部署需求时请显式使用 `analyze <需求>`,完整部署仍要求输入 `run`,并确认参数、目标 IP 范围和最终执行后才会执行 action。`log analyze <路径> [问题] [--tail N] [--max-bytes N]` 默认只读取日志尾部并脱敏后交给 LLM;`action propose <需求>` 只让 LLM 解析单 action 计划,不执行;`action run [ip=...] [KEY=VALUE...]` 或 `action run llm <需求>` 会展示 action、backend、ip、风险和参数,用户输入 `yes` 后才会复用现有 ActionRouter 执行单 action。每个 workflow action 和单 action 完成后都会自动进入一次 LLM/规则审核,并播报审核开始/结束;审核输入只包含当前 action 的结构化结果和必要诊断日志,不会把完整运行态 `state_summary` 交给大模型,避免跨步骤状态干扰判断;只有 workflow 审核通过才会把 action 记为 completed;如果审核建议停止或审核本身失败,流程会暂停并输出建议,等待用户决定是否 `resume` 重试当前 action。`create-download-task` 支持可选 `PARENT_VERSION_NUMBER`,非空时会在云下载接口中传入 `parentVersionNumber`,用于指定继承哪个版本的规则;默认空值不传,沿用正在使用的版本规则。`poll-download-progress` 和 `poll-upgrade-progress` 每次只查询一次进度,workflow 会按 `POLL_INTERVAL_SEC`、`DOWNLOAD_POLL_MAX_ATTEMPTS`、`UPGRADE_POLL_MAX_ATTEMPTS` 重复调用,并在每次返回后让 LLM/规则判断是否完成、播报进度;未完成时不会跳到下一个 action。`verify-ip` 用于应用启动后的健康检查,失败时 workflow 会按 `VERIFY_INTERVAL_SEC` 重试,最多 `VERIFY_MAX_ATTEMPTS` 次;默认约每 10 秒一次、最多 12 次,仍未通过才暂停。逐 IP action 失败时也会暂停,修复外部环境后输入 `resume` 会从当前 action 重试;如果确实需要回滚,使用 `rollback [IP]` 显式执行。`llm test [文本]` 可测试当前 LLM client 是否可用。`--analyze-actions` 仅控制详细审核结果是否写入 `events`。执行中可按 `Ctrl+C` 中断,chat 会保存当前 checkpoint 并把流程标记为 `user_interrupted`。`set KEY=VALUE` 和 `load params <路径>` 会把更新同步到当前运行 state、`config.txt` 和 checkpoint。`chat` 也支持 `--llm-base-url` / `--llm-api-key` / `--llm-model` / `--llm-action-analysis-prompt-file`、`--mcp-config` 和 `--analyze-actions`。
云下载相关参数:
@@ -322,7 +323,7 @@ PAM> exit
## 日志
-Agent 默认写入运行日志到 `logs/pam_deploy_agent.log`。日志覆盖 CLI/chat 输入、LLM 请求和响应摘要、action 路由、脚本/MCP 调用、LangGraph 节点、checkpoint 保存、暂停/续跑等关键流程。日志会在本地时间每日 0 点后首次写入时自动切分,历史文件形如 `pam_deploy_agent.log.YYYY-MM-DD`,默认保留 14 个历史日切文件。日志会递归脱敏 `CLIENT_SECRET`、`MCP_CLIENT_SECRET`、token、Authorization、api_key、password 等字段,并截断长文本。
+Agent 默认写入运行日志到 `logs/pam_deploy_agent.log`。日志覆盖 CLI/chat 输入、LLM 请求和响应摘要、action 路由、脚本/MCP 调用、LangGraph 节点、checkpoint 保存、暂停/续跑等关键流程。日志会在本地时间每日 0 点后首次写入时自动切分,历史文件形如 `pam_deploy_agent.log.YYYY-MM-DD`,默认保留 14 个历史日切文件。日志会递归脱敏 `CLIENT_SECRET`、`MCP_CLIENT_SECRET`、token、Authorization、api_key、password 等字段,并截断长文本;chat 普通对话和日志分析的 `` 内容会先过滤,不记录原始思考过程。
可通过环境变量调整日志位置、级别和保留策略:
diff --git a/packaging/README_linux_package.md b/packaging/README_linux_package.md
index d860d84..04fe240 100644
--- a/packaging/README_linux_package.md
+++ b/packaging/README_linux_package.md
@@ -78,15 +78,15 @@ cd pam-deploy-agent-linux-x86_64
- `--analyze-actions` 只控制是否把详细审核结果写入 `events`。
- action 失败或审核阻断后会保存 checkpoint 并暂停;修复外部环境后通过 `resume` 从当前 action 重试。
- 回滚不再属于主 workflow 自动分支;需要时使用 chat 内 `rollback [IP]` 或 CLI `rollback --checkpoint ...` 显式执行。
-- chat 中非内置命令默认交给当前 LLM 普通对话,不会自动触发部署 workflow;完整部署仍需 `analyze` / `run` 并人工确认。
-- chat 支持 `ask <问题>`、`log analyze <路径>`、`action propose <需求>`、`action run ...`,可用于普通问答、日志尾部分析和确认后执行单 action。
+- chat 中非内置命令默认交给当前 LLM 普通对话,不会自动触发部署 workflow;普通对话优先流式展示,`...` 思考内容会被过滤;完整部署仍需 `analyze` / `run` 并人工确认。
+- chat 支持 `ask <问题>`、`log analyze <路径>`、`action propose <需求>`、`action run ...`,可用于普通问答、日志尾部分析和确认后执行单 action;日志分析输出同样会过滤 `` 内容。
- chat 支持执行中 `Ctrl+C` 中断后保存 checkpoint,再通过 `resume` 重试当前 action。
- chat 支持 `set KEY=VALUE` 和 `load params <路径>` 热更新当前运行任务参数。
- 进度查询和健康检查重试参数可通过 `POLL_INTERVAL_SEC`、`DOWNLOAD_POLL_MAX_ATTEMPTS`、`UPGRADE_POLL_MAX_ATTEMPTS`、`VERIFY_INTERVAL_SEC`、`VERIFY_MAX_ATTEMPTS` 配置。
- 支持通过 `--llm-action-analysis-prompt-file` 或 chat 内 `llm config action_analysis_prompt_file=...` 自定义 action 审核提示词。
- chat 支持 `llm test [文本]` 测试当前 LLM client 是否正常加载。
- 默认运行日志写入 `logs/pam_deploy_agent.log`,按天切分并默认保留 14 个历史日切文件,可通过 `PAM_AGENT_LOG_FILE`、`PAM_AGENT_LOG_LEVEL` 和 `PAM_AGENT_LOG_RETENTION_DAYS` 调整。
-- 日志会脱敏 token、secret、api_key、Authorization 等字段;checkpoint 仍保存完整运行参数,请放在受控目录。
+- 日志会脱敏 token、secret、api_key、Authorization 等字段;chat 普通对话和日志分析不会记录原始 `` 内容;checkpoint 仍保存完整运行参数,请放在受控目录。
## 包大小评估
diff --git a/packaging/README_packaged_agent.md b/packaging/README_packaged_agent.md
index 7df546b..8430140 100644
--- a/packaging/README_packaged_agent.md
+++ b/packaging/README_packaged_agent.md
@@ -249,8 +249,8 @@ MCP token 获取方式与 HOME 一致,默认按 `client_credentials` POST 到
- 执行真实 action 前请确认配置文件中的 `HOME_BASE_URL`、`CLIENT_ID`、`CLIENT_SECRET`、`AIRPORT_CODE`、`APP_NAME`、`MODULE_NAME`、`VERSION_NUMBER`、`ZIP_FILE_PATH`。
- `PARENT_VERSION_NUMBER` 是云下载可选参数;非空时会传给 `download-cloud` 的 `parentVersionNumber`,空值不会发送。
-- `chat` 中非内置命令默认交给当前 LLM 做普通对话,不会自动触发部署 workflow;需要分析部署需求时请显式使用 `analyze <需求>`,完整部署仍需 `run` 并逐步确认。
-- `ask <问题>` 可显式普通对话;`log analyze <路径> [问题] [--tail N] [--max-bytes N]` 默认只读取日志尾部并脱敏后交给 LLM。
+- `chat` 中非内置命令默认交给当前 LLM 做普通对话,不会自动触发部署 workflow;普通对话优先流式展示,`...` 思考内容会被过滤;需要分析部署需求时请显式使用 `analyze <需求>`,完整部署仍需 `run` 并逐步确认。
+- `ask <问题>` 可显式普通对话;`log analyze <路径> [问题] [--tail N] [--max-bytes N]` 默认只读取日志尾部并脱敏后交给 LLM,日志分析输出同样会过滤 `` 内容。
- `action propose <需求>` 只展示 LLM 解析出的单 action 计划;`action run [ip=...] [KEY=VALUE...]` 和 `action run llm <需求>` 会在用户输入 `yes` 后才执行单 action。
- 每个 action 完成后都会自动执行一次 LLM/规则审核;`--analyze-actions` 和 `llm action-analysis on` 只控制是否把详细审核结果写入 `events`。
- action 审核输入不包含完整运行态 `state_summary`,只包含当前 action 的结构化结果和必要诊断日志。
diff --git a/packaging/build_linux_self_contained.sh b/packaging/build_linux_self_contained.sh
index 0c44dab..df93f9b 100644
--- a/packaging/build_linux_self_contained.sh
+++ b/packaging/build_linux_self_contained.sh
@@ -199,9 +199,10 @@ LLM 环境变量:
6. chat 会在执行前归一化并展示实际写入脚本配置的参数;script_only / hybrid_node_mcp 会先检查 ZIP_FILE_PATH 是否存在。
7. PARENT_VERSION_NUMBER 是云下载可选参数;空值不发送,非空时传给 parentVersionNumber。
8. chat 执行过程中会播报每个 action 的开始、完成或失败;非内置输入默认交给 LLM 普通对话,不会自动触发部署 workflow。
- 9. chat 内可使用 ask、log analyze、action propose、action run、params、events、rollback、list checkpoints、load checkpoint、load params、llm config、llm test、mcp config 等命令。
- 10. 日志默认写入 logs/pam_deploy_agent.log,按天切分并默认保留 14 个历史日切文件;日志会脱敏 token、secret、api_key、Authorization 等字段。
- 11. checkpoint 会保存完整运行参数,请放在受控目录。
+ 9. chat 普通对话优先流式展示;模型返回的 ... 思考内容会被过滤,不展示也不写入日志。
+ 10. chat 内可使用 ask、log analyze、action propose、action run、params、events、rollback、list checkpoints、load checkpoint、load params、llm config、llm test、mcp config 等命令。
+ 11. 日志默认写入 logs/pam_deploy_agent.log,按天切分并默认保留 14 个历史日切文件;日志会脱敏 token、secret、api_key、Authorization 等字段。
+ 12. checkpoint 会保存完整运行参数,请放在受控目录。
HELP_TEXT
}
diff --git a/pam_deploy_graph/interactive.py b/pam_deploy_graph/interactive.py
index 28cde0c..8ba23c4 100644
--- a/pam_deploy_graph/interactive.py
+++ b/pam_deploy_graph/interactive.py
@@ -17,6 +17,7 @@ from .checkpoint_store import load_agent_state, redact_mapping
from .langgraph_runtime import LangGraphDeploymentRuntime, LangGraphRunResult
from .llm import build_llm_client
from .llm.rule_based import RuleBasedLlmClient
+from .llm.text_filter import filter_thinking_chunks, strip_thinking_text
from .logging_utils import configure_logging, json_for_log, redact_for_log
from .mcp_factory import build_mcp_runner_from_config
from .models import ActionResult, AgentState, ExecutionStrategy, LlmSingleActionProposal
@@ -29,6 +30,7 @@ OutputFunc = Callable[[str], None]
logger = logging.getLogger(__name__)
DEFAULT_LOG_ANALYSIS_TAIL_LINES = 400
DEFAULT_LOG_ANALYSIS_MAX_BYTES = 64 * 1024
+LLM_STREAM_FLUSH_CHARS = 120
COMMAND_HELP = """可用命令:
help 显示帮助
@@ -292,8 +294,15 @@ class InteractiveCliSession:
client_name = type(self.agent.llm_client).__name__
self.output(f"正在询问 LLM: {client_name}")
logger.info("chat 普通 LLM 对话开始 client=%s text=%s", client_name, redact_for_log(text, max_text_len=800))
+ context = self._llm_chat_context()
+ answer = self._try_stream_llm_chat(text, context, client_name)
+ if answer is not None:
+ if not answer:
+ self.output("LLM 未返回内容。")
+ logger.info("chat 普通 LLM 流式对话完成 client=%s answer=%s", client_name, redact_for_log(answer, max_text_len=1200))
+ return
try:
- answer = self.agent.llm_client.chat(text, context=self._llm_chat_context())
+ answer = strip_thinking_text(self.agent.llm_client.chat(text, context=context))
except Exception as exc:
logger.exception("chat 普通 LLM 对话失败 client=%s", client_name)
self.output(f"LLM 对话失败: {exc}")
@@ -301,6 +310,19 @@ class InteractiveCliSession:
self.output(answer or "LLM 未返回内容。")
logger.info("chat 普通 LLM 对话完成 client=%s answer=%s", client_name, redact_for_log(answer, max_text_len=1200))
+ def _try_stream_llm_chat(self, text: str, context: dict[str, Any], client_name: str) -> str | None:
+ """优先使用 LLM 流式 chat;不支持或失败时交给非流式 fallback。"""
+ stream_method = getattr(self.agent.llm_client, "chat_stream", None)
+ if not callable(stream_method):
+ return None
+ try:
+ raw_chunks = stream_method(text, context=context)
+ return _output_llm_stream(filter_thinking_chunks(raw_chunks), self.output)
+ except Exception as exc:
+ logger.exception("chat 普通 LLM 流式对话失败 client=%s", client_name)
+ self.output(f"LLM 流式输出失败,改用普通请求: {exc}")
+ return None
+
def _handle_log_command(self, text: str) -> None:
"""处理日志分析命令。"""
try:
@@ -345,6 +367,7 @@ class InteractiveCliSession:
logger.exception("chat 日志分析失败 client=%s path=%s", client_name, path)
self.output(f"日志分析失败: {exc}")
return
+ answer = strip_thinking_text(answer)
self.output(answer or "LLM 未返回日志分析结果。")
logger.info("chat 日志分析完成 path=%s answer=%s", path, redact_for_log(answer, max_text_len=1200))
@@ -1352,6 +1375,56 @@ def _format_action_result(result: ActionResult) -> str:
return json.dumps(redact_mapping(payload), ensure_ascii=False, indent=2, default=str)
+def _output_llm_stream(chunks: Any, output_func: OutputFunc) -> str:
+ """把 LLM 流式分片按自然停顿点输出,并返回完整可见回答。"""
+ answer_parts: list[str] = []
+ buffer = ""
+ for chunk in chunks:
+ text = str(chunk)
+ if not text:
+ continue
+ answer_parts.append(text)
+ buffer += text
+ buffer = _flush_llm_stream_buffer(buffer, output_func, force=False)
+ if buffer:
+ output_func(buffer.strip())
+ return "".join(answer_parts).strip()
+
+
+def _flush_llm_stream_buffer(buffer: str, output_func: OutputFunc, *, force: bool) -> str:
+ """在换行、句末标点或长度阈值处刷新一段 LLM 输出。"""
+ while buffer:
+ split_at = _llm_stream_split_index(buffer)
+ if split_at <= 0:
+ break
+ segment = buffer[:split_at].strip()
+ if segment:
+ output_func(segment)
+ buffer = buffer[split_at:]
+ if force and buffer.strip():
+ output_func(buffer.strip())
+ return ""
+ if len(buffer) >= LLM_STREAM_FLUSH_CHARS:
+ segment = buffer.strip()
+ if segment:
+ output_func(segment)
+ return ""
+ return buffer
+
+
+def _llm_stream_split_index(buffer: str) -> int:
+ """查找适合把流式回答输出给用户的分段位置。"""
+ newline = buffer.find("\n")
+ if newline >= 0:
+ return newline + 1
+ for index, char in enumerate(buffer):
+ if char in "。!?!?":
+ return index + 1
+ if len(buffer) >= LLM_STREAM_FLUSH_CHARS:
+ return len(buffer)
+ return -1
+
+
def _positive_int(value: str, name: str) -> int:
"""解析正整数参数。"""
try:
diff --git a/pam_deploy_graph/llm/base.py b/pam_deploy_graph/llm/base.py
index c8524c9..f674ce2 100644
--- a/pam_deploy_graph/llm/base.py
+++ b/pam_deploy_graph/llm/base.py
@@ -2,6 +2,7 @@
from __future__ import annotations
+from collections.abc import Iterable
from typing import Any, Protocol
from pam_deploy_graph.models import (
@@ -49,6 +50,10 @@ class LlmClient(Protocol):
"""进行普通自然语言对话,不触发部署 workflow。"""
...
+ def chat_stream(self, text: str, context: dict[str, Any] | None = None) -> Iterable[str]:
+ """流式进行普通自然语言对话,不触发部署 workflow。"""
+ ...
+
def analyze_log(self, log_text: str, question: str | None = None, source_path: str = "") -> str:
"""分析日志文本并给出异常摘要、原因和建议。"""
...
diff --git a/pam_deploy_graph/llm/openai_compatible.py b/pam_deploy_graph/llm/openai_compatible.py
index e330ce5..cbfdb36 100644
--- a/pam_deploy_graph/llm/openai_compatible.py
+++ b/pam_deploy_graph/llm/openai_compatible.py
@@ -11,7 +11,7 @@ import logging
import time
from pathlib import Path
import urllib.request
-from collections.abc import Callable
+from collections.abc import Callable, Iterable, Iterator
from typing import Any
from pam_deploy_graph.constants import (
@@ -36,8 +36,10 @@ from .prompts import (
SINGLE_ACTION_PROMPT,
SYSTEM_PROMPT,
)
+from .text_filter import filter_thinking_chunks, strip_thinking_text
JsonTransport = Callable[[str, dict[str, str], dict[str, Any], float], dict[str, Any]]
+StreamTransport = Callable[[str, dict[str, str], dict[str, Any], float], Iterable[str]]
logger = logging.getLogger(__name__)
@@ -54,6 +56,7 @@ class OpenAICompatibleLlmClient:
timeout_sec: float = 30,
temperature: float = 0,
transport: JsonTransport | None = None,
+ stream_transport: StreamTransport | None = None,
) -> None:
"""保存连接参数、模型参数和可替换的 HTTP transport。"""
if not base_url:
@@ -67,8 +70,9 @@ class OpenAICompatibleLlmClient:
self.timeout_sec = timeout_sec
self.temperature = temperature
self.transport = transport or _default_transport
+ self.stream_transport = stream_transport or _default_stream_transport
logger.info(
- "OpenAI-compatible LLM client 初始化 base_url=%s endpoint=%s model=%s has_api_key=%s timeout=%s temperature=%s custom_transport=%s",
+ "OpenAI-compatible LLM client 初始化 base_url=%s endpoint=%s model=%s has_api_key=%s timeout=%s temperature=%s custom_transport=%s custom_stream_transport=%s",
self.base_url,
_chat_completions_url(self.base_url),
self.model,
@@ -76,6 +80,7 @@ class OpenAICompatibleLlmClient:
self.timeout_sec,
self.temperature,
transport is not None,
+ stream_transport is not None,
)
def understand_request(self, text: str) -> LlmIntentResult:
@@ -192,6 +197,17 @@ class OpenAICompatibleLlmClient:
},
)
+ def chat_stream(self, text: str, context: dict[str, Any] | None = None) -> Iterable[str]:
+ """调用 LLM 做普通流式对话,不要求 JSON 响应。"""
+ return self._complete_text_stream(
+ "chat",
+ CHAT_PROMPT,
+ {
+ "user_text": text,
+ "context": _redact_sensitive(context or {}),
+ },
+ )
+
def analyze_log(self, log_text: str, question: str | None = None, source_path: str = "") -> str:
"""调用 LLM 分析日志尾部摘要。"""
return self._complete_text(
@@ -328,7 +344,7 @@ class OpenAICompatibleLlmClient:
)
try:
response = self.transport(endpoint, headers, request_payload, self.timeout_sec)
- content = str(_message_content(response))
+ content = strip_thinking_text(str(_message_content(response)))
except Exception:
logger.exception(
"LLM 文本请求失败 operation=%s endpoint=%s duration_ms=%s input=%s",
@@ -344,7 +360,55 @@ class OpenAICompatibleLlmClient:
int((time.perf_counter() - started_at) * 1000),
redact_for_log(content, max_text_len=1600),
)
- return content.strip()
+ return content
+
+ def _complete_text_stream(self, operation: str, instruction: str, input_payload: dict[str, Any]) -> Iterable[str]:
+ """发送 stream chat/completions 请求,并返回过滤后的普通文本分片。"""
+ started_at = time.perf_counter()
+ endpoint = _chat_completions_url(self.base_url)
+ request_payload = {
+ "model": self.model,
+ "temperature": self.temperature,
+ "stream": True,
+ "messages": [
+ {"role": "system", "content": instruction},
+ {
+ "role": "user",
+ "content": "输入 JSON:\n" + json.dumps(input_payload, ensure_ascii=False, sort_keys=True),
+ },
+ ],
+ }
+ headers = {"Content-Type": "application/json"}
+ if self.api_key:
+ headers["Authorization"] = f"Bearer {self.api_key}"
+ logger.info(
+ "LLM 流式文本请求开始 operation=%s endpoint=%s model=%s timeout=%s has_api_key=%s input=%s",
+ operation,
+ endpoint,
+ self.model,
+ self.timeout_sec,
+ bool(self.api_key),
+ json_for_log(input_payload, max_text_len=1600),
+ )
+ try:
+ raw_chunks = self.stream_transport(endpoint, headers, request_payload, self.timeout_sec)
+ for chunk in filter_thinking_chunks(raw_chunks):
+ if chunk:
+ yield chunk
+ except Exception:
+ logger.exception(
+ "LLM 流式文本请求失败 operation=%s endpoint=%s duration_ms=%s input=%s",
+ operation,
+ endpoint,
+ int((time.perf_counter() - started_at) * 1000),
+ json_for_log(input_payload, max_text_len=1600),
+ )
+ raise
+ logger.info(
+ "LLM 流式文本请求完成 operation=%s duration_ms=%s",
+ operation,
+ int((time.perf_counter() - started_at) * 1000),
+ )
def _default_transport(
@@ -368,6 +432,41 @@ def _default_transport(
return decoded
+def _default_stream_transport(
+ url: str,
+ headers: dict[str, str],
+ payload: dict[str, Any],
+ timeout_sec: float,
+) -> Iterator[str]:
+ """使用标准库 urllib 发送 OpenAI-compatible SSE 流式请求。"""
+ request = urllib.request.Request(
+ url,
+ data=json.dumps(payload).encode("utf-8"),
+ headers=headers,
+ method="POST",
+ )
+ with urllib.request.urlopen(request, timeout=timeout_sec) as response:
+ for raw_line in response:
+ line = raw_line.decode("utf-8", errors="replace").strip()
+ if not line or line.startswith(":"):
+ continue
+ if line.startswith("event:") or line.startswith("id:"):
+ continue
+ if not line.startswith("data:"):
+ raise ValueError("LLM 流式响应不是 SSE data 格式")
+ data = line[len("data:") :].strip()
+ if data == "[DONE]":
+ break
+ try:
+ decoded = json.loads(data)
+ except json.JSONDecodeError:
+ logger.debug("忽略无法解析的 LLM stream data: %s", redact_for_log(data, max_text_len=300))
+ continue
+ chunk = _stream_delta_content(decoded)
+ if chunk:
+ yield chunk
+
+
def load_prompt_text(path: str | None) -> str:
"""读取自定义提示词文件。"""
if not path:
@@ -401,6 +500,35 @@ def _message_content(response: dict[str, Any]) -> Any:
return content
+def _stream_delta_content(response: dict[str, Any]) -> str:
+ """从 OpenAI-compatible stream chunk 中提取 delta.content。"""
+ try:
+ choice = response["choices"][0]
+ except (KeyError, IndexError, TypeError):
+ return ""
+ delta = choice.get("delta") if isinstance(choice, dict) else None
+ if isinstance(delta, dict) and "content" in delta:
+ return str(_content_parts_to_text(delta.get("content")))
+ message = choice.get("message") if isinstance(choice, dict) else None
+ if isinstance(message, dict) and "content" in message:
+ return str(_content_parts_to_text(message.get("content")))
+ text = choice.get("text") if isinstance(choice, dict) else None
+ return str(text) if text is not None else ""
+
+
+def _content_parts_to_text(content: Any) -> str:
+ """把 OpenAI content parts 或字符串转换为纯文本。"""
+ if isinstance(content, list):
+ parts: list[str] = []
+ for item in content:
+ if isinstance(item, dict) and item.get("type") == "text":
+ parts.append(str(item.get("text", "")))
+ elif isinstance(item, str):
+ parts.append(item)
+ return "".join(parts)
+ return "" if content is None else str(content)
+
+
def _loads_json_object(content: Any) -> Any:
"""把 message.content 解析为 JSON 对象。"""
if isinstance(content, dict):
diff --git a/pam_deploy_graph/llm/prompts.py b/pam_deploy_graph/llm/prompts.py
index d4e988c..9ba03f1 100644
--- a/pam_deploy_graph/llm/prompts.py
+++ b/pam_deploy_graph/llm/prompts.py
@@ -109,6 +109,7 @@ CHAT_PROMPT = """你是 PAM 部署 Agent 的交互助手。
- 如果用户想执行完整部署,提示使用 `analyze <需求>` 先分析,确认后再输入 `run`。
- 如果用户想单独执行 action,提示使用 `action propose <需求>` 或 `action run ...`,执行前仍需要人工确认。
- 不要输出密钥、token、Authorization、CLIENT_SECRET 或 api_key。
+- 不要输出 ``、``、推理过程、内部思考或隐藏分析内容。
"""
LOG_ANALYSIS_PROMPT = """分析 PAM Agent 或部署脚本日志。
@@ -118,6 +119,7 @@ LOG_ANALYSIS_PROMPT = """分析 PAM Agent 或部署脚本日志。
- 不要输出密钥、token、Authorization、CLIENT_SECRET 或 api_key。
- 输入通常是日志尾部摘要,不代表完整文件。
- 不要因为日志来自 stderr 就直接判定失败,要结合 ERROR、Exception、fail、状态码和上下文判断。
+- 不要输出 ``、``、推理过程、内部思考或隐藏分析内容。
"""
SINGLE_ACTION_PROMPT = """把用户自然语言解析成一次 PAM action 调用建议。
diff --git a/pam_deploy_graph/llm/rule_based.py b/pam_deploy_graph/llm/rule_based.py
index 0f919d4..4e86998 100644
--- a/pam_deploy_graph/llm/rule_based.py
+++ b/pam_deploy_graph/llm/rule_based.py
@@ -6,6 +6,7 @@
from __future__ import annotations
+from collections.abc import Iterable
import logging
import re
from dataclasses import asdict
@@ -23,6 +24,8 @@ from pam_deploy_graph.models import (
LlmSingleActionProposal,
)
+from .text_filter import strip_thinking_text
+
logger = logging.getLogger(__name__)
KEY_ALIASES = {
@@ -62,16 +65,20 @@ class RuleBasedLlmClient:
logger.info("规则 LLM 普通对话 text=%s context=%s", redact_for_log(text, max_text_len=800), json_for_log(context or {}))
lowered = text.lower()
if any(word in lowered for word in ("help", "帮助", "怎么用", "命令")):
- return (
+ return strip_thinking_text(
"当前是本地规则 LLM fallback。可用 `analyze <需求>` 分析部署需求,`run` 执行完整 workflow,"
"`action propose <需求>` 解析单个 action,`action run ...` 确认后执行单个 action,"
"`log analyze <路径>` 分析日志尾部。"
)
- return (
+ return strip_thinking_text(
"当前未配置真实 LLM,已使用本地规则 fallback。普通闲聊只能给出有限说明;"
"如需自然语言问答、日志深度分析或更准确的 action 解析,请配置真实 LLM。"
)
+ def chat_stream(self, text: str, context: dict[str, Any] | None = None) -> Iterable[str]:
+ """规则 fallback 的流式对话兼容实现。"""
+ yield self.chat(text, context=context)
+
def analyze_log(self, log_text: str, question: str | None = None, source_path: str = "") -> str:
"""用本地规则分析日志尾部。"""
logger.info("规则 LLM 日志分析 source=%s question=%s text_len=%s", source_path, redact_for_log(question or "", max_text_len=300), len(log_text))
@@ -94,7 +101,7 @@ class RuleBasedLlmClient:
else:
summary.append("未在日志尾部发现明显 ERROR/Exception/fail/timeout 关键字。")
summary.append("建议:如问题仍存在,请扩大 `--tail` 或提供更具体的问题描述。")
- return "\n".join(summary)
+ return strip_thinking_text("\n".join(summary))
def propose_action(
self,
diff --git a/pam_deploy_graph/llm/text_filter.py b/pam_deploy_graph/llm/text_filter.py
new file mode 100644
index 0000000..31bcc78
--- /dev/null
+++ b/pam_deploy_graph/llm/text_filter.py
@@ -0,0 +1,109 @@
+"""LLM 文本输出过滤工具。"""
+
+from __future__ import annotations
+
+from collections.abc import Iterable, Iterator
+
+OPEN_THINK_TAG = ""
+CLOSE_THINK_TAG = ""
+
+
+def strip_thinking_text(text: str) -> str:
+ """移除 LLM 普通文本输出里的思考标签和内容。"""
+ filter_ = ThinkingTextStreamFilter()
+ visible = filter_.feed(text) + filter_.finish()
+ return visible.strip()
+
+
+def filter_thinking_chunks(chunks: Iterable[str]) -> Iterator[str]:
+ """按流式分片移除 `...`,避免跨分片泄露思考内容。"""
+ filter_ = ThinkingTextStreamFilter()
+ for chunk in chunks:
+ visible = filter_.feed(str(chunk))
+ if visible:
+ yield visible
+ tail = filter_.finish()
+ if tail:
+ yield tail
+
+
+class ThinkingTextStreamFilter:
+ """支持跨 chunk 识别 think 标签的流式过滤器。"""
+
+ def __init__(self) -> None:
+ """初始化可见/隐藏状态和待判定缓冲区。"""
+ self._pending = ""
+ self._inside_think = False
+
+ def feed(self, chunk: str) -> str:
+ """输入一个文本分片,返回当前可安全展示的可见文本。"""
+ if not chunk:
+ return ""
+ self._pending += chunk
+ output: list[str] = []
+ while self._pending:
+ lowered = self._pending.lower()
+ if self._inside_think:
+ close_index = lowered.find(CLOSE_THINK_TAG)
+ if close_index >= 0:
+ self._pending = self._pending[close_index + len(CLOSE_THINK_TAG) :]
+ self._inside_think = False
+ continue
+ keep = _longest_suffix_prefix(lowered, [CLOSE_THINK_TAG])
+ self._pending = self._pending[-keep:] if keep else ""
+ break
+
+ open_index = lowered.find(OPEN_THINK_TAG)
+ close_index = lowered.find(CLOSE_THINK_TAG)
+ if open_index >= 0 and (close_index < 0 or open_index < close_index):
+ output.append(self._pending[:open_index])
+ self._pending = self._pending[open_index + len(OPEN_THINK_TAG) :]
+ self._inside_think = True
+ continue
+ if close_index >= 0:
+ output.append(self._pending[:close_index])
+ self._pending = self._pending[close_index + len(CLOSE_THINK_TAG) :]
+ continue
+
+ keep = _longest_suffix_prefix(lowered, [OPEN_THINK_TAG, CLOSE_THINK_TAG])
+ if keep:
+ output.append(self._pending[:-keep])
+ self._pending = self._pending[-keep:]
+ else:
+ output.append(self._pending)
+ self._pending = ""
+ break
+ return "".join(output)
+
+ def finish(self) -> str:
+ """结束流式过滤,丢弃未闭合 think 内容和未完成标签。"""
+ if self._inside_think:
+ self._pending = ""
+ self._inside_think = False
+ return ""
+ lowered = self._pending.lower()
+ if lowered in _tag_prefixes():
+ self._pending = ""
+ return ""
+ tail = self._pending
+ self._pending = ""
+ return tail
+
+
+def _longest_suffix_prefix(text: str, targets: list[str]) -> int:
+ """返回 text 末尾与任一目标标签前缀匹配的最长长度。"""
+ best = 0
+ for target in targets:
+ max_len = min(len(text), len(target) - 1)
+ for length in range(1, max_len + 1):
+ if text.endswith(target[:length]):
+ best = max(best, length)
+ return best
+
+
+def _tag_prefixes() -> set[str]:
+ """生成 think 标签的所有非完整前缀,用于收尾时丢弃半截标签。"""
+ prefixes = {""}
+ for tag in (OPEN_THINK_TAG, CLOSE_THINK_TAG):
+ prefixes.update(tag[:index] for index in range(1, len(tag)))
+ return prefixes
diff --git a/tests/test_interactive_cli.py b/tests/test_interactive_cli.py
index 30a4f61..9a3f6bd 100644
--- a/tests/test_interactive_cli.py
+++ b/tests/test_interactive_cli.py
@@ -83,6 +83,23 @@ class FakeTestableLlmClient:
)
+class StreamingChatLlmClient(FakeTestableLlmClient):
+ def __init__(self) -> None:
+ super().__init__()
+ self.stream_requests: list[tuple[str, dict]] = []
+
+ def chat_stream(self, text, context=None):
+ self.stream_requests.append((text, context or {}))
+ yield "第一句。"
+ yield "隐藏思考"
+ yield "第二句。"
+
+
+class BrokenStreamingChatLlmClient(FakeTestableLlmClient):
+ def chat_stream(self, text, context=None):
+ raise RuntimeError("stream unavailable")
+
+
class FlakyVerifyRunner(FakeActionRunner):
"""第一次 verify-ip 失败,后续恢复成功,用于覆盖断点重试。"""
@@ -232,6 +249,60 @@ def test_chat_ask_command_uses_llm_chat(tmp_path: Path):
assert any("chat answer: 这个 agent 能做什么" in item for item in output)
+def test_chat_ask_uses_streaming_chat_when_available(tmp_path: Path):
+ llm = StreamingChatLlmClient()
+ session = InteractiveCliSession(
+ agent=PamDeployAgent(llm_client=llm),
+ params=PARAMS,
+ strategy="fake",
+ checkpoint_path=str(tmp_path / "checkpoint.json"),
+ )
+
+ output = run_session(session, ["ask 你好", "exit"])
+
+ assert llm.stream_requests[0][0] == "你好"
+ assert llm.chat_requests == []
+ assert any("第一句。" in item for item in output)
+ assert any("第二句。" in item for item in output)
+ assert not any("隐藏思考" in item or "" in item for item in output)
+
+
+def test_chat_ask_falls_back_when_streaming_fails(tmp_path: Path):
+ llm = BrokenStreamingChatLlmClient()
+ session = InteractiveCliSession(
+ agent=PamDeployAgent(llm_client=llm),
+ params=PARAMS,
+ strategy="fake",
+ checkpoint_path=str(tmp_path / "checkpoint.json"),
+ )
+
+ output = run_session(session, ["ask 你好", "exit"])
+
+ assert llm.chat_requests[0][0] == "你好"
+ assert any("LLM 流式输出失败,改用普通请求" in item for item in output)
+ assert any("chat answer: 你好" in item for item in output)
+
+
+def test_chat_ask_strips_think_from_non_streaming_chat(tmp_path: Path):
+ class ThinkChatLlmClient(FakeTestableLlmClient):
+ def chat(self, text, context=None):
+ self.chat_requests.append((text, context or {}))
+ return "可见隐藏思考结论"
+
+ llm = ThinkChatLlmClient()
+ session = InteractiveCliSession(
+ agent=PamDeployAgent(llm_client=llm),
+ params=PARAMS,
+ strategy="fake",
+ checkpoint_path=str(tmp_path / "checkpoint.json"),
+ )
+
+ output = run_session(session, ["ask 你好", "exit"])
+
+ assert any("可见结论" in item for item in output)
+ assert not any("隐藏思考" in item or "" in item for item in output)
+
+
def test_chat_log_analyze_reads_tail_and_redacts(tmp_path: Path):
llm = FakeTestableLlmClient()
log_path = tmp_path / "agent.log"
@@ -263,6 +334,28 @@ def test_chat_log_analyze_reads_tail_and_redacts(tmp_path: Path):
assert any("log analysis answer" in item for item in output)
+def test_chat_log_analyze_strips_think_from_answer(tmp_path: Path):
+ class ThinkLogLlmClient(FakeTestableLlmClient):
+ def analyze_log(self, log_text, question=None, source_path=""):
+ self.log_requests.append((log_text, question, source_path))
+ return "隐藏日志分析日志结论"
+
+ llm = ThinkLogLlmClient()
+ log_path = tmp_path / "agent.log"
+ log_path.write_text("ERROR failed", encoding="utf-8")
+ session = InteractiveCliSession(
+ agent=PamDeployAgent(llm_client=llm),
+ params=PARAMS,
+ strategy="fake",
+ checkpoint_path=str(tmp_path / "checkpoint.json"),
+ )
+
+ output = run_session(session, [f"log analyze {log_path}", "exit"])
+
+ assert any("日志结论" in item for item in output)
+ assert not any("隐藏日志分析" in item or "" in item for item in output)
+
+
def test_chat_action_propose_only_shows_plan_without_execution(tmp_path: Path):
llm = FakeTestableLlmClient()
fake = FakeActionRunner()
diff --git a/tests/test_llm_structured.py b/tests/test_llm_structured.py
index 4c445e6..732a546 100644
--- a/tests/test_llm_structured.py
+++ b/tests/test_llm_structured.py
@@ -308,6 +308,47 @@ def test_openai_compatible_client_supports_plain_chat():
assert "response_format" not in calls[0]
assert "real-secret" not in serialized_prompt
assert "不要自动触发部署" in calls[0]["messages"][0]["content"]
+ assert "不要输出 ``" in calls[0]["messages"][0]["content"]
+
+
+def test_openai_compatible_client_strips_think_from_plain_chat():
+ def transport(url, headers, payload, timeout_sec):
+ return {"choices": [{"message": {"content": "开头内部思考结论"}}]}
+
+ client = OpenAICompatibleLlmClient(
+ base_url="https://llm.example/v1",
+ api_key="secret-key",
+ model="model-a",
+ transport=transport,
+ )
+
+ answer = client.chat("你好")
+
+ assert answer == "开头结论"
+
+
+def test_openai_compatible_client_streams_plain_chat_and_filters_think():
+ calls = []
+
+ def stream_transport(url, headers, payload, timeout_sec):
+ calls.append((url, headers, payload, timeout_sec))
+ return iter(["开头", "内部思考", "结论", "。"])
+
+ client = OpenAICompatibleLlmClient(
+ base_url="https://llm.example/v1",
+ api_key="secret-key",
+ model="model-a",
+ stream_transport=stream_transport,
+ )
+
+ answer = "".join(client.chat_stream("你好", context={"CLIENT_SECRET": "real-secret"}))
+
+ assert answer == "开头结论。"
+ assert calls[0][0] == "https://llm.example/v1/chat/completions"
+ assert calls[0][1]["Authorization"] == "Bearer secret-key"
+ assert calls[0][2]["stream"] is True
+ assert "response_format" not in calls[0][2]
+ assert "real-secret" not in json.dumps(calls[0][2], ensure_ascii=False)
def test_openai_compatible_client_analyzes_log_with_redaction():
@@ -315,7 +356,7 @@ def test_openai_compatible_client_analyzes_log_with_redaction():
def transport(url, headers, payload, timeout_sec):
calls.append(payload)
- return {"choices": [{"message": {"content": "日志分析"}}]}
+ return {"choices": [{"message": {"content": "隐藏分析日志分析"}}]}
client = OpenAICompatibleLlmClient(
base_url="https://llm.example/v1",
@@ -332,6 +373,7 @@ def test_openai_compatible_client_analyzes_log_with_redaction():
assert input_payload["question"] == "为什么失败"
assert "real-secret" not in json.dumps(input_payload, ensure_ascii=False)
assert "不要因为日志来自 stderr" in calls[0]["messages"][0]["content"]
+ assert "不要输出 ``" in calls[0]["messages"][0]["content"]
def test_openai_compatible_client_proposes_single_action():
diff --git a/tests/test_llm_text_filter.py b/tests/test_llm_text_filter.py
new file mode 100644
index 0000000..3385958
--- /dev/null
+++ b/tests/test_llm_text_filter.py
@@ -0,0 +1,29 @@
+from pam_deploy_graph.llm.text_filter import filter_thinking_chunks, strip_thinking_text
+
+
+def test_strip_thinking_text_removes_complete_block():
+ text = "开头这里是很长的内部思考\n不应该展示结论"
+
+ assert strip_thinking_text(text) == "开头结论"
+
+
+def test_strip_thinking_text_removes_unclosed_block():
+ text = "可见内容\n未闭合的内部思考不应该展示"
+
+ assert strip_thinking_text(text) == "可见内容"
+
+
+def test_filter_thinking_chunks_handles_split_tags():
+ chunks = ["回答", "隐藏", "内容继续。"]
+
+ visible = list(filter_thinking_chunks(chunks))
+
+ assert "".join(visible) == "回答继续。"
+
+
+def test_filter_thinking_chunks_drops_unclosed_think_tail():
+ chunks = ["回答", "", "隐藏内容"]
+
+ visible = list(filter_thinking_chunks(chunks))
+
+ assert "".join(visible) == "回答"