auto_agent/backend/tests/test_task_api.py
2521690 ce299cbb18 feat: 增加 Agent 演示入口与 app_metadata 驱动验证链路
- 新增 app_metadata 模型、仓储与服务
- 将默认 edge 验证步骤改为由 app_metadata 驱动生成
- 新增 chat_session / chat_message 会话层模型与 chat service
- 新增 demo chat API,支持会话创建、消息发送、任务确认
- 新增最小 Web Demo 页面,形成聊天式演示入口
- 增强任务报告,补充 audit_summary 与更细粒度 task_metrics
- 增强 edge-agent 执行器:tcp_probe、日志时间范围过滤、进程指标与更灵活健康检查
- 更新 README 与当前进度总结,MVP 进度推进到约 94%
2026-04-09 14:10:13 +08:00

932 lines
37 KiB
Python

import os
from fastapi.testclient import TestClient
# DATABASE_URL is set before `app.main` is imported; presumably the app reads it
# at import time so the tests run against in-memory SQLite — TODO confirm.
os.environ["DATABASE_URL"] = "sqlite:///:memory:"
from app.main import app
def report_all_edge_steps_success(
    client: TestClient,
    task_id: str,
    edge_id: str = "edge-shanghai-001",
) -> list[dict]:
    """Pull the edge steps of ``task_id`` and report every one of them as successful.

    Args:
        client: HTTP client used to call the edge pull/report endpoints.
        task_id: Task whose pulled steps should be reported.
        edge_id: Edge node identity sent in both the pull and the report
            payloads. Defaults to the id the callers in this file use.

    Returns:
        The pulled step dicts whose ``task_id`` matches, in pull order.

    Raises:
        AssertionError: If the pull or any report does not return HTTP 200,
            or if fewer than five steps were pulled for the task.
    """
    # Canned (data, evidence, message) triples keyed by tool name.
    canned_results = {
        "http_health_check": (
            {"status_code": 200, "latency_ms": 45},
            {"response_body": "{\"status\":\"UP\"}"},
            "200 OK",
        ),
        "check_port": ({"connected": True, "latency_ms": 12}, {}, "connected"),
        "tcp_probe": ({"connected": True, "latency_ms": 12}, {}, "connected"),
        "check_process": (
            {"matched_count": 1, "cpu_percent_total": 1.5, "memory_rss_kb_total": 20480},
            {"matches": [{"pid": 1234, "process_name": "java", "command": "java -jar order-service.jar"}]},
            "process found",
        ),
        "grep_log": (
            {"matched_count": 1},
            {"matches": [{"line_number": 10, "content": "Started order-service", "timestamp": "2026-04-08 20:20:00.000"}]},
            "keyword matched",
        ),
    }

    pull_response = client.post(
        "/api/agent/edge/tasks/pull",
        json={"edge_id": edge_id, "max_tasks": 200},
    )
    assert pull_response.status_code == 200
    matched_tasks = [
        item
        for item in pull_response.json()["data"]["tasks"]
        if item["task_id"] == task_id
    ]
    assert len(matched_tasks) >= 5

    for item in matched_tasks:
        tool_name = item["tool_name"]
        # Unknown tools still get a generic successful report.
        data, evidence, message = canned_results.get(tool_name, ({}, {}, "OK"))
        report_response = client.post(
            "/api/agent/edge/tasks/report",
            json={
                "edge_id": edge_id,
                "task_id": task_id,
                "step_id": item["step_id"],
                "tool_name": tool_name,
                "success": True,
                "code": "OK",
                "message": message,
                "data": data,
                "evidence": evidence,
                "started_at": "2026-04-08 20:20:00.000",
                "finished_at": "2026-04-08 20:20:00.100",
            },
        )
        assert report_response.status_code == 200
    return matched_tasks
def test_task_create_confirm_get() -> None:
    """Create a test-env deploy task, confirm it, and read it back."""
    task_request = {
        "input_text": "deploy order-service 1.2.3 to test",
        "channel": "WEB",
        "session_id": "sess-001",
        "tenant_id": "tenant-demo",
        "context": {"preferred_env": "test"},
    }
    with TestClient(app) as client:
        created = client.post(
            "/api/agent/tasks",
            headers={"X-Request-Id": "req-test-create-001"},
            json=task_request,
        )
        assert created.status_code == 200
        task_id = created.json()["data"]["task_id"]

        confirmed = client.post(
            f"/api/agent/tasks/{task_id}/confirm",
            headers={"X-Request-Id": "req-test-confirm-001"},
            json={"confirmed": True, "comment": "确认执行"},
        )
        assert confirmed.status_code == 200
        confirm_data = confirmed.json()["data"]
        assert confirm_data["software_a_task_id"] is not None
        assert confirm_data["software_a_task_status"] == "RUNNING"

        fetched = client.get(f"/api/agent/tasks/{task_id}")
        assert fetched.status_code == 200
        task = fetched.json()["data"]
        assert task["task_id"] == task_id
        assert task["software_a_task_id"] is not None
        assert task["software_a_task_status"] == "SUCCEEDED"
        assert task["tool_calls"][0]["tool_name"] == "software_a_deploy"
        summary = task["result_summary_detail"]
        assert summary["final_status"] == "RUNNING"
        assert summary["software_a"]["task_status"] == "SUCCEEDED"
