from __future__ import annotations import json from uuid import uuid4 from sqlalchemy.orm import Session from app.core.constants import ( EDGE_NODE_STATUS_ONLINE, EDGE_STEP_STATUS_CANCELLED, EDGE_STEP_STATUS_FAILED, EDGE_STEP_STATUS_PENDING, EDGE_STEP_STATUS_RUNNING, EDGE_STEP_STATUS_SUCCEEDED, ERROR_CODE_CONFLICT, ERROR_CODE_NOT_FOUND, TASK_STATUS_FAILED, TASK_STATUS_RUNNING, TASK_STATUS_SUCCEEDED, TASK_STATUS_VERIFYING, ) from app.core.time import compute_duration_ms, format_now from app.models.edge_node import EdgeNode from app.models.edge_task import EdgeTask from app.models.audit_log import AuditLog from app.models.tool_call import ToolCall from app.repositories.audit_repository import AuditRepository from app.repositories.edge_repository import EdgeNodeRepository, EdgeTaskRepository from app.repositories.task_repository import TaskRepository from app.repositories.tool_call_repository import ToolCallRepository class EdgeTaskConflictError(Exception): code = ERROR_CODE_CONFLICT class EdgeTaskNotFoundError(Exception): code = ERROR_CODE_NOT_FOUND class EdgeService: def __init__(self, db: Session, timezone_name: str) -> None: self.db = db self.timezone_name = timezone_name self.node_repository = EdgeNodeRepository(db) self.edge_task_repository = EdgeTaskRepository(db) self.task_repository = TaskRepository(db) self.audit_repository = AuditRepository(db) self.tool_call_repository = ToolCallRepository(db) def heartbeat(self, edge_id: str, hostname: str, os_type: str, agent_version: str, capabilities: list[str]) -> EdgeNode: current_time = format_now(self.timezone_name) node = self.node_repository.get_by_edge_id(edge_id) if node: node.hostname = hostname node.os_type = os_type node.agent_version = agent_version node.capabilities_json = json.dumps(capabilities, ensure_ascii=False) node.node_status = EDGE_NODE_STATUS_ONLINE node.last_heartbeat_at = current_time node.updated_at = current_time return self.node_repository.add_or_update(node) node = EdgeNode( edge_id=edge_id, hostname=hostname, os_type=os_type, agent_version=agent_version, capabilities_json=json.dumps(capabilities, ensure_ascii=False), node_status=EDGE_NODE_STATUS_ONLINE, last_heartbeat_at=current_time, created_at=current_time, updated_at=current_time, ) return self.node_repository.add_or_update(node) def schedule_default_verification(self, task_id: str, edge_id: str = "edge-shanghai-001") -> EdgeTask: task = self.task_repository.get_by_task_id(task_id) if not task: raise EdgeTaskNotFoundError() if task.task_status != TASK_STATUS_RUNNING: raise EdgeTaskConflictError("当前任务状态不允许创建 edge 验证步骤。") if self.edge_task_repository.list_active_by_task_id(task_id): raise EdgeTaskConflictError("当前任务已存在待处理的 edge 验证步骤。") current_time = format_now(self.timezone_name) step_id = f"step-{uuid4().hex[:12]}" edge_task = EdgeTask( edge_task_id=f"edge-task-{uuid4().hex[:12]}", step_id=step_id, task_id=task_id, edge_id=edge_id, tool_name="http_health_check", params_json=json.dumps( { "url": f"http://{task.app_code or 'localhost'}.{task.env or 'env'}.demo/actuator/health", "timeout_ms": 3000, }, ensure_ascii=False, ), step_status=EDGE_STEP_STATUS_PENDING, success=None, message=None, result_data_json="{}", evidence_json="{}", duration_ms=None, expire_at=current_time, started_at=None, finished_at=None, created_at=current_time, updated_at=current_time, ) created_edge_task = self.edge_task_repository.add(edge_task) self._write_audit_log( task_id=task_id, request_id=None, action="EDGE_TASK_SCHEDULED", result="PENDING", target=edge_id, operator_user_id=None, operator_user_name=None, detail={"step_id": created_edge_task.step_id, "tool_name": created_edge_task.tool_name}, ) return created_edge_task def pull_tasks(self, edge_id: str, max_tasks: int) -> list[EdgeTask]: items = self.edge_task_repository.list_pending_by_edge_id(edge_id)[:max_tasks] current_time = format_now(self.timezone_name) pulled_items: list[EdgeTask] = [] for item in items: task = self.task_repository.get_by_task_id(item.task_id) if not task or task.task_status != TASK_STATUS_RUNNING: item.step_status = EDGE_STEP_STATUS_CANCELLED item.updated_at = current_time item.message = "task state no longer allows edge execution" self.edge_task_repository.update(item) continue item.step_status = EDGE_STEP_STATUS_RUNNING item.started_at = current_time item.updated_at = current_time self.edge_task_repository.update(item) task.task_status = TASK_STATUS_VERIFYING task.updated_at = current_time task.summary = "任务已进入边缘验证阶段。" self.task_repository.update(task) pulled_items.append(item) return pulled_items def report_task(self, edge_id: str, step_id: str, success: bool, message: str, data: dict, evidence: dict, started_at: str, finished_at: str) -> tuple[EdgeTask, str]: edge_task = self.edge_task_repository.get_by_step_id(step_id) if not edge_task: raise EdgeTaskNotFoundError() if edge_task.edge_id != edge_id: raise EdgeTaskConflictError("edge_id 与任务归属不一致。") if edge_task.step_status not in {EDGE_STEP_STATUS_RUNNING, EDGE_STEP_STATUS_PENDING}: raise EdgeTaskConflictError("当前步骤状态不允许重复回传。") task = self.task_repository.get_by_task_id(edge_task.task_id) if not task: raise EdgeTaskConflictError("edge 步骤关联任务不存在。") if task.task_status not in {TASK_STATUS_RUNNING, TASK_STATUS_VERIFYING}: raise EdgeTaskConflictError("当前任务状态不允许回传 edge 结果。") edge_task.step_status = EDGE_STEP_STATUS_SUCCEEDED if success else EDGE_STEP_STATUS_FAILED edge_task.success = 1 if success else 0 edge_task.message = message edge_task.result_data_json = json.dumps(data, ensure_ascii=False) edge_task.evidence_json = json.dumps(evidence, ensure_ascii=False) edge_task.started_at = started_at edge_task.finished_at = finished_at edge_task.duration_ms = compute_duration_ms(started_at, finished_at) edge_task.updated_at = format_now(self.timezone_name) updated_edge_task = self.edge_task_repository.update(edge_task) self._write_tool_call( task_id=updated_edge_task.task_id, request_id=None, operator_user_id=edge_id, operator_user_name=edge_id, step_id=updated_edge_task.step_id, tool_name=updated_edge_task.tool_name, request_payload=json.loads(updated_edge_task.params_json), response_payload={"data": data, "evidence": evidence, "message": message}, success=success, started_at=started_at, finished_at=finished_at, ) task_status = TASK_STATUS_RUNNING task.task_status = TASK_STATUS_SUCCEEDED if success else TASK_STATUS_FAILED task.updated_at = format_now(self.timezone_name) task.summary = "边缘验证通过,任务完成。" if success else "边缘验证失败,任务失败。" self.task_repository.update(task) task_status = task.task_status self._write_audit_log( task_id=task.task_id, request_id=None, action="EDGE_TASK_REPORTED", result=task.task_status, target=edge_id, operator_user_id=edge_id, operator_user_name=edge_id, detail={"step_id": step_id, "tool_name": edge_task.tool_name, "message": message}, ) return updated_edge_task, task_status def record_event(self, edge_id: str, event_type: str, message: str, detail: dict) -> AuditLog: current_time = format_now(self.timezone_name) audit = AuditLog( audit_id=f"audit-{uuid4().hex[:12]}", task_id=f"edge-event:{edge_id}", action=f"EDGE_EVENT:{event_type}", operator_user_id=edge_id, operator_user_name=edge_id, target=edge_id, result="REPORTED", detail_json=json.dumps({"message": message, "detail": detail}, ensure_ascii=False), timestamp=current_time, ) self.db.add(audit) self.db.commit() self.db.refresh(audit) return audit def _write_tool_call( self, task_id: str, request_id: str | None, operator_user_id: str | None, operator_user_name: str | None, step_id: str | None, tool_name: str, request_payload: dict, response_payload: dict, success: bool, started_at: str | None, finished_at: str | None, ) -> ToolCall: tool_call = ToolCall( tool_call_id=f"tool-call-{uuid4().hex[:12]}", task_id=task_id, request_id=request_id, operator_user_id=operator_user_id, operator_user_name=operator_user_name, step_id=step_id, tool_name=tool_name, request_payload_json=json.dumps(request_payload, ensure_ascii=False), response_payload_json=json.dumps(response_payload, ensure_ascii=False), success=1 if success else 0, duration_ms=compute_duration_ms(started_at, finished_at), started_at=started_at, finished_at=finished_at, ) return self.tool_call_repository.add(tool_call) def _write_audit_log( self, task_id: str, request_id: str | None, action: str, result: str, target: str | None, operator_user_id: str | None, operator_user_name: str | None, detail: dict, ) -> AuditLog: audit_log = AuditLog( audit_id=f"audit-{uuid4().hex[:12]}", task_id=task_id, request_id=request_id, action=action, operator_user_id=operator_user_id, operator_user_name=operator_user_name, target=target, result=result, detail_json=json.dumps(detail, ensure_ascii=False), timestamp=format_now(self.timezone_name), ) return self.audit_repository.add(audit_log)