2521690 ce299cbb18 feat: 增加 Agent 演示入口与 app_metadata 驱动验证链路
- 新增 app_metadata 模型、仓储与服务
- 将默认 edge 验证步骤改为由 app_metadata 驱动生成
- 新增 chat_session / chat_message 会话层模型与 chat service
- 新增 demo chat API,支持会话创建、消息发送、任务确认
- 新增最小 Web Demo 页面,形成聊天式演示入口
- 增强任务报告,补充 audit_summary 与更细粒度 task_metrics
- 增强 edge-agent 执行器:tcp_probe、日志时间范围过滤、进程指标与更灵活健康检查
- 更新 README 与当前进度总结,MVP 进度推进到约 94%
2026-04-09 14:10:13 +08:00

535 lines
21 KiB
Python

from __future__ import annotations
import json
from typing import Annotated
from uuid import uuid4
from fastapi import APIRouter, Depends, Header, HTTPException, status
from sqlalchemy.orm import Session
from app.core.config import get_settings
from app.core.constants import ERROR_CODE_OK
from app.core.time import compute_duration_ms, format_now, parse_timestamp
from app.db.session import get_db
from app.repositories.approval_repository import ApprovalRepository
from app.repositories.audit_repository import AuditRepository
from app.repositories.edge_repository import EdgeTaskRepository
from app.repositories.tool_call_repository import ToolCallRepository
from app.adapters.software_a.minimal_adapter import MinimalSoftwareAAdapter
from app.schemas.common import ApiResponse
from app.schemas.task import (
AuditSummary,
ApprovalSummary,
ApprovalTraceItem,
AuditTraceItem,
CancelTaskRequest,
ConfirmTaskData,
ConfirmTaskRequest,
CreateTaskData,
CreateTaskRequest,
ParsedIntent,
ResultSummaryDetail,
SoftwareAResultSummary,
TaskBasic,
TaskDetailData,
TaskMetrics,
TaskReportData,
ToolTraceItem,
ToolCallItem,
VerificationResultSummary,
VerificationTraceItem,
)
from app.services.task_service import TaskConflictError, TaskNotFoundError, TaskService
from app.services.task_service import TaskPermissionError
# Router for the agent task API; every route declared below is mounted under
# the "/api/agent/tasks" prefix and grouped under the "agent-task" tag.
router = APIRouter(prefix="/api/agent/tasks", tags=["agent-task"])
def build_request_id(header_value: str | None) -> str:
    """Return the caller-supplied request id, or mint a new one.

    A new id (``"req-"`` followed by 12 hex characters from a random UUID) is
    generated when *header_value* is ``None`` or an empty string.
    """
    if header_value:
        return header_value
    return "req-" + uuid4().hex[:12]
def _first_failed_step_message(edge_tasks):
    """Return the first non-empty ``message`` among FAILED edge steps, or ``None``."""
    return next(
        (item.message for item in edge_tasks if item.step_status == "FAILED" and item.message),
        None,
    )


def _resolve_final_reason(task, approval, software_a_detail: dict | None, edge_tasks):
    """Pick the most specific human-readable reason for the task's final state.

    Precedence:
      1. Software A ``error_detail`` (when present and non-empty);
      2. if any edge steps exist: the first FAILED step message, else the
         message of ``edge_tasks[0]``, else ``task.summary`` (the approval is
         not consulted once edge steps exist);
      3. otherwise the reason of a REJECTED approval, when it has one;
      4. otherwise ``task.summary``.
    """
    if software_a_detail and software_a_detail.get("error_detail"):
        return software_a_detail["error_detail"]
    if edge_tasks:
        failed_message = _first_failed_step_message(edge_tasks)
        if failed_message:
            return failed_message
        # edge_tasks[0] is treated as the most recent step; presumably the
        # repository returns steps newest-first — TODO confirm.
        latest_message = edge_tasks[0].message
        return latest_message if latest_message else task.summary
    if approval and approval.approval_status == "REJECTED" and approval.reason:
        return approval.reason
    return task.summary