def test_high_risk_task_creates_approval_and_can_be_approved() -> None:
    """Confirming a prod deploy opens an approval; approving it moves the task to RUNNING."""
    approver = {"user_id": "u2001", "user_name": "bob"}
    with TestClient(app) as client:
        created = client.post(
            "/api/agent/tasks",
            json={
                "input_text": "deploy order-service 1.2.3 to prod",
                "channel": "WEB",
                "session_id": "sess-002",
                "tenant_id": "tenant-demo",
                "context": {},
            },
        )
        assert created.status_code == 200
        task_id = created.json()["data"]["task_id"]

        confirmed = client.post(
            f"/api/agent/tasks/{task_id}/confirm",
            json={"confirmed": True, "comment": "need approval"},
        )
        assert confirmed.status_code == 200
        confirm_data = confirmed.json()["data"]
        approval_id = confirm_data["approval_id"]
        assert approval_id is not None
        assert confirm_data["task_status"] == "PENDING_APPROVAL"

        approval_url = f"/api/demo/approval/requests/{approval_id}"
        pending = client.get(approval_url)
        assert pending.status_code == 200
        assert pending.json()["data"]["approval_status"] == "PENDING"

        decided = client.post(
            f"/api/demo/approval/requests/{approval_id}/decision",
            json={"decision": "APPROVED", "comment": "approved", "operator": approver},
        )
        assert decided.status_code == 200
        assert decided.json()["data"]["approval_status"] == "APPROVED"

        fetched = client.get(f"/api/agent/tasks/{task_id}")
        assert fetched.status_code == 200
        task = fetched.json()["data"]
        assert task["task_status"] == "RUNNING"
        assert task["approval_status"] == "APPROVED"
        assert task["software_a_task_id"] is not None
        assert task["software_a_task_status"] == "SUCCEEDED"
def test_demo_identity_and_software_a_endpoints() -> None:
    """Exercise the demo login/me endpoints and the demo software-A deploy endpoints."""
    deploy_request = {
        "operator": {"user_id": "u1001", "user_name": "alice"},
        "tenant_id": "tenant-demo",
        "app_code": "order-service",
        "env": "test",
        "version": "1.2.3",
        "target_nodes": ["10.0.0.12"],
        "deploy_options": {"graceful": True},
    }
    with TestClient(app) as client:
        # Log in and use the returned bearer token to read the current user.
        logged_in = client.post(
            "/api/demo/identity/login",
            json={"username": "alice", "password": "demo-password"},
        )
        assert logged_in.status_code == 200
        token = logged_in.json()["data"]["access_token"]

        me = client.get(
            "/api/demo/identity/me",
            headers={"Authorization": f"Bearer {token}"},
        )
        assert me.status_code == 200
        assert me.json()["data"]["user_name"] == "alice"

        # Create a deploy task directly and query it back by id.
        deployed = client.post("/api/demo/software-a/deploy-tasks", json=deploy_request)
        assert deployed.status_code == 200
        software_a_task_id = deployed.json()["data"]["software_a_task_id"]

        queried = client.get(f"/api/demo/software-a/deploy-tasks/{software_a_task_id}")
        assert queried.status_code == 200
        assert queried.json()["data"]["task_status"] == "SUCCEEDED"
def test_edge_heartbeat_pull_and_report_flow() -> None:
    """Heartbeat, create and confirm a task, report every edge step OK, then check the task.

    The task is expected to end as SUCCEEDED with a positive verification result.
    """
    with TestClient(app) as client:
        heartbeat_response = client.post(
            "/api/agent/edge/heartbeat",
            json={
                "edge_id": "edge-shanghai-001",
                "hostname": "customer-host-01",
                "os_type": "WINDOWS",
                "agent_version": "0.1.0",
                "capabilities": ["http_health_check"],
            },
        )
        assert heartbeat_response.status_code == 200
        assert heartbeat_response.json()["data"]["node_status"] == "ONLINE"

        create_response = client.post(
            "/api/agent/tasks",
            json={
                "input_text": "deploy order-service 1.2.3 to test",
                "channel": "WEB",
                "session_id": "sess-003",
                "tenant_id": "tenant-demo",
                "context": {},
            },
        )
        # Fail on the status code rather than on a KeyError when reading task_id.
        assert create_response.status_code == 200
        task_id = create_response.json()["data"]["task_id"]

        confirm_response = client.post(
            f"/api/agent/tasks/{task_id}/confirm",
            json={"confirmed": True, "comment": "confirm"},
        )
        assert confirm_response.status_code == 200

        matched_tasks = report_all_edge_steps_success(client, task_id)
        reported_tools = {item["tool_name"] for item in matched_tasks}
        assert "http_health_check" in reported_tools
        assert "check_port" in reported_tools
        assert "check_process" in reported_tools

        get_response = client.get(f"/api/agent/tasks/{task_id}")
        assert get_response.status_code == 200
        task = get_response.json()["data"]
        assert task["task_status"] == "SUCCEEDED"
        assert task["software_a_task_id"] is not None
        assert task["software_a_task_status"] == "SUCCEEDED"
        verification = task["verification_result"]
        assert verification["http_ok"] is True
        assert verification["process_ok"] is True
        assert verification["port_ok"] is True
        assert verification["log_error_count"] == 0
