"""Tests for the interactive CLI session of the PAM deploy agent.

Each test drives an ``InteractiveCliSession`` with scripted input lines and
inspects the collected output lines and the resulting session state.
"""

import builtins
import sys
from pathlib import Path

import pytest

from pam_deploy_graph.agent import PamDeployAgent
from pam_deploy_graph.fake_runner import FakeActionRunner
from pam_deploy_graph.interactive import InteractiveCliSession, _build_prompt_input
from pam_deploy_graph.models import LlmActionAnalysis, LlmIntentResult, LlmSingleActionProposal


# Baseline deployment parameters shared by every test session.
PARAMS = {
    "HOME_BASE_URL": "https://pam.home.example.com",
    "CLIENT_ID": "client",
    "CLIENT_SECRET": "secret",
    "AIRPORT_CODE": "HET",
    "APP_NAME": "PAM",
    "MODULE_NAME": "Node",
    "VERSION_NUMBER": "2.0.5",
    "ZIP_FILE_PATH": "C:/pkg.zip",
    "VERIFY_INTERVAL_SEC": 0,
    "VERIFY_MAX_ATTEMPTS": 2,
}


class BlockingReviewLlmClient:
    """LLM stub whose action analysis always reports a blocking anomaly."""

    def analyze_action_result(self, *, action, result):
        """Return a high-severity analysis that asks the flow not to continue."""
        return LlmActionAnalysis(
            action=action,
            has_anomaly=True,
            severity="high",
            possible_reason="review blocked",
            suggested_action="stop and inspect",
            requires_confirmation=True,
            should_continue=False,
            notes=["blocked by test llm"],
        )


class FakeTestableLlmClient:
    """LLM stub that records every request and returns canned answers."""

    def __init__(self) -> None:
        self.requests: list[str] = []
        self.chat_requests: list[tuple[str, dict]] = []
        self.log_requests: list[tuple[str, str | None, str]] = []
        self.proposal_requests: list[str] = []

    def understand_request(self, text: str) -> LlmIntentResult:
        """Record ``text`` and return a fixed deploy intent."""
        self.requests.append(text)
        return LlmIntentResult(
            intent="deploy",
            mode_preference="MCP",
            strategy_preference="hybrid_node_mcp",
            confidence=0.91,
            reasons=["test ok"],
        )

    def extract_params(self, text, base_params=None):
        """Fail loudly: this stub is not expected to be asked for params."""
        raise AssertionError("llm test should only call understand_request")

    def generate_plan(self, *, params, intent, strategy):
        """Fail loudly: this stub is not expected to be asked for a plan."""
        raise AssertionError("llm test should only call understand_request")

    def analyze_action_result(self, *, action, result):
        """Return an analysis built with only the action name set."""
        return LlmActionAnalysis(action=action)

    def chat(self, text, context=None):
        """Record the chat request and echo ``text`` in the answer."""
        self.chat_requests.append((text, context or {}))
        return f"chat answer: {text}"

    def analyze_log(self, log_text, question=None, source_path=""):
        """Record the log-analysis request and return a fixed answer."""
        self.log_requests.append((log_text, question, source_path))
        return "log analysis answer"

    def propose_action(self, text, allowed_actions, params, state_summary=None):
        """Record ``text`` and derive a proposal from keywords found in it."""
        self.proposal_requests.append(text)
        return LlmSingleActionProposal(
            action="verify-ip" if "verify" in text else "get-online-ips",
            ip="192.168.1.10" if "192.168.1.10" in text else "",
            reason="test proposal",
            risk_level="medium",
            requires_confirmation=True,
        )


class StreamingChatLlmClient(FakeTestableLlmClient):
    """LLM stub that additionally offers a streaming chat API."""

    def __init__(self) -> None:
        super().__init__()
        self.stream_requests: list[tuple[str, dict]] = []

    def chat_stream(self, text, context=None):
        """Record the request and yield visible text around a think segment."""
        self.stream_requests.append((text, context or {}))
        yield "第一句。"
        # The middle chunk is wrapped in <think> tags; the test asserts that
        # neither its content nor the tag reaches the output.
        yield "<think>隐藏思考</think>"
        yield "第二句。"


