from __future__ import annotations import json from typing import Annotated from uuid import uuid4 from fastapi import APIRouter, Depends, Header, HTTPException, status from sqlalchemy.orm import Session from app.core.config import get_settings from app.core.constants import ERROR_CODE_OK from app.core.time import compute_duration_ms, format_now, parse_timestamp from app.db.session import get_db from app.repositories.approval_repository import ApprovalRepository from app.repositories.audit_repository import AuditRepository from app.repositories.edge_repository import EdgeTaskRepository from app.repositories.tool_call_repository import ToolCallRepository from app.adapters.software_a.minimal_adapter import MinimalSoftwareAAdapter from app.schemas.common import ApiResponse from app.schemas.task import ( AuditSummary, ApprovalSummary, ApprovalTraceItem, AuditTraceItem, CancelTaskRequest, ConfirmTaskData, ConfirmTaskRequest, CreateTaskData, CreateTaskRequest, ParsedIntent, ResultSummaryDetail, SoftwareAResultSummary, TaskBasic, TaskDetailData, TaskMetrics, TaskReportData, ToolTraceItem, ToolCallItem, VerificationResultSummary, VerificationTraceItem, ) from app.services.task_service import TaskConflictError, TaskNotFoundError, TaskService from app.services.task_service import TaskPermissionError router = APIRouter(prefix="/api/agent/tasks", tags=["agent-task"]) def build_request_id(header_value: str | None) -> str: return header_value or f"req-{uuid4().hex[:12]}" def build_result_summary_detail(task, approval, software_a_detail: dict | None, edge_tasks) -> ResultSummaryDetail: latest_edge_task = edge_tasks[0] if edge_tasks else None final_reason = task.summary if software_a_detail and software_a_detail.get("error_detail"): final_reason = software_a_detail["error_detail"] elif edge_tasks: failed_message = next((item.message for item in edge_tasks if item.step_status == "FAILED" and item.message), None) if failed_message: final_reason = failed_message elif latest_edge_task and latest_edge_task.message: final_reason = latest_edge_task.message elif approval and approval.approval_status == "REJECTED" and approval.reason: final_reason = approval.reason approval_summary = None if approval: approval_summary = ApprovalSummary( approval_id=approval.approval_id, approval_status=approval.approval_status, reason=approval.reason, ) software_a_summary = None if software_a_detail or task.software_a_task_id or task.software_a_task_status: software_a_summary = SoftwareAResultSummary( software_a_task_id=task.software_a_task_id, task_status=(software_a_detail or {}).get("task_status", task.software_a_task_status), progress_percent=(software_a_detail or {}).get("progress_percent"), error_detail=(software_a_detail or {}).get("error_detail"), started_at=(software_a_detail or {}).get("started_at"), finished_at=(software_a_detail or {}).get("finished_at"), ) verification_summary = None if latest_edge_task: verification_success_values = [bool(item.success) for item in edge_tasks if item.success is not None] verification_success = None if not verification_success_values else all(verification_success_values) if any(item.step_status == "FAILED" for item in edge_tasks): verification_status = "FAILED" elif all(item.step_status == "SUCCEEDED" for item in edge_tasks): verification_status = "SUCCEEDED" elif any(item.step_status == "RUNNING" for item in edge_tasks): verification_status = "RUNNING" else: verification_status = latest_edge_task.step_status verification_message = next((item.message for item in edge_tasks if item.step_status == "FAILED" and item.message), latest_edge_task.message) verification_summary = VerificationResultSummary( step_id=latest_edge_task.step_id if len(edge_tasks) == 1 else None, step_status=verification_status, success=verification_success, duration_ms=sum_duration_ms([item.duration_ms for item in edge_tasks]), message=verification_message, ) return ResultSummaryDetail( final_status=task.task_status, final_reason=final_reason, approval=approval_summary, software_a=software_a_summary, verification=verification_summary, ) def pick_latest_timestamp(*values: str | None) -> str | None: candidates = [value for value in values if value and parse_timestamp(value)] if not candidates: return None return max(candidates, key=lambda value: parse_timestamp(value)) def sum_duration_ms(values: list[int | None]) -> int: return sum(value for value in values if value is not None) def build_task_metrics(task, approval, software_a_detail: dict | None, tool_calls, edge_tasks, audit_logs) -> TaskMetrics: tool_call_duration_ms_total = sum_duration_ms([item.duration_ms for item in tool_calls]) verification_duration_ms_total = sum_duration_ms([item.duration_ms for item in edge_tasks]) software_a_duration_ms_total = sum_duration_ms([item.duration_ms for item in tool_calls if item.tool_name.startswith("software_a")]) verification_queue_wait_duration_ms_total = sum_duration_ms([compute_duration_ms(item.created_at, item.started_at) for item in edge_tasks]) verification_end_to_end_duration_ms_total = sum_duration_ms([compute_duration_ms(item.created_at, item.finished_at) for item in edge_tasks]) tool_call_count = len(tool_calls) tool_call_success_count = sum(1 for item in tool_calls if bool(item.success)) tool_call_failed_count = sum(1 for item in tool_calls if not bool(item.success)) verification_step_count = len(edge_tasks) verification_success_count = sum(1 for item in edge_tasks if item.success == 1) verification_failed_count = sum(1 for item in edge_tasks if item.success == 0) audit_failure_count = sum(1 for item in audit_logs if item.result in {"FAILED", "REJECTED"}) latest_observed_at = pick_latest_timestamp( task.updated_at, approval.updated_at if approval else None, *(item.finished_at for item in tool_calls), *(item.finished_at for item in edge_tasks), software_a_detail.get("finished_at") if software_a_detail else None, ) total_duration_ms = compute_duration_ms(task.created_at, latest_observed_at) confirm_wait_duration_ms = compute_duration_ms(task.created_at, task.confirmed_at) approval_duration_ms = None if approval: approval_finished_at = approval.updated_at if approval.approval_status == "PENDING": approval_finished_at = latest_observed_at approval_duration_ms = compute_duration_ms(approval.created_at, approval_finished_at) execution_started_at = task.confirmed_at if approval and approval.approval_status == "APPROVED": execution_started_at = approval.updated_at execution_duration_ms = None if execution_started_at: execution_finished_at = pick_latest_timestamp( *(item.finished_at for item in tool_calls), *(item.finished_at for item in edge_tasks), software_a_detail.get("finished_at") if software_a_detail else None, task.updated_at, ) execution_duration_ms = compute_duration_ms(execution_started_at, execution_finished_at) return TaskMetrics( total_duration_ms=total_duration_ms, confirm_wait_duration_ms=confirm_wait_duration_ms, approval_duration_ms=approval_duration_ms, execution_duration_ms=execution_duration_ms, software_a_duration_ms_total=software_a_duration_ms_total, tool_call_duration_ms_total=tool_call_duration_ms_total, verification_duration_ms_total=verification_duration_ms_total, verification_queue_wait_duration_ms_total=verification_queue_wait_duration_ms_total, verification_end_to_end_duration_ms_total=verification_end_to_end_duration_ms_total, tool_call_count=tool_call_count, tool_call_success_count=tool_call_success_count, tool_call_failed_count=tool_call_failed_count, verification_step_count=verification_step_count, verification_success_count=verification_success_count, verification_failed_count=verification_failed_count, audit_event_count=len(audit_logs), audit_failure_count=audit_failure_count, ) def build_audit_summary(audit_logs) -> AuditSummary: result_counts: dict[str, int] = {} action_types = sorted({item.action for item in audit_logs}) operator_user_names = sorted({item.operator_user_name for item in audit_logs if item.operator_user_name}) for item in audit_logs: result_counts[item.result] = result_counts.get(item.result, 0) + 1 return AuditSummary( audit_event_count=len(audit_logs), failure_count=sum(1 for item in audit_logs if item.result in {"FAILED", "REJECTED"}), pending_count=sum(1 for item in audit_logs if item.result == "PENDING"), cancelled_count=sum(1 for item in audit_logs if item.result == "CANCELLED"), reported_count=sum(1 for item in audit_logs if item.result == "REPORTED"), action_types=action_types, operator_user_names=operator_user_names, result_counts=result_counts, ) def build_verification_result(edge_tasks) -> dict | None: if not edge_tasks: return None def latest_success(tool_name: str) -> bool | None: for item in edge_tasks: if item.tool_name == tool_name and item.success is not None: return bool(item.success) return None grep_success = latest_success("grep_log") port_related = [latest_success(name) for name in ("check_port", "tcp_probe")] port_values = [value for value in port_related if value is not None] return { "http_ok": latest_success("http_health_check"), "process_ok": latest_success("check_process"), "port_ok": all(port_values) if port_values else None, "log_error_count": 0 if grep_success is True else (1 if grep_success is False else None), } @router.post("", response_model=ApiResponse[CreateTaskData]) def create_task( payload: CreateTaskRequest, db: Annotated[Session, Depends(get_db)], x_request_id: Annotated[str | None, Header(alias="X-Request-Id")] = None, ) -> ApiResponse[CreateTaskData]: settings = get_settings() request_id = build_request_id(x_request_id) service = TaskService(db, settings.default_timezone) try: task = service.create_task(payload, request_id) except TaskConflictError as exc: message = exc.args[0] if exc.args else "task state conflict" raise HTTPException( status_code=status.HTTP_409_CONFLICT, detail={"code": exc.code, "message": message}, ) from exc missing_slots = json.loads(task.missing_slots_json) next_action = "CONFIRM_TASK" if not missing_slots else "FILL_MISSING_SLOTS" return ApiResponse[CreateTaskData]( request_id=request_id, success=True, code=ERROR_CODE_OK, message="success", data=CreateTaskData( task_id=task.task_id, parsed_intent=ParsedIntent(**json.loads(task.parsed_intent_json)), missing_slots=missing_slots, risk_level=task.risk_level, task_status=task.task_status, next_action=next_action, ), timestamp=format_now(settings.default_timezone), ) @router.post("/{task_id}/confirm", response_model=ApiResponse[ConfirmTaskData]) def confirm_task( task_id: str, payload: ConfirmTaskRequest, db: Annotated[Session, Depends(get_db)], x_request_id: Annotated[str | None, Header(alias="X-Request-Id")] = None, ) -> ApiResponse[ConfirmTaskData]: settings = get_settings() request_id = build_request_id(x_request_id) service = TaskService(db, settings.default_timezone) try: task, approval_id = service.confirm_task(task_id, payload, request_id=request_id) except TaskNotFoundError as exc: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail={"code": exc.code, "message": "task not found"}, ) from exc except TaskConflictError as exc: message = exc.args[0] if exc.args else "task state conflict" raise HTTPException( status_code=status.HTTP_409_CONFLICT, detail={"code": exc.code, "message": message}, ) from exc except TaskPermissionError as exc: message = exc.args[0] if exc.args else "permission denied" raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail={"code": exc.code, "message": message}, ) from exc return ApiResponse[ConfirmTaskData]( request_id=request_id, success=True, code=ERROR_CODE_OK, message="task confirmed", data=ConfirmTaskData( task_id=task.task_id, task_status=task.task_status, approval_status=task.approval_status, approval_id=approval_id, software_a_task_id=task.software_a_task_id, software_a_task_status=task.software_a_task_status, ), timestamp=format_now(settings.default_timezone), ) @router.post("/{task_id}/cancel", response_model=ApiResponse[ConfirmTaskData]) def cancel_task( task_id: str, payload: CancelTaskRequest, db: Annotated[Session, Depends(get_db)], x_request_id: Annotated[str | None, Header(alias="X-Request-Id")] = None, ) -> ApiResponse[ConfirmTaskData]: settings = get_settings() request_id = build_request_id(x_request_id) service = TaskService(db, settings.default_timezone) try: task = service.cancel_task(task_id, payload.reason, request_id=request_id) except TaskNotFoundError as exc: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail={"code": exc.code, "message": "task not found"}, ) from exc except TaskConflictError as exc: message = exc.args[0] if exc.args else "task state conflict" raise HTTPException( status_code=status.HTTP_409_CONFLICT, detail={"code": exc.code, "message": message}, ) from exc return ApiResponse[ConfirmTaskData]( request_id=request_id, success=True, code=ERROR_CODE_OK, message="task cancelled", data=ConfirmTaskData( task_id=task.task_id, task_status=task.task_status, approval_status=task.approval_status, approval_id=None, software_a_task_id=task.software_a_task_id, software_a_task_status=task.software_a_task_status, ), timestamp=format_now(settings.default_timezone), ) @router.get("/{task_id}", response_model=ApiResponse[TaskDetailData]) def get_task( task_id: str, db: Annotated[Session, Depends(get_db)], x_request_id: Annotated[str | None, Header(alias="X-Request-Id")] = None, ) -> ApiResponse[TaskDetailData]: settings = get_settings() request_id = build_request_id(x_request_id) service = TaskService(db, settings.default_timezone) try: task = service.get_task(task_id) except TaskNotFoundError as exc: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail={"code": exc.code, "message": "task not found"}, ) from exc edge_tasks = EdgeTaskRepository(db).list_by_task_id(task_id) tool_calls = ToolCallRepository(db).list_by_task_id(task_id) approval = ApprovalRepository(db).get_by_task_id(task_id) software_a_detail = None if task.software_a_task_id: software_a_detail = MinimalSoftwareAAdapter(settings.default_timezone).get_deploy_task(task.software_a_task_id) verification_result = build_verification_result(edge_tasks) return ApiResponse[TaskDetailData]( request_id=request_id, success=True, code=ERROR_CODE_OK, message="success", data=TaskDetailData( task_id=task.task_id, task_status=task.task_status, approval_status=task.approval_status, risk_level=task.risk_level, intent=ParsedIntent(**json.loads(task.parsed_intent_json)), software_a_task_id=task.software_a_task_id, software_a_task_status=task.software_a_task_status, tool_calls=[ ToolCallItem( tool_name=item.tool_name, success=bool(item.success), ) for item in tool_calls ], verification_result=verification_result, summary=task.summary, result_summary_detail=build_result_summary_detail(task, approval, software_a_detail, edge_tasks), ), timestamp=format_now(settings.default_timezone), ) @router.get("/{task_id}/report", response_model=ApiResponse[TaskReportData]) def get_task_report( task_id: str, db: Annotated[Session, Depends(get_db)], x_request_id: Annotated[str | None, Header(alias="X-Request-Id")] = None, ) -> ApiResponse[TaskReportData]: settings = get_settings() request_id = build_request_id(x_request_id) service = TaskService(db, settings.default_timezone) try: task = service.get_task(task_id) except TaskNotFoundError as exc: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail={"code": exc.code, "message": "task not found"}, ) from exc approval = ApprovalRepository(db).get_by_task_id(task_id) tool_calls = ToolCallRepository(db).list_by_task_id(task_id) edge_tasks = EdgeTaskRepository(db).list_by_task_id(task_id) audit_logs = AuditRepository(db).list_by_task_id(task_id) software_a_detail = None if task.software_a_task_id: software_a_detail = MinimalSoftwareAAdapter(settings.default_timezone).get_deploy_task(task.software_a_task_id) approval_trace = [] if approval: approval_trace.append( ApprovalTraceItem( approval_id=approval.approval_id, approval_status=approval.approval_status, risk_level=approval.risk_level, approvers=json.loads(approval.approver_user_ids_json), reason=approval.reason, created_at=approval.created_at, updated_at=approval.updated_at, ) ) tool_trace = [ ToolTraceItem( tool_call_id=item.tool_call_id, request_id=item.request_id, operator_user_id=item.operator_user_id, operator_user_name=item.operator_user_name, tool_name=item.tool_name, success=bool(item.success), duration_ms=item.duration_ms, started_at=item.started_at, finished_at=item.finished_at, request_payload=json.loads(item.request_payload_json), response_payload=json.loads(item.response_payload_json), ) for item in tool_calls ] verification_trace = [ VerificationTraceItem( step_id=item.step_id, edge_id=item.edge_id, tool_name=item.tool_name, step_status=item.step_status, success=None if item.success is None else bool(item.success), duration_ms=item.duration_ms, message=item.message, params=json.loads(item.params_json), result_data=json.loads(item.result_data_json), evidence=json.loads(item.evidence_json), started_at=item.started_at, finished_at=item.finished_at, ) for item in edge_tasks ] audit_trace = [ AuditTraceItem( audit_id=item.audit_id, request_id=item.request_id, action=item.action, result=item.result, operator_user_id=item.operator_user_id, operator_user_name=item.operator_user_name, target=item.target, detail=json.loads(item.detail_json), timestamp=item.timestamp, ) for item in audit_logs ] return ApiResponse[TaskReportData]( request_id=request_id, success=True, code=ERROR_CODE_OK, message="success", data=TaskReportData( task_basic=TaskBasic( task_id=task.task_id, task_status=task.task_status, approval_status=task.approval_status, risk_level=task.risk_level, created_at=task.created_at, updated_at=task.updated_at, confirmed_at=task.confirmed_at, ), intent_snapshot=ParsedIntent(**json.loads(task.parsed_intent_json)), approval_trace=approval_trace, tool_trace=tool_trace, verification_trace=verification_trace, task_metrics=build_task_metrics(task, approval, software_a_detail, tool_calls, edge_tasks, audit_logs), audit_summary=build_audit_summary(audit_logs), result_summary=task.summary, result_summary_detail=build_result_summary_detail(task, approval, software_a_detail, edge_tasks), audit_trace=audit_trace, ), timestamp=format_now(settings.default_timezone), )