auto_agent/backend/app/services/task_service.py

396 lines
16 KiB
Python

from __future__ import annotations
import json
from uuid import uuid4
from sqlalchemy.orm import Session
from app.core.constants import (
APPROVAL_STATUS_APPROVED,
APPROVAL_STATUS_NOT_REQUIRED,
APPROVAL_STATUS_PENDING,
EDGE_STEP_STATUS_CANCELLED,
ERROR_CODE_CONFLICT,
ERROR_CODE_NOT_FOUND,
ERROR_CODE_PERMISSION_DENIED,
RISK_LEVEL_HIGH,
SOFTWARE_A_TASK_STATUS_CANCELLED,
SOFTWARE_A_TASK_STATUS_FAILED,
TASK_STATUS_CANCELLED,
TASK_STATUS_CREATED,
TASK_STATUS_FAILED,
TASK_STATUS_PENDING_APPROVAL,
TASK_STATUS_PENDING_CONFIRM,
TASK_STATUS_RUNNING,
TASK_STATUS_SUCCEEDED,
TASK_STATUS_VERIFYING,
)
from app.schemas.approval import ApprovalOperator, ApprovalTarget, CreateApprovalRequest
from app.schemas.software_a import CreateDeployTaskRequest, DeployOptions, SoftwareAOperator
from app.core.time import format_now
from app.adapters.approval.demo_adapter import DemoApprovalAdapter
from app.adapters.software_a.demo_adapter import DemoSoftwareAAdapter
from app.models.audit_log import AuditLog
from app.models.tool_call import ToolCall
from app.models.task import Task
from app.repositories.audit_repository import AuditRepository
from app.repositories.edge_repository import EdgeTaskRepository
from app.repositories.task_repository import TaskRepository
from app.repositories.tool_call_repository import ToolCallRepository
from app.schemas.task import ConfirmTaskRequest, CreateTaskRequest
from app.services.intent_service import IntentService
from app.services.risk_service import RiskService
from app.services.edge_service import EdgeService
class TaskServiceError(Exception):
    """Base class for the errors raised by ``TaskService``.

    Each subclass sets ``code`` to one of the ``ERROR_CODE_*`` constants, so a
    caller can catch this base class once and read ``exc.code``.
    """

    code: str


class TaskConflictError(TaskServiceError):
    """The task's current state does not allow the requested operation."""

    code = ERROR_CODE_CONFLICT


class TaskPermissionError(TaskServiceError):
    """The software-a permission check rejected the requested operation."""

    code = ERROR_CODE_PERMISSION_DENIED


class TaskNotFoundError(TaskServiceError):
    """No task exists with the requested task id."""

    code = ERROR_CODE_NOT_FOUND
