- 新增 task_report 任务级聚合指标 task_metrics - 补充创建任务幂等与失败路径/冲突测试 - 将后端测试基线提升到 20 passed - 新增 edge-agent 初始化代码、启动脚本与打包脚本 - 新增 http_health_check、check_port、check_process、grep_log 执行器 - 补充 edge-agent 基础测试并提升基线到 10 passed - 同步更新 backend README 与当前进度总结
40 lines
1.1 KiB
Python
40 lines
1.1 KiB
Python
from __future__ import annotations
|
|
|
|
import socket
|
|
import threading
|
|
|
|
from app.executors.port_executor import PortCheckExecutor
|
|
|
|
|
|
def test_port_check_executor_success() -> None:
    """Check that PortCheckExecutor reports success against a live TCP listener.

    A throwaway listener is bound to an OS-assigned port on 127.0.0.1 and a
    helper thread accepts one connection, giving the executor a real endpoint
    to connect to. The listener is owned by a ``with`` block so it is closed
    on every path, including when the executor raises or never connects.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        server.bind(("127.0.0.1", 0))  # port 0: let the OS choose a free port
        server.listen(1)
        # Bound accept() so the helper thread cannot block forever when the
        # executor fails to connect.
        server.settimeout(1)
        host, port = server.getsockname()

        def accept_once() -> None:
            """Accept a single connection and close it straight away."""
            try:
                conn, _ = server.accept()
            except OSError:
                # accept() timed out or the listener was closed; there is no
                # connection to clean up.
                return
            conn.close()

        thread = threading.Thread(target=accept_once, daemon=True)
        thread.start()

        success, message, data, evidence = PortCheckExecutor().execute(
            {"host": host, "port": port, "timeout_ms": 1000}
        )
        thread.join(timeout=1)

    assert success is True
    assert "connected" in message
    assert data["connected"] is True
    assert data["port"] == port
    assert evidence == {}
|
|
|
|
|
|
def test_port_check_executor_failure() -> None:
    """Check the result PortCheckExecutor returns when the connection fails.

    The target is port 9 on 127.0.0.1 with a 100 ms timeout; presumably
    nothing listens there in the test environment, so the attempt is expected
    to fail.
    """
    unreachable_port = 9
    params = {"host": "127.0.0.1", "port": unreachable_port, "timeout_ms": 100}

    outcome = PortCheckExecutor().execute(params)
    ok, detail, payload, proof = outcome

    # Same checks, in the same order, as a direct unpack-and-assert would do.
    assert ok is False
    assert payload["connected"] is False
    assert payload["port"] == unreachable_port
    assert isinstance(detail, str)
    assert proof == {}
|