b42f2b8255
Reemplaza el widget ipywidgets del notebook 04 (fragil: 'model not found' sin re-ejecutar) por una webapp standalone. server.py corre el benchmark NATS y transmite muestras por WebSocket; index.html dibuja la grafica en movimiento en un canvas sin dependencias front. Reutiliza el venv del analisis. Verificado: 100k msgs -> 4 subs = 400k entregas (~367k/s) en ~1.1s.
157 lines
5.2 KiB
Python
157 lines
5.2 KiB
Python
#!/usr/bin/env python3
"""Playground: NATS pub/sub throughput simulator with a live chart.

Serves a page (index.html) with a button and a few sliders. When the button is
pressed, the browser opens a WebSocket and requests a benchmark; the server starts
1 publisher that sends thousands of messages to N subscribers (fan-out) and keeps
emitting samples (sent, received, throughput) over the WebSocket. The browser draws
them on a canvas in real time, so the chart moves while the test is running.

Reuses the venv of the parent analysis (analysis/nats/.venv, with nats-py). It has no
dependencies or repo of its own: it lives inside the analysis sub-repo.

Run:
    cd analysis/nats/playground
    ../.venv/bin/python server.py
    # open http://127.0.0.1:7788
"""
|
|
import asyncio
|
|
import functools
|
|
import http.server
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import threading
|
|
import time
|
|
|
|
import nats
|
|
from websockets.asyncio.server import serve
|
|
|
|
# Directory that contains this script; the HTTP server serves static files from it.
HERE = os.path.dirname(os.path.abspath(__file__))
NATS_URL = "nats://127.0.0.1:4222"
HTTP_PORT = 7788
WS_PORT = 7879
SUBJECT = "bench.load"
PAYLOAD = b"x" * 128  # 128 bytes per message

# Safety limits for the parameters that arrive from the browser
MAX_MSGS = 500_000
MAX_SUBS = 12
|
|
|
|
|
|
def ensure_nats() -> str:
    """Make sure the ``nats_demo`` Docker container is up; safe to call repeatedly.

    The container is looked up by exact name. Depending on its reported state it
    is left alone, resumed, or created from ``nats:latest`` with ports 4222 and
    8222 published and the server flags ``-js -m 8222``.

    Returns:
        ``"already-running"`` if the container was already running,
        ``"started"`` if an existing container was resumed (started or unpaused),
        ``"created"`` if a new container was launched.

    Raises:
        FileNotFoundError: if the ``docker`` executable is not on PATH.

    The exit status of the ``docker`` commands is not checked (best effort); a
    broker that failed to come up surfaces later as a NATS connection error.
    """
    def docker(*args):
        return subprocess.run(["docker", *args], capture_output=True, text=True)

    # Anchored regex so that e.g. "nats_demo_2" does not match.
    state = docker(
        "ps", "-a", "--filter", "name=^nats_demo$", "--format", "{{.State}}"
    ).stdout.strip()

    if state == "running":
        return "already-running"

    if state == "paused":
        # `docker start` refuses a paused container; it has to be unpaused.
        # The broker process is already booted, so no warm-up sleep is needed.
        docker("unpause", "nats_demo")
        return "started"

    if state in ("exited", "created"):
        docker("start", "nats_demo")
        time.sleep(1.0)  # give the broker a moment to accept connections
        return "started"

    # No container with that name (or a state not handled above): create one.
    docker(
        "run", "-d", "--name", "nats_demo", "-p", "4222:4222", "-p", "8222:8222",
        "nats:latest", "-js", "-m", "8222",
    )
    time.sleep(1.5)  # first boot takes slightly longer than a restart
    return "created"
|
|
|
|
|
|
def start_http_server() -> None:
    """Serve the playground's static files (index.html) over HTTP.

    Binds 127.0.0.1:HTTP_PORT and serves the directory this script lives in.
    The call blocks in ``serve_forever()``, so it is meant to be used as the
    target of a background thread.
    """
    # Class-level switch, set before the server object is constructed.
    http.server.ThreadingHTTPServer.allow_reuse_address = True

    static_handler = functools.partial(
        http.server.SimpleHTTPRequestHandler,
        directory=HERE,
    )
    bind_address = ("127.0.0.1", HTTP_PORT)
    server = http.server.ThreadingHTTPServer(bind_address, static_handler)
    server.serve_forever()