class BrokenStreamingChatLlmClient(FakeTestableLlmClient):
    """LLM stub whose streaming API always raises."""

    def chat_stream(self, text, context=None):
        """Raise to simulate an unavailable streaming endpoint."""
        raise RuntimeError("stream unavailable")


class FlakyVerifyRunner(FakeActionRunner):
    """Fail the first verify-ip call, then recover; covers resume-and-retry."""

    def __init__(self) -> None:
        super().__init__()
        self.verify_calls = 0

    def _fixture_for(self, action, kwargs):
        """Return a failure fixture for the first verify-ip on 192.168.1.10."""
        if action == "verify-ip" and kwargs.get("ip") == "192.168.1.10":
            self.verify_calls += 1
            if self.verify_calls == 1:
                return {
                    "ACTION": "verify-ip",
                    "IP": "192.168.1.10",
                    "SUCCESS": "false",
                    "MESSAGE": "health check failed",
                }
        return super()._fixture_for(action, kwargs)


class ChatProgressRunner(FakeActionRunner):
    """Make the fake chat deployment emit one visible progress update."""

    def __init__(self) -> None:
        super().__init__()
        self.download_progress = ["40", "100"]

    def _fixture_for(self, action, kwargs):
        """Serve queued download progress values, then keep reporting 100."""
        if action == "poll-download-progress":
            rate = self.download_progress.pop(0) if self.download_progress else "100"
            return {
                "ACTION": action,
                "STEP": "DONE" if rate == "100" else "RUNNING",
                "RATE_OF_PROGRESS": rate,
                "MSG": "success" if rate == "100" else "running",
                "MESSAGE": f"download {rate}%",
            }
        return super()._fixture_for(action, kwargs)


def run_session(session: InteractiveCliSession, inputs: list[str]) -> list[str]:
    """Run ``session`` against scripted ``inputs`` and return its output lines.

    The session's ``input`` hook is replaced by a reader over ``inputs`` and
    its ``output`` hook by a list collector, then ``session.run()`` is called.
    """
    output: list[str] = []
    iterator = iter(inputs)
    session.input = lambda _prompt: next(iterator)
    session.output = output.append
    session.run()
    return output


def test_chat_analyzes_natural_language_and_updates_context(tmp_path: Path):
    """An ``analyze`` request updates the session strategy and target IPs."""
    session = InteractiveCliSession(
        agent=PamDeployAgent(),
        params=PARAMS,
        strategy="fake",
        checkpoint_path=str(tmp_path / "checkpoint.json"),
    )

    output = run_session(session, ["analyze please use MCP deploy 192.168.1.10", "exit"])

    assert session.strategy == "hybrid_node_mcp"
    assert session.target_ips == ["192.168.1.10"]
    assert any("执行请输 run" in item for item in output)


def test_chat_run_executes_fake_deploy_and_writes_checkpoint(tmp_path: Path):
    """A confirmed ``run`` finishes every IP and writes the checkpoint file."""
    checkpoint = tmp_path / "checkpoint.json"
    session = InteractiveCliSession(
        agent=PamDeployAgent(fake_runner=FakeActionRunner()),
        params=PARAMS,
        strategy="fake",
        checkpoint_path=str(checkpoint),
    )

    run_session(session, ["run", "yes", "yes", "yes", "exit"])

    assert checkpoint.exists()
    assert session.state is not None
    assert session.state.pending_confirmation == ""
    assert all(item["status"] == "SUCCESS" for item in session.state.ip_states.values())