def test_edge_event_report_endpoint() -> None:
    """Post an AGENT_EXCEPTION edge event and check that it is accepted."""
    event = {
        "edge_id": "edge-shanghai-001",
        "event_type": "AGENT_EXCEPTION",
        "message": "executor timeout",
        "detail": {"tool_name": "http_health_check", "timeout_ms": 3000},
    }
    with TestClient(app) as client:
        posted = client.post("/api/agent/edge/events", json=event)
        assert posted.status_code == 200
        result = posted.json()["data"]
        assert result["accepted"] is True
        assert result["event_type"] == "AGENT_EXCEPTION"
def test_task_report_contains_traces() -> None:
    """Run a full successful task and check the traces, metrics and audit summary in its report."""
    with TestClient(app) as client:
        heartbeat_response = client.post(
            "/api/agent/edge/heartbeat",
            json={
                "edge_id": "edge-shanghai-001",
                "hostname": "customer-host-01",
                "os_type": "WINDOWS",
                "agent_version": "0.1.0",
                "capabilities": ["http_health_check"],
            },
        )
        # Setup calls are checked so a broken precondition fails at its source.
        assert heartbeat_response.status_code == 200

        create_response = client.post(
            "/api/agent/tasks",
            headers={"X-Request-Id": "req-report-create-001"},
            json={
                "input_text": "deploy order-service 1.2.3 to test",
                "channel": "WEB",
                "session_id": "sess-004",
                "tenant_id": "tenant-demo",
                "context": {},
            },
        )
        assert create_response.status_code == 200
        task_id = create_response.json()["data"]["task_id"]

        confirm_response = client.post(
            f"/api/agent/tasks/{task_id}/confirm",
            headers={"X-Request-Id": "req-report-confirm-001"},
            json={"confirmed": True, "comment": "confirm"},
        )
        assert confirm_response.status_code == 200

        matched_tasks = report_all_edge_steps_success(client, task_id)
        step_count = len(matched_tasks)

        report_response = client.get(f"/api/agent/tasks/{task_id}/report")
        assert report_response.status_code == 200
        payload = report_response.json()["data"]

        # Trace sections and request-id / operator propagation.
        assert payload["task_basic"]["task_id"] == task_id
        assert len(payload["tool_trace"]) >= 6
        assert len(payload["verification_trace"]) >= 5
        assert len(payload["audit_trace"]) >= 7
        assert payload["approval_trace"] == []
        assert any(item["request_id"] == "req-report-confirm-001" for item in payload["tool_trace"])
        assert any(item["operator_user_name"] == "alice" for item in payload["tool_trace"])
        assert any(item["request_id"] == "req-report-create-001" for item in payload["audit_trace"])

        # Result summary.
        summary = payload["result_summary_detail"]
        assert summary["final_status"] == "SUCCEEDED"
        assert summary["software_a"]["task_status"] == "SUCCEEDED"
        assert summary["verification"]["success"] is True
        assert summary["verification"]["step_status"] == "SUCCEEDED"

        deploy_trace = next(item for item in payload["tool_trace"] if item["tool_name"] == "software_a_deploy")
        assert deploy_trace["duration_ms"] is not None
        verification_trace = payload["verification_trace"][0]
        # The helper reports started_at/finished_at exactly 100 ms apart.
        assert verification_trace["duration_ms"] == 100

        # Task metrics.
        task_metrics = payload["task_metrics"]
        assert task_metrics["tool_call_count"] >= 6
        assert task_metrics["tool_call_success_count"] >= 6
        assert task_metrics["tool_call_failed_count"] == 0
        assert task_metrics["software_a_duration_ms_total"] is not None
        assert task_metrics["verification_step_count"] == step_count
        assert task_metrics["verification_success_count"] == step_count
        assert task_metrics["verification_failed_count"] == 0
        assert task_metrics["verification_duration_ms_total"] == step_count * 100
        assert task_metrics["verification_queue_wait_duration_ms_total"] is not None
        assert task_metrics["verification_end_to_end_duration_ms_total"] is not None
        assert task_metrics["tool_call_duration_ms_total"] is not None
        assert task_metrics["confirm_wait_duration_ms"] is not None
        assert task_metrics["execution_duration_ms"] is not None
        assert task_metrics["total_duration_ms"] is not None
        assert task_metrics["audit_event_count"] >= 3
        assert task_metrics["audit_failure_count"] == 0

        # Audit summary.
        audit_summary = payload["audit_summary"]
        assert audit_summary["audit_event_count"] >= 3
        assert "CREATE_TASK" in audit_summary["action_types"]
        assert "alice" in audit_summary["operator_user_names"]
        assert audit_summary["result_counts"]["OK"] >= 1
