auto_agent/edge-agent/app/scheduler/polling_runner.py
2521690 2c7714268f feat: 补强 demo 后端任务指标与 edge-agent 执行骨架
- 新增 task_report 任务级聚合指标 task_metrics
- 补充创建任务幂等与失败路径/冲突测试
- 将后端测试基线提升到 20 passed
- 新增 edge-agent 初始化代码、启动脚本与打包脚本
- 新增 http_health_check、check_port、check_process、grep_log 执行器
- 补充 edge-agent 基础测试并提升基线到 10 passed
- 同步更新 backend README 与当前进度总结
2026-04-09 10:51:19 +08:00

115 lines
4.1 KiB
Python

from __future__ import annotations
import logging
import time
import httpx
from app.client.backend_client import BackendClient
from app.core.config import Settings
from app.core.time import format_now
from app.registry.tool_registry import ToolRegistry
logger = logging.getLogger(__name__)
class PollingRunner:
    """Poll the backend for tasks, run them with local tool executors, report results.

    Each polling round sends a heartbeat when one is due, pulls a batch of tasks
    from the backend client, executes every task with the executor registered
    under the task's ``tool_name`` and reports one result per task.

    A task is a mapping read with the keys ``tool_name``, ``task_id``,
    ``step_id`` and, optionally, ``params``.
    """

    def __init__(self, settings: Settings) -> None:
        """Create the tool registry and the backend client from ``settings``."""
        self.settings = settings
        self.registry = ToolRegistry()
        self.backend_client = BackendClient(settings)
        # ``time.monotonic()`` reading taken when the last heartbeat was sent;
        # ``None`` means no heartbeat has been sent yet, so the next check sends one.
        self._last_heartbeat_at: float | None = None

    def run_once(self) -> None:
        """Send a heartbeat unconditionally, then process one batch of tasks."""
        self._heartbeat_if_needed(force=True)
        self._poll_and_execute()

    def run_forever(self) -> None:
        """Poll in an endless loop, sleeping ``poll_interval_ms`` between rounds.

        Exceptions raised by the backend client are not caught here; they
        propagate to the caller and end the loop.
        """
        while True:
            self._heartbeat_if_needed(force=False)
            self._poll_and_execute()
            time.sleep(self.settings.poll_interval_ms / 1000.0)

    def close(self) -> None:
        """Close the backend client."""
        self.backend_client.close()

    def _poll_and_execute(self) -> None:
        """Pull one batch of tasks and execute them in the order received."""
        for task in self.backend_client.pull_tasks():
            self._execute_task(task)

    def _heartbeat_if_needed(self, force: bool) -> None:
        """Send a heartbeat when forced, never sent before, or the interval elapsed.

        The interval is measured on the monotonic clock so that wall-clock
        adjustments (NTP corrections, manual changes) can neither suppress nor
        trigger heartbeats.
        """
        now = time.monotonic()
        last = self._last_heartbeat_at
        if (
            not force
            and last is not None
            and now - last < self.settings.heartbeat_interval_ms / 1000.0
        ):
            return
        self.backend_client.heartbeat(self.registry.capabilities())
        self._last_heartbeat_at = now

    def _execute_task(self, task: dict) -> None:
        """Run one task and report its outcome to the backend.

        Outcome codes:
        * ``UNSUPPORTED_TOOL`` - no executor is registered for ``tool_name``.
        * ``OK`` / ``EXECUTION_FAILED`` - the executor returned; chosen by its
          ``success`` flag.
        * ``HTTP_ERROR`` - the executor raised ``httpx.HTTPError``.
        * ``EXECUTION_EXCEPTION`` - the executor raised any other exception.

        Every non-returning outcome additionally reports an event.

        Raises:
            KeyError: if ``task`` lacks ``tool_name``, ``task_id`` or ``step_id``.
        """
        tool_name = task["tool_name"]
        executor = self.registry.get(tool_name)
        started_at = format_now()

        if executor is None:
            message = f"unsupported tool: {tool_name}"
            self.backend_client.report_event(
                event_type="UNSUPPORTED_TOOL",
                message=message,
                detail={"task_id": task["task_id"], "step_id": task["step_id"]},
            )
            self._report_result(
                task,
                started_at,
                success=False,
                code="UNSUPPORTED_TOOL",
                message=message,
                data={},
                evidence={},
            )
            return

        # A task may carry ``"params": None``; executors always receive a dict.
        params = task.get("params") or {}
        try:
            # Executors return a (success, message, data, evidence) 4-tuple.
            success, message, data, evidence = executor.execute(params)
            code = "OK" if success else "EXECUTION_FAILED"
        except httpx.HTTPError as exc:
            success, data, evidence = False, {}, {}
            code = "HTTP_ERROR"
            message = self._record_failure(task, exc, event_type="HTTP_EXECUTOR_EXCEPTION")
        except Exception as exc:  # pragma: no cover - defensive path
            success, data, evidence = False, {}, {}
            code = "EXECUTION_EXCEPTION"
            message = self._record_failure(task, exc, event_type="AGENT_EXCEPTION")

        self._report_result(
            task,
            started_at,
            success=success,
            code=code,
            message=message,
            data=data,
            evidence=evidence,
        )

    def _record_failure(self, task: dict, exc: Exception, *, event_type: str) -> str:
        """Log an executor exception, report it as an event, return its message.

        The returned message is ``str(exc)``, or the exception class name when
        ``str(exc)`` is empty, so the reported result never has a blank message.
        """
        message = str(exc) or type(exc).__name__
        # Keep the traceback locally; the backend only receives the message text.
        logger.error(
            "executor raised task_id=%s step_id=%s tool_name=%s",
            task["task_id"],
            task["step_id"],
            task["tool_name"],
            exc_info=exc,
        )
        self.backend_client.report_event(
            event_type=event_type,
            message=message,
            detail={
                "task_id": task["task_id"],
                "step_id": task["step_id"],
                "tool_name": task["tool_name"],
            },
        )
        return message

    def _report_result(
        self,
        task: dict,
        started_at,
        *,
        success: bool,
        code: str,
        message,
        data,
        evidence,
    ) -> None:
        """Build the step-result payload for ``task``, log it and send it.

        ``finished_at`` is taken with ``format_now()`` at the moment the payload
        is built.
        """
        payload = {
            "edge_id": self.settings.edge_id,
            "task_id": task["task_id"],
            "step_id": task["step_id"],
            "tool_name": task["tool_name"],
            "success": success,
            "code": code,
            "message": message,
            "data": data,
            "evidence": evidence,
            "started_at": started_at,
            "finished_at": format_now(),
        }
        logger.info(
            "report edge step result task_id=%s step_id=%s success=%s",
            task["task_id"],
            task["step_id"],
            success,
        )
        self.backend_client.report_task(payload)