Files
agents_dashboard/tests/test_connect_e2e.py
T
egutierrez 9cade2f2f8 fix(crash): init data_table State.col_visible + col_order before render
Pressing Refresh Agents (or Test Connection — both trigger fetch + table
re-render) crashed the app with Windows exit code 5 (access violation).

Root cause: agents_tbl_state was default-constructed, so
State.col_visible (std::vector<bool>) and State.col_order
(std::vector<int>) were empty. render_grid_stage0 indexes them by column
index up to N_COLS=11 without bounds checking → undefined behaviour →
segfault on the first render after agents data populated.

Fix: at first render of the agents panel, assign col_visible=true * N_COLS,
fill col_order with [0..N_COLS), and ensure stages.size() >= 1. Same
pattern tql_apply.cpp uses (col_visible.assign(eff_cols, true)).

Diagnostic infra added (kept in place — minimal overhead):
- FN_DBG macro: fprintf(stderr, ...) + fflush. Survives crashes that
  fn_log's buffered file output doesn't.
- --auto-refresh CLI flag: triggers fetch_agents_async at frame 30,
  auto-exits at frame 180 (~3s @ 60Hz). Headless smoke for CI.
- DBG breadcrumbs through main → load_apikey → fn::run_app → render →
  fetch_agents_async (thread enter/request/response/parse/exit) → render
  table (pre/post). Each step flushes stderr immediately.

E2E regression guard: test_app_survives_auto_refresh_cycle. Runs the .exe
with --auto-refresh, asserts exit 0, asserts the breadcrumb chain reaches
both "fetch thread parsed" and "agents_panel POST-render" in stderr. 25
tests passing.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 23:31:25 +02:00

550 lines
20 KiB
Python

