c9e28b8135
JetStream: anatomia de streams (storage/retention/limits), consumers pull durables con ack y cursor, dedup por Nats-Msg-Id, retencion workqueue, deliver policies. Simulador: boton ipywidgets que lanza 1 publisher -> N subscribers con miles de mensajes y grafica en movimiento (acumulado + throughput instantaneo).
436 lines
18 KiB
Python
436 lines
18 KiB
Python
#!/usr/bin/env python3
|
||
"""Generador del notebook 04 del analisis NATS: JetStream a fondo + simulador de
|
||
rendimiento interactivo.
|
||
|
||
Construye el .ipynb con nbformat (sin ejecutar). La ejecucion se hace despues
|
||
contra el servidor Jupyter del analisis (puerto 8890, su propio venv) para que
|
||
los outputs queden persistidos y el widget interactivo se renderice en JupyterLab.
|
||
|
||
Reproducible: re-ejecutar este script regenera el notebook 04 desde cero.
|
||
"""
|
||
from pathlib import Path
|
||
|
||
import nbformat as nbf
|
||
|
||
NBDIR = Path("/home/enmanuel/fn_registry/analysis/nats/notebooks")
|
||
|
||
|
||
def build(filename: str, cells: list[tuple[str, str]]) -> None:
|
||
nb = nbf.v4.new_notebook()
|
||
nb.metadata["kernelspec"] = {
|
||
"name": "python3",
|
||
"display_name": "Python 3 (ipykernel)",
|
||
"language": "python",
|
||
}
|
||
nb.cells = [
|
||
nbf.v4.new_markdown_cell(src) if typ == "md" else nbf.v4.new_code_cell(src)
|
||
for typ, src in cells
|
||
]
|
||
nbf.write(nb, str(NBDIR / filename))
|
||
print(f"escrito {filename}: {len(cells)} celdas")
|
||
|
||
|
||
ENSURE_NATS = '''import subprocess, time, json
|
||
|
||
NATS_CONTAINER = "nats_demo"
|
||
NATS_PORT = 4222
|
||
NATS_URL = f"nats://127.0.0.1:{NATS_PORT}"
|
||
|
||
def _docker(*args, check=True):
|
||
return subprocess.run(["docker", *args], capture_output=True, text=True, check=check)
|
||
|
||
def ensure_nats(name=NATS_CONTAINER, port=NATS_PORT):
|
||
"""Arranca un broker NATS en Docker de forma idempotente. Devuelve el estado."""
|
||
out = _docker("ps", "-a", "--filter", f"name=^{name}$", "--format", "{{.State}}", check=False).stdout.strip()
|
||
if out == "running":
|
||
state = "already-running"
|
||
elif out in ("exited", "created", "paused"):
|
||
_docker("start", name)
|
||
state = "started"
|
||
else:
|
||
_docker("run", "-d", "--name", name, "-p", f"{port}:4222", "-p", "8222:8222",
|
||
"nats:latest", "-js", "-m", "8222")
|
||
state = "created"
|
||
time.sleep(1.0)
|
||
return state'''
|
||
|
||
|
||
nb4 = [
|
||
("md", """# NATS pub/sub — 04 · JetStream a fondo y simulador de rendimiento
|
||
|
||
Este notebook tiene dos partes:
|
||
|
||
1. **JetStream a fondo** — más allá del replay básico del notebook 02: anatomía de un stream (almacenamiento, políticas de retención, límites), tipos de consumer, *acks*, deduplicación y políticas de entrega.
|
||
2. **Simulador de rendimiento interactivo** — un botón que, al pulsarlo, lanza un publisher que envía **miles de mensajes** a varios subscribers, con una **gráfica en movimiento** que muestra el throughput en tiempo real.
|
||
|
||
> Requiere el broker `nats_demo` (arrancado por la primera celda) y `ipywidgets` (incluido en el venv del análisis)."""),
|
||
|
||
# ---- Parte A: JetStream a fondo ----
|
||
("md", """## Parte A · JetStream a fondo
|
||
|
||
### Setup
|
||
|
||
JetStream es la capa de persistencia de NATS. Mientras el core es *fire-and-forget*, JetStream **almacena** los mensajes en un *stream* y permite leerlos con *consumers* que controlan el ritmo, confirman (*ack*) cada mensaje y pueden reproducir el historial."""),
|
||
|
||
("code", ENSURE_NATS + '''
|
||
|
||
import asyncio
|
||
import nats
|
||
|
||
print("Broker:", ensure_nats())
|
||
nc = await nats.connect(NATS_URL, name="notebook-04")
|
||
js = nc.jetstream()
|
||
print("JetStream context listo. account info:")
|
||
ai = await js.account_info()
|
||
print(f" streams={ai.streams} consumers={ai.consumers} memory={ai.memory} storage={ai.storage}")'''),
|
||
|
||
("md", """### 1 · Anatomía de un stream
|
||
|
||
Un **stream** se define por:
|
||
|
||
- **subjects** — qué subjects captura (`pedidos.>`).
|
||
- **storage** — `file` (persistente en disco) o `memory` (rápido, se pierde al reiniciar).
|
||
- **retention** — cuándo se descartan los mensajes:
|
||
- `limits` (por defecto): se guardan hasta tocar un límite (`max_msgs`, `max_bytes`, `max_age`).
|
||
- `interest`: se descartan cuando todos los consumers interesados los han recibido.
|
||
- `workqueue`: cada mensaje se borra en cuanto **un** consumer lo confirma (cola de trabajo).
|
||
- **límites** — `max_msgs`, `max_bytes`, `max_age` (segundos), `max_msg_size`.
|
||
- **duplicate_window** — ventana de deduplicación (ver §3).
|
||
|
||
Creamos un stream `limits` con almacenamiento en disco y un tope de mensajes."""),
|
||
|
||
("code", '''from nats.js.api import StreamConfig, RetentionPolicy, StorageType, DiscardPolicy
|
||
|
||
# Recrear limpio para que la demo sea determinista
|
||
for s in ("DEMO_LIMITS", "DEMO_DEDUP", "DEMO_WQ"):
|
||
try:
|
||
await js.delete_stream(s)
|
||
except Exception:
|
||
pass
|
||
|
||
cfg = StreamConfig(
|
||
name="DEMO_LIMITS",
|
||
subjects=["demo.limits.>"],
|
||
storage=StorageType.FILE,
|
||
retention=RetentionPolicy.LIMITS,
|
||
max_msgs=1000, # tope de mensajes
|
||
max_age=3600, # 1 hora (segundos)
|
||
discard=DiscardPolicy.OLD, # al llegar al tope, descarta los más viejos
|
||
duplicate_window=120, # ventana de dedup: 120 s
|
||
)
|
||
info = await js.add_stream(cfg)
|
||
c = info.config
|
||
print("Stream creado:")
|
||
print(f" name : {c.name}")
|
||
print(f" subjects : {c.subjects}")
|
||
print(f" storage : {c.storage}")
|
||
print(f" retention : {c.retention}")
|
||
print(f" max_msgs : {c.max_msgs}")
|
||
print(f" max_age (s) : {c.max_age}")
|
||
print(f" discard : {c.discard}")
|
||
print(f" dup_window (s): {c.duplicate_window}")'''),
|
||
|
||
("md", """### 2 · Consumers: pull, durable, ack
|
||
|
||
Un **consumer** es la vista de lectura sobre un stream. Dos ejes:
|
||
|
||
- **pull vs push**: en *pull* el cliente pide mensajes cuando quiere (`fetch`); en *push* el servidor los empuja según llegan.
|
||
- **durable vs ephemeral**: un consumer *durable* tiene nombre y **recuerda su posición** (cursor) entre reconexiones; uno *ephemeral* desaparece al cerrarse.
|
||
|
||
El **ack** es la confirmación de procesado. Hasta que un mensaje no se confirma, el consumer lo considera *pendiente* y, si pasa el `ack_wait`, lo **reentrega**. Esto da entrega *at-least-once*."""),
|
||
|
||
("code", '''# Publicar 6 mensajes en el stream de límites
|
||
for i in range(6):
|
||
await js.publish("demo.limits.eventos", f"evento-{i}".encode())
|
||
|
||
# Pull consumer DURABLE: recuerda su cursor entre fetches
|
||
psub = await js.pull_subscribe("demo.limits.>", durable="procesador-A")
|
||
|
||
# Traer 4 y confirmarlos (ack)
|
||
batch = await psub.fetch(4, timeout=2)
|
||
print("Primer fetch (4 msgs):")
|
||
for m in batch:
|
||
print(f" seq={m.metadata.sequence.stream} {m.data.decode()}")
|
||
await m.ack()
|
||
|
||
# Estado del consumer: cuántos quedan pendientes / entregados
|
||
ci = await psub.consumer_info()
|
||
print()
|
||
print(f"Consumer 'procesador-A':")
|
||
print(f" num_pending : {ci.num_pending} (mensajes sin entregar todavía)")
|
||
print(f" num_ack_pending: {ci.num_ack_pending} (entregados sin ack)")
|
||
print(f" delivered.stream_seq: {ci.delivered.stream_seq}")
|
||
|
||
# Segundo fetch: continúa donde se quedó (recuerda el cursor)
|
||
batch2 = await psub.fetch(10, timeout=1)
|
||
print()
|
||
print(f"Segundo fetch: {len(batch2)} msgs restantes ->", [m.data.decode() for m in batch2])
|
||
for m in batch2:
|
||
await m.ack()'''),
|
||
|
||
("md", """### 3 · Deduplicación por `Nats-Msg-Id`
|
||
|
||
Si un publisher reintenta por un timeout de red, podría enviar el mismo mensaje dos veces. JetStream lo evita: si dos publicaciones llevan el mismo header **`Nats-Msg-Id`** dentro de la `duplicate_window`, la segunda se reconoce como **duplicada** y **no** se almacena. El `PubAck` lo indica con `duplicate=True`."""),
|
||
|
||
("code", '''await js.add_stream(name="DEMO_DEDUP", subjects=["demo.dedup.>"],
|
||
storage=StorageType.FILE, duplicate_window=120)
|
||
|
||
# Publicar dos veces el MISMO Nats-Msg-Id
|
||
ack1 = await js.publish("demo.dedup.pago", b"cobro 50e", headers={"Nats-Msg-Id": "pago-0001"})
|
||
ack2 = await js.publish("demo.dedup.pago", b"cobro 50e", headers={"Nats-Msg-Id": "pago-0001"})
|
||
|
||
print(f"1a publicacion: seq={ack1.seq} duplicate={ack1.duplicate}")
|
||
print(f"2a publicacion: seq={ack2.seq} duplicate={ack2.duplicate} <- detectada como duplicado")
|
||
|
||
# Un Msg-Id distinto sí se almacena
|
||
ack3 = await js.publish("demo.dedup.pago", b"cobro 30e", headers={"Nats-Msg-Id": "pago-0002"})
|
||
print(f"3a publicacion (id nuevo): seq={ack3.seq} duplicate={ack3.duplicate}")
|
||
|
||
st = (await js.stream_info("DEMO_DEDUP")).state
|
||
print()
|
||
print(f"Mensajes realmente almacenados en el stream: {st.messages} (2 publicaciones unicas, 1 descartada)")'''),
|
||
|
||
("md", """### 4 · Retención `workqueue`: la cola de trabajo
|
||
|
||
Con `retention=workqueue`, cada mensaje se **borra del stream en cuanto un consumer lo confirma**. Es el patrón de cola de tareas distribuida: los mensajes se reparten entre workers y desaparecen al procesarse, así el stream no crece sin fin."""),
|
||
|
||
("code", '''from nats.js.api import RetentionPolicy, StorageType
|
||
|
||
await js.add_stream(name="DEMO_WQ", subjects=["demo.wq.>"],
|
||
storage=StorageType.FILE, retention=RetentionPolicy.WORK_QUEUE)
|
||
|
||
# Encolar 5 tareas
|
||
for i in range(5):
|
||
await js.publish("demo.wq.tareas", f"tarea-{i}".encode())
|
||
|
||
antes = (await js.stream_info("DEMO_WQ")).state.messages
|
||
print(f"Tareas encoladas en el stream: {antes}")
|
||
|
||
# Un worker consume y confirma 3
|
||
wsub = await js.pull_subscribe("demo.wq.>", durable="worker")
|
||
tres = await wsub.fetch(3, timeout=2)
|
||
for m in tres:
|
||
print(f" procesada: {m.data.decode()}")
|
||
await m.ack() # al confirmar, JetStream BORRA el mensaje del stream
|
||
|
||
await asyncio.sleep(0.3)
|
||
despues = (await js.stream_info("DEMO_WQ")).state.messages
|
||
print()
|
||
print(f"Mensajes restantes en el stream tras 3 acks: {despues} (workqueue borra lo confirmado: {antes} -> {despues})")'''),
|
||
|
||
("md", """### 5 · Políticas de entrega (replay)
|
||
|
||
Al crear un consumer se elige **desde dónde** empieza a leer (`DeliverPolicy`):
|
||
|
||
- `ALL` — todo el historial desde el principio (lo habitual para reprocesar).
|
||
- `LAST` — solo el último mensaje del stream.
|
||
- `NEW` — solo lo que llegue a partir de ahora.
|
||
- `BY_START_SEQUENCE` / `BY_START_TIME` — desde una secuencia o instante concretos.
|
||
|
||
Comparamos `ALL` vs `LAST` sobre el stream de límites (que tiene 6 mensajes)."""),
|
||
|
||
("code", '''from nats.js.api import ConsumerConfig, DeliverPolicy
|
||
|
||
# Consumer que reproduce TODO el historial
|
||
all_sub = await js.pull_subscribe(
|
||
"demo.limits.>", durable="replay-all",
|
||
config=ConsumerConfig(deliver_policy=DeliverPolicy.ALL),
|
||
)
|
||
todos = await all_sub.fetch(50, timeout=1)
|
||
print(f"DeliverPolicy.ALL -> {len(todos)} mensajes:", [m.data.decode() for m in todos])
|
||
for m in todos:
|
||
await m.ack()
|
||
|
||
# Consumer que solo entrega el ÚLTIMO
|
||
last_sub = await js.pull_subscribe(
|
||
"demo.limits.>", durable="replay-last",
|
||
config=ConsumerConfig(deliver_policy=DeliverPolicy.LAST),
|
||
)
|
||
ultimo = await last_sub.fetch(50, timeout=1)
|
||
print(f"DeliverPolicy.LAST -> {len(ultimo)} mensaje :", [m.data.decode() for m in ultimo])
|
||
for m in ultimo:
|
||
await m.ack()'''),
|
||
|
||
# ---- Parte B: simulador interactivo ----
|
||
("md", """## Parte B · Simulador de rendimiento (interactivo)
|
||
|
||
Pulsa **▶ Ejecutar benchmark** y verás cómo **un publisher** inunda el broker con miles de mensajes que **varios subscribers** reciben simultáneamente (fan-out). La gráfica se actualiza **en movimiento** mientras corre:
|
||
|
||
- **Izquierda** — mensajes acumulados: enviados (publisher) vs recibidos (suma de todos los subs).
|
||
- **Derecha** — throughput instantáneo (msgs/s recibidos) muestreado cada ~80 ms.
|
||
|
||
Ajusta los sliders para cambiar el número de mensajes y de subscribers. Con más mensajes (p. ej. 100.000) la animación dura más y se aprecia mejor la curva."""),
|
||
|
||
("code", '''import ipywidgets as widgets
|
||
from IPython.display import display, clear_output
|
||
import matplotlib.pyplot as plt
|
||
import asyncio, time
|
||
import nats
|
||
|
||
# --- widgets ---
|
||
n_msgs_w = widgets.IntSlider(value=20000, min=1000, max=100000, step=1000,
|
||
description="Mensajes:", style={"description_width": "initial"},
|
||
layout=widgets.Layout(width="380px"))
|
||
n_subs_w = widgets.IntSlider(value=3, min=1, max=8, step=1,
|
||
description="Subscribers:", style={"description_width": "initial"})
|
||
run_btn = widgets.Button(description="▶ Ejecutar benchmark", button_style="success",
|
||
layout=widgets.Layout(width="220px"))
|
||
plot_out = widgets.Output()
|
||
log_out = widgets.Output()
|
||
|
||
SUBJECT = "bench.load"
|
||
PAYLOAD = b"x" * 128 # 128 bytes por mensaje
|
||
|
||
def _throughput(ts, recv):
|
||
thr = [0.0]
|
||
for i in range(1, len(ts)):
|
||
dt = ts[i] - ts[i-1]
|
||
thr.append((recv[i] - recv[i-1]) / dt if dt > 0 else 0.0)
|
||
return thr
|
||
|
||
def render(history, n_subs, n_msgs, done=False):
|
||
ts = [h[0] for h in history]
|
||
sent = [h[1] for h in history]
|
||
recv = [h[2] for h in history]
|
||
thr = _throughput(ts, recv)
|
||
with plot_out:
|
||
clear_output(wait=True)
|
||
fig, (a1, a2) = plt.subplots(1, 2, figsize=(11, 3.6))
|
||
a1.plot(ts, sent, label="enviados (pub)", color="#2563eb", lw=2)
|
||
a1.plot(ts, recv, label=f"recibidos (Σ {n_subs} subs)", color="#16a34a", lw=2)
|
||
a1.set_xlabel("segundos"); a1.set_ylabel("mensajes acumulados")
|
||
a1.set_title("Publisher vs subscribers"); a1.legend(loc="upper left")
|
||
a2.plot(ts, thr, color="#db2777", lw=2)
|
||
a2.set_xlabel("segundos"); a2.set_ylabel("msgs/s recibidos")
|
||
a2.set_title("Throughput instantáneo")
|
||
estado = "✓ DONE" if done else "● corriendo…"
|
||
fig.suptitle(f"[{estado}] {n_msgs:,} msgs → {n_subs} subs "
|
||
f"enviados={sent[-1]:,} recibidos={recv[-1]:,}", fontsize=11)
|
||
plt.tight_layout(); plt.show()
|
||
|
||
async def run_benchmark(n_msgs, n_subs, live=True):
|
||
"""1 publisher -> n_subs subscribers. Devuelve (history, counters)."""
|
||
nc = await nats.connect(NATS_URL, name="benchmark")
|
||
counters = [0] * n_subs
|
||
|
||
def make_cb(i):
|
||
async def cb(msg):
|
||
counters[i] += 1
|
||
return cb
|
||
|
||
subs = [await nc.subscribe(SUBJECT, cb=make_cb(i)) for i in range(n_subs)]
|
||
history = [] # (t, enviados, recibidos_total)
|
||
sent = 0
|
||
t0 = time.monotonic()
|
||
|
||
async def publish_all():
|
||
nonlocal sent
|
||
for k in range(n_msgs):
|
||
await nc.publish(SUBJECT, PAYLOAD)
|
||
sent += 1
|
||
if k % 1000 == 0:
|
||
await nc.flush()
|
||
await asyncio.sleep(0) # ceder al event loop (deja correr callbacks)
|
||
await nc.flush()
|
||
|
||
task = asyncio.create_task(publish_all())
|
||
|
||
# Muestreo para la gráfica en movimiento
|
||
while not task.done() or sum(counters) < sent:
|
||
await asyncio.sleep(0.08)
|
||
history.append((time.monotonic() - t0, sent, sum(counters)))
|
||
if live:
|
||
render(history, n_subs, n_msgs)
|
||
if time.monotonic() - t0 > 30: # tope de seguridad
|
||
break
|
||
await task
|
||
|
||
# Drenaje final (que los callbacks alcancen al publisher)
|
||
for _ in range(40):
|
||
if sum(counters) >= sent:
|
||
break
|
||
await asyncio.sleep(0.05)
|
||
history.append((time.monotonic() - t0, sent, sum(counters)))
|
||
if live:
|
||
render(history, n_subs, n_msgs, done=True)
|
||
|
||
for s in subs:
|
||
await s.unsubscribe()
|
||
await nc.drain()
|
||
return history, counters
|
||
|
||
def on_click(_):
|
||
run_btn.disabled = True
|
||
with log_out:
|
||
clear_output()
|
||
print(f"Lanzando: {n_msgs_w.value:,} mensajes → {n_subs_w.value} subscribers …")
|
||
async def go():
|
||
try:
|
||
history, counters = await run_benchmark(n_msgs_w.value, n_subs_w.value, live=True)
|
||
dur = history[-1][0]
|
||
recv = sum(counters)
|
||
with log_out:
|
||
print(f"OK en {dur:.2f}s")
|
||
print(f" enviados : {n_msgs_w.value:,}")
|
||
print(f" recibidos: {recv:,} (fan-out ×{n_subs_w.value} = {recv/max(n_msgs_w.value,1):.2f} por mensaje)")
|
||
print(f" throughput pub : {n_msgs_w.value/dur:,.0f} msgs/s")
|
||
print(f" throughput recv: {recv/dur:,.0f} msgs/s (entregas totales)")
|
||
print(f" por subscriber : {counters}")
|
||
finally:
|
||
run_btn.disabled = False
|
||
asyncio.ensure_future(go())
|
||
|
||
run_btn.on_click(on_click)
|
||
display(widgets.HBox([n_msgs_w, n_subs_w]), run_btn, plot_out, log_out)
|
||
print("Simulador listo. Pulsa el botón para lanzar el benchmark.")'''),
|
||
|
||
("md", """### Verificación (headless)
|
||
|
||
La celda anterior renderiza el widget para pulsarlo en JupyterLab. Aquí ejecutamos el mismo benchmark **una vez de forma programática** (sin botón) para dejar evidencia ejecutada: una corrida real de 15.000 mensajes a 3 subscribers con su gráfica final."""),
|
||
|
||
("code", '''hist, counters = await run_benchmark(15000, 3, live=False)
|
||
|
||
ts = [h[0] for h in hist]
|
||
sent = [h[1] for h in hist]
|
||
recv = [h[2] for h in hist]
|
||
dur = ts[-1]
|
||
|
||
fig, ax = plt.subplots(figsize=(9, 3.6))
|
||
ax.plot(ts, sent, label="enviados (pub)", color="#2563eb", lw=2)
|
||
ax.plot(ts, recv, label="recibidos (Σ 3 subs)", color="#16a34a", lw=2)
|
||
ax.set_xlabel("segundos"); ax.set_ylabel("mensajes acumulados")
|
||
ax.set_title(f"Benchmark headless: 15.000 msgs → 3 subs en {dur:.2f}s")
|
||
ax.legend(loc="upper left")
|
||
plt.tight_layout(); plt.show()
|
||
|
||
print(f"enviados : 15,000")
|
||
print(f"recibidos: {sum(counters):,} por sub -> {counters}")
|
||
print(f"throughput pub : {15000/dur:,.0f} msgs/s")
|
||
print(f"throughput recv: {sum(counters)/dur:,.0f} msgs/s (entregas totales, fan-out x3)")'''),
|
||
|
||
("md", """## Resumen
|
||
|
||
**JetStream a fondo:**
|
||
- Un **stream** persiste mensajes con políticas de **storage** (file/memory), **retention** (limits/interest/workqueue) y **límites** (max_msgs/max_age).
|
||
- Los **consumers** (pull/push, durable/ephemeral) leen a su ritmo y **confirman** (ack) cada mensaje → entrega *at-least-once*.
|
||
- **Dedup** por `Nats-Msg-Id` evita duplicados por reintentos.
|
||
- **workqueue** borra cada mensaje al confirmarse → cola de trabajo.
|
||
- **DeliverPolicy** controla el replay (all/last/new/by_sequence/by_time).
|
||
|
||
**Simulador:** demuestra el fan-out a escala — un publisher alimenta a N subscribers con miles de mensajes y la gráfica en vivo muestra que el throughput de recepción sigue al de envío (cada mensaje se entrega a los N subscribers).
|
||
|
||
### Limpieza
|
||
|
||
```python
|
||
for s in ("DEMO_LIMITS", "DEMO_DEDUP", "DEMO_WQ"):
|
||
try: await js.delete_stream(s)
|
||
except Exception: pass
|
||
await nc.drain()
|
||
```"""),
|
||
]
|
||
|
||
|
||
if __name__ == "__main__":
|
||
build("04_jetstream_benchmark.ipynb", nb4)
|
||
print("OK: notebook 04 generado en", NBDIR)
|