2521690 2c7714268f feat: 补强 demo 后端任务指标与 edge-agent 执行骨架
- 新增 task_report 任务级聚合指标 task_metrics
- 补充创建任务幂等与失败路径/冲突测试
- 将后端测试基线提升到 20 passed
- 新增 edge-agent 初始化代码、启动脚本与打包脚本
- 新增 http_health_check、check_port、check_process、grep_log 执行器
- 补充 edge-agent 基础测试并提升基线到 10 passed
- 同步更新 backend README 与当前进度总结
2026-04-09 10:51:19 +08:00

42 lines
1.3 KiB
Python

from __future__ import annotations
import socket
import time
from typing import Any
class PortCheckExecutor:
def execute(self, params: dict[str, Any]) -> tuple[bool, str, dict[str, Any], dict[str, Any]]:
host = str(params.get("host", "127.0.0.1"))
port = int(params["port"])
timeout_ms = int(params.get("timeout_ms", 3000))
started_at = time.perf_counter()
try:
with socket.create_connection((host, port), timeout=timeout_ms / 1000.0):
latency_ms = max(int((time.perf_counter() - started_at) * 1000), 0)
return (
True,
f"connected to {host}:{port}",
{
"host": host,
"port": port,
"connected": True,
"latency_ms": latency_ms,
},
{},
)
except OSError as exc:
latency_ms = max(int((time.perf_counter() - started_at) * 1000), 0)
return (
False,
str(exc),
{
"host": host,
"port": port,
"connected": False,
"latency_ms": latency_ms,
},
{},
)