"""Tests for ``InteractiveCliSession`` and the prompt-input builder.

Each test drives a session with a scripted list of input lines (see
``run_session``) and then asserts on the captured output lines and on the
session's resulting state.
"""

import builtins
import sys
from pathlib import Path
from typing import Optional

import pytest

from pam_deploy_graph.agent import PamDeployAgent
from pam_deploy_graph.fake_runner import FakeActionRunner
from pam_deploy_graph.interactive import InteractiveCliSession, _build_prompt_input
from pam_deploy_graph.models import LlmActionAnalysis, LlmIntentResult

# Baseline deployment parameters shared by the tests.  Sessions receive a
# *copy* (see ``_make_session``) so that a session which updates its params
# cannot leak changes into other tests through this module-level dict.
PARAMS = {
    "HOME_BASE_URL": "https://pam.home.example.com",
    "CLIENT_ID": "client",
    "CLIENT_SECRET": "secret",
    "AIRPORT_CODE": "HET",
    "APP_NAME": "PAM",
    "MODULE_NAME": "Node",
    "VERSION_NUMBER": "2.0.5",
    "ZIP_FILE_PATH": "C:/pkg.zip",
}

# File name of the checkpoint every session writes under ``tmp_path``.
CHECKPOINT_NAME = "checkpoint.json"

# Scripted inputs used by most deployment tests: "run" followed by three
# "yes" answers.
RUN_AND_CONFIRM = ["run", "yes", "yes", "yes"]


class BlockingReviewLlmClient:
    """LLM stub whose action analysis always reports a blocking anomaly."""

    def analyze_action_result(self, *, action, result, state_summary) -> LlmActionAnalysis:
        """Return a high-severity analysis with ``should_continue=False``."""
        return LlmActionAnalysis(
            action=action,
            has_anomaly=True,
            severity="high",
            possible_reason="review blocked",
            suggested_action="stop and inspect",
            requires_confirmation=True,
            should_continue=False,
            notes=["blocked by test llm"],
        )


class FakeTestableLlmClient:
    """LLM stub that records ``understand_request`` calls.

    ``extract_params`` and ``generate_plan`` raise ``AssertionError`` so a test
    fails loudly if the code under test calls anything beyond
    ``understand_request``.
    """

    def __init__(self) -> None:
        # Every text passed to ``understand_request``, in call order.
        self.requests: list[str] = []

    def understand_request(self, text: str) -> LlmIntentResult:
        """Record *text* and return a fixed "deploy" intent result."""
        self.requests.append(text)
        return LlmIntentResult(
            intent="deploy",
            mode_preference="MCP",
            strategy_preference="hybrid_node_mcp",
            confidence=0.91,
            reasons=["test ok"],
        )

    def extract_params(self, text, base_params=None):
        """Fail the test: this method must not be called."""
        raise AssertionError("llm test should only call understand_request")

    def generate_plan(self, *, params, intent, strategy):
        """Fail the test: this method must not be called."""
        raise AssertionError("llm test should only call understand_request")

    def analyze_action_result(self, *, action, result, state_summary) -> LlmActionAnalysis:
        """Return an analysis built with only the action name set."""
        return LlmActionAnalysis(action=action)


class FlakyVerifyRunner(FakeActionRunner):
    """Fail the first ``verify-ip`` for 192.168.1.10, then succeed.

    Used to cover retrying from the point of failure via ``resume``.
    """

    def __init__(self) -> None:
        super().__init__()
        # Number of verify-ip calls seen for 192.168.1.10.
        self.verify_calls = 0

    def _fixture_for(self, action, kwargs):
        """Return a failure fixture on the first matching call, else defer to the base class."""
        if action == "verify-ip" and kwargs.get("ip") == "192.168.1.10":
            self.verify_calls += 1
            if self.verify_calls == 1:
                return {
                    "ACTION": "verify-ip",
                    "IP": "192.168.1.10",
                    "SUCCESS": "false",
                    "MESSAGE": "health check failed",
                }
        return super()._fixture_for(action, kwargs)