def _summarize_approval(approval):
    """Return an ``ApprovalSummary`` for *approval*, or ``None`` when there is none."""
    if not approval:
        return None
    return ApprovalSummary(
        approval_id=approval.approval_id,
        approval_status=approval.approval_status,
        reason=approval.reason,
    )


def _summarize_software_a(task, software_a_detail: dict | None):
    """Return a ``SoftwareAResultSummary``, or ``None`` if the task never touched Software A.

    Fields come from the adapter-provided *software_a_detail* when available;
    ``task_status`` falls back to the status persisted on the task.
    """
    if not (software_a_detail or task.software_a_task_id or task.software_a_task_status):
        return None
    detail = software_a_detail or {}
    return SoftwareAResultSummary(
        software_a_task_id=task.software_a_task_id,
        task_status=detail.get("task_status", task.software_a_task_status),
        progress_percent=detail.get("progress_percent"),
        error_detail=detail.get("error_detail"),
        started_at=detail.get("started_at"),
        finished_at=detail.get("finished_at"),
    )


def _summarize_verification(edge_tasks):
    """Fold all edge verification steps into one ``VerificationResultSummary``.

    Returns ``None`` when there are no steps. The overall status is FAILED if
    any step failed, SUCCEEDED if all succeeded, RUNNING if any is running,
    and otherwise the status of ``edge_tasks[0]``. ``success`` is the AND of
    all steps that reported an outcome (``None`` if none did).
    """
    if not edge_tasks:
        return None
    latest_edge_task = edge_tasks[0]
    statuses = [item.step_status for item in edge_tasks]
    if "FAILED" in statuses:
        overall_status = "FAILED"
    elif all(step_status == "SUCCEEDED" for step_status in statuses):
        overall_status = "SUCCEEDED"
    elif "RUNNING" in statuses:
        overall_status = "RUNNING"
    else:
        # NOTE(review): a mix of SUCCEEDED and not-yet-started steps falls
        # through to the first step's own status — confirm this is intended.
        overall_status = latest_edge_task.step_status
    known_outcomes = [bool(item.success) for item in edge_tasks if item.success is not None]
    return VerificationResultSummary(
        # A single step id is only meaningful when there is exactly one step.
        step_id=latest_edge_task.step_id if len(edge_tasks) == 1 else None,
        step_status=overall_status,
        success=all(known_outcomes) if known_outcomes else None,
        duration_ms=sum_duration_ms([item.duration_ms for item in edge_tasks]),
        message=_first_failed_step_message(edge_tasks) or latest_edge_task.message,
    )


def build_result_summary_detail(task, approval, software_a_detail: dict | None, edge_tasks) -> ResultSummaryDetail:
    """Build the structured result summary shown in task detail and report views.

    Args:
        task: Task record (reads status, summary and Software A fields).
        approval: Approval record for the task, or ``None``.
        software_a_detail: Deploy-task dict from the Software A adapter, or ``None``.
        edge_tasks: Edge verification step records for the task.

    Returns:
        A ``ResultSummaryDetail`` combining the final status/reason with the
        approval, Software A and verification sub-summaries.
    """
    return ResultSummaryDetail(
        final_status=task.task_status,
        final_reason=_resolve_final_reason(task, approval, software_a_detail, edge_tasks),
        approval=_summarize_approval(approval),
        software_a=_summarize_software_a(task, software_a_detail),
        verification=_summarize_verification(edge_tasks),
    )
def pick_latest_timestamp(*values: str | None) -> str | None:
    """Return the value whose parsed timestamp is the latest.

    Empty/``None`` values, and values for which ``parse_timestamp`` returns a
    falsy result, are ignored. The original string (not the parsed object) is
    returned. When several values parse to the same instant, the first one
    wins. Returns ``None`` when no value could be parsed.
    """
    # Parse each value exactly once and keep the parsed form alongside the
    # original string, so max() does not have to parse every candidate again.
    parsed_candidates = []
    for value in values:
        if not value:
            continue
        parsed = parse_timestamp(value)
        if parsed:
            parsed_candidates.append((parsed, value))
    if not parsed_candidates:
        return None
    # max() returns the first maximal element, preserving first-wins on ties.
    return max(parsed_candidates, key=lambda candidate: candidate[0])[1]
