from __future__ import annotations from typing import Any from uuid import uuid4 import httpx from app.core.config import Settings from app.core.security import build_auth_headers class BackendClient: def __init__(self, settings: Settings) -> None: self.settings = settings self.client = httpx.Client(timeout=settings.request_timeout_ms / 1000.0) def _headers(self) -> dict[str, str]: headers = build_auth_headers(self.settings) headers["X-Request-Id"] = f"edge-req-{uuid4().hex[:12]}" headers["X-Edge-Id"] = self.settings.edge_id return headers def heartbeat(self, capabilities: list[str]) -> dict[str, Any]: response = self.client.post( f"{self.settings.backend_base_url}/api/agent/edge/heartbeat", headers=self._headers(), json={ "edge_id": self.settings.edge_id, "hostname": self.settings.edge_hostname, "os_type": self.settings.edge_os_type, "agent_version": self.settings.edge_agent_version, "capabilities": capabilities, }, ) response.raise_for_status() return response.json() def pull_tasks(self, max_tasks: int = 5) -> list[dict[str, Any]]: response = self.client.post( f"{self.settings.backend_base_url}/api/agent/edge/tasks/pull", headers=self._headers(), json={ "edge_id": self.settings.edge_id, "max_tasks": max_tasks, }, ) response.raise_for_status() return response.json()["data"]["tasks"] def report_task(self, payload: dict[str, Any]) -> dict[str, Any]: response = self.client.post( f"{self.settings.backend_base_url}/api/agent/edge/tasks/report", headers=self._headers(), json=payload, ) response.raise_for_status() return response.json() def report_event(self, event_type: str, message: str, detail: dict[str, Any] | None = None) -> dict[str, Any]: response = self.client.post( f"{self.settings.backend_base_url}/api/agent/edge/events", headers=self._headers(), json={ "edge_id": self.settings.edge_id, "event_type": event_type, "message": message, "detail": detail or {}, }, ) response.raise_for_status() return response.json() def close(self) -> None: self.client.close()