Files
nats/build_notebooks.py
2026-06-03 19:53:43 +02:00

581 lines
26 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""Generador de los notebooks del analisis NATS.
Construye los tres .ipynb con nbformat (sin ejecutar). La ejecucion se hace
despues a traves del MCP de Jupyter, contra el kernel del servidor, para que
los outputs queden persistidos y visibles en JupyterLab.
Reproducible: re-ejecutar este script regenera los notebooks desde cero.
"""
from pathlib import Path
import nbformat as nbf
NBDIR = Path("/home/enmanuel/fn_registry/analysis/nats/notebooks")
PROCS_ABS = str(NBDIR / "procs")
def build(filename: str, cells: list[tuple[str, str]]) -> None:
nb = nbf.v4.new_notebook()
nb.metadata["kernelspec"] = {
"name": "python3",
"display_name": "Python 3 (ipykernel)",
"language": "python",
}
nb.cells = [
nbf.v4.new_markdown_cell(src) if typ == "md" else nbf.v4.new_code_cell(src)
for typ, src in cells
]
nbf.write(nb, str(NBDIR / filename))
print(f"escrito {filename}: {len(cells)} celdas")
# Bloque de codigo reutilizado al inicio de cada notebook para arrancar el broker.
ENSURE_NATS = '''import subprocess, time, json
NATS_CONTAINER = "nats_demo"
NATS_PORT = 4222
NATS_URL = f"nats://127.0.0.1:{NATS_PORT}"
def _docker(*args, check=True):
return subprocess.run(["docker", *args], capture_output=True, text=True, check=check)
def ensure_nats(name=NATS_CONTAINER, port=NATS_PORT):
"""Arranca un broker NATS en Docker de forma idempotente. Devuelve el estado."""
out = _docker("ps", "-a", "--filter", f"name=^{name}$", "--format", "{{.State}}", check=False).stdout.strip()
if out == "running":
state = "already-running"
elif out in ("exited", "created", "paused"):
_docker("start", name)
state = "started"
else:
_docker("run", "-d", "--name", name, "-p", f"{port}:4222", "-p", "8222:8222",
"nats:latest", "-js", "-m", "8222")
state = "created"
time.sleep(1.0)
return state'''
# ----------------------------------------------------------------------------
# Notebook 01 — Core pub/sub y wildcards
# ----------------------------------------------------------------------------
nb1 = [
("md", """# NATS pub/sub — 01 · Core publish/subscribe y wildcards
**NATS** es un sistema de mensajería ligero y de alto rendimiento orientado a la comunicación entre procesos (microservicios, IoT, pipelines de datos). Su modelo central es **publish/subscribe** sobre *subjects*.
### Conceptos clave
- **Subject**: una cadena jerárquica separada por puntos, por ejemplo `pedidos.creados` o `sensor.cocina.temp`. Es la *dirección* de un mensaje. No hay que declararlo: existe en cuanto alguien publica o se suscribe.
- **Publisher**: un proceso que envía un mensaje a un subject. No sabe quién lo va a recibir (ni si alguien lo recibirá).
- **Subscriber**: un proceso que expresa interés en uno o varios subjects. Recibe todos los mensajes publicados en ellos mientras esté conectado.
- **Broker (`nats-server`)**: el proceso central que enruta cada mensaje publicado a todos los subscribers interesados. Desacopla a publishers de subscribers: ninguno conoce la dirección de red del otro, solo el broker y el subject.
### Garantías del *core* NATS
El core es **fire-and-forget** (*at-most-once*): si en el momento de publicar no hay ningún subscriber conectado a ese subject, el mensaje se descarta. No hay persistencia ni reintentos. Esto lo hace extremadamente rápido. Cuando se necesita persistencia y *replay* se usa **JetStream** (notebook 02).
En este notebook levantamos un broker en Docker y demostramos: conexión, pub/sub básico, *fan-out* a varios subscribers y *wildcards*."""),
("md", """## 0 · Entorno: el broker NATS en Docker
Levantamos `nats:latest` con el flag `-js` (habilita JetStream, que usaremos en el notebook 02) y publicamos el puerto estándar `4222` en el host. La función `ensure_nats` es **idempotente**: si el contenedor ya existe lo reutiliza, si está parado lo arranca, y solo lo crea si no existe. Así los tres notebooks de este análisis comparten el mismo broker."""),
("code", ENSURE_NATS + '''
state = ensure_nats()
print(f"Broker NATS: {state} en {NATS_URL}")
# El endpoint de monitorización (puerto 8222) expone métricas del servidor en JSON
import urllib.request
varz = json.loads(urllib.request.urlopen("http://127.0.0.1:8222/varz", timeout=3).read())
print(f"nats-server version: {varz['version']} | jetstream activo: {bool(varz.get('jetstream'))}")'''),
("md", """## 1 · Conectar un cliente
Usamos `nats-py`, el cliente oficial para Python basado en `asyncio`. IPython permite usar `await` directamente en las celdas (*top-level await*), así que mantenemos la conexión `nc` viva en una variable global y la reutilizamos a lo largo del notebook, igual que haría un proceso real."""),
("code", '''import asyncio
import nats
# Conexión persistente del notebook (simula el cliente de un proceso de larga vida)
nc = await nats.connect(NATS_URL, name="notebook-01")
info = nc._server_info # metadata que el broker envía en el handshake
print("Conectado.")
print(f" server_id : {info['server_id']}")
print(f" max_payload: {info['max_payload']/1024/1024:.0f} MiB por mensaje")
print(f" client_id : {info['client_id']}")'''),
("md", """## 2 · Publish/Subscribe básico
El patrón mínimo: un subscriber declara interés en un subject, un publisher envía un mensaje, el broker lo entrega.
Aquí usamos una **suscripción síncrona** (`sub.next_msg()`), cómoda para ir paso a paso en un notebook: pedimos explícitamente el siguiente mensaje. El payload siempre viaja como `bytes` — NATS no impone formato; aquí codificamos texto UTF-8."""),
("code", '''# El subscriber declara interés ANTES de que se publique (core = at-most-once)
sub = await nc.subscribe("saludos")
# El publisher envía. No sabe quién escucha; solo conoce el subject.
await nc.publish("saludos", b"hola desde el publisher")
await nc.flush() # fuerza el envío al broker antes de seguir
# El subscriber recoge el mensaje
msg = await sub.next_msg(timeout=2)
print(f"subject recibido : {msg.subject}")
print(f"payload : {msg.data.decode()}")
await sub.unsubscribe()'''),
("md", """## 3 · Fan-out: un publisher, N subscribers
La potencia del pub/sub es el **fan-out**: cuando varios subscribers están interesados en el mismo subject, el broker entrega una **copia a cada uno**. El publisher hace *una* llamada `publish` y no cambia nada en su código aunque haya 1 o 1000 subscribers.
Aquí registramos 3 subscribers al subject `noticias` mediante *callbacks* asíncronos (cada uno simula un proceso distinto) y publicamos un único mensaje."""),
("code", '''recibidos = [] # registro de quién recibió qué
def make_handler(nombre):
async def handler(msg):
recibidos.append({"subscriber": nombre, "subject": msg.subject, "data": msg.data.decode()})
return handler
# Tres subscribers independientes al MISMO subject
subs = []
for nombre in ["sub-A", "sub-B", "sub-C"]:
s = await nc.subscribe("noticias", cb=make_handler(nombre))
subs.append(s)
# Un único publish...
await nc.publish("noticias", b"titular: NATS entrega a todos")
await nc.flush()
await asyncio.sleep(0.3) # dar tiempo a los callbacks
for r in recibidos:
print(f"{r['subscriber']} <- [{r['subject']}] {r['data']}")
print()
print(f"Un publish -> {len(recibidos)} entregas (fan-out)")
for s in subs:
await s.unsubscribe()'''),
("md", """## 4 · Wildcards: `*` y `>`
Los subjects son jerárquicos y los subscribers pueden usar comodines para suscribirse a familias enteras de subjects:
- **`*`** (asterisco) — comodín de **un único token**. `sensor.*.temp` casa con `sensor.cocina.temp` y `sensor.salon.temp`, pero **no** con `sensor.cocina.planta1.temp`.
- **`>`** (mayor que) — comodín de **uno o más tokens** hasta el final. `sensor.>` casa con `sensor.cocina.temp`, `sensor.salon.humedad`, `sensor.garaje.puerta.estado`...
Esto permite que un proceso se suscriba a "todo lo de los sensores" o "la temperatura de cualquier habitación" sin conocer de antemano qué subjects concretos existirán."""),
("code", '''wild = []
async def on_star(msg):
wild.append({"patron": "sensor.*.temp", "subject": msg.subject, "data": msg.data.decode()})
async def on_gt(msg):
wild.append({"patron": "sensor.>", "subject": msg.subject, "data": msg.data.decode()})
s_star = await nc.subscribe("sensor.*.temp", cb=on_star)
s_gt = await nc.subscribe("sensor.>", cb=on_gt)
# Publicamos en varios subjects concretos
publicados = {
"sensor.cocina.temp": "21.5",
"sensor.salon.temp": "22.1",
"sensor.salon.humedad": "48",
"sensor.garaje.puerta.estado": "abierta",
}
for subj, val in publicados.items():
await nc.publish(subj, val.encode())
await nc.flush()
await asyncio.sleep(0.3)
print("Mensajes publicados:", list(publicados))
print()
for w in wild:
print(f"[{w['patron']:14}] casó {w['subject']}")
await s_star.unsubscribe(); await s_gt.unsubscribe()'''),
("md", """## 5 · Visualización: qué patrón casó qué subject
Una matriz `subject × patrón` deja claro el alcance de cada comodín: `sensor.>` captura los cuatro subjects; `sensor.*.temp` solo las dos temperaturas de un nivel (no la humedad, que no es `temp`, ni la del garaje, que tiene un token de más)."""),
("code", '''import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
patrones = ["sensor.*.temp", "sensor.>"]
subjects = list(publicados)
# Construir matriz de coincidencias a partir de lo que recibió cada suscripción
M = np.zeros((len(patrones), len(subjects)), dtype=int)
for w in wild:
i = patrones.index(w["patron"])
j = subjects.index(w["subject"])
M[i, j] = 1
fig, ax = plt.subplots(figsize=(9, 2.6))
ax.imshow(M, cmap="Greens", vmin=0, vmax=1, aspect="auto")
ax.set_xticks(range(len(subjects))); ax.set_xticklabels(subjects, rotation=25, ha="right", fontsize=9)
ax.set_yticks(range(len(patrones))); ax.set_yticklabels(patrones, fontsize=10)
for i in range(len(patrones)):
for j in range(len(subjects)):
ax.text(j, i, "OK" if M[i, j] else "-", ha="center", va="center",
color="white" if M[i, j] else "#999", fontsize=12)
ax.set_title("Coincidencia de wildcards (OK = el subscriber recibió el mensaje)")
plt.tight_layout(); plt.show()
pd.DataFrame(M, index=patrones, columns=subjects)'''),
("md", """## Resumen
- El **broker** desacopla publishers y subscribers: solo comparten el *subject*.
- El core es **fire-and-forget**: sin subscriber conectado, el mensaje se pierde.
- **Fan-out** automático: una publicación llega a todos los subscribers interesados.
- **Wildcards** `*` (un token) y `>` (resto de la jerarquía) permiten suscribirse a familias de subjects.
**Siguiente** → `02_queue_request_jetstream.ipynb`: repartir carga entre workers (*queue groups*), RPC (*request/reply*) y mensajería **persistente** con JetStream.
> La conexión `nc` y el contenedor `nats_demo` siguen vivos para los siguientes notebooks."""),
]
# ----------------------------------------------------------------------------
# Notebook 02 — Queue groups, Request/Reply, JetStream
# ----------------------------------------------------------------------------
nb2 = [
("md", """# NATS pub/sub — 02 · Queue groups, Request/Reply y JetStream
En el notebook 01 vimos el *fan-out* del core: una publicación llega a **todos** los subscribers. Aquí cubrimos tres patrones que construyen sobre esa base:
1. **Queue groups** — repartir la carga: varios *workers* comparten el trabajo y cada mensaje lo procesa **uno solo**.
2. **Request/Reply** — RPC sobre mensajería: un cliente pregunta y espera la respuesta de un servicio.
3. **JetStream** — la capa de **persistencia**: streams que almacenan los mensajes y permiten *replay*, a diferencia del core *fire-and-forget*.
> Requiere el broker `nats_demo` del notebook 01. La primera celda lo arranca de forma idempotente, así que este notebook también funciona de forma aislada."""),
("md", """## 0 · Setup: broker + conexión
Reutilizamos el mismo broker en Docker. `ensure_nats` es idempotente; si el contenedor sigue vivo del notebook 01, simplemente se reaprovecha."""),
("code", ENSURE_NATS + '''
import asyncio
import nats
print("Broker:", ensure_nats())
nc = await nats.connect(NATS_URL, name="notebook-02")
print("Conectado, client_id:", nc._server_info["client_id"])'''),
("md", """## 1 · Queue groups — reparto de carga entre workers
Un **queue group** convierte el fan-out en una **cola de trabajo**. Varios subscribers se suscriben al mismo subject pero declarando el mismo nombre de *queue*. El broker entonces entrega cada mensaje a **exactamente uno** de los miembros del grupo (balanceo de carga), en lugar de a todos.
Es el patrón de los *worker pools*: escalas el procesamiento añadiendo más workers al grupo, sin tocar al publisher. Si un worker cae, los demás siguen recibiendo. Aquí lanzamos 3 workers en el queue `procesadores` y publicamos 12 tareas."""),
("code", '''from collections import Counter
trabajo = Counter() # cuántas tareas procesó cada worker
orden = [] # traza temporal (worker, tarea)
def make_worker(nombre):
async def worker(msg):
tarea = msg.data.decode()
trabajo[nombre] += 1
orden.append((nombre, tarea))
return worker
# 3 workers, MISMO subject, MISMO queue group -> NATS reparte
workers = []
for nombre in ["worker-1", "worker-2", "worker-3"]:
s = await nc.subscribe("tareas", queue="procesadores", cb=make_worker(nombre))
workers.append(s)
# Publicar 12 tareas
for i in range(12):
await nc.publish("tareas", f"tarea-{i:02d}".encode())
await nc.flush()
await asyncio.sleep(0.5)
print("Reparto de carga (cada tarea fue a UN solo worker):")
for w, n in sorted(trabajo.items()):
print(f" {w}: {n} tareas")
print(f" TOTAL procesado: {sum(trabajo.values())} de 12 tareas")
for s in workers:
await s.unsubscribe()'''),
("md", """### Visualización del reparto
El total siempre suma 12 (ninguna tarea se duplica ni se pierde), repartido de forma aproximadamente equilibrada entre los workers."""),
("code", '''import matplotlib.pyplot as plt
nombres = [f"worker-{i}" for i in (1, 2, 3)]
valores = [trabajo.get(n, 0) for n in nombres]
fig, ax = plt.subplots(figsize=(7, 3))
barras = ax.bar(nombres, valores, color=["#2563eb", "#16a34a", "#db2777"])
ax.bar_label(barras, padding=3)
ax.set_ylabel("tareas procesadas")
ax.set_title(f"Queue group 'procesadores' — 12 tareas repartidas entre {len(nombres)} workers")
ax.set_ylim(0, max(valores) + 2)
plt.tight_layout(); plt.show()'''),
("md", """## 2 · Request/Reply — RPC sobre NATS
NATS implementa **petición/respuesta** sobre el mismo modelo pub/sub. El cliente usa `nc.request(subject, datos)`: por debajo, NATS crea un subject de respuesta temporal único (un *inbox*), lo adjunta al mensaje y espera la primera respuesta que llegue a ese inbox.
El servicio se suscribe al subject, procesa, y responde con `msg.respond(datos)`. Esto da RPC con descubrimiento automático (el cliente no conoce la dirección del servicio, solo el subject) y *timeouts* integrados. Si varios servicios escuchan en un queue group, además se balancea la carga de las peticiones."""),
("code", '''import nats.errors
# Servicio: convierte el texto recibido a mayúsculas y responde
async def servicio_mayusculas(msg):
respuesta = msg.data.decode().upper()
await msg.respond(respuesta.encode())
sub_svc = await nc.subscribe("servicio.mayusculas", cb=servicio_mayusculas)
# Cliente: pide y espera respuesta (con timeout)
peticiones = ["hola mundo", "nats request reply", "desacople total"]
for texto in peticiones:
resp = await nc.request("servicio.mayusculas", texto.encode(), timeout=1.0)
print(f" peticion : {texto!r}")
print(f" respuesta: {resp.data.decode()!r} (vino por el inbox {resp.subject})")
print()
# ¿Qué pasa si nadie escucha el subject? El broker avisa al instante con
# NoRespondersError (no hace falta esperar al timeout completo).
try:
await nc.request("servicio.inexistente", b"hay alguien?", timeout=0.5)
except nats.errors.NoRespondersError:
print("servicio.inexistente -> NoRespondersError: el broker confirma que no hay ningún servicio escuchando")
except (nats.errors.TimeoutError, asyncio.TimeoutError):
print("servicio.inexistente -> TimeoutError: nadie respondió a tiempo")
await sub_svc.unsubscribe()'''),
("md", """## 3 · JetStream — persistencia y replay
Todo lo anterior es **efímero**: si no hay nadie escuchando en el instante exacto de la publicación, el mensaje se pierde. **JetStream** añade una capa de almacenamiento:
- Un **stream** captura y persiste todos los mensajes de unos subjects dados.
- Los **consumers** leen del stream a su ritmo, pueden ser **durables** (recuerdan por dónde iban) y permiten **replay** de mensajes históricos.
Demostramos la diferencia clave con el core: publicamos a un stream **sin ningún consumidor activo** y, *después*, creamos un consumidor que recupera todos esos mensajes."""),
("code", '''js = nc.jetstream()
# Crear (o recrear limpio) un stream que persiste todo lo de 'eventos.>'
try:
await js.delete_stream("EVENTOS")
except Exception:
pass
stream = await js.add_stream(name="EVENTOS", subjects=["eventos.>"])
print(f"Stream creado: {stream.config.name} subjects={stream.config.subjects} storage={stream.config.storage}")
# Publicar 5 eventos SIN que haya ningún consumidor escuchando todavía
for i in range(5):
ack = await js.publish("eventos.click", f"evento #{i}".encode())
print(f" publicado eventos.click -> stream='{ack.stream}' seq={ack.seq}")
estado = (await js.stream_info("EVENTOS")).state
print()
print(f"Mensajes retenidos en el stream: {estado.messages} (siguen ahí aunque nadie los haya leído)")'''),
("md", """### Replay: leer los mensajes históricos
Ahora creamos un **consumer durable** (`lector-eventos`) y hacemos *fetch*. Recuperamos los 5 mensajes que se publicaron **antes** de que este consumidor existiera — algo imposible con el core NATS."""),
("code", '''# Pull consumer durable: pedimos los mensajes bajo demanda
psub = await js.pull_subscribe("eventos.>", durable="lector-eventos")
mensajes = await psub.fetch(5, timeout=2)
print(f"Recuperados {len(mensajes)} mensajes del stream (replay):")
for m in mensajes:
print(f" seq={m.metadata.sequence.stream} subject={m.subject} data={m.data.decode()!r}")
await m.ack() # confirmamos el procesado; el durable avanza su cursor
# Tras el ack, un segundo fetch no devuelve nada nuevo (cursor avanzado)
try:
extra = await psub.fetch(1, timeout=1)
except Exception:
extra = []
print()
print(f"Segundo fetch tras ack: {len(extra)} mensajes (el durable recuerda que ya los leyó)")'''),
("md", """## Resumen
| Patrón | Entrega | Persistencia | Caso de uso |
|---|---|---|---|
| Core pub/sub (nb 01) | a **todos** los subscribers | no (at-most-once) | eventos en vivo, telemetría |
| **Queue group** | a **uno** del grupo | no | *worker pool*, reparto de carga |
| **Request/Reply** | a uno, con respuesta | no | RPC, servicios |
| **JetStream** | configurable + **replay** | **sí** | event sourcing, colas durables, auditoría |
**Siguiente** → `03_procesos_reales.ipynb`: hasta ahora todo ha ocurrido dentro de un mismo kernel. Allí lanzamos publisher y subscribers como **procesos del sistema operativo independientes** para ver el desacople real."""),
]
# ----------------------------------------------------------------------------
# Notebook 03 — Procesos del sistema operativo reales
# ----------------------------------------------------------------------------
nb3 = [
("md", """# NATS pub/sub — 03 · Procesos del sistema operativo reales
En los notebooks 01 y 02 todo ocurrió dentro de un mismo kernel: varias conexiones `asyncio` simulaban procesos distintos. Eso es cómodo para explicar, pero NATS brilla precisamente cuando los participantes son **procesos del sistema operativo separados** —incluso en máquinas distintas— que solo comparten la dirección del broker y los nombres de subject.
Aquí lanzamos **procesos reales** con `subprocess`:
- un **publisher** (`procs/publisher.py`) que emite telemetría a `telemetria.cpu` y `telemetria.mem`;
- dos **subscribers** independientes (`procs/subscriber.py`), cada uno con su propio PID:
- `sub-todo` escucha `telemetria.>` (toda la telemetría),
- `sub-cpu` escucha solo `telemetria.cpu`.
Cada proceso abre su propia conexión al broker. El publisher **no sabe** cuántos subscribers hay ni qué escuchan: solo publica a un subject. Ese es el desacople real."""),
("md", """## 0 · Broker + scripts de los procesos
Arrancamos el broker (idempotente) y mostramos el código de los dos scripts que vamos a lanzar como procesos. Cada uno es un programa autónomo que se conecta a `nats://127.0.0.1:4222` y emite eventos como líneas JSON en su stdout, que el notebook recogerá."""),
("code", ENSURE_NATS + f'''
from pathlib import Path
PROCS = Path(r"{PROCS_ABS}")
print("Broker:", ensure_nats())
print("Scripts de proceso en:", PROCS)
print()
print("=== procs/publisher.py ===")
print(Path(PROCS / "publisher.py").read_text())'''),
("code", '''print("=== procs/subscriber.py ===")
print((PROCS / "subscriber.py").read_text())'''),
("md", """## 1 · Lanzar los procesos y orquestarlos
El notebook actúa de **orquestador**:
1. Lanza los dos subscribers como procesos (`subprocess.Popen`), cada uno con su PID. Les damos 1.5 s para que conecten y se suscriban.
2. Lanza el publisher, que emite 8 mensajes y termina.
3. Espera a que los subscribers terminen solos (su `--seconds`) y recoge su stdout.
Usamos `sys.executable` para que los procesos hijos usen el mismo intérprete (con `nats-py` instalado) que el kernel."""),
("code", '''import subprocess, sys, json, time
def lanzar_subscriber(nombre, subjects, seconds=4.5):
return subprocess.Popen(
[sys.executable, str(PROCS / "subscriber.py"),
"--name", nombre, "--subjects", subjects, "--seconds", str(seconds)],
stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True,
)
# 1. Subscribers: procesos OS independientes
procs = {
"sub-todo": lanzar_subscriber("sub-todo", "telemetria.>"),
"sub-cpu": lanzar_subscriber("sub-cpu", "telemetria.cpu"),
}
print("Subscribers lanzados (PIDs del SO):", {n: p.pid for n, p in procs.items()})
time.sleep(1.5) # que conecten y se suscriban antes de publicar
# 2. Publisher: otro proceso OS, publica 8 mensajes y termina
pub = subprocess.run(
[sys.executable, str(PROCS / "publisher.py"), "--count", "8", "--interval", "0.15"],
stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True,
)
pub_eventos = [json.loads(l) for l in pub.stdout.splitlines() if l.strip()]
print(f"Publisher (PID {pub_eventos[0]['pid']}) publicó {sum(1 for e in pub_eventos if e['event']=='published')} mensajes")
# 3. Recoger stdout de los subscribers (terminan solos por --seconds)
eventos = []
for nombre, p in procs.items():
out, err = p.communicate(timeout=10)
for l in out.splitlines():
if l.strip():
eventos.append(json.loads(l))
if err.strip():
print(f"[{nombre} stderr] {err.strip()[:200]}")
msgs = [e for e in eventos if e["event"] == "msg"]
print(f"\\nTotal de entregas recibidas entre todos los procesos: {len(msgs)}")'''),
("md", """## 2 · Qué recibió cada proceso
Cada subscriber es un PID distinto. `sub-todo` (suscrito a `telemetria.>`) recibe los 8 mensajes; `sub-cpu` (suscrito solo a `telemetria.cpu`) recibe únicamente los 4 de CPU. El broker filtró por subject sin que el publisher supiera nada de ello."""),
("code", '''import pandas as pd
df = pd.DataFrame(msgs)
# PID por nombre de proceso (demuestra que son procesos distintos)
pids = {e["name"]: e["pid"] for e in eventos if e["event"] == "ready"}
print("PID de cada proceso subscriber:", pids)
print()
# Conteo de mensajes por (proceso, subject)
tabla = df.groupby(["name", "subject"]).size().unstack(fill_value=0)
print("Mensajes recibidos por proceso y subject:")
print(tabla)
tabla'''),
("code", '''import matplotlib.pyplot as plt
resumen = df.groupby("name").size().reindex(["sub-todo", "sub-cpu"]).fillna(0).astype(int)
fig, ax = plt.subplots(figsize=(7, 3.2))
barras = ax.bar(resumen.index, resumen.values, color=["#7c3aed", "#0891b2"])
ax.bar_label(barras, padding=3)
ax.set_ylabel("mensajes recibidos")
ax.set_title("Telemetría recibida por cada PROCESO (8 publicados: 4 cpu + 4 mem)")
ax.set_ylim(0, 10)
for i, name in enumerate(resumen.index):
ax.text(i, -1.4, f"PID {pids.get(name, '?')}\\n{('telemetria.>' if name=='sub-todo' else 'telemetria.cpu')}",
ha="center", va="top", fontsize=8, color="#555")
plt.tight_layout(); plt.show()'''),
("md", """## 3 · Línea de tiempo de las entregas
Ordenando los mensajes por su marca temporal (`t`, segundos desde que cada proceso arrancó) se ve cómo ambos subscribers reciben los mensajes de CPU casi a la vez (fan-out), mientras que los de memoria solo llegan a `sub-todo`."""),
("code", '''fig, ax = plt.subplots(figsize=(9, 3))
colores = {"telemetria.cpu": "#ef4444", "telemetria.mem": "#3b82f6"}
y_de = {"sub-todo": 1, "sub-cpu": 0}
for e in msgs:
ax.scatter(e["t"], y_de[e["name"]], color=colores[e["subject"]], s=80, zorder=3)
ax.set_yticks([0, 1]); ax.set_yticklabels(["sub-cpu", "sub-todo"])
ax.set_xlabel("t (segundos desde el arranque de cada proceso)")
ax.set_title("Timeline de entregas — rojo: telemetria.cpu, azul: telemetria.mem")
ax.grid(axis="x", alpha=0.3)
from matplotlib.patches import Patch
ax.legend(handles=[Patch(color=c, label=s) for s, c in colores.items()], loc="upper right")
plt.tight_layout(); plt.show()'''),
("md", """## Resumen del análisis
A lo largo de los tres notebooks hemos visto cómo distintos procesos envían datos por pub/sub con NATS:
- **01** — el modelo base: publishers y subscribers desacoplados por un broker, *fan-out* y *wildcards*.
- **02** — patrones de orden superior: *queue groups* (reparto de carga), *request/reply* (RPC) y *JetStream* (persistencia y replay).
- **03** — **procesos del SO reales**: el desacople de verdad. El publisher no conoce a sus subscribers; el broker enruta por subject. Añadir o quitar procesos consumidores no cambia ni una línea del publisher.
Esa es la idea central de NATS: **los procesos se comunican por nombres de subject, no por direcciones**, y el broker se encarga del resto.
### Limpieza (opcional)
Para parar el broker cuando termines:
```python
import subprocess
subprocess.run(["docker", "stop", "nats_demo"]) # detener
subprocess.run(["docker", "rm", "nats_demo"]) # eliminar
```"""),
]
if __name__ == "__main__":
build("01_core_pubsub.ipynb", nb1)
build("02_queue_request_jetstream.ipynb", nb2)
build("03_procesos_reales.ipynb", nb3)
print("OK: 3 notebooks generados en", NBDIR)