def test_task_report_contains_metrics_for_approved_flow() -> None:
    """Approve a prod deploy task and check the approval-related parts of its report."""
    with TestClient(app) as client:
        create_response = client.post(
            "/api/agent/tasks",
            json={
                "input_text": "deploy order-service 1.2.3 to prod",
                "channel": "WEB",
                "session_id": "sess-004a",
                "tenant_id": "tenant-demo",
                "context": {},
            },
        )
        # Setup calls are checked so a broken precondition fails at its source.
        assert create_response.status_code == 200
        task_id = create_response.json()["data"]["task_id"]

        confirm_response = client.post(
            f"/api/agent/tasks/{task_id}/confirm",
            json={"confirmed": True, "comment": "need approval"},
        )
        assert confirm_response.status_code == 200
        approval_id = confirm_response.json()["data"]["approval_id"]
        # A missing approval would otherwise be formatted into the URL as "None".
        assert approval_id is not None

        decision_response = client.post(
            f"/api/demo/approval/requests/{approval_id}/decision",
            json={
                "decision": "APPROVED",
                "comment": "approved",
                "operator": {"user_id": "u2001", "user_name": "bob"},
            },
        )
        assert decision_response.status_code == 200

        report_response = client.get(f"/api/agent/tasks/{task_id}/report")
        assert report_response.status_code == 200
        payload = report_response.json()["data"]
        task_metrics = payload["task_metrics"]
        assert task_metrics["approval_duration_ms"] is not None
        assert task_metrics["tool_call_count"] >= 1
        assert task_metrics["verification_step_count"] >= 5
        assert task_metrics["audit_event_count"] >= 3
        assert payload["audit_summary"]["result_counts"]["APPROVED"] >= 1
        assert payload["approval_trace"][0]["approval_status"] == "APPROVED"
def test_edge_pull_uses_app_metadata_driven_params() -> None:
    """Pulled edge steps for payment-service carry the expected per-app parameters."""
    with TestClient(app) as client:
        heartbeat_response = client.post(
            "/api/agent/edge/heartbeat",
            json={
                "edge_id": "edge-shanghai-001",
                "hostname": "customer-host-01",
                "os_type": "WINDOWS",
                "agent_version": "0.1.0",
                "capabilities": ["http_health_check", "check_port", "check_process", "grep_log", "tcp_probe"],
            },
        )
        # Setup calls are checked so a broken precondition fails at its source.
        assert heartbeat_response.status_code == 200

        create_response = client.post(
            "/api/agent/tasks",
            json={
                "input_text": "deploy payment-service 1.2.3 to test",
                "channel": "WEB",
                "session_id": "sess-meta-001",
                "tenant_id": "tenant-demo",
                "context": {},
            },
        )
        assert create_response.status_code == 200
        task_id = create_response.json()["data"]["task_id"]

        confirm_response = client.post(
            f"/api/agent/tasks/{task_id}/confirm",
            json={"confirmed": True, "comment": "confirm"},
        )
        assert confirm_response.status_code == 200

        pull_response = client.post(
            "/api/agent/edge/tasks/pull",
            json={"edge_id": "edge-shanghai-001", "max_tasks": 200},
        )
        assert pull_response.status_code == 200
        matched_tasks = [
            item
            for item in pull_response.json()["data"]["tasks"]
            if item["task_id"] == task_id
        ]
        assert len(matched_tasks) == 5

        by_tool_name = {item["tool_name"]: item for item in matched_tasks}
        assert by_tool_name["check_process"]["params"]["command_contains"] == "payment-service"
        assert by_tool_name["check_port"]["params"]["port"] == 8081
        assert by_tool_name["tcp_probe"]["params"]["port"] == 8081
        assert by_tool_name["http_health_check"]["params"]["url"] == "http://payment-service.test.demo/actuator/health"
        assert by_tool_name["grep_log"]["params"]["path"] == "logs/payment-service.log"
        assert by_tool_name["grep_log"]["params"]["keyword"] == "Started payment-service"
