- 新增 pam_deploy_graph 包,包含 agent、action router、runner、parser 和配置加载能力
- 支持 hybrid_node_mcp 路由策略:PAM_HOME 走脚本 action,PAM_NODE 走 MCP
- 新增 fake runner 和 CLI 预演/全局流程验证入口
- 新增路由、输出解析、配置加载、脚本命令构造、Skill 策略加载测试
- 在 README 中记录当前代码骨架、实现进度、使用方式和下一步建议
41 lines
1.1 KiB
Python
41 lines
1.1 KiB
Python
"""Business checkpoint JSON storage."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
from dataclasses import asdict, is_dataclass
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from .constants import SENSITIVE_KEYS
|
|
|
|
|
|
def redact_mapping(value: Any) -> Any:
    """Return a copy of *value* with sensitive dict entries masked.

    Dicts, lists and tuples are walked recursively. Any dict entry whose
    key, converted with ``str()``, is a member of ``SENSITIVE_KEYS`` has its
    value replaced by the literal ``"***"``; the key itself is kept. Values
    of any other type are returned unchanged.

    Tuples are traversed because ``dataclasses.asdict`` keeps tuple fields
    as tuples and ``json.dumps`` serializes tuples like lists, so a secret
    nested inside a tuple would otherwise be written out unmasked.

    Args:
        value: Arbitrary data, typically a JSON-like structure.

    Returns:
        A new dict, list or tuple for container inputs (the input is not
        mutated); the same object for anything else.
    """
    if isinstance(value, dict):
        return {
            key: "***" if str(key) in SENSITIVE_KEYS else redact_mapping(item)
            for key, item in value.items()
        }
    if isinstance(value, list):
        return [redact_mapping(item) for item in value]
    if isinstance(value, tuple):
        items = [redact_mapping(item) for item in value]
        # Named tuples take their fields as positional arguments, whereas a
        # plain tuple is built from a single iterable.
        return type(value)(*items) if hasattr(value, "_fields") else tuple(items)
    return value
|
|
|
|
|
|
def save_checkpoint(state: Any, path: str | Path) -> Path:
    """Write *state* to *path* as redacted, pretty-printed UTF-8 JSON.

    A dataclass *state* is converted with ``dataclasses.asdict``; any other
    value is used as-is. The payload is passed through ``redact_mapping``
    before being serialized with ``ensure_ascii=False`` and ``indent=2``.
    Missing parent directories of *path* are created.

    The JSON text is first written to a sibling ``<name>.tmp`` file and then
    moved over *path*, so a failed or interrupted write does not leave a
    truncated file in place of an existing checkpoint.

    Args:
        state: Dataclass instance or JSON-serializable value to persist.
        path: Destination file path.

    Returns:
        The destination as a ``Path``.

    Raises:
        TypeError: If the payload is not JSON serializable.
        OSError: If the directory or file cannot be written.
    """
    checkpoint_path = Path(path)
    checkpoint_path.parent.mkdir(parents=True, exist_ok=True)
    payload = asdict(state) if is_dataclass(state) else state
    # Serialize before creating any file so that a serialization error
    # leaves the filesystem untouched.
    serialized = json.dumps(redact_mapping(payload), ensure_ascii=False, indent=2)
    temp_path = checkpoint_path.parent / (checkpoint_path.name + ".tmp")
    try:
        temp_path.write_text(serialized, encoding="utf-8")
        temp_path.replace(checkpoint_path)
    finally:
        # After a successful replace the temp file is gone; this only
        # removes the leftover of a failed write or move.
        if temp_path.exists():
            temp_path.unlink()
    return checkpoint_path
|
|
|
|
|
|
def load_checkpoint(path: str | Path) -> dict[str, Any]:
    """Read the UTF-8 encoded JSON file at *path* and return its decoded content.

    Args:
        path: Location of the checkpoint file.

    Returns:
        The value produced by decoding the file's JSON text.
    """
    with Path(path).open(encoding="utf-8") as handle:
        return json.load(handle)
|
|
|