""" E2E connect tests for agents_dashboard against the live backend. Runs the deployed .exe with --connect-test, which exercises the SAME fn_http code path the GUI uses (curl spawn via CreateProcessW on Windows). Verifies: * /health → 200 sin auth * /agents → 401 sin Bearer * /agents → 200 + JSON array con Bearer Skips if: * AGENTS_API_KEY env var missing (run with `pass agentes/api-key`) * Deployed exe not found Run from repo root: AGENTS_API_KEY=$(pass agentes/api-key) \\ python3 -m pytest -x -q projects/element_agents/apps/agents_dashboard/tests/test_connect_e2e.py """ from __future__ import annotations import json import os import shutil import subprocess from pathlib import Path import pytest DEFAULT_EXE = Path("/mnt/c/Users/lucas/Desktop/apps/agents_dashboard/agents_dashboard.exe") DEFAULT_URL = "https://agents.organic-machine.com" def _exe() -> Path: p = Path(os.environ.get("AGENTS_DASHBOARD_EXE", DEFAULT_EXE)) if not p.exists(): pytest.skip(f"deployed exe not found: {p}") return p def _url() -> str: return os.environ.get("AGENTS_DASHBOARD_URL", DEFAULT_URL) def _apikey() -> str: k = os.environ.get("AGENTS_API_KEY", "").strip() if not k: pytest.skip("AGENTS_API_KEY env var missing (try `pass agentes/api-key`)") return k def _run_connect_test(url: str, env: dict | None = None) -> subprocess.CompletedProcess: """Invoke .exe --connect-test with WSLENV so env propagates to Windows binary.""" full_env = os.environ.copy() if env: full_env.update(env) # WSLENV: comma/colon-separated list of env names to forward to Win32 keep = full_env.get("WSLENV", "") add = "AGENTS_API_KEY" full_env["WSLENV"] = f"{keep}:{add}" if keep else add return subprocess.run( [str(_exe()), "--connect-test", url], capture_output=True, text=True, timeout=30, env=full_env, ) # --------------------------------------------------------------------------- # Tests # --------------------------------------------------------------------------- def test_connect_succeeds_with_valid_apikey(): """OK on stdout, exit 0, when env + URL + apikey are valid.""" _apikey() # skip if missing r = _run_connect_test(_url()) assert r.returncode == 0, f"exit={r.returncode}\nstdout={r.stdout}\nstderr={r.stderr}" assert r.stdout.startswith("OK "), f"stdout=[{r.stdout!r}]" n = int(r.stdout.strip().split()[1]) assert n > 0, f"expected at least 1 agent, got {n}" def test_connect_fails_without_apikey(): """FAIL on stderr, exit 1, when AGENTS_API_KEY is empty.""" # Force-empty AGENTS_API_KEY; bypass WSLENV by clearing it too. r = subprocess.run( [str(_exe()), "--connect-test", _url()], capture_output=True, text=True, timeout=30, env={**os.environ, "AGENTS_API_KEY": "", "WSLENV": "AGENTS_API_KEY"}, ) assert r.returncode != 0 assert "AGENTS_API_KEY" in r.stderr, f"stderr=[{r.stderr!r}]" def test_connect_fails_on_bad_host(): """Transport-level failure on unresolvable host.""" _apikey() r = _run_connect_test("https://this-host-does-not-exist-xyzzy.invalid") assert r.returncode != 0 assert "FAIL" in r.stderr, f"stderr=[{r.stderr!r}]" def test_count_matches_direct_curl(): """N agents returned by the .exe must equal what /agents returns via curl.""" apikey = _apikey() # Direct curl from WSL via Windows curl.exe (matches what fn_http uses). curl_bin = shutil.which("curl.exe") or "/mnt/c/Windows/System32/curl.exe" direct = subprocess.run( [ curl_bin, "-fsS", "-H", f"Authorization: Bearer {apikey}", f"{_url()}/agents", ], capture_output=True, text=True, timeout=15, ) assert direct.returncode == 0, f"direct curl failed: {direct.stderr}" expected = len(json.loads(direct.stdout)) r = _run_connect_test(_url()) assert r.returncode == 0 got = int(r.stdout.strip().split()[1]) assert got == expected, f"exe says {got} agents, curl says {expected}" def test_url_trim_robust_to_whitespace(): """URL with leading/trailing whitespace + CR/LF must still connect.""" _apikey() # Wrap URL with whitespace + CRLF; trim_url() in main.cpp must strip. url = " \r\n " + _url() + " \r\n\t" r = _run_connect_test(url) assert r.returncode == 0, f"trim failed: stderr={r.stderr!r}" assert r.stdout.startswith("OK ") # --------------------------------------------------------------------------- # Backend direct: shape contract + SSE + control endpoints # --------------------------------------------------------------------------- def _curl(args: list[str], timeout: int = 15) -> subprocess.CompletedProcess: curl_bin = shutil.which("curl.exe") or "/mnt/c/Windows/System32/curl.exe" return subprocess.run([curl_bin, *args], capture_output=True, text=True, timeout=timeout) def test_agents_shape_has_fields_used_by_ui(): """Backend /agents response must carry the fields parse_agents() reads.""" apikey = _apikey() r = _curl([ "-fsS", "-H", f"Authorization: Bearer {apikey}", f"{_url()}/agents", ]) assert r.returncode == 0, r.stderr agents = json.loads(r.stdout) assert isinstance(agents, list) and len(agents) > 0 required = {"id", "name", "enabled", "running"} for a in agents: missing = required - set(a.keys()) assert not missing, f"agent {a.get('id')} missing fields {missing}: {a}" assert isinstance(a["enabled"], bool) assert isinstance(a["running"], bool) def test_sse_status_streams_not_unsupported(): """/sse/status must return text/event-stream, NOT plain 'streaming unsupported'. Regression: statusWriter wrapper used to hide http.Flusher → SSE handlers aborted with 500 'streaming unsupported'. Fixed in agents_and_robots commit 4822208 by adding statusWriter.Flush(). """ apikey = _apikey() # -m 3 = max 3s; SSE keeps the connection open, so we capture initial bytes # and stop. -i = include response headers. r = _curl([ "-sN", "-i", "-m", "3", "-H", f"Authorization: Bearer {apikey}", f"{_url()}/sse/status", ], timeout=8) body = r.stdout assert "streaming unsupported" not in body, f"SSE not flusher-aware: {body[:300]!r}" # Should see SSE content-type header assert "text/event-stream" in body.lower(), f"no SSE content-type: {body[:300]!r}" def test_sse_logs_streams_not_unsupported(): """/sse/agents/{id}/logs idem.""" apikey = _apikey() r = _curl([ "-sN", "-i", "-m", "3", "-H", f"Authorization: Bearer {apikey}", f"{_url()}/sse/agents/assistant-bot/logs", ], timeout=8) body = r.stdout assert "streaming unsupported" not in body, f"SSE not flusher-aware: {body[:300]!r}" assert "text/event-stream" in body.lower(), f"no SSE content-type: {body[:300]!r}" def test_sse_status_emits_initial_ping_within_1s(): """Backend MUST emit ":ping" immediately after headers (regression guard). Without this, agents_dashboard sat on "connecting" indefinitely because fgets() blocked waiting for the first body byte. """ apikey = _apikey() r = _curl([ "-sN", "-m", "2", "-H", f"Authorization: Bearer {apikey}", f"{_url()}/sse/status", ], timeout=5) # Body should contain at minimum the ping comment within 2s window assert ": ping" in r.stdout, f"no initial ping in 2s: {r.stdout[:300]!r}" def test_sse_logs_emits_initial_ping_within_1s(): apikey = _apikey() r = _curl([ "-sN", "-m", "2", "-H", f"Authorization: Bearer {apikey}", f"{_url()}/sse/agents/assistant-bot/logs", ], timeout=5) assert ": ping" in r.stdout, f"no initial ping in 2s: {r.stdout[:300]!r}" def test_sse_logs_actually_streams_log_lines(): """Beyond the ping, the log SSE must emit `event: log` frames with data. assistant-bot is a long-running agent that emits log lines frequently, so we collect a 2s window and expect at least one log event. """ apikey = _apikey() r = _curl([ "-sN", "-m", "2", "-H", f"Authorization: Bearer {apikey}", f"{_url()}/sse/agents/assistant-bot/logs", ], timeout=5) assert "event: log" in r.stdout, f"no log events in 2s window: {r.stdout[:500]!r}" def test_sse_unauthorized_without_bearer(): """SSE endpoints respect the same auth middleware as REST endpoints.""" r = _curl([ "-s", "-o", "/dev/null", "-w", "%{http_code}", f"{_url()}/sse/status", ]) assert r.stdout == "401", f"expected 401 got {r.stdout!r}" def test_health_no_auth_required(): """/health must respond 200 without any Authorization header.""" r = _curl(["-fsS", f"{_url()}/health"]) assert r.returncode == 0, r.stderr body = json.loads(r.stdout) assert body.get("status") == "ok" def test_agents_unauthorized_without_bearer(): """/agents must return 401 without auth.""" r = _curl(["-s", "-o", "/dev/null", "-w", "%{http_code}", f"{_url()}/agents"]) assert r.stdout == "401", f"expected 401 got {r.stdout!r}" def test_agents_unauthorized_with_bad_bearer(): """/agents must return 401 with wrong apikey.""" r = _curl([ "-s", "-o", "/dev/null", "-w", "%{http_code}", "-H", "Authorization: Bearer wrong-key-xxxxx", f"{_url()}/agents", ]) assert r.stdout == "401", f"expected 401 got {r.stdout!r}" def test_control_start_endpoint_responds(): """POST /agents/{id}/start returns JSON (idempotent in unified mode). Note: in unified mode (current backend), individual start/stop is a partial no-op — Manager.Start returns success since the goroutine is already running. Real per-agent control is a v0.2 backend feature (see issue 0131 if filed). Here we only assert the endpoint is reachable + returns valid JSON. """ apikey = _apikey() r = _curl([ "-sS", "-X", "POST", "-H", f"Authorization: Bearer {apikey}", f"{_url()}/agents/test-bot/start", ]) assert r.returncode == 0, r.stderr body = json.loads(r.stdout) # accept either {status: started, ...} or {error: ...} — only assert it's JSON assert isinstance(body, dict) def test_get_single_agent_returns_logs(): """GET /agents/{id} must include 'logs' array (might be empty).""" apikey = _apikey() r = _curl([ "-fsS", "-H", f"Authorization: Bearer {apikey}", f"{_url()}/agents/assistant-bot", ]) assert r.returncode == 0, r.stderr body = json.loads(r.stdout) assert body.get("id") == "assistant-bot" assert "logs" in body assert isinstance(body["logs"], list)