{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "72277c66",
   "metadata": {},
   "source": [
    "# NATS pub/sub: 04 · JetStream in depth and a performance simulator\n",
    "\n",
    "This notebook has two parts:\n",
    "\n",
    "1. **JetStream in depth**: going beyond the basic replay from notebook 02, it covers the anatomy of a stream (storage, retention policies, limits), consumer types, *acks*, deduplication, and delivery policies.\n",
    "2. **Interactive performance simulator**: a button that launches a publisher sending **thousands of messages** to several subscribers, with a **live-updating chart** showing throughput in real time.\n",
    "\n",
    "> Requires the `nats_demo` broker (started by the first code cell) and `ipywidgets` (included in the analysis venv)."
   ]
  },
  {
   "cell_type": "markdown",
   "id": "530eac60",
   "metadata": {},
   "source": [
    "## Part A · JetStream in depth\n",
    "\n",
    "### Setup\n",
    "\n",
    "JetStream is the persistence layer of NATS. Core NATS is *fire-and-forget*; JetStream **stores** messages in a *stream* and lets *consumers* read them at their own pace, acknowledge (*ack*) each message, and replay the history."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "id": "847ca130",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Broker: already-running\n",
      "JetStream context ready. Account info:\n",
      " streams=1 consumers=1 memory=0 storage=260\n"
     ]
    }
   ],
   "source": [
    "import subprocess, time, json\n",
    "\n",
    "NATS_CONTAINER = \"nats_demo\"\n",
    "NATS_PORT = 4222\n",
    "NATS_URL = f\"nats://127.0.0.1:{NATS_PORT}\"\n",
    "\n",
    "def _docker(*args, check=True):\n",
    "    return subprocess.run([\"docker\", *args], capture_output=True, text=True, check=check)\n",
    "\n",
    "def ensure_nats(name=NATS_CONTAINER, port=NATS_PORT):\n",
    "    \"\"\"Start a NATS broker in Docker idempotently. Returns the resulting state.\"\"\"\n",
    "    out = _docker(\"ps\", \"-a\", \"--filter\", f\"name=^{name}$\", \"--format\", \"{{.State}}\", check=False).stdout.strip()\n",
    "    if out == \"running\":\n",
    "        state = \"already-running\"\n",
    "    elif out in (\"exited\", \"created\", \"paused\"):\n",
    "        _docker(\"start\", name)\n",
    "        state = \"started\"\n",
    "    else:\n",
    "        # -js enables JetStream; -m 8222 exposes the HTTP monitoring port\n",
    "        _docker(\"run\", \"-d\", \"--name\", name, \"-p\", f\"{port}:4222\", \"-p\", \"8222:8222\",\n",
    "                \"nats:latest\", \"-js\", \"-m\", \"8222\")\n",
    "        state = \"created\"\n",
    "    time.sleep(1.0)  # brief pause so the broker can accept connections\n",
    "    return state\n",
    "\n",
    "import asyncio\n",
    "import nats\n",
    "\n",
    "print(\"Broker:\", ensure_nats())\n",
    "nc = await nats.connect(NATS_URL, name=\"notebook-04\")\n",
    "js = nc.jetstream()\n",
    "print(\"JetStream context ready. Account info:\")\n",
    "ai = await js.account_info()\n",
    "print(f\" streams={ai.streams} consumers={ai.consumers} memory={ai.memory} storage={ai.storage}\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "b55873ec",
   "metadata": {},
   "source": [
    "### 1 · Anatomy of a stream\n",
    "\n",
    "A **stream** is defined by:\n",
    "\n",
    "- **subjects**: which subjects it captures (e.g. `pedidos.>`).\n",
    "- **storage**: `file` (persisted to disk) or `memory` (faster, but lost when the server restarts).\n",
    "- **retention**: when messages are discarded:\n",
    "  - `limits` (the default): messages are kept until a limit is hit (`max_msgs`, `max_bytes`, `max_age`).\n",
    "  - `interest`: a message is discarded once every consumer with interest in it has acknowledged it.\n",
    "  - `workqueue`: each message is deleted as soon as **one** consumer acknowledges it (work queue).\n",
    "- **limits**: `max_msgs`, `max_bytes`, `max_age` (seconds), `max_msg_size`.\n",
    "- **duplicate_window**: the deduplication window (see §3).\n",
    "\n",
    "We create a `limits` stream with file storage and a cap on the number of messages."
   ]
  },
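  {
   "cell_type": "markdown",
   "id": "sketch-limits-discard-old",
   "metadata": {},
   "source": [
    "To make `limits` retention with `discard=old` concrete, here is a minimal in-memory sketch. It is an illustration under stated assumptions, not how JetStream is implemented: `TinyStream` is a hypothetical class invented for this cell (it is not part of `nats-py`), and it models only `max_msgs`, ignoring `max_bytes` and `max_age`.\n",
    "\n",
    "```python
from collections import deque

class TinyStream:
    '''Toy model of a limits-retention stream with discard=old (hypothetical).'''

    def __init__(self, max_msgs):
        # A bounded deque drops its oldest item when a new one is appended.
        self.msgs = deque(maxlen=max_msgs)
        self.last_seq = 0

    def publish(self, data):
        self.last_seq += 1  # sequence numbers keep growing and are never reused
        self.msgs.append((self.last_seq, data))
        return self.last_seq

s = TinyStream(max_msgs=3)
for i in range(5):
    s.publish(f'evento-{i}')

first_seq = s.msgs[0][0]
print(list(s.msgs))
```\n",
    "\n",
    "After five publishes into a stream capped at three messages, only the three most recent remain, and the first retained sequence is no longer 1. In a notebook the same code can be pasted into a code cell and run without a broker."
   ]
  },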
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "67118a66",
   "metadata": {},
   "outputs": [],
   "source": [
    "from nats.js.api import StreamConfig, RetentionPolicy, StorageType, DiscardPolicy\n",
    "\n",
    "# Recreate from scratch so the demo is deterministic\n",
    "for s in (\"DEMO_LIMITS\", \"DEMO_DEDUP\", \"DEMO_WQ\"):\n",
    "    try:\n",
    "        await js.delete_stream(s)\n",
    "    except Exception:\n",
    "        pass  # the stream did not exist yet\n",
    "\n",
    "cfg = StreamConfig(\n",
    "    name=\"DEMO_LIMITS\",\n",
    "    subjects=[\"demo.limits.>\"],\n",
    "    storage=StorageType.FILE,\n",
    "    retention=RetentionPolicy.LIMITS,\n",
    "    max_msgs=1000,              # message cap\n",
    "    max_age=3600,               # 1 hour (seconds)\n",
    "    discard=DiscardPolicy.OLD,  # when the cap is reached, drop the oldest messages\n",
    "    duplicate_window=120,       # dedup window: 120 s\n",
    ")\n",
    "info = await js.add_stream(cfg)\n",
    "c = info.config\n",
    "print(\"Stream created:\")\n",
    "print(f\"  name          : {c.name}\")\n",
    "print(f\"  subjects      : {c.subjects}\")\n",
    "print(f\"  storage       : {c.storage}\")\n",
    "print(f\"  retention     : {c.retention}\")\n",
    "print(f\"  max_msgs      : {c.max_msgs}\")\n",
    "print(f\"  max_age (s)   : {c.max_age}\")\n",
    "print(f\"  discard       : {c.discard}\")\n",
    "print(f\"  dup_window (s): {c.duplicate_window}\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "9bf78aac",
   "metadata": {},
   "source": [
    "### 2 · Consumers: pull, durable, ack\n",
    "\n",
    "A **consumer** is a read view over a stream. It varies along two axes:\n",
    "\n",
    "- **pull vs push**: with *pull*, the client requests messages when it wants them (`fetch`); with *push*, the server delivers them as they arrive.\n",
    "- **durable vs ephemeral**: a *durable* consumer has a name and **remembers its position** (cursor) across reconnections; an *ephemeral* one goes away once it is no longer in use.\n",
    "\n",
    "The **ack** confirms that a message has been processed. Until a message is acked, the consumer treats it as *pending*, and if `ack_wait` elapses it **redelivers** the message. This gives *at-least-once* delivery, so handlers should tolerate seeing the same message more than once."
   ]
  },
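  {
   "cell_type": "markdown",
   "id": "sketch-ack-redelivery",
   "metadata": {},
   "source": [
    "The pending/redelivery rule described above can be sketched without a broker. This is a toy model under stated assumptions, not the server's algorithm: `TinyConsumer` and its `fetch`/`ack` methods are hypothetical names for this cell, time is passed in explicitly as `now`, and each call delivers at most one message.\n",
    "\n",
    "```python
class TinyConsumer:
    '''Toy model of ack tracking and redelivery (hypothetical, not the nats-py API).'''

    def __init__(self, messages, ack_wait):
        self.messages = list(messages)
        self.ack_wait = ack_wait
        self.cursor = 0    # sequence of the last message delivered for the first time
        self.pending = {}  # seq -> time of the latest delivery, still awaiting an ack

    def fetch(self, now):
        # Redeliver first: any pending message whose ack_wait has expired.
        for seq, delivered_at in sorted(self.pending.items()):
            if now - delivered_at >= self.ack_wait:
                self.pending[seq] = now
                return seq, self.messages[seq - 1]
        # Otherwise advance the cursor to the next undelivered message.
        if self.cursor < len(self.messages):
            self.cursor += 1
            self.pending[self.cursor] = now
            return self.cursor, self.messages[self.cursor - 1]
        return None

    def ack(self, seq):
        self.pending.pop(seq, None)

c = TinyConsumer(['a', 'b'], ack_wait=30)
first = c.fetch(now=0)   # delivers seq 1
second = c.fetch(now=1)  # delivers seq 2
c.ack(2)                 # only seq 2 is acknowledged
again = c.fetch(now=31)  # seq 1 was never acked, so it is delivered again
```\n",
    "\n",
    "Because seq 1 is handed out twice, a handler built on this contract has to be idempotent or deduplicate on its own side."
   ]
  },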
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "499e73f7",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Publish 6 messages to the limits stream\n",
    "for i in range(6):\n",
    "    await js.publish(\"demo.limits.eventos\", f\"evento-{i}\".encode())\n",
    "\n",
    "# DURABLE pull consumer: remembers its cursor between fetches\n",
    "psub = await js.pull_subscribe(\"demo.limits.>\", durable=\"procesador-A\")\n",
    "\n",
    "# Fetch 4 and acknowledge them (ack)\n",
    "batch = await psub.fetch(4, timeout=2)\n",
    "print(\"First fetch (4 msgs):\")\n",
    "for m in batch:\n",
    "    print(f\"  seq={m.metadata.sequence.stream} {m.data.decode()}\")\n",
    "    await m.ack()\n",
    "\n",
    "# Consumer state: how many are still pending / already delivered\n",
    "ci = await psub.consumer_info()\n",
    "print()\n",
    "print(\"Consumer 'procesador-A':\")\n",
    "print(f\"  num_pending    : {ci.num_pending} (messages not delivered yet)\")\n",
    "print(f\"  num_ack_pending: {ci.num_ack_pending} (delivered but not acked)\")\n",
    "print(f\"  delivered.stream_seq: {ci.delivered.stream_seq}\")\n",
    "\n",
    "# Second fetch: resumes where the first one stopped (the cursor is remembered)\n",
    "batch2 = await psub.fetch(10, timeout=1)\n",
    "print()\n",
    "print(f\"Second fetch: {len(batch2)} remaining msgs ->\", [m.data.decode() for m in batch2])\n",
    "for m in batch2:\n",
    "    await m.ack()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "15b96e37",
   "metadata": {},
   "source": [
    "### 3 · Deduplication by `Nats-Msg-Id`\n",
    "\n",
    "If a publisher retries after a network timeout, it may send the same message twice. JetStream guards against this: when two publishes carry the same **`Nats-Msg-Id`** header within the `duplicate_window`, the second is recognized as a **duplicate** and is **not** stored. The `PubAck` reports this with `duplicate=True`."
   ]
  },
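  {
   "cell_type": "markdown",
   "id": "sketch-dedup-window",
   "metadata": {},
   "source": [
    "The window rule can be sketched in a few lines of plain Python. This is a toy model under stated assumptions, not server code: `DedupWindow` is a hypothetical class for this cell, time is passed in explicitly as `now` (seconds), and for a duplicate the sketch returns the sequence of the first copy.\n",
    "\n",
    "```python
class DedupWindow:
    '''Toy model of Nats-Msg-Id deduplication (hypothetical, not server code).'''

    def __init__(self, window):
        self.window = window  # seconds during which an id is remembered
        self.seen = {}        # msg_id -> (stored_at, seq)
        self.last_seq = 0

    def publish(self, msg_id, now):
        # Forget ids that have fallen out of the window.
        self.seen = {k: v for k, v in self.seen.items() if now - v[0] < self.window}
        if msg_id in self.seen:
            return self.seen[msg_id][1], True  # (seq of first copy, duplicate)
        self.last_seq += 1
        self.seen[msg_id] = (now, self.last_seq)
        return self.last_seq, False

d = DedupWindow(window=120)
r1 = d.publish('pago-0001', now=0)    # stored
r2 = d.publish('pago-0001', now=5)    # same id inside the window: duplicate
r3 = d.publish('pago-0002', now=6)    # new id: stored
r4 = d.publish('pago-0001', now=200)  # window expired: stored again
```\n",
    "\n",
    "The last call shows the limit of the mechanism: a retry that arrives after the window has passed is stored as a new message, so the window should be longer than the publisher's worst-case retry delay."
   ]
  },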
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "fdc4c498",
   "metadata": {},
   "outputs": [],
   "source": [
    "await js.add_stream(name=\"DEMO_DEDUP\", subjects=[\"demo.dedup.>\"],\n",
    "                    storage=StorageType.FILE, duplicate_window=120)\n",
    "\n",
    "# Publish the SAME Nats-Msg-Id twice\n",
    "ack1 = await js.publish(\"demo.dedup.pago\", b\"cobro 50e\", headers={\"Nats-Msg-Id\": \"pago-0001\"})\n",
    "ack2 = await js.publish(\"demo.dedup.pago\", b\"cobro 50e\", headers={\"Nats-Msg-Id\": \"pago-0001\"})\n",
    "\n",
    "print(f\"1st publish: seq={ack1.seq} duplicate={ack1.duplicate}\")\n",
    "print(f\"2nd publish: seq={ack2.seq} duplicate={ack2.duplicate} <- detected as a duplicate\")\n",
    "\n",
    "# A different Msg-Id is stored\n",
    "ack3 = await js.publish(\"demo.dedup.pago\", b\"cobro 30e\", headers={\"Nats-Msg-Id\": \"pago-0002\"})\n",
    "print(f\"3rd publish (new id): seq={ack3.seq} duplicate={ack3.duplicate}\")\n",
    "\n",
    "st = (await js.stream_info(\"DEMO_DEDUP\")).state\n",
    "print()\n",
    "print(f\"Messages actually stored in the stream: {st.messages} (2 unique publishes, 1 discarded)\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "da997d2e",
   "metadata": {},
   "source": [
    "### 4 · `workqueue` retention: the work queue\n",
    "\n",
    "With `retention=workqueue`, each message is **deleted from the stream as soon as a consumer acknowledges it**. This is the distributed task-queue pattern: messages are shared out among workers and disappear once processed, so the stream does not grow without bound."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "c7c4a35f",
   "metadata": {},
   "outputs": [],
   "source": [
    "from nats.js.api import RetentionPolicy, StorageType\n",
    "\n",
    "await js.add_stream(name=\"DEMO_WQ\", subjects=[\"demo.wq.>\"],\n",
    "                    storage=StorageType.FILE, retention=RetentionPolicy.WORK_QUEUE)\n",
    "\n",
    "# Enqueue 5 tasks\n",
    "for i in range(5):\n",
    "    await js.publish(\"demo.wq.tareas\", f\"tarea-{i}\".encode())\n",
    "\n",
    "antes = (await js.stream_info(\"DEMO_WQ\")).state.messages\n",
    "print(f\"Tasks queued in the stream: {antes}\")\n",
    "\n",
    "# One worker consumes and acknowledges 3\n",
    "wsub = await js.pull_subscribe(\"demo.wq.>\", durable=\"worker\")\n",
    "tres = await wsub.fetch(3, timeout=2)\n",
    "for m in tres:\n",
    "    print(f\"  processed: {m.data.decode()}\")\n",
    "    await m.ack()  # on ack, JetStream DELETES the message from the stream\n",
    "\n",
    "await asyncio.sleep(0.3)  # give the server a moment to apply the acks\n",
    "despues = (await js.stream_info(\"DEMO_WQ\")).state.messages\n",
    "print()\n",
    "print(f\"Messages left in the stream after 3 acks: {despues} (workqueue deletes what is acked: {antes} -> {despues})\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "5a68f26f",
   "metadata": {},
   "source": [
    "### 5 · Delivery policies (replay)\n",
    "\n",
    "When a consumer is created, you choose **where** it starts reading (`DeliverPolicy`):\n",
    "\n",
    "- `ALL`: the full history from the beginning (the usual choice for reprocessing).\n",
    "- `LAST`: only the last message in the stream.\n",
    "- `NEW`: only messages that arrive from now on.\n",
    "- `BY_START_SEQUENCE` / `BY_START_TIME`: from a specific sequence number or point in time.\n",
    "\n",
    "We compare `ALL` and `LAST` on the limits stream (which holds 6 messages)."
   ]
  },
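  {
   "cell_type": "markdown",
   "id": "sketch-deliver-policy",
   "metadata": {},
   "source": [
    "One way to read the list above is as a rule for picking the first stream sequence a new consumer will see. The sketch below is an illustration under stated assumptions, not the server's logic: `start_seq` is a hypothetical helper for this cell, policies are plain strings, `BY_START_TIME` is left out, and a start sequence below the first retained message is assumed to fall back to the first retained one.\n",
    "\n",
    "```python
def start_seq(policy, first_seq, last_seq, opt_start_seq=None):
    '''Return the first stream sequence a new consumer would read (toy model).'''
    if policy == 'all':
        return first_seq        # replay everything still retained
    if policy == 'last':
        return last_seq         # only the most recent message
    if policy == 'new':
        return last_seq + 1     # nothing stored yet, only future messages
    if policy == 'by_start_sequence':
        return max(opt_start_seq, first_seq)
    raise ValueError(f'unsupported policy: {policy}')

# A stream holding sequences 1..6, like the limits stream in this notebook.
starts = {p: start_seq(p, first_seq=1, last_seq=6) for p in ('all', 'last', 'new')}
from_four = start_seq('by_start_sequence', 1, 6, opt_start_seq=4)
print(starts, from_four)
```\n",
    "\n",
    "With six stored messages, `all` yields six messages, `last` yields one, and `new` yields none until something else is published."
   ]
  },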
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "a341929e",
   "metadata": {},
   "outputs": [],
   "source": [
    "from nats.js.api import ConsumerConfig, DeliverPolicy\n",
    "\n",
    "# Consumer that replays the WHOLE history\n",
    "all_sub = await js.pull_subscribe(\n",
    "    \"demo.limits.>\", durable=\"replay-all\",\n",
    "    config=ConsumerConfig(deliver_policy=DeliverPolicy.ALL),\n",
    ")\n",
    "todos = await all_sub.fetch(50, timeout=1)\n",
    "print(f\"DeliverPolicy.ALL  -> {len(todos)} messages:\", [m.data.decode() for m in todos])\n",
    "for m in todos:\n",
    "    await m.ack()\n",
    "\n",
    "# Consumer that delivers only the LAST message\n",
    "last_sub = await js.pull_subscribe(\n",
    "    \"demo.limits.>\", durable=\"replay-last\",\n",
    "    config=ConsumerConfig(deliver_policy=DeliverPolicy.LAST),\n",
    ")\n",
    "ultimo = await last_sub.fetch(50, timeout=1)\n",
    "print(f\"DeliverPolicy.LAST -> {len(ultimo)} message :\", [m.data.decode() for m in ultimo])\n",
    "for m in ultimo:\n",
    "    await m.ack()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "0fba1dc2",
   "metadata": {},
   "source": [
    "## Part B · Performance simulator (interactive)\n",
    "\n",
    "Press **▶ Ejecutar benchmark** and **one publisher** floods the broker with thousands of messages that **several subscribers** receive simultaneously (fan-out). The chart updates **live** while the run is in progress:\n",
    "\n",
    "- **Left**: cumulative messages, sent (publisher) vs received (sum over all subscribers).\n",
    "- **Right**: instantaneous throughput (msgs/s received), sampled every ~80 ms.\n",
    "\n",
    "Use the sliders to change the number of messages and subscribers. With more messages (e.g. 100,000) the animation lasts longer and the curve is easier to see."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "4240cd89",
   "metadata": {},
   "outputs": [],
   "source": [
    "import ipywidgets as widgets\n",
    "from IPython.display import display, clear_output\n",
    "import matplotlib.pyplot as plt\n",
    "import asyncio, time\n",
    "import nats\n",
    "\n",
    "# --- widgets ---\n",
    "n_msgs_w = widgets.IntSlider(value=20000, min=1000, max=100000, step=1000,\n",
    "                             description=\"Messages:\", style={\"description_width\": \"initial\"},\n",
    "                             layout=widgets.Layout(width=\"380px\"))\n",
    "n_subs_w = widgets.IntSlider(value=3, min=1, max=8, step=1,\n",
    "                             description=\"Subscribers:\", style={\"description_width\": \"initial\"})\n",
    "run_btn = widgets.Button(description=\"▶ Ejecutar benchmark\", button_style=\"success\",\n",
    "                         layout=widgets.Layout(width=\"220px\"))\n",
    "plot_out = widgets.Output()\n",
    "log_out = widgets.Output()\n",
    "\n",
    "SUBJECT = \"bench.load\"\n",
    "PAYLOAD = b\"x\" * 128  # 128 bytes per message\n",
    "\n",
    "def _throughput(ts, recv):\n",
    "    \"\"\"Instantaneous msgs/s between consecutive samples.\"\"\"\n",
    "    thr = [0.0]\n",
    "    for i in range(1, len(ts)):\n",
    "        dt = ts[i] - ts[i-1]\n",
    "        thr.append((recv[i] - recv[i-1]) / dt if dt > 0 else 0.0)\n",
    "    return thr\n",
    "\n",
    "def render(history, n_subs, n_msgs, done=False):\n",
    "    ts = [h[0] for h in history]\n",
    "    sent = [h[1] for h in history]\n",
    "    recv = [h[2] for h in history]\n",
    "    thr = _throughput(ts, recv)\n",
    "    with plot_out:\n",
    "        clear_output(wait=True)\n",
    "        fig, (a1, a2) = plt.subplots(1, 2, figsize=(11, 3.6))\n",
    "        a1.plot(ts, sent, label=\"sent (pub)\", color=\"#2563eb\", lw=2)\n",
    "        a1.plot(ts, recv, label=f\"received (Σ {n_subs} subs)\", color=\"#16a34a\", lw=2)\n",
    "        a1.set_xlabel(\"seconds\"); a1.set_ylabel(\"cumulative messages\")\n",
    "        a1.set_title(\"Publisher vs subscribers\"); a1.legend(loc=\"upper left\")\n",
    "        a2.plot(ts, thr, color=\"#db2777\", lw=2)\n",
    "        a2.set_xlabel(\"seconds\"); a2.set_ylabel(\"msgs/s received\")\n",
    "        a2.set_title(\"Instantaneous throughput\")\n",
    "        estado = \"✓ DONE\" if done else \"● running…\"\n",
    "        fig.suptitle(f\"[{estado}] {n_msgs:,} msgs → {n_subs} subs \"\n",
    "                     f\"sent={sent[-1]:,} received={recv[-1]:,}\", fontsize=11)\n",
    "        plt.tight_layout(); plt.show()\n",
    "\n",
    "async def run_benchmark(n_msgs, n_subs, live=True):\n",
    "    \"\"\"1 publisher -> n_subs subscribers. Returns (history, counters).\"\"\"\n",
    "    nc = await nats.connect(NATS_URL, name=\"benchmark\")\n",
    "    counters = [0] * n_subs\n",
    "\n",
    "    def make_cb(i):\n",
    "        async def cb(msg):\n",
    "            counters[i] += 1\n",
    "        return cb\n",
    "\n",
    "    subs = [await nc.subscribe(SUBJECT, cb=make_cb(i)) for i in range(n_subs)]\n",
    "    history = []  # (t, sent, total_received)\n",
    "    sent = 0\n",
    "    t0 = time.monotonic()\n",
    "\n",
    "    async def publish_all():\n",
    "        nonlocal sent\n",
    "        for k in range(n_msgs):\n",
    "            await nc.publish(SUBJECT, PAYLOAD)\n",
    "            sent += 1\n",
    "            if k % 1000 == 0:\n",
    "                await nc.flush()\n",
    "                await asyncio.sleep(0)  # yield to the event loop so callbacks can run\n",
    "        await nc.flush()\n",
    "\n",
    "    task = asyncio.create_task(publish_all())\n",
    "\n",
    "    # Sampling loop for the live chart.\n",
    "    # Each message is delivered to every subscriber, so the run is complete\n",
    "    # when total deliveries reach sent * n_subs (not just sent).\n",
    "    while not task.done() or sum(counters) < sent * n_subs:\n",
    "        await asyncio.sleep(0.08)\n",
    "        history.append((time.monotonic() - t0, sent, sum(counters)))\n",
    "        if live:\n",
    "            render(history, n_subs, n_msgs)\n",
    "        if time.monotonic() - t0 > 30:  # safety cap\n",
    "            break\n",
    "    await task\n",
    "\n",
    "    # Final drain (let the callbacks catch up with the publisher)\n",
    "    for _ in range(40):\n",
    "        if sum(counters) >= sent * n_subs:\n",
    "            break\n",
    "        await asyncio.sleep(0.05)\n",
    "    history.append((time.monotonic() - t0, sent, sum(counters)))\n",
    "    if live:\n",
    "        render(history, n_subs, n_msgs, done=True)\n",
    "\n",
    "    for s in subs:\n",
    "        await s.unsubscribe()\n",
    "    await nc.drain()\n",
    "    return history, counters\n",
    "\n",
    "def on_click(_):\n",
    "    run_btn.disabled = True\n",
    "    with log_out:\n",
    "        clear_output()\n",
    "        print(f\"Launching: {n_msgs_w.value:,} messages → {n_subs_w.value} subscribers …\")\n",
    "    async def go():\n",
    "        try:\n",
    "            history, counters = await run_benchmark(n_msgs_w.value, n_subs_w.value, live=True)\n",
    "            dur = history[-1][0]\n",
    "            recv = sum(counters)\n",
    "            with log_out:\n",
    "                print(f\"OK in {dur:.2f}s\")\n",
    "                print(f\"  sent     : {n_msgs_w.value:,}\")\n",
    "                print(f\"  received : {recv:,} (fan-out ×{n_subs_w.value} = {recv/max(n_msgs_w.value,1):.2f} per message)\")\n",
    "                print(f\"  pub throughput : {n_msgs_w.value/dur:,.0f} msgs/s\")\n",
    "                print(f\"  recv throughput: {recv/dur:,.0f} msgs/s (total deliveries)\")\n",
    "                print(f\"  per subscriber : {counters}\")\n",
    "        finally:\n",
    "            run_btn.disabled = False\n",
    "    asyncio.ensure_future(go())\n",
    "\n",
    "run_btn.on_click(on_click)\n",
    "display(widgets.HBox([n_msgs_w, n_subs_w]), run_btn, plot_out, log_out)\n",
    "print(\"Simulator ready. Press the button to launch the benchmark.\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "de3caaa5",
   "metadata": {},
   "source": [
    "### Verification (headless)\n",
    "\n",
    "The previous cell renders the widget so it can be clicked in JupyterLab. Here we run the same benchmark **once, programmatically** (no button) to leave executed evidence: a real run of 15,000 messages to 3 subscribers, with its final chart."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "34f06087",
   "metadata": {},
   "outputs": [],
   "source": [
    "hist, counters = await run_benchmark(15000, 3, live=False)\n",
    "\n",
    "ts = [h[0] for h in hist]\n",
    "sent = [h[1] for h in hist]\n",
    "recv = [h[2] for h in hist]\n",
    "dur = ts[-1]\n",
    "\n",
    "fig, ax = plt.subplots(figsize=(9, 3.6))\n",
    "ax.plot(ts, sent, label=\"sent (pub)\", color=\"#2563eb\", lw=2)\n",
    "ax.plot(ts, recv, label=\"received (Σ 3 subs)\", color=\"#16a34a\", lw=2)\n",
    "ax.set_xlabel(\"seconds\"); ax.set_ylabel(\"cumulative messages\")\n",
    "ax.set_title(f\"Headless benchmark: 15,000 msgs → 3 subs in {dur:.2f}s\")\n",
    "ax.legend(loc=\"upper left\")\n",
    "plt.tight_layout(); plt.show()\n",
    "\n",
    "print(\"sent     : 15,000\")\n",
    "print(f\"received : {sum(counters):,} per sub -> {counters}\")\n",
    "print(f\"pub throughput : {15000/dur:,.0f} msgs/s\")\n",
    "print(f\"recv throughput: {sum(counters)/dur:,.0f} msgs/s (total deliveries, fan-out x3)\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "501f5185",
   "metadata": {},
   "source": [
    "## Summary\n",
    "\n",
    "**JetStream in depth:**\n",
    "- A **stream** persists messages under policies for **storage** (file/memory), **retention** (limits/interest/workqueue) and **limits** (max_msgs/max_age).\n",
    "- **Consumers** (pull/push, durable/ephemeral) read at their own pace and **acknowledge** (ack) each message → *at-least-once* delivery.\n",
    "- **Dedup** by `Nats-Msg-Id` prevents duplicates caused by retries.\n",
    "- **workqueue** deletes each message once it is acked → work queue.\n",
    "- **DeliverPolicy** controls replay (all/last/new/by_sequence/by_time).\n",
    "\n",
    "**Simulator:** demonstrates fan-out at scale. One publisher feeds N subscribers with thousands of messages, and the live chart shows total deliveries tracking the sent count at N times its value, because each message is delivered to all N subscribers.\n",
    "\n",
    "### Cleanup\n",
    "\n",
    "```python\n",
    "for s in (\"DEMO_LIMITS\", \"DEMO_DEDUP\", \"DEMO_WQ\"):\n",
    "    try: await js.delete_stream(s)\n",
    "    except Exception: pass\n",
    "await nc.drain()\n",
    "```"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}