def sum_duration_ms(values: list[int | None]) -> int:
    """Add up the known durations in *values*, skipping ``None`` entries (0 if none)."""
    total = 0
    for duration in values:
        if duration is None:
            continue
        total += duration
    return total
def build_task_metrics(task, approval, software_a_detail: dict | None, tool_calls, edge_tasks, audit_logs) -> TaskMetrics:
    """Compute the timing and counting metrics reported in a task report.

    Args:
        task: Task record (reads ``created_at``, ``confirmed_at``, ``updated_at``).
        approval: Approval record for the task, or ``None``.
        software_a_detail: Deploy-task dict from the Software A adapter, or ``None``.
        tool_calls: Tool-call records for the task.
        edge_tasks: Edge verification step records for the task.
        audit_logs: Audit log records for the task.

    Returns:
        A ``TaskMetrics`` instance. Duration totals skip entries whose
        duration is ``None``.
    """
    software_a_finished_at = software_a_detail.get("finished_at") if software_a_detail else None
    tool_finished_ats = [call.finished_at for call in tool_calls]
    step_finished_ats = [step.finished_at for step in edge_tasks]

    # End of the observed window: the latest timestamp recorded anywhere for the task.
    latest_observed_at = pick_latest_timestamp(
        task.updated_at,
        approval.updated_at if approval else None,
        *tool_finished_ats,
        *step_finished_ats,
        software_a_finished_at,
    )

    # A still-PENDING approval is measured up to the latest observation.
    approval_duration_ms = None
    if approval:
        approval_end = latest_observed_at if approval.approval_status == "PENDING" else approval.updated_at
        approval_duration_ms = compute_duration_ms(approval.created_at, approval_end)

    # Execution starts at the approval's updated_at when APPROVED, else at confirmation.
    # NOTE(review): with a PENDING/REJECTED approval this still measures from
    # confirmed_at, overlapping approval_duration_ms — confirm intended.
    if approval and approval.approval_status == "APPROVED":
        execution_started_at = approval.updated_at
    else:
        execution_started_at = task.confirmed_at
    execution_duration_ms = None
    if execution_started_at:
        execution_finished_at = pick_latest_timestamp(
            *tool_finished_ats,
            *step_finished_ats,
            software_a_finished_at,
            task.updated_at,
        )
        execution_duration_ms = compute_duration_ms(execution_started_at, execution_finished_at)

    # Every tool call counts as either succeeded or failed (falsy success == failed).
    tool_call_success_count = sum(bool(call.success) for call in tool_calls)

    return TaskMetrics(
        total_duration_ms=compute_duration_ms(task.created_at, latest_observed_at),
        confirm_wait_duration_ms=compute_duration_ms(task.created_at, task.confirmed_at),
        approval_duration_ms=approval_duration_ms,
        execution_duration_ms=execution_duration_ms,
        software_a_duration_ms_total=sum_duration_ms(
            [call.duration_ms for call in tool_calls if call.tool_name.startswith("software_a")]
        ),
        tool_call_duration_ms_total=sum_duration_ms([call.duration_ms for call in tool_calls]),
        verification_duration_ms_total=sum_duration_ms([step.duration_ms for step in edge_tasks]),
        verification_queue_wait_duration_ms_total=sum_duration_ms(
            [compute_duration_ms(step.created_at, step.started_at) for step in edge_tasks]
        ),
        verification_end_to_end_duration_ms_total=sum_duration_ms(
            [compute_duration_ms(step.created_at, step.finished_at) for step in edge_tasks]
        ),
        tool_call_count=len(tool_calls),
        tool_call_success_count=tool_call_success_count,
        tool_call_failed_count=len(tool_calls) - tool_call_success_count,
        verification_step_count=len(edge_tasks),
        verification_success_count=sum(1 for step in edge_tasks if step.success == 1),
        verification_failed_count=sum(1 for step in edge_tasks if step.success == 0),
        audit_event_count=len(audit_logs),
        audit_failure_count=sum(1 for entry in audit_logs if entry.result in {"FAILED", "REJECTED"}),
    )