class ChatProgressRunner(FakeActionRunner):
    """Make the chat fake deployment produce one visible progress update."""

    def __init__(self) -> None:
        super().__init__()
        # Progress values handed out by successive poll-download-progress
        # calls; once exhausted every further poll reports "100".
        self.download_progress = ["40", "100"]

    def _fixture_for(self, action, kwargs):
        """Return the next scripted download-progress fixture, else defer to the base class."""
        if action == "poll-download-progress":
            rate = self.download_progress.pop(0) if self.download_progress else "100"
            finished = rate == "100"
            return {
                "ACTION": action,
                "STEP": "DONE" if finished else "RUNNING",
                "RATE_OF_PROGRESS": rate,
                "MSG": "success" if finished else "running",
                "MESSAGE": f"download {rate}%",
            }
        return super()._fixture_for(action, kwargs)


def _make_session(
    tmp_path: Path,
    *,
    agent: Optional[PamDeployAgent] = None,
    params: Optional[dict] = None,
    strategy: str = "fake",
) -> InteractiveCliSession:
    """Build a session whose checkpoint lives at ``tmp_path / CHECKPOINT_NAME``.

    Args:
        tmp_path: Per-test temporary directory.
        agent: Agent to drive; a default ``PamDeployAgent()`` when omitted.
        params: Deployment parameters; a fresh copy of ``PARAMS`` when omitted,
            so the shared module-level dict is never handed out by reference.
        strategy: Initial session strategy.

    Returns:
        The constructed ``InteractiveCliSession``.
    """
    return InteractiveCliSession(
        agent=PamDeployAgent() if agent is None else agent,
        params=dict(PARAMS) if params is None else params,
        strategy=strategy,
        checkpoint_path=str(tmp_path / CHECKPOINT_NAME),
    )


def run_session(session: InteractiveCliSession, inputs: list[str]) -> list[str]:
    """Run *session* against scripted *inputs* and return everything it printed.

    The session's ``input`` callable is replaced by one that yields *inputs* in
    order, and its ``output`` callable by ``list.append`` on the returned list.
    If the session asks for more lines than were supplied, ``next`` raises
    ``StopIteration``.
    """
    output: list[str] = []
    iterator = iter(inputs)
    session.input = lambda _prompt: next(iterator)
    session.output = output.append
    session.run()
    return output


def test_chat_analyzes_natural_language_and_updates_context(tmp_path: Path) -> None:
    """An ``analyze`` request updates the session strategy and target IPs."""
    session = _make_session(tmp_path)

    output = run_session(session, ["analyze please use MCP deploy 192.168.1.10", "exit"])

    assert session.strategy == "hybrid_node_mcp"
    assert session.target_ips == ["192.168.1.10"]
    assert any("执行请输 run" in item for item in output)


def test_chat_run_executes_fake_deploy_and_writes_checkpoint(tmp_path: Path) -> None:
    """A confirmed ``run`` finishes every IP successfully and writes the checkpoint."""
    checkpoint = tmp_path / CHECKPOINT_NAME
    session = _make_session(
        tmp_path,
        agent=PamDeployAgent(fake_runner=FakeActionRunner()),
    )

    run_session(session, [*RUN_AND_CONFIRM, "exit"])

    assert checkpoint.exists()
    assert session.state is not None
    assert session.state.pending_confirmation == ""
    assert all(item["status"] == "SUCCESS" for item in session.state.ip_states.values())


def test_chat_run_prints_action_progress(tmp_path: Path) -> None:
    """A run prints start/finish and analysis messages for actions."""
    session = _make_session(
        tmp_path,
        agent=PamDeployAgent(fake_runner=FakeActionRunner()),
    )

    output = run_session(session, [*RUN_AND_CONFIRM, "exit"])

    assert any("开始执行 action: get-token" in item for item in output)
    assert any("完成 action: verify-ip" in item for item in output)
    assert any("开始分析 action 结果: get-token" in item for item in output)
    assert any("分析完成: verify-ip" in item for item in output)


def test_chat_run_prints_progress_poll_updates(tmp_path: Path) -> None:
    """An intermediate download-progress value is surfaced in the output."""
    session = _make_session(
        tmp_path,
        agent=PamDeployAgent(fake_runner=ChatProgressRunner()),
        params={**PARAMS, "POLL_INTERVAL_SEC": 0},
    )

    output = run_session(session, [*RUN_AND_CONFIRM, "exit"])

    assert any("进度更新: poll-download-progress" in item for item in output)
    assert any("RATE_OF_PROGRESS=40" in item for item in output)
    assert session.state is not None
    assert "poll-download-progress" in session.state.completed_global_steps


