- 新增 app_metadata 模型、仓储与服务 - 将默认 edge 验证步骤改为由 app_metadata 驱动生成 - 新增 chat_session / chat_message 会话层模型与 chat service - 新增 demo chat API,支持会话创建、消息发送、任务确认 - 新增最小 Web Demo 页面,形成聊天式演示入口 - 增强任务报告,补充 audit_summary 与更细粒度 task_metrics - 增强 edge-agent 执行器:tcp_probe、日志时间范围过滤、进程指标与更灵活健康检查 - 更新 README 与当前进度总结,MVP 进度推进到约 94%
360 lines
14 KiB
Python
360 lines
14 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
from uuid import uuid4
|
|
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.core.constants import (
|
|
EDGE_NODE_STATUS_ONLINE,
|
|
EDGE_STEP_STATUS_CANCELLED,
|
|
EDGE_STEP_STATUS_FAILED,
|
|
EDGE_STEP_STATUS_PENDING,
|
|
EDGE_STEP_STATUS_RUNNING,
|
|
EDGE_STEP_STATUS_SUCCEEDED,
|
|
ERROR_CODE_CONFLICT,
|
|
ERROR_CODE_NOT_FOUND,
|
|
TASK_STATUS_FAILED,
|
|
TASK_STATUS_RUNNING,
|
|
TASK_STATUS_SUCCEEDED,
|
|
TASK_STATUS_VERIFYING,
|
|
)
|
|
from app.core.time import compute_duration_ms, format_now
|
|
from app.models.audit_log import AuditLog
|
|
from app.models.edge_node import EdgeNode
|
|
from app.models.edge_task import EdgeTask
|
|
from app.models.tool_call import ToolCall
|
|
from app.repositories.audit_repository import AuditRepository
|
|
from app.repositories.edge_repository import EdgeNodeRepository, EdgeTaskRepository
|
|
from app.repositories.task_repository import TaskRepository
|
|
from app.repositories.tool_call_repository import ToolCallRepository
|
|
from app.services.metadata_service import MetadataService
|
|
|
|
|
|
class EdgeTaskConflictError(Exception):
    """Signal that an edge operation conflicts with the current task or step state.

    Raised by ``EdgeService`` when scheduling or reporting is attempted while
    the task or step is in a state that does not permit it.
    """

    # Class-level error code; presumably consumed by the API layer when
    # translating the exception into a response -- verify against callers.
    code = ERROR_CODE_CONFLICT
|
|
|
|
|
|
class EdgeTaskNotFoundError(Exception):
    """Signal that the task or edge step referenced by a request does not exist."""

    # Class-level error code; presumably consumed by the API layer when
    # translating the exception into a response -- verify against callers.
    code = ERROR_CODE_NOT_FOUND
|
|
|
|
|
|
class EdgeService:
|
|
def __init__(self, db: Session, timezone_name: str) -> None:
    """Bind the service to a database session and a timezone name.

    Args:
        db: SQLAlchemy session handed to every repository created here and
            also used directly by ``record_event``.
        timezone_name: Passed to ``format_now`` whenever a timestamp is
            generated by this service.
    """
    self.timezone_name = timezone_name
    self.db = db

    # All repositories are built on the same session.
    self.tool_call_repository = ToolCallRepository(db)
    self.audit_repository = AuditRepository(db)
    self.task_repository = TaskRepository(db)
    self.edge_task_repository = EdgeTaskRepository(db)
    self.node_repository = EdgeNodeRepository(db)