def test_cancel_running_task() -> None:
    """Cancel a confirmed task and check that it reads back as CANCELLED."""
    with TestClient(app) as client:
        create_response = client.post(
            "/api/agent/tasks",
            json={
                "input_text": "deploy order-service 1.2.3 to test",
                "channel": "WEB",
                "session_id": "sess-005",
                "tenant_id": "tenant-demo",
                "context": {},
            },
        )
        # Setup calls are checked so a broken precondition fails at its source.
        assert create_response.status_code == 200
        task_id = create_response.json()["data"]["task_id"]

        confirm_response = client.post(
            f"/api/agent/tasks/{task_id}/confirm",
            json={"confirmed": True, "comment": "confirm"},
        )
        assert confirm_response.status_code == 200

        cancel_response = client.post(
            f"/api/agent/tasks/{task_id}/cancel",
            json={"reason": "manual stop"},
        )
        assert cancel_response.status_code == 200
        cancelled = cancel_response.json()["data"]
        assert cancelled["task_status"] == "CANCELLED"
        assert cancelled["software_a_task_status"] == "CANCELLED"

        get_response = client.get(f"/api/agent/tasks/{task_id}")
        assert get_response.status_code == 200
        assert get_response.json()["data"]["task_status"] == "CANCELLED"
def test_confirm_twice_returns_conflict() -> None:
    """A second confirm on the same task is rejected with 409 / CONFLICT."""
    with TestClient(app) as client:
        create_response = client.post(
            "/api/agent/tasks",
            json={
                "input_text": "deploy order-service 1.2.3 to test",
                "channel": "WEB",
                "session_id": "sess-006",
                "tenant_id": "tenant-demo",
                "context": {},
            },
        )
        # Fail on the status code rather than on a KeyError when reading task_id.
        assert create_response.status_code == 200
        task_id = create_response.json()["data"]["task_id"]
        confirm_url = f"/api/agent/tasks/{task_id}/confirm"

        first_confirm = client.post(
            confirm_url,
            json={"confirmed": True, "comment": "confirm"},
        )
        assert first_confirm.status_code == 200

        second_confirm = client.post(
            confirm_url,
            json={"confirmed": True, "comment": "confirm again"},
        )
        assert second_confirm.status_code == 409
        assert second_confirm.json()["code"] == "CONFLICT"
def test_create_task_is_idempotent_for_same_request_id() -> None:
    """Posting the same body twice under one X-Request-Id returns the same task id."""
    request_headers = {"X-Request-Id": "req-idempotent-create-001"}
    with TestClient(app) as client:
        payload = {
            "input_text": "deploy order-service 1.2.3 to test",
            "channel": "WEB",
            "session_id": "sess-006a",
            "tenant_id": "tenant-demo",
            "context": {},
        }

        original = client.post("/api/agent/tasks", headers=request_headers, json=payload)
        assert original.status_code == 200
        first_task_id = original.json()["data"]["task_id"]

        replay = client.post("/api/agent/tasks", headers=request_headers, json=payload)
        assert replay.status_code == 200
        assert replay.json()["data"]["task_id"] == first_task_id
def test_create_task_conflicts_when_same_request_id_has_different_payload() -> None:
    """Reusing an X-Request-Id with a different body is rejected with 409 / CONFLICT."""
    request_headers = {"X-Request-Id": "req-idempotent-create-002"}
    original_body = {
        "input_text": "deploy order-service 1.2.3 to test",
        "channel": "WEB",
        "session_id": "sess-006b",
        "tenant_id": "tenant-demo",
        "context": {},
    }
    # Same request id and session, but a different input_text.
    changed_body = {
        "input_text": "deploy payment-service 1.2.3 to test",
        "channel": "WEB",
        "session_id": "sess-006b",
        "tenant_id": "tenant-demo",
        "context": {},
    }
    with TestClient(app) as client:
        accepted = client.post("/api/agent/tasks", headers=request_headers, json=original_body)
        assert accepted.status_code == 200

        rejected = client.post("/api/agent/tasks", headers=request_headers, json=changed_body)
        assert rejected.status_code == 409
        assert rejected.json()["code"] == "CONFLICT"
def test_approval_decision_conflicts_after_task_cancelled() -> None:
    """Approving after the task was cancelled is rejected with 409 / CONFLICT."""
    with TestClient(app) as client:
        create_response = client.post(
            "/api/agent/tasks",
            json={
                "input_text": "deploy order-service 1.2.3 to prod",
                "channel": "WEB",
                "session_id": "sess-007",
                "tenant_id": "tenant-demo",
                "context": {},
            },
        )
        # Setup calls are checked so a broken precondition fails at its source.
        assert create_response.status_code == 200
        task_id = create_response.json()["data"]["task_id"]

        confirm_response = client.post(
            f"/api/agent/tasks/{task_id}/confirm",
            json={"confirmed": True, "comment": "need approval"},
        )
        assert confirm_response.status_code == 200
        approval_id = confirm_response.json()["data"]["approval_id"]
        # A missing approval would otherwise be formatted into the URL as "None".
        assert approval_id is not None

        cancel_response = client.post(
            f"/api/agent/tasks/{task_id}/cancel",
            json={"reason": "manual stop before approval"},
        )
        assert cancel_response.status_code == 200
        assert cancel_response.json()["data"]["task_status"] == "CANCELLED"

        decision_response = client.post(
            f"/api/demo/approval/requests/{approval_id}/decision",
            json={
                "decision": "APPROVED",
                "comment": "approved too late",
                "operator": {"user_id": "u2001", "user_name": "bob"},
            },
        )
        assert decision_response.status_code == 409
        assert decision_response.json()["code"] == "CONFLICT"