def test_chat_greeting_does_not_trigger_structured_analysis(tmp_path: Path) -> None:
    """A plain greeting yields the help hint and no structured analysis."""
    session = _make_session(tmp_path)

    output = run_session(session, ["你好", "exit"])

    assert session.last_analysis is None
    assert any("可以输入 help 查看命令" in item for item in output)
    assert not any("已生成结构化理解" in item for item in output)


def test_chat_preflight_blocks_missing_zip_path_before_confirm(tmp_path: Path) -> None:
    """``run`` is rejected by the preflight check when ZIP_FILE_PATH does not exist."""
    missing_package = tmp_path / "missing.zip"
    session = _make_session(
        tmp_path,
        params={**PARAMS, "ZIP_FILE_PATH": str(missing_package)},
        strategy="script_only",
    )

    output = run_session(session, ["run", "exit"])

    assert session.state is None
    assert any("执行前检查未通过" in item for item in output)
    assert any("ZIP_FILE_PATH 不存在" in item for item in output)


def test_chat_action_failure_does_not_report_langgraph_unavailable(tmp_path: Path) -> None:
    """A failing action stops the run without the LangGraph-unavailable message."""
    fake = FakeActionRunner({"upload-package": {"ACTION": "upload-package"}})
    session = _make_session(tmp_path, agent=PamDeployAgent(fake_runner=fake))

    output = run_session(session, [*RUN_AND_CONFIRM, "exit"])

    assert session.state is not None
    assert session.state.last_failed_step == "upload-package"
    assert any("执行已停止" in item for item in output)
    assert not any("LangGraph 确认运行器不可用" in item for item in output)


def test_chat_resume_retries_failed_ip_without_rollback(tmp_path: Path) -> None:
    """``resume`` retries the failed IP and never invokes ``rollback-ip``."""
    fake = FlakyVerifyRunner()
    session = _make_session(tmp_path, agent=PamDeployAgent(fake_runner=fake))

    output = run_session(session, [*RUN_AND_CONFIRM, "resume", "exit"])

    assert session.state is not None
    assert session.state.pending_confirmation == ""
    assert session.state.paused is False
    assert session.state.ip_states["192.168.1.10"]["rollback_status"] == "ROLLBACK_NOT_RUN"
    assert session.state.ip_states["192.168.1.10"]["status"] == "SUCCESS"
    assert session.state.ip_states["192.168.1.11"]["status"] == "SUCCESS"
    assert not any(call[0] == "rollback-ip" for call in fake.calls)
    assert any("如需回滚,输入 rollback 192.168.1.10" in item for item in output)


def test_chat_explicit_rollback_command_rolls_back_failed_ip(tmp_path: Path) -> None:
    """An explicit ``rollback`` rolls back the failed IP; ``resume`` then continues."""
    fake = FakeActionRunner(
        {
            "verify-ip:192.168.1.10": {
                "ACTION": "verify-ip",
                "IP": "192.168.1.10",
                "SUCCESS": "false",
                "MESSAGE": "health check failed",
            }
        }
    )
    session = _make_session(tmp_path, agent=PamDeployAgent(fake_runner=fake))

    output = run_session(session, [*RUN_AND_CONFIRM, "rollback", "resume", "exit"])

    assert session.state is not None
    assert session.state.pending_confirmation == ""
    assert session.state.ip_states["192.168.1.10"]["rollback_status"] == "ROLLBACK_DONE"
    assert session.state.ip_states["192.168.1.11"]["status"] == "SUCCESS"
    assert any(call[0] == "rollback-ip" for call in fake.calls)
    assert any("回滚已完成;如需继续主流程,输入 resume" in item for item in output)


def test_chat_params_events_and_checkpoint_commands(tmp_path: Path) -> None:
    """``params``, ``events`` and checkpoint commands print their expected output."""
    checkpoint = tmp_path / CHECKPOINT_NAME
    session = _make_session(
        tmp_path,
        agent=PamDeployAgent(fake_runner=FakeActionRunner(), action_analysis_enabled=True),
    )

    output = run_session(
        session,
        [
            "params",
            "llm action-analysis on",
            *RUN_AND_CONFIRM,
            "events 20",
            "list checkpoints",
            "load checkpoint " + str(checkpoint),
            "exit",
        ],
    )

    assert session.state is not None
    # The secret value must be masked when params are printed.
    assert any("CLIENT_SECRET: ***" in item for item in output)
    assert any("ACTION_ANALYSIS" in item for item in output)
    assert any("checkpoint 列表" in item for item in output)