def test_chat_run_prints_action_progress(tmp_path: Path):
    """A run prints start/finish lines for actions and for their analysis."""
    checkpoint = tmp_path / "checkpoint.json"
    session = InteractiveCliSession(
        agent=PamDeployAgent(fake_runner=FakeActionRunner()),
        params=PARAMS,
        strategy="fake",
        checkpoint_path=str(checkpoint),
    )

    output = run_session(session, ["run", "yes", "yes", "yes", "exit"])

    assert any("开始执行 action: get-token" in item for item in output)
    assert any("完成 action: verify-ip" in item for item in output)
    assert any("开始分析 action 结果: get-token" in item for item in output)
    assert any("分析完成: verify-ip" in item for item in output)


def test_chat_run_prints_progress_poll_updates(tmp_path: Path):
    """An intermediate download progress value is shown in the output."""
    checkpoint = tmp_path / "checkpoint.json"
    session = InteractiveCliSession(
        agent=PamDeployAgent(fake_runner=ChatProgressRunner()),
        params={**PARAMS, "POLL_INTERVAL_SEC": 0},
        strategy="fake",
        checkpoint_path=str(checkpoint),
    )

    output = run_session(session, ["run", "yes", "yes", "yes", "exit"])

    assert any("进度更新: poll-download-progress" in item for item in output)
    assert any("RATE_OF_PROGRESS=40" in item for item in output)
    assert session.state is not None
    assert "poll-download-progress" in session.state.completed_global_steps


def test_chat_greeting_goes_to_llm_chat_without_structured_analysis(tmp_path: Path):
    """A plain greeting is answered via LLM chat and leaves no analysis."""
    llm = FakeTestableLlmClient()
    session = InteractiveCliSession(
        agent=PamDeployAgent(llm_client=llm),
        params=PARAMS,
        strategy="fake",
        checkpoint_path=str(tmp_path / "checkpoint.json"),
    )

    output = run_session(session, ["你好", "exit"])

    assert session.last_analysis is None
    assert llm.chat_requests[0][0] == "你好"
    assert any("正在询问 LLM: FakeTestableLlmClient" in item for item in output)
    assert any("chat answer: 你好" in item for item in output)
    assert not any("已生成结构化理解" in item for item in output)


def test_chat_ask_command_uses_llm_chat(tmp_path: Path):
    """``ask`` forwards the question to chat with the client secret masked."""
    llm = FakeTestableLlmClient()
    session = InteractiveCliSession(
        agent=PamDeployAgent(llm_client=llm),
        params=PARAMS,
        strategy="fake",
        checkpoint_path=str(tmp_path / "checkpoint.json"),
    )

    output = run_session(session, ["ask 这个 agent 能做什么", "exit"])

    assert llm.chat_requests[0][0] == "这个 agent 能做什么"
    assert llm.chat_requests[0][1]["params"]["CLIENT_SECRET"] == "***"
    assert any("chat answer: 这个 agent 能做什么" in item for item in output)


def test_chat_ask_uses_streaming_chat_when_available(tmp_path: Path):
    """``ask`` uses ``chat_stream`` when present and hides think segments."""
    llm = StreamingChatLlmClient()
    session = InteractiveCliSession(
        agent=PamDeployAgent(llm_client=llm),
        params=PARAMS,
        strategy="fake",
        checkpoint_path=str(tmp_path / "checkpoint.json"),
    )

    output = run_session(session, ["ask 你好", "exit"])

    assert llm.stream_requests[0][0] == "你好"
    assert llm.chat_requests == []
    assert any("第一句。" in item for item in output)
    assert any("第二句。" in item for item in output)
    # Neither the hidden reasoning text nor the tag itself may be printed.
    assert not any("隐藏思考" in item or "<think>" in item for item in output)


