#!/usr/bin/env python3 """Generador de los notebooks del analisis NATS. Construye los tres .ipynb con nbformat (sin ejecutar). La ejecucion se hace despues a traves del MCP de Jupyter, contra el kernel del servidor, para que los outputs queden persistidos y visibles en JupyterLab. Reproducible: re-ejecutar este script regenera los notebooks desde cero. """ from pathlib import Path import nbformat as nbf NBDIR = Path("/home/enmanuel/fn_registry/analysis/nats/notebooks") PROCS_ABS = str(NBDIR / "procs") def build(filename: str, cells: list[tuple[str, str]]) -> None: nb = nbf.v4.new_notebook() nb.metadata["kernelspec"] = { "name": "python3", "display_name": "Python 3 (ipykernel)", "language": "python", } nb.cells = [ nbf.v4.new_markdown_cell(src) if typ == "md" else nbf.v4.new_code_cell(src) for typ, src in cells ] nbf.write(nb, str(NBDIR / filename)) print(f"escrito {filename}: {len(cells)} celdas") # Bloque de codigo reutilizado al inicio de cada notebook para arrancar el broker. ENSURE_NATS = '''import subprocess, time, json NATS_CONTAINER = "nats_demo" NATS_PORT = 4222 NATS_URL = f"nats://127.0.0.1:{NATS_PORT}" def _docker(*args, check=True): return subprocess.run(["docker", *args], capture_output=True, text=True, check=check) def ensure_nats(name=NATS_CONTAINER, port=NATS_PORT): """Arranca un broker NATS en Docker de forma idempotente. Devuelve el estado.""" out = _docker("ps", "-a", "--filter", f"name=^{name}$", "--format", "{{.State}}", check=False).stdout.strip() if out == "running": state = "already-running" elif out in ("exited", "created", "paused"): _docker("start", name) state = "started" else: _docker("run", "-d", "--name", name, "-p", f"{port}:4222", "-p", "8222:8222", "nats:latest", "-js", "-m", "8222") state = "created" time.sleep(1.0) return state''' # ---------------------------------------------------------------------------- # Notebook 01 — Core pub/sub y wildcards # ---------------------------------------------------------------------------- nb1 = [ ("md", """# NATS pub/sub — 01 · Core publish/subscribe y wildcards **NATS** es un sistema de mensajería ligero y de alto rendimiento orientado a la comunicación entre procesos (microservicios, IoT, pipelines de datos). Su modelo central es **publish/subscribe** sobre *subjects*. ### Conceptos clave - **Subject**: una cadena jerárquica separada por puntos, por ejemplo `pedidos.creados` o `sensor.cocina.temp`. Es la *dirección* de un mensaje. No hay que declararlo: existe en cuanto alguien publica o se suscribe. - **Publisher**: un proceso que envía un mensaje a un subject. No sabe quién lo va a recibir (ni si alguien lo recibirá). - **Subscriber**: un proceso que expresa interés en uno o varios subjects. Recibe todos los mensajes publicados en ellos mientras esté conectado. - **Broker (`nats-server`)**: el proceso central que enruta cada mensaje publicado a todos los subscribers interesados. Desacopla a publishers de subscribers: ninguno conoce la dirección de red del otro, solo el broker y el subject. ### Garantías del *core* NATS El core es **fire-and-forget** (*at-most-once*): si en el momento de publicar no hay ningún subscriber conectado a ese subject, el mensaje se descarta. No hay persistencia ni reintentos. Esto lo hace extremadamente rápido. Cuando se necesita persistencia y *replay* se usa **JetStream** (notebook 02). En este notebook levantamos un broker en Docker y demostramos: conexión, pub/sub básico, *fan-out* a varios subscribers y *wildcards*."""), ("md", """## 0 · Entorno: el broker NATS en Docker Levantamos `nats:latest` con el flag `-js` (habilita JetStream, que usaremos en el notebook 02) y publicamos el puerto estándar `4222` en el host. La función `ensure_nats` es **idempotente**: si el contenedor ya existe lo reutiliza, si está parado lo arranca, y solo lo crea si no existe. Así los tres notebooks de este análisis comparten el mismo broker."""), ("code", ENSURE_NATS + ''' state = ensure_nats() print(f"Broker NATS: {state} en {NATS_URL}") # El endpoint de monitorización (puerto 8222) expone métricas del servidor en JSON import urllib.request varz = json.loads(urllib.request.urlopen("http://127.0.0.1:8222/varz", timeout=3).read()) print(f"nats-server version: {varz['version']} | jetstream activo: {bool(varz.get('jetstream'))}")'''), ("md", """## 1 · Conectar un cliente Usamos `nats-py`, el cliente oficial para Python basado en `asyncio`. IPython permite usar `await` directamente en las celdas (*top-level await*), así que mantenemos la conexión `nc` viva en una variable global y la reutilizamos a lo largo del notebook, igual que haría un proceso real."""), ("code", '''import asyncio import nats # Conexión persistente del notebook (simula el cliente de un proceso de larga vida) nc = await nats.connect(NATS_URL, name="notebook-01") info = nc._server_info # metadata que el broker envía en el handshake print("Conectado.") print(f" server_id : {info['server_id']}") print(f" max_payload: {info['max_payload']/1024/1024:.0f} MiB por mensaje") print(f" client_id : {info['client_id']}")'''), ("md", """## 2 · Publish/Subscribe básico El patrón mínimo: un subscriber declara interés en un subject, un publisher envía un mensaje, el broker lo entrega. Aquí usamos una **suscripción síncrona** (`sub.next_msg()`), cómoda para ir paso a paso en un notebook: pedimos explícitamente el siguiente mensaje. El payload siempre viaja como `bytes` — NATS no impone formato; aquí codificamos texto UTF-8."""), ("code", '''# El subscriber declara interés ANTES de que se publique (core = at-most-once) sub = await nc.subscribe("saludos") # El publisher envía. No sabe quién escucha; solo conoce el subject. await nc.publish("saludos", b"hola desde el publisher") await nc.flush() # fuerza el envío al broker antes de seguir # El subscriber recoge el mensaje msg = await sub.next_msg(timeout=2) print(f"subject recibido : {msg.subject}") print(f"payload : {msg.data.decode()}") await sub.unsubscribe()'''), ("md", """## 3 · Fan-out: un publisher, N subscribers La potencia del pub/sub es el **fan-out**: cuando varios subscribers están interesados en el mismo subject, el broker entrega una **copia a cada uno**. El publisher hace *una* llamada `publish` y no cambia nada en su código aunque haya 1 o 1000 subscribers. Aquí registramos 3 subscribers al subject `noticias` mediante *callbacks* asíncronos (cada uno simula un proceso distinto) y publicamos un único mensaje."""), ("code", '''recibidos = [] # registro de quién recibió qué def make_handler(nombre): async def handler(msg): recibidos.append({"subscriber": nombre, "subject": msg.subject, "data": msg.data.decode()}) return handler # Tres subscribers independientes al MISMO subject subs = [] for nombre in ["sub-A", "sub-B", "sub-C"]: s = await nc.subscribe("noticias", cb=make_handler(nombre)) subs.append(s) # Un único publish... await nc.publish("noticias", b"titular: NATS entrega a todos") await nc.flush() await asyncio.sleep(0.3) # dar tiempo a los callbacks for r in recibidos: print(f"{r['subscriber']} <- [{r['subject']}] {r['data']}") print() print(f"Un publish -> {len(recibidos)} entregas (fan-out)") for s in subs: await s.unsubscribe()'''), ("md", """## 4 · Wildcards: `*` y `>` Los subjects son jerárquicos y los subscribers pueden usar comodines para suscribirse a familias enteras de subjects: - **`*`** (asterisco) — comodín de **un único token**. `sensor.*.temp` casa con `sensor.cocina.temp` y `sensor.salon.temp`, pero **no** con `sensor.cocina.planta1.temp`. - **`>`** (mayor que) — comodín de **uno o más tokens** hasta el final. `sensor.>` casa con `sensor.cocina.temp`, `sensor.salon.humedad`, `sensor.garaje.puerta.estado`... Esto permite que un proceso se suscriba a "todo lo de los sensores" o "la temperatura de cualquier habitación" sin conocer de antemano qué subjects concretos existirán."""), ("code", '''wild = [] async def on_star(msg): wild.append({"patron": "sensor.*.temp", "subject": msg.subject, "data": msg.data.decode()}) async def on_gt(msg): wild.append({"patron": "sensor.>", "subject": msg.subject, "data": msg.data.decode()}) s_star = await nc.subscribe("sensor.*.temp", cb=on_star) s_gt = await nc.subscribe("sensor.>", cb=on_gt) # Publicamos en varios subjects concretos publicados = { "sensor.cocina.temp": "21.5", "sensor.salon.temp": "22.1", "sensor.salon.humedad": "48", "sensor.garaje.puerta.estado": "abierta", } for subj, val in publicados.items(): await nc.publish(subj, val.encode()) await nc.flush() await asyncio.sleep(0.3) print("Mensajes publicados:", list(publicados)) print() for w in wild: print(f"[{w['patron']:14}] casó {w['subject']}") await s_star.unsubscribe(); await s_gt.unsubscribe()'''), ("md", """## 5 · Visualización: qué patrón casó qué subject Una matriz `subject × patrón` deja claro el alcance de cada comodín: `sensor.>` captura los cuatro subjects; `sensor.*.temp` solo las dos temperaturas de un nivel (no la humedad, que no es `temp`, ni la del garaje, que tiene un token de más)."""), ("code", '''import pandas as pd import matplotlib.pyplot as plt import numpy as np patrones = ["sensor.*.temp", "sensor.>"] subjects = list(publicados) # Construir matriz de coincidencias a partir de lo que recibió cada suscripción M = np.zeros((len(patrones), len(subjects)), dtype=int) for w in wild: i = patrones.index(w["patron"]) j = subjects.index(w["subject"]) M[i, j] = 1 fig, ax = plt.subplots(figsize=(9, 2.6)) ax.imshow(M, cmap="Greens", vmin=0, vmax=1, aspect="auto") ax.set_xticks(range(len(subjects))); ax.set_xticklabels(subjects, rotation=25, ha="right", fontsize=9) ax.set_yticks(range(len(patrones))); ax.set_yticklabels(patrones, fontsize=10) for i in range(len(patrones)): for j in range(len(subjects)): ax.text(j, i, "OK" if M[i, j] else "-", ha="center", va="center", color="white" if M[i, j] else "#999", fontsize=12) ax.set_title("Coincidencia de wildcards (OK = el subscriber recibió el mensaje)") plt.tight_layout(); plt.show() pd.DataFrame(M, index=patrones, columns=subjects)'''), ("md", """## Resumen - El **broker** desacopla publishers y subscribers: solo comparten el *subject*. - El core es **fire-and-forget**: sin subscriber conectado, el mensaje se pierde. - **Fan-out** automático: una publicación llega a todos los subscribers interesados. - **Wildcards** `*` (un token) y `>` (resto de la jerarquía) permiten suscribirse a familias de subjects. **Siguiente** → `02_queue_request_jetstream.ipynb`: repartir carga entre workers (*queue groups*), RPC (*request/reply*) y mensajería **persistente** con JetStream. > La conexión `nc` y el contenedor `nats_demo` siguen vivos para los siguientes notebooks."""), ] # ---------------------------------------------------------------------------- # Notebook 02 — Queue groups, Request/Reply, JetStream # ---------------------------------------------------------------------------- nb2 = [ ("md", """# NATS pub/sub — 02 · Queue groups, Request/Reply y JetStream En el notebook 01 vimos el *fan-out* del core: una publicación llega a **todos** los subscribers. Aquí cubrimos tres patrones que construyen sobre esa base: 1. **Queue groups** — repartir la carga: varios *workers* comparten el trabajo y cada mensaje lo procesa **uno solo**. 2. **Request/Reply** — RPC sobre mensajería: un cliente pregunta y espera la respuesta de un servicio. 3. **JetStream** — la capa de **persistencia**: streams que almacenan los mensajes y permiten *replay*, a diferencia del core *fire-and-forget*. > Requiere el broker `nats_demo` del notebook 01. La primera celda lo arranca de forma idempotente, así que este notebook también funciona de forma aislada."""), ("md", """## 0 · Setup: broker + conexión Reutilizamos el mismo broker en Docker. `ensure_nats` es idempotente; si el contenedor sigue vivo del notebook 01, simplemente se reaprovecha."""), ("code", ENSURE_NATS + ''' import asyncio import nats print("Broker:", ensure_nats()) nc = await nats.connect(NATS_URL, name="notebook-02") print("Conectado, client_id:", nc._server_info["client_id"])'''), ("md", """## 1 · Queue groups — reparto de carga entre workers Un **queue group** convierte el fan-out en una **cola de trabajo**. Varios subscribers se suscriben al mismo subject pero declarando el mismo nombre de *queue*. El broker entonces entrega cada mensaje a **exactamente uno** de los miembros del grupo (balanceo de carga), en lugar de a todos. Es el patrón de los *worker pools*: escalas el procesamiento añadiendo más workers al grupo, sin tocar al publisher. Si un worker cae, los demás siguen recibiendo. Aquí lanzamos 3 workers en el queue `procesadores` y publicamos 12 tareas."""), ("code", '''from collections import Counter trabajo = Counter() # cuántas tareas procesó cada worker orden = [] # traza temporal (worker, tarea) def make_worker(nombre): async def worker(msg): tarea = msg.data.decode() trabajo[nombre] += 1 orden.append((nombre, tarea)) return worker # 3 workers, MISMO subject, MISMO queue group -> NATS reparte workers = [] for nombre in ["worker-1", "worker-2", "worker-3"]: s = await nc.subscribe("tareas", queue="procesadores", cb=make_worker(nombre)) workers.append(s) # Publicar 12 tareas for i in range(12): await nc.publish("tareas", f"tarea-{i:02d}".encode()) await nc.flush() await asyncio.sleep(0.5) print("Reparto de carga (cada tarea fue a UN solo worker):") for w, n in sorted(trabajo.items()): print(f" {w}: {n} tareas") print(f" TOTAL procesado: {sum(trabajo.values())} de 12 tareas") for s in workers: await s.unsubscribe()'''), ("md", """### Visualización del reparto El total siempre suma 12 (ninguna tarea se duplica ni se pierde), repartido de forma aproximadamente equilibrada entre los workers."""), ("code", '''import matplotlib.pyplot as plt nombres = [f"worker-{i}" for i in (1, 2, 3)] valores = [trabajo.get(n, 0) for n in nombres] fig, ax = plt.subplots(figsize=(7, 3)) barras = ax.bar(nombres, valores, color=["#2563eb", "#16a34a", "#db2777"]) ax.bar_label(barras, padding=3) ax.set_ylabel("tareas procesadas") ax.set_title(f"Queue group 'procesadores' — 12 tareas repartidas entre {len(nombres)} workers") ax.set_ylim(0, max(valores) + 2) plt.tight_layout(); plt.show()'''), ("md", """## 2 · Request/Reply — RPC sobre NATS NATS implementa **petición/respuesta** sobre el mismo modelo pub/sub. El cliente usa `nc.request(subject, datos)`: por debajo, NATS crea un subject de respuesta temporal único (un *inbox*), lo adjunta al mensaje y espera la primera respuesta que llegue a ese inbox. El servicio se suscribe al subject, procesa, y responde con `msg.respond(datos)`. Esto da RPC con descubrimiento automático (el cliente no conoce la dirección del servicio, solo el subject) y *timeouts* integrados. Si varios servicios escuchan en un queue group, además se balancea la carga de las peticiones."""), ("code", '''import nats.errors # Servicio: convierte el texto recibido a mayúsculas y responde async def servicio_mayusculas(msg): respuesta = msg.data.decode().upper() await msg.respond(respuesta.encode()) sub_svc = await nc.subscribe("servicio.mayusculas", cb=servicio_mayusculas) # Cliente: pide y espera respuesta (con timeout) peticiones = ["hola mundo", "nats request reply", "desacople total"] for texto in peticiones: resp = await nc.request("servicio.mayusculas", texto.encode(), timeout=1.0) print(f" peticion : {texto!r}") print(f" respuesta: {resp.data.decode()!r} (vino por el inbox {resp.subject})") print() # ¿Qué pasa si nadie escucha el subject? El broker avisa al instante con # NoRespondersError (no hace falta esperar al timeout completo). try: await nc.request("servicio.inexistente", b"hay alguien?", timeout=0.5) except nats.errors.NoRespondersError: print("servicio.inexistente -> NoRespondersError: el broker confirma que no hay ningún servicio escuchando") except (nats.errors.TimeoutError, asyncio.TimeoutError): print("servicio.inexistente -> TimeoutError: nadie respondió a tiempo") await sub_svc.unsubscribe()'''), ("md", """## 3 · JetStream — persistencia y replay Todo lo anterior es **efímero**: si no hay nadie escuchando en el instante exacto de la publicación, el mensaje se pierde. **JetStream** añade una capa de almacenamiento: - Un **stream** captura y persiste todos los mensajes de unos subjects dados. - Los **consumers** leen del stream a su ritmo, pueden ser **durables** (recuerdan por dónde iban) y permiten **replay** de mensajes históricos. Demostramos la diferencia clave con el core: publicamos a un stream **sin ningún consumidor activo** y, *después*, creamos un consumidor que recupera todos esos mensajes."""), ("code", '''js = nc.jetstream() # Crear (o recrear limpio) un stream que persiste todo lo de 'eventos.>' try: await js.delete_stream("EVENTOS") except Exception: pass stream = await js.add_stream(name="EVENTOS", subjects=["eventos.>"]) print(f"Stream creado: {stream.config.name} subjects={stream.config.subjects} storage={stream.config.storage}") # Publicar 5 eventos SIN que haya ningún consumidor escuchando todavía for i in range(5): ack = await js.publish("eventos.click", f"evento #{i}".encode()) print(f" publicado eventos.click -> stream='{ack.stream}' seq={ack.seq}") estado = (await js.stream_info("EVENTOS")).state print() print(f"Mensajes retenidos en el stream: {estado.messages} (siguen ahí aunque nadie los haya leído)")'''), ("md", """### Replay: leer los mensajes históricos Ahora creamos un **consumer durable** (`lector-eventos`) y hacemos *fetch*. Recuperamos los 5 mensajes que se publicaron **antes** de que este consumidor existiera — algo imposible con el core NATS."""), ("code", '''# Pull consumer durable: pedimos los mensajes bajo demanda psub = await js.pull_subscribe("eventos.>", durable="lector-eventos") mensajes = await psub.fetch(5, timeout=2) print(f"Recuperados {len(mensajes)} mensajes del stream (replay):") for m in mensajes: print(f" seq={m.metadata.sequence.stream} subject={m.subject} data={m.data.decode()!r}") await m.ack() # confirmamos el procesado; el durable avanza su cursor # Tras el ack, un segundo fetch no devuelve nada nuevo (cursor avanzado) try: extra = await psub.fetch(1, timeout=1) except Exception: extra = [] print() print(f"Segundo fetch tras ack: {len(extra)} mensajes (el durable recuerda que ya los leyó)")'''), ("md", """## Resumen | Patrón | Entrega | Persistencia | Caso de uso | |---|---|---|---| | Core pub/sub (nb 01) | a **todos** los subscribers | no (at-most-once) | eventos en vivo, telemetría | | **Queue group** | a **uno** del grupo | no | *worker pool*, reparto de carga | | **Request/Reply** | a uno, con respuesta | no | RPC, servicios | | **JetStream** | configurable + **replay** | **sí** | event sourcing, colas durables, auditoría | **Siguiente** → `03_procesos_reales.ipynb`: hasta ahora todo ha ocurrido dentro de un mismo kernel. Allí lanzamos publisher y subscribers como **procesos del sistema operativo independientes** para ver el desacople real."""), ] # ---------------------------------------------------------------------------- # Notebook 03 — Procesos del sistema operativo reales # ---------------------------------------------------------------------------- nb3 = [ ("md", """# NATS pub/sub — 03 · Procesos del sistema operativo reales En los notebooks 01 y 02 todo ocurrió dentro de un mismo kernel: varias conexiones `asyncio` simulaban procesos distintos. Eso es cómodo para explicar, pero NATS brilla precisamente cuando los participantes son **procesos del sistema operativo separados** —incluso en máquinas distintas— que solo comparten la dirección del broker y los nombres de subject. Aquí lanzamos **procesos reales** con `subprocess`: - un **publisher** (`procs/publisher.py`) que emite telemetría a `telemetria.cpu` y `telemetria.mem`; - dos **subscribers** independientes (`procs/subscriber.py`), cada uno con su propio PID: - `sub-todo` escucha `telemetria.>` (toda la telemetría), - `sub-cpu` escucha solo `telemetria.cpu`. Cada proceso abre su propia conexión al broker. El publisher **no sabe** cuántos subscribers hay ni qué escuchan: solo publica a un subject. Ese es el desacople real."""), ("md", """## 0 · Broker + scripts de los procesos Arrancamos el broker (idempotente) y mostramos el código de los dos scripts que vamos a lanzar como procesos. Cada uno es un programa autónomo que se conecta a `nats://127.0.0.1:4222` y emite eventos como líneas JSON en su stdout, que el notebook recogerá."""), ("code", ENSURE_NATS + f''' from pathlib import Path PROCS = Path(r"{PROCS_ABS}") print("Broker:", ensure_nats()) print("Scripts de proceso en:", PROCS) print() print("=== procs/publisher.py ===") print(Path(PROCS / "publisher.py").read_text())'''), ("code", '''print("=== procs/subscriber.py ===") print((PROCS / "subscriber.py").read_text())'''), ("md", """## 1 · Lanzar los procesos y orquestarlos El notebook actúa de **orquestador**: 1. Lanza los dos subscribers como procesos (`subprocess.Popen`), cada uno con su PID. Les damos 1.5 s para que conecten y se suscriban. 2. Lanza el publisher, que emite 8 mensajes y termina. 3. Espera a que los subscribers terminen solos (su `--seconds`) y recoge su stdout. Usamos `sys.executable` para que los procesos hijos usen el mismo intérprete (con `nats-py` instalado) que el kernel."""), ("code", '''import subprocess, sys, json, time def lanzar_subscriber(nombre, subjects, seconds=4.5): return subprocess.Popen( [sys.executable, str(PROCS / "subscriber.py"), "--name", nombre, "--subjects", subjects, "--seconds", str(seconds)], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, ) # 1. Subscribers: procesos OS independientes procs = { "sub-todo": lanzar_subscriber("sub-todo", "telemetria.>"), "sub-cpu": lanzar_subscriber("sub-cpu", "telemetria.cpu"), } print("Subscribers lanzados (PIDs del SO):", {n: p.pid for n, p in procs.items()}) time.sleep(1.5) # que conecten y se suscriban antes de publicar # 2. Publisher: otro proceso OS, publica 8 mensajes y termina pub = subprocess.run( [sys.executable, str(PROCS / "publisher.py"), "--count", "8", "--interval", "0.15"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, ) pub_eventos = [json.loads(l) for l in pub.stdout.splitlines() if l.strip()] print(f"Publisher (PID {pub_eventos[0]['pid']}) publicó {sum(1 for e in pub_eventos if e['event']=='published')} mensajes") # 3. Recoger stdout de los subscribers (terminan solos por --seconds) eventos = [] for nombre, p in procs.items(): out, err = p.communicate(timeout=10) for l in out.splitlines(): if l.strip(): eventos.append(json.loads(l)) if err.strip(): print(f"[{nombre} stderr] {err.strip()[:200]}") msgs = [e for e in eventos if e["event"] == "msg"] print(f"\\nTotal de entregas recibidas entre todos los procesos: {len(msgs)}")'''), ("md", """## 2 · Qué recibió cada proceso Cada subscriber es un PID distinto. `sub-todo` (suscrito a `telemetria.>`) recibe los 8 mensajes; `sub-cpu` (suscrito solo a `telemetria.cpu`) recibe únicamente los 4 de CPU. El broker filtró por subject sin que el publisher supiera nada de ello."""), ("code", '''import pandas as pd df = pd.DataFrame(msgs) # PID por nombre de proceso (demuestra que son procesos distintos) pids = {e["name"]: e["pid"] for e in eventos if e["event"] == "ready"} print("PID de cada proceso subscriber:", pids) print() # Conteo de mensajes por (proceso, subject) tabla = df.groupby(["name", "subject"]).size().unstack(fill_value=0) print("Mensajes recibidos por proceso y subject:") print(tabla) tabla'''), ("code", '''import matplotlib.pyplot as plt resumen = df.groupby("name").size().reindex(["sub-todo", "sub-cpu"]).fillna(0).astype(int) fig, ax = plt.subplots(figsize=(7, 3.2)) barras = ax.bar(resumen.index, resumen.values, color=["#7c3aed", "#0891b2"]) ax.bar_label(barras, padding=3) ax.set_ylabel("mensajes recibidos") ax.set_title("Telemetría recibida por cada PROCESO (8 publicados: 4 cpu + 4 mem)") ax.set_ylim(0, 10) for i, name in enumerate(resumen.index): ax.text(i, -1.4, f"PID {pids.get(name, '?')}\\n{('telemetria.>' if name=='sub-todo' else 'telemetria.cpu')}", ha="center", va="top", fontsize=8, color="#555") plt.tight_layout(); plt.show()'''), ("md", """## 3 · Línea de tiempo de las entregas Ordenando los mensajes por su marca temporal (`t`, segundos desde que cada proceso arrancó) se ve cómo ambos subscribers reciben los mensajes de CPU casi a la vez (fan-out), mientras que los de memoria solo llegan a `sub-todo`."""), ("code", '''fig, ax = plt.subplots(figsize=(9, 3)) colores = {"telemetria.cpu": "#ef4444", "telemetria.mem": "#3b82f6"} y_de = {"sub-todo": 1, "sub-cpu": 0} for e in msgs: ax.scatter(e["t"], y_de[e["name"]], color=colores[e["subject"]], s=80, zorder=3) ax.set_yticks([0, 1]); ax.set_yticklabels(["sub-cpu", "sub-todo"]) ax.set_xlabel("t (segundos desde el arranque de cada proceso)") ax.set_title("Timeline de entregas — rojo: telemetria.cpu, azul: telemetria.mem") ax.grid(axis="x", alpha=0.3) from matplotlib.patches import Patch ax.legend(handles=[Patch(color=c, label=s) for s, c in colores.items()], loc="upper right") plt.tight_layout(); plt.show()'''), ("md", """## Resumen del análisis A lo largo de los tres notebooks hemos visto cómo distintos procesos envían datos por pub/sub con NATS: - **01** — el modelo base: publishers y subscribers desacoplados por un broker, *fan-out* y *wildcards*. - **02** — patrones de orden superior: *queue groups* (reparto de carga), *request/reply* (RPC) y *JetStream* (persistencia y replay). - **03** — **procesos del SO reales**: el desacople de verdad. El publisher no conoce a sus subscribers; el broker enruta por subject. Añadir o quitar procesos consumidores no cambia ni una línea del publisher. Esa es la idea central de NATS: **los procesos se comunican por nombres de subject, no por direcciones**, y el broker se encarga del resto. ### Limpieza (opcional) Para parar el broker cuando termines: ```python import subprocess subprocess.run(["docker", "stop", "nats_demo"]) # detener subprocess.run(["docker", "rm", "nats_demo"]) # eliminar ```"""), ] if __name__ == "__main__": build("01_core_pubsub.ipynb", nb1) build("02_queue_request_jetstream.ipynb", nb2) build("03_procesos_reales.ipynb", nb3) print("OK: 3 notebooks generados en", NBDIR)