"""
E2E connect tests for agents_dashboard against the live backend.
Runs the deployed .exe with --connect-test, which exercises the SAME
fn_http code path the GUI uses (curl spawn via CreateProcessW on Windows).
Verifies:
* /health → 200 sin auth
* /agents → 401 sin Bearer
* /agents → 200 + JSON array con Bearer
Skips if:
* AGENTS_API_KEY env var missing (run with `pass agentes/api-key`)
* Deployed exe not found
Run from repo root:
AGENTS_API_KEY=$(pass agentes/api-key) \\
python3 -m pytest -x -q projects/element_agents/apps/agents_dashboard/tests/test_connect_e2e.py
"""
from __future__ import annotations
import json
import os
import shutil
import subprocess
from pathlib import Path
import pytest
DEFAULT_EXE = Path("/mnt/c/Users/lucas/Desktop/apps/agents_dashboard/agents_dashboard.exe")
DEFAULT_URL = "https://agents.organic-machine.com"
def _exe() -> Path:
p = Path(os.environ.get("AGENTS_DASHBOARD_EXE", DEFAULT_EXE))
if not p.exists():
pytest.skip(f"deployed exe not found: {p}")
return p
def _url() -> str:
return os.environ.get("AGENTS_DASHBOARD_URL", DEFAULT_URL)
def _apikey() -> str:
k = os.environ.get("AGENTS_API_KEY", "").strip()
if not k:
pytest.skip("AGENTS_API_KEY env var missing (try `pass agentes/api-key`)")
return k
def _run_connect_test(url: str, env: dict | None = None) -> subprocess.CompletedProcess:
"""Invoke .exe --connect-test with WSLENV so env propagates to Windows binary."""
full_env = os.environ.copy()
if env:
full_env.update(env)
# WSLENV: comma/colon-separated list of env names to forward to Win32
keep = full_env.get("WSLENV", "")
add = "AGENTS_API_KEY"
full_env["WSLENV"] = f"{keep}:{add}" if keep else add
return subprocess.run(
[str(_exe()), "--connect-test", url],
capture_output=True,
text=True,
timeout=30,
env=full_env,
)
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
def test_connect_succeeds_with_valid_apikey():
"""OK <N> on stdout, exit 0, when env + URL + apikey are valid."""
_apikey() # skip if missing
r = _run_connect_test(_url())
assert r.returncode == 0, f"exit={r.returncode}\nstdout={r.stdout}\nstderr={r.stderr}"
assert r.stdout.startswith("OK "), f"stdout=[{r.stdout!r}]"
n = int(r.stdout.strip().split()[1])
assert n > 0, f"expected at least 1 agent, got {n}"
def test_app_survives_auto_refresh_cycle():
"""Regression: app must NOT crash on Refresh Agents button click.
Bug history: v0.2 migration to data_table_cpp_viz left State.col_visible
and State.col_order uninitialized — render_grid_stage0 indexed into empty
std::vector<bool>, causing an access violation (Windows exit code 5).
The --auto-refresh CLI flag triggers fetch_agents_async + a full render
cycle from a headless GLFW window, then exits at frame 180 (~3s @ 60Hz).
Exit 0 means the agents panel rendered the live data without crashing.
"""
pass_check = subprocess.run(["pass", "agentes/api-key"],
capture_output=True, text=True, timeout=5)
if pass_check.returncode != 0 or not pass_check.stdout.strip():
pytest.skip("pass agentes/api-key not readable (GPG locked?)")
# WSL → Windows: launch the .exe and let it self-exit after 180 frames.
r = subprocess.run(
[str(_exe()), "--auto-refresh"],
capture_output=True, text=True, timeout=30,
)
assert r.returncode == 0, (
f"app crashed (exit={r.returncode}); last stderr:\n"
+ "\n".join(r.stderr.splitlines()[-20:])
)
# Sanity: stderr must show that fetch_agents reached the parse step.
assert "fetch thread parsed" in r.stderr, (
f"fetch never reached parse; stderr:\n{r.stderr[-1000:]}"
)
# Sanity: render must have completed at least once (POST-render logged).
assert "agents_panel POST-render" in r.stderr, (
f"render_grid_stage0 crashed before completing; stderr:\n{r.stderr[-1000:]}"
)
def test_connect_falls_back_to_pass_when_env_empty():
"""When AGENTS_API_KEY env is empty, the .exe must fetch apikey via
`wsl.exe pass agentes/api-key` (or `pass` on Linux). This is what makes
launching from the App Hub work without manual env injection.
Skipped if `pass agentes/api-key` itself can't be read (GPG locked).
"""
# Verify pass is unlocked before testing the fallback
pass_check = subprocess.run(
["pass", "agentes/api-key"],
capture_output=True, text=True, timeout=5,
)
if pass_check.returncode != 0 or not pass_check.stdout.strip():
pytest.skip("pass agentes/api-key not readable (GPG locked?)")
# Force-empty AGENTS_API_KEY + bypass WSLENV propagation
r = subprocess.run(
[str(_exe()), "--connect-test", _url()],
capture_output=True,
text=True,
timeout=30,
env={**os.environ, "AGENTS_API_KEY": "", "WSLENV": ""},
)
assert r.returncode == 0, f"pass fallback failed: stdout={r.stdout!r} stderr={r.stderr!r}"
assert r.stdout.startswith("OK "), f"stdout=[{r.stdout!r}]"
def test_connect_fails_on_bad_host():
"""Transport-level failure on unresolvable host."""
_apikey()
r = _run_connect_test("https://this-host-does-not-exist-xyzzy.invalid")
assert r.returncode != 0
assert "FAIL" in r.stderr, f"stderr=[{r.stderr!r}]"
def test_count_matches_direct_curl():
"""N agents returned by the .exe must equal what /agents returns via curl."""
apikey = _apikey()
# Direct curl from WSL via Windows curl.exe (matches what fn_http uses).
curl_bin = shutil.which("curl.exe") or "/mnt/c/Windows/System32/curl.exe"
direct = subprocess.run(
[
curl_bin, "-fsS",
"-H", f"Authorization: Bearer {apikey}",
f"{_url()}/agents",
],
capture_output=True, text=True, timeout=15,
)
assert direct.returncode == 0, f"direct curl failed: {direct.stderr}"
expected = len(json.loads(direct.stdout))
r = _run_connect_test(_url())
assert r.returncode == 0
got = int(r.stdout.strip().split()[1])
assert got == expected, f"exe says {got} agents, curl says {expected}"
def test_url_trim_robust_to_whitespace():
"""URL with leading/trailing whitespace + CR/LF must still connect."""
_apikey()
# Wrap URL with whitespace + CRLF; trim_url() in main.cpp must strip.
url = " \r\n " + _url() + " \r\n\t"
r = _run_connect_test(url)
assert r.returncode == 0, f"trim failed: stderr={r.stderr!r}"
assert r.stdout.startswith("OK ")
# ---------------------------------------------------------------------------
# Backend direct: shape contract + SSE + control endpoints
# ---------------------------------------------------------------------------
def _curl(args: list[str], timeout: int = 15) -> subprocess.CompletedProcess:
curl_bin = shutil.which("curl.exe") or "/mnt/c/Windows/System32/curl.exe"
return subprocess.run([curl_bin, *args], capture_output=True, text=True, timeout=timeout)
def test_agents_shape_has_fields_used_by_ui():
"""Backend /agents response must carry the fields parse_agents() reads."""
apikey = _apikey()
r = _curl([
"-fsS",
"-H", f"Authorization: Bearer {apikey}",
f"{_url()}/agents",
])
assert r.returncode == 0, r.stderr
agents = json.loads(r.stdout)
assert isinstance(agents, list) and len(agents) > 0
required = {"id", "name", "enabled", "running"}
for a in agents:
missing = required - set(a.keys())
assert not missing, f"agent {a.get('id')} missing fields {missing}: {a}"
assert isinstance(a["enabled"], bool)
assert isinstance(a["running"], bool)
def test_sse_status_streams_not_unsupported():
"""/sse/status must return text/event-stream, NOT plain 'streaming unsupported'.
Regression: statusWriter wrapper used to hide http.Flusher → SSE handlers
aborted with 500 'streaming unsupported'. Fixed in agents_and_robots
commit 4822208 by adding statusWriter.Flush().
"""
apikey = _apikey()
# -m 3 = max 3s; SSE keeps the connection open, so we capture initial bytes
# and stop. -i = include response headers.
r = _curl([
"-sN", "-i", "-m", "3",
"-H", f"Authorization: Bearer {apikey}",
f"{_url()}/sse/status",
], timeout=8)
body = r.stdout
assert "streaming unsupported" not in body, f"SSE not flusher-aware: {body[:300]!r}"
# Should see SSE content-type header
assert "text/event-stream" in body.lower(), f"no SSE content-type: {body[:300]!r}"
def test_sse_logs_streams_not_unsupported():
"""/sse/agents/{id}/logs idem."""
apikey = _apikey()
r = _curl([
"-sN", "-i", "-m", "3",
"-H", f"Authorization: Bearer {apikey}",
f"{_url()}/sse/agents/assistant-bot/logs",
], timeout=8)
body = r.stdout
assert "streaming unsupported" not in body, f"SSE not flusher-aware: {body[:300]!r}"
assert "text/event-stream" in body.lower(), f"no SSE content-type: {body[:300]!r}"
def test_sse_status_emits_initial_ping_within_1s():
"""Backend MUST emit ":ping" immediately after headers (regression guard).
Without this, agents_dashboard sat on "connecting" indefinitely because
fgets() blocked waiting for the first body byte.
"""
apikey = _apikey()
r = _curl([
"-sN", "-m", "2",
"-H", f"Authorization: Bearer {apikey}",
f"{_url()}/sse/status",
], timeout=5)
# Body should contain at minimum the ping comment within 2s window
assert ": ping" in r.stdout, f"no initial ping in 2s: {r.stdout[:300]!r}"
def test_sse_logs_emits_initial_ping_within_1s():
apikey = _apikey()
r = _curl([
"-sN", "-m", "2",
"-H", f"Authorization: Bearer {apikey}",
f"{_url()}/sse/agents/assistant-bot/logs",
], timeout=5)
assert ": ping" in r.stdout, f"no initial ping in 2s: {r.stdout[:300]!r}"
def test_sse_logs_actually_streams_log_lines():
"""Beyond the ping, the log SSE must emit `event: log` frames with data.
assistant-bot is a long-running agent that emits log lines frequently,
so we collect a 2s window and expect at least one log event.
"""
apikey = _apikey()
r = _curl([
"-sN", "-m", "2",
"-H", f"Authorization: Bearer {apikey}",
f"{_url()}/sse/agents/assistant-bot/logs",
], timeout=5)
assert "event: log" in r.stdout, f"no log events in 2s window: {r.stdout[:500]!r}"
def test_sse_unauthorized_without_bearer():
"""SSE endpoints respect the same auth middleware as REST endpoints."""
r = _curl([
"-s", "-o", "/dev/null", "-w", "%{http_code}",
f"{_url()}/sse/status",
])
assert r.stdout == "401", f"expected 401 got {r.stdout!r}"
def test_health_no_auth_required():
"""/health must respond 200 without any Authorization header."""
r = _curl(["-fsS", f"{_url()}/health"])
assert r.returncode == 0, r.stderr
body = json.loads(r.stdout)
assert body.get("status") == "ok"
def test_agents_unauthorized_without_bearer():
"""/agents must return 401 without auth."""
r = _curl(["-s", "-o", "/dev/null", "-w", "%{http_code}", f"{_url()}/agents"])
assert r.stdout == "401", f"expected 401 got {r.stdout!r}"
def test_agents_unauthorized_with_bad_bearer():
"""/agents must return 401 with wrong apikey."""
r = _curl([
"-s", "-o", "/dev/null", "-w", "%{http_code}",
"-H", "Authorization: Bearer wrong-key-xxxxx",
f"{_url()}/agents",
])
assert r.stdout == "401", f"expected 401 got {r.stdout!r}"
def test_control_start_endpoint_responds():
"""POST /agents/{id}/start returns JSON (idempotent in unified mode).
Note: in unified mode (current backend), individual start/stop is a partial
no-op — Manager.Start returns success since the goroutine is already running.
Real per-agent control is a v0.2 backend feature (see issue 0131 if filed).
Here we only assert the endpoint is reachable + returns valid JSON.
"""
apikey = _apikey()
r = _curl([
"-sS", "-X", "POST",
"-H", f"Authorization: Bearer {apikey}",
f"{_url()}/agents/test-bot/start",
])
assert r.returncode == 0, r.stderr
body = json.loads(r.stdout)
# accept either {status: started, ...} or {error: ...} — only assert it's JSON
assert isinstance(body, dict)
def test_get_single_agent_returns_logs():
"""GET /agents/{id} must include 'logs' array (might be empty)."""
apikey = _apikey()
r = _curl([
"-fsS",
"-H", f"Authorization: Bearer {apikey}",
f"{_url()}/agents/assistant-bot",
])
assert r.returncode == 0, r.stderr
body = json.loads(r.stdout)
assert body.get("id") == "assistant-bot"
assert "logs" in body
assert isinstance(body["logs"], list)
# ---------------------------------------------------------------------------
# v0.2 — per-agent control + new fields (issue 0131)
# ---------------------------------------------------------------------------
def test_uptime_field_present():
"""GET /agents response must include uptime_seconds field (integer >= 0)."""
apikey = _apikey()
r = _curl([
"-fsS",
"-H", f"Authorization: Bearer {apikey}",
f"{_url()}/agents",
])
assert r.returncode == 0, r.stderr
agents = json.loads(r.stdout)
assert len(agents) > 0, "no agents returned"
for a in agents:
assert "uptime_seconds" in a, f"agent {a.get('id')} missing uptime_seconds"
assert isinstance(a["uptime_seconds"], int) and a["uptime_seconds"] >= 0, \
f"agent {a.get('id')} uptime_seconds={a['uptime_seconds']!r} not a non-negative int"
def test_msg_24h_field_present():
"""GET /agents response must include messages_24h field (integer >= 0)."""
apikey = _apikey()
r = _curl([
"-fsS",
"-H", f"Authorization: Bearer {apikey}",
f"{_url()}/agents",
])
assert r.returncode == 0, r.stderr
agents = json.loads(r.stdout)
assert len(agents) > 0, "no agents returned"
for a in agents:
assert "messages_24h" in a, f"agent {a.get('id')} missing messages_24h"
assert isinstance(a["messages_24h"], int) and a["messages_24h"] >= 0, \
f"agent {a.get('id')} messages_24h={a['messages_24h']!r} not a non-negative int"
def test_clear_memory_requires_apikey():
"""POST /agents/{id}/clear_memory must return 401 without Bearer token."""
r = _curl([
"-s", "-o", "/dev/null", "-w", "%{http_code}",
"-X", "POST",
f"{_url()}/agents/test-bot/clear_memory",
])
assert r.stdout == "401", f"expected 401 got {r.stdout!r}"
def test_delete_cache_requires_apikey():
"""POST /agents/{id}/delete_cache must return 401 without Bearer token."""
r = _curl([
"-s", "-o", "/dev/null", "-w", "%{http_code}",
"-X", "POST",
f"{_url()}/agents/test-bot/delete_cache",
])
assert r.stdout == "401", f"expected 401 got {r.stdout!r}"
def test_control_roundtrip():
"""Stop test-bot → poll running=false → start → restart.
SECURITY: Only test-bot is used here. Never call stop on real agents
during E2E. test-bot is a stateless robot with no ongoing conversations.
"""
apikey = _apikey()
BASE = _url()
def get_running(agent_id: str) -> bool:
r = _curl([
"-fsS",
"-H", f"Authorization: Bearer {apikey}",
f"{BASE}/agents/{agent_id}",
])
return bool(json.loads(r.stdout).get("running", False))
# --- Stop ---
r = _curl([
"-sS", "-X", "POST",
"-H", f"Authorization: Bearer {apikey}",
f"{BASE}/agents/test-bot/stop",
])
assert r.returncode == 0, f"stop failed: {r.stderr}"
body = json.loads(r.stdout)
# Accept "stopped" or "not_running" as valid outcomes
assert body.get("status") in ("stopped", "not_running", "ok"), f"unexpected stop status: {body}"
# Poll until running=false (max 3s)
import time
for _ in range(15):
if not get_running("test-bot"):
break
time.sleep(0.2)
assert not get_running("test-bot"), "test-bot still running after stop"
# --- Start ---
r = _curl([
"-sS", "-X", "POST",
"-H", f"Authorization: Bearer {apikey}",
f"{BASE}/agents/test-bot/start",
])
assert r.returncode == 0, f"start failed: {r.stderr}"
body = json.loads(r.stdout)
assert body.get("status") in ("started", "ok"), f"unexpected start status: {body}"
# Poll until running=true (max 5s)
for _ in range(25):
if get_running("test-bot"):
break
time.sleep(0.2)
assert get_running("test-bot"), "test-bot not running after start"
# --- Restart ---
r = _curl([
"-sS", "-X", "POST",
"-H", f"Authorization: Bearer {apikey}",
f"{BASE}/agents/test-bot/restart",
])
assert r.returncode == 0, f"restart failed: {r.stderr}"
body = json.loads(r.stdout)
assert body.get("status") in ("restarted", "ok"), f"unexpected restart status: {body}"
# After restart test-bot must be running again (max 5s)
for _ in range(25):
if get_running("test-bot"):
break
time.sleep(0.2)
assert get_running("test-bot"), "test-bot not running after restart"
def test_unified_stop_does_not_kill_launcher():
"""Stopping test-bot must leave assistant-bot (and other real agents) running.
In unified mode, all agents are goroutines. Per-agent stop must cancel
only the target goroutine — not the launcher process.
SECURITY: Only test-bot is stopped. assistant-bot is only read, never mutated.
"""
apikey = _apikey()
BASE = _url()
def get_running(agent_id: str) -> bool:
r = _curl([
"-fsS",
"-H", f"Authorization: Bearer {apikey}",
f"{BASE}/agents/{agent_id}",
])
if r.returncode != 0:
return False
return bool(json.loads(r.stdout).get("running", False))
# Stop test-bot only
r = _curl([
"-sS", "-X", "POST",
"-H", f"Authorization: Bearer {apikey}",
f"{BASE}/agents/test-bot/stop",
])
assert r.returncode == 0, f"stop test-bot failed: {r.stderr}"
# assistant-bot must still be running (launcher is alive)
assert get_running("assistant-bot"), \
"assistant-bot not running after stopping test-bot — launcher may have crashed"
# Restore test-bot
_curl([
"-sS", "-X", "POST",
"-H", f"Authorization: Bearer {apikey}",
f"{BASE}/agents/test-bot/start",
])
def test_clear_memory_response_shape():
"""POST /agents/{id}/clear_memory (authorized) returns JSON with expected fields.
Uses test-bot which has minimal/no memory, so deleting is always safe.
Verifies: status, messages_deleted, facts_deleted keys present.
"""
apikey = _apikey()
r = _curl([
"-sS", "-X", "POST",
"-H", f"Authorization: Bearer {apikey}",
f"{_url()}/agents/test-bot/clear_memory",
])
assert r.returncode == 0, f"clear_memory failed: {r.stderr}"
body = json.loads(r.stdout)
assert body.get("status") == "cleared", f"expected status=cleared: {body}"
assert "messages_deleted" in body, f"missing messages_deleted: {body}"
assert "facts_deleted" in body, f"missing facts_deleted: {body}"
assert isinstance(body["messages_deleted"], int), \
f"messages_deleted not int: {body['messages_deleted']!r}"
assert isinstance(body["facts_deleted"], int), \
f"facts_deleted not int: {body['facts_deleted']!r}"