from pathlib import Path

import pytest

from pam_deploy_graph.agent import PamDeployAgent
from pam_deploy_graph.checkpoint_store import load_agent_state
from pam_deploy_graph.constants import GLOBAL_ACTION_SEQUENCE
from pam_deploy_graph.fake_runner import FakeActionRunner
from pam_deploy_graph.models import LlmActionAnalysis

# Baseline deployment parameters handed to ``PamDeployAgent.create_state``.
# Individual tests overlay extra keys with ``{**PARAMS, ...}`` when needed.
PARAMS = {
    "HOME_BASE_URL": "https://pam.home.example.com",
    "CLIENT_ID": "client",
    "CLIENT_SECRET": "secret",
    "AIRPORT_CODE": "HET",
    "APP_NAME": "PAM",
    "MODULE_NAME": "Node",
    "VERSION_NUMBER": "2.0.5",
    "ZIP_FILE_PATH": "C:/pkg.zip",
    "VERIFY_INTERVAL_SEC": 0,
    "VERIFY_MAX_ATTEMPTS": 2,
}


class BlockingReviewLlmClient:
    """Review stub that flags every action as a blocking, high-severity anomaly."""

    def analyze_action_result(self, *, action, result):
        """Return a blocking analysis for *action*; *result* is not inspected."""
        verdict = {
            "has_anomaly": True,
            "severity": "high",
            "possible_reason": "review blocked",
            "suggested_action": "stop and inspect",
            "requires_confirmation": True,
            "should_continue": False,
            "notes": ["blocked by test llm"],
        }
        return LlmActionAnalysis(action=action, **verdict)


class BlockingOnceReviewLlmClient:
    """Review stub that blocks one named action the first time it is reviewed."""

    def __init__(self, blocked_action: str = "get-token") -> None:
        self.blocked_action = blocked_action
        # Flipped to True after the single blocking verdict has been issued.
        self.blocked = False

    def analyze_action_result(self, *, action, result):
        """Block the first review of ``blocked_action``; otherwise return a default analysis."""
        if self.blocked or action != self.blocked_action:
            return LlmActionAnalysis(action=action)

        self.blocked = True
        return LlmActionAnalysis(
            action=action,
            has_anomaly=True,
            severity="high",
            possible_reason="review blocked once",
            suggested_action="fix then retry current action",
            requires_confirmation=True,
            should_continue=False,
        )


class BrokenReviewLlmClient:
    """Review stub whose analysis call always raises."""

    def analyze_action_result(self, *, action, result):
        """Raise ``RuntimeError`` unconditionally."""
        raise RuntimeError("review transport failed")


def _next_rate(pending):
    """Pop the next scripted progress value from *pending*, or "100" once it is empty."""
    if pending:
        return pending.pop(0)
    return "100"


def _progress_fields(rate, label):
    """Build the STEP / RATE_OF_PROGRESS / MSG / MESSAGE part of a progress payload."""
    finished = rate == "100"
    return {
        "STEP": "DONE" if finished else "RUNNING",
        "RATE_OF_PROGRESS": rate,
        "MSG": "success" if finished else "running",
        "MESSAGE": f"{label} {rate}%",
    }


class ProgressivePollRunner(FakeActionRunner):
    """Simulate download and push progress that completes only after several polls."""

    def __init__(self) -> None:
        super().__init__()
        # Scripted download progress values, consumed front to back.
        self.download_progress = ["10", "55", "100"]
        # Per-IP scripted upgrade progress, created on first poll of each IP.
        self.upgrade_progress: dict[str, list[str]] = {}

    def _fixture_for(self, action, kwargs):
        """Return scripted progress payloads for the two poll actions; defer otherwise."""
        if action == "poll-download-progress":
            rate = _next_rate(self.download_progress)
            return {"ACTION": action, **_progress_fields(rate, "download")}

        if action == "poll-upgrade-progress":
            ip = kwargs.get("ip", "")
            pending = self.upgrade_progress.setdefault(str(ip), ["30", "100"])
            rate = _next_rate(pending)
            return {"ACTION": action, "IP": ip, **_progress_fields(rate, "upgrade")}

        return super()._fixture_for(action, kwargs)