def test_chat_ask_falls_back_when_streaming_fails(tmp_path: Path):
    """A failing ``chat_stream`` falls back to the regular ``chat`` call."""
    llm = BrokenStreamingChatLlmClient()
    session = InteractiveCliSession(
        agent=PamDeployAgent(llm_client=llm),
        params=PARAMS,
        strategy="fake",
        checkpoint_path=str(tmp_path / "checkpoint.json"),
    )

    output = run_session(session, ["ask 你好", "exit"])

    assert llm.chat_requests[0][0] == "你好"
    assert any("LLM 流式输出失败,改用普通请求" in item for item in output)
    assert any("chat answer: 你好" in item for item in output)


def test_chat_ask_strips_think_from_non_streaming_chat(tmp_path: Path):
    """Think segments in a non-streaming chat answer are not printed."""

    class ThinkChatLlmClient(FakeTestableLlmClient):
        def chat(self, text, context=None):
            self.chat_requests.append((text, context or {}))
            # Visible text surrounds a <think> segment that must be removed.
            return "可见<think>隐藏思考</think>结论"

    llm = ThinkChatLlmClient()
    session = InteractiveCliSession(
        agent=PamDeployAgent(llm_client=llm),
        params=PARAMS,
        strategy="fake",
        checkpoint_path=str(tmp_path / "checkpoint.json"),
    )

    output = run_session(session, ["ask 你好", "exit"])

    assert any("可见结论" in item for item in output)
    assert not any("隐藏思考" in item or "<think>" in item for item in output)


def test_chat_log_analyze_reads_tail_and_redacts(tmp_path: Path):
    """``log analyze --tail`` sends only the tail, with the secret absent."""
    llm = FakeTestableLlmClient()
    log_path = tmp_path / "agent.log"
    log_path.write_text(
        "\n".join(
            [
                "line 1 CLIENT_SECRET=real-secret",
                "line 2 ok",
                "line 3 ERROR failed",
            ]
        ),
        encoding="utf-8",
    )
    session = InteractiveCliSession(
        agent=PamDeployAgent(llm_client=llm),
        params=PARAMS,
        strategy="fake",
        checkpoint_path=str(tmp_path / "checkpoint.json"),
    )

    output = run_session(session, [f"log analyze {log_path} 请找异常 --tail 2", "exit"])

    log_text, question, source_path = llm.log_requests[0]
    assert "line 1" not in log_text
    assert "real-secret" not in log_text
    assert "line 3 ERROR failed" in log_text
    assert question == "请找异常"
    assert source_path == str(log_path)
    assert any("log analysis answer" in item for item in output)


def test_chat_log_analyze_strips_think_from_answer(tmp_path: Path):
    """Think segments in a log-analysis answer are not printed."""

    class ThinkLogLlmClient(FakeTestableLlmClient):
        def analyze_log(self, log_text, question=None, source_path=""):
            self.log_requests.append((log_text, question, source_path))
            # A leading <think> segment precedes the visible conclusion.
            return "<think>隐藏日志分析</think>日志结论"

    llm = ThinkLogLlmClient()
    log_path = tmp_path / "agent.log"
    log_path.write_text("ERROR failed", encoding="utf-8")
    session = InteractiveCliSession(
        agent=PamDeployAgent(llm_client=llm),
        params=PARAMS,
        strategy="fake",
        checkpoint_path=str(tmp_path / "checkpoint.json"),
    )

    output = run_session(session, [f"log analyze {log_path}", "exit"])

    assert any("日志结论" in item for item in output)
    assert not any("隐藏日志分析" in item or "<think>" in item for item in output)


def test_chat_action_propose_only_shows_plan_without_execution(tmp_path: Path):
    """``action propose`` prints the proposal and runs no action."""
    llm = FakeTestableLlmClient()
    fake = FakeActionRunner()
    session = InteractiveCliSession(
        agent=PamDeployAgent(fake_runner=fake, llm_client=llm),
        params=PARAMS,
        strategy="fake",
        checkpoint_path=str(tmp_path / "checkpoint.json"),
    )

    output = run_session(session, ["action propose 请 verify-ip 192.168.1.10", "exit"])

    assert llm.proposal_requests == ["请 verify-ip 192.168.1.10"]
    assert fake.calls == []
    assert any("单 action 计划" in item for item in output)
    assert any("- action: verify-ip" in item for item in output)