|
|
|
|
def heartbeat(self, edge_id: str, hostname: str, os_type: str, agent_version: str, capabilities: list[str]) -> EdgeNode:
    """Record a heartbeat for an edge node, creating the node on first contact.

    The node is looked up by ``edge_id``. Its descriptive fields are
    overwritten with the reported values, it is marked online, and its
    heartbeat/update timestamps are set to the current time. When no node
    exists yet a new one is created with the same values.

    Args:
        edge_id: Identifier of the reporting edge node.
        hostname: Hostname reported by the agent.
        os_type: Operating system type reported by the agent.
        agent_version: Version string reported by the agent.
        capabilities: Capability names; stored JSON-encoded.

    Returns:
        Whatever ``node_repository.add_or_update`` returns for the node.
    """
    now = format_now(self.timezone_name)

    # Fields refreshed on every heartbeat, for new and known nodes alike.
    reported_state = {
        "hostname": hostname,
        "os_type": os_type,
        "agent_version": agent_version,
        "capabilities_json": json.dumps(capabilities, ensure_ascii=False),
        "node_status": EDGE_NODE_STATUS_ONLINE,
        "last_heartbeat_at": now,
        "updated_at": now,
    }

    known_node = self.node_repository.get_by_edge_id(edge_id)
    if not known_node:
        # First heartbeat from this edge: register it.
        new_node = EdgeNode(edge_id=edge_id, created_at=now, **reported_state)
        return self.node_repository.add_or_update(new_node)

    for attribute, value in reported_state.items():
        setattr(known_node, attribute, value)
    return self.node_repository.add_or_update(known_node)
|
|
|
|
def schedule_default_verification(self, task_id: str, edge_id: str = "edge-shanghai-001") -> list[EdgeTask]:
    """Create the default set of pending edge verification steps for a task.

    One ``EdgeTask`` is persisted per step returned by
    ``_build_default_verification_steps``, and an ``EDGE_TASK_SCHEDULED``
    audit entry is written for each of them.

    Args:
        task_id: Identifier of the task to verify.
        edge_id: Edge node the steps are assigned to.

    Returns:
        The persisted edge tasks, in the order they were created.

    Raises:
        EdgeTaskNotFoundError: No task exists for ``task_id``.
        EdgeTaskConflictError: The task is not in the running state, or it
            already has active edge verification steps.
    """
    parent_task = self.task_repository.get_by_task_id(task_id)
    if not parent_task:
        raise EdgeTaskNotFoundError()
    if parent_task.task_status != TASK_STATUS_RUNNING:
        raise EdgeTaskConflictError("current task status does not allow scheduling edge verification steps")

    already_active = self.edge_task_repository.list_active_by_task_id(task_id)
    if already_active:
        raise EdgeTaskConflictError("task already has active edge verification steps")

    now = format_now(self.timezone_name)
    scheduled: list[EdgeTask] = []
    for tool_name, params in self._build_default_verification_steps(parent_task):
        new_edge_task_id = f"edge-task-{uuid4().hex[:12]}"
        new_step_id = f"step-{uuid4().hex[:12]}"
        pending_step = EdgeTask(
            edge_task_id=new_edge_task_id,
            step_id=new_step_id,
            task_id=task_id,
            edge_id=edge_id,
            tool_name=tool_name,
            params_json=json.dumps(params, ensure_ascii=False),
            step_status=EDGE_STEP_STATUS_PENDING,
            # Outcome fields stay empty until the edge reports back.
            success=None,
            message=None,
            result_data_json="{}",
            evidence_json="{}",
            duration_ms=None,
            # NOTE(review): expire_at equals the creation time here; confirm
            # whether a TTL offset was intended.
            expire_at=now,
            started_at=None,
            finished_at=None,
            created_at=now,
            updated_at=now,
        )
        persisted = self.edge_task_repository.add(pending_step)
        scheduled.append(persisted)

        audit_detail = {"step_id": persisted.step_id, "tool_name": persisted.tool_name}
        self._write_audit_log(
            task_id=task_id,
            request_id=None,
            action="EDGE_TASK_SCHEDULED",
            result="PENDING",
            target=edge_id,
            operator_user_id=None,
            operator_user_name=None,
            detail=audit_detail,
        )
    return scheduled
