2521690 ce299cbb18 feat: 增加 Agent 演示入口与 app_metadata 驱动验证链路
- 新增 app_metadata 模型、仓储与服务
- 将默认 edge 验证步骤改为由 app_metadata 驱动生成
- 新增 chat_session / chat_message 会话层模型与 chat service
- 新增 demo chat API,支持会话创建、消息发送、任务确认
- 新增最小 Web Demo 页面,形成聊天式演示入口
- 增强任务报告,补充 audit_summary 与更细粒度 task_metrics
- 增强 edge-agent 执行器:tcp_probe、日志时间范围过滤、进程指标与更灵活健康检查
- 更新 README 与当前进度总结,MVP 进度推进到约 94%
2026-04-09 14:10:13 +08:00

90 lines
3.3 KiB
Python

from __future__ import annotations
from datetime import datetime
from pathlib import Path
import re
from typing import Any
class GrepLogExecutor:
    """Search a text log file for lines that contain a keyword.

    Matches can optionally be restricted to a time window. The window is
    evaluated against a timestamp extracted from each line with a regular
    expression.
    """

    # Layouts accepted both for the ``start_at`` / ``end_at`` parameters and
    # for the timestamp text captured from a log line.
    _TIME_FORMATS = ("%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%d %H:%M:%S")

    # Default pattern: a leading "YYYY-MM-DD HH:MM:SS" with an optional
    # fractional part of 3 to 6 digits.
    _DEFAULT_TIMESTAMP_REGEX = r"^\s*(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}(?:\.\d{3,6})?)"

    def execute(self, params: dict[str, Any]) -> tuple[bool, str, dict[str, Any], dict[str, Any]]:
        """Scan the log file described by *params* and collect matching lines.

        Recognised keys in *params*:

        * ``path`` (required): file to read.
        * ``keyword`` (required): substring to look for.
        * ``limit``: maximum number of matches to collect (default 100;
          values below 1 are treated as 1).
        * ``encoding``: text encoding (default ``utf-8``); undecodable bytes
          are dropped.
        * ``case_sensitive``: compare case-sensitively (default ``False``).
        * ``start_at`` / ``end_at``: inclusive time bounds in one of the
          layouts listed in ``_TIME_FORMATS``.
        * ``timestamp_regex``: pattern used to locate the timestamp in a
          line; the first capture group (or the whole match when the pattern
          has no group) is parsed as the timestamp.

        Returns:
            ``(success, message, summary, detail)``. ``success`` is true when
            at least one line matched. On a parameter or I/O problem the
            result is ``(False, <reason>, {}, {})``.

        Raises:
            KeyError: if ``path`` or ``keyword`` is missing from *params*.
            ValueError / TypeError: if ``limit`` cannot be converted to int.
        """
        path = Path(str(params["path"]))
        keyword = str(params["keyword"])
        limit = max(1, int(params.get("limit", 100)))
        encoding = str(params.get("encoding", "utf-8"))
        case_sensitive = bool(params.get("case_sensitive", False))

        if not path.exists():
            return False, f"log file not found: {path}", {}, {}
        if not path.is_file():
            # exists() is also true for directories, which cannot be opened.
            return False, f"log path is not a file: {path}", {}, {}

        # A bound that was supplied but cannot be parsed must be reported:
        # silently treating it as "no bound" would return unfiltered lines.
        bounds: dict[str, datetime | None] = {}
        for bound_name in ("start_at", "end_at"):
            raw_bound = params.get(bound_name)
            parsed_bound = self._parse_time(raw_bound)
            if raw_bound and parsed_bound is None:
                return False, f"invalid {bound_name}: {raw_bound!r}", {}, {}
            bounds[bound_name] = parsed_bound
        start_at = bounds["start_at"]
        end_at = bounds["end_at"]

        # Compile once, outside the per-line loop, and reject bad patterns
        # before any file I/O happens.
        try:
            timestamp_pattern = re.compile(
                params.get("timestamp_regex") or self._DEFAULT_TIMESTAMP_REGEX
            )
        except (re.error, TypeError) as exc:
            return False, f"invalid timestamp_regex: {exc}", {}, {}

        keyword_cmp = keyword if case_sensitive else keyword.lower()
        matches: list[dict[str, Any]] = []
        try:
            with path.open("r", encoding=encoding, errors="ignore") as handle:
                for line_number, line in enumerate(handle, start=1):
                    text = line.rstrip("\n")
                    line_timestamp = self._extract_line_time(text, timestamp_pattern)
                    if not self._match_time_range(line_timestamp, start_at, end_at):
                        continue
                    text_cmp = text if case_sensitive else text.lower()
                    if keyword_cmp not in text_cmp:
                        continue
                    matches.append(
                        {
                            "line_number": line_number,
                            "content": text,
                            # Millisecond precision: drop the last three
                            # digits of the microsecond field.
                            "timestamp": None
                            if line_timestamp is None
                            else line_timestamp.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3],
                        }
                    )
                    if len(matches) >= limit:
                        break
        except (OSError, LookupError) as exc:
            # OSError: unreadable file; LookupError: unknown encoding name.
            return False, f"failed to read log file {path}: {exc}", {}, {}

        success = bool(matches)
        message = "keyword matched" if success else "keyword not found"
        summary = {
            "path": str(path),
            "keyword": keyword,
            "matched_count": len(matches),
            "start_at": params.get("start_at"),
            "end_at": params.get("end_at"),
        }
        return success, message, summary, {"matches": matches}

    def _extract_line_time(self, text: str, timestamp_regex: str | re.Pattern[str]) -> datetime | None:
        """Return the timestamp found in *text*, or ``None`` if there is none.

        *timestamp_regex* may be a pattern string or a compiled pattern. The
        first capture group is parsed; a pattern without groups falls back to
        the whole match instead of raising ``IndexError``.
        """
        matched = re.search(timestamp_regex, text)
        if not matched:
            return None
        captured = matched.group(1) if matched.re.groups else matched.group(0)
        return self._parse_time(captured)

    def _parse_time(self, value: str | None) -> datetime | None:
        """Parse *value* using ``_TIME_FORMATS``; return ``None`` on failure.

        Falsy input (``None`` or an empty string) also yields ``None``.
        """
        if not value:
            return None
        candidate = str(value)
        for fmt in self._TIME_FORMATS:
            try:
                return datetime.strptime(candidate, fmt)
            except ValueError:
                continue
        return None

    def _match_time_range(self, line_timestamp: datetime | None, start_at: datetime | None, end_at: datetime | None) -> bool:
        """Tell whether *line_timestamp* lies within the inclusive window.

        With no bounds every line passes. Once a bound is set, a line without
        a timestamp is rejected.
        """
        if start_at is None and end_at is None:
            return True
        if line_timestamp is None:
            return False
        after_start = start_at is None or line_timestamp >= start_at
        before_end = end_at is None or line_timestamp <= end_at
        return after_start and before_end