"""Tests for ``PamDeployAgent`` deploy, pause/resume, rollback and checkpoint flows.

Every test drives the agent through ``FakeActionRunner``; fixtures passed to the
runner override the result of a specific action (optionally keyed by
``"<action>:<ip>"``).
"""

from pathlib import Path

import pytest

from pam_deploy_graph.agent import PamDeployAgent
from pam_deploy_graph.checkpoint_store import load_agent_state
from pam_deploy_graph.constants import GLOBAL_ACTION_SEQUENCE
from pam_deploy_graph.fake_runner import FakeActionRunner
from pam_deploy_graph.models import LlmActionAnalysis

PARAMS = {
    "HOME_BASE_URL": "https://pam.home.example.com",
    "CLIENT_ID": "client",
    "CLIENT_SECRET": "secret",
    "AIRPORT_CODE": "HET",
    "APP_NAME": "PAM",
    "MODULE_NAME": "Node",
    "VERSION_NUMBER": "2.0.5",
    "ZIP_FILE_PATH": "C:/pkg.zip",
}

# The two hosts the tests expect the fake flow to process, in this order.
_FIRST_IP = "192.168.1.10"
_SECOND_IP = "192.168.1.11"


def _verify_failure_fixtures() -> dict:
    """Return runner fixtures reporting an unsuccessful ``verify-ip`` on the first IP.

    A new dict is built on every call so that one test can extend or replace
    its fixtures without affecting any other test.
    """
    return {
        f"verify-ip:{_FIRST_IP}": {
            "ACTION": "verify-ip",
            "IP": _FIRST_IP,
            "SUCCESS": "false",
            "MESSAGE": "health check failed",
        }
    }


class BlockingReviewLlmClient:
    """LLM client stub whose review always flags an anomaly and refuses to continue."""

    def analyze_action_result(self, *, action, result, state_summary):
        """Return a high-severity analysis for ``action`` with ``should_continue=False``."""
        return LlmActionAnalysis(
            action=action,
            has_anomaly=True,
            severity="high",
            possible_reason="review blocked",
            suggested_action="stop and inspect",
            requires_confirmation=True,
            should_continue=False,
            notes=["blocked by test llm"],
        )


class BrokenReviewLlmClient:
    """LLM client stub whose review call always raises."""

    def analyze_action_result(self, *, action, result, state_summary):
        """Raise ``RuntimeError`` to simulate a failing review transport."""
        raise RuntimeError("review transport failed")


def test_run_deploy_flow_success(tmp_path: Path) -> None:
    """A default fake run finishes both IPs with status SUCCESS and nothing pending."""
    agent = PamDeployAgent(fake_runner=FakeActionRunner())
    state = agent.create_state(
        params=PARAMS,
        execution_strategy="fake",
        config_path=str(tmp_path / "config.txt"),
    )

    agent.run_deploy_flow(state)

    assert state.pending_confirmation == ""
    assert set(state.ip_states) == {_FIRST_IP, _SECOND_IP}
    assert all(item["status"] == "SUCCESS" for item in state.ip_states.values())


def test_create_state_writes_absolute_script_config_path_and_normalized_zip(tmp_path: Path) -> None:
    """``create_state`` stores absolute paths and writes the resolved zip path to the config."""
    package_path = tmp_path / "pkg.zip"
    params = {**PARAMS, "ZIP_FILE_PATH": str(package_path)}
    agent = PamDeployAgent(fake_runner=FakeActionRunner())

    state = agent.create_state(
        params=params,
        execution_strategy="fake",
        config_path=str(tmp_path / "runtime" / "config.txt"),
        trace_file_path=str(tmp_path / "logs" / "trace.log"),
    )

    assert Path(state.config_path).is_absolute()
    assert Path(state.trace_file_path).is_absolute()
    config_text = Path(state.config_path).read_text(encoding="utf-8")
    assert f"ZIP_FILE_PATH={package_path.resolve()}" in config_text


def test_global_action_requires_hash_code_from_upload_package(tmp_path: Path) -> None:
    """An ``upload-package`` result without HASH_CODE aborts the flow with RuntimeError."""
    fake = FakeActionRunner({"upload-package": {"ACTION": "upload-package"}})
    agent = PamDeployAgent(fake_runner=fake)
    state = agent.create_state(
        params=PARAMS,
        execution_strategy="fake",
        config_path=str(tmp_path / "config.txt"),
        checkpoint_path=str(tmp_path / "checkpoint.json"),
    )

    # The matched message means "missing required field: HASH_CODE".
    with pytest.raises(RuntimeError, match="缺少必要字段: HASH_CODE"):
        agent.run_deploy_flow(state)

    assert state.last_failed_step == "upload-package"
    assert "upload-package" not in state.completed_global_steps


