from __future__ import annotations import subprocess from typing import Any class LinuxServiceExecutor: def execute(self, params: dict[str, Any]) -> tuple[bool, str, dict[str, Any], dict[str, Any]]: service_name = str(params["service_name"]) action = str(params.get("action", "status")).lower() scope = str(params.get("scope", "system")).lower() if action == "status": return self._query_status(service_name, action, scope) if action in {"start", "stop", "restart"}: self._run_systemctl([action, service_name], scope=scope, check=False) return self._query_status(service_name, action, scope) return False, f"unsupported action: {action}", self._build_data(service_name, action, scope, None), {} def _query_status(self, service_name: str, action: str, scope: str) -> tuple[bool, str, dict[str, Any], dict[str, Any]]: result = self._run_systemctl(["is-active", service_name], scope=scope, check=False) service_status = result.stdout.strip() or result.stderr.strip() or None success = result.returncode == 0 message = "service status queried" if success else (service_status or "service query failed") return success, message, self._build_data(service_name, action, scope, service_status), {"raw_output": (result.stdout + "\n" + result.stderr).strip()} def _run_systemctl(self, command: list[str], scope: str, check: bool) -> subprocess.CompletedProcess[str]: full_command = ["systemctl"] if scope == "user": full_command.append("--user") full_command.extend(command) return subprocess.run(full_command, capture_output=True, text=True, check=check) def _build_data(self, service_name: str, action: str, scope: str, service_status: str | None) -> dict[str, Any]: return { "service_name": service_name, "action": action, "scope": scope, "service_status": service_status, }