|
|
|
|
|
|
async def run_benchmark(
    ws,
    n_msgs: int,
    n_subs: int,
    *,
    sample_interval: float = 0.04,
    max_seconds: float = 60.0,
) -> None:
    """Run one fan-out benchmark and stream its progress over the WebSocket.

    One publisher sends ``n_msgs`` copies of PAYLOAD on SUBJECT while ``n_subs``
    subscriptions on the same connection count what they receive.

    Messages sent on ``ws`` (JSON text):
        * ``{"type": "start", "n_msgs", "n_subs"}`` once, before publishing.
        * ``{"type": "sample", "t", "sent", "recv"}`` every ``sample_interval``
          seconds while the run is in progress.
        * ``{"type": "done", "t", "sent", "recv", "per_sub", "pub_tps",
          "recv_tps"}`` once, at the end.

    Args:
        ws: connection object exposing an awaitable ``send(str)``.
        n_msgs: number of messages the publisher tries to send.
        n_subs: number of subscribers; total expected deliveries is
            ``n_msgs * n_subs``.
        sample_interval: seconds between two ``sample`` messages.
        max_seconds: safety cap; once exceeded the publisher is cancelled and
            the run is reported with whatever was sent so far.

    Raises:
        Whatever ``nats`` or ``ws.send`` raise. The publisher task and the NATS
        connection are always released before the exception propagates.
    """
    nc = await nats.connect(NATS_URL, name="playground-benchmark")
    counters = [0] * n_subs
    subs = []
    publisher = None
    sent = 0

    def make_cb(i):
        # Factory so each callback captures its own index (avoids late binding).
        async def cb(_msg):
            counters[i] += 1
        return cb

    async def publish_all():
        nonlocal sent
        for k in range(n_msgs):
            await nc.publish(SUBJECT, PAYLOAD)
            sent += 1
            if k % 500 == 0:
                await nc.flush()
                await asyncio.sleep(0)  # yield to the event loop so callbacks can run
        await nc.flush()

    try:
        for i in range(n_subs):
            subs.append(await nc.subscribe(SUBJECT, cb=make_cb(i)))

        t0 = time.monotonic()
        await ws.send(json.dumps({"type": "start", "n_msgs": n_msgs, "n_subs": n_subs}))
        publisher = asyncio.create_task(publish_all())

        # Periodic sampling that drives the moving chart.
        timed_out = False
        while not publisher.done() or sum(counters) < sent:
            await asyncio.sleep(sample_interval)
            t = time.monotonic() - t0
            await ws.send(json.dumps(
                {"type": "sample", "t": round(t, 3), "sent": sent, "recv": sum(counters)}
            ))
            if t > max_seconds:  # safety cap
                timed_out = True
                break

        if timed_out and not publisher.done():
            # Enforce the cap: without cancelling, awaiting the publisher would
            # keep the run going for as long as it takes to send everything.
            publisher.cancel()
            await asyncio.gather(publisher, return_exceptions=True)
        else:
            await publisher  # re-raises any publish/flush error

        # Final drain: give the callbacks time to catch up with the publisher.
        for _ in range(60):
            if sum(counters) >= sent:
                break
            await asyncio.sleep(0.05)

        dur = time.monotonic() - t0
        recv = sum(counters)
        await ws.send(json.dumps({
            "type": "done",
            "t": round(dur, 3),
            "sent": sent,
            "recv": recv,
            "per_sub": counters,
            # Based on what was actually published, which is less than n_msgs
            # when the safety cap cut the run short.
            "pub_tps": round(sent / dur) if dur else 0,
            "recv_tps": round(recv / dur) if dur else 0,
        }))

        for s in subs:
            await s.unsubscribe()
        await nc.drain()
    finally:
        # Error / cancellation path: stop publishing and drop the connection
        # immediately instead of leaking them until process exit.
        if publisher is not None and not publisher.done():
            publisher.cancel()
            await asyncio.gather(publisher, return_exceptions=True)
        if not nc.is_closed:
            await nc.close()
|
|
|
|
|
|
async def ws_handler(ws) -> None:
    """Handle one WebSocket connection: run a benchmark for each start request.

    Every incoming frame is expected to be a JSON object. Frames that are not
    valid JSON, are not objects, or whose ``"action"`` is not ``"start"`` are
    ignored. For a start request, ``n_msgs`` (default 20000) is clamped to
    ``[1000, MAX_MSGS]`` and ``n_subs`` (default 3) to ``[1, MAX_SUBS]``
    before the benchmark runs.

    Any failure, including non-numeric parameters, is reported to the client
    as ``{"type": "error", "msg": "<ExceptionName>: <text>"}`` and the
    connection stays open for further requests.

    Args:
        ws: connection object supporting ``async for`` over incoming frames
            and an awaitable ``send(str)``.
    """
    async for raw in ws:
        try:
            msg = json.loads(raw)
        except (ValueError, TypeError):
            # ValueError covers JSONDecodeError and also the UnicodeDecodeError
            # raised for binary frames that are not valid UTF-8.
            continue

        # Valid JSON is not necessarily an object ("[1, 2]", "3", "null", ...).
        if not isinstance(msg, dict) or msg.get("action") != "start":
            continue

        try:
            # Convert first: values come from the browser and may be anything.
            requested_msgs = int(msg.get("n_msgs", 20000))
            requested_subs = int(msg.get("n_subs", 3))
            n_msgs = max(1000, min(requested_msgs, MAX_MSGS))
            n_subs = max(1, min(requested_subs, MAX_SUBS))
            await run_benchmark(ws, n_msgs, n_subs)
        except Exception as exc:  # noqa: BLE001 - report any failure to the browser
            try:
                await ws.send(json.dumps({"type": "error", "msg": f"{type(exc).__name__}: {exc}"}))
            except Exception:  # noqa: BLE001
                # The client is gone, so there is nobody left to report to.
                return
|
|
|
|
|
|
async def main() -> None:
    """Start the broker, the static HTTP server and the WebSocket endpoint.

    The HTTP server runs in a daemon thread; the WebSocket server runs on the
    current event loop. The coroutine then waits on a future that is never
    resolved, so it only ends when the task is cancelled (e.g. Ctrl-C).
    """
    broker_status = ensure_nats()
    print("broker NATS:", broker_status)

    http_thread = threading.Thread(target=start_http_server, daemon=True)
    http_thread.start()

    print(f"HTTP -> http://127.0.0.1:{HTTP_PORT}")
    print(f"WS -> ws://127.0.0.1:{WS_PORT}")

    async with serve(ws_handler, "127.0.0.1", WS_PORT):
        never_done = asyncio.Future()
        await never_done  # runs indefinitely
|
|
|
|
|
|
# Script entry point: run the servers until the user interrupts with Ctrl-C.
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Exit with a short message instead of a traceback.
        print("\nplayground detenido")
|