|
|
|
|
def pull_tasks(self, edge_id: str, max_tasks: int) -> list[EdgeTask]:
    """Hand out up to ``max_tasks`` pending verification steps to an edge node.

    Pending steps for ``edge_id`` are visited in repository order. A step
    whose parent task is missing, or is no longer running/verifying, is
    cancelled and skipped. Every other step is marked running and its parent
    task is moved to the verifying state.

    Args:
        edge_id: Edge node requesting work.
        max_tasks: Upper bound on the number of steps returned. Zero or a
            negative value yields an empty list.

    Returns:
        The steps that were switched to running, at most ``max_tasks`` of them.
    """
    # A non-positive limit means nothing was requested. Previously a negative
    # value flowed into a ``[:max_tasks]`` slice, which silently dropped items
    # from the tail of the queue and dispatched the rest.
    if max_tasks <= 0:
        return []

    current_time = format_now(self.timezone_name)
    executable_states = {TASK_STATUS_RUNNING, TASK_STATUS_VERIFYING}
    pulled_items: list[EdgeTask] = []

    # The limit is applied to steps actually dispatched, so stale steps that
    # get cancelled along the way no longer use up the caller's quota.
    for item in self.edge_task_repository.list_pending_by_edge_id(edge_id):
        if len(pulled_items) >= max_tasks:
            break

        task = self.task_repository.get_by_task_id(item.task_id)
        if not task or task.task_status not in executable_states:
            item.step_status = EDGE_STEP_STATUS_CANCELLED
            item.updated_at = current_time
            item.message = "task state no longer allows edge execution"
            self.edge_task_repository.update(item)
            continue

        item.step_status = EDGE_STEP_STATUS_RUNNING
        item.started_at = current_time
        item.updated_at = current_time
        self.edge_task_repository.update(item)

        task.task_status = TASK_STATUS_VERIFYING
        task.updated_at = current_time
        task.summary = "task entered edge verification stage"
        self.task_repository.update(task)
        pulled_items.append(item)
    return pulled_items
|
|
|
|
def report_task(
    self,
    edge_id: str,
    step_id: str,
    success: bool,
    message: str,
    data: dict,
    evidence: dict,
    started_at: str,
    finished_at: str,
) -> tuple[EdgeTask, str]:
    """Store the outcome of one edge verification step and update its task.

    The step is marked succeeded or failed, a tool-call record is written,
    and the parent task status is recomputed:

    * a failed step cancels every other pending/running step of the task and
      fails the task;
    * a successful step completes the task when every step has succeeded,
      otherwise the task stays in the verifying state.

    An ``EDGE_TASK_REPORTED`` audit entry is written last.

    Args:
        edge_id: Edge node submitting the report.
        step_id: Identifier of the step being reported.
        success: Whether the step succeeded.
        message: Message stored on the step and in the audit detail.
        data: Result payload; stored JSON-encoded.
        evidence: Evidence payload; stored JSON-encoded.
        started_at: Start timestamp reported by the edge.
        finished_at: Finish timestamp reported by the edge.

    Returns:
        A tuple of the updated edge task and the resulting task status.

    Raises:
        EdgeTaskNotFoundError: No step exists for ``step_id``.
        EdgeTaskConflictError: The step belongs to another edge, the step is
            neither pending nor running, the parent task is missing, or the
            parent task is neither running nor verifying.
    """
    open_step_states = {EDGE_STEP_STATUS_RUNNING, EDGE_STEP_STATUS_PENDING}

    step = self.edge_task_repository.get_by_step_id(step_id)
    if not step:
        raise EdgeTaskNotFoundError()
    if step.edge_id != edge_id:
        raise EdgeTaskConflictError("edge_id does not match the assigned edge task")
    if step.step_status not in open_step_states:
        raise EdgeTaskConflictError("edge step state does not allow duplicate report")

    parent_task = self.task_repository.get_by_task_id(step.task_id)
    if not parent_task:
        raise EdgeTaskConflictError("edge task references a non-existent task")
    if parent_task.task_status not in {TASK_STATUS_RUNNING, TASK_STATUS_VERIFYING}:
        raise EdgeTaskConflictError("task state does not allow edge report")

    # Persist the reported outcome on the step itself.
    if success:
        step.step_status = EDGE_STEP_STATUS_SUCCEEDED
        step.success = 1
    else:
        step.step_status = EDGE_STEP_STATUS_FAILED
        step.success = 0
    step.message = message
    step.result_data_json = json.dumps(data, ensure_ascii=False)
    step.evidence_json = json.dumps(evidence, ensure_ascii=False)
    step.started_at = started_at
    step.finished_at = finished_at
    step.duration_ms = compute_duration_ms(started_at, finished_at)
    step.updated_at = format_now(self.timezone_name)
    saved_step = self.edge_task_repository.update(step)

    response_payload = {"data": data, "evidence": evidence, "message": message}
    self._write_tool_call(
        task_id=saved_step.task_id,
        request_id=None,
        operator_user_id=edge_id,
        operator_user_name=edge_id,
        step_id=saved_step.step_id,
        tool_name=saved_step.tool_name,
        request_payload=json.loads(saved_step.params_json),
        response_payload=response_payload,
        success=success,
        started_at=started_at,
        finished_at=finished_at,
    )

    # Recompute the task status from the state of all of its steps.
    task_steps = self.edge_task_repository.list_by_task_id(parent_task.task_id)
    if success:
        every_step_succeeded = all(
            candidate.step_status == EDGE_STEP_STATUS_SUCCEEDED for candidate in task_steps
        )
        if every_step_succeeded:
            parent_task.task_status = TASK_STATUS_SUCCEEDED
            parent_task.summary = "all edge verification steps succeeded"
        else:
            parent_task.task_status = TASK_STATUS_VERIFYING
            parent_task.summary = "edge verification is still in progress"
    else:
        # One failure ends verification: cancel whatever is still open.
        for other in task_steps:
            if other.step_id == saved_step.step_id:
                continue
            if other.step_status not in {EDGE_STEP_STATUS_PENDING, EDGE_STEP_STATUS_RUNNING}:
                continue
            other.step_status = EDGE_STEP_STATUS_CANCELLED
            other.updated_at = format_now(self.timezone_name)
            other.message = "cancelled because another verification step failed"
            self.edge_task_repository.update(other)
        parent_task.task_status = TASK_STATUS_FAILED
        parent_task.summary = "edge verification failed"

    parent_task.updated_at = format_now(self.timezone_name)
    self.task_repository.update(parent_task)

    audit_detail = {"step_id": step_id, "tool_name": step.tool_name, "message": message}
    self._write_audit_log(
        task_id=parent_task.task_id,
        request_id=None,
        action="EDGE_TASK_REPORTED",
        result=parent_task.task_status,
        target=edge_id,
        operator_user_id=edge_id,
        operator_user_name=edge_id,
        detail=audit_detail,
    )
    return saved_step, parent_task.task_status