def test_run_deploy_flow_stops_on_verify_failure(tmp_path: Path) -> None:
    """A failed ``verify-ip`` pauses the flow before the second IP and without rollback."""
    fake = FakeActionRunner(_verify_failure_fixtures())
    agent = PamDeployAgent(fake_runner=fake)
    state = agent.create_state(
        params=PARAMS,
        execution_strategy="fake",
        config_path=str(tmp_path / "config.txt"),
    )

    agent.run_deploy_flow(state)

    failed_ip_state = state.ip_states[_FIRST_IP]
    assert state.pending_confirmation == ""
    assert state.paused is True
    assert state.pause_reason == "action_failed"
    assert failed_ip_state["status"] == "FAILED"
    assert failed_ip_state["failed_stage"] == "verify-ip"
    assert failed_ip_state["rollback_status"] == "ROLLBACK_NOT_RUN"
    assert _SECOND_IP not in state.ip_states
    assert any(event["type"] == "ACTION_RETRY_REQUIRED" for event in state.events)


def test_resume_retries_failed_ip_action_without_rollback(tmp_path: Path) -> None:
    """After resume, the failed action is retried and both IPs succeed with no rollback call."""
    fake = FakeActionRunner(_verify_failure_fixtures())
    agent = PamDeployAgent(fake_runner=fake)
    state = agent.create_state(
        params=PARAMS,
        execution_strategy="fake",
        config_path=str(tmp_path / "config.txt"),
    )

    agent.run_deploy_flow(state)
    # Drop the failure override so the retried verify-ip uses the runner's default result.
    fake.fixtures = {}
    agent.resume_state(state)
    agent.run_deploy_flow(state)

    assert state.pending_confirmation == ""
    assert state.paused is False
    assert state.last_failed_step == ""
    assert state.ip_states[_FIRST_IP]["status"] == "SUCCESS"
    assert state.ip_states[_FIRST_IP]["rollback_status"] == "ROLLBACK_NOT_RUN"
    assert state.ip_states[_SECOND_IP]["status"] == "SUCCESS"
    assert not any(call[0] == "rollback-ip" for call in fake.calls)


def test_action_analysis_event_is_recorded_when_enabled(tmp_path: Path) -> None:
    """With action analysis enabled, the failed ``verify-ip`` yields an ACTION_ANALYSIS event."""
    fake = FakeActionRunner(_verify_failure_fixtures())
    agent = PamDeployAgent(fake_runner=fake, action_analysis_enabled=True)
    state = agent.create_state(
        params=PARAMS,
        execution_strategy="fake",
        config_path=str(tmp_path / "config.txt"),
    )

    agent.run_deploy_flow(state)

    analyses = [event for event in state.events if event["type"] == "ACTION_ANALYSIS"]
    verify_analyses = [event for event in analyses if event["stage"] == "verify-ip"]
    # Fail with a readable message instead of a bare IndexError when nothing matched.
    assert verify_analyses, "expected an ACTION_ANALYSIS event for stage 'verify-ip'"
    verify_analysis = verify_analyses[0]
    assert verify_analysis["has_anomaly"] is True
    assert verify_analysis["severity"] == "high"
    assert verify_analysis["requires_confirmation"] is True


def test_successful_action_can_be_blocked_by_llm_review(tmp_path: Path) -> None:
    """A blocking LLM review pauses the flow right after the first global step."""
    agent = PamDeployAgent(
        fake_runner=FakeActionRunner(),
        llm_client=BlockingReviewLlmClient(),
    )
    state = agent.create_state(
        params=PARAMS,
        execution_strategy="fake",
        config_path=str(tmp_path / "config.txt"),
        checkpoint_path=str(tmp_path / "checkpoint.json"),
    )

    agent.run_deploy_flow(state)

    assert state.paused is True
    assert state.pause_reason == "llm_review_blocked"
    assert state.last_failed_step == "get-token"
    assert state.completed_global_steps == ["get-token"]
    assert state.review_context["stage"] == "get-token"
    assert state.review_context["suggested_action"] == "stop and inspect"


def test_action_review_failure_pauses_flow(tmp_path: Path) -> None:
    """An exception raised by the LLM review pauses the flow and records a failure event."""
    agent = PamDeployAgent(
        fake_runner=FakeActionRunner(),
        llm_client=BrokenReviewLlmClient(),
    )
    state = agent.create_state(
        params=PARAMS,
        execution_strategy="fake",
        config_path=str(tmp_path / "config.txt"),
        checkpoint_path=str(tmp_path / "checkpoint.json"),
    )

    agent.run_deploy_flow(state)

    assert state.paused is True
    assert state.pause_reason == "llm_review_blocked"
    assert state.review_context["stage"] == "get-token"
    # The expected substring means "LLM review failed".
    assert "LLM 审核失败" in state.review_context["possible_reason"]
    assert any(event["type"] == "ACTION_ANALYSIS_FAIL" for event in state.events)


def test_explicit_rollback_runs_rollback_and_resume_continues(tmp_path: Path) -> None:
    """An explicit rollback of the failed IP lets a second run finish the remaining IP."""
    fake = FakeActionRunner(_verify_failure_fixtures())
    agent = PamDeployAgent(fake_runner=fake)
    state = agent.create_state(
        params=PARAMS,
        execution_strategy="fake",
        config_path=str(tmp_path / "config.txt"),
    )

    agent.run_deploy_flow(state)
    agent.rollback_ip(state, _FIRST_IP)
    agent.run_deploy_flow(state)

    assert state.pending_confirmation == ""
    assert state.ip_states[_FIRST_IP]["rollback_status"] == "ROLLBACK_DONE"
    assert state.ip_states[_SECOND_IP]["status"] == "SUCCESS"
    assert any(call[0] == "rollback-ip" for call in fake.calls)


