diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..7e8509a --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +playground/playground.log +playground/__pycache__/ diff --git a/analysis.md b/analysis.md index 6aa9962..b779cfe 100644 --- a/analysis.md +++ b/analysis.md @@ -25,7 +25,23 @@ Analisis didactico de **NATS** como sistema de mensajeria pub/sub entre procesos Los scripts `notebooks/procs/publisher.py` y `notebooks/procs/subscriber.py` son los programas que el notebook 03 lanza como procesos reales. -El notebook 04 requiere `ipywidgets` (incluido en el `.venv` del análisis). El simulador es interactivo: al abrir el notebook en JupyterLab, ejecuta sus celdas hasta el widget y pulsa **▶ Ejecutar benchmark** (los sliders ajustan número de mensajes y de subscribers). La gráfica se anima mientras corre. +El notebook 04 requiere `ipywidgets` (incluido en el `.venv` del análisis). El simulador del notebook es interactivo: al abrir el notebook en JupyterLab, ejecuta sus celdas hasta el widget y pulsa **▶ Ejecutar benchmark** (los sliders ajustan número de mensajes y de subscribers). La gráfica se anima mientras corre. + +> Si al abrir el notebook el widget muestra `Error displaying widget: model not found`, **re-ejecuta la celda del widget** en tu kernel (los modelos de `ipywidgets` no se rehidratan desde un kernel anterior). Para una versión interactiva más robusta y sin depender de Jupyter, usa el **playground** (ver abajo). + +## Playground: simulador de rendimiento (webapp) + +`playground/` es una webapp standalone equivalente al simulador del notebook 04, pero sin `ipywidgets`: sirve una página con un botón y unos sliders, y al pulsarlo lanza el benchmark en el servidor y transmite las muestras por WebSocket a un canvas que dibuja la gráfica en movimiento en el navegador. Reutiliza el `.venv` del análisis (con `nats-py` y `websockets`); no tiene dependencias ni repo propios. + +```bash +cd analysis/nats/playground +../.venv/bin/python server.py +# abrir http://127.0.0.1:7788 (WebSocket en 7879) +``` + +Pulsa **▶ Ejecutar benchmark**: un publisher envía N mensajes (slider, hasta 200.000) a M subscribers (slider, hasta 12) y la gráfica muestra en vivo los acumulados de enviados vs recibidos. Verificado: 100.000 msgs → 4 subs = 400.000 entregas en ~1,1 s (fan-out ×4 exacto, ~367.000 entregas/s). + +Archivos: `playground/server.py` (servidor WebSocket + HTTP estático + benchmark NATS) y `playground/index.html` (UI con canvas, sin librerías externas). ### Como usar diff --git a/playground/index.html b/playground/index.html new file mode 100644 index 0000000..990df9c --- /dev/null +++ b/playground/index.html @@ -0,0 +1,221 @@ + + + + + +NATS · Simulador de rendimiento pub/sub + + + +
+

NATS · Simulador de rendimiento pub/sub