def test_chat_action_run_llm_requires_confirmation_before_execution(tmp_path: Path):
    """``action run llm`` executes only after the user answers ``yes``."""
    llm = FakeTestableLlmClient()
    fake = FakeActionRunner()
    session = InteractiveCliSession(
        agent=PamDeployAgent(fake_runner=fake, llm_client=llm),
        params=PARAMS,
        strategy="fake",
        checkpoint_path=str(tmp_path / "checkpoint.json"),
    )

    # Declining the confirmation must leave the runner untouched.
    output = run_session(session, ["action run llm 请 verify-ip 192.168.1.10", "no", "exit"])

    assert fake.calls == []
    assert any("已取消单 action 执行" in item for item in output)

    # The same session is driven again, this time confirming the action.
    output = run_session(session, ["action run llm 请 verify-ip 192.168.1.10", "yes", "exit"])

    assert ("verify-ip", {"ip": "192.168.1.10"}) in fake.calls
    assert session.state is not None
    assert any(event["type"] == "SINGLE_ACTION_DONE" for event in session.state.events)
    assert any("单 action 执行完成" in item for item in output)


def test_chat_action_run_missing_ip_is_friendly(tmp_path: Path):
    """``action run verify-ip`` without an IP prints a hint and runs nothing."""
    fake = FakeActionRunner()
    session = InteractiveCliSession(
        agent=PamDeployAgent(fake_runner=fake),
        params=PARAMS,
        strategy="fake",
        checkpoint_path=str(tmp_path / "checkpoint.json"),
    )

    output = run_session(session, ["action run verify-ip", "exit"])

    assert fake.calls == []
    assert any("需要提供 ip" in item for item in output)


def test_chat_action_run_manual_executes_fake_action(tmp_path: Path):
    """A confirmed manual ``action run`` executes and updates online IPs."""
    fake = FakeActionRunner()
    session = InteractiveCliSession(
        agent=PamDeployAgent(fake_runner=fake),
        params=PARAMS,
        strategy="fake",
        checkpoint_path=str(tmp_path / "checkpoint.json"),
    )

    output = run_session(session, ["action run get-online-ips", "yes", "exit"])

    assert ("get-online-ips", {"ip": None}) in fake.calls
    assert session.state is not None
    assert session.state.online_ips == ["192.168.1.10", "192.168.1.11"]
    assert any("单 action 执行完成" in item for item in output)


def test_chat_preflight_blocks_missing_zip_path_before_confirm(tmp_path: Path):
    """A missing ZIP_FILE_PATH stops ``run`` before any state is created."""
    missing_package = tmp_path / "missing.zip"
    session = InteractiveCliSession(
        agent=PamDeployAgent(),
        params={**PARAMS, "ZIP_FILE_PATH": str(missing_package)},
        strategy="script_only",
        checkpoint_path=str(tmp_path / "checkpoint.json"),
    )

    output = run_session(session, ["run", "exit"])

    assert session.state is None
    assert any("执行前检查未通过" in item for item in output)
    assert any("ZIP_FILE_PATH 不存在" in item for item in output)


def test_chat_action_failure_does_not_report_langgraph_unavailable(tmp_path: Path):
    """A failed action reports a stop, not a missing LangGraph runner."""
    fake = FakeActionRunner({"upload-package": {"ACTION": "upload-package"}})
    session = InteractiveCliSession(
        agent=PamDeployAgent(fake_runner=fake),
        params=PARAMS,
        strategy="fake",
        checkpoint_path=str(tmp_path / "checkpoint.json"),
    )

    output = run_session(session, ["run", "yes", "yes", "yes", "exit"])

    assert session.state is not None
    assert session.state.last_failed_step == "upload-package"
    assert any("执行已停止" in item for item in output)
    assert not any("LangGraph 确认运行器不可用" in item for item in output)


