agent_deply/tests/test_langgraph_runtime.py
dark d01c4d3d06 feat: 完善交互式部署与 MCP/LLM 配置能力
- 新增 MCP client 配置加载,支持 CLI/chat 通过配置文件接入 MCP
- 完善 chat 交互命令,支持参数查看、事件查看、checkpoint 列表与加载
- 增加 LLM action 后诊断能力,支持真实 LLM 和本地规则兜底
- 将 chat 人工确认点接入 LangGraph interrupt/checkpointer
- 更新 README、流程图、待办文档和打包说明
- 补充相关单元测试
2026-06-01 16:45:52 +08:00

55 lines
1.7 KiB
Python

from pathlib import Path
from pam_deploy_graph.agent import PamDeployAgent
from pam_deploy_graph.fake_runner import FakeActionRunner
from pam_deploy_graph.langgraph_runtime import LangGraphDeploymentRuntime
# Deployment parameters handed to `PamDeployAgent.create_state(params=...)`
# by the test in this module. All values are plain strings.
PARAMS = {
    "HOME_BASE_URL": "https://pam.home.example.com",
    "CLIENT_ID": "client",
    "CLIENT_SECRET": "secret",
    "AIRPORT_CODE": "HET",
    "APP_NAME": "PAM",
    "MODULE_NAME": "Node",
    "VERSION_NUMBER": "2.0.5",
    "ZIP_FILE_PATH": "C:/pkg.zip",
}
def test_langgraph_runtime_interrupts_and_resumes_confirmation(tmp_path: Path) -> None:
    """Check that the runtime pauses for a rollback confirmation and then resumes.

    The fake runner is scripted so that the ``verify-ip`` action for
    ``192.168.1.10`` reports ``SUCCESS = "false"``. The test then expects:

    1. ``runtime.start`` returns an interrupted result carrying a
       ``rollback-ip`` confirmation request for that IP.
    2. ``runtime.resume(approved=True)`` finishes without a further interrupt,
       clears the pending confirmation, and records the rollback as done.

    Args:
        tmp_path: pytest-provided temporary directory; used for the config and
            checkpoint file paths so nothing is written outside the sandbox.
    """
    failing_ip = "192.168.1.10"

    # Script only the failing verification; every other action falls back to
    # whatever FakeActionRunner returns by default (presumably success --
    # confirm against fake_runner).
    fake = FakeActionRunner(
        {
            f"verify-ip:{failing_ip}": {
                "ACTION": "verify-ip",
                "IP": failing_ip,
                "SUCCESS": "false",
                "MESSAGE": "health check failed",
            }
        }
    )
    agent = PamDeployAgent(fake_runner=fake)
    state = agent.create_state(
        params=PARAMS,
        execution_strategy="fake",
        config_path=str(tmp_path / "config.txt"),
        checkpoint_path=str(tmp_path / "checkpoint.json"),
    )
    runtime = LangGraphDeploymentRuntime(agent=agent)

    # First leg: the run must stop and ask for confirmation of the rollback.
    first = runtime.start(state)
    assert first.interrupted is True
    assert runtime.waiting_confirmation is True
    assert first.confirmation["type"] == "rollback-ip"
    assert first.confirmation["ip"] == failing_ip

    # Second leg: approving the confirmation lets the run complete.
    second = runtime.resume(approved=True)
    assert second.interrupted is False
    assert runtime.waiting_confirmation is False
    assert second.state is not None
    assert second.state.pending_confirmation == ""
    assert second.state.ip_states[failing_ip]["rollback_status"] == "ROLLBACK_DONE"
    # NOTE(review): 192.168.1.11 is not configured in this test; it presumably
    # comes from the agent/fake defaults -- verify if this assertion breaks.
    assert second.state.ip_states["192.168.1.11"]["status"] == "SUCCESS"