- 将 Windows/Linux service control 执行器从占位实现推进到可用
- 新增 service control 测试,覆盖 status/start/stop/restart 主路径
- 增强 edge-agent 启动脚本,优先使用包内私有 Python 运行时
- 增强 Windows/Linux 打包脚本,支持携带私有 Python 运行时
- 更新 edge-agent README 与当前进度总结
- 新增 dist 忽略规则,避免打包产物污染仓库
41 lines
2.0 KiB
Python
41 lines
2.0 KiB
Python
from __future__ import annotations
|
|
|
|
import subprocess
|
|
from typing import Any
|
|
|
|
|
|
class LinuxServiceExecutor:
    """Execute service-control requests by shelling out to ``systemctl``.

    Every request is answered with a 4-tuple
    ``(success, message, data, extra)``:

    * ``success`` -- whether the request achieved what it asked for.
    * ``message`` -- short human-readable outcome.
    * ``data`` -- the dict built by :meth:`_build_data`.
    * ``extra`` -- diagnostics; ``raw_output`` holds the ``is-active``
      output, and state-changing actions additionally report
      ``action_output`` and ``action_returncode``.
    """

    # Actions that change the service state, as opposed to read-only "status".
    _STATE_CHANGING_ACTIONS = frozenset({"start", "stop", "restart"})

    def execute(self, params: dict[str, Any]) -> tuple[bool, str, dict[str, Any], dict[str, Any]]:
        """Run the requested action against a service.

        Args:
            params: Request parameters. ``service_name`` is required;
                ``action`` defaults to ``"status"`` and ``scope`` defaults
                to ``"system"``. Both are lower-cased before use.

        Returns:
            The ``(success, message, data, extra)`` tuple described on the
            class.

        Raises:
            KeyError: If ``params`` has no ``service_name`` entry.
        """
        service_name = str(params["service_name"])
        action = str(params.get("action", "status")).lower()
        scope = str(params.get("scope", "system")).lower()

        if action != "status" and action not in self._STATE_CHANGING_ACTIONS:
            return False, f"unsupported action: {action}", self._build_data(service_name, action, scope, None), {}

        try:
            if action == "status":
                return self._query_status(service_name, action, scope)
            return self._apply_action(service_name, action, scope)
        except OSError as exc:
            # systemctl is missing or cannot be launched; report this through
            # the normal failure tuple instead of raising out of the executor.
            return (
                False,
                f"failed to run systemctl: {exc}",
                self._build_data(service_name, action, scope, None),
                {},
            )

    def _apply_action(self, service_name: str, action: str, scope: str) -> tuple[bool, str, dict[str, Any], dict[str, Any]]:
        """Run a start/stop/restart and verify the resulting service state.

        Success requires both that the action command exited with 0 and that
        the follow-up ``is-active`` query matches the expected state:
        inactive after ``stop``, active after ``start``/``restart``.
        """
        action_result = self._run_systemctl([action, service_name], scope=scope, check=False)
        is_active, _, data, extra = self._query_status(service_name, action, scope)

        # Keep the action's own diagnostics; "raw_output" stays the
        # is-active output so existing consumers see the same value.
        extra["action_output"] = (action_result.stdout + "\n" + action_result.stderr).strip()
        extra["action_returncode"] = action_result.returncode

        expect_active = action != "stop"
        command_ok = action_result.returncode == 0
        success = command_ok and is_active == expect_active

        if success:
            message = f"service {action} succeeded"
        elif not command_ok:
            message = (
                action_result.stderr.strip()
                or action_result.stdout.strip()
                or f"service {action} failed"
            )
        else:
            observed_state = data["service_status"] or "unknown"
            message = f"service {action} completed but service is {observed_state}"
        return success, message, data, extra

    def _query_status(self, service_name: str, action: str, scope: str) -> tuple[bool, str, dict[str, Any], dict[str, Any]]:
        """Query ``systemctl is-active`` and package the outcome.

        ``success`` is true only when ``is-active`` exits with 0. The
        reported ``service_status`` is the command's stdout, falling back to
        stderr, or ``None`` when both are empty.
        """
        result = self._run_systemctl(["is-active", service_name], scope=scope, check=False)
        service_status = result.stdout.strip() or result.stderr.strip() or None
        success = result.returncode == 0
        message = "service status queried" if success else (service_status or "service query failed")
        extra = {"raw_output": (result.stdout + "\n" + result.stderr).strip()}
        return success, message, self._build_data(service_name, action, scope, service_status), extra

    def _run_systemctl(self, command: list[str], scope: str, check: bool) -> subprocess.CompletedProcess[str]:
        """Invoke ``systemctl`` with captured text output.

        ``--user`` is inserted only when ``scope`` equals ``"user"``; any
        other scope value runs without that flag. The command is passed as
        an argument list, so no shell is involved.
        """
        full_command = ["systemctl"]
        if scope == "user":
            full_command.append("--user")
        full_command.extend(command)
        return subprocess.run(full_command, capture_output=True, text=True, check=check)

    def _build_data(self, service_name: str, action: str, scope: str, service_status: str | None) -> dict[str, Any]:
        """Assemble the ``data`` payload returned to the caller."""
        return {
            "service_name": service_name,
            "action": action,
            "scope": scope,
            "service_status": service_status,
        }
|