def build_audit_summary(audit_logs) -> AuditSummary:
    """Summarise audit log entries by result, action type and operator name.

    ``failure_count`` counts FAILED and REJECTED results; the pending,
    cancelled and reported counts track their single result value.
    ``action_types`` and ``operator_user_names`` are sorted and de-duplicated;
    empty operator names are skipped.
    """
    result_counts: dict[str, int] = {}
    actions = set()
    operators = set()
    for entry in audit_logs:
        result_counts[entry.result] = result_counts.get(entry.result, 0) + 1
        actions.add(entry.action)
        if entry.operator_user_name:
            operators.add(entry.operator_user_name)

    def count_of(*results: str) -> int:
        # Total number of entries whose result is any of *results*.
        return sum(result_counts.get(result, 0) for result in results)

    return AuditSummary(
        audit_event_count=len(audit_logs),
        failure_count=count_of("FAILED", "REJECTED"),
        pending_count=count_of("PENDING"),
        cancelled_count=count_of("CANCELLED"),
        reported_count=count_of("REPORTED"),
        action_types=sorted(actions),
        operator_user_names=sorted(operators),
        result_counts=result_counts,
    )
def build_verification_result(edge_tasks) -> dict | None:
    """Condense edge verification steps into the flat ``verification_result`` dict.

    For each tool, only the first step in *edge_tasks* with a non-``None``
    ``success`` counts. Returns ``None`` when there are no steps, otherwise:

    - ``http_ok``: outcome of ``http_health_check`` (``None`` if never reported)
    - ``process_ok``: outcome of ``check_process``
    - ``port_ok``: ``True`` only if every reported ``check_port``/``tcp_probe``
      outcome passed; ``None`` if neither reported
    - ``log_error_count``: ``0`` if ``grep_log`` succeeded, ``1`` if it failed,
      ``None`` if not reported
    """
    if not edge_tasks:
        return None

    def first_known_outcome(tool_name: str) -> bool | None:
        outcome = next(
            (step.success for step in edge_tasks if step.tool_name == tool_name and step.success is not None),
            None,
        )
        return None if outcome is None else bool(outcome)

    grep_ok = first_known_outcome("grep_log")
    port_checks = [ok for ok in map(first_known_outcome, ("check_port", "tcp_probe")) if ok is not None]
    if grep_ok is None:
        log_error_count = None
    else:
        log_error_count = 0 if grep_ok else 1
    return {
        "http_ok": first_known_outcome("http_health_check"),
        "process_ok": first_known_outcome("check_process"),
        "port_ok": all(port_checks) if port_checks else None,
        "log_error_count": log_error_count,
    }
@router.post("", response_model=ApiResponse[CreateTaskData])
def create_task(
    payload: CreateTaskRequest,
    db: Annotated[Session, Depends(get_db)],
    x_request_id: Annotated[str | None, Header(alias="X-Request-Id")] = None,
) -> ApiResponse[CreateTaskData]:
    # Create a task from the request and return its parsed intent, missing
    # slots, risk level and the next action the client should take.
    # Responds 409 when the service raises TaskConflictError.
    settings = get_settings()
    tz_name = settings.default_timezone
    request_id = build_request_id(x_request_id)
    service = TaskService(db, tz_name)
    try:
        task = service.create_task(payload, request_id)
    except TaskConflictError as exc:
        conflict_message = exc.args[0] if exc.args else "task state conflict"
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail={"code": exc.code, "message": conflict_message},
        ) from exc

    missing_slots = json.loads(task.missing_slots_json)
    # The client must fill any missing slots before the task can be confirmed.
    if missing_slots:
        next_action = "FILL_MISSING_SLOTS"
    else:
        next_action = "CONFIRM_TASK"

    created = CreateTaskData(
        task_id=task.task_id,
        parsed_intent=ParsedIntent(**json.loads(task.parsed_intent_json)),
        missing_slots=missing_slots,
        risk_level=task.risk_level,
        task_status=task.task_status,
        next_action=next_action,
    )
    return ApiResponse[CreateTaskData](
        request_id=request_id,
        success=True,
        code=ERROR_CODE_OK,
        message="success",
        data=created,
        timestamp=format_now(tz_name),
    )