|
|
|
|
def record_event(self, edge_id: str, event_type: str, message: str, detail: dict) -> AuditLog:
    """Persist an event reported by an edge node as an audit log entry.

    The entry is filed under the synthetic task id ``edge-event:<edge_id>``
    with action ``EDGE_EVENT:<event_type>`` and result ``REPORTED``. The edge
    id is used as operator and as target.

    Args:
        edge_id: Edge node that reported the event.
        event_type: Event type, embedded in the audit action.
        message: Event message; stored inside the JSON detail.
        detail: Extra event data; stored inside the JSON detail.

    Returns:
        The committed and refreshed audit log entry.
    """
    now = format_now(self.timezone_name)
    detail_payload = json.dumps({"message": message, "detail": detail}, ensure_ascii=False)

    entry = AuditLog(
        audit_id=f"audit-{uuid4().hex[:12]}",
        task_id=f"edge-event:{edge_id}",
        action=f"EDGE_EVENT:{event_type}",
        operator_user_id=edge_id,
        operator_user_name=edge_id,
        target=edge_id,
        result="REPORTED",
        detail_json=detail_payload,
        timestamp=now,
    )

    # NOTE(review): this writes through the session directly instead of
    # self.audit_repository -- confirm that is intentional.
    session = self.db
    session.add(entry)
    session.commit()
    session.refresh(entry)
    return entry
