8795f2842b
Panels: Connection + Agents + Logs + Status Feed.
- HTTP GET /agents + POST /agents/{id}/{start,stop,restart}
- SSE streaming: /sse/agents/{id}/logs + /sse/status
- DPAPI/XOR credential storage in local_files/agents_dashboard.db
- data_table style agents table with filter + status icons
- SQLite migrations via sqlite3_exec at startup
- --self-test mode: db + secret_store round-trip + subsystem checks
- pytest mock server emulating agents_and_robots API
Registry functions: http_request_cpp_core, sse_client_cpp_core,
secret_store_cpp_infra, logger_cpp_core
App icon: robot phosphor violet-500 (#8b5cf6)
Issue: 0129
Co-Authored-By: fn-orquestador <noreply@fn-registry.local>
296 lines
10 KiB
Python
296 lines
10 KiB
Python
"""
|
|
test_mock_server.py — Mock server tests for agents_dashboard.

Emulates the agents_and_robots HTTP API (issue 0128) to verify:
- GET /health → 200
- GET /agents → 200 with agent list
- POST /agents/{id}/start|stop|restart → 200
- GET /sse/agents/{id}/logs → text/event-stream
- GET /sse/status → text/event-stream

These tests validate the mock server itself (used for CI headless runs).
The C++ app is validated via --self-test and cmake --build in e2e_checks.

Requirements: pip install pytest flask requests (or uv add pytest flask requests)
Run: python3 -m pytest tests/ -x -q
|
|
"""
|
|
|
|
import json
|
|
import threading
|
|
import time
|
|
|
|
import pytest
|
|
|
|
try:
|
|
from flask import Flask, Response, jsonify, request as flask_request
|
|
import requests
|
|
DEPS_AVAILABLE = True
|
|
except ImportError:
|
|
DEPS_AVAILABLE = False
|
|
|
|
|
|
# Column order of every agent record; it is also the key order of each dict.
_AGENT_FIELDS = ("id", "name", "status", "uptime_seconds", "messages_24h")

# One row per agent, values given in _AGENT_FIELDS order.
_AGENT_ROWS = (
    ("assistant-bot", "Assistant Bot", "running", 3600, 42),
    ("monitor-bot", "Monitor Bot", "running", 7200, 15),
    ("test-bot", "Test Bot", "stopped", 0, 0),
    ("deploy-bot", "Deploy Bot", "running", 1800, 8),
    ("alert-bot", "Alert Bot", "crashed", 0, 3),
    ("backup-bot", "Backup Bot", "running", 86400, 2),
    ("notify-bot", "Notify Bot", "running", 14400, 22),
    ("scheduler-bot", "Scheduler Bot", "running", 5400, 11),
    ("analytics-bot", "Analytics Bot", "stopped", 0, 0),
    ("gateway-bot", "Gateway Bot", "running", 10800, 67),
    ("health-check-bot", "Health Check Bot", "running", 3200, 5),
)

# Agent records used as the mock's data set: a list of plain dicts.
AGENTS = [dict(zip(_AGENT_FIELDS, row)) for row in _AGENT_ROWS]

# Bearer token the mock compares against the Authorization header.
TEST_APIKEY = "test-apikey-abc123"
TEST_PORT = 18499  # high port, unlikely to conflict
|
|
|
|
|
|
def create_app():
    """Build the mock Flask app that emulates the agents HTTP API.

    Every route requires an ``Authorization: Bearer <TEST_APIKEY>`` header
    and answers 401 when it is missing or different.

    Returns:
        A ``(app, agent_states)`` tuple. ``agent_states`` maps each agent id
        to a copy of its ``AGENTS`` record; the start/stop/restart routes
        mutate these copies, so callers can inspect the dict to verify a
        state change.
    """
    app = Flask("agents_mock")
    app.config["TESTING"] = True
    # Copy every record so route mutations never touch the AGENTS template.
    agent_states = {a["id"]: dict(a) for a in AGENTS}

    def check_auth():
        """Return True when the request carries the expected bearer token."""
        auth = flask_request.headers.get("Authorization", "")
        return auth == f"Bearer {TEST_APIKEY}"

    def apply_action(agent_id, action, new_status):
        """Shared body of the start/stop/restart routes.

        Answers 401 without valid auth and 404 for an unknown agent;
        otherwise sets the agent's status, resets its uptime to 0 and
        acknowledges the action.
        """
        if not check_auth():
            return jsonify({"error": "unauthorized"}), 401
        if agent_id not in agent_states:
            return jsonify({"error": "not found"}), 404
        state = agent_states[agent_id]
        state["status"] = new_status
        state["uptime_seconds"] = 0
        return jsonify({"ok": True, "agent_id": agent_id, "action": action})

    def sse_response(payloads):
        """Stream each payload as one ``data:`` event, 50 ms apart."""

        def generate():
            for payload in payloads:
                yield f"data: {payload}\n\n"
                time.sleep(0.05)
            # Keep stream open briefly
            time.sleep(0.2)

        return Response(generate(), content_type="text/event-stream")

    @app.route("/health")
    def health():
        if not check_auth():
            return jsonify({"error": "unauthorized"}), 401
        return jsonify({"status": "ok", "version": "0.1.0-mock"})

    @app.route("/agents")
    def list_agents():
        if not check_auth():
            return jsonify({"error": "unauthorized"}), 401
        return jsonify(list(agent_states.values()))

    @app.route("/agents/<agent_id>/start", methods=["POST"])
    def start_agent(agent_id):
        return apply_action(agent_id, "start", "running")

    @app.route("/agents/<agent_id>/stop", methods=["POST"])
    def stop_agent(agent_id):
        return apply_action(agent_id, "stop", "stopped")

    @app.route("/agents/<agent_id>/restart", methods=["POST"])
    def restart_agent(agent_id):
        return apply_action(agent_id, "restart", "running")

    @app.route("/sse/agents/<agent_id>/logs")
    def sse_agent_logs(agent_id):
        # SSE routes reject with a plain-text body, unlike the JSON routes.
        if not check_auth():
            return Response("unauthorized", status=401)
        return sse_response([
            f"[INFO] Agent {agent_id} started",
            "[INFO] Processing message queue",
            "[DEBUG] Heartbeat OK",
            "[INFO] Handled event room.message",
        ])

    @app.route("/sse/status")
    def sse_status():
        if not check_auth():
            return Response("unauthorized", status=401)
        return sse_response([
            '{"type":"agent_started","agent_id":"monitor-bot"}',
            '{"type":"heartbeat","agents_running":8}',
        ])

    return app, agent_states
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Fixtures
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@pytest.fixture(scope="module")
def mock_server():
    """Run the mock API on a background thread for the whole test module.

    Skips the dependent tests when flask/requests could not be imported.

    Yields:
        A dict with ``base_url`` (server root URL), ``apikey`` (the bearer
        token the server accepts) and ``agent_states`` (the live state dict
        returned by ``create_app``).
    """
    if not DEPS_AVAILABLE:
        pytest.skip("flask/requests not installed. Run: pip install flask requests")

    import logging
    import socket

    app, agent_states = create_app()

    def run():
        # Silence werkzeug's per-request access log.
        logging.getLogger("werkzeug").setLevel(logging.ERROR)
        app.run(port=TEST_PORT, threaded=True)

    # Daemon thread: there is no explicit shutdown; it ends with the process.
    t = threading.Thread(target=run, daemon=True)
    t.start()

    # Poll the port instead of sleeping a fixed time, which is both slower
    # than needed on fast machines and racy on slow ones.
    deadline = time.monotonic() + 5.0
    while True:
        try:
            with socket.create_connection(("127.0.0.1", TEST_PORT), timeout=0.2):
                break
        except OSError:
            if not t.is_alive() or time.monotonic() >= deadline:
                pytest.fail(f"mock server did not start on port {TEST_PORT}")
            time.sleep(0.05)

    # A successful connect with a dead server thread means something else
    # is listening on the port, not this mock.
    if not t.is_alive():
        pytest.fail(f"mock server thread exited; is port {TEST_PORT} already in use?")

    yield {"base_url": f"http://127.0.0.1:{TEST_PORT}", "apikey": TEST_APIKEY,
           "agent_states": agent_states}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_health(mock_server):
    """GET /health with a valid bearer token answers 200 and status "ok"."""
    url = f"{mock_server['base_url']}/health"
    auth_headers = {"Authorization": f"Bearer {mock_server['apikey']}"}
    response = requests.get(url, headers=auth_headers, timeout=5)
    assert response.status_code == 200
    payload = response.json()
    assert payload["status"] == "ok"
|
|
|
|
|
|
def test_health_unauthorized(mock_server):
    """GET /health without an Authorization header is rejected with 401."""
    health_url = f"{mock_server['base_url']}/health"
    # No headers on purpose: the request must be refused.
    response = requests.get(health_url, timeout=5)
    assert response.status_code == 401
|
|
|
|
|
|
def test_list_agents(mock_server):
    """GET /agents returns a JSON list of at least 7 agents incl. known ids."""
    url = f"{mock_server['base_url']}/agents"
    auth_headers = {"Authorization": f"Bearer {mock_server['apikey']}"}
    response = requests.get(url, headers=auth_headers, timeout=5)
    assert response.status_code == 200
    listing = response.json()
    assert isinstance(listing, list)
    assert len(listing) >= 7
    seen_ids = {entry["id"] for entry in listing}
    assert "assistant-bot" in seen_ids
    assert "test-bot" in seen_ids
|
|
|
|
|
|
def test_list_agents_fields(mock_server):
    """Every agent from GET /agents has the expected keys and a valid status."""
    r = requests.get(
        f"{mock_server['base_url']}/agents",
        headers={"Authorization": f"Bearer {mock_server['apikey']}"},
        timeout=5,
    )
    # Check the status first: an error payload is a dict, and iterating it
    # below would fail with a misleading message.
    assert r.status_code == 200
    agents = r.json()
    # Guard against the loop passing vacuously on an empty list.
    assert agents
    for a in agents:
        for key in ("id", "status", "uptime_seconds", "messages_24h"):
            assert key in a
        assert a["status"] in ("running", "stopped", "crashed")
|
|
|
|
|
|
def test_stop_agent(mock_server):
    """POST /agents/test-bot/stop is acknowledged and flips state to stopped."""
    # Start from "running" so the final state assertion can only pass if the
    # endpoint really changed it.
    mock_server["agent_states"]["test-bot"]["status"] = "running"
    r = requests.post(
        f"{mock_server['base_url']}/agents/test-bot/stop",
        headers={"Authorization": f"Bearer {mock_server['apikey']}"},
        timeout=5,
    )
    assert r.status_code == 200
    data = r.json()
    assert data["ok"] is True
    # Verify state changed
    assert mock_server["agent_states"]["test-bot"]["status"] == "stopped"
|
|
|
|
|
|
def test_start_agent(mock_server):
    """POST /agents/test-bot/start is acknowledged and leaves it running."""
    endpoint = f"{mock_server['base_url']}/agents/test-bot/start"
    auth_headers = {"Authorization": f"Bearer {mock_server['apikey']}"}
    reply = requests.post(endpoint, headers=auth_headers, timeout=5)
    assert reply.status_code == 200
    body = reply.json()
    assert body["ok"] is True
    tracked = mock_server["agent_states"]["test-bot"]
    assert tracked["status"] == "running"
|
|
|
|
|
|
def test_restart_agent(mock_server):
    """POST /agents/assistant-bot/restart is acknowledged with ``ok: true``."""
    endpoint = f"{mock_server['base_url']}/agents/assistant-bot/restart"
    auth_headers = {"Authorization": f"Bearer {mock_server['apikey']}"}
    reply = requests.post(endpoint, headers=auth_headers, timeout=5)
    assert reply.status_code == 200
    body = reply.json()
    assert body["ok"] is True
|
|
|
|
|
|
def test_action_not_found(mock_server):
    """An action on an agent id the server does not know answers 404."""
    endpoint = f"{mock_server['base_url']}/agents/nonexistent-bot/start"
    auth_headers = {"Authorization": f"Bearer {mock_server['apikey']}"}
    reply = requests.post(endpoint, headers=auth_headers, timeout=5)
    assert reply.status_code == 404
|
|
|
|
|
|
def test_sse_logs_streams(mock_server):
    """The per-agent log stream is an event-stream with at least one event."""
    # The context manager closes the streamed connection even when one of
    # the assertions inside fails.
    with requests.get(
        f"{mock_server['base_url']}/sse/agents/assistant-bot/logs",
        headers={"Authorization": f"Bearer {mock_server['apikey']}"},
        stream=True,
        timeout=5,
    ) as r:
        assert r.status_code == 200
        assert "event-stream" in r.headers.get("Content-Type", "")
        # iter_lines yields empty keep-alive lines between events; drop them.
        lines = [chunk.decode() for chunk in r.iter_lines(chunk_size=None) if chunk]
    data_lines = [line for line in lines if line.startswith("data:")]
    assert len(data_lines) >= 1
|
|
|
|
|
|
def test_sse_status_streams(mock_server):
    """The global status stream is an event-stream with at least one event."""
    # The context manager closes the streamed connection even when one of
    # the assertions inside fails.
    with requests.get(
        f"{mock_server['base_url']}/sse/status",
        headers={"Authorization": f"Bearer {mock_server['apikey']}"},
        stream=True,
        timeout=5,
    ) as r:
        assert r.status_code == 200
        # Same content-type check as the per-agent log stream test.
        assert "event-stream" in r.headers.get("Content-Type", "")
        # iter_lines yields empty keep-alive lines between events; drop them.
        lines = [chunk.decode() for chunk in r.iter_lines() if chunk]
    data_lines = [line for line in lines if line.startswith("data:")]
    assert len(data_lines) >= 1
|