from __future__ import annotations from datetime import datetime from pathlib import Path import re from typing import Any class GrepLogExecutor: def execute(self, params: dict[str, Any]) -> tuple[bool, str, dict[str, Any], dict[str, Any]]: path = Path(str(params["path"])) keyword = str(params["keyword"]) limit = int(params.get("limit", 100)) encoding = str(params.get("encoding", "utf-8")) case_sensitive = bool(params.get("case_sensitive", False)) start_at = self._parse_time(params.get("start_at")) end_at = self._parse_time(params.get("end_at")) timestamp_regex = params.get( "timestamp_regex", r"^\s*(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}(?:\.\d{3,6})?)", ) if not path.exists(): return False, f"log file not found: {path}", {}, {} keyword_cmp = keyword if case_sensitive else keyword.lower() matches: list[dict[str, Any]] = [] with path.open("r", encoding=encoding, errors="ignore") as handle: for line_number, line in enumerate(handle, start=1): text = line.rstrip("\n") line_timestamp = self._extract_line_time(text, timestamp_regex) if not self._match_time_range(line_timestamp, start_at, end_at): continue text_cmp = text if case_sensitive else text.lower() if keyword_cmp in text_cmp: matches.append( { "line_number": line_number, "content": text, "timestamp": None if line_timestamp is None else line_timestamp.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3], } ) if len(matches) >= limit: break success = len(matches) > 0 message = "keyword matched" if success else "keyword not found" return ( success, message, { "path": str(path), "keyword": keyword, "matched_count": len(matches), "start_at": params.get("start_at"), "end_at": params.get("end_at"), }, { "matches": matches, }, ) def _extract_line_time(self, text: str, timestamp_regex: str) -> datetime | None: matched = re.search(timestamp_regex, text) if not matched: return None return self._parse_time(matched.group(1)) def _parse_time(self, value: str | None) -> datetime | None: if not value: return None for fmt in ("%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%d %H:%M:%S"): try: return datetime.strptime(str(value), fmt) except ValueError: continue return None def _match_time_range(self, line_timestamp: datetime | None, start_at: datetime | None, end_at: datetime | None) -> bool: if start_at is None and end_at is None: return True if line_timestamp is None: return False if start_at is not None and line_timestamp < start_at: return False if end_at is not None and line_timestamp > end_at: return False return True