agent_deply/tests/test_interactive_cli.py
dark 87c48a74a5 新增 <think>...</think> 过滤器,支持完整标签、跨流式 chunk 标签、未闭合 <think>。
OpenAICompatibleLlmClient 新增 chat_stream(),使用 OpenAI-compatible /chat/completions 的 stream: true。
chat 普通对话现在优先流式分段输出;流式不可用或服务端不返回 SSE 时,会提示并自动 fallback 到非流式 chat()。
普通 chat 和 log analyze 都会过滤 think 内容,并且日志只记录过滤后的摘要。
更新了 chat/log 分析提示词,明确禁止输出 think/内部思考。
同步 README、打包 README、run.sh --help。
补充了过滤器、OpenAI 流式、CLI fallback、日志分析过滤等测试。
2026-06-05 12:32:58 +08:00

662 lines
23 KiB
Python

import builtins
import sys
from pathlib import Path
import pytest
from pam_deploy_graph.agent import PamDeployAgent
from pam_deploy_graph.fake_runner import FakeActionRunner
from pam_deploy_graph.interactive import InteractiveCliSession, _build_prompt_input
from pam_deploy_graph.models import LlmActionAnalysis, LlmIntentResult, LlmSingleActionProposal
PARAMS = {
"HOME_BASE_URL": "https://pam.home.example.com",
"CLIENT_ID": "client",
"CLIENT_SECRET": "secret",
"AIRPORT_CODE": "HET",
"APP_NAME": "PAM",
"MODULE_NAME": "Node",
"VERSION_NUMBER": "2.0.5",
"ZIP_FILE_PATH": "C:/pkg.zip",
"VERIFY_INTERVAL_SEC": 0,
"VERIFY_MAX_ATTEMPTS": 2,
}
class BlockingReviewLlmClient:
def analyze_action_result(self, *, action, result):
return LlmActionAnalysis(
action=action,
has_anomaly=True,
severity="high",
possible_reason="review blocked",
suggested_action="stop and inspect",
requires_confirmation=True,
should_continue=False,
notes=["blocked by test llm"],
)
class FakeTestableLlmClient:
def __init__(self) -> None:
self.requests: list[str] = []
self.chat_requests: list[tuple[str, dict]] = []
self.log_requests: list[tuple[str, str | None, str]] = []
self.proposal_requests: list[str] = []
def understand_request(self, text: str) -> LlmIntentResult:
self.requests.append(text)
return LlmIntentResult(
intent="deploy",
mode_preference="MCP",
strategy_preference="hybrid_node_mcp",
confidence=0.91,
reasons=["test ok"],
)
def extract_params(self, text, base_params=None):
raise AssertionError("llm test should only call understand_request")
def generate_plan(self, *, params, intent, strategy):
raise AssertionError("llm test should only call understand_request")
def analyze_action_result(self, *, action, result):
return LlmActionAnalysis(action=action)
def chat(self, text, context=None):
self.chat_requests.append((text, context or {}))
return f"chat answer: {text}"
def analyze_log(self, log_text, question=None, source_path=""):
self.log_requests.append((log_text, question, source_path))
return "log analysis answer"
def propose_action(self, text, allowed_actions, params, state_summary=None):
self.proposal_requests.append(text)
return LlmSingleActionProposal(
action="verify-ip" if "verify" in text else "get-online-ips",
ip="192.168.1.10" if "192.168.1.10" in text else "",
reason="test proposal",
risk_level="medium",
requires_confirmation=True,
)
class StreamingChatLlmClient(FakeTestableLlmClient):
def __init__(self) -> None:
super().__init__()
self.stream_requests: list[tuple[str, dict]] = []
def chat_stream(self, text, context=None):
self.stream_requests.append((text, context or {}))
yield "第一句。"
yield "<think>隐藏思考</think>"
yield "第二句。"
class BrokenStreamingChatLlmClient(FakeTestableLlmClient):
def chat_stream(self, text, context=None):
raise RuntimeError("stream unavailable")
class FlakyVerifyRunner(FakeActionRunner):
"""第一次 verify-ip 失败,后续恢复成功,用于覆盖断点重试。"""
def __init__(self) -> None:
super().__init__()
self.verify_calls = 0
def _fixture_for(self, action, kwargs):
if action == "verify-ip" and kwargs.get("ip") == "192.168.1.10":
self.verify_calls += 1
if self.verify_calls == 1:
return {
"ACTION": "verify-ip",
"IP": "192.168.1.10",
"SUCCESS": "false",
"MESSAGE": "health check failed",
}
return super()._fixture_for(action, kwargs)
class ChatProgressRunner(FakeActionRunner):
"""让 chat fake 部署产生一次可见的进度更新。"""
def __init__(self) -> None:
super().__init__()
self.download_progress = ["40", "100"]
def _fixture_for(self, action, kwargs):
if action == "poll-download-progress":
rate = self.download_progress.pop(0) if self.download_progress else "100"
return {
"ACTION": action,
"STEP": "DONE" if rate == "100" else "RUNNING",
"RATE_OF_PROGRESS": rate,
"MSG": "success" if rate == "100" else "running",
"MESSAGE": f"download {rate}%",
}
return super()._fixture_for(action, kwargs)
def run_session(session: InteractiveCliSession, inputs: list[str]) -> list[str]:
output: list[str] = []
iterator = iter(inputs)
session.input = lambda _prompt: next(iterator)
session.output = output.append
session.run()
return output
def test_chat_analyzes_natural_language_and_updates_context(tmp_path: Path):
session = InteractiveCliSession(
agent=PamDeployAgent(),
params=PARAMS,
strategy="fake",
checkpoint_path=str(tmp_path / "checkpoint.json"),
)
output = run_session(session, ["analyze please use MCP deploy 192.168.1.10", "exit"])
assert session.strategy == "hybrid_node_mcp"
assert session.target_ips == ["192.168.1.10"]
assert any("执行请输 run" in item for item in output)
def test_chat_run_executes_fake_deploy_and_writes_checkpoint(tmp_path: Path):
checkpoint = tmp_path / "checkpoint.json"
session = InteractiveCliSession(
agent=PamDeployAgent(fake_runner=FakeActionRunner()),
params=PARAMS,
strategy="fake",
checkpoint_path=str(checkpoint),
)
run_session(session, ["run", "yes", "yes", "yes", "exit"])
assert checkpoint.exists()
assert session.state is not None
assert session.state.pending_confirmation == ""
assert all(item["status"] == "SUCCESS" for item in session.state.ip_states.values())
def test_chat_run_prints_action_progress(tmp_path: Path):
checkpoint = tmp_path / "checkpoint.json"
session = InteractiveCliSession(
agent=PamDeployAgent(fake_runner=FakeActionRunner()),
params=PARAMS,
strategy="fake",
checkpoint_path=str(checkpoint),
)
output = run_session(session, ["run", "yes", "yes", "yes", "exit"])
assert any("开始执行 action: get-token" in item for item in output)
assert any("完成 action: verify-ip" in item for item in output)
assert any("开始分析 action 结果: get-token" in item for item in output)
assert any("分析完成: verify-ip" in item for item in output)
def test_chat_run_prints_progress_poll_updates(tmp_path: Path):
checkpoint = tmp_path / "checkpoint.json"
session = InteractiveCliSession(
agent=PamDeployAgent(fake_runner=ChatProgressRunner()),
params={**PARAMS, "POLL_INTERVAL_SEC": 0},
strategy="fake",
checkpoint_path=str(checkpoint),
)
output = run_session(session, ["run", "yes", "yes", "yes", "exit"])
assert any("进度更新: poll-download-progress" in item for item in output)
assert any("RATE_OF_PROGRESS=40" in item for item in output)
assert session.state is not None
assert "poll-download-progress" in session.state.completed_global_steps
def test_chat_greeting_goes_to_llm_chat_without_structured_analysis(tmp_path: Path):
llm = FakeTestableLlmClient()
session = InteractiveCliSession(
agent=PamDeployAgent(llm_client=llm),
params=PARAMS,
strategy="fake",
checkpoint_path=str(tmp_path / "checkpoint.json"),
)
output = run_session(session, ["你好", "exit"])
assert session.last_analysis is None
assert llm.chat_requests[0][0] == "你好"
assert any("正在询问 LLM: FakeTestableLlmClient" in item for item in output)
assert any("chat answer: 你好" in item for item in output)
assert not any("已生成结构化理解" in item for item in output)
def test_chat_ask_command_uses_llm_chat(tmp_path: Path):
llm = FakeTestableLlmClient()
session = InteractiveCliSession(
agent=PamDeployAgent(llm_client=llm),
params=PARAMS,
strategy="fake",
checkpoint_path=str(tmp_path / "checkpoint.json"),
)
output = run_session(session, ["ask 这个 agent 能做什么", "exit"])
assert llm.chat_requests[0][0] == "这个 agent 能做什么"
assert llm.chat_requests[0][1]["params"]["CLIENT_SECRET"] == "***"
assert any("chat answer: 这个 agent 能做什么" in item for item in output)
def test_chat_ask_uses_streaming_chat_when_available(tmp_path: Path):
llm = StreamingChatLlmClient()
session = InteractiveCliSession(
agent=PamDeployAgent(llm_client=llm),
params=PARAMS,
strategy="fake",
checkpoint_path=str(tmp_path / "checkpoint.json"),
)
output = run_session(session, ["ask 你好", "exit"])
assert llm.stream_requests[0][0] == "你好"
assert llm.chat_requests == []
assert any("第一句。" in item for item in output)
assert any("第二句。" in item for item in output)
assert not any("隐藏思考" in item or "<think>" in item for item in output)
def test_chat_ask_falls_back_when_streaming_fails(tmp_path: Path):
llm = BrokenStreamingChatLlmClient()
session = InteractiveCliSession(
agent=PamDeployAgent(llm_client=llm),
params=PARAMS,
strategy="fake",
checkpoint_path=str(tmp_path / "checkpoint.json"),
)
output = run_session(session, ["ask 你好", "exit"])
assert llm.chat_requests[0][0] == "你好"
assert any("LLM 流式输出失败,改用普通请求" in item for item in output)
assert any("chat answer: 你好" in item for item in output)
def test_chat_ask_strips_think_from_non_streaming_chat(tmp_path: Path):
class ThinkChatLlmClient(FakeTestableLlmClient):
def chat(self, text, context=None):
self.chat_requests.append((text, context or {}))
return "可见<think>隐藏思考</think>结论"
llm = ThinkChatLlmClient()
session = InteractiveCliSession(
agent=PamDeployAgent(llm_client=llm),
params=PARAMS,
strategy="fake",
checkpoint_path=str(tmp_path / "checkpoint.json"),
)
output = run_session(session, ["ask 你好", "exit"])
assert any("可见结论" in item for item in output)
assert not any("隐藏思考" in item or "<think>" in item for item in output)
def test_chat_log_analyze_reads_tail_and_redacts(tmp_path: Path):
llm = FakeTestableLlmClient()
log_path = tmp_path / "agent.log"
log_path.write_text(
"\n".join(
[
"line 1 CLIENT_SECRET=real-secret",
"line 2 ok",
"line 3 ERROR failed",
]
),
encoding="utf-8",
)
session = InteractiveCliSession(
agent=PamDeployAgent(llm_client=llm),
params=PARAMS,
strategy="fake",
checkpoint_path=str(tmp_path / "checkpoint.json"),
)
output = run_session(session, [f"log analyze {log_path} 请找异常 --tail 2", "exit"])
log_text, question, source_path = llm.log_requests[0]
assert "line 1" not in log_text
assert "real-secret" not in log_text
assert "line 3 ERROR failed" in log_text
assert question == "请找异常"
assert source_path == str(log_path)
assert any("log analysis answer" in item for item in output)
def test_chat_log_analyze_strips_think_from_answer(tmp_path: Path):
class ThinkLogLlmClient(FakeTestableLlmClient):
def analyze_log(self, log_text, question=None, source_path=""):
self.log_requests.append((log_text, question, source_path))
return "<think>隐藏日志分析</think>日志结论"
llm = ThinkLogLlmClient()
log_path = tmp_path / "agent.log"
log_path.write_text("ERROR failed", encoding="utf-8")
session = InteractiveCliSession(
agent=PamDeployAgent(llm_client=llm),
params=PARAMS,
strategy="fake",
checkpoint_path=str(tmp_path / "checkpoint.json"),
)
output = run_session(session, [f"log analyze {log_path}", "exit"])
assert any("日志结论" in item for item in output)
assert not any("隐藏日志分析" in item or "<think>" in item for item in output)
def test_chat_action_propose_only_shows_plan_without_execution(tmp_path: Path):
llm = FakeTestableLlmClient()
fake = FakeActionRunner()
session = InteractiveCliSession(
agent=PamDeployAgent(fake_runner=fake, llm_client=llm),
params=PARAMS,
strategy="fake",
checkpoint_path=str(tmp_path / "checkpoint.json"),
)
output = run_session(session, ["action propose 请 verify-ip 192.168.1.10", "exit"])
assert llm.proposal_requests == ["请 verify-ip 192.168.1.10"]
assert fake.calls == []
assert any("单 action 计划" in item for item in output)
assert any("- action: verify-ip" in item for item in output)
def test_chat_action_run_llm_requires_confirmation_before_execution(tmp_path: Path):
llm = FakeTestableLlmClient()
fake = FakeActionRunner()
session = InteractiveCliSession(
agent=PamDeployAgent(fake_runner=fake, llm_client=llm),
params=PARAMS,
strategy="fake",
checkpoint_path=str(tmp_path / "checkpoint.json"),
)
output = run_session(session, ["action run llm 请 verify-ip 192.168.1.10", "no", "exit"])
assert fake.calls == []
assert any("已取消单 action 执行" in item for item in output)
output = run_session(session, ["action run llm 请 verify-ip 192.168.1.10", "yes", "exit"])
assert ("verify-ip", {"ip": "192.168.1.10"}) in fake.calls
assert session.state is not None
assert any(event["type"] == "SINGLE_ACTION_DONE" for event in session.state.events)
assert any("单 action 执行完成" in item for item in output)
def test_chat_action_run_missing_ip_is_friendly(tmp_path: Path):
fake = FakeActionRunner()
session = InteractiveCliSession(
agent=PamDeployAgent(fake_runner=fake),
params=PARAMS,
strategy="fake",
checkpoint_path=str(tmp_path / "checkpoint.json"),
)
output = run_session(session, ["action run verify-ip", "exit"])
assert fake.calls == []
assert any("需要提供 ip" in item for item in output)
def test_chat_action_run_manual_executes_fake_action(tmp_path: Path):
fake = FakeActionRunner()
session = InteractiveCliSession(
agent=PamDeployAgent(fake_runner=fake),
params=PARAMS,
strategy="fake",
checkpoint_path=str(tmp_path / "checkpoint.json"),
)
output = run_session(session, ["action run get-online-ips", "yes", "exit"])
assert ("get-online-ips", {"ip": None}) in fake.calls
assert session.state is not None
assert session.state.online_ips == ["192.168.1.10", "192.168.1.11"]
assert any("单 action 执行完成" in item for item in output)
def test_chat_preflight_blocks_missing_zip_path_before_confirm(tmp_path: Path):
missing_package = tmp_path / "missing.zip"
session = InteractiveCliSession(
agent=PamDeployAgent(),
params={**PARAMS, "ZIP_FILE_PATH": str(missing_package)},
strategy="script_only",
checkpoint_path=str(tmp_path / "checkpoint.json"),
)
output = run_session(session, ["run", "exit"])
assert session.state is None
assert any("执行前检查未通过" in item for item in output)
assert any("ZIP_FILE_PATH 不存在" in item for item in output)
def test_chat_action_failure_does_not_report_langgraph_unavailable(tmp_path: Path):
fake = FakeActionRunner({"upload-package": {"ACTION": "upload-package"}})
session = InteractiveCliSession(
agent=PamDeployAgent(fake_runner=fake),
params=PARAMS,
strategy="fake",
checkpoint_path=str(tmp_path / "checkpoint.json"),
)
output = run_session(session, ["run", "yes", "yes", "yes", "exit"])
assert session.state is not None
assert session.state.last_failed_step == "upload-package"
assert any("执行已停止" in item for item in output)
assert not any("LangGraph 确认运行器不可用" in item for item in output)
def test_chat_resume_retries_failed_ip_without_rollback(tmp_path: Path):
fake = FlakyVerifyRunner()
session = InteractiveCliSession(
agent=PamDeployAgent(fake_runner=fake),
params=PARAMS,
strategy="fake",
checkpoint_path=str(tmp_path / "checkpoint.json"),
)
output = run_session(session, ["run", "yes", "yes", "yes", "exit"])
assert session.state is not None
assert session.state.pending_confirmation == ""
assert session.state.paused is False
assert session.state.ip_states["192.168.1.10"]["rollback_status"] == "ROLLBACK_NOT_RUN"
assert session.state.ip_states["192.168.1.10"]["status"] == "SUCCESS"
assert session.state.ip_states["192.168.1.11"]["status"] == "SUCCESS"
assert not any(call[0] == "rollback-ip" for call in fake.calls)
assert any("进度更新: verify-ip" in item for item in output)
def test_chat_explicit_rollback_command_rolls_back_failed_ip(tmp_path: Path):
fake = FakeActionRunner(
{
"verify-ip:192.168.1.10": {
"ACTION": "verify-ip",
"IP": "192.168.1.10",
"SUCCESS": "false",
"MESSAGE": "health check failed",
}
}
)
session = InteractiveCliSession(
agent=PamDeployAgent(fake_runner=fake),
params=PARAMS,
strategy="fake",
checkpoint_path=str(tmp_path / "checkpoint.json"),
)
output = run_session(session, ["run", "yes", "yes", "yes", "rollback", "resume", "exit"])
assert session.state is not None
assert session.state.pending_confirmation == ""
assert session.state.ip_states["192.168.1.10"]["rollback_status"] == "ROLLBACK_DONE"
assert session.state.ip_states["192.168.1.11"]["status"] == "SUCCESS"
assert any(call[0] == "rollback-ip" for call in fake.calls)
assert any("回滚已完成;如需继续主流程,输入 resume" in item for item in output)
def test_chat_params_events_and_checkpoint_commands(tmp_path: Path):
checkpoint = tmp_path / "checkpoint.json"
session = InteractiveCliSession(
agent=PamDeployAgent(fake_runner=FakeActionRunner(), action_analysis_enabled=True),
params=PARAMS,
strategy="fake",
checkpoint_path=str(checkpoint),
)
output = run_session(
session,
[
"params",
"llm action-analysis on",
"run",
"yes",
"yes",
"yes",
"events 20",
"list checkpoints",
"load checkpoint " + str(checkpoint),
"exit",
],
)
assert session.state is not None
assert any("CLIENT_SECRET: ***" in item for item in output)
assert any("ACTION_ANALYSIS" in item for item in output)
assert any("checkpoint 列表" in item for item in output)
def test_chat_load_params_hot_updates_running_state_and_config(tmp_path: Path):
checkpoint = tmp_path / "checkpoint.json"
params_file = tmp_path / "params.txt"
params_file.write_text(
"\n".join(
[
"APP_NAME=PAM-HOT",
f"ZIP_FILE_PATH={tmp_path / 'updated.zip'}",
]
)
+ "\n",
encoding="utf-8",
)
session = InteractiveCliSession(
agent=PamDeployAgent(fake_runner=FakeActionRunner()),
params=PARAMS,
strategy="fake",
checkpoint_path=str(checkpoint),
)
run_session(
session,
[
"run",
"yes",
"yes",
"yes",
"load params " + str(params_file),
"exit",
],
)
assert session.state is not None
assert session.state.params["APP_NAME"] == "PAM-HOT"
assert session.state.params["ZIP_FILE_PATH"] == str((tmp_path / "updated.zip").resolve())
config_text = Path(session.state.config_path).read_text(encoding="utf-8")
assert "APP_NAME=PAM-HOT" in config_text
assert f"ZIP_FILE_PATH={(tmp_path / 'updated.zip').resolve()}" in config_text
def test_chat_llm_review_block_message_is_visible(tmp_path: Path):
checkpoint = tmp_path / "checkpoint.json"
session = InteractiveCliSession(
agent=PamDeployAgent(
fake_runner=FakeActionRunner(),
llm_client=BlockingReviewLlmClient(),
),
params=PARAMS,
strategy="fake",
checkpoint_path=str(checkpoint),
)
output = run_session(session, ["run", "yes", "yes", "yes", "exit"])
assert session.state is not None
assert session.state.paused is True
assert session.state.pause_reason == "llm_review_blocked"
assert any("当前流程已暂停: llm_review_blocked" in item for item in output)
assert any("- suggestion: stop and inspect" in item for item in output)
assert any("如需继续,输入 resume" in item for item in output)
def test_chat_llm_test_command_uses_current_client(tmp_path: Path):
llm = FakeTestableLlmClient()
session = InteractiveCliSession(
agent=PamDeployAgent(llm_client=llm),
params=PARAMS,
strategy="fake",
checkpoint_path=str(tmp_path / "checkpoint.json"),
)
output = run_session(session, ["llm test 检查模型", "exit"])
assert llm.requests == ["检查模型"]
assert any("正在测试 LLM: FakeTestableLlmClient" in item for item in output)
assert any("LLM 测试通过" in item for item in output)
assert any("- intent: deploy" in item for item in output)
assert any("- strategy: hybrid_node_mcp" in item for item in output)
def test_chat_can_hot_load_mcp_config(tmp_path: Path):
mcp_config = tmp_path / "mcp.json"
mcp_config.write_text('{"transport": "stdio", "command": "python"}', encoding="utf-8")
session = InteractiveCliSession(
agent=PamDeployAgent(),
params=PARAMS,
strategy="hybrid_node_mcp",
checkpoint_path=str(tmp_path / "checkpoint.json"),
)
output = run_session(session, ["mcp config " + mcp_config.as_posix(), "exit"])
assert session.agent.mcp_runner is not None
assert session.agent.router.mcp_runner is not None
assert any("MCP 配置已加载" in item for item in output)
def test_prompt_history_creates_runtime_dir(tmp_path: Path, monkeypatch):
pytest.importorskip("prompt_toolkit")
monkeypatch.chdir(tmp_path)
prompt = _build_prompt_input(builtins.input)
assert callable(prompt)
assert (tmp_path / "runtime").is_dir()
def test_prompt_toolkit_enabled_when_frozen(tmp_path: Path, monkeypatch):
pytest.importorskip("prompt_toolkit")
monkeypatch.chdir(tmp_path)
monkeypatch.setattr(sys, "frozen", True, raising=False)
prompt = _build_prompt_input(builtins.input)
assert callable(prompt)
assert prompt is not builtins.input
assert (tmp_path / "runtime").is_dir()