class FlakyVerifyRunner(FakeActionRunner):
    """Simulate an application whose second health check passes after start-up."""

    def __init__(self) -> None:
        super().__init__()
        # Number of verify-ip calls seen for 192.168.1.10.
        self.verify_calls = 0

    def _fixture_for(self, action, kwargs):
        """Fail the first verify-ip for 192.168.1.10; defer to the base class otherwise."""
        is_target_verify = action == "verify-ip" and kwargs.get("ip") == "192.168.1.10"
        if is_target_verify:
            self.verify_calls += 1
            if self.verify_calls == 1:
                return {
                    "ACTION": action,
                    "IP": "192.168.1.10",
                    "SUCCESS": "false",
                    "MESSAGE": "application is starting",
                }
        return super()._fixture_for(action, kwargs)


def test_run_deploy_flow_success(tmp_path: Path):
    """A run with the default fake runner ends with both IPs in SUCCESS."""
    agent = PamDeployAgent(fake_runner=FakeActionRunner())
    config_file = tmp_path / "config.txt"
    state = agent.create_state(
        params=PARAMS,
        execution_strategy="fake",
        config_path=str(config_file),
    )

    agent.run_deploy_flow(state)

    assert state.pending_confirmation == ""
    assert set(state.ip_states) == {"192.168.1.10", "192.168.1.11"}
    for ip_state in state.ip_states.values():
        assert ip_state["status"] == "SUCCESS"


def test_progress_actions_repeat_until_llm_marks_complete(tmp_path: Path):
    """Poll actions are repeated until the scripted progress reaches completion."""
    runner = ProgressivePollRunner()
    agent = PamDeployAgent(fake_runner=runner)
    state = agent.create_state(
        params={**PARAMS, "POLL_INTERVAL_SEC": 0},
        execution_strategy="fake",
        config_path=str(tmp_path / "config.txt"),
        checkpoint_path=str(tmp_path / "checkpoint.json"),
    )

    agent.run_deploy_flow(state)

    action_names = [entry[0] for entry in runner.calls]
    assert action_names.count("poll-download-progress") == 3
    assert action_names.count("poll-upgrade-progress") == 4
    assert "poll-download-progress" in state.completed_global_steps
    assert state.poll_attempts == {}
    assert all(item["status"] == "SUCCESS" for item in state.ip_states.values())

    progress_events = [
        event for event in state.events if event["type"] == "ACTION_PROGRESS"
    ]
    assert any(
        event["stage"] == "poll-download-progress"
        and "RATE_OF_PROGRESS=10" in event["message"]
        for event in progress_events
    )
    assert any(
        event["stage"] == "poll-upgrade-progress" and event["ip"] == "192.168.1.10"
        for event in progress_events
    )


def test_progress_timeout_pauses_on_current_action(tmp_path: Path):
    """A download poll that never finishes pauses the flow on that action."""
    stuck_download = {
        "ACTION": "poll-download-progress",
        "STEP": "RUNNING",
        "RATE_OF_PROGRESS": "20",
        "MSG": "running",
        "MESSAGE": "download 20%",
    }
    runner = FakeActionRunner({"poll-download-progress": stuck_download})
    agent = PamDeployAgent(fake_runner=runner)
    state = agent.create_state(
        params={**PARAMS, "POLL_INTERVAL_SEC": 0, "DOWNLOAD_POLL_MAX_ATTEMPTS": 2},
        execution_strategy="fake",
        config_path=str(tmp_path / "config.txt"),
        checkpoint_path=str(tmp_path / "checkpoint.json"),
    )

    agent.run_deploy_flow(state)

    assert state.paused is True
    assert state.pause_reason == "progress_timeout"
    assert state.last_failed_step == "poll-download-progress"
    assert "poll-download-progress" not in state.completed_global_steps
    assert state.review_context["stage"] == "poll-download-progress"
    assert state.poll_attempts["global:poll-download-progress"] == 2


def test_verify_ip_retries_until_success_before_marking_failed(tmp_path: Path):
    """A verify-ip that fails once is retried and the IP still ends in SUCCESS."""
    runner = FlakyVerifyRunner()
    agent = PamDeployAgent(fake_runner=runner)
    state = agent.create_state(
        params=PARAMS,
        execution_strategy="fake",
        config_path=str(tmp_path / "config.txt"),
        checkpoint_path=str(tmp_path / "checkpoint.json"),
    )

    agent.run_deploy_flow(state)

    assert runner.verify_calls == 2
    assert state.paused is False
    assert state.poll_attempts == {}
    assert state.ip_states["192.168.1.10"]["status"] == "SUCCESS"
    assert any(
        event["type"] == "ACTION_PROGRESS"
        and event["stage"] == "verify-ip"
        and event["ip"] == "192.168.1.10"
        for event in state.events
    )
