#!/usr/bin/env python3
"""Playground: NATS pub/sub throughput simulator with a live chart.

Serves a page (index.html) with a button and a few sliders. When the button
is pressed, the browser opens a WebSocket and requests a benchmark; the
server launches 1 publisher that sends thousands of messages to N
subscribers (fan-out) and emits samples (sent, received, throughput) over
the WebSocket. The browser draws them on a canvas in real time, so the chart
moves while the test is running.

Reuses the virtualenv of the parent analysis (analysis/nats/.venv, with
nats-py). It has no dependencies or repository of its own: it lives inside
the analysis sub-repo.

Run:
    cd analysis/nats/playground
    ../.venv/bin/python server.py     # then open http://127.0.0.1:7788
"""

import asyncio
import contextlib
import functools
import http.server
import json
import os
import subprocess
import threading
import time

import nats
from websockets.asyncio.server import serve
from websockets.exceptions import ConnectionClosed

HERE = os.path.dirname(os.path.abspath(__file__))
NATS_URL = "nats://127.0.0.1:4222"
HTTP_PORT = 7788
WS_PORT = 7879
SUBJECT = "bench.load"
PAYLOAD = b"x" * 128  # 128 bytes per message

# Safety limits for the parameters that arrive from the browser
MAX_MSGS = 500_000
MAX_SUBS = 12


def ensure_nats() -> str:
    """Start the NATS broker in Docker idempotently.

    Looks up the ``nats_demo`` container and either leaves it alone, starts
    it, or creates it. The exit status of the docker commands is not
    checked, so the returned label describes the action attempted.

    Returns:
        ``"already-running"``, ``"started"`` or ``"created"``.
    """

    def docker(*args):
        return subprocess.run(["docker", *args], capture_output=True, text=True)

    state = docker(
        "ps", "-a", "--filter", "name=^nats_demo$", "--format", "{{.State}}"
    ).stdout.strip()
    if state == "running":
        return "already-running"
    if state in ("exited", "created", "paused"):
        docker("start", "nats_demo")
        time.sleep(1.0)  # give the broker a moment to accept connections
        return "started"
    docker(
        "run", "-d", "--name", "nats_demo",
        "-p", "4222:4222", "-p", "8222:8222",
        "nats:latest", "-js", "-m", "8222",
    )
    time.sleep(1.5)  # a fresh container takes slightly longer to come up
    return "created"


def start_http_server() -> None:
    """Serve the playground's static files (index.html); blocks forever.

    Intended to be the target of a background thread.
    """
    handler = functools.partial(http.server.SimpleHTTPRequestHandler, directory=HERE)
    http.server.ThreadingHTTPServer.allow_reuse_address = True
    # The context manager closes the listening socket if serve_forever exits.
    with http.server.ThreadingHTTPServer(("127.0.0.1", HTTP_PORT), handler) as httpd:
        httpd.serve_forever()


async def run_benchmark(ws, n_msgs: int, n_subs: int) -> None:
    """Publish n_msgs messages to n_subs subscribers, streaming samples to ws.

    Sends one ``start`` message, periodic ``sample`` messages while the test
    runs, and a final ``done`` message with totals and rates.

    Args:
        ws: WebSocket connection the JSON messages are sent on.
        n_msgs: Number of messages the single publisher sends.
        n_subs: Number of subscribers listening on SUBJECT.

    Raises:
        Any error from the NATS client or from ``ws.send``. The publisher
        task and the NATS connection are cleaned up before it propagates.
    """
    nc = await nats.connect(NATS_URL, name="playground-benchmark")
    task = None
    try:
        counters = [0] * n_subs

        def make_cb(i):
            # One closure per subscriber so each one bumps its own slot.
            async def cb(_msg):
                counters[i] += 1

            return cb

        subs = [await nc.subscribe(SUBJECT, cb=make_cb(i)) for i in range(n_subs)]
        sent = 0
        t0 = time.monotonic()

        async def publish_all():
            nonlocal sent
            for k in range(n_msgs):
                await nc.publish(SUBJECT, PAYLOAD)
                sent += 1
                if k % 500 == 0:
                    await nc.flush()
                    await asyncio.sleep(0)  # yield to the event loop so callbacks run
            await nc.flush()

        await ws.send(json.dumps({"type": "start", "n_msgs": n_msgs, "n_subs": n_subs}))
        task = asyncio.create_task(publish_all())

        # Periodic sampling for the moving chart
        while not task.done() or sum(counters) < sent:
            await asyncio.sleep(0.04)
            t = time.monotonic() - t0
            await ws.send(json.dumps({
                "type": "sample",
                "t": round(t, 3),
                "sent": sent,
                "recv": sum(counters),
            }))
            if t > 60:  # safety cap on the sampling loop
                break
        await task  # re-raises any publisher error

        # Final drain: give the callbacks time to catch up with the publisher
        for _ in range(60):
            if sum(counters) >= sent:
                break
            await asyncio.sleep(0.05)

        dur = time.monotonic() - t0
        recv = sum(counters)
        await ws.send(json.dumps({
            "type": "done",
            "t": round(dur, 3),
            "sent": sent,
            "recv": recv,
            "per_sub": counters,
            # Use the actual count; equals n_msgs whenever publishing completed.
            "pub_tps": round(sent / dur) if dur else 0,
            "recv_tps": round(recv / dur) if dur else 0,
        }))
        for s in subs:
            await s.unsubscribe()
    finally:
        # Runs on success and on failure (e.g. the browser went away), so the
        # publisher task and the NATS connection never outlive the benchmark.
        if task is not None:
            task.cancel()  # no-op when the publisher already finished
            await asyncio.gather(task, return_exceptions=True)
        if not nc.is_closed:
            await nc.drain()


async def ws_handler(ws) -> None:
    """Handle one browser connection: run a benchmark per ``start`` request.

    Frames that are not valid JSON objects, or whose ``action`` is not
    ``"start"``, are ignored. Invalid parameters and benchmark failures are
    reported to the browser as ``{"type": "error", ...}`` messages.
    """
    async for raw in ws:
        try:
            msg = json.loads(raw)
        except (ValueError, TypeError):
            # ValueError covers JSONDecodeError and UnicodeDecodeError (binary frames).
            continue
        # Valid JSON need not be an object; lists and scalars have no .get().
        if not isinstance(msg, dict) or msg.get("action") != "start":
            continue
        try:
            # Parsed inside the try: the values come from the browser and may
            # be null, non-numeric strings, or infinities.
            n_msgs = max(1000, min(int(msg.get("n_msgs", 20000)), MAX_MSGS))
            n_subs = max(1, min(int(msg.get("n_subs", 3)), MAX_SUBS))
            await run_benchmark(ws, n_msgs, n_subs)
        except ConnectionClosed:
            return  # browser went away; nobody left to report to
        except Exception as exc:  # noqa: BLE001 - report any failure to the browser
            with contextlib.suppress(ConnectionClosed):
                await ws.send(json.dumps({
                    "type": "error",
                    "msg": f"{type(exc).__name__}: {exc}",
                }))


async def main() -> None:
    """Start the broker, the static HTTP server thread and the WebSocket server."""
    print("broker NATS:", ensure_nats())
    threading.Thread(target=start_http_server, daemon=True).start()
    print(f"HTTP -> http://127.0.0.1:{HTTP_PORT}")
    print(f"WS -> ws://127.0.0.1:{WS_PORT}")
    async with serve(ws_handler, "127.0.0.1", WS_PORT):
        await asyncio.Future()  # run forever


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nplayground detenido")