def test_chat_resume_retries_failed_ip_without_rollback(tmp_path: Path):
    """A verify-ip that fails once ends in SUCCESS with no rollback call."""
    fake = FlakyVerifyRunner()
    session = InteractiveCliSession(
        agent=PamDeployAgent(fake_runner=fake),
        params=PARAMS,
        strategy="fake",
        checkpoint_path=str(tmp_path / "checkpoint.json"),
    )

    output = run_session(session, ["run", "yes", "yes", "yes", "exit"])

    assert session.state is not None
    assert session.state.pending_confirmation == ""
    assert session.state.paused is False
    assert session.state.ip_states["192.168.1.10"]["rollback_status"] == "ROLLBACK_NOT_RUN"
    assert session.state.ip_states["192.168.1.10"]["status"] == "SUCCESS"
    assert session.state.ip_states["192.168.1.11"]["status"] == "SUCCESS"
    assert not any(call[0] == "rollback-ip" for call in fake.calls)
    assert any("进度更新: verify-ip" in item for item in output)


def test_chat_explicit_rollback_command_rolls_back_failed_ip(tmp_path: Path):
    """``rollback`` rolls back the failing IP and ``resume`` finishes the rest."""
    fake = FakeActionRunner(
        {
            "verify-ip:192.168.1.10": {
                "ACTION": "verify-ip",
                "IP": "192.168.1.10",
                "SUCCESS": "false",
                "MESSAGE": "health check failed",
            }
        }
    )
    session = InteractiveCliSession(
        agent=PamDeployAgent(fake_runner=fake),
        params=PARAMS,
        strategy="fake",
        checkpoint_path=str(tmp_path / "checkpoint.json"),
    )

    output = run_session(session, ["run", "yes", "yes", "yes", "rollback", "resume", "exit"])

    assert session.state is not None
    assert session.state.pending_confirmation == ""
    assert session.state.ip_states["192.168.1.10"]["rollback_status"] == "ROLLBACK_DONE"
    assert session.state.ip_states["192.168.1.11"]["status"] == "SUCCESS"
    assert any(call[0] == "rollback-ip" for call in fake.calls)
    assert any("回滚已完成;如需继续主流程,输入 resume" in item for item in output)


def test_chat_params_events_and_checkpoint_commands(tmp_path: Path):
    """``params``, ``events`` and checkpoint commands print expected lines."""
    checkpoint = tmp_path / "checkpoint.json"
    session = InteractiveCliSession(
        agent=PamDeployAgent(fake_runner=FakeActionRunner(), action_analysis_enabled=True),
        params=PARAMS,
        strategy="fake",
        checkpoint_path=str(checkpoint),
    )

    output = run_session(
        session,
        [
            "params",
            "llm action-analysis on",
            "run",
            "yes",
            "yes",
            "yes",
            "events 20",
            "list checkpoints",
            "load checkpoint " + str(checkpoint),
            "exit",
        ],
    )

    assert session.state is not None
    assert any("CLIENT_SECRET: ***" in item for item in output)
    assert any("ACTION_ANALYSIS" in item for item in output)
    assert any("checkpoint 列表" in item for item in output)


def test_chat_load_params_hot_updates_running_state_and_config(tmp_path: Path):
    """``load params`` after a run updates state params and the config file."""
    checkpoint = tmp_path / "checkpoint.json"
    params_file = tmp_path / "params.txt"
    params_file.write_text(
        "\n".join(
            [
                "APP_NAME=PAM-HOT",
                f"ZIP_FILE_PATH={tmp_path / 'updated.zip'}",
            ]
        )
        + "\n",
        encoding="utf-8",
    )
    session = InteractiveCliSession(
        agent=PamDeployAgent(fake_runner=FakeActionRunner()),
        params=PARAMS,
        strategy="fake",
        checkpoint_path=str(checkpoint),
    )

    run_session(
        session,
        [
            "run",
            "yes",
            "yes",
            "yes",
            "load params " + str(params_file),
            "exit",
        ],
    )

    assert session.state is not None
    assert session.state.params["APP_NAME"] == "PAM-HOT"
    assert session.state.params["ZIP_FILE_PATH"] == str((tmp_path / "updated.zip").resolve())
    config_text = Path(session.state.config_path).read_text(encoding="utf-8")
    assert "APP_NAME=PAM-HOT" in config_text
    assert f"ZIP_FILE_PATH={(tmp_path / 'updated.zip').resolve()}" in config_text