def _failing_verify_fixture():
    """Return a fresh verify-ip payload reporting a failed health check on 192.168.1.10."""
    return {
        "ACTION": "verify-ip",
        "IP": "192.168.1.10",
        "SUCCESS": "false",
        "MESSAGE": "health check failed",
    }


def test_create_state_writes_absolute_script_config_path_and_normalized_zip(tmp_path: Path):
    """create_state stores absolute paths and writes the resolved zip path to the config."""
    package_path = tmp_path / "pkg.zip"
    agent = PamDeployAgent(fake_runner=FakeActionRunner())

    state = agent.create_state(
        params={**PARAMS, "ZIP_FILE_PATH": str(package_path)},
        execution_strategy="fake",
        config_path=str(tmp_path / "runtime" / "config.txt"),
        trace_file_path=str(tmp_path / "logs" / "trace.log"),
    )

    written_config = Path(state.config_path)
    assert written_config.is_absolute()
    assert Path(state.trace_file_path).is_absolute()
    config_text = written_config.read_text(encoding="utf-8")
    assert f"ZIP_FILE_PATH={package_path.resolve()}" in config_text
    assert "PARENT_VERSION_NUMBER=\n" in config_text


def test_global_action_requires_hash_code_from_upload_package(tmp_path: Path):
    """An upload-package result without HASH_CODE raises and is recorded as the failed step."""
    runner = FakeActionRunner({"upload-package": {"ACTION": "upload-package"}})
    agent = PamDeployAgent(fake_runner=runner)
    state = agent.create_state(
        params=PARAMS,
        execution_strategy="fake",
        config_path=str(tmp_path / "config.txt"),
        checkpoint_path=str(tmp_path / "checkpoint.json"),
    )

    with pytest.raises(RuntimeError, match="缺少必要字段: HASH_CODE"):
        agent.run_deploy_flow(state)

    assert state.last_failed_step == "upload-package"
    assert "upload-package" not in state.completed_global_steps


def test_run_deploy_flow_stops_on_verify_failure(tmp_path: Path):
    """A persistently failing verify-ip pauses the flow before the next IP is touched."""
    runner = FakeActionRunner({"verify-ip:192.168.1.10": _failing_verify_fixture()})
    agent = PamDeployAgent(fake_runner=runner)
    state = agent.create_state(
        params=PARAMS,
        execution_strategy="fake",
        config_path=str(tmp_path / "config.txt"),
    )

    agent.run_deploy_flow(state)

    verify_calls = [
        entry
        for entry in runner.calls
        if entry[0] == "verify-ip" and entry[1].get("ip") == "192.168.1.10"
    ]
    assert len(verify_calls) == 2
    assert state.pending_confirmation == ""
    assert state.paused is True
    assert state.pause_reason == "action_failed"
    assert state.ip_states["192.168.1.10"]["status"] == "FAILED"
    assert state.ip_states["192.168.1.10"]["failed_stage"] == "verify-ip"
    assert state.ip_states["192.168.1.10"]["rollback_status"] == "ROLLBACK_NOT_RUN"
    assert "192.168.1.11" not in state.ip_states
    assert any(event["type"] == "ACTION_RETRY_REQUIRED" for event in state.events)
    assert all(entry[0] != "download-log" for entry in runner.calls)


def test_resume_retries_failed_ip_action_without_rollback(tmp_path: Path):
    """After the failing fixture is removed, resume completes both IPs with no rollback."""
    runner = FakeActionRunner({"verify-ip:192.168.1.10": _failing_verify_fixture()})
    agent = PamDeployAgent(fake_runner=runner)
    state = agent.create_state(
        params=PARAMS,
        execution_strategy="fake",
        config_path=str(tmp_path / "config.txt"),
    )

    agent.run_deploy_flow(state)
    # Drop the failing fixture so the retried verify-ip uses the runner's defaults.
    runner.fixtures = {}
    agent.resume_state(state)
    agent.run_deploy_flow(state)

    assert state.pending_confirmation == ""
    assert state.paused is False
    assert state.last_failed_step == ""
    assert state.ip_states["192.168.1.10"]["status"] == "SUCCESS"
    assert state.ip_states["192.168.1.10"]["rollback_status"] == "ROLLBACK_NOT_RUN"
    assert state.ip_states["192.168.1.11"]["status"] == "SUCCESS"
    assert all(entry[0] != "rollback-ip" for entry in runner.calls)