+
Un publisher inunda el broker con miles de mensajes que varios subscribers reciben (fan-out). La gráfica se mueve en tiempo real.
+
+ +
+
+
+
+ + + 60 000 +
+
+ + + 3 +
+
+ + listo +
+
+ +
+
Enviados
0
+
Recibidos (Σ subs)
0
+
Throughput recv
0
+
Tiempo
0.00 s
+
+ + +
+ enviados (publisher) + recibidos (suma de subscribers) +
+
+
+
+ + + + diff --git a/playground/server.py b/playground/server.py new file mode 100644 index 0000000..9ba0af1 --- /dev/null +++ b/playground/server.py @@ -0,0 +1,156 @@ +#!/usr/bin/env python3 +"""Playground: simulador de rendimiento NATS pub/sub con grafica en vivo. + +Sirve una pagina (index.html) con un boton y unos sliders. Al pulsar el boton, +el navegador abre un WebSocket y pide un benchmark; el servidor lanza 1 publisher +que envia miles de mensajes a N subscribers (fan-out) y va emitiendo muestras +(enviados, recibidos, throughput) por el WebSocket. El navegador las pinta en un +canvas en tiempo real, de modo que la grafica se mueve mientras corre el test. + +Reutiliza el venv del analisis padre (analysis/nats/.venv, con nats-py). No tiene +dependencias propias ni repo propio: vive dentro del sub-repo del analisis. + +Lanzar: + cd analysis/nats/playground + ../.venv/bin/python server.py + # abrir http://127.0.0.1:7788 +""" +import asyncio +import functools +import http.server +import json +import os +import subprocess +import threading +import time + +import nats +from websockets.asyncio.server import serve + +HERE = os.path.dirname(os.path.abspath(__file__)) +NATS_URL = "nats://127.0.0.1:4222" +HTTP_PORT = 7788 +WS_PORT = 7879 +SUBJECT = "bench.load" +PAYLOAD = b"x" * 128 # 128 bytes por mensaje + +# Limites de seguridad para los parametros que llegan del navegador +MAX_MSGS = 500_000 +MAX_SUBS = 12 + + +def ensure_nats() -> str: + """Arranca el broker NATS en Docker de forma idempotente.""" + def docker(*args): + return subprocess.run(["docker", *args], capture_output=True, text=True) + + state = docker("ps", "-a", "--filter", "name=^nats_demo$", "--format", "{{.State}}").stdout.strip() + if state == "running": + return "already-running" + if state in ("exited", "created", "paused"): + docker("start", "nats_demo") + time.sleep(1.0) + return "started" + docker("run", "-d", "--name", "nats_demo", "-p", "4222:4222", "-p", "8222:8222", + "nats:latest", "-js", "-m", "8222") + time.sleep(1.5) + return "created" + + +def start_http_server() -> None: + """Sirve los archivos estaticos del playground (index.html) en un thread.""" + handler = functools.partial(http.server.SimpleHTTPRequestHandler, directory=HERE) + http.server.ThreadingHTTPServer.allow_reuse_address = True + httpd = http.server.ThreadingHTTPServer(("127.0.0.1", HTTP_PORT), handler) + httpd.serve_forever() + + +async def run_benchmark(ws, n_msgs: int, n_subs: int) -> None: + """Un publisher envia n_msgs a n_subs subscribers. Emite muestras por el WebSocket.""" + nc = await nats.connect(NATS_URL, name="playground-benchmark") + counters = [0] * n_subs + + def make_cb(i): + async def cb(_msg): + counters[i] += 1 + return cb + + subs = [await nc.subscribe(SUBJECT, cb=make_cb(i)) for i in range(n_subs)] + + sent = 0 + t0 = time.monotonic() + + async def publish_all(): + nonlocal sent + for k in range(n_msgs): + await nc.publish(SUBJECT, PAYLOAD) + sent += 1 + if k % 500 == 0: + await nc.flush() + await asyncio.sleep(0) # ceder al event loop para que corran los callbacks + await nc.flush() + + await ws.send(json.dumps({"type": "start", "n_msgs": n_msgs, "n_subs": n_subs})) + task = asyncio.create_task(publish_all()) + + # Muestreo periodico para la grafica en movimiento + while not task.done() or sum(counters) < sent: + await asyncio.sleep(0.04) + t = time.monotonic() - t0 + await ws.send(json.dumps({"type": "sample", "t": round(t, 3), "sent": sent, "recv": sum(counters)})) + if t > 60: # tope de seguridad + break + await task + + # Drenaje final: dar tiempo a que los callbacks alcancen al publisher + for _ in range(60): + if sum(counters) >= sent: + break + await asyncio.sleep(0.05) + + dur = time.monotonic() - t0 + recv = sum(counters) + await ws.send(json.dumps({ + "type": "done", + "t": round(dur, 3), + "sent": sent, + "recv": recv, + "per_sub": counters, + "pub_tps": round(n_msgs / dur) if dur else 0, + "recv_tps": round(recv / dur) if dur else 0, + })) + + for s in subs: + await s.unsubscribe() + await nc.drain() + + +async def ws_handler(ws) -> None: + async for raw in ws: + try: + msg = json.loads(raw) + except (json.JSONDecodeError, TypeError): + continue + if msg.get("action") == "start": + n_msgs = max(1000, min(int(msg.get("n_msgs", 20000)), MAX_MSGS)) + n_subs = max(1, min(int(msg.get("n_subs", 3)), MAX_SUBS)) + try: + await run_benchmark(ws, n_msgs, n_subs) + except Exception as exc: # noqa: BLE001 - reportar cualquier fallo al navegador + await ws.send(json.dumps({"type": "error", "msg": f"{type(exc).__name__}: {exc}"})) + + +async def main() -> None: + print("broker NATS:", ensure_nats()) + threading.Thread(target=start_http_server, daemon=True).start() + print(f"HTTP -> http://127.0.0.1:{HTTP_PORT}") + print(f"WS -> ws://127.0.0.1:{WS_PORT}") + async with serve(ws_handler, "127.0.0.1", WS_PORT): + await asyncio.Future() # corre indefinidamente + + +if __name__ == "__main__": + try: + asyncio.run(main()) + except KeyboardInterrupt: + print("\nplayground detenido")