163 lines
5.6 KiB
Python
163 lines
5.6 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
from typing import Annotated
|
|
from uuid import uuid4
|
|
|
|
from fastapi import APIRouter, Depends, Header, HTTPException, status
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.core.config import get_settings
|
|
from app.core.constants import ERROR_CODE_OK
|
|
from app.core.time import format_now
|
|
from app.db.session import get_db
|
|
from app.schemas.common import ApiResponse
|
|
from app.schemas.edge import (
|
|
EdgeEventData,
|
|
EdgeEventRequest,
|
|
EdgeHeartbeatData,
|
|
EdgeHeartbeatRequest,
|
|
EdgePullTasksData,
|
|
EdgePullTasksRequest,
|
|
EdgeTaskItem,
|
|
EdgeTaskReportData,
|
|
EdgeTaskReportRequest,
|
|
)
|
|
from app.services.edge_service import EdgeService, EdgeTaskConflictError, EdgeTaskNotFoundError
|
|
|
|
# Router for the edge-agent endpoints defined in this module: routes registered
# on it are served under the "/api/agent/edge" prefix and tagged "agent-edge".
router = APIRouter(prefix="/api/agent/edge", tags=["agent-edge"])
|
|
|
|
|
|
def build_request_id(header_value: str | None) -> str:
    """Return the request id to place in the response envelope.

    Args:
        header_value: The request id supplied by the caller, or ``None``
            when none was provided.

    Returns:
        ``header_value`` unchanged when it is truthy. Otherwise a newly
        generated id made of the prefix ``req-`` followed by the first 12
        hexadecimal characters of a random UUID4. An empty string is treated
        the same as ``None``.
    """
    if header_value:
        return header_value

    generated_suffix = uuid4().hex[:12]
    return f"req-{generated_suffix}"
|
|
|
|
|
|
@router.post("/heartbeat", response_model=ApiResponse[EdgeHeartbeatData])
def heartbeat(
    payload: EdgeHeartbeatRequest,
    db: Annotated[Session, Depends(get_db)],
    x_request_id: Annotated[str | None, Header(alias="X-Request-Id")] = None,
) -> ApiResponse[EdgeHeartbeatData]:
    """Handle ``POST /heartbeat`` from an edge agent.

    Passes the agent's ``edge_id``, ``hostname``, ``os_type``,
    ``agent_version`` and ``capabilities`` to ``EdgeService.heartbeat`` and
    returns the resulting node's ``edge_id``, ``node_status`` and
    ``last_heartbeat_at`` inside a success envelope.

    Args:
        payload: Parsed heartbeat request body.
        db: Database session injected through ``get_db``.
        x_request_id: Optional ``X-Request-Id`` header; when absent or empty
            a request id is generated by ``build_request_id``.

    Returns:
        A success ``ApiResponse`` carrying ``EdgeHeartbeatData`` and a
        timestamp produced by ``format_now`` in the configured default
        timezone.
    """
    settings = get_settings()
    request_id = build_request_id(x_request_id)
    default_tz = settings.default_timezone

    service = EdgeService(db, default_tz)
    node = service.heartbeat(
        edge_id=payload.edge_id,
        hostname=payload.hostname,
        os_type=payload.os_type,
        agent_version=payload.agent_version,
        capabilities=payload.capabilities,
    )

    heartbeat_data = EdgeHeartbeatData(
        edge_id=node.edge_id,
        node_status=node.node_status,
        last_heartbeat_at=node.last_heartbeat_at,
    )
    return ApiResponse[EdgeHeartbeatData](
        request_id=request_id,
        success=True,
        code=ERROR_CODE_OK,
        message="success",
        data=heartbeat_data,
        timestamp=format_now(default_tz),
    )
|
|
|
|
|
|
@router.post("/tasks/pull", response_model=ApiResponse[EdgePullTasksData])
def pull_tasks(
    payload: EdgePullTasksRequest,
    db: Annotated[Session, Depends(get_db)],
    x_request_id: Annotated[str | None, Header(alias="X-Request-Id")] = None,
) -> ApiResponse[EdgePullTasksData]:
    """Handle ``POST /tasks/pull``: hand pending tasks to an edge agent.

    Asks ``EdgeService.pull_tasks`` for tasks for ``payload.edge_id``,
    passing ``payload.max_tasks`` through, and converts each returned record
    into an ``EdgeTaskItem``.

    Args:
        payload: Parsed pull request body (``edge_id`` and ``max_tasks``).
        db: Database session injected through ``get_db``.
        x_request_id: Optional ``X-Request-Id`` header; when absent or empty
            a request id is generated by ``build_request_id``.

    Returns:
        A success ``ApiResponse`` whose data holds the list of task items,
        in the order the service returned them.
    """
    settings = get_settings()
    request_id = build_request_id(x_request_id)
    default_tz = settings.default_timezone

    pulled = EdgeService(db, default_tz).pull_tasks(payload.edge_id, payload.max_tasks)

    # Each record stores its parameters as JSON text; decode it for the agent.
    # NOTE(review): json.loads raises on text that is not valid JSON and this
    # handler does not catch it.
    task_items = [
        EdgeTaskItem(
            task_id=record.task_id,
            step_id=record.step_id,
            tool_name=record.tool_name,
            params=json.loads(record.params_json),
            expire_at=record.expire_at,
        )
        for record in pulled
    ]

    return ApiResponse[EdgePullTasksData](
        request_id=request_id,
        success=True,
        code=ERROR_CODE_OK,
        message="success",
        data=EdgePullTasksData(tasks=task_items),
        timestamp=format_now(default_tz),
    )