|
|
|
|
def _build_default_verification_steps(self, task) -> list[tuple[str, dict]]:
    """Build the ordered ``(tool_name, params)`` list of default verification steps.

    Parameters come from the application metadata looked up for
    ``task.app_code`` / ``task.env``. Any metadata field that is missing or
    falsy falls back to a built-in default.

    Args:
        task: Task being verified; ``app_code``, ``env`` and ``confirmed_at``
            are read from it.

    Returns:
        Five steps, in order: ``check_process``, ``check_port``,
        ``tcp_probe``, ``http_health_check`` and ``grep_log``.
    """
    app_code = task.app_code or "demo-app"
    confirmed_at = task.confirmed_at or format_now(self.timezone_name)
    metadata = MetadataService(self.db, self.timezone_name).get_app_metadata(task.app_code, task.env)

    def from_metadata(field_name: str, fallback):
        # Use the metadata value only when metadata exists and the field is truthy.
        if metadata:
            configured = getattr(metadata, field_name)
            if configured:
                return configured
        return fallback

    port = from_metadata("listen_port", 8080)
    command_contains = from_metadata("command_contains", app_code)
    health_check_url = from_metadata(
        "health_check_url",
        f"http://{app_code}.{task.env or 'env'}.demo/actuator/health",
    )
    log_path = from_metadata("log_path", f"logs/{app_code}.log")
    startup_keyword = from_metadata("startup_keyword", "Started")
    process_name = from_metadata("process_name", "java")

    # Both socket-level checks target the same local endpoint.
    local_endpoint = {"host": "127.0.0.1", "port": port, "timeout_ms": 3000}

    process_step = ("check_process", {"process_name": process_name, "command_contains": command_contains})
    port_step = ("check_port", dict(local_endpoint))
    tcp_step = ("tcp_probe", dict(local_endpoint))
    health_step = (
        "http_health_check",
        {
            "url": health_check_url,
            "timeout_ms": 3000,
            "expected_status": 200,
            "body_contains": "UP",
        },
    )
    log_step = (
        "grep_log",
        {
            "path": log_path,
            "keyword": startup_keyword,
            "start_at": confirmed_at,
            "limit": 20,
        },
    )
    return [process_step, port_step, tcp_step, health_step, log_step]
|
|
|
|
def _write_tool_call(
    self,
    task_id: str,
    request_id: str | None,
    operator_user_id: str | None,
    operator_user_name: str | None,
    step_id: str | None,
    tool_name: str,
    request_payload: dict,
    response_payload: dict,
    success: bool,
    started_at: str | None,
    finished_at: str | None,
) -> ToolCall:
    """Persist a ``ToolCall`` record describing one tool invocation.

    Payloads are stored JSON-encoded, ``success`` is stored as ``1``/``0``,
    and the duration is derived from ``started_at`` and ``finished_at`` via
    ``compute_duration_ms``.

    Returns:
        Whatever ``tool_call_repository.add`` returns for the new record.
    """
    record_fields = {
        "tool_call_id": f"tool-call-{uuid4().hex[:12]}",
        "task_id": task_id,
        "request_id": request_id,
        "operator_user_id": operator_user_id,
        "operator_user_name": operator_user_name,
        "step_id": step_id,
        "tool_name": tool_name,
        "request_payload_json": json.dumps(request_payload, ensure_ascii=False),
        "response_payload_json": json.dumps(response_payload, ensure_ascii=False),
        "success": 1 if success else 0,
        "duration_ms": compute_duration_ms(started_at, finished_at),
        "started_at": started_at,
        "finished_at": finished_at,
    }
    return self.tool_call_repository.add(ToolCall(**record_fields))
|
|
|
|
def _write_audit_log(
    self,
    task_id: str,
    request_id: str | None,
    action: str,
    result: str,
    target: str | None,
    operator_user_id: str | None,
    operator_user_name: str | None,
    detail: dict,
) -> AuditLog:
    """Persist an ``AuditLog`` entry through the audit repository.

    ``detail`` is stored JSON-encoded and the entry is timestamped with the
    current time in the service's timezone.

    Returns:
        Whatever ``audit_repository.add`` returns for the new entry.
    """
    new_audit_id = f"audit-{uuid4().hex[:12]}"
    encoded_detail = json.dumps(detail, ensure_ascii=False)
    written_at = format_now(self.timezone_name)

    entry = AuditLog(
        audit_id=new_audit_id,
        task_id=task_id,
        request_id=request_id,
        action=action,
        operator_user_id=operator_user_id,
        operator_user_name=operator_user_name,
        target=target,
        result=result,
        detail_json=encoded_detail,
        timestamp=written_at,
    )
    return self.audit_repository.add(entry)
|