def test_chat_llm_review_block_message_is_visible(tmp_path: Path):
    """A blocking LLM review pauses the flow and prints the suggestion."""
    checkpoint = tmp_path / "checkpoint.json"
    session = InteractiveCliSession(
        agent=PamDeployAgent(
            fake_runner=FakeActionRunner(),
            llm_client=BlockingReviewLlmClient(),
        ),
        params=PARAMS,
        strategy="fake",
        checkpoint_path=str(checkpoint),
    )

    output = run_session(session, ["run", "yes", "yes", "yes", "exit"])

    assert session.state is not None
    assert session.state.paused is True
    assert session.state.pause_reason == "llm_review_blocked"
    assert any("当前流程已暂停: llm_review_blocked" in item for item in output)
    assert any("- suggestion: stop and inspect" in item for item in output)
    assert any("如需继续,输入 resume" in item for item in output)


def test_chat_llm_test_command_uses_current_client(tmp_path: Path):
    """``llm test`` calls ``understand_request`` and prints the result."""
    llm = FakeTestableLlmClient()
    session = InteractiveCliSession(
        agent=PamDeployAgent(llm_client=llm),
        params=PARAMS,
        strategy="fake",
        checkpoint_path=str(tmp_path / "checkpoint.json"),
    )

    output = run_session(session, ["llm test 检查模型", "exit"])

    assert llm.requests == ["检查模型"]
    assert any("正在测试 LLM: FakeTestableLlmClient" in item for item in output)
    assert any("LLM 测试通过" in item for item in output)
    assert any("- intent: deploy" in item for item in output)
    assert any("- strategy: hybrid_node_mcp" in item for item in output)


def test_chat_can_hot_load_mcp_config(tmp_path: Path):
    """``mcp config`` sets an MCP runner on both the agent and its router."""
    mcp_config = tmp_path / "mcp.json"
    mcp_config.write_text('{"transport": "stdio", "command": "python"}', encoding="utf-8")
    session = InteractiveCliSession(
        agent=PamDeployAgent(),
        params=PARAMS,
        strategy="hybrid_node_mcp",
        checkpoint_path=str(tmp_path / "checkpoint.json"),
    )

    output = run_session(session, ["mcp config " + mcp_config.as_posix(), "exit"])

    assert session.agent.mcp_runner is not None
    assert session.agent.router.mcp_runner is not None
    assert any("MCP 配置已加载" in item for item in output)


def test_prompt_history_creates_runtime_dir(tmp_path: Path, monkeypatch):
    """Building the prompt input creates ``runtime`` under the working dir."""
    pytest.importorskip("prompt_toolkit")
    monkeypatch.chdir(tmp_path)

    prompt = _build_prompt_input(builtins.input)

    assert callable(prompt)
    assert (tmp_path / "runtime").is_dir()


def test_prompt_toolkit_enabled_when_frozen(tmp_path: Path, monkeypatch):
    """With ``sys.frozen`` set, the prompt is still not plain ``input``."""
    pytest.importorskip("prompt_toolkit")
    monkeypatch.chdir(tmp_path)
    # raising=False because ``sys.frozen`` normally does not exist.
    monkeypatch.setattr(sys, "frozen", True, raising=False)

    prompt = _build_prompt_input(builtins.input)

    assert callable(prompt)
    assert prompt is not builtins.input
    assert (tmp_path / "runtime").is_dir()