from __future__ import annotations

import socket
import time
from typing import Any


class TcpProbeExecutor:
    """Check TCP reachability by opening and immediately closing a connection."""

    def execute(
        self, params: dict[str, Any]
    ) -> tuple[bool, str, dict[str, Any], dict[str, Any]]:
        """Attempt one TCP connection and report the outcome.

        Args:
            params: Probe settings. Recognised keys:
                ``"port"`` (required) - target port, converted with ``int()``.
                ``"host"`` (optional, default ``"127.0.0.1"``) - converted
                with ``str()``.
                ``"timeout_ms"`` (optional, default ``3000``) - connect
                timeout in milliseconds, converted with ``int()``.

        Returns:
            A 4-tuple ``(ok, message, details, extra)``. ``ok`` is True only
            when the connection was established. ``message`` is
            ``"tcp probe succeeded"`` on success, otherwise the error text.
            ``details`` holds ``host``, ``port``, ``connected`` and
            ``latency_ms``. ``extra`` is always an empty dict.

        Raises:
            KeyError: If ``"port"`` is missing from ``params``.
            ValueError, TypeError: If ``port`` or ``timeout_ms`` cannot be
                converted by ``int()``.
        """
        host = str(params.get("host", "127.0.0.1"))
        port = int(params["port"])
        timeout_ms = int(params.get("timeout_ms", 3000))

        # Reject bad ports before touching the network: depending on the
        # platform an out-of-range port raises a non-OSError exception or is
        # silently mapped to a different port, which would make the reported
        # result describe a port that was never actually probed.
        if not 0 <= port <= 65535:
            return self._result(
                host, port, False, f"port must be in range 0-65535, got {port}", 0
            )

        # A zero timeout switches the socket to non-blocking mode and a
        # negative one makes settimeout() raise ValueError; neither is a
        # meaningful probe, so report them as failures up front.
        if timeout_ms <= 0:
            return self._result(
                host, port, False, f"timeout_ms must be positive, got {timeout_ms}", 0
            )

        started_at = time.perf_counter()
        try:
            with socket.create_connection((host, port), timeout=timeout_ms / 1000.0):
                # Measured while the socket is still open, so the figure
                # covers connection setup only, not teardown.
                latency_ms = self._elapsed_ms(started_at)
        except (OSError, UnicodeError) as exc:
            # UnicodeError: a non-ASCII host that cannot be IDNA-encoded
            # fails before any OSError can be raised.
            return self._result(
                host, port, False, str(exc), self._elapsed_ms(started_at)
            )

        return self._result(host, port, True, "tcp probe succeeded", latency_ms)

    @staticmethod
    def _elapsed_ms(started_at: float) -> int:
        """Return whole milliseconds since ``started_at``, never negative."""
        return max(int((time.perf_counter() - started_at) * 1000), 0)

    @staticmethod
    def _result(
        host: str, port: int, connected: bool, message: str, latency_ms: int
    ) -> tuple[bool, str, dict[str, Any], dict[str, Any]]:
        """Build the 4-tuple returned by ``execute``."""
        details = {
            "host": host,
            "port": port,
            "connected": connected,
            "latency_ms": latency_ms,
        }
        return connected, message, details, {}