2026-06-04 10:04:23 +08:00

152 lines
5.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""把 PAM 部署 Skill 文档加载为简化策略对象。"""
from __future__ import annotations
import re
from pathlib import Path
from .constants import (
ALLOWED_ACTIONS,
DEFAULT_PARAMS,
GLOBAL_ACTION_SEQUENCE,
IP_ACTION_SEQUENCE,
REQUIRED_PARAMS,
)
from .models import SkillPolicy
def load_skill_policy(path: str | Path) -> SkillPolicy:
"""读取 Skill markdown并提取真正参与执行的策略约束。"""
skill_path = Path(path)
text = skill_path.read_text(encoding="utf-8")
name = "pam-auto-deply"
description = ""
if text.startswith("---"):
parts = text.split("---", 2)
if len(parts) >= 3:
for line in parts[1].splitlines():
if line.startswith("name:"):
name = line.split(":", 1)[1].strip()
elif line.startswith("description:"):
description = line.split(":", 1)[1].strip()
parsed_actions = _parse_allowed_actions(text)
action_sequence = _parse_action_sequence(text)
ip_action_sequence = tuple(action for action in action_sequence if action in IP_ACTION_SEQUENCE) or IP_ACTION_SEQUENCE
global_sequence = tuple(action for action in action_sequence if action in GLOBAL_ACTION_SEQUENCE) or GLOBAL_ACTION_SEQUENCE
required_confirmations = _parse_required_confirmations(text)
forbidden_actions = _parse_forbidden_actions(text)
required_params = _parse_required_params(text)
return SkillPolicy(
name=name,
source_path=str(skill_path),
description=description,
allowed_actions=parsed_actions or ALLOWED_ACTIONS,
required_params=required_params or REQUIRED_PARAMS,
optional_params=DEFAULT_PARAMS.copy(),
required_confirmations=required_confirmations or ("params", "target_scope", "rollback"),
action_sequence=global_sequence,
ip_action_sequence=ip_action_sequence,
forbidden_actions=forbidden_actions or ("script-main-flow", "auto-rollback", "modify-deploy-scripts"),
)
def _parse_allowed_actions(text: str) -> tuple[str, ...]:
"""从 skill 文档的 action 表中提取允许的 action。"""
section = _section_body(text, "### 6.3 可用 action")
if not section:
return ()
actions: list[str] = []
for raw_line in section.splitlines():
line = raw_line.strip()
if not line.startswith("|"):
continue
parts = [item.strip() for item in line.strip("|").split("|")]
if not parts or parts[0] in ("action", "---"):
continue
action = parts[0].strip("` ")
if action in ALLOWED_ACTIONS and action not in actions:
actions.append(action)
return tuple(actions)
def _parse_action_sequence(text: str) -> tuple[str, ...]:
"""从主流程章节提取推荐执行顺序。"""
section = _section_body(text, "### 4.1 正式部署主流程")
if not section:
return ()
found: list[str] = []
for action in [*GLOBAL_ACTION_SEQUENCE, *IP_ACTION_SEQUENCE]:
if re.search(rf"\b{re.escape(action)}\b", section) and action not in found:
found.append(action)
return tuple(found)
def _parse_required_confirmations(text: str) -> tuple[str, ...]:
"""从强制确认点章节提取确认类型。"""
section = _section_body(text, "### 4.2 主流程中的强制确认点")
if not section:
return ()
confirmations: list[str] = []
keyword_map = {
"参数确认": "params",
"目标 ip": "target_scope",
"部署范围": "target_scope",
"回滚": "rollback",
"间隔策略": "interval_policy",
}
lowered = section.lower()
for keyword, name in keyword_map.items():
if keyword in lowered or keyword in section:
confirmations.append(name)
return tuple(dict.fromkeys(confirmations))
def _parse_forbidden_actions(text: str) -> tuple[str, ...]:
"""从禁止事项章节提取禁止项。"""
section = _section_body(text, "### 7.5 明确禁止的做法")
if not section:
return ()
forbidden: list[str] = []
if "脚本主流程" in section:
forbidden.append("script-main-flow")
if "自动执行回滚" in section or "自动回滚" in section:
forbidden.append("auto-rollback")
if "修改脚本" in section or "自动生成" in section:
forbidden.append("modify-deploy-scripts")
return tuple(dict.fromkeys(forbidden))
def _parse_required_params(text: str) -> tuple[str, ...]:
"""从参数表提取必填脚本字段。"""
section = _section_body(text, "### 3.1 必填业务参数")
if not section:
return ()
params: list[str] = []
for raw_line in section.splitlines():
line = raw_line.strip()
if not line.startswith("|"):
continue
parts = [item.strip() for item in line.strip("|").split("|")]
if len(parts) < 3 or parts[0] in ("规范字段", "---"):
continue
script_field = parts[1].strip("` ")
required_flag = parts[2]
if script_field in REQUIRED_PARAMS and required_flag == "":
params.append(script_field)
return tuple(params)
def _section_body(text: str, heading: str) -> str:
"""提取指定 markdown heading 到下一同级 heading 之间的正文。"""
marker = f"{heading}\n"
if marker not in text:
return ""
_, tail = text.split(marker, 1)
matches = list(re.finditer(r"^###\s+", tail, flags=re.MULTILINE))
if matches:
return tail[: matches[0].start()]
return tail