def test_action_analysis_event_is_recorded_when_enabled(tmp_path: Path):
    """With action analysis enabled, a failed verify-ip yields an ACTION_ANALYSIS event."""
    runner = FakeActionRunner({"verify-ip:192.168.1.10": _failing_verify_fixture()})
    agent = PamDeployAgent(fake_runner=runner, action_analysis_enabled=True)
    state = agent.create_state(
        params=PARAMS,
        execution_strategy="fake",
        config_path=str(tmp_path / "config.txt"),
    )

    agent.run_deploy_flow(state)

    analyses = [event for event in state.events if event["type"] == "ACTION_ANALYSIS"]
    verify_analyses = [event for event in analyses if event["stage"] == "verify-ip"]
    verify_analysis = verify_analyses[0]
    assert verify_analysis["has_anomaly"] is True
    assert verify_analysis["severity"] == "high"
    assert verify_analysis["requires_confirmation"] is True


def test_successful_action_can_be_blocked_by_llm_review(tmp_path: Path):
    """A blocking review verdict pauses the flow on the first action (get-token)."""
    agent = PamDeployAgent(
        fake_runner=FakeActionRunner(),
        llm_client=BlockingReviewLlmClient(),
    )
    state = agent.create_state(
        params=PARAMS,
        execution_strategy="fake",
        config_path=str(tmp_path / "config.txt"),
        checkpoint_path=str(tmp_path / "checkpoint.json"),
    )

    agent.run_deploy_flow(state)

    assert state.paused is True
    assert state.pause_reason == "llm_review_blocked"
    assert state.last_failed_step == "get-token"
    assert state.completed_global_steps == []
    review = state.review_context
    assert review["stage"] == "get-token"
    assert review["suggested_action"] == "stop and inspect"


def test_resume_retries_llm_blocked_global_action(tmp_path: Path):
    """Resuming after a one-off review block re-runs get-token and then proceeds."""
    runner = FakeActionRunner()
    agent = PamDeployAgent(
        fake_runner=runner,
        llm_client=BlockingOnceReviewLlmClient(),
    )
    state = agent.create_state(
        params=PARAMS,
        execution_strategy="fake",
        config_path=str(tmp_path / "config.txt"),
        checkpoint_path=str(tmp_path / "checkpoint.json"),
    )

    agent.run_deploy_flow(state)
    agent.resume_state(state)
    agent.run_deploy_flow(state)

    called_actions = [entry[0] for entry in runner.calls]
    assert called_actions[:2] == ["get-token", "get-token"]
    assert called_actions.count("get-token") == 2
    assert state.paused is False
    assert state.completed_global_steps[0] == "get-token"


def test_action_review_failure_pauses_flow(tmp_path: Path):
    """A review client that raises pauses the flow and logs ACTION_ANALYSIS_FAIL."""
    agent = PamDeployAgent(
        fake_runner=FakeActionRunner(),
        llm_client=BrokenReviewLlmClient(),
    )
    state = agent.create_state(
        params=PARAMS,
        execution_strategy="fake",
        config_path=str(tmp_path / "config.txt"),
        checkpoint_path=str(tmp_path / "checkpoint.json"),
    )

    agent.run_deploy_flow(state)

    assert state.paused is True
    assert state.pause_reason == "llm_review_blocked"
    assert state.review_context["stage"] == "get-token"
    assert "LLM 审核失败" in state.review_context["possible_reason"]
    assert state.completed_global_steps == []
    assert any(event["type"] == "ACTION_ANALYSIS_FAIL" for event in state.events)


def test_explicit_rollback_runs_rollback_and_resume_continues(tmp_path: Path):
    """An explicit rollback of the failed IP lets the next run finish the other IP."""
    runner = FakeActionRunner({"verify-ip:192.168.1.10": _failing_verify_fixture()})
    agent = PamDeployAgent(fake_runner=runner)
    state = agent.create_state(
        params=PARAMS,
        execution_strategy="fake",
        config_path=str(tmp_path / "config.txt"),
    )

    agent.run_deploy_flow(state)
    agent.rollback_ip(state, "192.168.1.10")
    agent.run_deploy_flow(state)

    assert state.pending_confirmation == ""
    assert state.ip_states["192.168.1.10"]["rollback_status"] == "ROLLBACK_DONE"
    assert state.ip_states["192.168.1.11"]["status"] == "SUCCESS"
    assert any(entry[0] == "rollback-ip" for entry in runner.calls)


