diff --git a/backend/README.md b/backend/README.md index 052bc34..406dab3 100644 --- a/backend/README.md +++ b/backend/README.md @@ -89,6 +89,14 @@ Current execution metrics: 1. `tool_call.duration_ms` is persisted from `started_at` / `finished_at` 2. `verification_trace.duration_ms` is persisted for edge task reports +3. `task_report.task_metrics` returns task-level aggregate durations and counts +4. current aggregate metrics include: + `total_duration_ms` + `confirm_wait_duration_ms` + `approval_duration_ms` + `execution_duration_ms` + `tool_call_duration_ms_total` + `verification_duration_ms_total` Current result summary capabilities: @@ -113,13 +121,13 @@ Automated tests currently cover: 6. task report trace aggregation 7. cancel running task -Current baseline: `14 passed` +Current baseline: `20 passed` ## Next Focus Recommended next implementation steps: -1. add more idempotency and rollback tests -2. continue enriching audit details and task-level aggregate metrics -3. continue toward local edge-agent bootstrap +1. continue enriching audit details and task-level metric breakdown +2. continue implementing local edge-agent executors beyond `http_health_check` +3. add packaging/bootstrap scripts for portable edge-agent delivery 4. then continue with second-batch OpenAPI diff --git a/backend/app/api/agent/tasks.py b/backend/app/api/agent/tasks.py index d78241d..5f808d8 100644 --- a/backend/app/api/agent/tasks.py +++ b/backend/app/api/agent/tasks.py @@ -9,7 +9,7 @@ from sqlalchemy.orm import Session from app.core.config import get_settings from app.core.constants import ERROR_CODE_OK -from app.core.time import format_now +from app.core.time import compute_duration_ms, format_now, parse_timestamp from app.db.session import get_db from app.repositories.approval_repository import ApprovalRepository from app.repositories.audit_repository import AuditRepository @@ -31,6 +31,7 @@ from app.schemas.task import ( SoftwareAResultSummary, TaskBasic, TaskDetailData, + TaskMetrics, TaskReportData, ToolTraceItem, ToolCallItem, @@ -95,6 +96,78 @@ def build_result_summary_detail(task, approval, software_a_detail: dict | None, ) +def pick_latest_timestamp(*values: str | None) -> str | None: + candidates = [value for value in values if value and parse_timestamp(value)] + if not candidates: + return None + return max(candidates, key=lambda value: parse_timestamp(value)) + + +def sum_duration_ms(values: list[int | None]) -> int: + return sum(value for value in values if value is not None) + + +def build_task_metrics(task, approval, software_a_detail: dict | None, tool_calls, edge_tasks, audit_logs) -> TaskMetrics: + tool_call_duration_ms_total = sum_duration_ms([item.duration_ms for item in tool_calls]) + verification_duration_ms_total = sum_duration_ms([item.duration_ms for item in edge_tasks]) + + tool_call_count = len(tool_calls) + tool_call_success_count = sum(1 for item in tool_calls if bool(item.success)) + tool_call_failed_count = sum(1 for item in tool_calls if not bool(item.success)) + + verification_step_count = len(edge_tasks) + verification_success_count = sum(1 for item in edge_tasks if item.success == 1) + verification_failed_count = sum(1 for item in edge_tasks if item.success == 0) + + latest_observed_at = pick_latest_timestamp( + task.updated_at, + approval.updated_at if approval else None, + *(item.finished_at for item in tool_calls), + *(item.finished_at for item in edge_tasks), + software_a_detail.get("finished_at") if software_a_detail else None, + ) + + total_duration_ms = compute_duration_ms(task.created_at, latest_observed_at) + confirm_wait_duration_ms = compute_duration_ms(task.created_at, task.confirmed_at) + + approval_duration_ms = None + if approval: + approval_finished_at = approval.updated_at + if approval.approval_status == "PENDING": + approval_finished_at = latest_observed_at + approval_duration_ms = compute_duration_ms(approval.created_at, approval_finished_at) + + execution_started_at = task.confirmed_at + if approval and approval.approval_status == "APPROVED": + execution_started_at = approval.updated_at + + execution_duration_ms = None + if execution_started_at: + execution_finished_at = pick_latest_timestamp( + *(item.finished_at for item in tool_calls), + *(item.finished_at for item in edge_tasks), + software_a_detail.get("finished_at") if software_a_detail else None, + task.updated_at, + ) + execution_duration_ms = compute_duration_ms(execution_started_at, execution_finished_at) + + return TaskMetrics( + total_duration_ms=total_duration_ms, + confirm_wait_duration_ms=confirm_wait_duration_ms, + approval_duration_ms=approval_duration_ms, + execution_duration_ms=execution_duration_ms, + tool_call_duration_ms_total=tool_call_duration_ms_total, + verification_duration_ms_total=verification_duration_ms_total, + tool_call_count=tool_call_count, + tool_call_success_count=tool_call_success_count, + tool_call_failed_count=tool_call_failed_count, + verification_step_count=verification_step_count, + verification_success_count=verification_success_count, + verification_failed_count=verification_failed_count, + audit_event_count=len(audit_logs), + ) + + @router.post("", response_model=ApiResponse[CreateTaskData]) def create_task( payload: CreateTaskRequest, @@ -104,7 +177,14 @@ def create_task( settings = get_settings() request_id = build_request_id(x_request_id) service = TaskService(db, settings.default_timezone) - task = service.create_task(payload, request_id) + try: + task = service.create_task(payload, request_id) + except TaskConflictError as exc: + message = exc.args[0] if exc.args else "task state conflict" + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail={"code": exc.code, "message": message}, + ) from exc missing_slots = json.loads(task.missing_slots_json) next_action = "CONFIRM_TASK" if not missing_slots else "FILL_MISSING_SLOTS" @@ -388,6 +468,7 @@ def get_task_report( approval_trace=approval_trace, tool_trace=tool_trace, verification_trace=verification_trace, + task_metrics=build_task_metrics(task, approval, software_a_detail, tool_calls, edge_tasks, audit_logs), result_summary=task.summary, result_summary_detail=build_result_summary_detail(task, approval, software_a_detail, edge_tasks), audit_trace=audit_trace, diff --git a/backend/app/repositories/task_repository.py b/backend/app/repositories/task_repository.py index ab83df9..addd879 100644 --- a/backend/app/repositories/task_repository.py +++ b/backend/app/repositories/task_repository.py @@ -25,3 +25,7 @@ class TaskRepository: def get_by_task_id(self, task_id: str) -> Task | None: statement = select(Task).where(Task.task_id == task_id) return self.db.execute(statement).scalar_one_or_none() + + def get_by_request_id(self, request_id: str) -> Task | None: + statement = select(Task).where(Task.request_id == request_id) + return self.db.execute(statement).scalar_one_or_none() diff --git a/backend/app/schemas/task.py b/backend/app/schemas/task.py index b92a59e..e4e533a 100644 --- a/backend/app/schemas/task.py +++ b/backend/app/schemas/task.py @@ -165,12 +165,29 @@ class AuditTraceItem(BaseModel): timestamp: str +class TaskMetrics(BaseModel): + total_duration_ms: int | None = None + confirm_wait_duration_ms: int | None = None + approval_duration_ms: int | None = None + execution_duration_ms: int | None = None + tool_call_duration_ms_total: int = 0 + verification_duration_ms_total: int = 0 + tool_call_count: int = 0 + tool_call_success_count: int = 0 + tool_call_failed_count: int = 0 + verification_step_count: int = 0 + verification_success_count: int = 0 + verification_failed_count: int = 0 + audit_event_count: int = 0 + + class TaskReportData(BaseModel): task_basic: TaskBasic intent_snapshot: ParsedIntent approval_trace: list[ApprovalTraceItem] tool_trace: list[ToolTraceItem] verification_trace: list[VerificationTraceItem] + task_metrics: TaskMetrics result_summary: str | None = None result_summary_detail: ResultSummaryDetail | None = None audit_trace: list[AuditTraceItem] diff --git a/backend/app/services/task_service.py b/backend/app/services/task_service.py index 5394b47..4336c08 100644 --- a/backend/app/services/task_service.py +++ b/backend/app/services/task_service.py @@ -72,6 +72,18 @@ class TaskService: raise TaskConflictError(f"当前任务状态不允许执行 {action},期望状态: {allowed_text},当前状态: {task.task_status}。") def create_task(self, payload: CreateTaskRequest, request_id: str | None) -> Task: + if request_id: + existing_task = self.repository.get_by_request_id(request_id) + if existing_task: + if ( + existing_task.input_text != payload.input_text + or existing_task.channel != payload.channel + or existing_task.session_id != payload.session_id + or existing_task.tenant_id != payload.tenant_id + ): + raise TaskConflictError("相同 request_id 已用于其他创建任务请求,禁止复用。") + return existing_task + parsed_intent, missing_slots = self.intent_service.parse(payload.input_text) risk_level = self.risk_service.evaluate(parsed_intent) current_time = format_now(self.timezone_name) diff --git a/backend/tests/test_task_api.py b/backend/tests/test_task_api.py index cde409c..e114df4 100644 --- a/backend/tests/test_task_api.py +++ b/backend/tests/test_task_api.py @@ -283,6 +283,59 @@ def test_task_report_contains_traces() -> None: assert deploy_trace["duration_ms"] is not None verification_trace = payload["verification_trace"][0] assert verification_trace["duration_ms"] == 100 + task_metrics = payload["task_metrics"] + assert task_metrics["tool_call_count"] >= 2 + assert task_metrics["tool_call_success_count"] >= 2 + assert task_metrics["tool_call_failed_count"] == 0 + assert task_metrics["verification_step_count"] == 1 + assert task_metrics["verification_success_count"] == 1 + assert task_metrics["verification_failed_count"] == 0 + assert task_metrics["verification_duration_ms_total"] == 100 + assert task_metrics["tool_call_duration_ms_total"] is not None + assert task_metrics["confirm_wait_duration_ms"] is not None + assert task_metrics["execution_duration_ms"] is not None + assert task_metrics["total_duration_ms"] is not None + assert task_metrics["audit_event_count"] >= 3 + + +def test_task_report_contains_metrics_for_approved_flow() -> None: + with TestClient(app) as client: + create_response = client.post( + "/api/agent/tasks", + json={ + "input_text": "deploy order-service 1.2.3 to prod", + "channel": "WEB", + "session_id": "sess-004a", + "tenant_id": "tenant-demo", + "context": {}, + }, + ) + task_id = create_response.json()["data"]["task_id"] + + confirm_response = client.post( + f"/api/agent/tasks/{task_id}/confirm", + json={"confirmed": True, "comment": "need approval"}, + ) + approval_id = confirm_response.json()["data"]["approval_id"] + + client.post( + f"/api/demo/approval/requests/{approval_id}/decision", + json={ + "decision": "APPROVED", + "comment": "approved", + "operator": {"user_id": "u2001", "user_name": "bob"}, + }, + ) + + report_response = client.get(f"/api/agent/tasks/{task_id}/report") + assert report_response.status_code == 200 + payload = report_response.json()["data"] + task_metrics = payload["task_metrics"] + assert task_metrics["approval_duration_ms"] is not None + assert task_metrics["tool_call_count"] >= 1 + assert task_metrics["verification_step_count"] >= 1 + assert task_metrics["audit_event_count"] >= 3 + assert payload["approval_trace"][0]["approval_status"] == "APPROVED" def test_cancel_running_task() -> None: @@ -344,6 +397,62 @@ def test_confirm_twice_returns_conflict() -> None: assert second_confirm.json()["code"] == "CONFLICT" +def test_create_task_is_idempotent_for_same_request_id() -> None: + with TestClient(app) as client: + payload = { + "input_text": "deploy order-service 1.2.3 to test", + "channel": "WEB", + "session_id": "sess-006a", + "tenant_id": "tenant-demo", + "context": {}, + } + first_response = client.post( + "/api/agent/tasks", + headers={"X-Request-Id": "req-idempotent-create-001"}, + json=payload, + ) + assert first_response.status_code == 200 + first_task_id = first_response.json()["data"]["task_id"] + + second_response = client.post( + "/api/agent/tasks", + headers={"X-Request-Id": "req-idempotent-create-001"}, + json=payload, + ) + assert second_response.status_code == 200 + assert second_response.json()["data"]["task_id"] == first_task_id + + +def test_create_task_conflicts_when_same_request_id_has_different_payload() -> None: + with TestClient(app) as client: + first_response = client.post( + "/api/agent/tasks", + headers={"X-Request-Id": "req-idempotent-create-002"}, + json={ + "input_text": "deploy order-service 1.2.3 to test", + "channel": "WEB", + "session_id": "sess-006b", + "tenant_id": "tenant-demo", + "context": {}, + }, + ) + assert first_response.status_code == 200 + + second_response = client.post( + "/api/agent/tasks", + headers={"X-Request-Id": "req-idempotent-create-002"}, + json={ + "input_text": "deploy payment-service 1.2.3 to test", + "channel": "WEB", + "session_id": "sess-006b", + "tenant_id": "tenant-demo", + "context": {}, + }, + ) + assert second_response.status_code == 409 + assert second_response.json()["code"] == "CONFLICT" + + def test_approval_decision_conflicts_after_task_cancelled() -> None: with TestClient(app) as client: create_response = client.post( @@ -383,6 +492,48 @@ def test_approval_decision_conflicts_after_task_cancelled() -> None: assert decision_response.json()["code"] == "CONFLICT" +def test_approval_decision_twice_returns_conflict() -> None: + with TestClient(app) as client: + create_response = client.post( + "/api/agent/tasks", + json={ + "input_text": "deploy order-service 1.2.3 to prod", + "channel": "WEB", + "session_id": "sess-007a", + "tenant_id": "tenant-demo", + "context": {}, + }, + ) + task_id = create_response.json()["data"]["task_id"] + + confirm_response = client.post( + f"/api/agent/tasks/{task_id}/confirm", + json={"confirmed": True, "comment": "need approval"}, + ) + approval_id = confirm_response.json()["data"]["approval_id"] + + first_decision = client.post( + f"/api/demo/approval/requests/{approval_id}/decision", + json={ + "decision": "APPROVED", + "comment": "approved", + "operator": {"user_id": "u2001", "user_name": "bob"}, + }, + ) + assert first_decision.status_code == 200 + + second_decision = client.post( + f"/api/demo/approval/requests/{approval_id}/decision", + json={ + "decision": "APPROVED", + "comment": "approved twice", + "operator": {"user_id": "u2001", "user_name": "bob"}, + }, + ) + assert second_decision.status_code == 409 + assert second_decision.json()["code"] == "CONFLICT" + + def test_duplicate_edge_report_returns_conflict() -> None: with TestClient(app) as client: client.post( @@ -455,6 +606,92 @@ def test_duplicate_edge_report_returns_conflict() -> None: assert second_report.json()["code"] == "CONFLICT" +def test_edge_report_with_wrong_edge_id_returns_conflict() -> None: + with TestClient(app) as client: + client.post( + "/api/agent/edge/heartbeat", + json={ + "edge_id": "edge-shanghai-001", + "hostname": "customer-host-01", + "os_type": "WINDOWS", + "agent_version": "0.1.0", + "capabilities": ["http_health_check"], + }, + ) + create_response = client.post( + "/api/agent/tasks", + json={ + "input_text": "deploy order-service 1.2.3 to test", + "channel": "WEB", + "session_id": "sess-008a", + "tenant_id": "tenant-demo", + "context": {}, + }, + ) + task_id = create_response.json()["data"]["task_id"] + client.post( + f"/api/agent/tasks/{task_id}/confirm", + json={"confirmed": True, "comment": "confirm"}, + ) + + pull_response = client.post( + "/api/agent/edge/tasks/pull", + json={"edge_id": "edge-shanghai-001", "max_tasks": 5}, + ) + step = [item for item in pull_response.json()["data"]["tasks"] if item["task_id"] == task_id][0] + + wrong_report = client.post( + "/api/agent/edge/tasks/report", + json={ + "edge_id": "edge-beijing-999", + "task_id": task_id, + "step_id": step["step_id"], + "tool_name": step["tool_name"], + "success": True, + "code": "OK", + "message": "200 OK", + "data": {"status_code": 200}, + "evidence": {"response_body": "{\"status\":\"UP\"}"}, + "started_at": "2026-04-08 20:20:00.000", + "finished_at": "2026-04-08 20:20:00.100", + }, + ) + assert wrong_report.status_code == 409 + assert wrong_report.json()["code"] == "CONFLICT" + + +def test_cancel_twice_returns_conflict() -> None: + with TestClient(app) as client: + create_response = client.post( + "/api/agent/tasks", + json={ + "input_text": "deploy order-service 1.2.3 to test", + "channel": "WEB", + "session_id": "sess-008b", + "tenant_id": "tenant-demo", + "context": {}, + }, + ) + task_id = create_response.json()["data"]["task_id"] + client.post( + f"/api/agent/tasks/{task_id}/confirm", + json={"confirmed": True, "comment": "confirm"}, + ) + + first_cancel = client.post( + f"/api/agent/tasks/{task_id}/cancel", + json={"reason": "manual stop"}, + ) + assert first_cancel.status_code == 200 + + second_cancel = client.post( + f"/api/agent/tasks/{task_id}/cancel", + json={"reason": "manual stop again"}, + ) + assert second_cancel.status_code == 409 + assert second_cancel.json()["code"] == "CONFLICT" + + def test_task_fails_when_software_a_deploy_fails() -> None: with TestClient(app) as client: create_response = client.post( diff --git a/edge-agent/README.md b/edge-agent/README.md new file mode 100644 index 0000000..0a0dd25 --- /dev/null +++ b/edge-agent/README.md @@ -0,0 +1,68 @@ +# Smart Deploy Agent Demo Edge Agent + +## Setup + +```bash +python -m venv .venv +.venv\Scripts\python -m pip install -e edge-agent +``` + +## Run Once + +```bash +set PYTHONPATH=edge-agent +.venv\Scripts\python -m app.main --once +``` + +## Run Loop + +```bash +set PYTHONPATH=edge-agent +.venv\Scripts\python -m app.main +``` + +## Test + +```bash +set PYTHONPATH=edge-agent +C:\Users\MH\AppData\Local\Programs\Python\Python311\python.exe -m pytest edge-agent/tests -q -p no:cacheprovider +``` + +## Default Runtime Notes + +1. default backend url: `http://127.0.0.1:8000` +2. default edge id: `edge-shanghai-001` +3. current registered tools: + `http_health_check` + `check_port` + `check_process` + `grep_log` +4. current bootstrap implements: + heartbeat + pull task + execute registered tools + report result + report event + +## Package Scripts + +Current repo includes: + +1. `scripts/start-windows.ps1` +2. `scripts/start-linux.sh` +3. `scripts/package-windows.ps1` +4. `scripts/package-linux.sh` + +These scripts currently prepare a portable package skeleton and startup entrypoints. +They do not yet bundle a private Python runtime. + +## Packaging Direction + +For user-side delivery, this edge agent is intended to be bundled as: + +1. Windows: `zip` portable package +2. Linux: `tar.gz` self-contained runtime directory + +## Current Verification Baseline + +Current edge-agent baseline: `10 passed` diff --git a/edge-agent/app/__init__.py b/edge-agent/app/__init__.py new file mode 100644 index 0000000..9d48db4 --- /dev/null +++ b/edge-agent/app/__init__.py @@ -0,0 +1 @@ +from __future__ import annotations diff --git a/edge-agent/app/client/__init__.py b/edge-agent/app/client/__init__.py new file mode 100644 index 0000000..9d48db4 --- /dev/null +++ b/edge-agent/app/client/__init__.py @@ -0,0 +1 @@ +from __future__ import annotations diff --git a/edge-agent/app/client/backend_client.py b/edge-agent/app/client/backend_client.py new file mode 100644 index 0000000..276d2fb --- /dev/null +++ b/edge-agent/app/client/backend_client.py @@ -0,0 +1,74 @@ +from __future__ import annotations + +from typing import Any +from uuid import uuid4 + +import httpx + +from app.core.config import Settings +from app.core.security import build_auth_headers + + +class BackendClient: + def __init__(self, settings: Settings) -> None: + self.settings = settings + self.client = httpx.Client(timeout=settings.request_timeout_ms / 1000.0) + + def _headers(self) -> dict[str, str]: + headers = build_auth_headers(self.settings) + headers["X-Request-Id"] = f"edge-req-{uuid4().hex[:12]}" + headers["X-Edge-Id"] = self.settings.edge_id + return headers + + def heartbeat(self, capabilities: list[str]) -> dict[str, Any]: + response = self.client.post( + f"{self.settings.backend_base_url}/api/agent/edge/heartbeat", + headers=self._headers(), + json={ + "edge_id": self.settings.edge_id, + "hostname": self.settings.edge_hostname, + "os_type": self.settings.edge_os_type, + "agent_version": self.settings.edge_agent_version, + "capabilities": capabilities, + }, + ) + response.raise_for_status() + return response.json() + + def pull_tasks(self, max_tasks: int = 5) -> list[dict[str, Any]]: + response = self.client.post( + f"{self.settings.backend_base_url}/api/agent/edge/tasks/pull", + headers=self._headers(), + json={ + "edge_id": self.settings.edge_id, + "max_tasks": max_tasks, + }, + ) + response.raise_for_status() + return response.json()["data"]["tasks"] + + def report_task(self, payload: dict[str, Any]) -> dict[str, Any]: + response = self.client.post( + f"{self.settings.backend_base_url}/api/agent/edge/tasks/report", + headers=self._headers(), + json=payload, + ) + response.raise_for_status() + return response.json() + + def report_event(self, event_type: str, message: str, detail: dict[str, Any] | None = None) -> dict[str, Any]: + response = self.client.post( + f"{self.settings.backend_base_url}/api/agent/edge/events", + headers=self._headers(), + json={ + "edge_id": self.settings.edge_id, + "event_type": event_type, + "message": message, + "detail": detail or {}, + }, + ) + response.raise_for_status() + return response.json() + + def close(self) -> None: + self.client.close() diff --git a/edge-agent/app/core/__init__.py b/edge-agent/app/core/__init__.py new file mode 100644 index 0000000..9d48db4 --- /dev/null +++ b/edge-agent/app/core/__init__.py @@ -0,0 +1 @@ +from __future__ import annotations diff --git a/edge-agent/app/core/config.py b/edge-agent/app/core/config.py new file mode 100644 index 0000000..7d369ed --- /dev/null +++ b/edge-agent/app/core/config.py @@ -0,0 +1,45 @@ +from __future__ import annotations + +import os +import platform +from dataclasses import dataclass + + +def detect_os_type() -> str: + system_name = platform.system().upper() + if system_name.startswith("WIN"): + return "WINDOWS" + if system_name.startswith("LINUX"): + return "LINUX" + return system_name or "UNKNOWN" + + +@dataclass(slots=True) +class Settings: + backend_base_url: str + edge_id: str + edge_name: str + edge_hostname: str + edge_os_type: str + edge_agent_version: str + edge_access_token: str | None + poll_interval_ms: int + heartbeat_interval_ms: int + request_timeout_ms: int + default_health_check_timeout_ms: int + + +def get_settings() -> Settings: + return Settings( + backend_base_url=os.getenv("BACKEND_BASE_URL", "http://127.0.0.1:8000").rstrip("/"), + edge_id=os.getenv("EDGE_ID", "edge-shanghai-001"), + edge_name=os.getenv("EDGE_NAME", "edge-agent-demo"), + edge_hostname=os.getenv("EDGE_HOSTNAME", platform.node() or "localhost"), + edge_os_type=os.getenv("EDGE_OS_TYPE", detect_os_type()), + edge_agent_version=os.getenv("EDGE_AGENT_VERSION", "0.1.0"), + edge_access_token=os.getenv("EDGE_ACCESS_TOKEN"), + poll_interval_ms=int(os.getenv("POLL_INTERVAL_MS", "3000")), + heartbeat_interval_ms=int(os.getenv("HEARTBEAT_INTERVAL_MS", "10000")), + request_timeout_ms=int(os.getenv("REQUEST_TIMEOUT_MS", "5000")), + default_health_check_timeout_ms=int(os.getenv("DEFAULT_HEALTH_CHECK_TIMEOUT_MS", "3000")), + ) diff --git a/edge-agent/app/core/logging.py b/edge-agent/app/core/logging.py new file mode 100644 index 0000000..85b9889 --- /dev/null +++ b/edge-agent/app/core/logging.py @@ -0,0 +1,10 @@ +from __future__ import annotations + +import logging + + +def setup_logging() -> None: + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", + ) diff --git a/edge-agent/app/core/security.py b/edge-agent/app/core/security.py new file mode 100644 index 0000000..053477d --- /dev/null +++ b/edge-agent/app/core/security.py @@ -0,0 +1,10 @@ +from __future__ import annotations + +from app.core.config import Settings + + +def build_auth_headers(settings: Settings) -> dict[str, str]: + headers: dict[str, str] = {} + if settings.edge_access_token: + headers["Authorization"] = f"Bearer {settings.edge_access_token}" + return headers diff --git a/edge-agent/app/core/time.py b/edge-agent/app/core/time.py new file mode 100644 index 0000000..1bd11d2 --- /dev/null +++ b/edge-agent/app/core/time.py @@ -0,0 +1,10 @@ +from __future__ import annotations + +from datetime import datetime + + +TIME_FORMAT = "%Y-%m-%d %H:%M:%S.%f" + + +def format_now() -> str: + return datetime.now().strftime(TIME_FORMAT)[:-3] diff --git a/edge-agent/app/executors/__init__.py b/edge-agent/app/executors/__init__.py new file mode 100644 index 0000000..9d48db4 --- /dev/null +++ b/edge-agent/app/executors/__init__.py @@ -0,0 +1 @@ +from __future__ import annotations diff --git a/edge-agent/app/executors/base.py b/edge-agent/app/executors/base.py new file mode 100644 index 0000000..a18331d --- /dev/null +++ b/edge-agent/app/executors/base.py @@ -0,0 +1,8 @@ +from __future__ import annotations + +from typing import Any, Protocol + + +class ToolExecutor(Protocol): + def execute(self, params: dict[str, Any]) -> tuple[bool, str, dict[str, Any], dict[str, Any]]: + ... diff --git a/edge-agent/app/executors/http_executor.py b/edge-agent/app/executors/http_executor.py new file mode 100644 index 0000000..ed73e9e --- /dev/null +++ b/edge-agent/app/executors/http_executor.py @@ -0,0 +1,26 @@ +from __future__ import annotations + +import time +from typing import Any + +import httpx + + +class HttpHealthCheckExecutor: + def execute(self, params: dict[str, Any]) -> tuple[bool, str, dict[str, Any], dict[str, Any]]: + url = params["url"] + timeout_ms = int(params.get("timeout_ms", 3000)) + started_at = time.perf_counter() + with httpx.Client(timeout=timeout_ms / 1000.0) as client: + response = client.get(url) + latency_ms = max(int((time.perf_counter() - started_at) * 1000), 0) + success = response.status_code == 200 + message = f"{response.status_code} {response.reason_phrase}" + data: dict[str, Any] = { + "status_code": response.status_code, + "latency_ms": latency_ms, + } + evidence = { + "response_body": response.text, + } + return success, message, data, evidence diff --git a/edge-agent/app/executors/linux_service_executor.py b/edge-agent/app/executors/linux_service_executor.py new file mode 100644 index 0000000..8d1608d --- /dev/null +++ b/edge-agent/app/executors/linux_service_executor.py @@ -0,0 +1,8 @@ +from __future__ import annotations + +from typing import Any + + +class LinuxServiceExecutor: + def execute(self, params: dict[str, Any]) -> tuple[bool, str, dict[str, Any], dict[str, Any]]: + return False, "linux service executor not implemented", {"params": params}, {} diff --git a/edge-agent/app/executors/log_executor.py b/edge-agent/app/executors/log_executor.py new file mode 100644 index 0000000..65ec3f3 --- /dev/null +++ b/edge-agent/app/executors/log_executor.py @@ -0,0 +1,43 @@ +from __future__ import annotations + +from pathlib import Path +from typing import Any + + +class GrepLogExecutor: + def execute(self, params: dict[str, Any]) -> tuple[bool, str, dict[str, Any], dict[str, Any]]: + path = Path(str(params["path"])) + keyword = str(params["keyword"]) + limit = int(params.get("limit", 100)) + encoding = str(params.get("encoding", "utf-8")) + case_sensitive = bool(params.get("case_sensitive", False)) + + if not path.exists(): + return False, f"log file not found: {path}", {}, {} + + keyword_cmp = keyword if case_sensitive else keyword.lower() + matches: list[dict[str, Any]] = [] + + with path.open("r", encoding=encoding, errors="ignore") as handle: + for line_number, line in enumerate(handle, start=1): + text = line.rstrip("\n") + text_cmp = text if case_sensitive else text.lower() + if keyword_cmp in text_cmp: + matches.append({"line_number": line_number, "content": text}) + if len(matches) >= limit: + break + + success = len(matches) > 0 + message = "keyword matched" if success else "keyword not found" + return ( + success, + message, + { + "path": str(path), + "keyword": keyword, + "matched_count": len(matches), + }, + { + "matches": matches, + }, + ) diff --git a/edge-agent/app/executors/port_executor.py b/edge-agent/app/executors/port_executor.py new file mode 100644 index 0000000..702a7e1 --- /dev/null +++ b/edge-agent/app/executors/port_executor.py @@ -0,0 +1,41 @@ +from __future__ import annotations + +import socket +import time +from typing import Any + + +class PortCheckExecutor: + def execute(self, params: dict[str, Any]) -> tuple[bool, str, dict[str, Any], dict[str, Any]]: + host = str(params.get("host", "127.0.0.1")) + port = int(params["port"]) + timeout_ms = int(params.get("timeout_ms", 3000)) + + started_at = time.perf_counter() + try: + with socket.create_connection((host, port), timeout=timeout_ms / 1000.0): + latency_ms = max(int((time.perf_counter() - started_at) * 1000), 0) + return ( + True, + f"connected to {host}:{port}", + { + "host": host, + "port": port, + "connected": True, + "latency_ms": latency_ms, + }, + {}, + ) + except OSError as exc: + latency_ms = max(int((time.perf_counter() - started_at) * 1000), 0) + return ( + False, + str(exc), + { + "host": host, + "port": port, + "connected": False, + "latency_ms": latency_ms, + }, + {}, + ) diff --git a/edge-agent/app/executors/process_executor.py b/edge-agent/app/executors/process_executor.py new file mode 100644 index 0000000..ed68039 --- /dev/null +++ b/edge-agent/app/executors/process_executor.py @@ -0,0 +1,94 @@ +from __future__ import annotations + +import csv +import platform +import subprocess +from io import StringIO +from typing import Any + + +class ProcessCheckExecutor: + def execute(self, params: dict[str, Any]) -> tuple[bool, str, dict[str, Any], dict[str, Any]]: + process_name = params.get("process_name") + pid = params.get("pid") + + if not process_name and pid is None: + return False, "process_name or pid is required", {}, {} + + processes = self._list_processes() + matched = [item for item in processes if self._match_process(item, process_name=process_name, pid=pid)] + + success = len(matched) > 0 + message = "process found" if success else "process not found" + return ( + success, + message, + { + "matched_count": len(matched), + "process_name": process_name, + "pid": pid, + }, + { + "matches": matched, + }, + ) + + def _list_processes(self) -> list[dict[str, Any]]: + system_name = platform.system().upper() + if system_name.startswith("WIN"): + return self._list_windows_processes() + return self._list_unix_processes() + + def _list_windows_processes(self) -> list[dict[str, Any]]: + result = subprocess.run( + ["tasklist", "/FO", "CSV", "/NH"], + capture_output=True, + text=True, + check=True, + ) + reader = csv.reader(StringIO(result.stdout)) + rows: list[dict[str, Any]] = [] + for row in reader: + if len(row) < 2: + continue + rows.append( + { + "pid": int(row[1]), + "process_name": row[0], + "command": row[0], + } + ) + return rows + + def _list_unix_processes(self) -> list[dict[str, Any]]: + result = subprocess.run( + ["ps", "-eo", "pid=,comm=,args="], + capture_output=True, + text=True, + check=True, + ) + rows: list[dict[str, Any]] = [] + for line in result.stdout.splitlines(): + parts = line.strip().split(None, 2) + if len(parts) < 2: + continue + pid_text = parts[0] + process_name = parts[1] + command = parts[2] if len(parts) > 2 else process_name + rows.append( + { + "pid": int(pid_text), + "process_name": process_name, + "command": command, + } + ) + return rows + + def _match_process(self, item: dict[str, Any], process_name: str | None, pid: int | str | None) -> bool: + if pid is not None and item["pid"] != int(pid): + return False + if process_name: + name = str(process_name).lower() + if name not in item["process_name"].lower() and name not in item["command"].lower(): + return False + return True diff --git a/edge-agent/app/executors/windows_service_executor.py b/edge-agent/app/executors/windows_service_executor.py new file mode 100644 index 0000000..ec132ad --- /dev/null +++ b/edge-agent/app/executors/windows_service_executor.py @@ -0,0 +1,8 @@ +from __future__ import annotations + +from typing import Any + + +class WindowsServiceExecutor: + def execute(self, params: dict[str, Any]) -> tuple[bool, str, dict[str, Any], dict[str, Any]]: + return False, "windows service executor not implemented", {"params": params}, {} diff --git a/edge-agent/app/main.py b/edge-agent/app/main.py new file mode 100644 index 0000000..45c580b --- /dev/null +++ b/edge-agent/app/main.py @@ -0,0 +1,36 @@ +from __future__ import annotations + +import argparse +import logging + +from app.core.config import get_settings +from app.core.logging import setup_logging +from app.scheduler.polling_runner import PollingRunner + + +logger = logging.getLogger(__name__) + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="Smart Deploy Agent demo edge agent") + parser.add_argument("--once", action="store_true", help="run one heartbeat + pull + execute cycle") + return parser.parse_args() + + +def main() -> None: + setup_logging() + args = parse_args() + settings = get_settings() + runner = PollingRunner(settings) + try: + if args.once: + runner.run_once() + else: + runner.run_forever() + finally: + runner.close() + logger.info("edge agent stopped") + + +if __name__ == "__main__": + main() diff --git a/edge-agent/app/registry/__init__.py b/edge-agent/app/registry/__init__.py new file mode 100644 index 0000000..9d48db4 --- /dev/null +++ b/edge-agent/app/registry/__init__.py @@ -0,0 +1 @@ +from __future__ import annotations diff --git a/edge-agent/app/registry/tool_registry.py b/edge-agent/app/registry/tool_registry.py new file mode 100644 index 0000000..390fbb9 --- /dev/null +++ b/edge-agent/app/registry/tool_registry.py @@ -0,0 +1,26 @@ +from __future__ import annotations + +from app.executors.log_executor import GrepLogExecutor +from app.executors.http_executor import HttpHealthCheckExecutor +from app.executors.port_executor import PortCheckExecutor +from app.executors.process_executor import ProcessCheckExecutor +from app.executors.linux_service_executor import LinuxServiceExecutor +from app.executors.windows_service_executor import WindowsServiceExecutor + + +class ToolRegistry: + def __init__(self) -> None: + self._executors = { + "http_health_check": HttpHealthCheckExecutor(), + "check_port": PortCheckExecutor(), + "check_process": ProcessCheckExecutor(), + "grep_log": GrepLogExecutor(), + "linux_service_control": LinuxServiceExecutor(), + "windows_service_control": WindowsServiceExecutor(), + } + + def capabilities(self) -> list[str]: + return sorted(self._executors.keys()) + + def get(self, tool_name: str): + return self._executors.get(tool_name) diff --git a/edge-agent/app/scheduler/__init__.py b/edge-agent/app/scheduler/__init__.py new file mode 100644 index 0000000..9d48db4 --- /dev/null +++ b/edge-agent/app/scheduler/__init__.py @@ -0,0 +1 @@ +from __future__ import annotations diff --git a/edge-agent/app/scheduler/polling_runner.py b/edge-agent/app/scheduler/polling_runner.py new file mode 100644 index 0000000..9cda407 --- /dev/null +++ b/edge-agent/app/scheduler/polling_runner.py @@ -0,0 +1,114 @@ +from __future__ import annotations + +import logging +import time + +import httpx + +from app.client.backend_client import BackendClient +from app.core.config import Settings +from app.core.time import format_now +from app.registry.tool_registry import ToolRegistry + + +logger = logging.getLogger(__name__) + + +class PollingRunner: + def __init__(self, settings: Settings) -> None: + self.settings = settings + self.registry = ToolRegistry() + self.backend_client = BackendClient(settings) + self._last_heartbeat_at: float = 0.0 + + def run_once(self) -> None: + self._heartbeat_if_needed(force=True) + tasks = self.backend_client.pull_tasks() + for task in tasks: + self._execute_task(task) + + def run_forever(self) -> None: + while True: + self._heartbeat_if_needed(force=False) + tasks = self.backend_client.pull_tasks() + for task in tasks: + self._execute_task(task) + time.sleep(self.settings.poll_interval_ms / 1000.0) + + def close(self) -> None: + self.backend_client.close() + + def _heartbeat_if_needed(self, force: bool) -> None: + current = time.time() + if not force and current - self._last_heartbeat_at < self.settings.heartbeat_interval_ms / 1000.0: + return + self.backend_client.heartbeat(self.registry.capabilities()) + self._last_heartbeat_at = current + + def _execute_task(self, task: dict) -> None: + executor = self.registry.get(task["tool_name"]) + started_at = format_now() + if executor is None: + self.backend_client.report_event( + event_type="UNSUPPORTED_TOOL", + message=f"unsupported tool: {task['tool_name']}", + detail={"task_id": task["task_id"], "step_id": task["step_id"]}, + ) + self.backend_client.report_task( + { + "edge_id": self.settings.edge_id, + "task_id": task["task_id"], + "step_id": task["step_id"], + "tool_name": task["tool_name"], + "success": False, + "code": "UNSUPPORTED_TOOL", + "message": f"unsupported tool: {task['tool_name']}", + "data": {}, + "evidence": {}, + "started_at": started_at, + "finished_at": format_now(), + } + ) + return + + try: + success, message, data, evidence = executor.execute(task.get("params", {})) + code = "OK" if success else "EXECUTION_FAILED" + except httpx.HTTPError as exc: + success = False + code = "HTTP_ERROR" + message = str(exc) + data = {} + evidence = {} + self.backend_client.report_event( + event_type="HTTP_EXECUTOR_EXCEPTION", + message=str(exc), + detail={"task_id": task["task_id"], "step_id": task["step_id"], "tool_name": task["tool_name"]}, + ) + except Exception as exc: # pragma: no cover - defensive path + success = False + code = "EXECUTION_EXCEPTION" + message = str(exc) + data = {} + evidence = {} + self.backend_client.report_event( + event_type="AGENT_EXCEPTION", + message=str(exc), + detail={"task_id": task["task_id"], "step_id": task["step_id"], "tool_name": task["tool_name"]}, + ) + + payload = { + "edge_id": self.settings.edge_id, + "task_id": task["task_id"], + "step_id": task["step_id"], + "tool_name": task["tool_name"], + "success": success, + "code": code, + "message": message, + "data": data, + "evidence": evidence, + "started_at": started_at, + "finished_at": format_now(), + } + logger.info("report edge step result task_id=%s step_id=%s success=%s", task["task_id"], task["step_id"], success) + self.backend_client.report_task(payload) diff --git a/edge-agent/pyproject.toml b/edge-agent/pyproject.toml new file mode 100644 index 0000000..3343a5e --- /dev/null +++ b/edge-agent/pyproject.toml @@ -0,0 +1,17 @@ +[project] +name = "smart-deploy-agent-demo-edge" +version = "0.1.0" +description = "Smart deploy agent demo edge agent" +requires-python = ">=3.11" +dependencies = [ + "httpx>=0.28.0,<1.0.0", +] + +[project.optional-dependencies] +dev = [ + "pytest>=8.0.0,<9.0.0", +] + +[build-system] +requires = ["setuptools>=68", "wheel"] +build-backend = "setuptools.build_meta" diff --git a/edge-agent/scripts/package-linux.sh b/edge-agent/scripts/package-linux.sh new file mode 100644 index 0000000..32527cb --- /dev/null +++ b/edge-agent/scripts/package-linux.sh @@ -0,0 +1,19 @@ +#!/usr/bin/env bash +set -euo pipefail + +ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)" +DIST_DIR="$ROOT_DIR/dist" +PACKAGE_ROOT="$DIST_DIR/edge-agent-linux" +ARCHIVE_PATH="$DIST_DIR/edge-agent-linux.tar.gz" + +rm -rf "$PACKAGE_ROOT" +mkdir -p "$PACKAGE_ROOT" +mkdir -p "$DIST_DIR" + +cp -r "$ROOT_DIR/app" "$PACKAGE_ROOT/" +cp "$ROOT_DIR/README.md" "$PACKAGE_ROOT/" +cp "$ROOT_DIR/pyproject.toml" "$PACKAGE_ROOT/" +cp "$ROOT_DIR/scripts/start-linux.sh" "$PACKAGE_ROOT/" + +tar -czf "$ARCHIVE_PATH" -C "$PACKAGE_ROOT" . +echo "$ARCHIVE_PATH" diff --git a/edge-agent/scripts/package-windows.ps1 b/edge-agent/scripts/package-windows.ps1 new file mode 100644 index 0000000..217dd41 --- /dev/null +++ b/edge-agent/scripts/package-windows.ps1 @@ -0,0 +1,24 @@ +$ErrorActionPreference = "Stop" + +$root = Split-Path -Parent $PSScriptRoot +$dist = Join-Path $root "dist" +$packageRoot = Join-Path $dist "edge-agent-windows" +$zipPath = Join-Path $dist "edge-agent-windows.zip" + +if (Test-Path $packageRoot) { + Remove-Item -LiteralPath $packageRoot -Recurse -Force +} +if (Test-Path $zipPath) { + Remove-Item -LiteralPath $zipPath -Force +} + +New-Item -ItemType Directory -Path $packageRoot | Out-Null +New-Item -ItemType Directory -Path $dist -Force | Out-Null + +Copy-Item -LiteralPath (Join-Path $root "app") -Destination $packageRoot -Recurse +Copy-Item -LiteralPath (Join-Path $root "README.md") -Destination $packageRoot +Copy-Item -LiteralPath (Join-Path $root "pyproject.toml") -Destination $packageRoot +Copy-Item -LiteralPath (Join-Path $PSScriptRoot "start-windows.ps1") -Destination $packageRoot + +Compress-Archive -Path (Join-Path $packageRoot "*") -DestinationPath $zipPath -Force +Write-Output $zipPath diff --git a/edge-agent/scripts/start-linux.sh b/edge-agent/scripts/start-linux.sh new file mode 100644 index 0000000..6f5dff1 --- /dev/null +++ b/edge-agent/scripts/start-linux.sh @@ -0,0 +1,13 @@ +#!/usr/bin/env bash +set -euo pipefail + +ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)" +PYTHON_BIN="$ROOT_DIR/.venv/bin/python" + +if [[ ! -x "$PYTHON_BIN" ]]; then + echo "Python runtime not found at $PYTHON_BIN" >&2 + exit 1 +fi + +export PYTHONPATH="$ROOT_DIR" +exec "$PYTHON_BIN" -m app.main "$@" diff --git a/edge-agent/scripts/start-windows.ps1 b/edge-agent/scripts/start-windows.ps1 new file mode 100644 index 0000000..a5b4383 --- /dev/null +++ b/edge-agent/scripts/start-windows.ps1 @@ -0,0 +1,11 @@ +$ErrorActionPreference = "Stop" + +$root = Split-Path -Parent $PSScriptRoot +$python = Join-Path $root ".venv\Scripts\python.exe" + +if (-not (Test-Path $python)) { + throw "Python runtime not found at $python" +} + +$env:PYTHONPATH = $root +& $python -m app.main @args diff --git a/edge-agent/tests/test_http_executor.py b/edge-agent/tests/test_http_executor.py new file mode 100644 index 0000000..e48dc3e --- /dev/null +++ b/edge-agent/tests/test_http_executor.py @@ -0,0 +1,51 @@ +from __future__ import annotations + +from unittest.mock import patch + +from app.executors.http_executor import HttpHealthCheckExecutor + + +class DummyResponse: + def __init__(self, status_code: int, reason_phrase: str, text: str) -> None: + self.status_code = status_code + self.reason_phrase = reason_phrase + self.text = text + + +class DummyClient: + def __init__(self, *args, **kwargs) -> None: + self.kwargs = kwargs + + def __enter__(self) -> "DummyClient": + return self + + def __exit__(self, exc_type, exc, tb) -> None: + return None + + def get(self, url: str) -> DummyResponse: + if "down" in url: + return DummyResponse(500, "Internal Server Error", '{"status":"DOWN"}') + return DummyResponse(200, "OK", '{"status":"UP"}') + + +def test_http_health_check_executor_success() -> None: + with patch("app.executors.http_executor.httpx.Client", DummyClient): + success, message, data, evidence = HttpHealthCheckExecutor().execute( + {"url": "http://service.test/health", "timeout_ms": 3000} + ) + assert success is True + assert message == "200 OK" + assert data["status_code"] == 200 + assert data["latency_ms"] is not None + assert evidence["response_body"] == '{"status":"UP"}' + + +def test_http_health_check_executor_failure() -> None: + with patch("app.executors.http_executor.httpx.Client", DummyClient): + success, message, data, evidence = HttpHealthCheckExecutor().execute( + {"url": "http://service.test/down", "timeout_ms": 3000} + ) + assert success is False + assert message == "500 Internal Server Error" + assert data["status_code"] == 500 + assert evidence["response_body"] == '{"status":"DOWN"}' diff --git a/edge-agent/tests/test_log_executor.py b/edge-agent/tests/test_log_executor.py new file mode 100644 index 0000000..4610c89 --- /dev/null +++ b/edge-agent/tests/test_log_executor.py @@ -0,0 +1,37 @@ +from __future__ import annotations + +from pathlib import Path + +from app.executors.log_executor import GrepLogExecutor + + +def test_grep_log_executor_matches_keyword(tmp_path: Path) -> None: + log_file = tmp_path / "app.log" + log_file.write_text("INFO start\nERROR failed to boot\nINFO done\n", encoding="utf-8") + + success, message, data, evidence = GrepLogExecutor().execute( + { + "path": str(log_file), + "keyword": "ERROR", + "limit": 10, + } + ) + + assert success is True + assert message == "keyword matched" + assert data["matched_count"] == 1 + assert evidence["matches"][0]["line_number"] == 2 + + +def test_grep_log_executor_missing_file() -> None: + success, message, data, evidence = GrepLogExecutor().execute( + { + "path": "not-exists.log", + "keyword": "ERROR", + } + ) + + assert success is False + assert "not found" in message + assert data == {} + assert evidence == {} diff --git a/edge-agent/tests/test_polling_runner.py b/edge-agent/tests/test_polling_runner.py new file mode 100644 index 0000000..5829440 --- /dev/null +++ b/edge-agent/tests/test_polling_runner.py @@ -0,0 +1,97 @@ +from __future__ import annotations + +from app.core.config import Settings +from app.scheduler.polling_runner import PollingRunner + + +class StubBackendClient: + def __init__(self) -> None: + self.heartbeats: list[dict] = [] + self.task_reports: list[dict] = [] + self.event_reports: list[dict] = [] + self._tasks: list[dict] = [] + + def heartbeat(self, capabilities: list[str]) -> dict: + self.heartbeats.append({"capabilities": capabilities}) + return {"success": True} + + def pull_tasks(self, max_tasks: int = 5) -> list[dict]: + return self._tasks[:max_tasks] + + def report_task(self, payload: dict) -> dict: + self.task_reports.append(payload) + return {"success": True} + + def report_event(self, event_type: str, message: str, detail: dict | None = None) -> dict: + self.event_reports.append({"event_type": event_type, "message": message, "detail": detail or {}}) + return {"success": True} + + def close(self) -> None: + return None + + +class StubExecutor: + def execute(self, params: dict) -> tuple[bool, str, dict, dict]: + return True, "200 OK", {"status_code": 200, "latency_ms": 12}, {"response_body": '{"status":"UP"}'} + + +def build_settings() -> Settings: + return Settings( + backend_base_url="http://127.0.0.1:8000", + edge_id="edge-shanghai-001", + edge_name="edge-agent-demo", + edge_hostname="customer-host-01", + edge_os_type="WINDOWS", + edge_agent_version="0.1.0", + edge_access_token=None, + poll_interval_ms=1000, + heartbeat_interval_ms=1000, + request_timeout_ms=5000, + default_health_check_timeout_ms=3000, + ) + + +def test_polling_runner_reports_unsupported_tool() -> None: + runner = PollingRunner(build_settings()) + runner.backend_client = StubBackendClient() + runner.backend_client._tasks = [ + { + "task_id": "task-001", + "step_id": "step-001", + "tool_name": "unknown_tool", + "params": {}, + "expire_at": "2026-04-09 10:00:00.000", + } + ] + + runner.run_once() + + assert len(runner.backend_client.event_reports) == 1 + assert runner.backend_client.event_reports[0]["event_type"] == "UNSUPPORTED_TOOL" + assert len(runner.backend_client.task_reports) == 1 + assert runner.backend_client.task_reports[0]["success"] is False + assert runner.backend_client.task_reports[0]["code"] == "UNSUPPORTED_TOOL" + + +def test_polling_runner_executes_registered_tool() -> None: + runner = PollingRunner(build_settings()) + runner.backend_client = StubBackendClient() + runner.registry._executors["http_health_check"] = StubExecutor() + runner.backend_client._tasks = [ + { + "task_id": "task-002", + "step_id": "step-002", + "tool_name": "http_health_check", + "params": {"url": "http://service.test/health"}, + "expire_at": "2026-04-09 10:00:00.000", + } + ] + + runner.run_once() + + assert len(runner.backend_client.task_reports) == 1 + report = runner.backend_client.task_reports[0] + assert report["success"] is True + assert report["code"] == "OK" + assert report["data"]["status_code"] == 200 + assert report["evidence"]["response_body"] == '{"status":"UP"}' diff --git a/edge-agent/tests/test_port_executor.py b/edge-agent/tests/test_port_executor.py new file mode 100644 index 0000000..23abd3d --- /dev/null +++ b/edge-agent/tests/test_port_executor.py @@ -0,0 +1,39 @@ +from __future__ import annotations + +import socket +import threading + +from app.executors.port_executor import PortCheckExecutor + + +def test_port_check_executor_success() -> None: + server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + server.bind(("127.0.0.1", 0)) + server.listen(1) + host, port = server.getsockname() + + def accept_once() -> None: + conn, _ = server.accept() + conn.close() + server.close() + + thread = threading.Thread(target=accept_once, daemon=True) + thread.start() + + success, message, data, evidence = PortCheckExecutor().execute({"host": host, "port": port, "timeout_ms": 1000}) + thread.join(timeout=1) + + assert success is True + assert "connected" in message + assert data["connected"] is True + assert data["port"] == port + assert evidence == {} + + +def test_port_check_executor_failure() -> None: + success, message, data, evidence = PortCheckExecutor().execute({"host": "127.0.0.1", "port": 9, "timeout_ms": 100}) + assert success is False + assert data["connected"] is False + assert data["port"] == 9 + assert isinstance(message, str) + assert evidence == {} diff --git a/edge-agent/tests/test_process_executor.py b/edge-agent/tests/test_process_executor.py new file mode 100644 index 0000000..f61c18a --- /dev/null +++ b/edge-agent/tests/test_process_executor.py @@ -0,0 +1,42 @@ +from __future__ import annotations + +from unittest.mock import patch + +from app.executors.process_executor import ProcessCheckExecutor + + +class DummyCompletedProcess: + def __init__(self, stdout: str) -> None: + self.stdout = stdout + + +def test_process_check_executor_windows_match() -> None: + with ( + patch("app.executors.process_executor.platform.system", return_value="Windows"), + patch( + "app.executors.process_executor.subprocess.run", + return_value=DummyCompletedProcess('"python.exe","1234","Console","1","10,000 K"\n'), + ), + ): + success, message, data, evidence = ProcessCheckExecutor().execute({"process_name": "python"}) + + assert success is True + assert message == "process found" + assert data["matched_count"] == 1 + assert evidence["matches"][0]["pid"] == 1234 + + +def test_process_check_executor_unix_pid_miss() -> None: + with ( + patch("app.executors.process_executor.platform.system", return_value="Linux"), + patch( + "app.executors.process_executor.subprocess.run", + return_value=DummyCompletedProcess("1234 python python app.py\n"), + ), + ): + success, message, data, evidence = ProcessCheckExecutor().execute({"pid": 9999}) + + assert success is False + assert message == "process not found" + assert data["matched_count"] == 0 + assert evidence["matches"] == [] diff --git a/智能化部署agent-当前进度总结.md b/智能化部署agent-当前进度总结.md index ab5b976..dc2373c 100644 --- a/智能化部署agent-当前进度总结.md +++ b/智能化部署agent-当前进度总结.md @@ -1,12 +1,12 @@ # 智能化部署 Agent 当前进度总结 -更新时间:2026-04-08 +更新时间:2026-04-09 ## 1. 当前总体状态 当前阶段已完成从"需求方案"到"技术架构"再到"接口定义"和"demo 后端骨架"的文档化收敛,整体处于: -**方案已成型、文档体系已建立、技术路线已基本明确、demo 后端代码骨架已开始实现** +**方案已成型、文档体系已建立、技术路线已基本明确、demo 后端主链路已可运行** 当前产出重点已经从纯文档设计切换为: @@ -38,6 +38,8 @@ 7. `智能化部署agent-技术架构设计说明书.backup-20260408-141109.md` 为技术架构说明书备份文件。 +8. `edge-agent/README.md` 及 `edge-agent/app/*` + 用于沉淀本地 edge-agent 初始化代码骨架与运行说明。 --- @@ -136,6 +138,12 @@ demo 接口定义文档已覆盖: 15. 已补上首轮失败分支细化,包括 software-a 最小能力实现执行失败、审批驳回、edge 验证失败三条主失败路径。 16. 已完成 `duration_ms` 第一轮落地,`tool_call` 和 edge 验证轨迹可基于 `started_at` / `finished_at` 自动计算并返回时长。 17. 已完成结果摘要第一轮结构化改造,任务详情和任务报告可返回 `result_summary_detail`,包含最终状态、失败原因、software-a 摘要、审批摘要和验证摘要。 +18. 已补充任务报告级聚合指标 `task_metrics`,可返回总耗时、确认等待耗时、审批耗时、执行耗时、工具耗时汇总、验证耗时汇总及相关计数。 +19. 已补充失败路径与幂等性测试,覆盖创建任务幂等、重复审批决策冲突、错误 edge 回传冲突、重复取消冲突等场景。 +20. 已创建本地 `edge-agent` 初始化骨架,包含配置加载、后端客户端、工具注册、`http_health_check` 执行器、轮询调度器与启动入口。 +21. 已补充 `edge-agent` 启动脚本与便携打包脚本,覆盖 Windows `zip` 与 Linux `tar.gz` 两类交付方向。 +22. 已补充 `edge-agent` 基础测试,覆盖 `http_health_check` 执行器和轮询调度器主路径。 +23. 已补充 `edge-agent` 基础执行器实现,新增 `check_port`、`check_process`、`grep_log` 三类能力并接入工具注册表。 ### 3.8 当前代码可运行范围 @@ -153,15 +161,22 @@ demo 接口定义文档已覆盖: 7. edge 侧已支持: 心跳、拉取任务、回传结果、上报异常事件。 8. 执行指标当前已支持: - `tool_trace.duration_ms` 与 `verification_trace.duration_ms` + `tool_trace.duration_ms`、`verification_trace.duration_ms` 与 `task_metrics` 9. 结果摘要当前已支持: `result_summary_detail.final_status`、`final_reason`、`software_a`、`approval`、`verification` +10. 本地 `edge-agent` 当前已具备最小启动骨架: + 心跳、拉取任务、执行 `http_health_check`、回传结果、上报异常。 +11. 本地 `edge-agent` 当前已具备: + 启动脚本、打包脚本、基础执行器测试和轮询调度测试。 +12. 本地 `edge-agent` 当前已具备已注册工具: + `http_health_check`、`check_port`、`check_process`、`grep_log` 当前测试基线: -1. 共 14 条测试通过。 +1. 共 20 条测试通过。 2. 使用 `sqlite:///:memory:` 做回归验证。 3. 当前主链路已不是“只有接口壳”,而是具备最小闭环行为。 +4. `edge-agent` 侧基础测试共 10 条通过。 --- @@ -265,12 +280,12 @@ demo 接口定义文档已覆盖: 当前还未收口,或仅实现了最小版本的工作包括: -1. 本地 `edge-agent` 初始化代码与打包脚本。 +1. 本地 `edge-agent` 初始化代码与打包脚本已完成第一轮,但尚未接入私有 Python 运行时和真正的便携发布流程。 2. 文件型 SQLite / PostgreSQL 实库运行验证。 3. 身份 demo / 审批 demo 与任务主链路的权限、审批决策联动细化。 -4. 任务级聚合指标仍未完成,如总耗时、审批耗时、等待耗时。 -5. 更真实的验证插件实现。 -6. 部署脚本和运行脚本完善。 +4. 任务级聚合指标已完成第一轮,但更细的任务级指标拆分仍可继续增强,如等待时长细分、失败步骤占比、阶段级统计。 +5. 更真实的验证插件实现,尤其是服务控制、日志时间范围过滤、进程指标扩展。 +6. 部署脚本和运行脚本进一步完善,包括私有运行时打包。 7. OpenAPI 扩展到第二批接口。 8. 更多测试用例与联调脚本。 @@ -291,17 +306,17 @@ demo 接口定义文档已覆盖: 当前不是继续补基础文档,而是继续补强现有可运行链路。优先级建议收敛为: 1. 增补失败路径与幂等性测试: - 重点补重复请求、重复回传、异常回滚等场景。 -2. 继续丰富审计细节与任务级聚合指标: - 让任务级总耗时、审批耗时、等待耗时可直观看到。 + 已完成一轮,后续可继续补回滚和更细冲突场景。 +2. 继续丰富审计细节与任务级指标拆分: + 让任务级总耗时、审批耗时、等待耗时、阶段时长边界更直观。 3. 再补更多执行指标: - 如任务级聚合耗时、审批耗时、等待耗时。 + 如失败步骤占比、阶段级耗时拆分、任务级成功率统计。 4. 然后再继续: - 本地 `edge-agent` 骨架、第二批 OpenAPI、更多联调能力。 + 本地 `edge-agent` 执行器增强、第二批 OpenAPI、更多联调能力。 当前状态: -**SQLite / 去 Redis / 最小 DDL / 首批 OpenAPI / FastAPI 骨架 / 三条主接口 / demo adapter / edge 接口,均已完成第一轮落地。** +**SQLite / 去 Redis / 最小 DDL / 首批 OpenAPI / FastAPI 骨架 / 主接口 / demo adapter / edge 接口 / 第一轮任务级聚合指标 / 第一轮失败与幂等性测试 / edge-agent 初始化骨架 / edge-agent 启动与打包脚本 / edge-agent 基础测试,均已完成第一轮落地。** --- @@ -335,10 +350,10 @@ demo 接口定义文档已覆盖: 下一步推荐顺序: -1. 再补失败路径和幂等性测试。 -2. 再补任务级执行指标。 -3. 再补审计细节和聚合摘要。 -4. 再补本地 Agent 初始化代码或第二批 OpenAPI。 +1. 再补更细的任务级指标拆分。 +2. 再补审计细节和聚合摘要。 +3. 继续补本地 Agent 执行器与真正的便携运行时打包。 +4. 再补第二批 OpenAPI。 ### 7.2 如果上下文快满,有什么影响