def test_failed_explicit_rollback_pauses_without_confirmation(tmp_path: Path) -> None:
    """A failing rollback pauses with ``rollback_failed`` and leaves no pending confirmation."""
    fixtures = _verify_failure_fixtures()
    fixtures[f"rollback-ip:{_FIRST_IP}"] = {
        "_fail": True,
        "ACTION": "rollback-ip",
        "IP": _FIRST_IP,
        "MESSAGE": "rollback failed",
    }
    fake = FakeActionRunner(fixtures)
    agent = PamDeployAgent(fake_runner=fake)
    state = agent.create_state(
        params=PARAMS,
        execution_strategy="fake",
        config_path=str(tmp_path / "config.txt"),
    )

    agent.run_deploy_flow(state)
    agent.rollback_ip(state, _FIRST_IP)

    assert state.pending_confirmation == ""
    assert state.paused is True
    assert state.pause_reason == "rollback_failed"
    assert state.ip_states[_FIRST_IP]["rollback_status"] == "ROLLBACK_FAILED"


def test_checkpoint_resume_skips_completed_global_and_success_ip(tmp_path: Path) -> None:
    """Completed global steps and an already-successful IP are not executed again."""
    checkpoint = tmp_path / "checkpoint.json"
    fake = FakeActionRunner()
    agent = PamDeployAgent(fake_runner=fake)
    state = agent.create_state(
        params=PARAMS,
        execution_strategy="fake",
        config_path=str(tmp_path / "config.txt"),
        checkpoint_path=str(checkpoint),
    )
    # Pre-populate the state as if a previous run had finished every global
    # step and the whole per-IP pipeline for the first IP.
    state.completed_global_steps = list(GLOBAL_ACTION_SEQUENCE)
    state.online_ips = [_FIRST_IP, _SECOND_IP]
    state.target_ips = [_FIRST_IP, _SECOND_IP]
    state.ip_states[_FIRST_IP] = {
        "status": "SUCCESS",
        "completed_steps": [
            "upgrade-ip",
            "poll-upgrade-progress",
            "start-ip",
            "verify-ip",
            "download-log",
        ],
        "failed_stage": "",
        "failure_reason": "",
        "rollback_status": "ROLLBACK_NOT_RUN",
        "rollback_stop_first": False,
        "log_file": "logs/fake.zip",
    }

    agent.run_deploy_flow(state)

    loaded = load_agent_state(checkpoint)
    called_actions = [call[0] for call in fake.calls]
    assert "get-token" not in called_actions
    assert all(call[1].get("ip") != _FIRST_IP for call in fake.calls)
    assert loaded.ip_states[_SECOND_IP]["status"] == "SUCCESS"


def test_update_state_params_rewrites_config_and_checkpoint(tmp_path: Path) -> None:
    """``update_state_params`` updates the state, the checkpoint file and the config file."""
    initial_package = tmp_path / "pkg-a.zip"
    updated_package = tmp_path / "pkg-b.zip"
    checkpoint = tmp_path / "checkpoint.json"
    config_path = tmp_path / "config.txt"
    agent = PamDeployAgent(fake_runner=FakeActionRunner())
    state = agent.create_state(
        params={**PARAMS, "ZIP_FILE_PATH": str(initial_package)},
        execution_strategy="fake",
        config_path=str(config_path),
        checkpoint_path=str(checkpoint),
    )

    agent.update_state_params(
        state,
        {
            "APP_NAME": "PAM-NEW",
            "ZIP_FILE_PATH": str(updated_package),
        },
    )

    loaded = load_agent_state(checkpoint)
    config_text = config_path.read_text(encoding="utf-8")
    expected_zip = str(updated_package.resolve())
    assert state.params["APP_NAME"] == "PAM-NEW"
    assert state.params["ZIP_FILE_PATH"] == expected_zip
    assert loaded.params["APP_NAME"] == "PAM-NEW"
    assert loaded.params["ZIP_FILE_PATH"] == expected_zip
    assert "APP_NAME=PAM-NEW" in config_text
    assert f"ZIP_FILE_PATH={updated_package.resolve()}" in config_text


def test_resume_state_clears_pause_fields(tmp_path: Path) -> None:
    """``resume_state`` resets the pause fields in memory and in the saved checkpoint."""
    checkpoint = tmp_path / "checkpoint.json"
    agent = PamDeployAgent(fake_runner=FakeActionRunner())
    state = agent.create_state(
        params=PARAMS,
        execution_strategy="fake",
        checkpoint_path=str(checkpoint),
    )

    agent.pause_state(state, reason="manual_test", review_context={"stage": "get-token"})
    resumed = agent.resume_state(state)

    loaded = load_agent_state(checkpoint)
    assert resumed.paused is False
    assert resumed.pause_reason == ""
    assert resumed.review_context == {}
    assert loaded.paused is False
    assert loaded.pause_reason == ""