def test_failed_explicit_rollback_pauses_without_confirmation(tmp_path: Path):
    """A rollback-ip fixture marked ``_fail`` leaves the flow paused as rollback_failed."""
    failing_rollback = {
        "_fail": True,
        "ACTION": "rollback-ip",
        "IP": "192.168.1.10",
        "MESSAGE": "rollback failed",
    }
    runner = FakeActionRunner(
        {
            "verify-ip:192.168.1.10": _failing_verify_fixture(),
            "rollback-ip:192.168.1.10": failing_rollback,
        }
    )
    agent = PamDeployAgent(fake_runner=runner)
    state = agent.create_state(
        params=PARAMS,
        execution_strategy="fake",
        config_path=str(tmp_path / "config.txt"),
    )

    agent.run_deploy_flow(state)
    agent.rollback_ip(state, "192.168.1.10")

    assert state.pending_confirmation == ""
    assert state.paused is True
    assert state.pause_reason == "rollback_failed"
    assert state.ip_states["192.168.1.10"]["rollback_status"] == "ROLLBACK_FAILED"


def test_checkpoint_resume_skips_completed_global_and_success_ip(tmp_path: Path):
    """Pre-completed global steps and an already-successful IP are not executed again."""
    checkpoint = tmp_path / "checkpoint.json"
    runner = FakeActionRunner()
    agent = PamDeployAgent(fake_runner=runner)
    state = agent.create_state(
        params=PARAMS,
        execution_strategy="fake",
        config_path=str(tmp_path / "config.txt"),
        checkpoint_path=str(checkpoint),
    )

    # Seed the state as if every global step and the first IP had already finished.
    state.completed_global_steps = list(GLOBAL_ACTION_SEQUENCE)
    state.online_ips = ["192.168.1.10", "192.168.1.11"]
    state.target_ips = ["192.168.1.10", "192.168.1.11"]
    state.ip_states["192.168.1.10"] = {
        "status": "SUCCESS",
        "completed_steps": [
            "upgrade-ip",
            "poll-upgrade-progress",
            "start-ip",
            "verify-ip",
            "download-log",
        ],
        "failed_stage": "",
        "failure_reason": "",
        "rollback_status": "ROLLBACK_NOT_RUN",
        "rollback_stop_first": False,
        "log_file": "logs/fake.zip",
    }

    agent.run_deploy_flow(state)

    loaded = load_agent_state(checkpoint)
    called_actions = [entry[0] for entry in runner.calls]
    assert "get-token" not in called_actions
    assert all(entry[1].get("ip") != "192.168.1.10" for entry in runner.calls)
    assert loaded.ip_states["192.168.1.11"]["status"] == "SUCCESS"


def test_update_state_params_rewrites_config_and_checkpoint(tmp_path: Path):
    """update_state_params is reflected in the state, the checkpoint and the config file."""
    initial_package = tmp_path / "pkg-a.zip"
    updated_package = tmp_path / "pkg-b.zip"
    checkpoint = tmp_path / "checkpoint.json"
    config_path = tmp_path / "config.txt"
    agent = PamDeployAgent(fake_runner=FakeActionRunner())
    state = agent.create_state(
        params={**PARAMS, "ZIP_FILE_PATH": str(initial_package)},
        execution_strategy="fake",
        config_path=str(config_path),
        checkpoint_path=str(checkpoint),
    )

    overrides = {
        "APP_NAME": "PAM-NEW",
        "ZIP_FILE_PATH": str(updated_package),
    }
    agent.update_state_params(state, overrides)

    loaded = load_agent_state(checkpoint)
    config_text = config_path.read_text(encoding="utf-8")
    assert state.params["APP_NAME"] == "PAM-NEW"
    assert state.params["ZIP_FILE_PATH"] == str(updated_package.resolve())
    assert loaded.params["APP_NAME"] == "PAM-NEW"
    assert loaded.params["ZIP_FILE_PATH"] == str(updated_package.resolve())
    assert "APP_NAME=PAM-NEW" in config_text
    assert f"ZIP_FILE_PATH={updated_package.resolve()}" in config_text


def test_resume_state_clears_pause_fields(tmp_path: Path):
    """resume_state resets the pause fields on the returned state and in the checkpoint."""
    checkpoint = tmp_path / "checkpoint.json"
    agent = PamDeployAgent(fake_runner=FakeActionRunner())
    state = agent.create_state(
        params=PARAMS,
        execution_strategy="fake",
        checkpoint_path=str(checkpoint),
    )

    agent.pause_state(state, reason="manual_test", review_context={"stage": "get-token"})
    resumed = agent.resume_state(state)
    loaded = load_agent_state(checkpoint)

    assert resumed.paused is False
    assert resumed.pause_reason == ""
    assert resumed.review_context == {}
    assert loaded.paused is False
    assert loaded.pause_reason == ""