def test_approval_decision_twice_returns_conflict() -> None:
    """A second decision on an already-approved request is rejected with 409 / CONFLICT."""
    with TestClient(app) as client:
        create_response = client.post(
            "/api/agent/tasks",
            json={
                "input_text": "deploy order-service 1.2.3 to prod",
                "channel": "WEB",
                "session_id": "sess-007a",
                "tenant_id": "tenant-demo",
                "context": {},
            },
        )
        # Setup calls are checked so a broken precondition fails at its source.
        assert create_response.status_code == 200
        task_id = create_response.json()["data"]["task_id"]

        confirm_response = client.post(
            f"/api/agent/tasks/{task_id}/confirm",
            json={"confirmed": True, "comment": "need approval"},
        )
        assert confirm_response.status_code == 200
        approval_id = confirm_response.json()["data"]["approval_id"]
        # A missing approval would otherwise be formatted into the URL as "None".
        assert approval_id is not None
        decision_url = f"/api/demo/approval/requests/{approval_id}/decision"

        first_decision = client.post(
            decision_url,
            json={
                "decision": "APPROVED",
                "comment": "approved",
                "operator": {"user_id": "u2001", "user_name": "bob"},
            },
        )
        assert first_decision.status_code == 200

        second_decision = client.post(
            decision_url,
            json={
                "decision": "APPROVED",
                "comment": "approved twice",
                "operator": {"user_id": "u2001", "user_name": "bob"},
            },
        )
        assert second_decision.status_code == 409
        assert second_decision.json()["code"] == "CONFLICT"
def test_duplicate_edge_report_returns_conflict() -> None:
    """Reporting the same edge step twice is rejected with 409 / CONFLICT the second time."""
    with TestClient(app) as client:
        heartbeat_response = client.post(
            "/api/agent/edge/heartbeat",
            json={
                "edge_id": "edge-shanghai-001",
                "hostname": "customer-host-01",
                "os_type": "WINDOWS",
                "agent_version": "0.1.0",
                "capabilities": ["http_health_check"],
            },
        )
        # Setup calls are checked so a broken precondition fails at its source.
        assert heartbeat_response.status_code == 200

        create_response = client.post(
            "/api/agent/tasks",
            json={
                "input_text": "deploy order-service 1.2.3 to test",
                "channel": "WEB",
                "session_id": "sess-008",
                "tenant_id": "tenant-demo",
                "context": {},
            },
        )
        assert create_response.status_code == 200
        task_id = create_response.json()["data"]["task_id"]

        confirm_response = client.post(
            f"/api/agent/tasks/{task_id}/confirm",
            json={"confirmed": True, "comment": "confirm"},
        )
        assert confirm_response.status_code == 200

        pull_response = client.post(
            "/api/agent/edge/tasks/pull",
            json={"edge_id": "edge-shanghai-001", "max_tasks": 200},
        )
        assert pull_response.status_code == 200
        task_steps = [
            item
            for item in pull_response.json()["data"]["tasks"]
            if item["task_id"] == task_id
        ]
        assert task_steps
        step = task_steps[0]

        # The identical report body is sent twice, so build it once.
        report_body = {
            "edge_id": "edge-shanghai-001",
            "task_id": task_id,
            "step_id": step["step_id"],
            "tool_name": step["tool_name"],
            "success": True,
            "code": "OK",
            "message": "200 OK",
            "data": {"status_code": 200},
            "evidence": {"response_body": "{\"status\":\"UP\"}"},
            "started_at": "2026-04-08 20:20:00.000",
            "finished_at": "2026-04-08 20:20:00.100",
        }

        first_report = client.post("/api/agent/edge/tasks/report", json=report_body)
        assert first_report.status_code == 200

        second_report = client.post("/api/agent/edge/tasks/report", json=report_body)
        assert second_report.status_code == 409
        assert second_report.json()["code"] == "CONFLICT"