@router.post("/{task_id}/confirm", response_model=ApiResponse[ConfirmTaskData])
def confirm_task(
    task_id: str,
    payload: ConfirmTaskRequest,
    db: Annotated[Session, Depends(get_db)],
    x_request_id: Annotated[str | None, Header(alias="X-Request-Id")] = None,
) -> ApiResponse[ConfirmTaskData]:
    # Confirm a task and report its resulting task/approval/Software A state.
    # Service errors map to HTTP: not found -> 404, state conflict -> 409,
    # permission -> 403.
    settings = get_settings()
    tz_name = settings.default_timezone
    request_id = build_request_id(x_request_id)
    service = TaskService(db, tz_name)
    try:
        task, approval_id = service.confirm_task(task_id, payload, request_id=request_id)
    except (TaskNotFoundError, TaskConflictError, TaskPermissionError) as exc:
        # Checked in the same order as separate except clauses would be.
        if isinstance(exc, TaskNotFoundError):
            http_status = status.HTTP_404_NOT_FOUND
            error_message = "task not found"
        elif isinstance(exc, TaskConflictError):
            http_status = status.HTTP_409_CONFLICT
            error_message = exc.args[0] if exc.args else "task state conflict"
        else:
            http_status = status.HTTP_403_FORBIDDEN
            error_message = exc.args[0] if exc.args else "permission denied"
        raise HTTPException(
            status_code=http_status,
            detail={"code": exc.code, "message": error_message},
        ) from exc

    confirmed = ConfirmTaskData(
        task_id=task.task_id,
        task_status=task.task_status,
        approval_status=task.approval_status,
        approval_id=approval_id,
        software_a_task_id=task.software_a_task_id,
        software_a_task_status=task.software_a_task_status,
    )
    return ApiResponse[ConfirmTaskData](
        request_id=request_id,
        success=True,
        code=ERROR_CODE_OK,
        message="task confirmed",
        data=confirmed,
        timestamp=format_now(tz_name),
    )
@router.post("/{task_id}/cancel", response_model=ApiResponse[ConfirmTaskData])
def cancel_task(
    task_id: str,
    payload: CancelTaskRequest,
    db: Annotated[Session, Depends(get_db)],
    x_request_id: Annotated[str | None, Header(alias="X-Request-Id")] = None,
) -> ApiResponse[ConfirmTaskData]:
    # Cancel a task with the reason from the request body.
    # Service errors map to HTTP: not found -> 404, state conflict -> 409.
    # The response reuses ConfirmTaskData with approval_id always None.
    settings = get_settings()
    tz_name = settings.default_timezone
    request_id = build_request_id(x_request_id)
    service = TaskService(db, tz_name)
    try:
        task = service.cancel_task(task_id, payload.reason, request_id=request_id)
    except (TaskNotFoundError, TaskConflictError) as exc:
        # Checked in the same order as separate except clauses would be.
        if isinstance(exc, TaskNotFoundError):
            http_status = status.HTTP_404_NOT_FOUND
            error_message = "task not found"
        else:
            http_status = status.HTTP_409_CONFLICT
            error_message = exc.args[0] if exc.args else "task state conflict"
        raise HTTPException(
            status_code=http_status,
            detail={"code": exc.code, "message": error_message},
        ) from exc

    cancelled = ConfirmTaskData(
        task_id=task.task_id,
        task_status=task.task_status,
        approval_status=task.approval_status,
        approval_id=None,
        software_a_task_id=task.software_a_task_id,
        software_a_task_status=task.software_a_task_status,
    )
    return ApiResponse[ConfirmTaskData](
        request_id=request_id,
        success=True,
        code=ERROR_CODE_OK,
        message="task cancelled",
        data=cancelled,
        timestamp=format_now(tz_name),
    )
