Files
Egutierrez b42f2b8255 feat: playground simulador de rendimiento NATS (webapp WebSocket + canvas)
Reemplaza el widget ipywidgets del notebook 04 (fragil: 'model not found' sin
re-ejecutar) por una webapp standalone. server.py corre el benchmark NATS y
transmite muestras por WebSocket; index.html dibuja la grafica en movimiento en
un canvas sin dependencias front. Reutiliza el venv del analisis.
Verificado: 100k msgs -> 4 subs = 400k entregas (~367k/s) en ~1.1s.
2026-06-03 22:02:08 +02:00

157 lines
5.2 KiB
Python

#!/usr/bin/env python3
"""Playground: simulador de rendimiento NATS pub/sub con grafica en vivo.
Sirve una pagina (index.html) con un boton y unos sliders. Al pulsar el boton,
el navegador abre un WebSocket y pide un benchmark; el servidor lanza 1 publisher
que envia miles de mensajes a N subscribers (fan-out) y va emitiendo muestras
(enviados, recibidos, throughput) por el WebSocket. El navegador las pinta en un
canvas en tiempo real, de modo que la grafica se mueve mientras corre el test.
Reutiliza el venv del analisis padre (analysis/nats/.venv, con nats-py). No tiene
dependencias propias ni repo propio: vive dentro del sub-repo del analisis.
Lanzar:
    cd analysis/nats/playground
    ../.venv/bin/python server.py
    # abrir http://127.0.0.1:7788
"""
import asyncio
import functools
import http.server
import json
import os
import subprocess
import threading
import time
import nats
from websockets.asyncio.server import serve
# Directory containing this file; used as the static root of the HTTP server.
HERE = os.path.dirname(os.path.abspath(__file__))
# Address of the NATS broker the benchmark connects to.
NATS_URL = "nats://127.0.0.1:4222"
# Local port for the static HTTP server.
HTTP_PORT = 7788
# Local port for the WebSocket endpoint.
WS_PORT = 7879
# Subject the publisher sends to and every subscriber listens on.
SUBJECT = "bench.load"
PAYLOAD = b"x" * 128 # 128 bytes per message
# Safety limits for the parameters that arrive from the browser
MAX_MSGS = 500_000
MAX_SUBS = 12
def ensure_nats() -> str:
    """Start the NATS broker in Docker idempotently.

    Looks up the ``nats_demo`` container and brings it to the running state:
    a running container is left alone, a stopped or never-started one is
    started, a paused one is unpaused, and a missing one is created from
    ``nats:latest`` with JetStream (``-js``) and the monitoring port
    (``-m 8222``) enabled.

    Returns:
        ``"already-running"``, ``"started"`` or ``"created"`` on success, or
        ``"failed: <reason>"`` when the docker command exits non-zero, so the
        caller does not print a success status for a broker that is not up.

    Raises:
        FileNotFoundError: If the ``docker`` executable is not on ``PATH``
            (raised by ``subprocess.run``).
    """

    def docker(*args):
        return subprocess.run(["docker", *args], capture_output=True, text=True)

    def failure(result) -> str:
        reason = result.stderr.strip() or f"exit code {result.returncode}"
        return f"failed: {reason}"

    state = docker(
        "ps", "-a", "--filter", "name=^nats_demo$", "--format", "{{.State}}"
    ).stdout.strip()

    if state == "running":
        return "already-running"

    if state in ("exited", "created", "paused"):
        # `docker start` refuses a paused container; it must be unpaused.
        action = "unpause" if state == "paused" else "start"
        result = docker(action, "nats_demo")
        if result.returncode != 0:
            return failure(result)
        time.sleep(1.0)  # give the broker a moment to accept connections
        return "started"

    result = docker(
        "run", "-d", "--name", "nats_demo", "-p", "4222:4222", "-p", "8222:8222",
        "nats:latest", "-js", "-m", "8222",
    )
    if result.returncode != 0:
        return failure(result)
    time.sleep(1.5)  # a fresh container takes slightly longer to come up
    return "created"
def start_http_server() -> None:
    """Serve the playground's static files (index.html) over HTTP.

    Blocks inside ``serve_forever``; ``main`` runs this function on a daemon
    thread. The listening socket is closed if ``serve_forever`` ever exits.
    """

    class _ReusableHTTPServer(http.server.ThreadingHTTPServer):
        # Set on a local subclass so the stdlib class itself is not mutated
        # for every other user in the process.
        allow_reuse_address = True

    handler = functools.partial(http.server.SimpleHTTPRequestHandler, directory=HERE)
    with _ReusableHTTPServer(("127.0.0.1", HTTP_PORT), handler) as httpd:
        httpd.serve_forever()
