auto_agent/backend/app/services/software_a_service.py
2521690 a0f7152e80 feat(mvp): 接入真实样板应用桥接并推进演示主线
- 新增 `sample-apps/order-service` Java 样板应用及 Win/Linux 构建、启停、状态脚本
- 新增 `LocalSampleAppService`,在 `software-a` 中支持 `order-service test` 本地桥接部署
- 增加桥接开关配置:`ENABLE_SAMPLE_APP_BRIDGE`、`SAMPLE_APP_ROOT`
- 修正后端配置读取方式,环境变量可在运行时生效(`Settings` 改为 `default_factory`)
- 更新应用元数据默认验证目标:`127.0.0.1:18080`、本地日志路径
- 新增桥接测试 `test_sample_app_bridge.py`,后端基线更新至 `24 passed`
- 更新 `.gitignore`,忽略样板应用 `build/runtime` 产物
- 更新 README 与《当前进度总结》:记录本轮“真实样板应用 + 桥接能力”进展,MVP 进度约 `97%`
2026-04-09 15:45:03 +08:00

81 lines
3.3 KiB
Python

from __future__ import annotations
import os
from uuid import uuid4
from app.core.constants import (
SOFTWARE_A_TASK_STATUS_FAILED,
SOFTWARE_A_TASK_STATUS_RUNNING,
SOFTWARE_A_TASK_STATUS_SUCCEEDED,
)
from app.core.config import get_settings
from app.core.time import format_now
from app.schemas.software_a import CreateDeployTaskRequest
from app.services.local_sample_app_service import LocalSampleAppService
class SoftwareAService:
    """Demo implementation of the "software A" deployment backend.

    Deploy tasks are kept in a process-local dictionary rather than in a real
    deployment system. When the sample-app bridge is enabled, an
    ``order-service`` deploy to the ``test`` environment is forwarded to
    ``LocalSampleAppService``.
    """

    # Class-level registry: it is mutated in place through ``self``, so tasks
    # created through one instance are visible to every other instance in the
    # same process.
    _deploy_tasks: dict[str, dict] = {}

    def __init__(self, timezone_name: str) -> None:
        """Remember the timezone name that is passed to ``format_now``."""
        self.timezone_name = timezone_name

    def create_deploy_task(self, payload: CreateDeployTaskRequest) -> dict:
        """Create, register and return a deploy task record.

        The task is created as FAILED when the payload matches the demo
        failure trigger (see ``_should_fail_deploy``) or when the local sample
        app bridge raises; otherwise it is created as RUNNING.

        Args:
            payload: Deploy request; ``app_code``, ``env``, ``version`` and
                ``target_nodes`` are read from it.

        Returns:
            The task dictionary that was stored under its generated
            ``software_a_task_id``.
        """
        task_id = f"sa-task-{uuid4().hex[:12]}"
        # Stamp the start before any deploy work runs. The bridge call below
        # is synchronous, so taking this timestamp afterwards would record the
        # end of the deploy as its start.
        started_at = format_now(self.timezone_name)

        should_fail = self._should_fail_deploy(payload)
        task_status = SOFTWARE_A_TASK_STATUS_FAILED if should_fail else SOFTWARE_A_TASK_STATUS_RUNNING
        error_detail = self._build_error_detail(payload) if should_fail else None
        sample_app_result: dict | None = None

        if not should_fail and self._should_use_local_sample_bridge(payload):
            try:
                sample_app_result = LocalSampleAppService(
                    get_settings().sample_app_root
                ).deploy_order_service(payload.version)
            except Exception as exc:
                # Deliberately broad: whatever the bridge raises is turned into
                # a FAILED task record instead of propagating to the caller.
                task_status = SOFTWARE_A_TASK_STATUS_FAILED
                error_detail = f"local sample app deploy failed: {exc}"

        task = {
            "software_a_task_id": task_id,
            "task_status": task_status,
            "progress_percent": 100,
            "app_code": payload.app_code,
            "env": payload.env,
            "version": payload.version,
            "target_nodes": payload.target_nodes,
            "started_at": started_at,
            "finished_at": format_now(self.timezone_name),
            "error_detail": error_detail,
            "sample_app_result": sample_app_result,
        }
        self._deploy_tasks[task_id] = task
        return task

    def get_deploy_task(self, software_a_task_id: str) -> dict | None:
        """Look up a task by id, completing it on read if it has not failed.

        Returns:
            ``None`` when the id is unknown. A FAILED task is returned
            unchanged. Any other task is updated in place to SUCCEEDED with
            ``progress_percent`` 100 and then returned, so the first lookup of
            a RUNNING task is what marks it as finished.
        """
        task = self._deploy_tasks.get(software_a_task_id)
        if not task:
            return None
        if task["task_status"] == SOFTWARE_A_TASK_STATUS_FAILED:
            return task
        # The returned dict is the stored one, so this update persists.
        task["task_status"] = SOFTWARE_A_TASK_STATUS_SUCCEEDED
        task["progress_percent"] = 100
        return task

    def check_permission(self, action_type: str, env: str, approval_status: str | None = None) -> tuple[bool, str]:
        """Decide whether an action may run in the given environment.

        Stop, restart and deploy actions in ``prod`` are rejected unless
        ``approval_status`` is exactly ``"APPROVED"``; everything else is
        allowed.

        Returns:
            ``(allowed, reason)`` where ``reason`` is an empty string when the
            action is allowed.
        """
        if env == "prod" and action_type in {"STOP_SERVICE", "RESTART_SERVICE", "DEPLOY"} and approval_status != "APPROVED":
            return False, "生产环境动作默认需要额外审批"
        return True, ""

    def _should_fail_deploy(self, payload: CreateDeployTaskRequest) -> bool:
        """Return True when ``app_code`` or ``version`` contains "fail" (case-insensitive)."""
        app_code = payload.app_code.lower()
        version = payload.version.lower()
        return "fail" in app_code or "fail" in version

    def _build_error_detail(self, payload: CreateDeployTaskRequest) -> str:
        """Build the error message stored on a task failed by the demo trigger."""
        return f"demo deploy failed for app={payload.app_code}, env={payload.env}, version={payload.version}"

    def _should_use_local_sample_bridge(self, payload: CreateDeployTaskRequest) -> bool:
        """Return True when the bridge is enabled and the payload targets ``order-service`` in ``test``.

        Unlike ``_should_fail_deploy``, the ``app_code`` and ``env`` comparisons
        here are exact, case-sensitive matches.
        """
        settings = get_settings()
        if not settings.enable_sample_app_bridge:
            return False
        return payload.app_code == "order-service" and payload.env == "test"