agent_deply/tests/test_langgraph_runtime.py
dark badcce5d2d verify-ip 等逐 IP action 失败后不再进入自动回滚确认,改为保存 failed_stage 并暂停。
用户修复外部问题后输入 resume,会从失败 action 重新执行,而不是结束整个流程。
回滚从 workflow 中拆出,新增显式命令:
chat:rollback [IP]
CLI:rollback --checkpoint ... [--ip ...] [--stop-first|--no-stop-first]
旧 confirm approve/reject 只保留为旧 checkpoint 兼容入口,新流程不再推荐使用。
LangGraph workflow 已移除回滚确认 interrupt 节点,失败暂停和续跑走业务 checkpoint。
README、打包 README、run.sh --help、流程图、todo、提示词基线和测试都已同步。
2026-06-04 13:46:19 +08:00

61 lines
1.9 KiB
Python

from pathlib import Path
from pam_deploy_graph.agent import PamDeployAgent
from pam_deploy_graph.fake_runner import FakeActionRunner
from pam_deploy_graph.langgraph_runtime import LangGraphDeploymentRuntime
# Deployment parameters handed to ``PamDeployAgent.create_state`` by the test
# in this module. The values look like placeholders (example.com host, dummy
# credentials) -- NOTE(review): confirm none of them are expected to be real.
PARAMS = dict(
    HOME_BASE_URL="https://pam.home.example.com",
    CLIENT_ID="client",
    CLIENT_SECRET="secret",
    AIRPORT_CODE="HET",
    APP_NAME="PAM",
    MODULE_NAME="Node",
    VERSION_NUMBER="2.0.5",
    ZIP_FILE_PATH="C:/pkg.zip",
)
def test_langgraph_runtime_pauses_failure_and_resume_retries(tmp_path: Path):
    """Check that a failed ``verify-ip`` pauses the run and a resume retries it.

    First pass: the fake runner reports ``verify-ip`` as failed for
    192.168.1.10. The runtime is expected to finish without an interrupt or a
    pending confirmation, leaving the state paused with ``failed_stage``
    recorded for that IP.

    Second pass: the failure fixture is removed, the state is resumed and the
    runtime is started again. The run is expected to finish un-paused, with no
    rollback executed for the previously failing IP.
    """
    # Canned failure for a single IP; the key format is whatever
    # FakeActionRunner expects -- presumably "<action>:<ip>", verify there.
    failing_fixtures = {
        "verify-ip:192.168.1.10": {
            "ACTION": "verify-ip",
            "IP": "192.168.1.10",
            "SUCCESS": "false",
            "MESSAGE": "health check failed",
        }
    }
    runner = FakeActionRunner(failing_fixtures)
    deploy_agent = PamDeployAgent(fake_runner=runner)

    config_file = str(tmp_path / "config.txt")
    checkpoint_file = str(tmp_path / "checkpoint.json")
    initial_state = deploy_agent.create_state(
        params=PARAMS,
        execution_strategy="fake",
        config_path=config_file,
        checkpoint_path=checkpoint_file,
    )
    graph_runtime = LangGraphDeploymentRuntime(agent=deploy_agent)

    # --- First run: the failure must pause, not ask for confirmation. ---
    paused_result = graph_runtime.start(initial_state)

    assert paused_result.interrupted is False
    assert graph_runtime.waiting_confirmation is False
    assert paused_result.confirmation == {}
    assert paused_result.state is not None
    paused_state = paused_result.state
    assert paused_state.paused is True
    assert paused_state.pending_confirmation == ""
    assert paused_state.ip_states["192.168.1.10"]["failed_stage"] == "verify-ip"

    # --- Second run: drop the failure fixture, resume, and start again. ---
    runner.fixtures = {}
    deploy_agent.resume_state(paused_state)
    resumed_result = graph_runtime.start(paused_state)

    assert resumed_result.interrupted is False
    assert graph_runtime.waiting_confirmation is False
    assert resumed_result.state is not None
    resumed_state = resumed_result.state
    assert resumed_state.pending_confirmation == ""
    assert resumed_state.paused is False
    # No rollback was run for the IP that failed earlier.
    assert (
        resumed_state.ip_states["192.168.1.10"]["rollback_status"]
        == "ROLLBACK_NOT_RUN"
    )
    # 192.168.1.11 is not configured in this test; presumably it comes from
    # the agent's or fake runner's default IP list -- TODO confirm.
    assert resumed_state.ip_states["192.168.1.11"]["status"] == "SUCCESS"