"""把 PAM 部署 Skill 文档加载为简化策略对象。""" from __future__ import annotations import re from pathlib import Path from .constants import ( ALLOWED_ACTIONS, DEFAULT_PARAMS, GLOBAL_ACTION_SEQUENCE, IP_ACTION_SEQUENCE, REQUIRED_PARAMS, ) from .models import SkillPolicy def load_skill_policy(path: str | Path) -> SkillPolicy: """读取 Skill markdown,并提取真正参与执行的策略约束。""" skill_path = Path(path) text = skill_path.read_text(encoding="utf-8") name = "pam-auto-deply" description = "" if text.startswith("---"): parts = text.split("---", 2) if len(parts) >= 3: for line in parts[1].splitlines(): if line.startswith("name:"): name = line.split(":", 1)[1].strip() elif line.startswith("description:"): description = line.split(":", 1)[1].strip() parsed_actions = _parse_allowed_actions(text) action_sequence = _parse_action_sequence(text) ip_action_sequence = tuple(action for action in action_sequence if action in IP_ACTION_SEQUENCE) or IP_ACTION_SEQUENCE global_sequence = tuple(action for action in action_sequence if action in GLOBAL_ACTION_SEQUENCE) or GLOBAL_ACTION_SEQUENCE required_confirmations = _parse_required_confirmations(text) forbidden_actions = _parse_forbidden_actions(text) required_params = _parse_required_params(text) return SkillPolicy( name=name, source_path=str(skill_path), description=description, allowed_actions=parsed_actions or ALLOWED_ACTIONS, required_params=required_params or REQUIRED_PARAMS, optional_params=DEFAULT_PARAMS.copy(), required_confirmations=required_confirmations or ("params", "target_scope", "rollback"), action_sequence=global_sequence, ip_action_sequence=ip_action_sequence, forbidden_actions=forbidden_actions or ("script-main-flow", "auto-rollback", "modify-deploy-scripts"), ) def _parse_allowed_actions(text: str) -> tuple[str, ...]: """从 skill 文档的 action 表中提取允许的 action。""" section = _section_body(text, "### 6.3 可用 action") if not section: return () actions: list[str] = [] for raw_line in section.splitlines(): line = raw_line.strip() if not line.startswith("|"): continue parts = [item.strip() for item in line.strip("|").split("|")] if not parts or parts[0] in ("action", "---"): continue action = parts[0].strip("` ") if action in ALLOWED_ACTIONS and action not in actions: actions.append(action) return tuple(actions) def _parse_action_sequence(text: str) -> tuple[str, ...]: """从主流程章节提取推荐执行顺序。""" section = _section_body(text, "### 4.1 正式部署主流程") if not section: return () found: list[str] = [] for action in [*GLOBAL_ACTION_SEQUENCE, *IP_ACTION_SEQUENCE]: if re.search(rf"\b{re.escape(action)}\b", section) and action not in found: found.append(action) return tuple(found) def _parse_required_confirmations(text: str) -> tuple[str, ...]: """从强制确认点章节提取确认类型。""" section = _section_body(text, "### 4.2 主流程中的强制确认点") if not section: return () confirmations: list[str] = [] keyword_map = { "参数确认": "params", "目标 ip": "target_scope", "部署范围": "target_scope", "回滚": "rollback", "间隔策略": "interval_policy", } lowered = section.lower() for keyword, name in keyword_map.items(): if keyword in lowered or keyword in section: confirmations.append(name) return tuple(dict.fromkeys(confirmations)) def _parse_forbidden_actions(text: str) -> tuple[str, ...]: """从禁止事项章节提取禁止项。""" section = _section_body(text, "### 7.5 明确禁止的做法") if not section: return () forbidden: list[str] = [] if "脚本主流程" in section: forbidden.append("script-main-flow") if "自动执行回滚" in section or "自动回滚" in section: forbidden.append("auto-rollback") if "修改脚本" in section or "自动生成" in section: forbidden.append("modify-deploy-scripts") return tuple(dict.fromkeys(forbidden)) def _parse_required_params(text: str) -> tuple[str, ...]: """从参数表提取必填脚本字段。""" section = _section_body(text, "### 3.1 必填业务参数") if not section: return () params: list[str] = [] for raw_line in section.splitlines(): line = raw_line.strip() if not line.startswith("|"): continue parts = [item.strip() for item in line.strip("|").split("|")] if len(parts) < 3 or parts[0] in ("规范字段", "---"): continue script_field = parts[1].strip("` ") required_flag = parts[2] if script_field in REQUIRED_PARAMS and required_flag == "是": params.append(script_field) return tuple(params) def _section_body(text: str, heading: str) -> str: """提取指定 markdown heading 到下一同级 heading 之间的正文。""" marker = f"{heading}\n" if marker not in text: return "" _, tail = text.split(marker, 1) matches = list(re.finditer(r"^###\s+", tail, flags=re.MULTILINE)) if matches: return tail[: matches[0].start()] return tail