class TaskService:
def __init__(self, db: Session, timezone_name: str) -> None:
self.db = db
self.timezone_name = timezone_name
self.repository = TaskRepository(db)
self.audit_repository = AuditRepository(db)
self.edge_task_repository = EdgeTaskRepository(db)
self.tool_call_repository = ToolCallRepository(db)
self.intent_service = IntentService()
self.risk_service = RiskService()
def _require_task_status(self, task: Task, allowed_statuses: set[str], action: str) -> None:
if task.task_status not in allowed_statuses:
allowed_text = ", ".join(sorted(allowed_statuses))
raise TaskConflictError(f"当前任务状态不允许执行 {action},期望状态: {allowed_text},当前状态: {task.task_status}")
def create_task(self, payload: CreateTaskRequest, request_id: str | None) -> Task:
parsed_intent, missing_slots = self.intent_service.parse(payload.input_text)
risk_level = self.risk_service.evaluate(parsed_intent)
current_time = format_now(self.timezone_name)
task_status = TASK_STATUS_PENDING_CONFIRM if not missing_slots else TASK_STATUS_CREATED
summary = None if not missing_slots else "存在缺失槽位,需补充后再确认。"
task = Task(
task_id=self._build_id("task"),
session_id=payload.session_id,
tenant_id=payload.tenant_id,
request_id=request_id,
input_text=payload.input_text,
channel=payload.channel,
action_type=parsed_intent.get("action_type"),
app_code=parsed_intent.get("app_code"),
env=parsed_intent.get("env"),
version=parsed_intent.get("version"),
software_a_task_id=None,
software_a_task_status=None,
risk_level=risk_level,
approval_status=APPROVAL_STATUS_NOT_REQUIRED,
task_status=task_status,
parsed_intent_json=json.dumps(parsed_intent, ensure_ascii=False),
missing_slots_json=json.dumps(missing_slots, ensure_ascii=False),
summary=summary,
created_at=current_time,
updated_at=current_time,
confirmed_at=None,
)
created_task = self.repository.add(task)
self._write_audit_log(
task_id=created_task.task_id,
request_id=request_id,
action="CREATE_TASK",
result="OK",
target=created_task.app_code,
operator_user_id="u1001",
operator_user_name="alice",
detail={
"request_id": request_id,
"parsed_intent": parsed_intent,
"missing_slots": missing_slots,
"risk_level": risk_level,
},
)
return created_task
def confirm_task(self, task_id: str, payload: ConfirmTaskRequest, request_id: str | None = None) -> tuple[Task, str | None]:
task = self.repository.get_by_task_id(task_id)
if not task:
raise TaskNotFoundError()
self._require_task_status(task, {TASK_STATUS_PENDING_CONFIRM}, "CONFIRM_TASK")
if not payload.confirmed:
raise TaskConflictError("当前版本仅支持 confirmed=true。")
current_time = format_now(self.timezone_name)
task.confirmed_at = current_time
task.updated_at = current_time
approval_id = None
if task.risk_level == RISK_LEVEL_HIGH:
task.task_status = TASK_STATUS_PENDING_APPROVAL
task.approval_status = APPROVAL_STATUS_PENDING
task.summary = "高风险任务已确认,等待审批。"
approval = DemoApprovalAdapter(self.db, self.timezone_name).create_request(
CreateApprovalRequest(
task_id=task.task_id,
risk_level=task.risk_level,
operator=ApprovalOperator(user_id="u1001", user_name="alice"),
action_type=task.action_type or "DEPLOY",
target=ApprovalTarget(app_code=task.app_code or "unknown-app", env=task.env or "unknown-env"),
reason=payload.comment or "高风险任务待审批",
approvers=["u2001"],
)
)
approval_id = approval.approval_id
self._write_audit_log(
task_id=task.task_id,
request_id=request_id,
action="CREATE_APPROVAL_REQUEST",
result="PENDING",
target=task.app_code,
operator_user_id="u1001",
operator_user_name="alice",
detail={"approval_id": approval_id, "reason": payload.comment},
)
else:
task.task_status = TASK_STATUS_RUNNING
task.approval_status = APPROVAL_STATUS_NOT_REQUIRED
task.summary = "任务已确认,准备调用 software-a demo 执行。"
updated_task = self.repository.update(task)
self._write_audit_log(
task_id=updated_task.task_id,
request_id=request_id,
action="CONFIRM_TASK",
result="OK",
target=updated_task.app_code,
operator_user_id="u1001",
operator_user_name="alice",
detail={
"approval_status": updated_task.approval_status,
"task_status": updated_task.task_status,
"comment": payload.comment,
},
)
if updated_task.risk_level != RISK_LEVEL_HIGH:
updated_task = self.execute_task(updated_task.task_id, request_id=request_id)
return updated_task, approval_id
def get_task(self, task_id: str) -> Task:
task = self.repository.get_by_task_id(task_id)
if not task:
raise TaskNotFoundError()
return self.refresh_software_a_status(task)
def refresh_software_a_status(self, task: Task) -> Task:
if not task.software_a_task_id:
return task
software_a_task = DemoSoftwareAAdapter(self.timezone_name).get_deploy_task(task.software_a_task_id)
if software_a_task:
task.software_a_task_status = software_a_task["task_status"]
if software_a_task["task_status"] == SOFTWARE_A_TASK_STATUS_FAILED and task.task_status not in {TASK_STATUS_FAILED, TASK_STATUS_CANCELLED}:
task.task_status = TASK_STATUS_FAILED
task.summary = f"software-a demo 执行失败: {software_a_task.get('error_detail') or 'unknown error'}"
task.updated_at = format_now(self.timezone_name)
task = self.repository.update(task)
return task
def _build_id(self, prefix: str) -> str:
return f"{prefix}-{uuid4().hex[:12]}"
def execute_task(self, task_id: str, request_id: str | None = None) -> Task:
task = self.repository.get_by_task_id(task_id)
if not task:
raise TaskNotFoundError()
self._require_task_status(task, {TASK_STATUS_RUNNING}, "EXECUTE_TASK")
if not task.app_code or not task.env or not task.version:
raise TaskConflictError("当前任务缺少 software-a 执行所需的关键字段。")
if task.software_a_task_id:
raise TaskConflictError("当前任务已创建 software-a 执行任务,不允许重复执行。")
if self.edge_task_repository.list_active_by_task_id(task.task_id):
raise TaskConflictError("当前任务已存在待处理的 edge 验证步骤,不允许重复调度。")
allowed, reason = DemoSoftwareAAdapter(self.timezone_name).check_permission(
task.action_type or "DEPLOY",
task.env,
task.approval_status,
)
if not allowed:
raise TaskPermissionError(f"software-a 权限校验未通过: {reason}")
if task.action_type == "DEPLOY":
tool_started_at = format_now(self.timezone_name)
deploy_result = DemoSoftwareAAdapter(self.timezone_name).create_deploy_task(
CreateDeployTaskRequest(
operator=SoftwareAOperator(user_id="u1001", user_name="alice"),
tenant_id=task.tenant_id,
app_code=task.app_code,
env=task.env,
version=task.version,
target_nodes=self._default_target_nodes(task.env),
deploy_options=DeployOptions(graceful=True),
)
)
tool_finished_at = format_now(self.timezone_name)
task.software_a_task_id = deploy_result["software_a_task_id"]
task.software_a_task_status = deploy_result["task_status"]
deploy_success = deploy_result["task_status"] != SOFTWARE_A_TASK_STATUS_FAILED
task.task_status = TASK_STATUS_RUNNING if deploy_success else TASK_STATUS_FAILED
task.summary = (
"software-a demo 部署任务已创建,等待边缘验证。"
if deploy_success
else f"software-a demo 执行失败: {deploy_result.get('error_detail') or 'unknown error'}"
)
self._write_tool_call(
task_id=task.task_id,
request_id=request_id,
operator_user_id="u1001",
operator_user_name="alice",
tool_name="software_a_deploy",
request_payload={
"app_code": task.app_code,
"env": task.env,
"version": task.version,
"tenant_id": task.tenant_id,
},
response_payload=deploy_result,
success=deploy_success,
started_at=tool_started_at,
finished_at=tool_finished_at,
)
elif task.approval_status == APPROVAL_STATUS_APPROVED:
task.task_status = TASK_STATUS_RUNNING
task.summary = "审批通过后任务已进入执行阶段。"
updated_task = self.repository.update(task)
self._write_audit_log(
task_id=updated_task.task_id,
request_id=request_id,
action="EXECUTE_TASK",
result="OK" if updated_task.task_status != TASK_STATUS_FAILED else "FAILED",
target=updated_task.app_code,
operator_user_id="u1001",
operator_user_name="alice",
detail={
"software_a_task_id": updated_task.software_a_task_id,
"software_a_task_status": updated_task.software_a_task_status,
"summary": updated_task.summary,
},
)
if updated_task.task_status == TASK_STATUS_RUNNING:
EdgeService(self.db, self.timezone_name).schedule_default_verification(updated_task.task_id)
return updated_task
def cancel_task(self, task_id: str, reason: str | None = None, request_id: str | None = None) -> Task:
task = self.repository.get_by_task_id(task_id)
if not task:
raise TaskNotFoundError()
self._require_task_status(
task,
{
TASK_STATUS_CREATED,
TASK_STATUS_PENDING_CONFIRM,
TASK_STATUS_PENDING_APPROVAL,
TASK_STATUS_RUNNING,
TASK_STATUS_VERIFYING,
},
"CANCEL_TASK",
)
current_time = format_now(self.timezone_name)
task.task_status = TASK_STATUS_CANCELLED
task.updated_at = current_time
task.summary = reason or "任务已取消。"
if task.software_a_task_status == "RUNNING":
task.software_a_task_status = SOFTWARE_A_TASK_STATUS_CANCELLED
updated_task = self.repository.update(task)
for edge_task in self.edge_task_repository.list_active_by_task_id(task_id):
edge_task.step_status = EDGE_STEP_STATUS_CANCELLED
edge_task.updated_at = current_time
edge_task.message = reason or "任务取消"
self.edge_task_repository.update(edge_task)
self._write_audit_log(
task_id=updated_task.task_id,
request_id=request_id,
action="CANCEL_TASK",
result="CANCELLED",
target=updated_task.app_code,
operator_user_id="u1001",
operator_user_name="alice",
detail={"reason": reason},
)
return updated_task
def _default_target_nodes(self, env: str) -> list[str]:
mapping = {
"test": ["10.0.0.12"],
"staging": ["10.0.1.12"],
"prod": ["10.0.2.12", "10.0.2.13"],
}
return mapping.get(env, ["10.0.0.12"])
def _write_tool_call(
self,
task_id: str,
request_id: str | None,
operator_user_id: str | None,
operator_user_name: str | None,
tool_name: str,
request_payload: dict,
response_payload: dict,
success: bool,
started_at: str | None,
finished_at: str | None,
step_id: str | None = None,
duration_ms: int | None = None,
) -> ToolCall:
tool_call = ToolCall(
tool_call_id=f"tool-call-{uuid4().hex[:12]}",
task_id=task_id,
request_id=request_id,
operator_user_id=operator_user_id,
operator_user_name=operator_user_name,
step_id=step_id,
tool_name=tool_name,
request_payload_json=json.dumps(request_payload, ensure_ascii=False),
response_payload_json=json.dumps(response_payload, ensure_ascii=False),
success=1 if success else 0,
duration_ms=duration_ms,
started_at=started_at,
finished_at=finished_at,
)
return self.tool_call_repository.add(tool_call)
def _write_audit_log(
self,
task_id: str,
request_id: str | None,
action: str,
result: str,
target: str | None,
operator_user_id: str | None,
operator_user_name: str | None,
detail: dict,
) -> AuditLog:
audit_log = AuditLog(
audit_id=f"audit-{uuid4().hex[:12]}",
task_id=task_id,
request_id=request_id,
action=action,
operator_user_id=operator_user_id,
operator_user_name=operator_user_name,
target=target,
result=result,
detail_json=json.dumps(detail, ensure_ascii=False),
timestamp=format_now(self.timezone_name),
)
return self.audit_repository.add(audit_log)