from __future__ import annotations import csv import platform import re import subprocess from io import StringIO from typing import Any class ProcessCheckExecutor: def execute(self, params: dict[str, Any]) -> tuple[bool, str, dict[str, Any], dict[str, Any]]: process_name = params.get("process_name") pid = params.get("pid") command_contains = params.get("command_contains") if not process_name and pid is None and not command_contains: return False, "process_name, pid or command_contains is required", {}, {} processes = self._list_processes() matched = [ item for item in processes if self._match_process(item, process_name=process_name, pid=pid, command_contains=command_contains) ] success = len(matched) > 0 message = "process found" if success else "process not found" return ( success, message, { "matched_count": len(matched), "process_name": process_name, "pid": pid, "command_contains": command_contains, "cpu_percent_total": round(sum(float(item.get("cpu_percent") or 0.0) for item in matched), 2), "memory_rss_kb_total": sum(int(item.get("memory_rss_kb") or 0) for item in matched), }, { "matches": matched, }, ) def _list_processes(self) -> list[dict[str, Any]]: system_name = platform.system().upper() if system_name.startswith("WIN"): return self._list_windows_processes() return self._list_unix_processes() def _list_windows_processes(self) -> list[dict[str, Any]]: result = subprocess.run( ["tasklist", "/FO", "CSV", "/NH"], capture_output=True, text=True, check=True, ) reader = csv.reader(StringIO(result.stdout)) rows: list[dict[str, Any]] = [] for row in reader: if len(row) < 2: continue memory_rss_kb = self._parse_windows_memory_kb(row[4]) if len(row) > 4 else None rows.append( { "pid": int(row[1]), "process_name": row[0], "command": row[0], "cpu_percent": None, "memory_rss_kb": memory_rss_kb, } ) return rows def _list_unix_processes(self) -> list[dict[str, Any]]: result = subprocess.run( ["ps", "-eo", "pid=,comm=,pcpu=,pmem=,rss=,args="], capture_output=True, text=True, check=True, ) rows: list[dict[str, Any]] = [] for line in result.stdout.splitlines(): parts = line.strip().split(None, 5) if len(parts) < 5: continue pid_text = parts[0] process_name = parts[1] cpu_percent = float(parts[2]) memory_percent = float(parts[3]) memory_rss_kb = int(parts[4]) command = parts[5] if len(parts) > 5 else process_name rows.append( { "pid": int(pid_text), "process_name": process_name, "command": command, "cpu_percent": cpu_percent, "memory_percent": memory_percent, "memory_rss_kb": memory_rss_kb, } ) return rows def _match_process( self, item: dict[str, Any], process_name: str | None, pid: int | str | None, command_contains: str | None, ) -> bool: if pid is not None and item["pid"] != int(pid): return False if process_name: name = str(process_name).lower() if name not in item["process_name"].lower() and name not in item["command"].lower(): return False if command_contains: keyword = str(command_contains).lower() if keyword not in item["command"].lower(): return False return True def _parse_windows_memory_kb(self, value: str) -> int | None: digits = re.sub(r"[^\d]", "", value or "") if not digits: return None return int(digits)