def test_edge_report_with_wrong_edge_id_returns_conflict() -> None:
    """A step report sent under a different edge_id than the one that pulled it gets 409 / CONFLICT."""
    with TestClient(app) as client:
        heartbeat_response = client.post(
            "/api/agent/edge/heartbeat",
            json={
                "edge_id": "edge-shanghai-001",
                "hostname": "customer-host-01",
                "os_type": "WINDOWS",
                "agent_version": "0.1.0",
                "capabilities": ["http_health_check"],
            },
        )
        # Setup calls are checked so a broken precondition fails at its source.
        assert heartbeat_response.status_code == 200

        create_response = client.post(
            "/api/agent/tasks",
            json={
                "input_text": "deploy order-service 1.2.3 to test",
                "channel": "WEB",
                "session_id": "sess-008a",
                "tenant_id": "tenant-demo",
                "context": {},
            },
        )
        assert create_response.status_code == 200
        task_id = create_response.json()["data"]["task_id"]

        confirm_response = client.post(
            f"/api/agent/tasks/{task_id}/confirm",
            json={"confirmed": True, "comment": "confirm"},
        )
        assert confirm_response.status_code == 200

        pull_response = client.post(
            "/api/agent/edge/tasks/pull",
            json={"edge_id": "edge-shanghai-001", "max_tasks": 200},
        )
        assert pull_response.status_code == 200
        task_steps = [
            item
            for item in pull_response.json()["data"]["tasks"]
            if item["task_id"] == task_id
        ]
        assert task_steps
        step = task_steps[0]

        wrong_report = client.post(
            "/api/agent/edge/tasks/report",
            json={
                # Deliberately not the edge that pulled the step.
                "edge_id": "edge-beijing-999",
                "task_id": task_id,
                "step_id": step["step_id"],
                "tool_name": step["tool_name"],
                "success": True,
                "code": "OK",
                "message": "200 OK",
                "data": {"status_code": 200},
                "evidence": {"response_body": "{\"status\":\"UP\"}"},
                "started_at": "2026-04-08 20:20:00.000",
                "finished_at": "2026-04-08 20:20:00.100",
            },
        )
        assert wrong_report.status_code == 409
        assert wrong_report.json()["code"] == "CONFLICT"
def test_cancel_twice_returns_conflict() -> None:
    """A second cancel on the same task is rejected with 409 / CONFLICT."""
    with TestClient(app) as client:
        create_response = client.post(
            "/api/agent/tasks",
            json={
                "input_text": "deploy order-service 1.2.3 to test",
                "channel": "WEB",
                "session_id": "sess-008b",
                "tenant_id": "tenant-demo",
                "context": {},
            },
        )
        # Setup calls are checked so a broken precondition fails at its source.
        assert create_response.status_code == 200
        task_id = create_response.json()["data"]["task_id"]

        confirm_response = client.post(
            f"/api/agent/tasks/{task_id}/confirm",
            json={"confirmed": True, "comment": "confirm"},
        )
        assert confirm_response.status_code == 200
        cancel_url = f"/api/agent/tasks/{task_id}/cancel"

        first_cancel = client.post(cancel_url, json={"reason": "manual stop"})
        assert first_cancel.status_code == 200

        second_cancel = client.post(cancel_url, json={"reason": "manual stop again"})
        assert second_cancel.status_code == 409
        assert second_cancel.json()["code"] == "CONFLICT"
def test_task_fails_when_software_a_deploy_fails() -> None:
    """A failing deploy marks the task FAILED and leaves no edge steps to pull for it."""
    failing_request = {
        "input_text": "deploy fail-service 1.2.3-fail to test",
        "channel": "WEB",
        "session_id": "sess-009",
        "tenant_id": "tenant-demo",
        "context": {},
    }
    with TestClient(app) as client:
        created = client.post("/api/agent/tasks", json=failing_request)
        assert created.status_code == 200
        task_id = created.json()["data"]["task_id"]

        confirmed = client.post(
            f"/api/agent/tasks/{task_id}/confirm",
            json={"confirmed": True, "comment": "confirm"},
        )
        assert confirmed.status_code == 200
        confirm_data = confirmed.json()["data"]
        assert confirm_data["task_status"] == "FAILED"
        assert confirm_data["software_a_task_status"] == "FAILED"

        fetched = client.get(f"/api/agent/tasks/{task_id}")
        assert fetched.status_code == 200
        task = fetched.json()["data"]
        assert task["task_status"] == "FAILED"
        assert task["software_a_task_status"] == "FAILED"
        assert task["result_summary_detail"]["final_status"] == "FAILED"
        assert task["result_summary_detail"]["software_a"]["error_detail"] is not None

        pulled = client.post(
            "/api/agent/edge/tasks/pull",
            json={"edge_id": "edge-shanghai-001", "max_tasks": 200},
        )
        assert pulled.status_code == 200
        pulled_task_ids = [item["task_id"] for item in pulled.json()["data"]["tasks"]]
        assert all(pulled_id != task_id for pulled_id in pulled_task_ids)
def test_demo_software_a_endpoint_can_return_failed_task() -> None:
    """The demo software-A endpoints report FAILED, with an error detail, for the failing deploy."""
    failing_deploy = {
        "operator": {"user_id": "u1001", "user_name": "alice"},
        "tenant_id": "tenant-demo",
        "app_code": "fail-service",
        "env": "test",
        "version": "1.2.3-fail",
        "target_nodes": ["10.0.0.12"],
        "deploy_options": {"graceful": True},
    }
    with TestClient(app) as client:
        deployed = client.post("/api/demo/software-a/deploy-tasks", json=failing_deploy)
        assert deployed.status_code == 200
        deploy_data = deployed.json()["data"]
        assert deploy_data["task_status"] == "FAILED"
        software_a_task_id = deploy_data["software_a_task_id"]

        queried = client.get(f"/api/demo/software-a/deploy-tasks/{software_a_task_id}")
        assert queried.status_code == 200
        query_data = queried.json()["data"]
        assert query_data["task_status"] == "FAILED"
        assert query_data["error_detail"] is not None
