auto_agent/edge-agent/tests/test_http_executor.py
2521690 2c7714268f feat: 补强 demo 后端任务指标与 edge-agent 执行骨架
- 新增 task_report 任务级聚合指标 task_metrics
- 补充创建任务幂等与失败路径/冲突测试
- 将后端测试基线提升到 20 passed
- 新增 edge-agent 初始化代码、启动脚本与打包脚本
- 新增 http_health_check、check_port、check_process、grep_log 执行器
- 补充 edge-agent 基础测试并提升基线到 10 passed
- 同步更新 backend README 与当前进度总结
2026-04-09 10:51:19 +08:00

52 lines
1.7 KiB
Python

from __future__ import annotations
from unittest.mock import patch
from app.executors.http_executor import HttpHealthCheckExecutor
class DummyResponse:
def __init__(self, status_code: int, reason_phrase: str, text: str) -> None:
self.status_code = status_code
self.reason_phrase = reason_phrase
self.text = text
class DummyClient:
def __init__(self, *args, **kwargs) -> None:
self.kwargs = kwargs
def __enter__(self) -> "DummyClient":
return self
def __exit__(self, exc_type, exc, tb) -> None:
return None
def get(self, url: str) -> DummyResponse:
if "down" in url:
return DummyResponse(500, "Internal Server Error", '{"status":"DOWN"}')
return DummyResponse(200, "OK", '{"status":"UP"}')
def test_http_health_check_executor_success() -> None:
with patch("app.executors.http_executor.httpx.Client", DummyClient):
success, message, data, evidence = HttpHealthCheckExecutor().execute(
{"url": "http://service.test/health", "timeout_ms": 3000}
)
assert success is True
assert message == "200 OK"
assert data["status_code"] == 200
assert data["latency_ms"] is not None
assert evidence["response_body"] == '{"status":"UP"}'
def test_http_health_check_executor_failure() -> None:
with patch("app.executors.http_executor.httpx.Client", DummyClient):
success, message, data, evidence = HttpHealthCheckExecutor().execute(
{"url": "http://service.test/down", "timeout_ms": 3000}
)
assert success is False
assert message == "500 Internal Server Error"
assert data["status_code"] == 500
assert evidence["response_body"] == '{"status":"DOWN"}'