def test_chat_load_params_hot_updates_running_state_and_config(tmp_path: Path) -> None:
    """``load params`` after a run updates both state params and the config file."""
    params_file = tmp_path / "params.txt"
    params_file.write_text(
        "\n".join(
            [
                "APP_NAME=PAM-HOT",
                f"ZIP_FILE_PATH={tmp_path / 'updated.zip'}",
            ]
        )
        + "\n",
        encoding="utf-8",
    )
    session = _make_session(
        tmp_path,
        agent=PamDeployAgent(fake_runner=FakeActionRunner()),
    )

    run_session(
        session,
        [
            *RUN_AND_CONFIRM,
            "load params " + str(params_file),
            "exit",
        ],
    )

    assert session.state is not None
    assert session.state.params["APP_NAME"] == "PAM-HOT"
    assert session.state.params["ZIP_FILE_PATH"] == str((tmp_path / "updated.zip").resolve())
    config_text = Path(session.state.config_path).read_text(encoding="utf-8")
    assert "APP_NAME=PAM-HOT" in config_text
    assert f"ZIP_FILE_PATH={(tmp_path / 'updated.zip').resolve()}" in config_text


def test_chat_llm_review_block_message_is_visible(tmp_path: Path) -> None:
    """A blocking LLM review pauses the run and shows the reason and suggestion."""
    session = _make_session(
        tmp_path,
        agent=PamDeployAgent(
            fake_runner=FakeActionRunner(),
            llm_client=BlockingReviewLlmClient(),
        ),
    )

    output = run_session(session, [*RUN_AND_CONFIRM, "exit"])

    assert session.state is not None
    assert session.state.paused is True
    assert session.state.pause_reason == "llm_review_blocked"
    assert any("当前流程已暂停: llm_review_blocked" in item for item in output)
    assert any("- suggestion: stop and inspect" in item for item in output)
    assert any("如需继续,输入 resume" in item for item in output)


def test_chat_llm_test_command_uses_current_client(tmp_path: Path) -> None:
    """``llm test <text>`` forwards the text to the agent's current LLM client."""
    llm = FakeTestableLlmClient()
    session = _make_session(tmp_path, agent=PamDeployAgent(llm_client=llm))

    output = run_session(session, ["llm test 检查模型", "exit"])

    assert llm.requests == ["检查模型"]
    assert any("正在测试 LLM: FakeTestableLlmClient" in item for item in output)
    assert any("LLM 测试通过" in item for item in output)
    assert any("- intent: deploy" in item for item in output)
    assert any("- strategy: hybrid_node_mcp" in item for item in output)


def test_chat_can_hot_load_mcp_config(tmp_path: Path) -> None:
    """``mcp config <path>`` installs an MCP runner on the agent and its router."""
    mcp_config = tmp_path / "mcp.json"
    mcp_config.write_text('{"transport": "stdio", "command": "python"}', encoding="utf-8")
    session = _make_session(tmp_path, strategy="hybrid_node_mcp")

    output = run_session(session, ["mcp config " + mcp_config.as_posix(), "exit"])

    assert session.agent.mcp_runner is not None
    assert session.agent.router.mcp_runner is not None
    assert any("MCP 配置已加载" in item for item in output)


def test_prompt_history_creates_runtime_dir(tmp_path: Path, monkeypatch) -> None:
    """Building the prompt input creates a ``runtime`` directory under the cwd."""
    pytest.importorskip("prompt_toolkit")
    monkeypatch.chdir(tmp_path)

    prompt = _build_prompt_input(builtins.input)

    assert callable(prompt)
    assert (tmp_path / "runtime").is_dir()


def test_prompt_toolkit_enabled_when_frozen(tmp_path: Path, monkeypatch) -> None:
    """With ``sys.frozen`` set, the builder still returns a non-builtin prompt."""
    pytest.importorskip("prompt_toolkit")
    monkeypatch.chdir(tmp_path)
    # ``sys.frozen`` normally does not exist, hence ``raising=False``.
    monkeypatch.setattr(sys, "frozen", True, raising=False)

    prompt = _build_prompt_input(builtins.input)

    assert callable(prompt)
    assert prompt is not builtins.input
    assert (tmp_path / "runtime").is_dir()