def test_high_risk_task_can_be_rejected() -> None:
    """Rejecting the approval of a prod deploy cancels the task and shows up in its report."""
    with TestClient(app) as client:
        create_response = client.post(
            "/api/agent/tasks",
            json={
                "input_text": "deploy order-service 1.2.3 to prod",
                "channel": "WEB",
                "session_id": "sess-010",
                "tenant_id": "tenant-demo",
                "context": {},
            },
        )
        # Setup calls are checked so a broken precondition fails at its source.
        assert create_response.status_code == 200
        task_id = create_response.json()["data"]["task_id"]

        confirm_response = client.post(
            f"/api/agent/tasks/{task_id}/confirm",
            json={"confirmed": True, "comment": "need approval"},
        )
        assert confirm_response.status_code == 200
        approval_id = confirm_response.json()["data"]["approval_id"]
        # A missing approval would otherwise be formatted into the URL as "None".
        assert approval_id is not None

        decision_response = client.post(
            f"/api/demo/approval/requests/{approval_id}/decision",
            json={
                "decision": "REJECTED",
                "comment": "rejected",
                "operator": {"user_id": "u2001", "user_name": "bob"},
            },
        )
        assert decision_response.status_code == 200
        assert decision_response.json()["data"]["approval_status"] == "REJECTED"

        get_task_response = client.get(f"/api/agent/tasks/{task_id}")
        assert get_task_response.status_code == 200
        task = get_task_response.json()["data"]
        assert task["task_status"] == "CANCELLED"
        assert task["approval_status"] == "REJECTED"

        report_response = client.get(f"/api/agent/tasks/{task_id}/report")
        assert report_response.status_code == 200
        report_payload = report_response.json()["data"]
        assert report_payload["task_metrics"]["audit_failure_count"] >= 1
        assert report_payload["audit_summary"]["result_counts"]["REJECTED"] >= 1
def test_edge_failure_marks_task_failed() -> None:
    """A failed http_health_check report fails the task and removes its steps from later pulls."""
    with TestClient(app) as client:
        heartbeat_response = client.post(
            "/api/agent/edge/heartbeat",
            json={
                "edge_id": "edge-shanghai-001",
                "hostname": "customer-host-01",
                "os_type": "WINDOWS",
                "agent_version": "0.1.0",
                "capabilities": ["http_health_check"],
            },
        )
        # Setup calls are checked so a broken precondition fails at its source.
        assert heartbeat_response.status_code == 200

        create_response = client.post(
            "/api/agent/tasks",
            json={
                "input_text": "deploy order-service 1.2.3 to test",
                "channel": "WEB",
                "session_id": "sess-011",
                "tenant_id": "tenant-demo",
                "context": {},
            },
        )
        assert create_response.status_code == 200
        task_id = create_response.json()["data"]["task_id"]

        confirm_response = client.post(
            f"/api/agent/tasks/{task_id}/confirm",
            json={"confirmed": True, "comment": "confirm"},
        )
        assert confirm_response.status_code == 200

        pull_response = client.post(
            "/api/agent/edge/tasks/pull",
            json={"edge_id": "edge-shanghai-001", "max_tasks": 200},
        )
        assert pull_response.status_code == 200
        health_steps = [
            item
            for item in pull_response.json()["data"]["tasks"]
            if item["task_id"] == task_id and item["tool_name"] == "http_health_check"
        ]
        assert health_steps
        step = health_steps[0]

        report_response = client.post(
            "/api/agent/edge/tasks/report",
            json={
                "edge_id": "edge-shanghai-001",
                "task_id": task_id,
                "step_id": step["step_id"],
                "tool_name": step["tool_name"],
                "success": False,
                "code": "HTTP_500",
                "message": "health check failed",
                "data": {"status_code": 500},
                "evidence": {"response_body": "{\"status\":\"DOWN\"}"},
                "started_at": "2026-04-08 20:20:00.000",
                "finished_at": "2026-04-08 20:20:00.100",
            },
        )
        assert report_response.status_code == 200
        assert report_response.json()["data"]["task_status"] == "FAILED"

        get_response = client.get(f"/api/agent/tasks/{task_id}")
        assert get_response.status_code == 200
        task = get_response.json()["data"]
        assert task["task_status"] == "FAILED"
        assert task["verification_result"]["http_ok"] is False
        assert task["result_summary_detail"]["verification"]["success"] is False
        assert task["result_summary_detail"]["final_reason"] == "health check failed"

        remaining_pull_response = client.post(
            "/api/agent/edge/tasks/pull",
            json={"edge_id": "edge-shanghai-001", "max_tasks": 200},
        )
        assert remaining_pull_response.status_code == 200
        assert all(item["task_id"] != task_id for item in remaining_pull_response.json()["data"]["tasks"])