""" test_mock_server.py — Mock server tests for agents_dashboard. Emulates the agents_and_robots HTTP API (issue 0128) to verify: - GET /health → 200 - GET /agents → 200 with agent list - POST /agents/{id}/start|stop|restart → 200 - GET /sse/agents/{id}/logs → text/event-stream - GET /sse/status → text/event-stream These tests validate the mock server itself (used for CI headless runs). The C++ app is validated via --self-test and cmake --build in e2e_checks. Requirements: pip install pytest flask requests (or uv add pytest flask requests) Run: python3 -m pytest tests/ -x -q """ import json import threading import time import pytest try: from flask import Flask, Response, jsonify, request as flask_request import requests DEPS_AVAILABLE = True except ImportError: DEPS_AVAILABLE = False AGENTS = [ {"id": "assistant-bot", "name": "Assistant Bot", "status": "running", "uptime_seconds": 3600, "messages_24h": 42}, {"id": "monitor-bot", "name": "Monitor Bot", "status": "running", "uptime_seconds": 7200, "messages_24h": 15}, {"id": "test-bot", "name": "Test Bot", "status": "stopped", "uptime_seconds": 0, "messages_24h": 0}, {"id": "deploy-bot", "name": "Deploy Bot", "status": "running", "uptime_seconds": 1800, "messages_24h": 8}, {"id": "alert-bot", "name": "Alert Bot", "status": "crashed", "uptime_seconds": 0, "messages_24h": 3}, {"id": "backup-bot", "name": "Backup Bot", "status": "running", "uptime_seconds": 86400,"messages_24h": 2}, {"id": "notify-bot", "name": "Notify Bot", "status": "running", "uptime_seconds": 14400,"messages_24h": 22}, {"id": "scheduler-bot", "name": "Scheduler Bot", "status": "running", "uptime_seconds": 5400, "messages_24h": 11}, {"id": "analytics-bot", "name": "Analytics Bot", "status": "stopped", "uptime_seconds": 0, "messages_24h": 0}, {"id": "gateway-bot", "name": "Gateway Bot", "status": "running", "uptime_seconds": 10800,"messages_24h": 67}, {"id": "health-check-bot", "name": "Health Check Bot", "status": "running", "uptime_seconds": 3200, "messages_24h": 5}, ] TEST_APIKEY = "test-apikey-abc123" TEST_PORT = 18499 # high port, unlikely to conflict def create_app(): app = Flask("agents_mock") app.config["TESTING"] = True agent_states = {a["id"]: dict(a) for a in AGENTS} def check_auth(): auth = flask_request.headers.get("Authorization", "") if auth != f"Bearer {TEST_APIKEY}": return False return True @app.route("/health") def health(): if not check_auth(): return jsonify({"error": "unauthorized"}), 401 return jsonify({"status": "ok", "version": "0.1.0-mock"}) @app.route("/agents") def list_agents(): if not check_auth(): return jsonify({"error": "unauthorized"}), 401 return jsonify(list(agent_states.values())) @app.route("/agents//start", methods=["POST"]) def start_agent(agent_id): if not check_auth(): return jsonify({"error": "unauthorized"}), 401 if agent_id not in agent_states: return jsonify({"error": "not found"}), 404 agent_states[agent_id]["status"] = "running" agent_states[agent_id]["uptime_seconds"] = 0 return jsonify({"ok": True, "agent_id": agent_id, "action": "start"}) @app.route("/agents//stop", methods=["POST"]) def stop_agent(agent_id): if not check_auth(): return jsonify({"error": "unauthorized"}), 401 if agent_id not in agent_states: return jsonify({"error": "not found"}), 404 agent_states[agent_id]["status"] = "stopped" agent_states[agent_id]["uptime_seconds"] = 0 return jsonify({"ok": True, "agent_id": agent_id, "action": "stop"}) @app.route("/agents//restart", methods=["POST"]) def restart_agent(agent_id): if not check_auth(): return jsonify({"error": "unauthorized"}), 401 if agent_id not in agent_states: return jsonify({"error": "not found"}), 404 agent_states[agent_id]["status"] = "running" agent_states[agent_id]["uptime_seconds"] = 0 return jsonify({"ok": True, "agent_id": agent_id, "action": "restart"}) @app.route("/sse/agents//logs") def sse_agent_logs(agent_id): if not check_auth(): return Response("unauthorized", status=401) def generate(): sample_logs = [ f"[INFO] Agent {agent_id} started", f"[INFO] Processing message queue", f"[DEBUG] Heartbeat OK", f"[INFO] Handled event room.message", ] for i, line in enumerate(sample_logs): yield f"data: {line}\n\n" time.sleep(0.05) # Keep stream open briefly time.sleep(0.2) return Response(generate(), content_type="text/event-stream") @app.route("/sse/status") def sse_status(): if not check_auth(): return Response("unauthorized", status=401) def generate(): events = [ '{"type":"agent_started","agent_id":"monitor-bot"}', '{"type":"heartbeat","agents_running":8}', ] for ev in events: yield f"data: {ev}\n\n" time.sleep(0.05) time.sleep(0.2) return Response(generate(), content_type="text/event-stream") return app, agent_states # --------------------------------------------------------------------------- # Fixtures # --------------------------------------------------------------------------- @pytest.fixture(scope="module") def mock_server(): if not DEPS_AVAILABLE: pytest.skip("flask/requests not installed. Run: pip install flask requests") app, agent_states = create_app() server = None started = threading.Event() def run(): import logging log = logging.getLogger("werkzeug") log.setLevel(logging.ERROR) app.run(port=TEST_PORT, threaded=True) t = threading.Thread(target=run, daemon=True) t.start() time.sleep(0.5) # Wait for server to be ready yield {"base_url": f"http://127.0.0.1:{TEST_PORT}", "apikey": TEST_APIKEY, "agent_states": agent_states} # --------------------------------------------------------------------------- # Tests # --------------------------------------------------------------------------- def test_health(mock_server): r = requests.get( f"{mock_server['base_url']}/health", headers={"Authorization": f"Bearer {mock_server['apikey']}"}, timeout=5 ) assert r.status_code == 200 data = r.json() assert data["status"] == "ok" def test_health_unauthorized(mock_server): r = requests.get(f"{mock_server['base_url']}/health", timeout=5) assert r.status_code == 401 def test_list_agents(mock_server): r = requests.get( f"{mock_server['base_url']}/agents", headers={"Authorization": f"Bearer {mock_server['apikey']}"}, timeout=5 ) assert r.status_code == 200 agents = r.json() assert isinstance(agents, list) assert len(agents) >= 7 ids = [a["id"] for a in agents] assert "assistant-bot" in ids assert "test-bot" in ids def test_list_agents_fields(mock_server): r = requests.get( f"{mock_server['base_url']}/agents", headers={"Authorization": f"Bearer {mock_server['apikey']}"}, timeout=5 ) agents = r.json() for a in agents: assert "id" in a assert "status" in a assert "uptime_seconds" in a assert "messages_24h" in a assert a["status"] in ("running", "stopped", "crashed") def test_stop_agent(mock_server): r = requests.post( f"{mock_server['base_url']}/agents/test-bot/stop", headers={"Authorization": f"Bearer {mock_server['apikey']}"}, timeout=5 ) assert r.status_code == 200 data = r.json() assert data["ok"] is True # Verify state changed assert mock_server["agent_states"]["test-bot"]["status"] == "stopped" def test_start_agent(mock_server): r = requests.post( f"{mock_server['base_url']}/agents/test-bot/start", headers={"Authorization": f"Bearer {mock_server['apikey']}"}, timeout=5 ) assert r.status_code == 200 data = r.json() assert data["ok"] is True assert mock_server["agent_states"]["test-bot"]["status"] == "running" def test_restart_agent(mock_server): r = requests.post( f"{mock_server['base_url']}/agents/assistant-bot/restart", headers={"Authorization": f"Bearer {mock_server['apikey']}"}, timeout=5 ) assert r.status_code == 200 data = r.json() assert data["ok"] is True def test_action_not_found(mock_server): r = requests.post( f"{mock_server['base_url']}/agents/nonexistent-bot/start", headers={"Authorization": f"Bearer {mock_server['apikey']}"}, timeout=5 ) assert r.status_code == 404 def test_sse_logs_streams(mock_server): r = requests.get( f"{mock_server['base_url']}/sse/agents/assistant-bot/logs", headers={"Authorization": f"Bearer {mock_server['apikey']}"}, stream=True, timeout=5 ) assert r.status_code == 200 assert "event-stream" in r.headers.get("Content-Type", "") lines = [] for chunk in r.iter_lines(chunk_size=None): if chunk: lines.append(chunk.decode()) r.close() data_lines = [l for l in lines if l.startswith("data:")] assert len(data_lines) >= 1 def test_sse_status_streams(mock_server): r = requests.get( f"{mock_server['base_url']}/sse/status", headers={"Authorization": f"Bearer {mock_server['apikey']}"}, stream=True, timeout=5 ) assert r.status_code == 200 lines = [] for chunk in r.iter_lines(): if chunk: lines.append(chunk.decode()) r.close() data_lines = [l for l in lines if l.startswith("data:")] assert len(data_lines) >= 1