auto_agent/edge-agent/app/client/backend_client.py
2521690 2c7714268f feat: 补强 demo 后端任务指标与 edge-agent 执行骨架
- 新增 task_report 任务级聚合指标 task_metrics
- 补充创建任务幂等与失败路径/冲突测试
- 将后端测试基线提升到 20 passed
- 新增 edge-agent 初始化代码、启动脚本与打包脚本
- 新增 http_health_check、check_port、check_process、grep_log 执行器
- 补充 edge-agent 基础测试并提升基线到 10 passed
- 同步更新 backend README 与当前进度总结
2026-04-09 10:51:19 +08:00

75 lines
2.5 KiB
Python

from __future__ import annotations
from typing import Any
from uuid import uuid4
import httpx
from app.core.config import Settings
from app.core.security import build_auth_headers
class BackendClient:
def __init__(self, settings: Settings) -> None:
self.settings = settings
self.client = httpx.Client(timeout=settings.request_timeout_ms / 1000.0)
def _headers(self) -> dict[str, str]:
headers = build_auth_headers(self.settings)
headers["X-Request-Id"] = f"edge-req-{uuid4().hex[:12]}"
headers["X-Edge-Id"] = self.settings.edge_id
return headers
def heartbeat(self, capabilities: list[str]) -> dict[str, Any]:
response = self.client.post(
f"{self.settings.backend_base_url}/api/agent/edge/heartbeat",
headers=self._headers(),
json={
"edge_id": self.settings.edge_id,
"hostname": self.settings.edge_hostname,
"os_type": self.settings.edge_os_type,
"agent_version": self.settings.edge_agent_version,
"capabilities": capabilities,
},
)
response.raise_for_status()
return response.json()
def pull_tasks(self, max_tasks: int = 5) -> list[dict[str, Any]]:
response = self.client.post(
f"{self.settings.backend_base_url}/api/agent/edge/tasks/pull",
headers=self._headers(),
json={
"edge_id": self.settings.edge_id,
"max_tasks": max_tasks,
},
)
response.raise_for_status()
return response.json()["data"]["tasks"]
def report_task(self, payload: dict[str, Any]) -> dict[str, Any]:
response = self.client.post(
f"{self.settings.backend_base_url}/api/agent/edge/tasks/report",
headers=self._headers(),
json=payload,
)
response.raise_for_status()
return response.json()
def report_event(self, event_type: str, message: str, detail: dict[str, Any] | None = None) -> dict[str, Any]:
response = self.client.post(
f"{self.settings.backend_base_url}/api/agent/edge/events",
headers=self._headers(),
json={
"edge_id": self.settings.edge_id,
"event_type": event_type,
"message": message,
"detail": detail or {},
},
)
response.raise_for_status()
return response.json()
def close(self) -> None:
self.client.close()