agent_deply/pam_deploy_graph/checkpoint_store.py
dark ab7b839bc6 feat: 新增 PAM 智能部署 Agent 运行时骨架
- 新增 pam_deploy_graph 包,包含 agent、action router、runner、parser 和配置加载能力
- 支持 hybrid_node_mcp 路由策略:PAM_HOME 走脚本 action,PAM_NODE 走 MCP
- 新增 fake runner 和 CLI 预演/全局流程验证入口
- 新增路由、输出解析、配置加载、脚本命令构造、Skill 策略加载测试
- 在 README 中记录当前代码骨架、实现进度、使用方式和下一步建议
2026-05-29 14:49:41 +08:00

41 lines
1.1 KiB
Python

"""Business checkpoint JSON storage."""
from __future__ import annotations
import json
from dataclasses import asdict, is_dataclass
from pathlib import Path
from typing import Any
from .constants import SENSITIVE_KEYS
def redact_mapping(value: Any) -> Any:
if isinstance(value, dict):
result = {}
for key, item in value.items():
if str(key) in SENSITIVE_KEYS:
result[key] = "***"
else:
result[key] = redact_mapping(item)
return result
if isinstance(value, list):
return [redact_mapping(item) for item in value]
return value
def save_checkpoint(state: Any, path: str | Path) -> Path:
checkpoint_path = Path(path)
checkpoint_path.parent.mkdir(parents=True, exist_ok=True)
payload = asdict(state) if is_dataclass(state) else state
checkpoint_path.write_text(
json.dumps(redact_mapping(payload), ensure_ascii=False, indent=2),
encoding="utf-8",
)
return checkpoint_path
def load_checkpoint(path: str | Path) -> dict[str, Any]:
return json.loads(Path(path).read_text(encoding="utf-8"))