from __future__ import annotations import csv import platform import subprocess from io import StringIO from typing import Any class ProcessCheckExecutor: def execute(self, params: dict[str, Any]) -> tuple[bool, str, dict[str, Any], dict[str, Any]]: process_name = params.get("process_name") pid = params.get("pid") if not process_name and pid is None: return False, "process_name or pid is required", {}, {} processes = self._list_processes() matched = [item for item in processes if self._match_process(item, process_name=process_name, pid=pid)] success = len(matched) > 0 message = "process found" if success else "process not found" return ( success, message, { "matched_count": len(matched), "process_name": process_name, "pid": pid, }, { "matches": matched, }, ) def _list_processes(self) -> list[dict[str, Any]]: system_name = platform.system().upper() if system_name.startswith("WIN"): return self._list_windows_processes() return self._list_unix_processes() def _list_windows_processes(self) -> list[dict[str, Any]]: result = subprocess.run( ["tasklist", "/FO", "CSV", "/NH"], capture_output=True, text=True, check=True, ) reader = csv.reader(StringIO(result.stdout)) rows: list[dict[str, Any]] = [] for row in reader: if len(row) < 2: continue rows.append( { "pid": int(row[1]), "process_name": row[0], "command": row[0], } ) return rows def _list_unix_processes(self) -> list[dict[str, Any]]: result = subprocess.run( ["ps", "-eo", "pid=,comm=,args="], capture_output=True, text=True, check=True, ) rows: list[dict[str, Any]] = [] for line in result.stdout.splitlines(): parts = line.strip().split(None, 2) if len(parts) < 2: continue pid_text = parts[0] process_name = parts[1] command = parts[2] if len(parts) > 2 else process_name rows.append( { "pid": int(pid_text), "process_name": process_name, "command": command, } ) return rows def _match_process(self, item: dict[str, Any], process_name: str | None, pid: int | str | None) -> bool: if pid is not None and item["pid"] != int(pid): return False if process_name: name = str(process_name).lower() if name not in item["process_name"].lower() and name not in item["command"].lower(): return False return True