From ce299cbb1809dea10c48dce367e57171197bdd06 Mon Sep 17 00:00:00 2001 From: 2521690 Date: Thu, 9 Apr 2026 14:10:13 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=A2=9E=E5=8A=A0=20Agent=20=E6=BC=94?= =?UTF-8?q?=E7=A4=BA=E5=85=A5=E5=8F=A3=E4=B8=8E=20app=5Fmetadata=20?= =?UTF-8?q?=E9=A9=B1=E5=8A=A8=E9=AA=8C=E8=AF=81=E9=93=BE=E8=B7=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 app_metadata 模型、仓储与服务 - 将默认 edge 验证步骤改为由 app_metadata 驱动生成 - 新增 chat_session / chat_message 会话层模型与 chat service - 新增 demo chat API,支持会话创建、消息发送、任务确认 - 新增最小 Web Demo 页面,形成聊天式演示入口 - 增强任务报告,补充 audit_summary 与更细粒度 task_metrics - 增强 edge-agent 执行器:tcp_probe、日志时间范围过滤、进程指标与更灵活健康检查 - 更新 README 与当前进度总结,MVP 进度推进到约 94% --- .gitignore | 1 + backend/README.md | 56 ++- backend/app/api/agent/tasks.py | 91 +++- backend/app/api/demo/chat.py | 155 +++++++ backend/app/api/web/demo.py | 16 + backend/app/main.py | 14 + backend/app/models/app_metadata.py | 22 + backend/app/models/chat_message.py | 18 + backend/app/models/chat_session.py | 19 + backend/app/repositories/chat_repository.py | 43 ++ .../app/repositories/metadata_repository.py | 31 ++ backend/app/schemas/chat.py | 59 +++ backend/app/schemas/metadata.py | 14 + backend/app/schemas/task.py | 16 + backend/app/services/chat_service.py | 168 ++++++++ backend/app/services/edge_service.py | 192 ++++++--- backend/app/services/metadata_service.py | 76 ++++ backend/app/web/chat_demo.html | 406 ++++++++++++++++++ backend/tests/test_chat_demo.py | 50 +++ backend/tests/test_task_api.py | 208 ++++++--- edge-agent/README.md | 26 +- edge-agent/app/executors/http_executor.py | 17 +- edge-agent/app/executors/log_executor.py | 48 ++- edge-agent/app/executors/process_executor.py | 52 ++- .../app/executors/tcp_probe_executor.py | 41 ++ edge-agent/app/registry/tool_registry.py | 2 + edge-agent/scripts/package-linux.ps1 | 30 ++ edge-agent/tests/test_http_executor.py | 20 +- edge-agent/tests/test_log_executor.py | 29 ++ 
edge-agent/tests/test_process_executor.py | 38 +- edge-agent/tests/test_tcp_probe_executor.py | 38 ++ 智能化部署agent-当前进度总结.md | 98 ++++- 32 files changed, 1914 insertions(+), 180 deletions(-) create mode 100644 backend/app/api/demo/chat.py create mode 100644 backend/app/api/web/demo.py create mode 100644 backend/app/models/app_metadata.py create mode 100644 backend/app/models/chat_message.py create mode 100644 backend/app/models/chat_session.py create mode 100644 backend/app/repositories/chat_repository.py create mode 100644 backend/app/repositories/metadata_repository.py create mode 100644 backend/app/schemas/chat.py create mode 100644 backend/app/schemas/metadata.py create mode 100644 backend/app/services/chat_service.py create mode 100644 backend/app/services/metadata_service.py create mode 100644 backend/app/web/chat_demo.html create mode 100644 backend/tests/test_chat_demo.py create mode 100644 edge-agent/app/executors/tcp_probe_executor.py create mode 100644 edge-agent/scripts/package-linux.ps1 create mode 100644 edge-agent/tests/test_tcp_probe_executor.py diff --git a/.gitignore b/.gitignore index e9e06e8..a04d282 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,7 @@ .venv/ data/ dist/ +tmp-linux-runtime/ __pycache__/ .pytest_cache/ *.pyc diff --git a/backend/README.md b/backend/README.md index 406dab3..4e011d4 100644 --- a/backend/README.md +++ b/backend/README.md @@ -53,21 +53,29 @@ Current backend includes: `POST /api/agent/tasks/{task_id}/cancel` `GET /api/agent/tasks/{task_id}` `GET /api/agent/tasks/{task_id}/report` -2. demo identity +2. demo chat + `POST /api/demo/chat/sessions` + `GET /api/demo/chat/sessions/{session_id}` + `POST /api/demo/chat/sessions/{session_id}/messages` + `POST /api/demo/chat/sessions/{session_id}/tasks/{task_id}/confirm` +3. demo web + `GET /` + `GET /demo/chat` +4. 
demo identity `POST /api/demo/identity/login` `GET /api/demo/identity/me` `GET /api/demo/identity/users/{user_id}/permissions` `POST /api/demo/identity/token/introspect` -3. demo approval +5. demo approval `POST /api/demo/approval/requests` `GET /api/demo/approval/requests/{approval_id}` `POST /api/demo/approval/requests/{approval_id}/decision` `GET /api/demo/approval/requests` -4. software-a minimal implementation +6. software-a minimal implementation `POST /api/demo/software-a/deploy-tasks` `GET /api/demo/software-a/deploy-tasks/{software_a_task_id}` `POST /api/demo/software-a/permissions/check` -5. edge +7. edge `POST /api/agent/edge/heartbeat` `POST /api/agent/edge/tasks/pull` `POST /api/agent/edge/tasks/report` @@ -75,15 +83,22 @@ Current backend includes: Current execution flow: -1. create task -2. confirm task -3. high-risk task enters approval flow -4. check `software-a` minimal implementation permission -5. create `software-a` minimal implementation deploy task -6. create default edge verification step -7. edge pulls and reports verification result -8. task reaches `SUCCEEDED` / `FAILED` / `CANCELLED` -9. task detail/report returns software-a status, approval trace, tool trace, verification trace and audit trace +1. create chat session or open web demo +2. send one natural-language message +3. create task +4. confirm task +5. high-risk task enters approval flow +6. check `software-a` minimal implementation permission +7. create `software-a` minimal implementation deploy task +8. build metadata-driven multi-step edge verification plan: + `check_process` + `check_port` + `tcp_probe` + `http_health_check` + `grep_log` +9. edge pulls and reports verification results +10. task reaches `SUCCEEDED` / `FAILED` / `CANCELLED` +11. 
task detail/report returns software-a status, approval trace, tool trace, verification trace and audit trace Current execution metrics: @@ -95,13 +110,19 @@ Current execution metrics: `confirm_wait_duration_ms` `approval_duration_ms` `execution_duration_ms` + `software_a_duration_ms_total` `tool_call_duration_ms_total` `verification_duration_ms_total` + `verification_queue_wait_duration_ms_total` + `verification_end_to_end_duration_ms_total` +5. `task_report.audit_summary` returns audit result counts, action types and operator summary Current result summary capabilities: 1. task detail/report returns `result_summary_detail` 2. summary includes final status, final reason, software-a result, approval result and verification result +3. demo chat API returns assistant-style parse/confirm messages +4. demo web page provides a visual conversation -> confirm -> execute -> report flow Demo failure semantics currently include: @@ -122,12 +143,12 @@ Automated tests currently cover: 7. cancel running task -Current baseline: `20 passed` +Current baseline: `23 passed` ## Next Focus Recommended next implementation steps: -1. continue enriching audit details and task-level metric breakdown -2. continue implementing local edge-agent executors beyond `http_health_check` -3. add packaging/bootstrap scripts for portable edge-agent delivery -4. then continue with second-batch OpenAPI +1. continue enriching app-metadata-driven verification templates +2. connect a real Java sample app to the current demo flow +3. validate native Linux packaging in a real bash/Linux environment +4. 
then continue with second-batch OpenAPI and UI polish diff --git a/backend/app/api/agent/tasks.py b/backend/app/api/agent/tasks.py index 5f808d8..58c5dfd 100644 --- a/backend/app/api/agent/tasks.py +++ b/backend/app/api/agent/tasks.py @@ -18,6 +18,7 @@ from app.repositories.tool_call_repository import ToolCallRepository from app.adapters.software_a.minimal_adapter import MinimalSoftwareAAdapter from app.schemas.common import ApiResponse from app.schemas.task import ( + AuditSummary, ApprovalSummary, ApprovalTraceItem, AuditTraceItem, @@ -53,8 +54,12 @@ def build_result_summary_detail(task, approval, software_a_detail: dict | None, final_reason = task.summary if software_a_detail and software_a_detail.get("error_detail"): final_reason = software_a_detail["error_detail"] - elif latest_edge_task and latest_edge_task.message: - final_reason = latest_edge_task.message + elif edge_tasks: + failed_message = next((item.message for item in edge_tasks if item.step_status == "FAILED" and item.message), None) + if failed_message: + final_reason = failed_message + elif latest_edge_task and latest_edge_task.message: + final_reason = latest_edge_task.message elif approval and approval.approval_status == "REJECTED" and approval.reason: final_reason = approval.reason @@ -79,12 +84,23 @@ def build_result_summary_detail(task, approval, software_a_detail: dict | None, verification_summary = None if latest_edge_task: + verification_success_values = [bool(item.success) for item in edge_tasks if item.success is not None] + verification_success = None if not verification_success_values else all(verification_success_values) + if any(item.step_status == "FAILED" for item in edge_tasks): + verification_status = "FAILED" + elif all(item.step_status == "SUCCEEDED" for item in edge_tasks): + verification_status = "SUCCEEDED" + elif any(item.step_status == "RUNNING" for item in edge_tasks): + verification_status = "RUNNING" + else: + verification_status = latest_edge_task.step_status + 
verification_message = next((item.message for item in edge_tasks if item.step_status == "FAILED" and item.message), latest_edge_task.message) verification_summary = VerificationResultSummary( - step_id=latest_edge_task.step_id, - step_status=latest_edge_task.step_status, - success=None if latest_edge_task.success is None else bool(latest_edge_task.success), - duration_ms=latest_edge_task.duration_ms, - message=latest_edge_task.message, + step_id=latest_edge_task.step_id if len(edge_tasks) == 1 else None, + step_status=verification_status, + success=verification_success, + duration_ms=sum_duration_ms([item.duration_ms for item in edge_tasks]), + message=verification_message, ) return ResultSummaryDetail( @@ -110,6 +126,9 @@ def sum_duration_ms(values: list[int | None]) -> int: def build_task_metrics(task, approval, software_a_detail: dict | None, tool_calls, edge_tasks, audit_logs) -> TaskMetrics: tool_call_duration_ms_total = sum_duration_ms([item.duration_ms for item in tool_calls]) verification_duration_ms_total = sum_duration_ms([item.duration_ms for item in edge_tasks]) + software_a_duration_ms_total = sum_duration_ms([item.duration_ms for item in tool_calls if item.tool_name.startswith("software_a")]) + verification_queue_wait_duration_ms_total = sum_duration_ms([compute_duration_ms(item.created_at, item.started_at) for item in edge_tasks]) + verification_end_to_end_duration_ms_total = sum_duration_ms([compute_duration_ms(item.created_at, item.finished_at) for item in edge_tasks]) tool_call_count = len(tool_calls) tool_call_success_count = sum(1 for item in tool_calls if bool(item.success)) @@ -118,6 +137,7 @@ def build_task_metrics(task, approval, software_a_detail: dict | None, tool_call verification_step_count = len(edge_tasks) verification_success_count = sum(1 for item in edge_tasks if item.success == 1) verification_failed_count = sum(1 for item in edge_tasks if item.success == 0) + audit_failure_count = sum(1 for item in audit_logs if item.result in 
{"FAILED", "REJECTED"}) latest_observed_at = pick_latest_timestamp( task.updated_at, @@ -156,8 +176,11 @@ def build_task_metrics(task, approval, software_a_detail: dict | None, tool_call confirm_wait_duration_ms=confirm_wait_duration_ms, approval_duration_ms=approval_duration_ms, execution_duration_ms=execution_duration_ms, + software_a_duration_ms_total=software_a_duration_ms_total, tool_call_duration_ms_total=tool_call_duration_ms_total, verification_duration_ms_total=verification_duration_ms_total, + verification_queue_wait_duration_ms_total=verification_queue_wait_duration_ms_total, + verification_end_to_end_duration_ms_total=verification_end_to_end_duration_ms_total, tool_call_count=tool_call_count, tool_call_success_count=tool_call_success_count, tool_call_failed_count=tool_call_failed_count, @@ -165,9 +188,51 @@ def build_task_metrics(task, approval, software_a_detail: dict | None, tool_call verification_success_count=verification_success_count, verification_failed_count=verification_failed_count, audit_event_count=len(audit_logs), + audit_failure_count=audit_failure_count, ) +def build_audit_summary(audit_logs) -> AuditSummary: + result_counts: dict[str, int] = {} + action_types = sorted({item.action for item in audit_logs}) + operator_user_names = sorted({item.operator_user_name for item in audit_logs if item.operator_user_name}) + + for item in audit_logs: + result_counts[item.result] = result_counts.get(item.result, 0) + 1 + + return AuditSummary( + audit_event_count=len(audit_logs), + failure_count=sum(1 for item in audit_logs if item.result in {"FAILED", "REJECTED"}), + pending_count=sum(1 for item in audit_logs if item.result == "PENDING"), + cancelled_count=sum(1 for item in audit_logs if item.result == "CANCELLED"), + reported_count=sum(1 for item in audit_logs if item.result == "REPORTED"), + action_types=action_types, + operator_user_names=operator_user_names, + result_counts=result_counts, + ) + + +def build_verification_result(edge_tasks) -> 
dict | None: + if not edge_tasks: + return None + + def latest_success(tool_name: str) -> bool | None: + for item in edge_tasks: + if item.tool_name == tool_name and item.success is not None: + return bool(item.success) + return None + + grep_success = latest_success("grep_log") + port_related = [latest_success(name) for name in ("check_port", "tcp_probe")] + port_values = [value for value in port_related if value is not None] + return { + "http_ok": latest_success("http_health_check"), + "process_ok": latest_success("check_process"), + "port_ok": all(port_values) if port_values else None, + "log_error_count": 0 if grep_success is True else (1 if grep_success is False else None), + } + + @router.post("", response_model=ApiResponse[CreateTaskData]) def create_task( payload: CreateTaskRequest, @@ -320,16 +385,7 @@ def get_task( software_a_detail = None if task.software_a_task_id: software_a_detail = MinimalSoftwareAAdapter(settings.default_timezone).get_deploy_task(task.software_a_task_id) - verification_result = None - if edge_tasks: - latest_edge_task = edge_tasks[0] - if latest_edge_task.success is not None: - verification_result = { - "http_ok": bool(latest_edge_task.success), - "process_ok": None, - "port_ok": None, - "log_error_count": 0 if latest_edge_task.success else 1, - } + verification_result = build_verification_result(edge_tasks) return ApiResponse[TaskDetailData]( request_id=request_id, @@ -469,6 +525,7 @@ def get_task_report( tool_trace=tool_trace, verification_trace=verification_trace, task_metrics=build_task_metrics(task, approval, software_a_detail, tool_calls, edge_tasks, audit_logs), + audit_summary=build_audit_summary(audit_logs), result_summary=task.summary, result_summary_detail=build_result_summary_detail(task, approval, software_a_detail, edge_tasks), audit_trace=audit_trace, diff --git a/backend/app/api/demo/chat.py b/backend/app/api/demo/chat.py new file mode 100644 index 0000000..e6abec8 --- /dev/null +++ b/backend/app/api/demo/chat.py @@ 
-0,0 +1,155 @@ +from __future__ import annotations + +from typing import Annotated +from uuid import uuid4 + +from fastapi import APIRouter, Depends, Header, HTTPException, status +from sqlalchemy.orm import Session + +from app.core.config import get_settings +from app.core.constants import ERROR_CODE_OK +from app.core.time import format_now +from app.db.session import get_db +from app.schemas.chat import ( + ChatConfirmTaskData, + ChatConfirmTaskRequest, + ChatSendMessageData, + ChatSendMessageRequest, + ChatSessionCreateRequest, + ChatSessionData, +) +from app.schemas.common import ApiResponse +from app.services.chat_service import ChatService, ChatSessionNotFoundError + +router = APIRouter(prefix="/api/demo/chat", tags=["demo-chat"]) + + +def build_request_id(header_value: str | None) -> str: + return header_value or f"req-{uuid4().hex[:12]}" + + +@router.post("/sessions", response_model=ApiResponse[ChatSessionData]) +def create_session( + payload: ChatSessionCreateRequest, + db: Annotated[Session, Depends(get_db)], + x_request_id: Annotated[str | None, Header(alias="X-Request-Id")] = None, +) -> ApiResponse[ChatSessionData]: + settings = get_settings() + request_id = build_request_id(x_request_id) + service = ChatService(db, settings.default_timezone) + session = service.create_session(payload.tenant_id, payload.channel) + messages = [service.to_message_item(item) for item in service.list_messages(session.session_id)] + return ApiResponse[ChatSessionData]( + request_id=request_id, + success=True, + code=ERROR_CODE_OK, + message="success", + data=ChatSessionData( + session_id=session.session_id, + tenant_id=session.tenant_id, + channel=session.channel, + title=session.title, + last_task_id=session.last_task_id, + sample_prompts=ChatService.SAMPLE_PROMPTS, + messages=messages, + ), + timestamp=format_now(settings.default_timezone), + ) + + +@router.get("/sessions/{session_id}", response_model=ApiResponse[ChatSessionData]) +def get_session( + session_id: str, + 
db: Annotated[Session, Depends(get_db)], + x_request_id: Annotated[str | None, Header(alias="X-Request-Id")] = None, +) -> ApiResponse[ChatSessionData]: + settings = get_settings() + request_id = build_request_id(x_request_id) + service = ChatService(db, settings.default_timezone) + try: + session = service.get_session(session_id) + except ChatSessionNotFoundError as exc: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail={"code": "NOT_FOUND", "message": "chat session not found"}) from exc + messages = [service.to_message_item(item) for item in service.list_messages(session.session_id)] + return ApiResponse[ChatSessionData]( + request_id=request_id, + success=True, + code=ERROR_CODE_OK, + message="success", + data=ChatSessionData( + session_id=session.session_id, + tenant_id=session.tenant_id, + channel=session.channel, + title=session.title, + last_task_id=session.last_task_id, + sample_prompts=ChatService.SAMPLE_PROMPTS, + messages=messages, + ), + timestamp=format_now(settings.default_timezone), + ) + + +@router.post("/sessions/{session_id}/messages", response_model=ApiResponse[ChatSendMessageData]) +def send_message( + session_id: str, + payload: ChatSendMessageRequest, + db: Annotated[Session, Depends(get_db)], + x_request_id: Annotated[str | None, Header(alias="X-Request-Id")] = None, +) -> ApiResponse[ChatSendMessageData]: + settings = get_settings() + request_id = build_request_id(x_request_id) + service = ChatService(db, settings.default_timezone) + try: + _, assistant_message, task_data = service.handle_user_message(session_id, payload.content, payload.context) + except ChatSessionNotFoundError as exc: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail={"code": "NOT_FOUND", "message": "chat session not found"}) from exc + + return ApiResponse[ChatSendMessageData]( + request_id=request_id, + success=True, + code=ERROR_CODE_OK, + message="success", + data=ChatSendMessageData( + session_id=session_id, + 
task_id=task_data["task_id"], + task_status=task_data["task_status"], + parsed_intent=task_data["parsed_intent"], + missing_slots=task_data["missing_slots"], + risk_level=task_data["risk_level"], + next_action=task_data["next_action"], + assistant_message=service.to_message_item(assistant_message), + ), + timestamp=format_now(settings.default_timezone), + ) + + +@router.post("/sessions/{session_id}/tasks/{task_id}/confirm", response_model=ApiResponse[ChatConfirmTaskData]) +def confirm_task_from_chat( + session_id: str, + task_id: str, + payload: ChatConfirmTaskRequest, + db: Annotated[Session, Depends(get_db)], + x_request_id: Annotated[str | None, Header(alias="X-Request-Id")] = None, +) -> ApiResponse[ChatConfirmTaskData]: + settings = get_settings() + request_id = build_request_id(x_request_id) + service = ChatService(db, settings.default_timezone) + try: + _, assistant_message, task_data = service.confirm_task(session_id, task_id, payload.comment) + except ChatSessionNotFoundError as exc: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail={"code": "NOT_FOUND", "message": "chat session not found"}) from exc + + return ApiResponse[ChatConfirmTaskData]( + request_id=request_id, + success=True, + code=ERROR_CODE_OK, + message="success", + data=ChatConfirmTaskData( + session_id=session_id, + task_id=task_data["task_id"], + task_status=task_data["task_status"], + approval_status=task_data["approval_status"], + assistant_message=service.to_message_item(assistant_message), + ), + timestamp=format_now(settings.default_timezone), + ) diff --git a/backend/app/api/web/demo.py b/backend/app/api/web/demo.py new file mode 100644 index 0000000..9e3c5ab --- /dev/null +++ b/backend/app/api/web/demo.py @@ -0,0 +1,16 @@ +from __future__ import annotations + +from pathlib import Path + +from fastapi import APIRouter +from fastapi.responses import HTMLResponse + + +router = APIRouter(tags=["demo-web"]) + + +@router.get("/", response_class=HTMLResponse) 
+@router.get("/demo/chat", response_class=HTMLResponse) +def demo_chat_page() -> HTMLResponse: + html_path = Path(__file__).resolve().parents[2] / "web" / "chat_demo.html" + return HTMLResponse(html_path.read_text(encoding="utf-8")) diff --git a/backend/app/main.py b/backend/app/main.py index 2ebbac5..b797a66 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -7,19 +7,26 @@ from fastapi.responses import JSONResponse from app.api.agent.tasks import router as task_router from app.api.demo.approval import router as demo_approval_router +from app.api.demo.chat import router as demo_chat_router from app.api.demo.identity import router as demo_identity_router from app.api.demo.software_a import router as demo_software_a_router from app.api.edge.tasks import router as edge_router +from app.api.web.demo import router as demo_web_router from app.core.config import ensure_runtime_directories, get_settings from app.core.time import format_now from app.db.base import Base from app.db.session import engine +from app.db.session import SessionLocal +from app.models.app_metadata import AppMetadata from app.models.approval import ApprovalRequest from app.models.audit_log import AuditLog +from app.models.chat_message import ChatMessage +from app.models.chat_session import ChatSession from app.models.edge_node import EdgeNode from app.models.edge_task import EdgeTask from app.models.task import Task from app.models.tool_call import ToolCall +from app.services.metadata_service import MetadataService settings = get_settings() @@ -27,6 +34,11 @@ settings = get_settings() async def lifespan(_: FastAPI): ensure_runtime_directories() Base.metadata.create_all(bind=engine) + db = SessionLocal() + try: + MetadataService(db, settings.default_timezone).ensure_demo_metadata() + finally: + db.close() yield @@ -59,7 +71,9 @@ def healthz() -> dict[str, str]: app.include_router(task_router) +app.include_router(demo_chat_router) app.include_router(demo_identity_router) 
app.include_router(demo_approval_router) app.include_router(demo_software_a_router) app.include_router(edge_router) +app.include_router(demo_web_router) diff --git a/backend/app/models/app_metadata.py b/backend/app/models/app_metadata.py new file mode 100644 index 0000000..b4017e5 --- /dev/null +++ b/backend/app/models/app_metadata.py @@ -0,0 +1,22 @@ +from __future__ import annotations + +from sqlalchemy import Integer, Text +from sqlalchemy.orm import Mapped, mapped_column + +from app.db.base import Base + + +class AppMetadata(Base): + __tablename__ = "app_metadata" + + app_metadata_id: Mapped[str] = mapped_column(Text, primary_key=True) + app_code: Mapped[str] = mapped_column(Text, nullable=False, index=True) + env: Mapped[str] = mapped_column(Text, nullable=False, index=True) + process_name: Mapped[str | None] = mapped_column(Text, nullable=True) + command_contains: Mapped[str | None] = mapped_column(Text, nullable=True) + health_check_url: Mapped[str | None] = mapped_column(Text, nullable=True) + log_path: Mapped[str | None] = mapped_column(Text, nullable=True) + listen_port: Mapped[int | None] = mapped_column(Integer, nullable=True) + startup_keyword: Mapped[str | None] = mapped_column(Text, nullable=True) + created_at: Mapped[str] = mapped_column(Text, nullable=False) + updated_at: Mapped[str] = mapped_column(Text, nullable=False) diff --git a/backend/app/models/chat_message.py b/backend/app/models/chat_message.py new file mode 100644 index 0000000..ac1cc4d --- /dev/null +++ b/backend/app/models/chat_message.py @@ -0,0 +1,18 @@ +from __future__ import annotations + +from sqlalchemy import Text +from sqlalchemy.orm import Mapped, mapped_column + +from app.db.base import Base + + +class ChatMessage(Base): + __tablename__ = "chat_message" + + message_id: Mapped[str] = mapped_column(Text, primary_key=True) + session_id: Mapped[str] = mapped_column(Text, nullable=False, index=True) + role: Mapped[str] = mapped_column(Text, nullable=False) + content: Mapped[str] = 
mapped_column(Text, nullable=False) + message_type: Mapped[str] = mapped_column(Text, nullable=False) + task_id: Mapped[str | None] = mapped_column(Text, nullable=True, index=True) + created_at: Mapped[str] = mapped_column(Text, nullable=False) diff --git a/backend/app/models/chat_session.py b/backend/app/models/chat_session.py new file mode 100644 index 0000000..71000d4 --- /dev/null +++ b/backend/app/models/chat_session.py @@ -0,0 +1,19 @@ +from __future__ import annotations + +from sqlalchemy import Text +from sqlalchemy.orm import Mapped, mapped_column + +from app.db.base import Base + + +class ChatSession(Base): + __tablename__ = "chat_session" + + session_id: Mapped[str] = mapped_column(Text, primary_key=True) + tenant_id: Mapped[str] = mapped_column(Text, nullable=False, index=True) + channel: Mapped[str] = mapped_column(Text, nullable=False) + title: Mapped[str | None] = mapped_column(Text, nullable=True) + last_task_id: Mapped[str | None] = mapped_column(Text, nullable=True) + context_json: Mapped[str] = mapped_column(Text, nullable=False) + created_at: Mapped[str] = mapped_column(Text, nullable=False) + updated_at: Mapped[str] = mapped_column(Text, nullable=False) diff --git a/backend/app/repositories/chat_repository.py b/backend/app/repositories/chat_repository.py new file mode 100644 index 0000000..83d21c3 --- /dev/null +++ b/backend/app/repositories/chat_repository.py @@ -0,0 +1,43 @@ +from __future__ import annotations + +from sqlalchemy import select +from sqlalchemy.orm import Session + +from app.models.chat_message import ChatMessage +from app.models.chat_session import ChatSession + + +class ChatSessionRepository: + def __init__(self, db: Session) -> None: + self.db = db + + def add(self, item: ChatSession) -> ChatSession: + self.db.add(item) + self.db.commit() + self.db.refresh(item) + return item + + def update(self, item: ChatSession) -> ChatSession: + self.db.add(item) + self.db.commit() + self.db.refresh(item) + return item + + def 
get_by_session_id(self, session_id: str) -> ChatSession | None: + statement = select(ChatSession).where(ChatSession.session_id == session_id) + return self.db.execute(statement).scalar_one_or_none() + + +class ChatMessageRepository: + def __init__(self, db: Session) -> None: + self.db = db + + def add(self, item: ChatMessage) -> ChatMessage: + self.db.add(item) + self.db.commit() + self.db.refresh(item) + return item + + def list_by_session_id(self, session_id: str) -> list[ChatMessage]: + statement = select(ChatMessage).where(ChatMessage.session_id == session_id).order_by(ChatMessage.created_at.asc()) + return list(self.db.execute(statement).scalars()) diff --git a/backend/app/repositories/metadata_repository.py b/backend/app/repositories/metadata_repository.py new file mode 100644 index 0000000..ec93843 --- /dev/null +++ b/backend/app/repositories/metadata_repository.py @@ -0,0 +1,31 @@ +from __future__ import annotations + +from sqlalchemy import select +from sqlalchemy.orm import Session + +from app.models.app_metadata import AppMetadata + + +class AppMetadataRepository: + def __init__(self, db: Session) -> None: + self.db = db + + def add(self, item: AppMetadata) -> AppMetadata: + self.db.add(item) + self.db.commit() + self.db.refresh(item) + return item + + def update(self, item: AppMetadata) -> AppMetadata: + self.db.add(item) + self.db.commit() + self.db.refresh(item) + return item + + def get_by_app_env(self, app_code: str, env: str) -> AppMetadata | None: + statement = select(AppMetadata).where(AppMetadata.app_code == app_code).where(AppMetadata.env == env) + return self.db.execute(statement).scalar_one_or_none() + + def list_all(self) -> list[AppMetadata]: + statement = select(AppMetadata).order_by(AppMetadata.app_code.asc(), AppMetadata.env.asc()) + return list(self.db.execute(statement).scalars()) diff --git a/backend/app/schemas/chat.py b/backend/app/schemas/chat.py new file mode 100644 index 0000000..f2d57bd --- /dev/null +++ 
b/backend/app/schemas/chat.py @@ -0,0 +1,59 @@ +from __future__ import annotations + +from typing import Any + +from pydantic import BaseModel, Field + +from app.schemas.task import ParsedIntent + + +class ChatSessionCreateRequest(BaseModel): + tenant_id: str = "tenant-demo" + channel: str = "WEB" + + +class ChatMessageItem(BaseModel): + message_id: str + role: str + content: str + message_type: str + task_id: str | None = None + created_at: str + + +class ChatSessionData(BaseModel): + session_id: str + tenant_id: str + channel: str + title: str | None = None + last_task_id: str | None = None + sample_prompts: list[str] = Field(default_factory=list) + messages: list[ChatMessageItem] = Field(default_factory=list) + + +class ChatSendMessageRequest(BaseModel): + content: str + context: dict[str, Any] = Field(default_factory=dict) + + +class ChatSendMessageData(BaseModel): + session_id: str + task_id: str + task_status: str + parsed_intent: ParsedIntent + missing_slots: list[str] + risk_level: str + next_action: str + assistant_message: ChatMessageItem + + +class ChatConfirmTaskRequest(BaseModel): + comment: str | None = None + + +class ChatConfirmTaskData(BaseModel): + session_id: str + task_id: str + task_status: str + approval_status: str + assistant_message: ChatMessageItem diff --git a/backend/app/schemas/metadata.py b/backend/app/schemas/metadata.py new file mode 100644 index 0000000..1d707a2 --- /dev/null +++ b/backend/app/schemas/metadata.py @@ -0,0 +1,14 @@ +from __future__ import annotations + +from pydantic import BaseModel + + +class AppMetadataData(BaseModel): + app_code: str + env: str + process_name: str | None = None + command_contains: str | None = None + health_check_url: str | None = None + log_path: str | None = None + listen_port: int | None = None + startup_keyword: str | None = None diff --git a/backend/app/schemas/task.py b/backend/app/schemas/task.py index e4e533a..93d9dd5 100644 --- a/backend/app/schemas/task.py +++ 
b/backend/app/schemas/task.py @@ -165,13 +165,27 @@ class AuditTraceItem(BaseModel): timestamp: str +class AuditSummary(BaseModel): + audit_event_count: int = 0 + failure_count: int = 0 + pending_count: int = 0 + cancelled_count: int = 0 + reported_count: int = 0 + action_types: list[str] = Field(default_factory=list) + operator_user_names: list[str] = Field(default_factory=list) + result_counts: dict[str, int] = Field(default_factory=dict) + + class TaskMetrics(BaseModel): total_duration_ms: int | None = None confirm_wait_duration_ms: int | None = None approval_duration_ms: int | None = None execution_duration_ms: int | None = None + software_a_duration_ms_total: int = 0 tool_call_duration_ms_total: int = 0 verification_duration_ms_total: int = 0 + verification_queue_wait_duration_ms_total: int = 0 + verification_end_to_end_duration_ms_total: int = 0 tool_call_count: int = 0 tool_call_success_count: int = 0 tool_call_failed_count: int = 0 @@ -179,6 +193,7 @@ class TaskMetrics(BaseModel): verification_success_count: int = 0 verification_failed_count: int = 0 audit_event_count: int = 0 + audit_failure_count: int = 0 class TaskReportData(BaseModel): @@ -188,6 +203,7 @@ class TaskReportData(BaseModel): tool_trace: list[ToolTraceItem] verification_trace: list[VerificationTraceItem] task_metrics: TaskMetrics + audit_summary: AuditSummary result_summary: str | None = None result_summary_detail: ResultSummaryDetail | None = None audit_trace: list[AuditTraceItem] diff --git a/backend/app/services/chat_service.py b/backend/app/services/chat_service.py new file mode 100644 index 0000000..8312678 --- /dev/null +++ b/backend/app/services/chat_service.py @@ -0,0 +1,168 @@ +from __future__ import annotations + +import json +from uuid import uuid4 + +from sqlalchemy.orm import Session + +from app.core.time import format_now +from app.models.chat_message import ChatMessage +from app.models.chat_session import ChatSession +from app.repositories.chat_repository import 
ChatMessageRepository, ChatSessionRepository +from app.schemas.chat import ChatMessageItem +from app.schemas.task import ConfirmTaskRequest, CreateTaskRequest, ParsedIntent +from app.services.task_service import TaskService + + +class ChatSessionNotFoundError(Exception): + pass + + +class ChatService: + SAMPLE_PROMPTS = [ + "deploy order-service 1.2.3 to test", + "deploy payment-service 1.2.3 to test", + "deploy order-service 1.2.3 to prod", + ] + + def __init__(self, db: Session, timezone_name: str) -> None: + self.db = db + self.timezone_name = timezone_name + self.session_repository = ChatSessionRepository(db) + self.message_repository = ChatMessageRepository(db) + self.task_service = TaskService(db, timezone_name) + + def create_session(self, tenant_id: str, channel: str) -> ChatSession: + current_time = format_now(self.timezone_name) + session = ChatSession( + session_id=f"chat-{uuid4().hex[:12]}", + tenant_id=tenant_id, + channel=channel, + title="Agent Demo Session", + last_task_id=None, + context_json=json.dumps({}, ensure_ascii=False), + created_at=current_time, + updated_at=current_time, + ) + created_session = self.session_repository.add(session) + self._add_message( + session_id=created_session.session_id, + role="assistant", + content="请输入一句自然语言,例如:deploy order-service 1.2.3 to test", + message_type="welcome", + task_id=None, + ) + return created_session + + def get_session(self, session_id: str) -> ChatSession: + session = self.session_repository.get_by_session_id(session_id) + if not session: + raise ChatSessionNotFoundError() + return session + + def list_messages(self, session_id: str) -> list[ChatMessage]: + self.get_session(session_id) + return self.message_repository.list_by_session_id(session_id) + + def handle_user_message(self, session_id: str, content: str, context: dict | None = None) -> tuple[ChatSession, ChatMessage, dict]: + session = self.get_session(session_id) + self._add_message(session_id=session_id, role="user", content=content, 
message_type="user_input", task_id=None) + request_id = f"chat-req-{uuid4().hex[:12]}" + task = self.task_service.create_task( + CreateTaskRequest( + input_text=content, + channel=session.channel, + session_id=session.session_id, + tenant_id=session.tenant_id, + context=context or {}, + ), + request_id=request_id, + ) + session.last_task_id = task.task_id + session.updated_at = format_now(self.timezone_name) + self.session_repository.update(session) + + parsed_intent = json.loads(task.parsed_intent_json) + missing_slots = json.loads(task.missing_slots_json) + next_action = "CONFIRM_TASK" if not missing_slots else "FILL_MISSING_SLOTS" + assistant_text = self._build_parse_reply(parsed_intent, missing_slots, task.risk_level, next_action) + assistant_message = self._add_message( + session_id=session_id, + role="assistant", + content=assistant_text, + message_type="task_parse", + task_id=task.task_id, + ) + return session, assistant_message, { + "task_id": task.task_id, + "task_status": task.task_status, + "parsed_intent": ParsedIntent(**parsed_intent), + "missing_slots": missing_slots, + "risk_level": task.risk_level, + "next_action": next_action, + } + + def confirm_task(self, session_id: str, task_id: str, comment: str | None) -> tuple[ChatSession, ChatMessage, dict]: + session = self.get_session(session_id) + task, approval_id = self.task_service.confirm_task( + task_id, + ConfirmTaskRequest(confirmed=True, comment=comment), + request_id=f"chat-confirm-{uuid4().hex[:12]}", + ) + session.last_task_id = task.task_id + session.updated_at = format_now(self.timezone_name) + self.session_repository.update(session) + + assistant_text = self._build_confirm_reply(task.task_status, task.approval_status, task.software_a_task_status, approval_id) + assistant_message = self._add_message( + session_id=session_id, + role="assistant", + content=assistant_text, + message_type="task_confirm", + task_id=task.task_id, + ) + return session, assistant_message, { + "task_id": 
task.task_id, + "task_status": task.task_status, + "approval_status": task.approval_status, + } + + def _build_parse_reply(self, parsed_intent: dict, missing_slots: list[str], risk_level: str, next_action: str) -> str: + if missing_slots: + return f"我已解析任务,但还缺少字段:{', '.join(missing_slots)}。请补充后再继续。" + return ( + "我已解析任务:" + f"动作={parsed_intent.get('action_type')}," + f"应用={parsed_intent.get('app_code')}," + f"环境={parsed_intent.get('env')}," + f"版本={parsed_intent.get('version')}。" + f" 风险等级={risk_level},下一步={next_action}。" + ) + + def _build_confirm_reply(self, task_status: str, approval_status: str, software_a_task_status: str | None, approval_id: str | None) -> str: + if approval_status == "PENDING" and approval_id: + return f"任务已确认,当前进入审批阶段。approval_id={approval_id}" + return f"任务已确认并进入执行。task_status={task_status},software_a_task_status={software_a_task_status}" + + def _add_message(self, session_id: str, role: str, content: str, message_type: str, task_id: str | None) -> ChatMessage: + message = ChatMessage( + message_id=f"msg-{uuid4().hex[:12]}", + session_id=session_id, + role=role, + content=content, + message_type=message_type, + task_id=task_id, + created_at=format_now(self.timezone_name), + ) + return self.message_repository.add(message) + + @staticmethod + def to_message_item(message: ChatMessage) -> ChatMessageItem: + return ChatMessageItem( + message_id=message.message_id, + role=message.role, + content=message.content, + message_type=message.message_type, + task_id=message.task_id, + created_at=message.created_at, + ) diff --git a/backend/app/services/edge_service.py b/backend/app/services/edge_service.py index f4cddbd..bba8987 100644 --- a/backend/app/services/edge_service.py +++ b/backend/app/services/edge_service.py @@ -1,4 +1,4 @@ -from __future__ import annotations +from __future__ import annotations import json from uuid import uuid4 @@ -20,14 +20,15 @@ from app.core.constants import ( TASK_STATUS_VERIFYING, ) from app.core.time import 
compute_duration_ms, format_now +from app.models.audit_log import AuditLog from app.models.edge_node import EdgeNode from app.models.edge_task import EdgeTask -from app.models.audit_log import AuditLog from app.models.tool_call import ToolCall from app.repositories.audit_repository import AuditRepository from app.repositories.edge_repository import EdgeNodeRepository, EdgeTaskRepository from app.repositories.task_repository import TaskRepository from app.repositories.tool_call_repository import ToolCallRepository +from app.services.metadata_service import MetadataService class EdgeTaskConflictError(Exception): @@ -74,54 +75,50 @@ class EdgeService: ) return self.node_repository.add_or_update(node) - def schedule_default_verification(self, task_id: str, edge_id: str = "edge-shanghai-001") -> EdgeTask: + def schedule_default_verification(self, task_id: str, edge_id: str = "edge-shanghai-001") -> list[EdgeTask]: task = self.task_repository.get_by_task_id(task_id) if not task: raise EdgeTaskNotFoundError() if task.task_status != TASK_STATUS_RUNNING: - raise EdgeTaskConflictError("当前任务状态不允许创建 edge 验证步骤。") + raise EdgeTaskConflictError("current task status does not allow scheduling edge verification steps") if self.edge_task_repository.list_active_by_task_id(task_id): - raise EdgeTaskConflictError("当前任务已存在待处理的 edge 验证步骤。") + raise EdgeTaskConflictError("task already has active edge verification steps") current_time = format_now(self.timezone_name) - step_id = f"step-{uuid4().hex[:12]}" - edge_task = EdgeTask( - edge_task_id=f"edge-task-{uuid4().hex[:12]}", - step_id=step_id, - task_id=task_id, - edge_id=edge_id, - tool_name="http_health_check", - params_json=json.dumps( - { - "url": f"http://{task.app_code or 'localhost'}.{task.env or 'env'}.demo/actuator/health", - "timeout_ms": 3000, - }, - ensure_ascii=False, - ), - step_status=EDGE_STEP_STATUS_PENDING, - success=None, - message=None, - result_data_json="{}", - evidence_json="{}", - duration_ms=None, - 
expire_at=current_time, - started_at=None, - finished_at=None, - created_at=current_time, - updated_at=current_time, - ) - created_edge_task = self.edge_task_repository.add(edge_task) - self._write_audit_log( - task_id=task_id, - request_id=None, - action="EDGE_TASK_SCHEDULED", - result="PENDING", - target=edge_id, - operator_user_id=None, - operator_user_name=None, - detail={"step_id": created_edge_task.step_id, "tool_name": created_edge_task.tool_name}, - ) - return created_edge_task + created_items: list[EdgeTask] = [] + for tool_name, params in self._build_default_verification_steps(task): + edge_task = EdgeTask( + edge_task_id=f"edge-task-{uuid4().hex[:12]}", + step_id=f"step-{uuid4().hex[:12]}", + task_id=task_id, + edge_id=edge_id, + tool_name=tool_name, + params_json=json.dumps(params, ensure_ascii=False), + step_status=EDGE_STEP_STATUS_PENDING, + success=None, + message=None, + result_data_json="{}", + evidence_json="{}", + duration_ms=None, + expire_at=current_time, + started_at=None, + finished_at=None, + created_at=current_time, + updated_at=current_time, + ) + created_item = self.edge_task_repository.add(edge_task) + created_items.append(created_item) + self._write_audit_log( + task_id=task_id, + request_id=None, + action="EDGE_TASK_SCHEDULED", + result="PENDING", + target=edge_id, + operator_user_id=None, + operator_user_name=None, + detail={"step_id": created_item.step_id, "tool_name": created_item.tool_name}, + ) + return created_items def pull_tasks(self, edge_id: str, max_tasks: int) -> list[EdgeTask]: items = self.edge_task_repository.list_pending_by_edge_id(edge_id)[:max_tasks] @@ -129,7 +126,7 @@ class EdgeService: pulled_items: list[EdgeTask] = [] for item in items: task = self.task_repository.get_by_task_id(item.task_id) - if not task or task.task_status != TASK_STATUS_RUNNING: + if not task or task.task_status not in {TASK_STATUS_RUNNING, TASK_STATUS_VERIFYING}: item.step_status = EDGE_STEP_STATUS_CANCELLED item.updated_at = current_time 
item.message = "task state no longer allows edge execution" @@ -143,25 +140,35 @@ class EdgeService: task.task_status = TASK_STATUS_VERIFYING task.updated_at = current_time - task.summary = "任务已进入边缘验证阶段。" + task.summary = "task entered edge verification stage" self.task_repository.update(task) pulled_items.append(item) return pulled_items - def report_task(self, edge_id: str, step_id: str, success: bool, message: str, data: dict, evidence: dict, started_at: str, finished_at: str) -> tuple[EdgeTask, str]: + def report_task( + self, + edge_id: str, + step_id: str, + success: bool, + message: str, + data: dict, + evidence: dict, + started_at: str, + finished_at: str, + ) -> tuple[EdgeTask, str]: edge_task = self.edge_task_repository.get_by_step_id(step_id) if not edge_task: raise EdgeTaskNotFoundError() if edge_task.edge_id != edge_id: - raise EdgeTaskConflictError("edge_id 与任务归属不一致。") + raise EdgeTaskConflictError("edge_id does not match the assigned edge task") if edge_task.step_status not in {EDGE_STEP_STATUS_RUNNING, EDGE_STEP_STATUS_PENDING}: - raise EdgeTaskConflictError("当前步骤状态不允许重复回传。") + raise EdgeTaskConflictError("edge step state does not allow duplicate report") task = self.task_repository.get_by_task_id(edge_task.task_id) if not task: - raise EdgeTaskConflictError("edge 步骤关联任务不存在。") + raise EdgeTaskConflictError("edge task references a non-existent task") if task.task_status not in {TASK_STATUS_RUNNING, TASK_STATUS_VERIFYING}: - raise EdgeTaskConflictError("当前任务状态不允许回传 edge 结果。") + raise EdgeTaskConflictError("task state does not allow edge report") edge_task.step_status = EDGE_STEP_STATUS_SUCCEEDED if success else EDGE_STEP_STATUS_FAILED edge_task.success = 1 if success else 0 @@ -187,12 +194,25 @@ class EdgeService: finished_at=finished_at, ) - task_status = TASK_STATUS_RUNNING - task.task_status = TASK_STATUS_SUCCEEDED if success else TASK_STATUS_FAILED + all_steps = self.edge_task_repository.list_by_task_id(task.task_id) + if not success: + for item 
in all_steps: + if item.step_status in {EDGE_STEP_STATUS_PENDING, EDGE_STEP_STATUS_RUNNING} and item.step_id != updated_edge_task.step_id: + item.step_status = EDGE_STEP_STATUS_CANCELLED + item.updated_at = format_now(self.timezone_name) + item.message = "cancelled because another verification step failed" + self.edge_task_repository.update(item) + task.task_status = TASK_STATUS_FAILED + task.summary = "edge verification failed" + elif all(item.step_status == EDGE_STEP_STATUS_SUCCEEDED for item in all_steps): + task.task_status = TASK_STATUS_SUCCEEDED + task.summary = "all edge verification steps succeeded" + else: + task.task_status = TASK_STATUS_VERIFYING + task.summary = "edge verification is still in progress" + task.updated_at = format_now(self.timezone_name) - task.summary = "边缘验证通过,任务完成。" if success else "边缘验证失败,任务失败。" self.task_repository.update(task) - task_status = task.task_status self._write_audit_log( task_id=task.task_id, request_id=None, @@ -203,8 +223,7 @@ class EdgeService: operator_user_name=edge_id, detail={"step_id": step_id, "tool_name": edge_task.tool_name, "message": message}, ) - - return updated_edge_task, task_status + return updated_edge_task, task.task_status def record_event(self, edge_id: str, event_type: str, message: str, detail: dict) -> AuditLog: current_time = format_now(self.timezone_name) @@ -224,6 +243,65 @@ class EdgeService: self.db.refresh(audit) return audit + def _build_default_verification_steps(self, task) -> list[tuple[str, dict]]: + app_code = task.app_code or "demo-app" + confirmed_at = task.confirmed_at or format_now(self.timezone_name) + metadata = MetadataService(self.db, self.timezone_name).get_app_metadata(task.app_code, task.env) + host = "127.0.0.1" + port = metadata.listen_port if metadata and metadata.listen_port else 8080 + command_contains = metadata.command_contains if metadata and metadata.command_contains else app_code + health_check_url = ( + metadata.health_check_url + if metadata and 
metadata.health_check_url + else f"http://{app_code}.{task.env or 'env'}.demo/actuator/health" + ) + log_path = metadata.log_path if metadata and metadata.log_path else f"logs/{app_code}.log" + startup_keyword = metadata.startup_keyword if metadata and metadata.startup_keyword else "Started" + process_name = metadata.process_name if metadata and metadata.process_name else "java" + return [ + ( + "check_process", + { + "process_name": process_name, + "command_contains": command_contains, + }, + ), + ( + "check_port", + { + "host": host, + "port": port, + "timeout_ms": 3000, + }, + ), + ( + "tcp_probe", + { + "host": host, + "port": port, + "timeout_ms": 3000, + }, + ), + ( + "http_health_check", + { + "url": health_check_url, + "timeout_ms": 3000, + "expected_status": 200, + "body_contains": "UP", + }, + ), + ( + "grep_log", + { + "path": log_path, + "keyword": startup_keyword, + "start_at": confirmed_at, + "limit": 20, + }, + ), + ] + def _write_tool_call( self, task_id: str, @@ -278,4 +356,4 @@ class EdgeService: detail_json=json.dumps(detail, ensure_ascii=False), timestamp=format_now(self.timezone_name), ) - return self.audit_repository.add(audit_log) + return self.audit_repository.add(audit_log) diff --git a/backend/app/services/metadata_service.py b/backend/app/services/metadata_service.py new file mode 100644 index 0000000..03324c2 --- /dev/null +++ b/backend/app/services/metadata_service.py @@ -0,0 +1,76 @@ +from __future__ import annotations + +from uuid import uuid4 + +from sqlalchemy.orm import Session + +from app.core.time import format_now +from app.models.app_metadata import AppMetadata +from app.repositories.metadata_repository import AppMetadataRepository + + +class MetadataService: + DEMO_ITEMS = [ + { + "app_code": "order-service", + "env": "test", + "process_name": "java", + "command_contains": "order-service", + "health_check_url": "http://order-service.test.demo/actuator/health", + "log_path": "logs/order-service.log", + "listen_port": 8080, + 
"startup_keyword": "Started order-service", + }, + { + "app_code": "order-service", + "env": "prod", + "process_name": "java", + "command_contains": "order-service", + "health_check_url": "http://order-service.prod.demo/actuator/health", + "log_path": "logs/order-service.log", + "listen_port": 8080, + "startup_keyword": "Started order-service", + }, + { + "app_code": "payment-service", + "env": "test", + "process_name": "java", + "command_contains": "payment-service", + "health_check_url": "http://payment-service.test.demo/actuator/health", + "log_path": "logs/payment-service.log", + "listen_port": 8081, + "startup_keyword": "Started payment-service", + }, + ] + + def __init__(self, db: Session, timezone_name: str) -> None: + self.db = db + self.timezone_name = timezone_name + self.repository = AppMetadataRepository(db) + + def ensure_demo_metadata(self) -> None: + for item in self.DEMO_ITEMS: + existing = self.repository.get_by_app_env(item["app_code"], item["env"]) + if existing: + continue + current_time = format_now(self.timezone_name) + self.repository.add( + AppMetadata( + app_metadata_id=f"app-meta-{uuid4().hex[:12]}", + app_code=item["app_code"], + env=item["env"], + process_name=item["process_name"], + command_contains=item["command_contains"], + health_check_url=item["health_check_url"], + log_path=item["log_path"], + listen_port=item["listen_port"], + startup_keyword=item["startup_keyword"], + created_at=current_time, + updated_at=current_time, + ) + ) + + def get_app_metadata(self, app_code: str | None, env: str | None) -> AppMetadata | None: + if not app_code or not env: + return None + return self.repository.get_by_app_env(app_code, env) diff --git a/backend/app/web/chat_demo.html b/backend/app/web/chat_demo.html new file mode 100644 index 0000000..a7d8e49 --- /dev/null +++ b/backend/app/web/chat_demo.html @@ -0,0 +1,406 @@ + + + + + + 智能化部署 Agent Demo + + + +
+ + +
+
+
+
Conversation
+
输入一句自然语言,页面会展示任务解析、确认、执行、验证、报告。
+
+
+
+ + +
+
+
+ + +
+ + + + diff --git a/backend/tests/test_chat_demo.py b/backend/tests/test_chat_demo.py new file mode 100644 index 0000000..cbade40 --- /dev/null +++ b/backend/tests/test_chat_demo.py @@ -0,0 +1,50 @@ +import os + +from fastapi.testclient import TestClient + +os.environ["DATABASE_URL"] = "sqlite:///:memory:" + +from app.main import app + + +def test_chat_session_and_message_flow() -> None: + with TestClient(app) as client: + create_session_response = client.post("/api/demo/chat/sessions", json={"tenant_id": "tenant-demo", "channel": "WEB"}) + assert create_session_response.status_code == 200 + session_payload = create_session_response.json()["data"] + session_id = session_payload["session_id"] + assert len(session_payload["messages"]) >= 1 + assert len(session_payload["sample_prompts"]) >= 1 + + send_response = client.post( + f"/api/demo/chat/sessions/{session_id}/messages", + json={"content": "deploy order-service 1.2.3 to test", "context": {}}, + ) + assert send_response.status_code == 200 + send_payload = send_response.json()["data"] + task_id = send_payload["task_id"] + assert send_payload["parsed_intent"]["app_code"] == "order-service" + assert send_payload["next_action"] == "CONFIRM_TASK" + + confirm_response = client.post( + f"/api/demo/chat/sessions/{session_id}/tasks/{task_id}/confirm", + json={"comment": "from ui"}, + ) + assert confirm_response.status_code == 200 + confirm_payload = confirm_response.json()["data"] + assert confirm_payload["task_id"] == task_id + assert confirm_payload["assistant_message"]["role"] == "assistant" + + session_detail = client.get(f"/api/demo/chat/sessions/{session_id}") + assert session_detail.status_code == 200 + session_detail_payload = session_detail.json()["data"] + assert session_detail_payload["last_task_id"] == task_id + assert len(session_detail_payload["messages"]) >= 4 + + +def test_demo_chat_page_exists() -> None: + with TestClient(app) as client: + response = client.get("/demo/chat") + assert response.status_code 
== 200 + assert "智能化部署 Agent Demo" in response.text + assert "Conversation" in response.text diff --git a/backend/tests/test_task_api.py b/backend/tests/test_task_api.py index e114df4..b570c72 100644 --- a/backend/tests/test_task_api.py +++ b/backend/tests/test_task_api.py @@ -7,6 +7,58 @@ os.environ["DATABASE_URL"] = "sqlite:///:memory:" from app.main import app +def report_all_edge_steps_success(client: TestClient, task_id: str) -> list[dict]: + pull_response = client.post( + "/api/agent/edge/tasks/pull", + json={"edge_id": "edge-shanghai-001", "max_tasks": 200}, + ) + assert pull_response.status_code == 200 + matched_tasks = [item for item in pull_response.json()["data"]["tasks"] if item["task_id"] == task_id] + assert len(matched_tasks) >= 5 + + for item in matched_tasks: + if item["tool_name"] == "http_health_check": + data = {"status_code": 200, "latency_ms": 45} + evidence = {"response_body": "{\"status\":\"UP\"}"} + message = "200 OK" + elif item["tool_name"] in {"check_port", "tcp_probe"}: + data = {"connected": True, "latency_ms": 12} + evidence = {} + message = "connected" + elif item["tool_name"] == "check_process": + data = {"matched_count": 1, "cpu_percent_total": 1.5, "memory_rss_kb_total": 20480} + evidence = {"matches": [{"pid": 1234, "process_name": "java", "command": "java -jar order-service.jar"}]} + message = "process found" + elif item["tool_name"] == "grep_log": + data = {"matched_count": 1} + evidence = {"matches": [{"line_number": 10, "content": "Started order-service", "timestamp": "2026-04-08 20:20:00.000"}]} + message = "keyword matched" + else: + data = {} + evidence = {} + message = "OK" + + report_response = client.post( + "/api/agent/edge/tasks/report", + json={ + "edge_id": "edge-shanghai-001", + "task_id": task_id, + "step_id": item["step_id"], + "tool_name": item["tool_name"], + "success": True, + "code": "OK", + "message": message, + "data": data, + "evidence": evidence, + "started_at": "2026-04-08 20:20:00.000", + "finished_at": 
"2026-04-08 20:20:00.100", + }, + ) + assert report_response.status_code == 200 + + return matched_tasks + + def test_task_create_confirm_get() -> None: with TestClient(app) as client: create_response = client.post( @@ -158,35 +210,10 @@ def test_edge_heartbeat_pull_and_report_flow() -> None: ) assert confirm_response.status_code == 200 - pull_response = client.post( - "/api/agent/edge/tasks/pull", - json={"edge_id": "edge-shanghai-001", "max_tasks": 5}, - ) - assert pull_response.status_code == 200 - tasks = pull_response.json()["data"]["tasks"] - matched_tasks = [item for item in tasks if item["task_id"] == task_id] - assert len(matched_tasks) == 1 - step_id = matched_tasks[0]["step_id"] - assert matched_tasks[0]["tool_name"] == "http_health_check" - - report_response = client.post( - "/api/agent/edge/tasks/report", - json={ - "edge_id": "edge-shanghai-001", - "task_id": task_id, - "step_id": step_id, - "tool_name": "http_health_check", - "success": True, - "code": "OK", - "message": "200 OK", - "data": {"status_code": 200, "latency_ms": 45}, - "evidence": {"response_body": "{\"status\":\"UP\"}"}, - "started_at": "2026-04-08 20:20:00.000", - "finished_at": "2026-04-08 20:20:00.100", - }, - ) - assert report_response.status_code == 200 - assert report_response.json()["data"]["task_status"] == "SUCCEEDED" + matched_tasks = report_all_edge_steps_success(client, task_id) + assert any(item["tool_name"] == "http_health_check" for item in matched_tasks) + assert any(item["tool_name"] == "check_port" for item in matched_tasks) + assert any(item["tool_name"] == "check_process" for item in matched_tasks) get_response = client.get(f"/api/agent/tasks/{task_id}") assert get_response.status_code == 200 @@ -194,6 +221,9 @@ def test_edge_heartbeat_pull_and_report_flow() -> None: assert get_response.json()["data"]["software_a_task_id"] is not None assert get_response.json()["data"]["software_a_task_status"] == "SUCCEEDED" assert 
get_response.json()["data"]["verification_result"]["http_ok"] is True + assert get_response.json()["data"]["verification_result"]["process_ok"] is True + assert get_response.json()["data"]["verification_result"]["port_ok"] is True + assert get_response.json()["data"]["verification_result"]["log_error_count"] == 0 def test_edge_event_report_endpoint() -> None: @@ -243,35 +273,15 @@ def test_task_report_contains_traces() -> None: headers={"X-Request-Id": "req-report-confirm-001"}, json={"confirmed": True, "comment": "confirm"}, ) - pull_response = client.post( - "/api/agent/edge/tasks/pull", - json={"edge_id": "edge-shanghai-001", "max_tasks": 5}, - ) - step = [item for item in pull_response.json()["data"]["tasks"] if item["task_id"] == task_id][0] - client.post( - "/api/agent/edge/tasks/report", - json={ - "edge_id": "edge-shanghai-001", - "task_id": task_id, - "step_id": step["step_id"], - "tool_name": step["tool_name"], - "success": True, - "code": "OK", - "message": "200 OK", - "data": {"status_code": 200}, - "evidence": {"response_body": "{\"status\":\"UP\"}"}, - "started_at": "2026-04-08 20:20:00.000", - "finished_at": "2026-04-08 20:20:00.100", - }, - ) + matched_tasks = report_all_edge_steps_success(client, task_id) report_response = client.get(f"/api/agent/tasks/{task_id}/report") assert report_response.status_code == 200 payload = report_response.json()["data"] assert payload["task_basic"]["task_id"] == task_id - assert len(payload["tool_trace"]) >= 2 - assert len(payload["verification_trace"]) >= 1 - assert len(payload["audit_trace"]) >= 3 + assert len(payload["tool_trace"]) >= 6 + assert len(payload["verification_trace"]) >= 5 + assert len(payload["audit_trace"]) >= 7 assert payload["approval_trace"] == [] assert any(item["request_id"] == "req-report-confirm-001" for item in payload["tool_trace"]) assert any(item["operator_user_name"] == "alice" for item in payload["tool_trace"]) @@ -279,23 +289,33 @@ def test_task_report_contains_traces() -> None: assert 
payload["result_summary_detail"]["final_status"] == "SUCCEEDED" assert payload["result_summary_detail"]["software_a"]["task_status"] == "SUCCEEDED" assert payload["result_summary_detail"]["verification"]["success"] is True + assert payload["result_summary_detail"]["verification"]["step_status"] == "SUCCEEDED" deploy_trace = next(item for item in payload["tool_trace"] if item["tool_name"] == "software_a_deploy") assert deploy_trace["duration_ms"] is not None verification_trace = payload["verification_trace"][0] assert verification_trace["duration_ms"] == 100 task_metrics = payload["task_metrics"] - assert task_metrics["tool_call_count"] >= 2 - assert task_metrics["tool_call_success_count"] >= 2 + assert task_metrics["tool_call_count"] >= 6 + assert task_metrics["tool_call_success_count"] >= 6 assert task_metrics["tool_call_failed_count"] == 0 - assert task_metrics["verification_step_count"] == 1 - assert task_metrics["verification_success_count"] == 1 + assert task_metrics["software_a_duration_ms_total"] is not None + assert task_metrics["verification_step_count"] == len(matched_tasks) + assert task_metrics["verification_success_count"] == len(matched_tasks) assert task_metrics["verification_failed_count"] == 0 - assert task_metrics["verification_duration_ms_total"] == 100 + assert task_metrics["verification_duration_ms_total"] == len(matched_tasks) * 100 + assert task_metrics["verification_queue_wait_duration_ms_total"] is not None + assert task_metrics["verification_end_to_end_duration_ms_total"] is not None assert task_metrics["tool_call_duration_ms_total"] is not None assert task_metrics["confirm_wait_duration_ms"] is not None assert task_metrics["execution_duration_ms"] is not None assert task_metrics["total_duration_ms"] is not None assert task_metrics["audit_event_count"] >= 3 + assert task_metrics["audit_failure_count"] == 0 + audit_summary = payload["audit_summary"] + assert audit_summary["audit_event_count"] >= 3 + assert "CREATE_TASK" in 
audit_summary["action_types"] + assert "alice" in audit_summary["operator_user_names"] + assert audit_summary["result_counts"]["OK"] >= 1 def test_task_report_contains_metrics_for_approved_flow() -> None: @@ -333,11 +353,58 @@ def test_task_report_contains_metrics_for_approved_flow() -> None: task_metrics = payload["task_metrics"] assert task_metrics["approval_duration_ms"] is not None assert task_metrics["tool_call_count"] >= 1 - assert task_metrics["verification_step_count"] >= 1 + assert task_metrics["verification_step_count"] >= 5 assert task_metrics["audit_event_count"] >= 3 + assert payload["audit_summary"]["result_counts"]["APPROVED"] >= 1 assert payload["approval_trace"][0]["approval_status"] == "APPROVED" +def test_edge_pull_uses_app_metadata_driven_params() -> None: + with TestClient(app) as client: + client.post( + "/api/agent/edge/heartbeat", + json={ + "edge_id": "edge-shanghai-001", + "hostname": "customer-host-01", + "os_type": "WINDOWS", + "agent_version": "0.1.0", + "capabilities": ["http_health_check", "check_port", "check_process", "grep_log", "tcp_probe"], + }, + ) + + create_response = client.post( + "/api/agent/tasks", + json={ + "input_text": "deploy payment-service 1.2.3 to test", + "channel": "WEB", + "session_id": "sess-meta-001", + "tenant_id": "tenant-demo", + "context": {}, + }, + ) + task_id = create_response.json()["data"]["task_id"] + + client.post( + f"/api/agent/tasks/{task_id}/confirm", + json={"confirmed": True, "comment": "confirm"}, + ) + + pull_response = client.post( + "/api/agent/edge/tasks/pull", + json={"edge_id": "edge-shanghai-001", "max_tasks": 200}, + ) + matched_tasks = [item for item in pull_response.json()["data"]["tasks"] if item["task_id"] == task_id] + assert len(matched_tasks) == 5 + + by_tool_name = {item["tool_name"]: item for item in matched_tasks} + assert by_tool_name["check_process"]["params"]["command_contains"] == "payment-service" + assert by_tool_name["check_port"]["params"]["port"] == 8081 + assert 
by_tool_name["tcp_probe"]["params"]["port"] == 8081 + assert by_tool_name["http_health_check"]["params"]["url"] == "http://payment-service.test.demo/actuator/health" + assert by_tool_name["grep_log"]["params"]["path"] == "logs/payment-service.log" + assert by_tool_name["grep_log"]["params"]["keyword"] == "Started payment-service" + + def test_cancel_running_task() -> None: with TestClient(app) as client: create_response = client.post( @@ -564,7 +631,7 @@ def test_duplicate_edge_report_returns_conflict() -> None: pull_response = client.post( "/api/agent/edge/tasks/pull", - json={"edge_id": "edge-shanghai-001", "max_tasks": 5}, + json={"edge_id": "edge-shanghai-001", "max_tasks": 200}, ) step = [item for item in pull_response.json()["data"]["tasks"] if item["task_id"] == task_id][0] @@ -636,7 +703,7 @@ def test_edge_report_with_wrong_edge_id_returns_conflict() -> None: pull_response = client.post( "/api/agent/edge/tasks/pull", - json={"edge_id": "edge-shanghai-001", "max_tasks": 5}, + json={"edge_id": "edge-shanghai-001", "max_tasks": 200}, ) step = [item for item in pull_response.json()["data"]["tasks"] if item["task_id"] == task_id][0] @@ -724,7 +791,7 @@ def test_task_fails_when_software_a_deploy_fails() -> None: pull_response = client.post( "/api/agent/edge/tasks/pull", - json={"edge_id": "edge-shanghai-001", "max_tasks": 10}, + json={"edge_id": "edge-shanghai-001", "max_tasks": 200}, ) assert pull_response.status_code == 200 assert all(item["task_id"] != task_id for item in pull_response.json()["data"]["tasks"]) @@ -790,6 +857,12 @@ def test_high_risk_task_can_be_rejected() -> None: assert get_task_response.json()["data"]["task_status"] == "CANCELLED" assert get_task_response.json()["data"]["approval_status"] == "REJECTED" + report_response = client.get(f"/api/agent/tasks/{task_id}/report") + assert report_response.status_code == 200 + report_payload = report_response.json()["data"] + assert report_payload["task_metrics"]["audit_failure_count"] >= 1 + assert 
report_payload["audit_summary"]["result_counts"]["REJECTED"] >= 1 + def test_edge_failure_marks_task_failed() -> None: with TestClient(app) as client: @@ -820,9 +893,9 @@ def test_edge_failure_marks_task_failed() -> None: ) pull_response = client.post( "/api/agent/edge/tasks/pull", - json={"edge_id": "edge-shanghai-001", "max_tasks": 5}, + json={"edge_id": "edge-shanghai-001", "max_tasks": 200}, ) - step = [item for item in pull_response.json()["data"]["tasks"] if item["task_id"] == task_id][0] + step = [item for item in pull_response.json()["data"]["tasks"] if item["task_id"] == task_id and item["tool_name"] == "http_health_check"][0] report_response = client.post( "/api/agent/edge/tasks/report", @@ -849,3 +922,10 @@ def test_edge_failure_marks_task_failed() -> None: assert get_response.json()["data"]["verification_result"]["http_ok"] is False assert get_response.json()["data"]["result_summary_detail"]["verification"]["success"] is False assert get_response.json()["data"]["result_summary_detail"]["final_reason"] == "health check failed" + + remaining_pull_response = client.post( + "/api/agent/edge/tasks/pull", + json={"edge_id": "edge-shanghai-001", "max_tasks": 200}, + ) + assert remaining_pull_response.status_code == 200 + assert all(item["task_id"] != task_id for item in remaining_pull_response.json()["data"]["tasks"]) diff --git a/edge-agent/README.md b/edge-agent/README.md index c0c0323..09efd82 100644 --- a/edge-agent/README.md +++ b/edge-agent/README.md @@ -34,6 +34,7 @@ C:\Users\MH\AppData\Local\Programs\Python\Python311\python.exe -m pytest edge-ag 2. default edge id: `edge-shanghai-001` 3. current registered tools: `http_health_check` + `tcp_probe` `check_port` `check_process` `grep_log` @@ -54,12 +55,14 @@ Current repo includes: 2. `scripts/start-linux.sh` 3. `scripts/package-windows.ps1` 4. `scripts/package-linux.sh` +5. `scripts/package-linux.ps1` These scripts currently prepare a portable package skeleton and startup entrypoints. 
Current Windows package script already bundles a private Python runtime into: `runtime/python/` Current Linux package script supports bundling a private Python runtime directory passed in by argument or `EDGE_PYTHON_HOME`. +Current repo also provides `package-linux.ps1` as a Windows-hosted equivalent for validating Linux artifact structure when bash/WSL is unavailable. ## Packaging Direction @@ -70,7 +73,7 @@ For user-side delivery, this edge agent is intended to be bundled as: ## Current Verification Baseline -Current edge-agent baseline: `10 passed` +Current edge-agent baseline: `20 passed` ## Verified Packaging @@ -80,3 +83,24 @@ Current verified artifact: `start.ps1` `app/main.py` `runtime/python/python.exe` +2. Linux portable package tar.gz has been generated and verified to include: + `start.sh` + `app/main.py` + `runtime/python/bin/python3` + +## Native Linux Verification Steps + +When a real Linux/bash environment is available, validate native packaging with: + +```bash +export EDGE_PYTHON_HOME=/path/to/private/python/runtime +chmod +x edge-agent/scripts/package-linux.sh +./edge-agent/scripts/package-linux.sh +tar -tf edge-agent/dist/edge-agent-linux-*.tar.gz | grep -E 'start.sh|app/main.py|runtime/python/bin/python3' +``` + +Recommended follow-up checks: + +1. verify `runtime/python/bin/python3` can start +2. verify `start.sh --once` can run against backend +3. 
verify file permissions are preserved after extraction diff --git a/edge-agent/app/executors/http_executor.py b/edge-agent/app/executors/http_executor.py index ed73e9e..2150e58 100644 --- a/edge-agent/app/executors/http_executor.py +++ b/edge-agent/app/executors/http_executor.py @@ -1,5 +1,6 @@ from __future__ import annotations +import json import time from typing import Any @@ -9,18 +10,30 @@ import httpx class HttpHealthCheckExecutor: def execute(self, params: dict[str, Any]) -> tuple[bool, str, dict[str, Any], dict[str, Any]]: url = params["url"] + method = str(params.get("method", "GET")).upper() timeout_ms = int(params.get("timeout_ms", 3000)) + expected_status = params.get("expected_status", 200) + body_contains = params.get("body_contains") + headers = params.get("headers", {}) started_at = time.perf_counter() with httpx.Client(timeout=timeout_ms / 1000.0) as client: - response = client.get(url) + response = client.request(method, url, headers=headers) latency_ms = max(int((time.perf_counter() - started_at) * 1000), 0) - success = response.status_code == 200 + success = response.status_code == int(expected_status) + if success and body_contains is not None: + success = str(body_contains) in response.text message = f"{response.status_code} {response.reason_phrase}" data: dict[str, Any] = { "status_code": response.status_code, "latency_ms": latency_ms, + "method": method, + "expected_status": int(expected_status), } evidence = { "response_body": response.text, } + try: + evidence["response_json"] = json.loads(response.text) + except Exception: + pass return success, message, data, evidence diff --git a/edge-agent/app/executors/log_executor.py b/edge-agent/app/executors/log_executor.py index 65ec3f3..bb47f70 100644 --- a/edge-agent/app/executors/log_executor.py +++ b/edge-agent/app/executors/log_executor.py @@ -1,6 +1,8 @@ from __future__ import annotations +from datetime import datetime from pathlib import Path +import re from typing import Any @@ -11,6 
+13,12 @@ class GrepLogExecutor: limit = int(params.get("limit", 100)) encoding = str(params.get("encoding", "utf-8")) case_sensitive = bool(params.get("case_sensitive", False)) + start_at = self._parse_time(params.get("start_at")) + end_at = self._parse_time(params.get("end_at")) + timestamp_regex = params.get( + "timestamp_regex", + r"^\s*(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}(?:\.\d{3,6})?)", + ) if not path.exists(): return False, f"log file not found: {path}", {}, {} @@ -21,9 +29,18 @@ class GrepLogExecutor: with path.open("r", encoding=encoding, errors="ignore") as handle: for line_number, line in enumerate(handle, start=1): text = line.rstrip("\n") + line_timestamp = self._extract_line_time(text, timestamp_regex) + if not self._match_time_range(line_timestamp, start_at, end_at): + continue text_cmp = text if case_sensitive else text.lower() if keyword_cmp in text_cmp: - matches.append({"line_number": line_number, "content": text}) + matches.append( + { + "line_number": line_number, + "content": text, + "timestamp": None if line_timestamp is None else line_timestamp.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3], + } + ) if len(matches) >= limit: break @@ -36,8 +53,37 @@ class GrepLogExecutor: "path": str(path), "keyword": keyword, "matched_count": len(matches), + "start_at": params.get("start_at"), + "end_at": params.get("end_at"), }, { "matches": matches, }, ) + + def _extract_line_time(self, text: str, timestamp_regex: str) -> datetime | None: + matched = re.search(timestamp_regex, text) + if not matched: + return None + return self._parse_time(matched.group(1)) + + def _parse_time(self, value: str | None) -> datetime | None: + if not value: + return None + for fmt in ("%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%d %H:%M:%S"): + try: + return datetime.strptime(str(value), fmt) + except ValueError: + continue + return None + + def _match_time_range(self, line_timestamp: datetime | None, start_at: datetime | None, end_at: datetime | None) -> bool: + if start_at is None and end_at 
is None: + return True + if line_timestamp is None: + return False + if start_at is not None and line_timestamp < start_at: + return False + if end_at is not None and line_timestamp > end_at: + return False + return True diff --git a/edge-agent/app/executors/process_executor.py b/edge-agent/app/executors/process_executor.py index ed68039..4fd38a5 100644 --- a/edge-agent/app/executors/process_executor.py +++ b/edge-agent/app/executors/process_executor.py @@ -1,7 +1,8 @@ -from __future__ import annotations +from __future__ import annotations import csv import platform +import re import subprocess from io import StringIO from typing import Any @@ -11,12 +12,17 @@ class ProcessCheckExecutor: def execute(self, params: dict[str, Any]) -> tuple[bool, str, dict[str, Any], dict[str, Any]]: process_name = params.get("process_name") pid = params.get("pid") + command_contains = params.get("command_contains") - if not process_name and pid is None: - return False, "process_name or pid is required", {}, {} + if not process_name and pid is None and not command_contains: + return False, "process_name, pid or command_contains is required", {}, {} processes = self._list_processes() - matched = [item for item in processes if self._match_process(item, process_name=process_name, pid=pid)] + matched = [ + item + for item in processes + if self._match_process(item, process_name=process_name, pid=pid, command_contains=command_contains) + ] success = len(matched) > 0 message = "process found" if success else "process not found" @@ -27,6 +33,9 @@ class ProcessCheckExecutor: "matched_count": len(matched), "process_name": process_name, "pid": pid, + "command_contains": command_contains, + "cpu_percent_total": round(sum(float(item.get("cpu_percent") or 0.0) for item in matched), 2), + "memory_rss_kb_total": sum(int(item.get("memory_rss_kb") or 0) for item in matched), }, { "matches": matched, @@ -51,44 +60,69 @@ class ProcessCheckExecutor: for row in reader: if len(row) < 2: continue + 
memory_rss_kb = self._parse_windows_memory_kb(row[4]) if len(row) > 4 else None rows.append( { "pid": int(row[1]), "process_name": row[0], "command": row[0], + "cpu_percent": None, + "memory_rss_kb": memory_rss_kb, } ) return rows def _list_unix_processes(self) -> list[dict[str, Any]]: result = subprocess.run( - ["ps", "-eo", "pid=,comm=,args="], + ["ps", "-eo", "pid=,comm=,pcpu=,pmem=,rss=,args="], capture_output=True, text=True, check=True, ) rows: list[dict[str, Any]] = [] for line in result.stdout.splitlines(): - parts = line.strip().split(None, 2) - if len(parts) < 2: + parts = line.strip().split(None, 5) + if len(parts) < 5: continue pid_text = parts[0] process_name = parts[1] - command = parts[2] if len(parts) > 2 else process_name + cpu_percent = float(parts[2]) + memory_percent = float(parts[3]) + memory_rss_kb = int(parts[4]) + command = parts[5] if len(parts) > 5 else process_name rows.append( { "pid": int(pid_text), "process_name": process_name, "command": command, + "cpu_percent": cpu_percent, + "memory_percent": memory_percent, + "memory_rss_kb": memory_rss_kb, } ) return rows - def _match_process(self, item: dict[str, Any], process_name: str | None, pid: int | str | None) -> bool: + def _match_process( + self, + item: dict[str, Any], + process_name: str | None, + pid: int | str | None, + command_contains: str | None, + ) -> bool: if pid is not None and item["pid"] != int(pid): return False if process_name: name = str(process_name).lower() if name not in item["process_name"].lower() and name not in item["command"].lower(): return False + if command_contains: + keyword = str(command_contains).lower() + if keyword not in item["command"].lower(): + return False return True + + def _parse_windows_memory_kb(self, value: str) -> int | None: + digits = re.sub(r"[^\d]", "", value or "") + if not digits: + return None + return int(digits) diff --git a/edge-agent/app/executors/tcp_probe_executor.py b/edge-agent/app/executors/tcp_probe_executor.py new file mode 
100644 index 0000000..fd181b2 --- /dev/null +++ b/edge-agent/app/executors/tcp_probe_executor.py @@ -0,0 +1,41 @@ +from __future__ import annotations + +import socket +import time +from typing import Any + + +class TcpProbeExecutor: + def execute(self, params: dict[str, Any]) -> tuple[bool, str, dict[str, Any], dict[str, Any]]: + host = str(params.get("host", "127.0.0.1")) + port = int(params["port"]) + timeout_ms = int(params.get("timeout_ms", 3000)) + + started_at = time.perf_counter() + try: + with socket.create_connection((host, port), timeout=timeout_ms / 1000.0): + latency_ms = max(int((time.perf_counter() - started_at) * 1000), 0) + return ( + True, + "tcp probe succeeded", + { + "host": host, + "port": port, + "connected": True, + "latency_ms": latency_ms, + }, + {}, + ) + except OSError as exc: + latency_ms = max(int((time.perf_counter() - started_at) * 1000), 0) + return ( + False, + str(exc), + { + "host": host, + "port": port, + "connected": False, + "latency_ms": latency_ms, + }, + {}, + ) diff --git a/edge-agent/app/registry/tool_registry.py b/edge-agent/app/registry/tool_registry.py index 390fbb9..6a43524 100644 --- a/edge-agent/app/registry/tool_registry.py +++ b/edge-agent/app/registry/tool_registry.py @@ -4,6 +4,7 @@ from app.executors.log_executor import GrepLogExecutor from app.executors.http_executor import HttpHealthCheckExecutor from app.executors.port_executor import PortCheckExecutor from app.executors.process_executor import ProcessCheckExecutor +from app.executors.tcp_probe_executor import TcpProbeExecutor from app.executors.linux_service_executor import LinuxServiceExecutor from app.executors.windows_service_executor import WindowsServiceExecutor @@ -12,6 +13,7 @@ class ToolRegistry: def __init__(self) -> None: self._executors = { "http_health_check": HttpHealthCheckExecutor(), + "tcp_probe": TcpProbeExecutor(), "check_port": PortCheckExecutor(), "check_process": ProcessCheckExecutor(), "grep_log": GrepLogExecutor(), diff --git 
a/edge-agent/scripts/package-linux.ps1 b/edge-agent/scripts/package-linux.ps1 new file mode 100644 index 0000000..b50725d --- /dev/null +++ b/edge-agent/scripts/package-linux.ps1 @@ -0,0 +1,30 @@ +param( + [string]$PythonHome = $env:EDGE_PYTHON_HOME +) + +$ErrorActionPreference = "Stop" + +if (-not $PythonHome) { + throw "Python runtime directory is required. Pass -PythonHome or set EDGE_PYTHON_HOME." +} + +$resolvedPythonHome = (Resolve-Path -LiteralPath $PythonHome).Path +$root = Split-Path -Parent $PSScriptRoot +$dist = Join-Path $root "dist" +$timestamp = Get-Date -Format "yyyyMMdd-HHmmss" +$packageRoot = Join-Path $dist "edge-agent-linux-$timestamp" +$runtimeRoot = Join-Path $packageRoot "runtime\python" +$archivePath = Join-Path $dist "edge-agent-linux-$timestamp.tar.gz" + +New-Item -ItemType Directory -Path $packageRoot -Force | Out-Null +New-Item -ItemType Directory -Path $runtimeRoot -Force | Out-Null +New-Item -ItemType Directory -Path $dist -Force | Out-Null + +Copy-Item -LiteralPath (Join-Path $root "app") -Destination $packageRoot -Recurse +Copy-Item -LiteralPath (Join-Path $root "README.md") -Destination $packageRoot +Copy-Item -LiteralPath (Join-Path $root "pyproject.toml") -Destination $packageRoot +Copy-Item -LiteralPath (Join-Path $PSScriptRoot "start-linux.sh") -Destination (Join-Path $packageRoot "start.sh") +Get-ChildItem -LiteralPath $resolvedPythonHome -Force | Copy-Item -Destination $runtimeRoot -Recurse + +tar -czf $archivePath -C $packageRoot . 
+Write-Output $archivePath diff --git a/edge-agent/tests/test_http_executor.py b/edge-agent/tests/test_http_executor.py index e48dc3e..ff405c9 100644 --- a/edge-agent/tests/test_http_executor.py +++ b/edge-agent/tests/test_http_executor.py @@ -22,9 +22,11 @@ class DummyClient: def __exit__(self, exc_type, exc, tb) -> None: return None - def get(self, url: str) -> DummyResponse: + def request(self, method: str, url: str, headers: dict | None = None) -> DummyResponse: if "down" in url: return DummyResponse(500, "Internal Server Error", '{"status":"DOWN"}') + if "ready" in url: + return DummyResponse(200, "OK", '{"status":"READY"}') return DummyResponse(200, "OK", '{"status":"UP"}') @@ -49,3 +51,19 @@ def test_http_health_check_executor_failure() -> None: assert message == "500 Internal Server Error" assert data["status_code"] == 500 assert evidence["response_body"] == '{"status":"DOWN"}' + + +def test_http_health_check_executor_body_contains() -> None: + with patch("app.executors.http_executor.httpx.Client", DummyClient): + success, message, data, evidence = HttpHealthCheckExecutor().execute( + { + "url": "http://service.test/ready", + "timeout_ms": 3000, + "expected_status": 200, + "body_contains": "READY", + } + ) + assert success is True + assert message == "200 OK" + assert data["method"] == "GET" + assert evidence["response_json"]["status"] == "READY" diff --git a/edge-agent/tests/test_log_executor.py b/edge-agent/tests/test_log_executor.py index 4610c89..1eaad6c 100644 --- a/edge-agent/tests/test_log_executor.py +++ b/edge-agent/tests/test_log_executor.py @@ -35,3 +35,32 @@ def test_grep_log_executor_missing_file() -> None: assert "not found" in message assert data == {} assert evidence == {} + + +def test_grep_log_executor_filters_by_time_range(tmp_path: Path) -> None: + log_file = tmp_path / "timed.log" + log_file.write_text( + "\n".join( + [ + "2026-04-09 10:00:00.000 INFO start", + "2026-04-09 10:05:00.000 ERROR first failure", + "2026-04-09 10:10:00.000 
ERROR second failure", + ] + ) + + "\n", + encoding="utf-8", + ) + + success, message, data, evidence = GrepLogExecutor().execute( + { + "path": str(log_file), + "keyword": "ERROR", + "start_at": "2026-04-09 10:06:00.000", + "end_at": "2026-04-09 10:11:00.000", + } + ) + + assert success is True + assert message == "keyword matched" + assert data["matched_count"] == 1 + assert evidence["matches"][0]["line_number"] == 3 diff --git a/edge-agent/tests/test_process_executor.py b/edge-agent/tests/test_process_executor.py index f61c18a..66f7095 100644 --- a/edge-agent/tests/test_process_executor.py +++ b/edge-agent/tests/test_process_executor.py @@ -23,6 +23,7 @@ def test_process_check_executor_windows_match() -> None: assert success is True assert message == "process found" assert data["matched_count"] == 1 + assert data["memory_rss_kb_total"] == 10000 assert evidence["matches"][0]["pid"] == 1234 @@ -31,7 +32,7 @@ def test_process_check_executor_unix_pid_miss() -> None: patch("app.executors.process_executor.platform.system", return_value="Linux"), patch( "app.executors.process_executor.subprocess.run", - return_value=DummyCompletedProcess("1234 python python app.py\n"), + return_value=DummyCompletedProcess("1234 python 1.5 2.0 20480 python app.py\n"), ), ): success, message, data, evidence = ProcessCheckExecutor().execute({"pid": 9999}) @@ -40,3 +41,38 @@ def test_process_check_executor_unix_pid_miss() -> None: assert message == "process not found" assert data["matched_count"] == 0 assert evidence["matches"] == [] + + +def test_process_check_executor_unix_collects_metrics() -> None: + with ( + patch("app.executors.process_executor.platform.system", return_value="Linux"), + patch( + "app.executors.process_executor.subprocess.run", + return_value=DummyCompletedProcess("1234 python 1.5 2.0 20480 python app.py\n"), + ), + ): + success, message, data, evidence = ProcessCheckExecutor().execute({"process_name": "python"}) + + assert success is True + assert message == "process 
found" + assert data["matched_count"] == 1 + assert data["cpu_percent_total"] == 1.5 + assert data["memory_rss_kb_total"] == 20480 + assert evidence["matches"][0]["memory_percent"] == 2.0 + + +def test_process_check_executor_command_contains_match() -> None: + with ( + patch("app.executors.process_executor.platform.system", return_value="Linux"), + patch( + "app.executors.process_executor.subprocess.run", + return_value=DummyCompletedProcess("1234 java 1.5 2.0 20480 java -jar order-service.jar\n"), + ), + ): + success, message, data, evidence = ProcessCheckExecutor().execute({"command_contains": "order-service"}) + + assert success is True + assert message == "process found" + assert data["matched_count"] == 1 + assert data["command_contains"] == "order-service" + assert "order-service" in evidence["matches"][0]["command"] diff --git a/edge-agent/tests/test_tcp_probe_executor.py b/edge-agent/tests/test_tcp_probe_executor.py new file mode 100644 index 0000000..f980124 --- /dev/null +++ b/edge-agent/tests/test_tcp_probe_executor.py @@ -0,0 +1,38 @@ +from __future__ import annotations + +import socket +import threading + +from app.executors.tcp_probe_executor import TcpProbeExecutor + + +def test_tcp_probe_executor_success() -> None: + server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + server.bind(("127.0.0.1", 0)) + server.listen(1) + host, port = server.getsockname() + + def accept_once() -> None: + conn, _ = server.accept() + conn.close() + server.close() + + thread = threading.Thread(target=accept_once, daemon=True) + thread.start() + + success, message, data, evidence = TcpProbeExecutor().execute({"host": host, "port": port, "timeout_ms": 1000}) + thread.join(timeout=1) + + assert success is True + assert message == "tcp probe succeeded" + assert data["connected"] is True + assert data["latency_ms"] is not None + assert evidence == {} + + +def test_tcp_probe_executor_failure() -> None: + success, message, data, evidence = 
TcpProbeExecutor().execute({"host": "127.0.0.1", "port": 9, "timeout_ms": 100}) + assert success is False + assert data["connected"] is False + assert isinstance(message, str) + assert evidence == {} diff --git a/智能化部署agent-当前进度总结.md b/智能化部署agent-当前进度总结.md index 237b8aa..8ac893d 100644 --- a/智能化部署agent-当前进度总结.md +++ b/智能化部署agent-当前进度总结.md @@ -22,14 +22,14 @@ 4. edge 接入与调度链路: 已完成 5. 基础验证执行器: 已完成 6. service control 执行器: 已完成 -7. 审计 / 报告 / 聚合指标: 已完成第一轮 +7. 审计 / 报告 / 聚合指标: 已完成第二轮 8. 失败路径与幂等性测试: 已完成第一轮 -9. 便携打包与私有运行时: Windows 已完成验证, Linux 完成脚本待验证 +9. 便携打包与私有运行时: Windows 已完成验证, Linux 产物契约已验证 10. 真实场景联调: 进行中 当前 MVP 进度估算: -**约 85%** +**约 94%** --- @@ -165,6 +165,11 @@ demo 接口定义文档已覆盖: 23. 已补充 `edge-agent` 基础执行器实现,新增 `check_port`、`check_process`、`grep_log` 三类能力并接入工具注册表。 24. 已将 Windows / Linux 的 service control 执行器从占位实现推进为可用版本,支持 `status`、`start`、`stop`、`restart`。 25. 已将便携打包脚本增强为携带私有 Python 运行时,并完成 Windows 便携包实际打包验证。 +26. 已增强健康检查与验证能力,新增 `tcp_probe`,并扩展 `http_health_check` 的期望状态码与响应体匹配能力。 +27. 已增强 `grep_log` 的日志时间范围过滤能力,支持按 `start_at` / `end_at` 过滤。 +28. 已增强 `check_process` 的进程指标输出,支持 CPU 与内存聚合信息。 +29. 已增强任务报告中的审计与指标输出,新增 `audit_summary` 以及更细的 `task_metrics` 字段。 +30. 已生成并验证 Linux 便携包产物契约,确认 `tar.gz` 中包含 `start.sh`、`app/main.py` 与 `runtime/python/bin/python3`。 ### 3.8 当前代码可运行范围 @@ -190,14 +195,17 @@ demo 接口定义文档已覆盖: 11. 本地 `edge-agent` 当前已具备: 启动脚本、打包脚本、基础执行器测试和轮询调度测试。 12. 本地 `edge-agent` 当前已具备已注册工具: - `http_health_check`、`check_port`、`check_process`、`grep_log`、`windows_service_control`、`linux_service_control` + `http_health_check`、`tcp_probe`、`check_port`、`check_process`、`grep_log`、`windows_service_control`、`linux_service_control` +13. 任务报告当前已新增: + `audit_summary` + 更细粒度 `task_metrics` 当前测试基线: 1. 共 20 条测试通过。 2. 使用 `sqlite:///:memory:` 做回归验证。 3. 当前主链路已不是“只有接口壳”,而是具备最小闭环行为。 -4. `edge-agent` 侧基础测试共 14 条通过。 +4. `edge-agent` 侧基础测试共 20 条通过。 --- @@ -301,12 +309,12 @@ demo 接口定义文档已覆盖: 当前还未收口,或仅实现了最小版本的工作包括: -1.
本地 `edge-agent` 初始化代码与打包脚本已完成第一轮,Windows 私有运行时便携包已验证,Linux 私有运行时打包脚本待实际验证。 +1. 本地 `edge-agent` 初始化代码与打包脚本已完成第一轮,Windows 私有运行时便携包已验证,Linux 便携包产物契约已验证,但原生 Linux/bash 环境下的实机打包仍待验证。 2. 文件型 SQLite / PostgreSQL 实库运行验证。 3. 身份 demo / 审批 demo 与任务主链路的权限、审批决策联动细化。 4. 任务级聚合指标已完成第一轮,但更细的任务级指标拆分仍可继续增强,如等待时长细分、失败步骤占比、阶段级统计。 -5. 更真实的验证插件实现,尤其是日志时间范围过滤、进程指标扩展和更多健康检查方式。 -6. 部署脚本和运行脚本进一步完善,包括 Linux 私有运行时打包验证和安装/升级流程。 +5. 更真实的验证插件实现,尤其是更细的日志解析、进程/JVM 指标扩展和更多健康检查方式。 +6. 部署脚本和运行脚本进一步完善,包括原生 Linux/bash 环境下的私有运行时打包验证和安装/升级流程。 7. OpenAPI 扩展到第二批接口。 8. 更多测试用例与联调脚本。 @@ -337,7 +345,7 @@ demo 接口定义文档已覆盖: 当前状态: -**SQLite / 去 Redis / 最小 DDL / 首批 OpenAPI / FastAPI 骨架 / 主接口 / demo adapter / edge 接口 / 第一轮任务级聚合指标 / 第一轮失败与幂等性测试 / edge-agent 初始化骨架 / edge-agent 启动与打包脚本 / edge-agent 基础测试 / service control 执行器 / Windows 私有运行时便携打包,均已完成第一轮落地。** +**SQLite / 去 Redis / 最小 DDL / 首批 OpenAPI / FastAPI 骨架 / 主接口 / demo adapter / edge 接口 / 第二轮任务级聚合指标与审计摘要 / 第一轮失败与幂等性测试 / edge-agent 初始化骨架 / edge-agent 启动与打包脚本 / edge-agent 基础测试 / service control 执行器 / Windows 私有运行时便携打包 / Linux 便携包产物契约验证,均已完成当前阶段落地。** --- @@ -373,7 +381,7 @@ demo 接口定义文档已覆盖: 1. 再补更细的任务级指标拆分。 2. 再补审计细节和聚合摘要。 -3. 继续补本地 Agent 更真实的日志/进程/健康检查执行能力,并验证 Linux 私有运行时打包。 +3. 继续补本地 Agent 更真实的日志/进程/健康检查执行能力,并在原生 Linux/bash 环境验证私有运行时打包。 4. 再补第二批 OpenAPI。 ### 7.2 如果上下文快满,有什么影响 @@ -407,3 +415,73 @@ set DATABASE_URL=sqlite:///:memory: 当前已经完成从"写文档"切换到"写 demo 代码"的第一步,下一步进入: **更多执行指标 -> 审计细节增强 -> 本地 Agent 与联调能力继续补齐** + +## 9. 本轮更新(2026-04-09) + +本轮新增完成内容: + +1. 已将多类 edge 执行器真正接入后端下发链路,默认验证计划已由单步扩展为多步组合: + `check_process`、`check_port`、`tcp_probe`、`http_health_check`、`grep_log` +2. 已将 edge 结果聚合逻辑从“单步回传即结束”调整为: + 全部成功才 `SUCCEEDED` + 任一步失败则 `FAILED` 并取消剩余待执行步骤 +3. 已增强 `http_health_check`,支持 `method`、`expected_status`、`body_contains` +4. 已增强 `grep_log`,支持 `start_at` / `end_at` 时间范围过滤 +5. 已增强 `check_process`,支持 `command_contains`,并返回 CPU / 内存聚合指标 +6. 已新增 `tcp_probe` 执行器并接入工具注册表 +7. 已增强任务报告,补充更细的 `task_metrics` 和 `audit_summary` +8. 
已新增 Linux 原生打包后续测试步骤说明,供后续在真实 Linux/bash 环境验证 +9. 已完成 Windows 便携包验证与 Linux 产物契约验证,当前临时验证目录已清理 + +本轮测试结果: + +1. backend 测试 `20 passed` +2. edge-agent 测试 `20 passed` + +本轮 MVP 进度更新: + +**约 91%** + +距离当前 MVP 收口,主要剩余: + +1. 更真实的日志/JVM/健康检查插件扩展 +2. 更细的任务级阶段指标与审计摘要打磨 +3. 原生 Linux/bash 环境下的私有运行时打包实机验证 +4. 第二批 OpenAPI 与更多联调场景 + +## 10. 本轮更新(2026-04-09, Agent 演示入口层) + +本轮新增完成内容: + +1. 已新增 `app_metadata` 模型、仓储与服务,并在后端启动时自动注入 demo 元数据。 +2. 已将默认验证步骤改为由 `app_metadata` 驱动生成,不再全部依赖写死参数。 +3. 已新增最小会话层: + `chat_session` + `chat_message` + 以及对应 chat service +4. 已新增 demo chat API: + `POST /api/demo/chat/sessions` + `GET /api/demo/chat/sessions/{session_id}` + `POST /api/demo/chat/sessions/{session_id}/messages` + `POST /api/demo/chat/sessions/{session_id}/tasks/{task_id}/confirm` +5. 已新增最小 Web Demo 页面: + `GET /` + `GET /demo/chat` +6. 已形成“一句话部署 -> 结构化解析 -> 确认 -> 执行 -> 验证 -> 报告”的可视化演示流。 +7. 已补充聊天入口和页面可用性测试,并完成后端全量回归。 + +本轮测试结果: + +1. backend 测试 `23 passed` +2. edge-agent 测试 `20 passed` + +本轮 MVP 进度更新: + +**约 94%** + +当前 MVP 主线剩余重点: + +1. 接入一个真实 Java 样板应用做端到端演示 +2. 继续增强 app_metadata 驱动的验证模板与真实插件能力 +3. 原生 Linux/bash 环境下验证私有运行时打包 +4. 对演示 UI 做产品化打磨