"""通过子进程执行 deploy.sh / deploy.ps1 action。""" from __future__ import annotations import logging import subprocess import time from pathlib import Path from typing import Any from .logging_utils import json_for_log from .models import ActionResult from .output_parser import parse_script_result logger = logging.getLogger(__name__) class ScriptActionRunner: """脚本 action runner,负责构造命令、执行脚本并解析结果。""" def __init__(self, script_base_dir: str | Path = "doc_scripts") -> None: """保存脚本所在目录。""" self.script_base_dir = Path(script_base_dir) def run( self, action: str, *, params: dict[str, Any], script_entry: str, config_path: str, ip: str | None = None, hash_code: str | None = None, stop_first: bool = False, trace_file_path: str | None = None, timeout_sec: int | None = None, ) -> ActionResult: """执行一个脚本 action,并返回统一 ActionResult。""" command = self.build_command( action, script_entry=script_entry, config_path=config_path, ip=ip, hash_code=hash_code, stop_first=stop_first, trace_file_path=trace_file_path, ) started_at = time.perf_counter() logger.info( "脚本 action 开始 action=%s command=%s cwd=%s config=%s ip=%s trace=%s timeout=%s", action, json_for_log(command), self.script_base_dir, config_path, ip or "", trace_file_path or "", timeout_sec, ) try: completed = subprocess.run( command, cwd=str(self.script_base_dir), capture_output=True, text=True, timeout=timeout_sec, check=False, ) except Exception: logger.exception("脚本 action 执行异常 action=%s command=%s cwd=%s", action, json_for_log(command), self.script_base_dir) raise duration_ms = int((time.perf_counter() - started_at) * 1000) logger.info( "脚本 action 结束 action=%s exit_code=%s duration_ms=%s stdout=%s stderr=%s", action, completed.returncode, duration_ms, json_for_log(completed.stdout, max_text_len=1200), json_for_log(completed.stderr, max_text_len=1200), ) result = parse_script_result( action=action, stdout=completed.stdout, stderr=completed.stderr, exit_code=completed.returncode, backend="script", tool_name=script_entry, ) logger.info( "脚本 action 解析完成 action=%s ok=%s values=%s error=%s", action, result.ok, json_for_log(result.values), result.error_summary, ) return result def build_command( self, action: str, *, script_entry: str, config_path: str, ip: str | None = None, hash_code: str | None = None, stop_first: bool = False, trace_file_path: str | None = None, ) -> list[str]: """根据脚本类型构造 action 命令行参数。""" if script_entry == "deploy.sh": command = [ "bash", "./deploy.sh", "--config", config_path, "--action", action, ] if ip: command.extend(["--ip", ip]) if hash_code: command.extend(["--hash-code", hash_code]) if stop_first: command.append("--stop-first") if trace_file_path: command.extend(["--trace-file", trace_file_path]) return command if script_entry == "deploy.ps1": command = [ "powershell", "-File", ".\\deploy.ps1", "-ConfigPath", config_path, "-Action", action, ] if ip: command.extend(["-Ip", ip]) if hash_code: command.extend(["-HashCode", hash_code]) if stop_first: command.append("-RollbackStopFirst") return command raise ValueError(f"不支持的脚本入口: {script_entry}") def select_script_entry(os_name: str | None = None) -> str: """根据操作系统选择默认脚本入口。""" import platform name = (os_name or platform.system()).lower() if "windows" in name: return "deploy.ps1" return "deploy.sh"