auto_agent/edge-agent/tests/test_port_executor.py
2521690 2c7714268f feat: 补强 demo 后端任务指标与 edge-agent 执行骨架
- 新增 task_report 任务级聚合指标 task_metrics
- 补充创建任务幂等与失败路径/冲突测试
- 将后端测试基线提升到 20 passed
- 新增 edge-agent 初始化代码、启动脚本与打包脚本
- 新增 http_health_check、check_port、check_process、grep_log 执行器
- 补充 edge-agent 基础测试并提升基线到 10 passed
- 同步更新 backend README 与当前进度总结
2026-04-09 10:51:19 +08:00

40 lines
1.1 KiB
Python

from __future__ import annotations
import socket
import threading
from app.executors.port_executor import PortCheckExecutor
def test_port_check_executor_success() -> None:
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("127.0.0.1", 0))
server.listen(1)
host, port = server.getsockname()
def accept_once() -> None:
conn, _ = server.accept()
conn.close()
server.close()
thread = threading.Thread(target=accept_once, daemon=True)
thread.start()
success, message, data, evidence = PortCheckExecutor().execute({"host": host, "port": port, "timeout_ms": 1000})
thread.join(timeout=1)
assert success is True
assert "connected" in message
assert data["connected"] is True
assert data["port"] == port
assert evidence == {}
def test_port_check_executor_failure() -> None:
success, message, data, evidence = PortCheckExecutor().execute({"host": "127.0.0.1", "port": 9, "timeout_ms": 100})
assert success is False
assert data["connected"] is False
assert data["port"] == 9
assert isinstance(message, str)
assert evidence == {}