- 新增 MCP client 配置加载,支持 CLI/chat 通过配置文件接入 MCP
- 完善 chat 交互命令,支持参数查看、事件查看、checkpoint 列表与加载
- 增加 LLM action 后诊断能力,支持真实 LLM 和本地规则兜底
- 将 chat 人工确认点接入 LangGraph interrupt/checkpointer
- 更新 README、流程图、待办文档和打包说明
- 补充相关单元测试
55 lines
1.7 KiB
Python
55 lines
1.7 KiB
Python
from pathlib import Path
|
|
|
|
from pam_deploy_graph.agent import PamDeployAgent
|
|
from pam_deploy_graph.fake_runner import FakeActionRunner
|
|
from pam_deploy_graph.langgraph_runtime import LangGraphDeploymentRuntime
|
|
|
|
|
|
# Deployment parameters handed to ``PamDeployAgent.create_state(params=...)``
# by the test below. All values are strings.
# NOTE(review): these look like placeholder values (example.com host, dummy
# client credentials) -- confirm nothing here needs to match a real target.
PARAMS = {
    "HOME_BASE_URL": "https://pam.home.example.com",
    "CLIENT_ID": "client",
    "CLIENT_SECRET": "secret",
    "AIRPORT_CODE": "HET",
    "APP_NAME": "PAM",
    "MODULE_NAME": "Node",
    "VERSION_NUMBER": "2.0.5",
    "ZIP_FILE_PATH": "C:/pkg.zip",
}
|
|
|
|
|
|
def test_langgraph_runtime_interrupts_and_resumes_confirmation(tmp_path: Path) -> None:
    """Check that the runtime interrupts for confirmation and then resumes.

    The fake runner is scripted so that ``verify-ip`` for ``192.168.1.10``
    reports ``SUCCESS = "false"``. Starting the runtime is then expected to
    stop at a ``rollback-ip`` confirmation for that IP, and resuming with
    ``approved=True`` is expected to clear the pending confirmation and mark
    the rollback as done.

    NOTE(review): the final assertion also expects ``192.168.1.11`` to end in
    ``SUCCESS``; where that second IP comes from is not visible in this file
    (presumably agent or fake-runner defaults) -- confirm.

    Args:
        tmp_path: pytest-provided temporary directory; the config and
            checkpoint file paths passed to ``create_state`` live under it.
    """
    # Scripted action result: verification of 192.168.1.10 fails.
    fake = FakeActionRunner(
        {
            "verify-ip:192.168.1.10": {
                "ACTION": "verify-ip",
                "IP": "192.168.1.10",
                "SUCCESS": "false",
                "MESSAGE": "health check failed",
            }
        }
    )
    agent = PamDeployAgent(fake_runner=fake)
    state = agent.create_state(
        params=PARAMS,
        execution_strategy="fake",
        config_path=str(tmp_path / "config.txt"),
        checkpoint_path=str(tmp_path / "checkpoint.json"),
    )
    runtime = LangGraphDeploymentRuntime(agent=agent)

    # Phase 1: the run must pause and ask whether to roll back the failed IP.
    first = runtime.start(state)

    assert first.interrupted is True
    assert runtime.waiting_confirmation is True
    assert first.confirmation["type"] == "rollback-ip"
    assert first.confirmation["ip"] == "192.168.1.10"

    # Phase 2: approving the confirmation lets the run continue to the end.
    second = runtime.resume(approved=True)

    assert second.interrupted is False
    assert runtime.waiting_confirmation is False
    assert second.state is not None
    assert second.state.pending_confirmation == ""
    assert second.state.ip_states["192.168.1.10"]["rollback_status"] == "ROLLBACK_DONE"
    assert second.state.ip_states["192.168.1.11"]["status"] == "SUCCESS"