@router.get("/{task_id}", response_model=ApiResponse[TaskDetailData])
def get_task(
    task_id: str,
    db: Annotated[Session, Depends(get_db)],
    x_request_id: Annotated[str | None, Header(alias="X-Request-Id")] = None,
) -> ApiResponse[TaskDetailData]:
    # Return the task detail view: parsed intent, Software A state, per-tool
    # call outcomes, the condensed verification result and a structured
    # result summary. Responds 404 when the task does not exist.
    settings = get_settings()
    tz_name = settings.default_timezone
    request_id = build_request_id(x_request_id)
    service = TaskService(db, tz_name)
    try:
        task = service.get_task(task_id)
    except TaskNotFoundError as exc:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail={"code": exc.code, "message": "task not found"},
        ) from exc

    edge_tasks = EdgeTaskRepository(db).list_by_task_id(task_id)
    tool_calls = ToolCallRepository(db).list_by_task_id(task_id)
    approval = ApprovalRepository(db).get_by_task_id(task_id)
    # Software A details are only fetched once the task has a Software A task id.
    software_a_detail = (
        MinimalSoftwareAAdapter(tz_name).get_deploy_task(task.software_a_task_id)
        if task.software_a_task_id
        else None
    )

    verification_result = build_verification_result(edge_tasks)
    intent = ParsedIntent(**json.loads(task.parsed_intent_json))
    tool_call_items = [
        ToolCallItem(tool_name=call.tool_name, success=bool(call.success))
        for call in tool_calls
    ]
    detail = TaskDetailData(
        task_id=task.task_id,
        task_status=task.task_status,
        approval_status=task.approval_status,
        risk_level=task.risk_level,
        intent=intent,
        software_a_task_id=task.software_a_task_id,
        software_a_task_status=task.software_a_task_status,
        tool_calls=tool_call_items,
        verification_result=verification_result,
        summary=task.summary,
        result_summary_detail=build_result_summary_detail(task, approval, software_a_detail, edge_tasks),
    )
    return ApiResponse[TaskDetailData](
        request_id=request_id,
        success=True,
        code=ERROR_CODE_OK,
        message="success",
        data=detail,
        timestamp=format_now(tz_name),
    )
def _build_approval_trace(approval) -> list[ApprovalTraceItem]:
    """Return the approval trace: one item when an approval record exists, else empty.

    ``approver_user_ids_json`` is decoded with ``json.loads``.
    """
    if not approval:
        return []
    return [
        ApprovalTraceItem(
            approval_id=approval.approval_id,
            approval_status=approval.approval_status,
            risk_level=approval.risk_level,
            approvers=json.loads(approval.approver_user_ids_json),
            reason=approval.reason,
            created_at=approval.created_at,
            updated_at=approval.updated_at,
        )
    ]


def _build_tool_trace(tool_calls) -> list[ToolTraceItem]:
    """Convert tool-call records into trace items, decoding stored request/response JSON."""
    return [
        ToolTraceItem(
            tool_call_id=item.tool_call_id,
            request_id=item.request_id,
            operator_user_id=item.operator_user_id,
            operator_user_name=item.operator_user_name,
            tool_name=item.tool_name,
            success=bool(item.success),
            duration_ms=item.duration_ms,
            started_at=item.started_at,
            finished_at=item.finished_at,
            request_payload=json.loads(item.request_payload_json),
            response_payload=json.loads(item.response_payload_json),
        )
        for item in tool_calls
    ]


