{ "cells": [ { "cell_type": "markdown", "id": "72277c66", "metadata": {}, "source": [ "# NATS pub/sub — 04 · JetStream in depth and throughput simulator\n", "\n", "This notebook has two parts:\n", "\n", "1. **JetStream in depth**: going beyond the basic replay from notebook 02, it covers the anatomy of a stream (storage, retention policies, limits), consumer types, *acks*, deduplication and delivery policies.\n", "2. **Interactive throughput simulator**: a button that launches a publisher sending **thousands of messages** to several subscribers, with a **live chart** of the throughput as the run progresses.\n", "\n", "> Requires the `nats_demo` broker (started by the first code cell) and `ipywidgets` (included in the analysis venv)." ] }, { "cell_type": "markdown", "id": "530eac60", "metadata": {}, "source": [ "## Part A · JetStream in depth\n", "\n", "### Setup\n", "\n", "JetStream is the persistence layer of NATS. Core NATS is *fire-and-forget*; JetStream instead **stores** messages in a *stream* and lets you read them through *consumers*, which control the pace, acknowledge (*ack*) each message and can replay the history." ] }, { "cell_type": "code", "execution_count": 1, "id": "847ca130", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Broker: already-running\n", "JetStream context ready. Account info:\n", " streams=1 consumers=1 memory=0 storage=260\n" ] } ], "source": [ "import subprocess, time\n", "\n", "NATS_CONTAINER = \"nats_demo\"\n", "NATS_PORT = 4222\n", "NATS_URL = f\"nats://127.0.0.1:{NATS_PORT}\"\n", "\n", "def _docker(*args, check=True):\n", "    return subprocess.run([\"docker\", *args], capture_output=True, text=True, check=check)\n", "\n", "def ensure_nats(name=NATS_CONTAINER, port=NATS_PORT):\n", "    \"\"\"Start a NATS broker in Docker idempotently and return the resulting state.\"\"\"\n", "    out = _docker(\"ps\", \"-a\", \"--filter\", f\"name=^{name}$\", \"--format\", \"{{.State}}\", check=False).stdout.strip()\n", "    if out == \"running\":\n", "        return \"already-running\"\n", "    if out == \"paused\":\n", "        _docker(\"unpause\", name)  # `docker start` refuses paused containers\n", "        state = \"unpaused\"\n", "    elif out in (\"exited\", \"created\"):\n", "        _docker(\"start\", name)\n", "        state = \"started\"\n", "    else:\n", "        _docker(\"run\", \"-d\", \"--name\", name, \"-p\", f\"{port}:4222\", \"-p\", \"8222:8222\",\n", "                \"nats:latest\", \"-js\", \"-m\", \"8222\")\n", "        state = \"created\"\n", "    time.sleep(1.0)  # give the server a moment to accept connections\n", "    return state\n", "\n", "import asyncio\n", "import nats\n", "\n", "print(\"Broker:\", ensure_nats())\n", "nc = await nats.connect(NATS_URL, name=\"notebook-04\")\n", "js = nc.jetstream()\n", "print(\"JetStream context ready. Account info:\")\n", "ai = await js.account_info()\n", "print(f\" streams={ai.streams} consumers={ai.consumers} memory={ai.memory} storage={ai.storage}\")" ] }, { "cell_type": "markdown", "id": "b55873ec", "metadata": {}, "source": [ "### 1 · Anatomy of a stream\n", "\n", "A **stream** is defined by:\n", "\n", "- **subjects**: the subjects it captures (e.g. `pedidos.>`).\n", "- **storage**: `file` (persisted to disk) or `memory` (fast, lost on restart).\n", "- **retention**: when messages are removed:\n", "  - `limits` (default): kept until a limit is reached (`max_msgs`, `max_bytes`, `max_age`).\n", "  - `interest`: removed once every consumer interested in them has acknowledged them.\n", "  - `workqueue`: each message is deleted as soon as **one** consumer acknowledges it (work queue).\n", "- **limits**: `max_msgs`, `max_bytes`, `max_age` (seconds in the Python client), `max_msg_size` (bytes).\n", "- **discard**: the behaviour when a limit is hit (`old` drops the oldest messages, `new` rejects new publishes).\n", "- **duplicate_window**: deduplication window (see §3).\n", "\n", "We create a `limits` stream with disk storage and a cap on the number of messages."
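, "\n", "The interaction between `max_msgs` and `discard` can also be checked in isolation. Below is a minimal sketch, assuming the `nats_demo` broker listens on `127.0.0.1:4222`; the stream `DEMO_TINY` and the subject `demo.tiny.x` are hypothetical throwaway names. With `max_msgs=3` and `DiscardPolicy.OLD`, publishing 5 messages should leave only the last 3 (stream sequences 3 to 5).\n", "\n", "```python\n", "import asyncio\n", "import nats\n", "from nats.js.api import StreamConfig, StorageType, DiscardPolicy\n", "\n", "async def main():\n", "    nc = await nats.connect(\"nats://127.0.0.1:4222\")\n", "    js = nc.jetstream()\n", "    # Hypothetical throwaway stream: in memory, capped at 3 messages\n", "    await js.add_stream(StreamConfig(\n", "        name=\"DEMO_TINY\", subjects=[\"demo.tiny.>\"],\n", "        storage=StorageType.MEMORY, max_msgs=3, discard=DiscardPolicy.OLD,\n", "    ))\n", "    for i in range(5):\n", "        await js.publish(\"demo.tiny.x\", f\"m{i}\".encode())\n", "    # discard=OLD drops the oldest messages to stay within max_msgs\n", "    state = (await js.stream_info(\"DEMO_TINY\")).state\n", "    print(f\"messages={state.messages} first_seq={state.first_seq} last_seq={state.last_seq}\")\n", "    await js.delete_stream(\"DEMO_TINY\")\n", "    await nc.drain()\n", "\n", "asyncio.run(main())  # inside Jupyter (loop already running) use: await main()\n", "```\n", "\n", "With `DiscardPolicy.NEW` instead, the server would reject the publishes beyond the cap rather than dropping old messages."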
] }, { "cell_type": "code", "execution_count": null, "id": "67118a66", "metadata": {}, "outputs": [], "source": [ "from nats.js.api import StreamConfig, RetentionPolicy, StorageType, DiscardPolicy\n", "from nats.js.errors import NotFoundError\n", "\n", "# Recreate from scratch so the demo is deterministic\n", "for s in (\"DEMO_LIMITS\", \"DEMO_DEDUP\", \"DEMO_WQ\"):\n", "    try:\n", "        await js.delete_stream(s)\n", "    except NotFoundError:\n", "        pass  # the stream did not exist yet\n", "\n", "cfg = StreamConfig(\n", "    name=\"DEMO_LIMITS\",\n", "    subjects=[\"demo.limits.>\"],\n", "    storage=StorageType.FILE,\n", "    retention=RetentionPolicy.LIMITS,\n", "    max_msgs=1000,              # cap on the number of messages\n", "    max_age=3600,               # 1 hour (seconds)\n", "    discard=DiscardPolicy.OLD,  # at the cap, drop the oldest messages\n", "    duplicate_window=120,       # dedup window: 120 s\n", ")\n", "info = await js.add_stream(cfg)\n", "c = info.config\n", "print(\"Stream created:\")\n", "print(f\"  name           : {c.name}\")\n", "print(f\"  subjects       : {c.subjects}\")\n", "print(f\"  storage        : {c.storage}\")\n", "print(f\"  retention      : {c.retention}\")\n", "print(f\"  max_msgs       : {c.max_msgs}\")\n", "print(f\"  max_age (s)    : {c.max_age}\")\n", "print(f\"  discard        : {c.discard}\")\n", "print(f\"  dup_window (s) : {c.duplicate_window}\")" ] }, { "cell_type": "markdown", "id": "9bf78aac", "metadata": {}, "source": [ "### 2 · Consumers: pull, durable, ack\n", "\n", "A **consumer** is a read view over a stream. Consumers vary along two axes:\n", "\n", "- **pull vs push**: with *pull*, the client asks for messages when it is ready (`fetch`); with *push*, the server sends them as they arrive.\n", "- **durable vs ephemeral**: a *durable* consumer has a name and **remembers its position** (cursor) across reconnections; an *ephemeral* one is removed by the server once its client stops using it.\n", "\n", "The **ack** confirms that a message was processed. Until a message is acknowledged, the consumer counts it as *pending* and, if `ack_wait` expires first, **redelivers** it. This gives *at-least-once* delivery."
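, "\n", "The redelivery rule can be observed directly with `nak()`, a negative ack that asks the server to redeliver right away instead of waiting for `ack_wait` to expire. A minimal sketch under the same broker assumption (`127.0.0.1:4222`); the stream `DEMO_RETRY`, the subject `demo.retry.job` and the durable `retry-demo` are hypothetical names. The second fetch should return the same stream sequence with `num_delivered` increased to 2.\n", "\n", "```python\n", "import asyncio\n", "import nats\n", "from nats.js.api import StreamConfig, StorageType, ConsumerConfig\n", "\n", "async def main():\n", "    nc = await nats.connect(\"nats://127.0.0.1:4222\")\n", "    js = nc.jetstream()\n", "    await js.add_stream(StreamConfig(name=\"DEMO_RETRY\", subjects=[\"demo.retry.>\"],\n", "                                     storage=StorageType.MEMORY))\n", "    await js.publish(\"demo.retry.job\", b\"job-1\")\n", "\n", "    # ack_wait is given in seconds in nats-py; 30 s is long enough not to interfere\n", "    psub = await js.pull_subscribe(\"demo.retry.>\", durable=\"retry-demo\",\n", "                                   config=ConsumerConfig(ack_wait=30))\n", "    first = (await psub.fetch(1, timeout=2))[0]\n", "    print(\"1st delivery:\", first.metadata.sequence.stream, first.metadata.num_delivered)\n", "    await first.nak()  # negative ack: request immediate redelivery\n", "\n", "    again = (await psub.fetch(1, timeout=2))[0]\n", "    print(\"2nd delivery:\", again.metadata.sequence.stream, again.metadata.num_delivered)\n", "    await again.ack_sync()\n", "\n", "    await js.delete_stream(\"DEMO_RETRY\")\n", "    await nc.drain()\n", "\n", "asyncio.run(main())  # inside Jupyter (loop already running) use: await main()\n", "```\n", "\n", "Never acking has the same effect, only delayed by `ack_wait`; this is why handlers should be idempotent under *at-least-once* delivery."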
] }, { "cell_type": "code", "execution_count": null, "id": "499e73f7", "metadata": {}, "outputs": [], "source": [ "# Publish 6 messages to the limits stream\n", "for i in range(6):\n", "    await js.publish(\"demo.limits.eventos\", f\"evento-{i}\".encode())\n", "\n", "# DURABLE pull consumer: it remembers its cursor between fetches\n", "psub = await js.pull_subscribe(\"demo.limits.>\", durable=\"procesador-A\")\n", "\n", "# Fetch 4 and ack them. ack_sync() waits for the server to confirm each ack,\n", "# so the consumer_info() below already reflects them.\n", "batch = await psub.fetch(4, timeout=2)\n", "print(\"First fetch (4 msgs):\")\n", "for m in batch:\n", "    print(f\"  seq={m.metadata.sequence.stream} {m.data.decode()}\")\n", "    await m.ack_sync()\n", "\n", "# Consumer state: how many messages are still pending / already delivered\n", "ci = await psub.consumer_info()\n", "print()\n", "print(\"Consumer 'procesador-A':\")\n", "print(f\"  num_pending    : {ci.num_pending} (messages not delivered yet)\")\n", "print(f\"  num_ack_pending: {ci.num_ack_pending} (delivered but not acked)\")\n", "print(f\"  delivered.stream_seq: {ci.delivered.stream_seq}\")\n", "\n", "# Second fetch: it resumes where the first one stopped (the cursor is remembered)\n", "batch2 = await psub.fetch(10, timeout=1)\n", "print()\n", "print(f\"Second fetch: {len(batch2)} remaining msgs ->\", [m.data.decode() for m in batch2])\n", "for m in batch2:\n", "    await m.ack()" ] }, { "cell_type": "markdown", "id": "15b96e37", "metadata": {}, "source": [ "### 3 · Deduplication via `Nats-Msg-Id`\n", "\n", "If a publisher retries after a network timeout, it may send the same message twice. JetStream guards against this: if two publishes carry the same **`Nats-Msg-Id`** header within the `duplicate_window`, the second one is recognized as a **duplicate** and is **not** stored. The `PubAck` signals this with `duplicate=True`."
] }, { "cell_type": "code", "execution_count": null, "id": "fdc4c498", "metadata": {}, "outputs": [], "source": [ "await js.add_stream(name=\"DEMO_DEDUP\", subjects=[\"demo.dedup.>\"],\n", "                    storage=StorageType.FILE, duplicate_window=120)\n", "\n", "# Publish the SAME Nats-Msg-Id twice\n", "ack1 = await js.publish(\"demo.dedup.pago\", b\"cobro 50e\", headers={\"Nats-Msg-Id\": \"pago-0001\"})\n", "ack2 = await js.publish(\"demo.dedup.pago\", b\"cobro 50e\", headers={\"Nats-Msg-Id\": \"pago-0001\"})\n", "\n", "# bool(): the duplicate flag may be absent (None) for non-duplicate publishes\n", "print(f\"1st publish: seq={ack1.seq} duplicate={bool(ack1.duplicate)}\")\n", "print(f\"2nd publish: seq={ack2.seq} duplicate={bool(ack2.duplicate)} <- detected as a duplicate\")\n", "\n", "# A different Msg-Id is stored\n", "ack3 = await js.publish(\"demo.dedup.pago\", b\"cobro 30e\", headers={\"Nats-Msg-Id\": \"pago-0002\"})\n", "print(f\"3rd publish (new id): seq={ack3.seq} duplicate={bool(ack3.duplicate)}\")\n", "\n", "st = (await js.stream_info(\"DEMO_DEDUP\")).state\n", "print()\n", "print(f\"Messages actually stored in the stream: {st.messages} (2 unique publishes, 1 discarded)\")" ] }, { "cell_type": "markdown", "id": "da997d2e", "metadata": {}, "source": [ "### 4 · `workqueue` retention: the work queue\n", "\n", "With `retention=workqueue`, each message is **deleted from the stream as soon as a consumer acknowledges it**. This is the distributed task-queue pattern: messages are shared out among workers and disappear once processed, so the stream does not grow without bound."
] }, { "cell_type": "code", "execution_count": null, "id": "c7c4a35f", "metadata": {}, "outputs": [], "source": [ "from nats.js.api import RetentionPolicy, StorageType\n", "\n", "await js.add_stream(name=\"DEMO_WQ\", subjects=[\"demo.wq.>\"],\n", "                    storage=StorageType.FILE, retention=RetentionPolicy.WORK_QUEUE)\n", "\n", "# Enqueue 5 tasks\n", "for i in range(5):\n", "    await js.publish(\"demo.wq.tareas\", f\"tarea-{i}\".encode())\n", "\n", "antes = (await js.stream_info(\"DEMO_WQ\")).state.messages\n", "print(f\"Tasks queued in the stream: {antes}\")\n", "\n", "# A worker consumes and acknowledges 3 of them\n", "wsub = await js.pull_subscribe(\"demo.wq.>\", durable=\"worker\")\n", "tres = await wsub.fetch(3, timeout=2)\n", "for m in tres:\n", "    print(f\"  processed: {m.data.decode()}\")\n", "    await m.ack_sync()  # once the ack is confirmed, JetStream DELETES the message from the stream\n", "\n", "await asyncio.sleep(0.3)  # small safety margin before reading the state\n", "despues = (await js.stream_info(\"DEMO_WQ\")).state.messages\n", "print()\n", "print(f\"Messages left in the stream after 3 acks: {despues} (workqueue deletes what is acked: {antes} -> {despues})\")" ] }, { "cell_type": "markdown", "id": "5a68f26f", "metadata": {}, "source": [ "### 5 · Delivery policies (replay)\n", "\n", "When creating a consumer you choose **where** it starts reading (`DeliverPolicy`):\n", "\n", "- `ALL`: the whole history from the beginning (the usual choice for reprocessing).\n", "- `LAST`: only the last message in the stream.\n", "- `NEW`: only messages that arrive from now on.\n", "- `BY_START_SEQUENCE` / `BY_START_TIME`: from a given sequence number or point in time.\n", "\n", "We compare `ALL` vs `LAST` on the limits stream (which holds 6 messages)."
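, "\n", "The two start-position policies not exercised in the next cell take an extra field: `opt_start_seq` for `BY_START_SEQUENCE` and `opt_start_time` for `BY_START_TIME`. A minimal sketch, again assuming the `nats_demo` broker on `127.0.0.1:4222`; the stream `DEMO_SEQ` and the durable `replay-from-4` are hypothetical names. Starting at stream sequence 4 should replay only `evento-3`, `evento-4` and `evento-5`.\n", "\n", "```python\n", "import asyncio\n", "import nats\n", "from nats.js.api import StreamConfig, StorageType, ConsumerConfig, DeliverPolicy\n", "\n", "async def main():\n", "    nc = await nats.connect(\"nats://127.0.0.1:4222\")\n", "    js = nc.jetstream()\n", "    await js.add_stream(StreamConfig(name=\"DEMO_SEQ\", subjects=[\"demo.seq.>\"],\n", "                                     storage=StorageType.MEMORY))\n", "    for i in range(6):\n", "        await js.publish(\"demo.seq.x\", f\"evento-{i}\".encode())  # stream seqs 1..6\n", "\n", "    # Start at stream sequence 4, i.e. the message evento-3\n", "    sub = await js.pull_subscribe(\n", "        \"demo.seq.>\", durable=\"replay-from-4\",\n", "        config=ConsumerConfig(deliver_policy=DeliverPolicy.BY_START_SEQUENCE,\n", "                              opt_start_seq=4),\n", "    )\n", "    msgs = await sub.fetch(50, timeout=1)\n", "    print([(m.metadata.sequence.stream, m.data.decode()) for m in msgs])\n", "    for m in msgs:\n", "        await m.ack()\n", "\n", "    await js.delete_stream(\"DEMO_SEQ\")\n", "    await nc.drain()\n", "\n", "asyncio.run(main())  # inside Jupyter (loop already running) use: await main()\n", "```"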
] }, { "cell_type": "code", "execution_count": null, "id": "a341929e", "metadata": {}, "outputs": [], "source": [ "from nats.js.api import ConsumerConfig, DeliverPolicy\n", "\n", "# Consumer that replays the WHOLE history\n", "all_sub = await js.pull_subscribe(\n", "    \"demo.limits.>\", durable=\"replay-all\",\n", "    config=ConsumerConfig(deliver_policy=DeliverPolicy.ALL),\n", ")\n", "todos = await all_sub.fetch(50, timeout=1)\n", "print(f\"DeliverPolicy.ALL  -> {len(todos)} messages:\", [m.data.decode() for m in todos])\n", "for m in todos:\n", "    await m.ack()\n", "\n", "# Consumer that delivers only the LAST message\n", "last_sub = await js.pull_subscribe(\n", "    \"demo.limits.>\", durable=\"replay-last\",\n", "    config=ConsumerConfig(deliver_policy=DeliverPolicy.LAST),\n", ")\n", "ultimo = await last_sub.fetch(50, timeout=1)\n", "print(f\"DeliverPolicy.LAST -> {len(ultimo)} message :\", [m.data.decode() for m in ultimo])\n", "for m in ultimo:\n", "    await m.ack()" ] }, { "cell_type": "markdown", "id": "0fba1dc2", "metadata": {}, "source": [ "## Part B · Throughput simulator (interactive)\n", "\n", "Press **▶ Run benchmark** to watch **one publisher** flood the broker with thousands of messages that **several subscribers** receive at the same time (fan-out). The chart **updates live** while the run is in progress:\n", "\n", "- **Left**: cumulative messages, sent (publisher) vs received (sum over all subscribers). With N subscribers, the received curve ends at N times the sent one.\n", "- **Right**: instantaneous throughput (received msgs/s), sampled every ~80 ms.\n", "\n", "Use the sliders to change the number of messages and subscribers. With more messages (e.g. 100,000) the animation lasts longer and the curve is easier to see." ] }, { "cell_type": "code", "execution_count": null, "id": "4240cd89", "metadata": {}, "outputs": [], "source": [ "import ipywidgets as widgets\n", "from IPython.display import display, clear_output\n", "import matplotlib.pyplot as plt\n", "import asyncio, time\n", "import nats\n", "\n", "# --- widgets ---\n", "n_msgs_w = widgets.IntSlider(value=20000, min=1000, max=100000, step=1000,\n", "                             description=\"Messages:\", style={\"description_width\": \"initial\"},\n", "                             layout=widgets.Layout(width=\"380px\"))\n", "n_subs_w = widgets.IntSlider(value=3, min=1, max=8, step=1,\n", "                             description=\"Subscribers:\", style={\"description_width\": \"initial\"})\n", "run_btn = widgets.Button(description=\"▶ Run benchmark\", button_style=\"success\",\n", "                         layout=widgets.Layout(width=\"220px\"))\n", "plot_out = widgets.Output()\n", "log_out = widgets.Output()\n", "\n", "SUBJECT = \"bench.load\"\n", "PAYLOAD = b\"x\" * 128  # 128 bytes per message\n", "\n", "def _throughput(ts, recv):\n", "    \"\"\"Instantaneous throughput (msgs/s) between consecutive samples.\"\"\"\n", "    thr = [0.0]\n", "    for i in range(1, len(ts)):\n", "        dt = ts[i] - ts[i-1]\n", "        thr.append((recv[i] - recv[i-1]) / dt if dt > 0 else 0.0)\n", "    return thr\n", "\n", "def render(history, n_subs, n_msgs, done=False):\n", "    ts = [h[0] for h in history]\n", "    sent = [h[1] for h in history]\n", "    recv = [h[2] for h in history]\n", "    thr = _throughput(ts, recv)\n", "    with plot_out:\n", "        clear_output(wait=True)\n", "        fig, (a1, a2) = plt.subplots(1, 2, figsize=(11, 3.6))\n", "        a1.plot(ts, sent, label=\"sent (pub)\", color=\"#2563eb\", lw=2)\n", "        a1.plot(ts, recv, label=f\"received (Σ {n_subs} subs)\", color=\"#16a34a\", lw=2)\n", "        a1.set_xlabel(\"seconds\"); a1.set_ylabel(\"cumulative messages\")\n", "        a1.set_title(\"Publisher vs subscribers\"); a1.legend(loc=\"upper left\")\n", "        a2.plot(ts, thr, color=\"#db2777\", lw=2)\n", "        a2.set_xlabel(\"seconds\"); a2.set_ylabel(\"received msgs/s\")\n", "        a2.set_title(\"Instantaneous throughput\")\n", "        estado = \"✓ DONE\" if done else \"● running…\"\n", "        fig.suptitle(f\"[{estado}] {n_msgs:,} msgs → {n_subs} subs   \"\n", "                     f\"sent={sent[-1]:,} received={recv[-1]:,}\", fontsize=11)\n", "        plt.tight_layout(); plt.show()\n", "        plt.close(fig)  # a new figure is drawn on every sample; free the old one\n", "\n", "async def run_benchmark(n_msgs, n_subs, live=True):\n", "    \"\"\"1 publisher -> n_subs subscribers. Returns (history, counters).\"\"\"\n", "    nc = await nats.connect(NATS_URL, name=\"benchmark\")\n", "    counters = [0] * n_subs\n", "\n", "    def make_cb(i):\n", "        async def cb(msg):\n", "            counters[i] += 1\n", "        return cb\n", "\n", "    subs = [await nc.subscribe(SUBJECT, cb=make_cb(i)) for i in range(n_subs)]\n", "    history = []  # (t, sent, total_received)\n", "    sent = 0\n", "    t0 = time.monotonic()\n", "\n", "    async def publish_all():\n", "        nonlocal sent\n", "        for k in range(n_msgs):\n", "            await nc.publish(SUBJECT, PAYLOAD)\n", "            sent += 1\n", "            if k % 1000 == 0:\n", "                await nc.flush()\n", "                await asyncio.sleep(0)  # yield to the event loop (lets the callbacks run)\n", "        await nc.flush()\n", "\n", "    task = asyncio.create_task(publish_all())\n", "\n", "    # Fan-out: every message is delivered once per subscriber, so the run is\n", "    # complete when the total received reaches sent * n_subs (not just sent).\n", "    # Sampling loop for the live chart:\n", "    while not task.done() or sum(counters) < sent * n_subs:\n", "        await asyncio.sleep(0.08)\n", "        history.append((time.monotonic() - t0, sent, sum(counters)))\n", "        if live:\n", "            render(history, n_subs, n_msgs)\n", "        if time.monotonic() - t0 > 30:  # safety cap\n", "            break\n", "    await task\n", "\n", "    # Final drain: give the callbacks time to catch up with the publisher\n", "    for _ in range(40):\n", "        if sum(counters) >= sent * n_subs:\n", "            break\n", "        await asyncio.sleep(0.05)\n", "    history.append((time.monotonic() - t0, sent, sum(counters)))\n", "    if live:\n", "        render(history, n_subs, n_msgs, done=True)\n", "\n", "    for s in subs:\n", "        await s.unsubscribe()\n", "    await nc.drain()\n", "    return history, counters\n", "\n", "def on_click(_):\n", "    # Freeze the slider values so moving them mid-run does not skew the report\n", "    n_msgs, n_subs = n_msgs_w.value, n_subs_w.value\n", "    run_btn.disabled = True\n", "    with log_out:\n", "        clear_output()\n", "        print(f\"Launching: {n_msgs:,} messages → {n_subs} subscribers …\")\n", "\n", "    async def go():\n", "        try:\n", "            history, counters = await run_benchmark(n_msgs, n_subs, live=True)\n", "            dur = history[-1][0]\n", "            recv = sum(counters)\n", "            with log_out:\n", "                print(f\"OK in {dur:.2f}s\")\n", "                print(f\"  sent    : {n_msgs:,}\")\n", "                print(f\"  received: {recv:,} (fan-out ×{n_subs} = {recv/max(n_msgs, 1):.2f} per message)\")\n", "                print(f\"  pub throughput : {n_msgs/dur:,.0f} msgs/s\")\n", "                print(f\"  recv throughput: {recv/dur:,.0f} msgs/s (total deliveries)\")\n", "                print(f\"  per subscriber : {counters}\")\n", "        except Exception as exc:\n", "            # Exceptions in a background task are otherwise lost silently\n", "            with log_out:\n", "                print(f\"Benchmark failed: {exc!r}\")\n", "        finally:\n", "            run_btn.disabled = False\n", "\n", "    asyncio.ensure_future(go())\n", "\n", "run_btn.on_click(on_click)\n", "display(widgets.HBox([n_msgs_w, n_subs_w]), run_btn, plot_out, log_out)\n", "print(\"Simulator ready. Press the button to launch the benchmark.\")" ] }, { "cell_type": "markdown", "id": "de3caaa5", "metadata": {}, "source": [ "### Verification (headless)\n", "\n", "The previous cell renders the widget so it can be clicked in JupyterLab. Here the same benchmark runs **once, programmatically** (no button) to leave executed evidence: a real run of 15,000 messages to 3 subscribers, with its final chart."
] }, { "cell_type": "code", "execution_count": null, "id": "34f06087", "metadata": {}, "outputs": [], "source": [ "N_MSGS, N_SUBS = 15000, 3\n", "hist, counters = await run_benchmark(N_MSGS, N_SUBS, live=False)\n", "\n", "ts = [h[0] for h in hist]\n", "sent = [h[1] for h in hist]\n", "recv = [h[2] for h in hist]\n", "dur = ts[-1]\n", "\n", "fig, ax = plt.subplots(figsize=(9, 3.6))\n", "ax.plot(ts, sent, label=\"sent (pub)\", color=\"#2563eb\", lw=2)\n", "ax.plot(ts, recv, label=f\"received (Σ {N_SUBS} subs)\", color=\"#16a34a\", lw=2)\n", "ax.set_xlabel(\"seconds\"); ax.set_ylabel(\"cumulative messages\")\n", "ax.set_title(f\"Headless benchmark: {N_MSGS:,} msgs → {N_SUBS} subs in {dur:.2f}s\")\n", "ax.legend(loc=\"upper left\")\n", "plt.tight_layout(); plt.show()\n", "\n", "print(f\"sent    : {N_MSGS:,}\")\n", "print(f\"received: {sum(counters):,}, per sub -> {counters}\")\n", "print(f\"pub throughput : {N_MSGS/dur:,.0f} msgs/s\")\n", "print(f\"recv throughput: {sum(counters)/dur:,.0f} msgs/s (total deliveries, fan-out x{N_SUBS})\")" ] }, { "cell_type": "markdown", "id": "501f5185", "metadata": {}, "source": [ "## Summary\n", "\n", "**JetStream in depth:**\n", "- A **stream** persists messages according to its **storage** (file/memory), **retention** (limits/interest/workqueue) and **limits** (max_msgs/max_age) policies.\n", "- **Consumers** (pull/push, durable/ephemeral) read at their own pace and **acknowledge** (ack) each message → *at-least-once* delivery.\n", "- **Dedup** via `Nats-Msg-Id` prevents duplicates caused by retries.\n", "- **workqueue** deletes each message once it is acknowledged → work queue.\n", "- **DeliverPolicy** controls replay (all/last/new/by_start_sequence/by_start_time).\n", "\n", "**Simulator:** shows fan-out at scale. One publisher feeds N subscribers with thousands of messages, and the live chart shows the receive throughput tracking the send throughput (each message is delivered to all N subscribers, so received = N × sent).\n", "\n", "### Cleanup\n", "\n", "```python\n", "for s in (\"DEMO_LIMITS\", \"DEMO_DEDUP\", \"DEMO_WQ\"):\n", "    try: await js.delete_stream(s)\n", "    except Exception: pass  # already deleted\n", "await nc.drain()\n", "```" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" } }, "nbformat": 4, "nbformat_minor": 5 }