152 lines
5.5 KiB
Python
152 lines
5.5 KiB
Python
"""把 PAM 部署 Skill 文档加载为简化策略对象。"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import re
|
||
from pathlib import Path
|
||
|
||
from .constants import (
|
||
ALLOWED_ACTIONS,
|
||
DEFAULT_PARAMS,
|
||
GLOBAL_ACTION_SEQUENCE,
|
||
IP_ACTION_SEQUENCE,
|
||
REQUIRED_PARAMS,
|
||
)
|
||
from .models import SkillPolicy
|
||
|
||
|
||
def load_skill_policy(path: str | Path) -> SkillPolicy:
    """Read a Skill markdown file and build the policy used at execution time.

    ``name:`` and ``description:`` lines found in the leading ``---``
    front-matter block override the built-in defaults.  The allowed actions,
    required parameters, confirmations, action sequences and forbidden
    actions are parsed from the document; each one falls back to a default
    when its parser returns an empty result.

    Args:
        path: Location of the Skill markdown file; it is read as UTF-8.

    Returns:
        A ``SkillPolicy`` populated from the document and the defaults.
    """
    source = Path(path)
    content = source.read_text(encoding="utf-8")

    # Defaults that apply when the front matter is absent or lacks a key.
    meta = {"name": "pam-auto-deply", "description": ""}
    if content.startswith("---"):
        segments = content.split("---", 2)
        # Fewer than three segments means the closing fence is missing,
        # in which case nothing is treated as front matter.
        front_matter = segments[1] if len(segments) >= 3 else ""
        for entry in front_matter.splitlines():
            key, colon, value = entry.partition(":")
            if colon and key in meta:
                meta[key] = value.strip()

    allowed = _parse_allowed_actions(content)
    ordered = _parse_action_sequence(content)
    # Split the single parsed sequence into its per-IP and global parts.
    per_ip = tuple(step for step in ordered if step in IP_ACTION_SEQUENCE)
    global_steps = tuple(step for step in ordered if step in GLOBAL_ACTION_SEQUENCE)
    confirmations = _parse_required_confirmations(content)
    forbidden = _parse_forbidden_actions(content)
    required = _parse_required_params(content)

    return SkillPolicy(
        name=meta["name"],
        source_path=str(source),
        description=meta["description"],
        allowed_actions=allowed or ALLOWED_ACTIONS,
        required_params=required or REQUIRED_PARAMS,
        optional_params=DEFAULT_PARAMS.copy(),
        required_confirmations=confirmations or ("params", "target_scope", "rollback"),
        action_sequence=global_steps or GLOBAL_ACTION_SEQUENCE,
        ip_action_sequence=per_ip or IP_ACTION_SEQUENCE,
        forbidden_actions=forbidden or ("script-main-flow", "auto-rollback", "modify-deploy-scripts"),
    )
|
||
|
||
|
||
def _parse_allowed_actions(text: str) -> tuple[str, ...]:
    """Collect the permitted actions listed in the skill's action table.

    Only table rows (lines starting with ``|``) of the "6.3" section are
    inspected.  The first cell of each row is kept when, after removing
    backticks and spaces, it is a member of ``ALLOWED_ACTIONS``.

    Returns:
        The recognised actions in first-seen order without duplicates, or
        an empty tuple when the section is missing or empty.
    """
    table = _section_body(text, "### 6.3 可用 action")
    if not table:
        return ()

    # A dict is used as an insertion-ordered set.
    seen: dict[str, None] = {}
    for raw_row in table.splitlines():
        row = raw_row.strip()
        if not row.startswith("|"):
            continue
        first_cell = row.strip("|").split("|")[0].strip()
        # Skip the header row and the markdown separator row.
        if first_cell in ("action", "---"):
            continue
        candidate = first_cell.strip("` ")
        if candidate in ALLOWED_ACTIONS:
            seen.setdefault(candidate, None)
    return tuple(seen)
|
||
|
||
|
||
def _parse_action_sequence(text: str) -> tuple[str, ...]:
    """List the known actions that are mentioned in the main-flow section.

    Every action from ``GLOBAL_ACTION_SEQUENCE`` followed by
    ``IP_ACTION_SEQUENCE`` is searched for as a whole word in the "4.1"
    section.  The result keeps the order of those constants, not the order
    in which the actions appear in the document.

    Returns:
        The mentioned actions without duplicates, or an empty tuple when
        the section is missing or empty.
    """
    flow = _section_body(text, "### 4.1 正式部署主流程")
    if not flow:
        return ()

    # De-duplicate the candidates up front while keeping their order.
    candidates = dict.fromkeys((*GLOBAL_ACTION_SEQUENCE, *IP_ACTION_SEQUENCE))
    return tuple(
        name
        for name in candidates
        if re.search(rf"\b{re.escape(name)}\b", flow)
    )
|
||
|
||
|
||
def _parse_required_confirmations(text: str) -> tuple[str, ...]:
    """Derive the confirmation types named in the mandatory-confirmation section.

    The "4.2" section is scanned for a fixed set of keywords; each keyword
    that occurs contributes its confirmation type.

    Returns:
        The confirmation types in rule order without duplicates, or an
        empty tuple when the section is missing or empty.
    """
    body = _section_body(text, "### 4.2 主流程中的强制确认点")
    if not body:
        return ()

    # Every keyword below is already lowercase, so searching the lowercased
    # body also covers any occurrence in the body as written.
    haystack = body.lower()
    rules = (
        ("参数确认", "params"),
        ("目标 ip", "target_scope"),
        ("部署范围", "target_scope"),
        ("回滚", "rollback"),
        ("间隔策略", "interval_policy"),
    )
    hits = dict.fromkeys(label for needle, label in rules if needle in haystack)
    return tuple(hits)
|
||
|
||
|
||
def _parse_forbidden_actions(text: str) -> tuple[str, ...]:
    """Derive the forbidden-action tags named in the prohibitions section.

    Each tag is emitted when at least one of its trigger phrases occurs in
    the "7.5" section.

    Returns:
        The matching tags in rule order, or an empty tuple when the section
        is missing or empty.
    """
    body = _section_body(text, "### 7.5 明确禁止的做法")
    if not body:
        return ()

    # (tag, phrases): a tag applies when any of its phrases is present.
    rules = (
        ("script-main-flow", ("脚本主流程",)),
        ("auto-rollback", ("自动执行回滚", "自动回滚")),
        ("modify-deploy-scripts", ("修改脚本", "自动生成")),
    )
    return tuple(
        tag
        for tag, phrases in rules
        if any(phrase in body for phrase in phrases)
    )
|
||
|
||
|
||
def _parse_required_params(text: str) -> tuple[str, ...]:
    """Extract the required script fields from the parameter table.

    Only table rows (lines starting with ``|``) of the "3.1" section with
    at least three cells are inspected.  The second cell is the script
    field and the third cell is the required flag; a field is kept when it
    is a member of ``REQUIRED_PARAMS`` and its flag is exactly ``是``.

    Returns:
        The required script fields in first-seen order without duplicates
        (a row repeated in the table is reported once), or an empty tuple
        when the section is missing or empty.
    """
    table = _section_body(text, "### 3.1 必填业务参数")
    if not table:
        return ()

    # A dict is used as an insertion-ordered set so that a repeated table
    # row cannot produce a repeated parameter.
    required: dict[str, None] = {}
    for raw_row in table.splitlines():
        row = raw_row.strip()
        if not row.startswith("|"):
            continue
        cells = [cell.strip() for cell in row.strip("|").split("|")]
        # Skip short rows, the header row and the markdown separator row.
        if len(cells) < 3 or cells[0] in ("规范字段", "---"):
            continue
        script_field = cells[1].strip("` ")
        if script_field in REQUIRED_PARAMS and cells[2] == "是":
            required.setdefault(script_field, None)
    return tuple(required)
|
||
|
||
|
||
def _section_body(text: str, heading: str) -> str:
    """Return the body of the markdown section introduced by ``heading``.

    The heading must occupy a whole line; surrounding whitespace on that
    line is ignored, so trailing spaces do not prevent a match and a deeper
    heading such as ``#### X`` is not mistaken for ``### X``.  The first
    matching line wins.

    The body runs from the line after the heading up to, but excluding, the
    next level 1-3 heading (``#``, ``##`` or ``###``) that is outside a
    fenced code block.  Deeper headings (``####`` and below) stay part of
    the body, and lines such as ``# comment`` inside a fenced block do not
    end the section.

    Args:
        text: The full markdown document.
        heading: The complete heading line to look for, e.g. ``"### 4.1 Title"``.

    Returns:
        The section body with its original line endings, or ``""`` when the
        heading is not present.
    """
    wanted = heading.strip()
    lines = text.splitlines(keepends=True)

    start = next(
        (index + 1 for index, line in enumerate(lines) if line.strip() == wanted),
        None,
    )
    if start is None:
        return ""

    body: list[str] = []
    in_fence = False
    for line in lines[start:]:
        if line.lstrip().startswith(("```", "~~~")):
            # Opening or closing fence of a code block.
            in_fence = not in_fence
        elif not in_fence and re.match(r"#{1,3}(?:\s|$)", line):
            # A heading of the same or a higher level closes the section.
            break
        body.append(line)
    return "".join(body)
|