def _build_verification_trace(edge_tasks) -> list[VerificationTraceItem]:
    """Convert edge verification steps into trace items.

    ``success`` stays ``None`` for steps without a recorded outcome; the
    params/result/evidence JSON columns are decoded with ``json.loads``.
    """
    return [
        VerificationTraceItem(
            step_id=item.step_id,
            edge_id=item.edge_id,
            tool_name=item.tool_name,
            step_status=item.step_status,
            success=None if item.success is None else bool(item.success),
            duration_ms=item.duration_ms,
            message=item.message,
            params=json.loads(item.params_json),
            result_data=json.loads(item.result_data_json),
            evidence=json.loads(item.evidence_json),
            started_at=item.started_at,
            finished_at=item.finished_at,
        )
        for item in edge_tasks
    ]


def _build_audit_trace(audit_logs) -> list[AuditTraceItem]:
    """Convert audit log records into trace items, decoding ``detail_json``."""
    return [
        AuditTraceItem(
            audit_id=item.audit_id,
            request_id=item.request_id,
            action=item.action,
            result=item.result,
            operator_user_id=item.operator_user_id,
            operator_user_name=item.operator_user_name,
            target=item.target,
            detail=json.loads(item.detail_json),
            timestamp=item.timestamp,
        )
        for item in audit_logs
    ]


@router.get("/{task_id}/report", response_model=ApiResponse[TaskReportData])
def get_task_report(
    task_id: str,
    db: Annotated[Session, Depends(get_db)],
    x_request_id: Annotated[str | None, Header(alias="X-Request-Id")] = None,
) -> ApiResponse[TaskReportData]:
    # Assemble the full task report: basic task fields, intent snapshot,
    # approval/tool/verification/audit traces, aggregated metrics, the audit
    # summary and the structured result summary.
    # Responds 404 when the task does not exist.
    settings = get_settings()
    request_id = build_request_id(x_request_id)
    service = TaskService(db, settings.default_timezone)
    try:
        task = service.get_task(task_id)
    except TaskNotFoundError as exc:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail={"code": exc.code, "message": "task not found"},
        ) from exc

    approval = ApprovalRepository(db).get_by_task_id(task_id)
    tool_calls = ToolCallRepository(db).list_by_task_id(task_id)
    edge_tasks = EdgeTaskRepository(db).list_by_task_id(task_id)
    audit_logs = AuditRepository(db).list_by_task_id(task_id)
    # Software A details are only fetched once the task has a Software A task id.
    software_a_detail = None
    if task.software_a_task_id:
        software_a_detail = MinimalSoftwareAAdapter(settings.default_timezone).get_deploy_task(task.software_a_task_id)

    approval_trace = _build_approval_trace(approval)
    tool_trace = _build_tool_trace(tool_calls)
    verification_trace = _build_verification_trace(edge_tasks)
    audit_trace = _build_audit_trace(audit_logs)

    return ApiResponse[TaskReportData](
        request_id=request_id,
        success=True,
        code=ERROR_CODE_OK,
        message="success",
        data=TaskReportData(
            task_basic=TaskBasic(
                task_id=task.task_id,
                task_status=task.task_status,
                approval_status=task.approval_status,
                risk_level=task.risk_level,
                created_at=task.created_at,
                updated_at=task.updated_at,
                confirmed_at=task.confirmed_at,
            ),
            intent_snapshot=ParsedIntent(**json.loads(task.parsed_intent_json)),
            approval_trace=approval_trace,
            tool_trace=tool_trace,
            verification_trace=verification_trace,
            task_metrics=build_task_metrics(task, approval, software_a_detail, tool_calls, edge_tasks, audit_logs),
            audit_summary=build_audit_summary(audit_logs),
            result_summary=task.summary,
            result_summary_detail=build_result_summary_detail(task, approval, software_a_detail, edge_tasks),
            audit_trace=audit_trace,
        ),
        timestamp=format_now(settings.default_timezone),
    )