diff --git a/.gitignore b/.gitignore index e9e06e8..a04d282 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,7 @@ .venv/ data/ dist/ +tmp-linux-runtime/ __pycache__/ .pytest_cache/ *.pyc diff --git a/backend/README.md b/backend/README.md index 406dab3..4e011d4 100644 --- a/backend/README.md +++ b/backend/README.md @@ -53,21 +53,29 @@ Current backend includes: `POST /api/agent/tasks/{task_id}/cancel` `GET /api/agent/tasks/{task_id}` `GET /api/agent/tasks/{task_id}/report` -2. demo identity +2. demo chat + `POST /api/demo/chat/sessions` + `GET /api/demo/chat/sessions/{session_id}` + `POST /api/demo/chat/sessions/{session_id}/messages` + `POST /api/demo/chat/sessions/{session_id}/tasks/{task_id}/confirm` +3. demo web + `GET /` + `GET /demo/chat` +4. demo identity `POST /api/demo/identity/login` `GET /api/demo/identity/me` `GET /api/demo/identity/users/{user_id}/permissions` `POST /api/demo/identity/token/introspect` -3. demo approval +5. demo approval `POST /api/demo/approval/requests` `GET /api/demo/approval/requests/{approval_id}` `POST /api/demo/approval/requests/{approval_id}/decision` `GET /api/demo/approval/requests` -4. software-a minimal implementation +6. software-a minimal implementation `POST /api/demo/software-a/deploy-tasks` `GET /api/demo/software-a/deploy-tasks/{software_a_task_id}` `POST /api/demo/software-a/permissions/check` -5. edge +7. edge `POST /api/agent/edge/heartbeat` `POST /api/agent/edge/tasks/pull` `POST /api/agent/edge/tasks/report` @@ -75,15 +83,22 @@ Current backend includes: Current execution flow: -1. create task -2. confirm task -3. high-risk task enters approval flow -4. check `software-a` minimal implementation permission -5. create `software-a` minimal implementation deploy task -6. create default edge verification step -7. edge pulls and reports verification result -8. task reaches `SUCCEEDED` / `FAILED` / `CANCELLED` -9. 
task detail/report returns software-a status, approval trace, tool trace, verification trace and audit trace +1. create chat session or open web demo +2. send one natural-language message +3. create task +4. confirm task +5. high-risk task enters approval flow +6. check `software-a` minimal implementation permission +7. create `software-a` minimal implementation deploy task +8. build metadata-driven multi-step edge verification plan: + `check_process` + `check_port` + `tcp_probe` + `http_health_check` + `grep_log` +9. edge pulls and reports verification results +10. task reaches `SUCCEEDED` / `FAILED` / `CANCELLED` +11. task detail/report returns software-a status, approval trace, tool trace, verification trace and audit trace Current execution metrics: @@ -95,13 +110,19 @@ Current execution metrics: `confirm_wait_duration_ms` `approval_duration_ms` `execution_duration_ms` + `software_a_duration_ms_total` `tool_call_duration_ms_total` `verification_duration_ms_total` + `verification_queue_wait_duration_ms_total` + `verification_end_to_end_duration_ms_total` +5. `task_report.audit_summary` returns audit result counts, action types and operator summary Current result summary capabilities: 1. task detail/report returns `result_summary_detail` 2. summary includes final status, final reason, software-a result, approval result and verification result +3. demo chat API returns assistant-style parse/confirm messages +4. demo web page provides a visual conversation -> confirm -> execute -> report flow Demo failure semantics currently include: @@ -122,12 +143,12 @@ Automated tests currently cover: 7. cancel running task -Current baseline: `20 passed` +Current baseline: `23 passed` ## Next Focus Recommended next implementation steps: -1. continue enriching audit details and task-level metric breakdown -2. continue implementing local edge-agent executors beyond `http_health_check` -3. add packaging/bootstrap scripts for portable edge-agent delivery -4. 
then continue with second-batch OpenAPI +1. continue enriching app-metadata-driven verification templates +2. connect a real Java sample app to the current demo flow +3. validate native Linux packaging in a real bash/Linux environment +4. then continue with second-batch OpenAPI and UI polish diff --git a/backend/app/api/agent/tasks.py b/backend/app/api/agent/tasks.py index 5f808d8..58c5dfd 100644 --- a/backend/app/api/agent/tasks.py +++ b/backend/app/api/agent/tasks.py @@ -18,6 +18,7 @@ from app.repositories.tool_call_repository import ToolCallRepository from app.adapters.software_a.minimal_adapter import MinimalSoftwareAAdapter from app.schemas.common import ApiResponse from app.schemas.task import ( + AuditSummary, ApprovalSummary, ApprovalTraceItem, AuditTraceItem, @@ -53,8 +54,12 @@ def build_result_summary_detail(task, approval, software_a_detail: dict | None, final_reason = task.summary if software_a_detail and software_a_detail.get("error_detail"): final_reason = software_a_detail["error_detail"] - elif latest_edge_task and latest_edge_task.message: - final_reason = latest_edge_task.message + elif edge_tasks: + failed_message = next((item.message for item in edge_tasks if item.step_status == "FAILED" and item.message), None) + if failed_message: + final_reason = failed_message + elif latest_edge_task and latest_edge_task.message: + final_reason = latest_edge_task.message elif approval and approval.approval_status == "REJECTED" and approval.reason: final_reason = approval.reason @@ -79,12 +84,23 @@ def build_result_summary_detail(task, approval, software_a_detail: dict | None, verification_summary = None if latest_edge_task: + verification_success_values = [bool(item.success) for item in edge_tasks if item.success is not None] + verification_success = None if not verification_success_values else all(verification_success_values) + if any(item.step_status == "FAILED" for item in edge_tasks): + verification_status = "FAILED" + elif all(item.step_status == 
"SUCCEEDED" for item in edge_tasks): + verification_status = "SUCCEEDED" + elif any(item.step_status == "RUNNING" for item in edge_tasks): + verification_status = "RUNNING" + else: + verification_status = latest_edge_task.step_status + verification_message = next((item.message for item in edge_tasks if item.step_status == "FAILED" and item.message), latest_edge_task.message) verification_summary = VerificationResultSummary( - step_id=latest_edge_task.step_id, - step_status=latest_edge_task.step_status, - success=None if latest_edge_task.success is None else bool(latest_edge_task.success), - duration_ms=latest_edge_task.duration_ms, - message=latest_edge_task.message, + step_id=latest_edge_task.step_id if len(edge_tasks) == 1 else None, + step_status=verification_status, + success=verification_success, + duration_ms=sum_duration_ms([item.duration_ms for item in edge_tasks]), + message=verification_message, ) return ResultSummaryDetail( @@ -110,6 +126,9 @@ def sum_duration_ms(values: list[int | None]) -> int: def build_task_metrics(task, approval, software_a_detail: dict | None, tool_calls, edge_tasks, audit_logs) -> TaskMetrics: tool_call_duration_ms_total = sum_duration_ms([item.duration_ms for item in tool_calls]) verification_duration_ms_total = sum_duration_ms([item.duration_ms for item in edge_tasks]) + software_a_duration_ms_total = sum_duration_ms([item.duration_ms for item in tool_calls if item.tool_name.startswith("software_a")]) + verification_queue_wait_duration_ms_total = sum_duration_ms([compute_duration_ms(item.created_at, item.started_at) for item in edge_tasks]) + verification_end_to_end_duration_ms_total = sum_duration_ms([compute_duration_ms(item.created_at, item.finished_at) for item in edge_tasks]) tool_call_count = len(tool_calls) tool_call_success_count = sum(1 for item in tool_calls if bool(item.success)) @@ -118,6 +137,7 @@ def build_task_metrics(task, approval, software_a_detail: dict | None, tool_call verification_step_count = 
len(edge_tasks) verification_success_count = sum(1 for item in edge_tasks if item.success == 1) verification_failed_count = sum(1 for item in edge_tasks if item.success == 0) + audit_failure_count = sum(1 for item in audit_logs if item.result in {"FAILED", "REJECTED"}) latest_observed_at = pick_latest_timestamp( task.updated_at, @@ -156,8 +176,11 @@ def build_task_metrics(task, approval, software_a_detail: dict | None, tool_call confirm_wait_duration_ms=confirm_wait_duration_ms, approval_duration_ms=approval_duration_ms, execution_duration_ms=execution_duration_ms, + software_a_duration_ms_total=software_a_duration_ms_total, tool_call_duration_ms_total=tool_call_duration_ms_total, verification_duration_ms_total=verification_duration_ms_total, + verification_queue_wait_duration_ms_total=verification_queue_wait_duration_ms_total, + verification_end_to_end_duration_ms_total=verification_end_to_end_duration_ms_total, tool_call_count=tool_call_count, tool_call_success_count=tool_call_success_count, tool_call_failed_count=tool_call_failed_count, @@ -165,9 +188,51 @@ def build_task_metrics(task, approval, software_a_detail: dict | None, tool_call verification_success_count=verification_success_count, verification_failed_count=verification_failed_count, audit_event_count=len(audit_logs), + audit_failure_count=audit_failure_count, ) +def build_audit_summary(audit_logs) -> AuditSummary: + result_counts: dict[str, int] = {} + action_types = sorted({item.action for item in audit_logs}) + operator_user_names = sorted({item.operator_user_name for item in audit_logs if item.operator_user_name}) + + for item in audit_logs: + result_counts[item.result] = result_counts.get(item.result, 0) + 1 + + return AuditSummary( + audit_event_count=len(audit_logs), + failure_count=sum(1 for item in audit_logs if item.result in {"FAILED", "REJECTED"}), + pending_count=sum(1 for item in audit_logs if item.result == "PENDING"), + cancelled_count=sum(1 for item in audit_logs if item.result == 
"CANCELLED"), + reported_count=sum(1 for item in audit_logs if item.result == "REPORTED"), + action_types=action_types, + operator_user_names=operator_user_names, + result_counts=result_counts, + ) + + +def build_verification_result(edge_tasks) -> dict | None: + if not edge_tasks: + return None + + def latest_success(tool_name: str) -> bool | None: + for item in edge_tasks: + if item.tool_name == tool_name and item.success is not None: + return bool(item.success) + return None + + grep_success = latest_success("grep_log") + port_related = [latest_success(name) for name in ("check_port", "tcp_probe")] + port_values = [value for value in port_related if value is not None] + return { + "http_ok": latest_success("http_health_check"), + "process_ok": latest_success("check_process"), + "port_ok": all(port_values) if port_values else None, + "log_error_count": 0 if grep_success is True else (1 if grep_success is False else None), + } + + @router.post("", response_model=ApiResponse[CreateTaskData]) def create_task( payload: CreateTaskRequest, @@ -320,16 +385,7 @@ def get_task( software_a_detail = None if task.software_a_task_id: software_a_detail = MinimalSoftwareAAdapter(settings.default_timezone).get_deploy_task(task.software_a_task_id) - verification_result = None - if edge_tasks: - latest_edge_task = edge_tasks[0] - if latest_edge_task.success is not None: - verification_result = { - "http_ok": bool(latest_edge_task.success), - "process_ok": None, - "port_ok": None, - "log_error_count": 0 if latest_edge_task.success else 1, - } + verification_result = build_verification_result(edge_tasks) return ApiResponse[TaskDetailData]( request_id=request_id, @@ -469,6 +525,7 @@ def get_task_report( tool_trace=tool_trace, verification_trace=verification_trace, task_metrics=build_task_metrics(task, approval, software_a_detail, tool_calls, edge_tasks, audit_logs), + audit_summary=build_audit_summary(audit_logs), result_summary=task.summary, 
result_summary_detail=build_result_summary_detail(task, approval, software_a_detail, edge_tasks), audit_trace=audit_trace, diff --git a/backend/app/api/demo/chat.py b/backend/app/api/demo/chat.py new file mode 100644 index 0000000..e6abec8 --- /dev/null +++ b/backend/app/api/demo/chat.py @@ -0,0 +1,155 @@ +from __future__ import annotations + +from typing import Annotated +from uuid import uuid4 + +from fastapi import APIRouter, Depends, Header, HTTPException, status +from sqlalchemy.orm import Session + +from app.core.config import get_settings +from app.core.constants import ERROR_CODE_OK +from app.core.time import format_now +from app.db.session import get_db +from app.schemas.chat import ( + ChatConfirmTaskData, + ChatConfirmTaskRequest, + ChatSendMessageData, + ChatSendMessageRequest, + ChatSessionCreateRequest, + ChatSessionData, +) +from app.schemas.common import ApiResponse +from app.services.chat_service import ChatService, ChatSessionNotFoundError + +router = APIRouter(prefix="/api/demo/chat", tags=["demo-chat"]) + + +def build_request_id(header_value: str | None) -> str: + return header_value or f"req-{uuid4().hex[:12]}" + + +@router.post("/sessions", response_model=ApiResponse[ChatSessionData]) +def create_session( + payload: ChatSessionCreateRequest, + db: Annotated[Session, Depends(get_db)], + x_request_id: Annotated[str | None, Header(alias="X-Request-Id")] = None, +) -> ApiResponse[ChatSessionData]: + settings = get_settings() + request_id = build_request_id(x_request_id) + service = ChatService(db, settings.default_timezone) + session = service.create_session(payload.tenant_id, payload.channel) + messages = [service.to_message_item(item) for item in service.list_messages(session.session_id)] + return ApiResponse[ChatSessionData]( + request_id=request_id, + success=True, + code=ERROR_CODE_OK, + message="success", + data=ChatSessionData( + session_id=session.session_id, + tenant_id=session.tenant_id, + channel=session.channel, + title=session.title, 
+ last_task_id=session.last_task_id, + sample_prompts=ChatService.SAMPLE_PROMPTS, + messages=messages, + ), + timestamp=format_now(settings.default_timezone), + ) + + +@router.get("/sessions/{session_id}", response_model=ApiResponse[ChatSessionData]) +def get_session( + session_id: str, + db: Annotated[Session, Depends(get_db)], + x_request_id: Annotated[str | None, Header(alias="X-Request-Id")] = None, +) -> ApiResponse[ChatSessionData]: + settings = get_settings() + request_id = build_request_id(x_request_id) + service = ChatService(db, settings.default_timezone) + try: + session = service.get_session(session_id) + except ChatSessionNotFoundError as exc: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail={"code": "NOT_FOUND", "message": "chat session not found"}) from exc + messages = [service.to_message_item(item) for item in service.list_messages(session.session_id)] + return ApiResponse[ChatSessionData]( + request_id=request_id, + success=True, + code=ERROR_CODE_OK, + message="success", + data=ChatSessionData( + session_id=session.session_id, + tenant_id=session.tenant_id, + channel=session.channel, + title=session.title, + last_task_id=session.last_task_id, + sample_prompts=ChatService.SAMPLE_PROMPTS, + messages=messages, + ), + timestamp=format_now(settings.default_timezone), + ) + + +@router.post("/sessions/{session_id}/messages", response_model=ApiResponse[ChatSendMessageData]) +def send_message( + session_id: str, + payload: ChatSendMessageRequest, + db: Annotated[Session, Depends(get_db)], + x_request_id: Annotated[str | None, Header(alias="X-Request-Id")] = None, +) -> ApiResponse[ChatSendMessageData]: + settings = get_settings() + request_id = build_request_id(x_request_id) + service = ChatService(db, settings.default_timezone) + try: + _, assistant_message, task_data = service.handle_user_message(session_id, payload.content, payload.context) + except ChatSessionNotFoundError as exc: + raise 
HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail={"code": "NOT_FOUND", "message": "chat session not found"}) from exc + + return ApiResponse[ChatSendMessageData]( + request_id=request_id, + success=True, + code=ERROR_CODE_OK, + message="success", + data=ChatSendMessageData( + session_id=session_id, + task_id=task_data["task_id"], + task_status=task_data["task_status"], + parsed_intent=task_data["parsed_intent"], + missing_slots=task_data["missing_slots"], + risk_level=task_data["risk_level"], + next_action=task_data["next_action"], + assistant_message=service.to_message_item(assistant_message), + ), + timestamp=format_now(settings.default_timezone), + ) + + +@router.post("/sessions/{session_id}/tasks/{task_id}/confirm", response_model=ApiResponse[ChatConfirmTaskData]) +def confirm_task_from_chat( + session_id: str, + task_id: str, + payload: ChatConfirmTaskRequest, + db: Annotated[Session, Depends(get_db)], + x_request_id: Annotated[str | None, Header(alias="X-Request-Id")] = None, +) -> ApiResponse[ChatConfirmTaskData]: + settings = get_settings() + request_id = build_request_id(x_request_id) + service = ChatService(db, settings.default_timezone) + try: + _, assistant_message, task_data = service.confirm_task(session_id, task_id, payload.comment) + except ChatSessionNotFoundError as exc: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail={"code": "NOT_FOUND", "message": "chat session not found"}) from exc + + return ApiResponse[ChatConfirmTaskData]( + request_id=request_id, + success=True, + code=ERROR_CODE_OK, + message="success", + data=ChatConfirmTaskData( + session_id=session_id, + task_id=task_data["task_id"], + task_status=task_data["task_status"], + approval_status=task_data["approval_status"], + assistant_message=service.to_message_item(assistant_message), + ), + timestamp=format_now(settings.default_timezone), + ) diff --git a/backend/app/api/web/demo.py b/backend/app/api/web/demo.py new file mode 100644 index 0000000..9e3c5ab 
--- /dev/null +++ b/backend/app/api/web/demo.py @@ -0,0 +1,16 @@ +from __future__ import annotations + +from pathlib import Path + +from fastapi import APIRouter +from fastapi.responses import HTMLResponse + + +router = APIRouter(tags=["demo-web"]) + + +@router.get("/", response_class=HTMLResponse) +@router.get("/demo/chat", response_class=HTMLResponse) +def demo_chat_page() -> HTMLResponse: + html_path = Path(__file__).resolve().parents[2] / "web" / "chat_demo.html" + return HTMLResponse(html_path.read_text(encoding="utf-8")) diff --git a/backend/app/main.py b/backend/app/main.py index 2ebbac5..b797a66 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -7,19 +7,26 @@ from fastapi.responses import JSONResponse from app.api.agent.tasks import router as task_router from app.api.demo.approval import router as demo_approval_router +from app.api.demo.chat import router as demo_chat_router from app.api.demo.identity import router as demo_identity_router from app.api.demo.software_a import router as demo_software_a_router from app.api.edge.tasks import router as edge_router +from app.api.web.demo import router as demo_web_router from app.core.config import ensure_runtime_directories, get_settings from app.core.time import format_now from app.db.base import Base from app.db.session import engine +from app.db.session import SessionLocal +from app.models.app_metadata import AppMetadata from app.models.approval import ApprovalRequest from app.models.audit_log import AuditLog +from app.models.chat_message import ChatMessage +from app.models.chat_session import ChatSession from app.models.edge_node import EdgeNode from app.models.edge_task import EdgeTask from app.models.task import Task from app.models.tool_call import ToolCall +from app.services.metadata_service import MetadataService settings = get_settings() @@ -27,6 +34,11 @@ settings = get_settings() async def lifespan(_: FastAPI): ensure_runtime_directories() Base.metadata.create_all(bind=engine) + db = 
SessionLocal() + try: + MetadataService(db, settings.default_timezone).ensure_demo_metadata() + finally: + db.close() yield @@ -59,7 +71,9 @@ def healthz() -> dict[str, str]: app.include_router(task_router) +app.include_router(demo_chat_router) app.include_router(demo_identity_router) app.include_router(demo_approval_router) app.include_router(demo_software_a_router) app.include_router(edge_router) +app.include_router(demo_web_router) diff --git a/backend/app/models/app_metadata.py b/backend/app/models/app_metadata.py new file mode 100644 index 0000000..b4017e5 --- /dev/null +++ b/backend/app/models/app_metadata.py @@ -0,0 +1,22 @@ +from __future__ import annotations + +from sqlalchemy import Integer, Text +from sqlalchemy.orm import Mapped, mapped_column + +from app.db.base import Base + + +class AppMetadata(Base): + __tablename__ = "app_metadata" + + app_metadata_id: Mapped[str] = mapped_column(Text, primary_key=True) + app_code: Mapped[str] = mapped_column(Text, nullable=False, index=True) + env: Mapped[str] = mapped_column(Text, nullable=False, index=True) + process_name: Mapped[str | None] = mapped_column(Text, nullable=True) + command_contains: Mapped[str | None] = mapped_column(Text, nullable=True) + health_check_url: Mapped[str | None] = mapped_column(Text, nullable=True) + log_path: Mapped[str | None] = mapped_column(Text, nullable=True) + listen_port: Mapped[int | None] = mapped_column(Integer, nullable=True) + startup_keyword: Mapped[str | None] = mapped_column(Text, nullable=True) + created_at: Mapped[str] = mapped_column(Text, nullable=False) + updated_at: Mapped[str] = mapped_column(Text, nullable=False) diff --git a/backend/app/models/chat_message.py b/backend/app/models/chat_message.py new file mode 100644 index 0000000..ac1cc4d --- /dev/null +++ b/backend/app/models/chat_message.py @@ -0,0 +1,18 @@ +from __future__ import annotations + +from sqlalchemy import Text +from sqlalchemy.orm import Mapped, mapped_column + +from app.db.base import Base + + 
+class ChatMessage(Base): + __tablename__ = "chat_message" + + message_id: Mapped[str] = mapped_column(Text, primary_key=True) + session_id: Mapped[str] = mapped_column(Text, nullable=False, index=True) + role: Mapped[str] = mapped_column(Text, nullable=False) + content: Mapped[str] = mapped_column(Text, nullable=False) + message_type: Mapped[str] = mapped_column(Text, nullable=False) + task_id: Mapped[str | None] = mapped_column(Text, nullable=True, index=True) + created_at: Mapped[str] = mapped_column(Text, nullable=False) diff --git a/backend/app/models/chat_session.py b/backend/app/models/chat_session.py new file mode 100644 index 0000000..71000d4 --- /dev/null +++ b/backend/app/models/chat_session.py @@ -0,0 +1,19 @@ +from __future__ import annotations + +from sqlalchemy import Text +from sqlalchemy.orm import Mapped, mapped_column + +from app.db.base import Base + + +class ChatSession(Base): + __tablename__ = "chat_session" + + session_id: Mapped[str] = mapped_column(Text, primary_key=True) + tenant_id: Mapped[str] = mapped_column(Text, nullable=False, index=True) + channel: Mapped[str] = mapped_column(Text, nullable=False) + title: Mapped[str | None] = mapped_column(Text, nullable=True) + last_task_id: Mapped[str | None] = mapped_column(Text, nullable=True) + context_json: Mapped[str] = mapped_column(Text, nullable=False) + created_at: Mapped[str] = mapped_column(Text, nullable=False) + updated_at: Mapped[str] = mapped_column(Text, nullable=False) diff --git a/backend/app/repositories/chat_repository.py b/backend/app/repositories/chat_repository.py new file mode 100644 index 0000000..83d21c3 --- /dev/null +++ b/backend/app/repositories/chat_repository.py @@ -0,0 +1,43 @@ +from __future__ import annotations + +from sqlalchemy import select +from sqlalchemy.orm import Session + +from app.models.chat_message import ChatMessage +from app.models.chat_session import ChatSession + + +class ChatSessionRepository: + def __init__(self, db: Session) -> None: + self.db 
= db + + def add(self, item: ChatSession) -> ChatSession: + self.db.add(item) + self.db.commit() + self.db.refresh(item) + return item + + def update(self, item: ChatSession) -> ChatSession: + self.db.add(item) + self.db.commit() + self.db.refresh(item) + return item + + def get_by_session_id(self, session_id: str) -> ChatSession | None: + statement = select(ChatSession).where(ChatSession.session_id == session_id) + return self.db.execute(statement).scalar_one_or_none() + + +class ChatMessageRepository: + def __init__(self, db: Session) -> None: + self.db = db + + def add(self, item: ChatMessage) -> ChatMessage: + self.db.add(item) + self.db.commit() + self.db.refresh(item) + return item + + def list_by_session_id(self, session_id: str) -> list[ChatMessage]: + statement = select(ChatMessage).where(ChatMessage.session_id == session_id).order_by(ChatMessage.created_at.asc()) + return list(self.db.execute(statement).scalars()) diff --git a/backend/app/repositories/metadata_repository.py b/backend/app/repositories/metadata_repository.py new file mode 100644 index 0000000..ec93843 --- /dev/null +++ b/backend/app/repositories/metadata_repository.py @@ -0,0 +1,31 @@ +from __future__ import annotations + +from sqlalchemy import select +from sqlalchemy.orm import Session + +from app.models.app_metadata import AppMetadata + + +class AppMetadataRepository: + def __init__(self, db: Session) -> None: + self.db = db + + def add(self, item: AppMetadata) -> AppMetadata: + self.db.add(item) + self.db.commit() + self.db.refresh(item) + return item + + def update(self, item: AppMetadata) -> AppMetadata: + self.db.add(item) + self.db.commit() + self.db.refresh(item) + return item + + def get_by_app_env(self, app_code: str, env: str) -> AppMetadata | None: + statement = select(AppMetadata).where(AppMetadata.app_code == app_code).where(AppMetadata.env == env) + return self.db.execute(statement).scalar_one_or_none() + + def list_all(self) -> list[AppMetadata]: + statement = 
select(AppMetadata).order_by(AppMetadata.app_code.asc(), AppMetadata.env.asc()) + return list(self.db.execute(statement).scalars()) diff --git a/backend/app/schemas/chat.py b/backend/app/schemas/chat.py new file mode 100644 index 0000000..f2d57bd --- /dev/null +++ b/backend/app/schemas/chat.py @@ -0,0 +1,59 @@ +from __future__ import annotations + +from typing import Any + +from pydantic import BaseModel, Field + +from app.schemas.task import ParsedIntent + + +class ChatSessionCreateRequest(BaseModel): + tenant_id: str = "tenant-demo" + channel: str = "WEB" + + +class ChatMessageItem(BaseModel): + message_id: str + role: str + content: str + message_type: str + task_id: str | None = None + created_at: str + + +class ChatSessionData(BaseModel): + session_id: str + tenant_id: str + channel: str + title: str | None = None + last_task_id: str | None = None + sample_prompts: list[str] = Field(default_factory=list) + messages: list[ChatMessageItem] = Field(default_factory=list) + + +class ChatSendMessageRequest(BaseModel): + content: str + context: dict[str, Any] = Field(default_factory=dict) + + +class ChatSendMessageData(BaseModel): + session_id: str + task_id: str + task_status: str + parsed_intent: ParsedIntent + missing_slots: list[str] + risk_level: str + next_action: str + assistant_message: ChatMessageItem + + +class ChatConfirmTaskRequest(BaseModel): + comment: str | None = None + + +class ChatConfirmTaskData(BaseModel): + session_id: str + task_id: str + task_status: str + approval_status: str + assistant_message: ChatMessageItem diff --git a/backend/app/schemas/metadata.py b/backend/app/schemas/metadata.py new file mode 100644 index 0000000..1d707a2 --- /dev/null +++ b/backend/app/schemas/metadata.py @@ -0,0 +1,14 @@ +from __future__ import annotations + +from pydantic import BaseModel + + +class AppMetadataData(BaseModel): + app_code: str + env: str + process_name: str | None = None + command_contains: str | None = None + health_check_url: str | None = None + 
log_path: str | None = None + listen_port: int | None = None + startup_keyword: str | None = None diff --git a/backend/app/schemas/task.py b/backend/app/schemas/task.py index e4e533a..93d9dd5 100644 --- a/backend/app/schemas/task.py +++ b/backend/app/schemas/task.py @@ -165,13 +165,27 @@ class AuditTraceItem(BaseModel): timestamp: str +class AuditSummary(BaseModel): + audit_event_count: int = 0 + failure_count: int = 0 + pending_count: int = 0 + cancelled_count: int = 0 + reported_count: int = 0 + action_types: list[str] = Field(default_factory=list) + operator_user_names: list[str] = Field(default_factory=list) + result_counts: dict[str, int] = Field(default_factory=dict) + + class TaskMetrics(BaseModel): total_duration_ms: int | None = None confirm_wait_duration_ms: int | None = None approval_duration_ms: int | None = None execution_duration_ms: int | None = None + software_a_duration_ms_total: int = 0 tool_call_duration_ms_total: int = 0 verification_duration_ms_total: int = 0 + verification_queue_wait_duration_ms_total: int = 0 + verification_end_to_end_duration_ms_total: int = 0 tool_call_count: int = 0 tool_call_success_count: int = 0 tool_call_failed_count: int = 0 @@ -179,6 +193,7 @@ class TaskMetrics(BaseModel): verification_success_count: int = 0 verification_failed_count: int = 0 audit_event_count: int = 0 + audit_failure_count: int = 0 class TaskReportData(BaseModel): @@ -188,6 +203,7 @@ class TaskReportData(BaseModel): tool_trace: list[ToolTraceItem] verification_trace: list[VerificationTraceItem] task_metrics: TaskMetrics + audit_summary: AuditSummary result_summary: str | None = None result_summary_detail: ResultSummaryDetail | None = None audit_trace: list[AuditTraceItem] diff --git a/backend/app/services/chat_service.py b/backend/app/services/chat_service.py new file mode 100644 index 0000000..8312678 --- /dev/null +++ b/backend/app/services/chat_service.py @@ -0,0 +1,168 @@ +from __future__ import annotations + +import json +from uuid import uuid4 
+ +from sqlalchemy.orm import Session + +from app.core.time import format_now +from app.models.chat_message import ChatMessage +from app.models.chat_session import ChatSession +from app.repositories.chat_repository import ChatMessageRepository, ChatSessionRepository +from app.schemas.chat import ChatMessageItem +from app.schemas.task import ConfirmTaskRequest, CreateTaskRequest, ParsedIntent +from app.services.task_service import TaskService + + +class ChatSessionNotFoundError(Exception): + pass + + +class ChatService: + SAMPLE_PROMPTS = [ + "deploy order-service 1.2.3 to test", + "deploy payment-service 1.2.3 to test", + "deploy order-service 1.2.3 to prod", + ] + + def __init__(self, db: Session, timezone_name: str) -> None: + self.db = db + self.timezone_name = timezone_name + self.session_repository = ChatSessionRepository(db) + self.message_repository = ChatMessageRepository(db) + self.task_service = TaskService(db, timezone_name) + + def create_session(self, tenant_id: str, channel: str) -> ChatSession: + current_time = format_now(self.timezone_name) + session = ChatSession( + session_id=f"chat-{uuid4().hex[:12]}", + tenant_id=tenant_id, + channel=channel, + title="Agent Demo Session", + last_task_id=None, + context_json=json.dumps({}, ensure_ascii=False), + created_at=current_time, + updated_at=current_time, + ) + created_session = self.session_repository.add(session) + self._add_message( + session_id=created_session.session_id, + role="assistant", + content="请输入一句自然语言,例如:deploy order-service 1.2.3 to test", + message_type="welcome", + task_id=None, + ) + return created_session + + def get_session(self, session_id: str) -> ChatSession: + session = self.session_repository.get_by_session_id(session_id) + if not session: + raise ChatSessionNotFoundError() + return session + + def list_messages(self, session_id: str) -> list[ChatMessage]: + self.get_session(session_id) + return self.message_repository.list_by_session_id(session_id) + + def 
handle_user_message(self, session_id: str, content: str, context: dict | None = None) -> tuple[ChatSession, ChatMessage, dict]: + session = self.get_session(session_id) + self._add_message(session_id=session_id, role="user", content=content, message_type="user_input", task_id=None) + request_id = f"chat-req-{uuid4().hex[:12]}" + task = self.task_service.create_task( + CreateTaskRequest( + input_text=content, + channel=session.channel, + session_id=session.session_id, + tenant_id=session.tenant_id, + context=context or {}, + ), + request_id=request_id, + ) + session.last_task_id = task.task_id + session.updated_at = format_now(self.timezone_name) + self.session_repository.update(session) + + parsed_intent = json.loads(task.parsed_intent_json) + missing_slots = json.loads(task.missing_slots_json) + next_action = "CONFIRM_TASK" if not missing_slots else "FILL_MISSING_SLOTS" + assistant_text = self._build_parse_reply(parsed_intent, missing_slots, task.risk_level, next_action) + assistant_message = self._add_message( + session_id=session_id, + role="assistant", + content=assistant_text, + message_type="task_parse", + task_id=task.task_id, + ) + return session, assistant_message, { + "task_id": task.task_id, + "task_status": task.task_status, + "parsed_intent": ParsedIntent(**parsed_intent), + "missing_slots": missing_slots, + "risk_level": task.risk_level, + "next_action": next_action, + } + + def confirm_task(self, session_id: str, task_id: str, comment: str | None) -> tuple[ChatSession, ChatMessage, dict]: + session = self.get_session(session_id) + task, approval_id = self.task_service.confirm_task( + task_id, + ConfirmTaskRequest(confirmed=True, comment=comment), + request_id=f"chat-confirm-{uuid4().hex[:12]}", + ) + session.last_task_id = task.task_id + session.updated_at = format_now(self.timezone_name) + self.session_repository.update(session) + + assistant_text = self._build_confirm_reply(task.task_status, task.approval_status, task.software_a_task_status, 
approval_id) + assistant_message = self._add_message( + session_id=session_id, + role="assistant", + content=assistant_text, + message_type="task_confirm", + task_id=task.task_id, + ) + return session, assistant_message, { + "task_id": task.task_id, + "task_status": task.task_status, + "approval_status": task.approval_status, + } + + def _build_parse_reply(self, parsed_intent: dict, missing_slots: list[str], risk_level: str, next_action: str) -> str: + if missing_slots: + return f"我已解析任务,但还缺少字段:{', '.join(missing_slots)}。请补充后再继续。" + return ( + "我已解析任务:" + f"动作={parsed_intent.get('action_type')}," + f"应用={parsed_intent.get('app_code')}," + f"环境={parsed_intent.get('env')}," + f"版本={parsed_intent.get('version')}。" + f" 风险等级={risk_level},下一步={next_action}。" + ) + + def _build_confirm_reply(self, task_status: str, approval_status: str, software_a_task_status: str | None, approval_id: str | None) -> str: + if approval_status == "PENDING" and approval_id: + return f"任务已确认,当前进入审批阶段。approval_id={approval_id}" + return f"任务已确认并进入执行。task_status={task_status},software_a_task_status={software_a_task_status}" + + def _add_message(self, session_id: str, role: str, content: str, message_type: str, task_id: str | None) -> ChatMessage: + message = ChatMessage( + message_id=f"msg-{uuid4().hex[:12]}", + session_id=session_id, + role=role, + content=content, + message_type=message_type, + task_id=task_id, + created_at=format_now(self.timezone_name), + ) + return self.message_repository.add(message) + + @staticmethod + def to_message_item(message: ChatMessage) -> ChatMessageItem: + return ChatMessageItem( + message_id=message.message_id, + role=message.role, + content=message.content, + message_type=message.message_type, + task_id=message.task_id, + created_at=message.created_at, + ) diff --git a/backend/app/services/edge_service.py b/backend/app/services/edge_service.py index f4cddbd..bba8987 100644 --- a/backend/app/services/edge_service.py +++ b/backend/app/services/edge_service.py 
@@ -1,4 +1,4 @@ -from __future__ import annotations +from __future__ import annotations import json from uuid import uuid4 @@ -20,14 +20,15 @@ from app.core.constants import ( TASK_STATUS_VERIFYING, ) from app.core.time import compute_duration_ms, format_now +from app.models.audit_log import AuditLog from app.models.edge_node import EdgeNode from app.models.edge_task import EdgeTask -from app.models.audit_log import AuditLog from app.models.tool_call import ToolCall from app.repositories.audit_repository import AuditRepository from app.repositories.edge_repository import EdgeNodeRepository, EdgeTaskRepository from app.repositories.task_repository import TaskRepository from app.repositories.tool_call_repository import ToolCallRepository +from app.services.metadata_service import MetadataService class EdgeTaskConflictError(Exception): @@ -74,54 +75,50 @@ class EdgeService: ) return self.node_repository.add_or_update(node) - def schedule_default_verification(self, task_id: str, edge_id: str = "edge-shanghai-001") -> EdgeTask: + def schedule_default_verification(self, task_id: str, edge_id: str = "edge-shanghai-001") -> list[EdgeTask]: task = self.task_repository.get_by_task_id(task_id) if not task: raise EdgeTaskNotFoundError() if task.task_status != TASK_STATUS_RUNNING: - raise EdgeTaskConflictError("当前任务状态不允许创建 edge 验证步骤。") + raise EdgeTaskConflictError("current task status does not allow scheduling edge verification steps") if self.edge_task_repository.list_active_by_task_id(task_id): - raise EdgeTaskConflictError("当前任务已存在待处理的 edge 验证步骤。") + raise EdgeTaskConflictError("task already has active edge verification steps") current_time = format_now(self.timezone_name) - step_id = f"step-{uuid4().hex[:12]}" - edge_task = EdgeTask( - edge_task_id=f"edge-task-{uuid4().hex[:12]}", - step_id=step_id, - task_id=task_id, - edge_id=edge_id, - tool_name="http_health_check", - params_json=json.dumps( - { - "url": f"http://{task.app_code or 'localhost'}.{task.env or 
'env'}.demo/actuator/health", - "timeout_ms": 3000, - }, - ensure_ascii=False, - ), - step_status=EDGE_STEP_STATUS_PENDING, - success=None, - message=None, - result_data_json="{}", - evidence_json="{}", - duration_ms=None, - expire_at=current_time, - started_at=None, - finished_at=None, - created_at=current_time, - updated_at=current_time, - ) - created_edge_task = self.edge_task_repository.add(edge_task) - self._write_audit_log( - task_id=task_id, - request_id=None, - action="EDGE_TASK_SCHEDULED", - result="PENDING", - target=edge_id, - operator_user_id=None, - operator_user_name=None, - detail={"step_id": created_edge_task.step_id, "tool_name": created_edge_task.tool_name}, - ) - return created_edge_task + created_items: list[EdgeTask] = [] + for tool_name, params in self._build_default_verification_steps(task): + edge_task = EdgeTask( + edge_task_id=f"edge-task-{uuid4().hex[:12]}", + step_id=f"step-{uuid4().hex[:12]}", + task_id=task_id, + edge_id=edge_id, + tool_name=tool_name, + params_json=json.dumps(params, ensure_ascii=False), + step_status=EDGE_STEP_STATUS_PENDING, + success=None, + message=None, + result_data_json="{}", + evidence_json="{}", + duration_ms=None, + expire_at=current_time, + started_at=None, + finished_at=None, + created_at=current_time, + updated_at=current_time, + ) + created_item = self.edge_task_repository.add(edge_task) + created_items.append(created_item) + self._write_audit_log( + task_id=task_id, + request_id=None, + action="EDGE_TASK_SCHEDULED", + result="PENDING", + target=edge_id, + operator_user_id=None, + operator_user_name=None, + detail={"step_id": created_item.step_id, "tool_name": created_item.tool_name}, + ) + return created_items def pull_tasks(self, edge_id: str, max_tasks: int) -> list[EdgeTask]: items = self.edge_task_repository.list_pending_by_edge_id(edge_id)[:max_tasks] @@ -129,7 +126,7 @@ class EdgeService: pulled_items: list[EdgeTask] = [] for item in items: task = self.task_repository.get_by_task_id(item.task_id) 
- if not task or task.task_status != TASK_STATUS_RUNNING: + if not task or task.task_status not in {TASK_STATUS_RUNNING, TASK_STATUS_VERIFYING}: item.step_status = EDGE_STEP_STATUS_CANCELLED item.updated_at = current_time item.message = "task state no longer allows edge execution" @@ -143,25 +140,35 @@ class EdgeService: task.task_status = TASK_STATUS_VERIFYING task.updated_at = current_time - task.summary = "任务已进入边缘验证阶段。" + task.summary = "task entered edge verification stage" self.task_repository.update(task) pulled_items.append(item) return pulled_items - def report_task(self, edge_id: str, step_id: str, success: bool, message: str, data: dict, evidence: dict, started_at: str, finished_at: str) -> tuple[EdgeTask, str]: + def report_task( + self, + edge_id: str, + step_id: str, + success: bool, + message: str, + data: dict, + evidence: dict, + started_at: str, + finished_at: str, + ) -> tuple[EdgeTask, str]: edge_task = self.edge_task_repository.get_by_step_id(step_id) if not edge_task: raise EdgeTaskNotFoundError() if edge_task.edge_id != edge_id: - raise EdgeTaskConflictError("edge_id 与任务归属不一致。") + raise EdgeTaskConflictError("edge_id does not match the assigned edge task") if edge_task.step_status not in {EDGE_STEP_STATUS_RUNNING, EDGE_STEP_STATUS_PENDING}: - raise EdgeTaskConflictError("当前步骤状态不允许重复回传。") + raise EdgeTaskConflictError("edge step state does not allow duplicate report") task = self.task_repository.get_by_task_id(edge_task.task_id) if not task: - raise EdgeTaskConflictError("edge 步骤关联任务不存在。") + raise EdgeTaskConflictError("edge task references a non-existent task") if task.task_status not in {TASK_STATUS_RUNNING, TASK_STATUS_VERIFYING}: - raise EdgeTaskConflictError("当前任务状态不允许回传 edge 结果。") + raise EdgeTaskConflictError("task state does not allow edge report") edge_task.step_status = EDGE_STEP_STATUS_SUCCEEDED if success else EDGE_STEP_STATUS_FAILED edge_task.success = 1 if success else 0 @@ -187,12 +194,25 @@ class EdgeService: 
finished_at=finished_at, ) - task_status = TASK_STATUS_RUNNING - task.task_status = TASK_STATUS_SUCCEEDED if success else TASK_STATUS_FAILED + all_steps = self.edge_task_repository.list_by_task_id(task.task_id) + if not success: + for item in all_steps: + if item.step_status in {EDGE_STEP_STATUS_PENDING, EDGE_STEP_STATUS_RUNNING} and item.step_id != updated_edge_task.step_id: + item.step_status = EDGE_STEP_STATUS_CANCELLED + item.updated_at = format_now(self.timezone_name) + item.message = "cancelled because another verification step failed" + self.edge_task_repository.update(item) + task.task_status = TASK_STATUS_FAILED + task.summary = "edge verification failed" + elif all(item.step_status == EDGE_STEP_STATUS_SUCCEEDED for item in all_steps): + task.task_status = TASK_STATUS_SUCCEEDED + task.summary = "all edge verification steps succeeded" + else: + task.task_status = TASK_STATUS_VERIFYING + task.summary = "edge verification is still in progress" + task.updated_at = format_now(self.timezone_name) - task.summary = "边缘验证通过,任务完成。" if success else "边缘验证失败,任务失败。" self.task_repository.update(task) - task_status = task.task_status self._write_audit_log( task_id=task.task_id, request_id=None, @@ -203,8 +223,7 @@ class EdgeService: operator_user_name=edge_id, detail={"step_id": step_id, "tool_name": edge_task.tool_name, "message": message}, ) - - return updated_edge_task, task_status + return updated_edge_task, task.task_status def record_event(self, edge_id: str, event_type: str, message: str, detail: dict) -> AuditLog: current_time = format_now(self.timezone_name) @@ -224,6 +243,65 @@ class EdgeService: self.db.refresh(audit) return audit + def _build_default_verification_steps(self, task) -> list[tuple[str, dict]]: + app_code = task.app_code or "demo-app" + confirmed_at = task.confirmed_at or format_now(self.timezone_name) + metadata = MetadataService(self.db, self.timezone_name).get_app_metadata(task.app_code, task.env) + host = "127.0.0.1" + port = 
metadata.listen_port if metadata and metadata.listen_port else 8080 + command_contains = metadata.command_contains if metadata and metadata.command_contains else app_code + health_check_url = ( + metadata.health_check_url + if metadata and metadata.health_check_url + else f"http://{app_code}.{task.env or 'env'}.demo/actuator/health" + ) + log_path = metadata.log_path if metadata and metadata.log_path else f"logs/{app_code}.log" + startup_keyword = metadata.startup_keyword if metadata and metadata.startup_keyword else "Started" + process_name = metadata.process_name if metadata and metadata.process_name else "java" + return [ + ( + "check_process", + { + "process_name": process_name, + "command_contains": command_contains, + }, + ), + ( + "check_port", + { + "host": host, + "port": port, + "timeout_ms": 3000, + }, + ), + ( + "tcp_probe", + { + "host": host, + "port": port, + "timeout_ms": 3000, + }, + ), + ( + "http_health_check", + { + "url": health_check_url, + "timeout_ms": 3000, + "expected_status": 200, + "body_contains": "UP", + }, + ), + ( + "grep_log", + { + "path": log_path, + "keyword": startup_keyword, + "start_at": confirmed_at, + "limit": 20, + }, + ), + ] + def _write_tool_call( self, task_id: str, @@ -278,4 +356,4 @@ class EdgeService: detail_json=json.dumps(detail, ensure_ascii=False), timestamp=format_now(self.timezone_name), ) - return self.audit_repository.add(audit_log) + return self.audit_repository.add(audit_log) diff --git a/backend/app/services/metadata_service.py b/backend/app/services/metadata_service.py new file mode 100644 index 0000000..03324c2 --- /dev/null +++ b/backend/app/services/metadata_service.py @@ -0,0 +1,76 @@ +from __future__ import annotations + +from uuid import uuid4 + +from sqlalchemy.orm import Session + +from app.core.time import format_now +from app.models.app_metadata import AppMetadata +from app.repositories.metadata_repository import AppMetadataRepository + + +class MetadataService: + DEMO_ITEMS = [ + { + 
"app_code": "order-service", + "env": "test", + "process_name": "java", + "command_contains": "order-service", + "health_check_url": "http://order-service.test.demo/actuator/health", + "log_path": "logs/order-service.log", + "listen_port": 8080, + "startup_keyword": "Started order-service", + }, + { + "app_code": "order-service", + "env": "prod", + "process_name": "java", + "command_contains": "order-service", + "health_check_url": "http://order-service.prod.demo/actuator/health", + "log_path": "logs/order-service.log", + "listen_port": 8080, + "startup_keyword": "Started order-service", + }, + { + "app_code": "payment-service", + "env": "test", + "process_name": "java", + "command_contains": "payment-service", + "health_check_url": "http://payment-service.test.demo/actuator/health", + "log_path": "logs/payment-service.log", + "listen_port": 8081, + "startup_keyword": "Started payment-service", + }, + ] + + def __init__(self, db: Session, timezone_name: str) -> None: + self.db = db + self.timezone_name = timezone_name + self.repository = AppMetadataRepository(db) + + def ensure_demo_metadata(self) -> None: + for item in self.DEMO_ITEMS: + existing = self.repository.get_by_app_env(item["app_code"], item["env"]) + if existing: + continue + current_time = format_now(self.timezone_name) + self.repository.add( + AppMetadata( + app_metadata_id=f"app-meta-{uuid4().hex[:12]}", + app_code=item["app_code"], + env=item["env"], + process_name=item["process_name"], + command_contains=item["command_contains"], + health_check_url=item["health_check_url"], + log_path=item["log_path"], + listen_port=item["listen_port"], + startup_keyword=item["startup_keyword"], + created_at=current_time, + updated_at=current_time, + ) + ) + + def get_app_metadata(self, app_code: str | None, env: str | None) -> AppMetadata | None: + if not app_code or not env: + return None + return self.repository.get_by_app_env(app_code, env) diff --git a/backend/app/web/chat_demo.html 
b/backend/app/web/chat_demo.html new file mode 100644 index 0000000..a7d8e49 --- /dev/null +++ b/backend/app/web/chat_demo.html @@ -0,0 +1,406 @@ + + +
+ + +