async def run_benchmark(ws, n_msgs: int, n_subs: int) -> None:
    """Run one publisher -> ``n_subs`` fan-out benchmark, streaming progress to ``ws``.

    Sends, in order: one ``start`` message, a ``sample`` message about every
    40 ms while the run is in progress, and one final ``done`` message with
    the totals, the per-subscriber counts and the throughput figures.

    Args:
        ws: WebSocket connection; only its ``send`` coroutine is used.
        n_msgs: Number of messages the publisher sends.
        n_subs: Number of subscribers attached to ``SUBJECT``.

    Raises:
        Exception: Whatever the NATS client, the publisher task or ``ws.send``
            raises. The publisher task is cancelled and the NATS connection
            is closed before the exception propagates.
    """
    nc = await nats.connect(NATS_URL, name="playground-benchmark")
    publisher = None
    try:
        counters = [0] * n_subs

        def make_cb(i):
            # Bind the subscriber index now so each callback bumps its own slot.
            async def cb(_msg):
                counters[i] += 1
            return cb

        subs = [await nc.subscribe(SUBJECT, cb=make_cb(i)) for i in range(n_subs)]
        sent = 0
        t0 = time.monotonic()

        async def publish_all():
            nonlocal sent
            for k in range(n_msgs):
                await nc.publish(SUBJECT, PAYLOAD)
                sent += 1
                if k % 500 == 0:
                    await nc.flush()
                    await asyncio.sleep(0)  # yield to the event loop so callbacks can run
            await nc.flush()

        await ws.send(json.dumps({"type": "start", "n_msgs": n_msgs, "n_subs": n_subs}))
        publisher = asyncio.create_task(publish_all())

        # Periodic sampling that drives the moving chart.
        while not publisher.done() or sum(counters) < sent:
            if (
                publisher.done()
                and not publisher.cancelled()
                and publisher.exception() is not None
            ):
                # The publisher crashed: stop sampling instead of waiting for
                # the 60 s cap; `await publisher` below re-raises the error.
                break
            await asyncio.sleep(0.04)
            t = time.monotonic() - t0
            await ws.send(json.dumps({"type": "sample", "t": round(t, 3), "sent": sent, "recv": sum(counters)}))
            if t > 60:  # safety cap
                break
        await publisher

        # Final drain: give the callbacks time to catch up with the publisher.
        for _ in range(60):
            if sum(counters) >= sent:
                break
            await asyncio.sleep(0.05)

        dur = time.monotonic() - t0
        recv = sum(counters)
        await ws.send(json.dumps({
            "type": "done",
            "t": round(dur, 3),
            "sent": sent,
            "recv": recv,
            "per_sub": counters,
            "pub_tps": round(n_msgs / dur) if dur else 0,
            "recv_tps": round(recv / dur) if dur else 0,
        }))
        for s in subs:
            await s.unsubscribe()
        await nc.drain()
    finally:
        # On any failure (e.g. the browser disconnected mid-run) stop the
        # background publisher and release the broker connection.
        if publisher is not None and not publisher.done():
            publisher.cancel()
            await asyncio.gather(publisher, return_exceptions=True)
        if not nc.is_closed:
            await nc.close()
async def ws_handler(ws) -> None:
    """Serve one WebSocket connection, running a benchmark per ``start`` request.

    Frames that are not valid JSON, that do not decode to a JSON object, or
    whose ``action`` is not ``"start"`` are ignored. For a ``start`` request
    ``n_msgs`` and ``n_subs`` are clamped to ``[1000, MAX_MSGS]`` and
    ``[1, MAX_SUBS]``. Any failure, including non-numeric parameters, is
    reported to the client as an ``error`` message and the connection keeps
    listening for further requests.

    Args:
        ws: WebSocket connection; iterated for incoming frames and used to
            send results back.
    """
    async for raw in ws:
        try:
            msg = json.loads(raw)
        except (ValueError, TypeError):
            # ValueError covers JSONDecodeError and undecodable byte frames.
            continue
        if not isinstance(msg, dict) or msg.get("action") != "start":
            continue
        try:
            # Parameter parsing lives inside the try so that bad values from
            # the browser are reported instead of killing the handler.
            n_msgs = max(1000, min(int(msg.get("n_msgs", 20000)), MAX_MSGS))
            n_subs = max(1, min(int(msg.get("n_subs", 3)), MAX_SUBS))
            await run_benchmark(ws, n_msgs, n_subs)
        except Exception as exc:  # noqa: BLE001 - report any failure to the browser
            await ws.send(json.dumps({"type": "error", "msg": f"{type(exc).__name__}: {exc}"}))
async def main() -> None:
    """Bring up the broker, the static HTTP server and the WebSocket endpoint.

    Prints the broker status and both local URLs, then keeps the WebSocket
    server open by awaiting a future that is never resolved.
    """
    broker_status = ensure_nats()
    print("broker NATS:", broker_status)

    http_thread = threading.Thread(target=start_http_server, daemon=True)
    http_thread.start()

    print(f"HTTP -> http://127.0.0.1:{HTTP_PORT}")
    print(f"WS -> ws://127.0.0.1:{WS_PORT}")

    async with serve(ws_handler, "127.0.0.1", WS_PORT):
        never_done = asyncio.Future()
        await never_done  # runs until the process is interrupted
if __name__ == "__main__":
    # Ctrl-C is the expected way to stop the playground, so exit with a short
    # message instead of a traceback.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nplayground detenido")