|
|
|
|
|
|
@router.post("/tasks/report", response_model=ApiResponse[EdgeTaskReportData])
def report_task(
    payload: EdgeTaskReportRequest,
    db: Annotated[Session, Depends(get_db)],
    x_request_id: Annotated[str | None, Header(alias="X-Request-Id")] = None,
) -> ApiResponse[EdgeTaskReportData]:
    """Handle ``POST /tasks/report``: accept a step result from an edge agent.

    Forwards the reported outcome to ``EdgeService.report_task``, translates
    the service's domain errors into HTTP errors, and then verifies that the
    ``task_id`` and ``tool_name`` in the request match the edge task the
    service returned.

    Args:
        payload: Parsed report request body.
        db: Database session injected through ``get_db``.
        x_request_id: Optional ``X-Request-Id`` header; when absent or empty
            a request id is generated by ``build_request_id``.

    Returns:
        A success ``ApiResponse`` carrying the task id, step id, step status
        and the task status returned by the service.

    Raises:
        HTTPException: 404 when the service raises ``EdgeTaskNotFoundError``;
            409 when it raises ``EdgeTaskConflictError`` or when the
            request's ``task_id``/``tool_name`` do not match the edge task.
    """
    settings = get_settings()
    request_id = build_request_id(x_request_id)
    default_tz = settings.default_timezone

    try:
        outcome = EdgeService(db, default_tz).report_task(
            edge_id=payload.edge_id,
            step_id=payload.step_id,
            success=payload.success,
            message=payload.message,
            data=payload.data,
            evidence=payload.evidence,
            started_at=payload.started_at,
            finished_at=payload.finished_at,
        )
    except EdgeTaskNotFoundError as exc:
        not_found_detail = {"code": exc.code, "message": "edge step not found"}
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=not_found_detail,
        ) from exc
    except EdgeTaskConflictError as exc:
        # Prefer the message the service attached to the exception, if any.
        if exc.args:
            conflict_message = exc.args[0]
        else:
            conflict_message = "edge task conflict"
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail={"code": exc.code, "message": conflict_message},
        ) from exc

    edge_task, task_status = outcome

    # NOTE(review): this consistency check runs only after the service call
    # above has returned. Whether the report has already been persisted by
    # then is not visible from this handler; confirm that a mismatch cannot
    # leave a recorded report behind a 409 response.
    if (
        payload.task_id != edge_task.task_id
        or payload.tool_name != edge_task.tool_name
    ):
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail={"code": "CONFLICT", "message": "task_id or tool_name mismatch"},
        )

    report_data = EdgeTaskReportData(
        task_id=edge_task.task_id,
        step_id=edge_task.step_id,
        step_status=edge_task.step_status,
        task_status=task_status,
    )
    return ApiResponse[EdgeTaskReportData](
        request_id=request_id,
        success=True,
        code=ERROR_CODE_OK,
        message="success",
        data=report_data,
        timestamp=format_now(default_tz),
    )
|
|
|
|
|
|
@router.post("/events", response_model=ApiResponse[EdgeEventData])
def report_event(
    payload: EdgeEventRequest,
    db: Annotated[Session, Depends(get_db)],
    x_request_id: Annotated[str | None, Header(alias="X-Request-Id")] = None,
) -> ApiResponse[EdgeEventData]:
    """Handle ``POST /events``: record an event sent by an edge agent.

    Hands the event's ``edge_id``, ``event_type``, ``message`` and ``detail``
    to ``EdgeService.record_event`` and acknowledges it by echoing the
    ``edge_id`` and ``event_type`` from the request.

    Args:
        payload: Parsed event request body.
        db: Database session injected through ``get_db``.
        x_request_id: Optional ``X-Request-Id`` header; when absent or empty
            a request id is generated by ``build_request_id``.

    Returns:
        A success ``ApiResponse`` whose data has ``accepted=True``; this is
        reached only if ``record_event`` returns without raising.
    """
    settings = get_settings()
    request_id = build_request_id(x_request_id)
    default_tz = settings.default_timezone

    service = EdgeService(db, default_tz)
    service.record_event(
        edge_id=payload.edge_id,
        event_type=payload.event_type,
        message=payload.message,
        detail=payload.detail,
    )

    acknowledgement = EdgeEventData(
        edge_id=payload.edge_id,
        event_type=payload.event_type,
        accepted=True,
    )
    return ApiResponse[EdgeEventData](
        request_id=request_id,
        success=True,
        code=ERROR_CODE_OK,
        message="success",
        data=acknowledgement,
        timestamp=format_now(default_tz),
    )
|