581 lines
26 KiB
Python
581 lines
26 KiB
Python
#!/usr/bin/env python3
|
||
"""Generador de los notebooks del analisis NATS.
|
||
|
||
Construye los tres .ipynb con nbformat (sin ejecutar). La ejecucion se hace
|
||
despues a traves del MCP de Jupyter, contra el kernel del servidor, para que
|
||
los outputs queden persistidos y visibles en JupyterLab.
|
||
|
||
Reproducible: re-ejecutar este script regenera los notebooks desde cero.
|
||
"""
|
||
from pathlib import Path
|
||
|
||
import nbformat as nbf
|
||
|
||
NBDIR = Path("/home/enmanuel/fn_registry/analysis/nats/notebooks")
|
||
PROCS_ABS = str(NBDIR / "procs")
|
||
|
||
|
||
def build(filename: str, cells: list[tuple[str, str]]) -> None:
|
||
nb = nbf.v4.new_notebook()
|
||
nb.metadata["kernelspec"] = {
|
||
"name": "python3",
|
||
"display_name": "Python 3 (ipykernel)",
|
||
"language": "python",
|
||
}
|
||
nb.cells = [
|
||
nbf.v4.new_markdown_cell(src) if typ == "md" else nbf.v4.new_code_cell(src)
|
||
for typ, src in cells
|
||
]
|
||
nbf.write(nb, str(NBDIR / filename))
|
||
print(f"escrito {filename}: {len(cells)} celdas")
|
||
|
||
|
||
# Bloque de codigo reutilizado al inicio de cada notebook para arrancar el broker.
|
||
ENSURE_NATS = '''import subprocess, time, json
|
||
|
||
NATS_CONTAINER = "nats_demo"
|
||
NATS_PORT = 4222
|
||
NATS_URL = f"nats://127.0.0.1:{NATS_PORT}"
|
||
|
||
def _docker(*args, check=True):
|
||
return subprocess.run(["docker", *args], capture_output=True, text=True, check=check)
|
||
|
||
def ensure_nats(name=NATS_CONTAINER, port=NATS_PORT):
|
||
"""Arranca un broker NATS en Docker de forma idempotente. Devuelve el estado."""
|
||
out = _docker("ps", "-a", "--filter", f"name=^{name}$", "--format", "{{.State}}", check=False).stdout.strip()
|
||
if out == "running":
|
||
state = "already-running"
|
||
elif out in ("exited", "created", "paused"):
|
||
_docker("start", name)
|
||
state = "started"
|
||
else:
|
||
_docker("run", "-d", "--name", name, "-p", f"{port}:4222", "-p", "8222:8222",
|
||
"nats:latest", "-js", "-m", "8222")
|
||
state = "created"
|
||
time.sleep(1.0)
|
||
return state'''
|
||
|
||
|
||
# ----------------------------------------------------------------------------
|
||
# Notebook 01 — Core pub/sub y wildcards
|
||
# ----------------------------------------------------------------------------
|
||
nb1 = [
|
||
("md", """# NATS pub/sub — 01 · Core publish/subscribe y wildcards
|
||
|
||
**NATS** es un sistema de mensajería ligero y de alto rendimiento orientado a la comunicación entre procesos (microservicios, IoT, pipelines de datos). Su modelo central es **publish/subscribe** sobre *subjects*.
|
||
|
||
### Conceptos clave
|
||
|
||
- **Subject**: una cadena jerárquica separada por puntos, por ejemplo `pedidos.creados` o `sensor.cocina.temp`. Es la *dirección* de un mensaje. No hay que declararlo: existe en cuanto alguien publica o se suscribe.
|
||
- **Publisher**: un proceso que envía un mensaje a un subject. No sabe quién lo va a recibir (ni si alguien lo recibirá).
|
||
- **Subscriber**: un proceso que expresa interés en uno o varios subjects. Recibe todos los mensajes publicados en ellos mientras esté conectado.
|
||
- **Broker (`nats-server`)**: el proceso central que enruta cada mensaje publicado a todos los subscribers interesados. Desacopla a publishers de subscribers: ninguno conoce la dirección de red del otro, solo el broker y el subject.
|
||
|
||
### Garantías del *core* NATS
|
||
|
||
El core es **fire-and-forget** (*at-most-once*): si en el momento de publicar no hay ningún subscriber conectado a ese subject, el mensaje se descarta. No hay persistencia ni reintentos. Esto lo hace extremadamente rápido. Cuando se necesita persistencia y *replay* se usa **JetStream** (notebook 02).
|
||
|
||
En este notebook levantamos un broker en Docker y demostramos: conexión, pub/sub básico, *fan-out* a varios subscribers y *wildcards*."""),
|
||
|
||
("md", """## 0 · Entorno: el broker NATS en Docker
|
||
|
||
Levantamos `nats:latest` con el flag `-js` (habilita JetStream, que usaremos en el notebook 02) y publicamos el puerto estándar `4222` en el host. La función `ensure_nats` es **idempotente**: si el contenedor ya existe lo reutiliza, si está parado lo arranca, y solo lo crea si no existe. Así los tres notebooks de este análisis comparten el mismo broker."""),
|
||
|
||
("code", ENSURE_NATS + '''
|
||
|
||
state = ensure_nats()
|
||
print(f"Broker NATS: {state} en {NATS_URL}")
|
||
# El endpoint de monitorización (puerto 8222) expone métricas del servidor en JSON
|
||
import urllib.request
|
||
varz = json.loads(urllib.request.urlopen("http://127.0.0.1:8222/varz", timeout=3).read())
|
||
print(f"nats-server version: {varz['version']} | jetstream activo: {bool(varz.get('jetstream'))}")'''),
|
||
|
||
("md", """## 1 · Conectar un cliente
|
||
|
||
Usamos `nats-py`, el cliente oficial para Python basado en `asyncio`. IPython permite usar `await` directamente en las celdas (*top-level await*), así que mantenemos la conexión `nc` viva en una variable global y la reutilizamos a lo largo del notebook, igual que haría un proceso real."""),
|
||
|
||
("code", '''import asyncio
|
||
import nats
|
||
|
||
# Conexión persistente del notebook (simula el cliente de un proceso de larga vida)
|
||
nc = await nats.connect(NATS_URL, name="notebook-01")
|
||
info = nc._server_info # metadata que el broker envía en el handshake
|
||
print("Conectado.")
|
||
print(f" server_id : {info['server_id']}")
|
||
print(f" max_payload: {info['max_payload']/1024/1024:.0f} MiB por mensaje")
|
||
print(f" client_id : {info['client_id']}")'''),
|
||
|
||
("md", """## 2 · Publish/Subscribe básico
|
||
|
||
El patrón mínimo: un subscriber declara interés en un subject, un publisher envía un mensaje, el broker lo entrega.
|
||
|
||
Aquí usamos una **suscripción síncrona** (`sub.next_msg()`), cómoda para ir paso a paso en un notebook: pedimos explícitamente el siguiente mensaje. El payload siempre viaja como `bytes` — NATS no impone formato; aquí codificamos texto UTF-8."""),
|
||
|
||
("code", '''# El subscriber declara interés ANTES de que se publique (core = at-most-once)
|
||
sub = await nc.subscribe("saludos")
|
||
|
||
# El publisher envía. No sabe quién escucha; solo conoce el subject.
|
||
await nc.publish("saludos", b"hola desde el publisher")
|
||
await nc.flush() # fuerza el envío al broker antes de seguir
|
||
|
||
# El subscriber recoge el mensaje
|
||
msg = await sub.next_msg(timeout=2)
|
||
print(f"subject recibido : {msg.subject}")
|
||
print(f"payload : {msg.data.decode()}")
|
||
|
||
await sub.unsubscribe()'''),
|
||
|
||
("md", """## 3 · Fan-out: un publisher, N subscribers
|
||
|
||
La potencia del pub/sub es el **fan-out**: cuando varios subscribers están interesados en el mismo subject, el broker entrega una **copia a cada uno**. El publisher hace *una* llamada `publish` y no cambia nada en su código aunque haya 1 o 1000 subscribers.
|
||
|
||
Aquí registramos 3 subscribers al subject `noticias` mediante *callbacks* asíncronos (cada uno simula un proceso distinto) y publicamos un único mensaje."""),
|
||
|
||
("code", '''recibidos = [] # registro de quién recibió qué
|
||
|
||
def make_handler(nombre):
|
||
async def handler(msg):
|
||
recibidos.append({"subscriber": nombre, "subject": msg.subject, "data": msg.data.decode()})
|
||
return handler
|
||
|
||
# Tres subscribers independientes al MISMO subject
|
||
subs = []
|
||
for nombre in ["sub-A", "sub-B", "sub-C"]:
|
||
s = await nc.subscribe("noticias", cb=make_handler(nombre))
|
||
subs.append(s)
|
||
|
||
# Un único publish...
|
||
await nc.publish("noticias", b"titular: NATS entrega a todos")
|
||
await nc.flush()
|
||
await asyncio.sleep(0.3) # dar tiempo a los callbacks
|
||
|
||
for r in recibidos:
|
||
print(f"{r['subscriber']} <- [{r['subject']}] {r['data']}")
|
||
print()
|
||
print(f"Un publish -> {len(recibidos)} entregas (fan-out)")
|
||
|
||
for s in subs:
|
||
await s.unsubscribe()'''),
|
||
|
||
("md", """## 4 · Wildcards: `*` y `>`
|
||
|
||
Los subjects son jerárquicos y los subscribers pueden usar comodines para suscribirse a familias enteras de subjects:
|
||
|
||
- **`*`** (asterisco) — comodín de **un único token**. `sensor.*.temp` casa con `sensor.cocina.temp` y `sensor.salon.temp`, pero **no** con `sensor.cocina.planta1.temp`.
|
||
- **`>`** (mayor que) — comodín de **uno o más tokens** hasta el final. `sensor.>` casa con `sensor.cocina.temp`, `sensor.salon.humedad`, `sensor.garaje.puerta.estado`...
|
||
|
||
Esto permite que un proceso se suscriba a "todo lo de los sensores" o "la temperatura de cualquier habitación" sin conocer de antemano qué subjects concretos existirán."""),
|
||
|
||
("code", '''wild = []
|
||
|
||
async def on_star(msg):
|
||
wild.append({"patron": "sensor.*.temp", "subject": msg.subject, "data": msg.data.decode()})
|
||
|
||
async def on_gt(msg):
|
||
wild.append({"patron": "sensor.>", "subject": msg.subject, "data": msg.data.decode()})
|
||
|
||
s_star = await nc.subscribe("sensor.*.temp", cb=on_star)
|
||
s_gt = await nc.subscribe("sensor.>", cb=on_gt)
|
||
|
||
# Publicamos en varios subjects concretos
|
||
publicados = {
|
||
"sensor.cocina.temp": "21.5",
|
||
"sensor.salon.temp": "22.1",
|
||
"sensor.salon.humedad": "48",
|
||
"sensor.garaje.puerta.estado": "abierta",
|
||
}
|
||
for subj, val in publicados.items():
|
||
await nc.publish(subj, val.encode())
|
||
await nc.flush()
|
||
await asyncio.sleep(0.3)
|
||
|
||
print("Mensajes publicados:", list(publicados))
|
||
print()
|
||
for w in wild:
|
||
print(f"[{w['patron']:14}] casó {w['subject']}")
|
||
|
||
await s_star.unsubscribe(); await s_gt.unsubscribe()'''),
|
||
|
||
("md", """## 5 · Visualización: qué patrón casó qué subject
|
||
|
||
Una matriz `subject × patrón` deja claro el alcance de cada comodín: `sensor.>` captura los cuatro subjects; `sensor.*.temp` solo las dos temperaturas de un nivel (no la humedad, que no es `temp`, ni la del garaje, que tiene un token de más)."""),
|
||
|
||
("code", '''import pandas as pd
|
||
import matplotlib.pyplot as plt
|
||
import numpy as np
|
||
|
||
patrones = ["sensor.*.temp", "sensor.>"]
|
||
subjects = list(publicados)
|
||
|
||
# Construir matriz de coincidencias a partir de lo que recibió cada suscripción
|
||
M = np.zeros((len(patrones), len(subjects)), dtype=int)
|
||
for w in wild:
|
||
i = patrones.index(w["patron"])
|
||
j = subjects.index(w["subject"])
|
||
M[i, j] = 1
|
||
|
||
fig, ax = plt.subplots(figsize=(9, 2.6))
|
||
ax.imshow(M, cmap="Greens", vmin=0, vmax=1, aspect="auto")
|
||
ax.set_xticks(range(len(subjects))); ax.set_xticklabels(subjects, rotation=25, ha="right", fontsize=9)
|
||
ax.set_yticks(range(len(patrones))); ax.set_yticklabels(patrones, fontsize=10)
|
||
for i in range(len(patrones)):
|
||
for j in range(len(subjects)):
|
||
ax.text(j, i, "OK" if M[i, j] else "-", ha="center", va="center",
|
||
color="white" if M[i, j] else "#999", fontsize=12)
|
||
ax.set_title("Coincidencia de wildcards (OK = el subscriber recibió el mensaje)")
|
||
plt.tight_layout(); plt.show()
|
||
|
||
pd.DataFrame(M, index=patrones, columns=subjects)'''),
|
||
|
||
("md", """## Resumen
|
||
|
||
- El **broker** desacopla publishers y subscribers: solo comparten el *subject*.
|
||
- El core es **fire-and-forget**: sin subscriber conectado, el mensaje se pierde.
|
||
- **Fan-out** automático: una publicación llega a todos los subscribers interesados.
|
||
- **Wildcards** `*` (un token) y `>` (resto de la jerarquía) permiten suscribirse a familias de subjects.
|
||
|
||
**Siguiente** → `02_queue_request_jetstream.ipynb`: repartir carga entre workers (*queue groups*), RPC (*request/reply*) y mensajería **persistente** con JetStream.
|
||
|
||
> La conexión `nc` y el contenedor `nats_demo` siguen vivos para los siguientes notebooks."""),
|
||
]
|
||
|
||
|
||
# ----------------------------------------------------------------------------
|
||
# Notebook 02 — Queue groups, Request/Reply, JetStream
|
||
# ----------------------------------------------------------------------------
|
||
nb2 = [
|
||
("md", """# NATS pub/sub — 02 · Queue groups, Request/Reply y JetStream
|
||
|
||
En el notebook 01 vimos el *fan-out* del core: una publicación llega a **todos** los subscribers. Aquí cubrimos tres patrones que construyen sobre esa base:
|
||
|
||
1. **Queue groups** — repartir la carga: varios *workers* comparten el trabajo y cada mensaje lo procesa **uno solo**.
|
||
2. **Request/Reply** — RPC sobre mensajería: un cliente pregunta y espera la respuesta de un servicio.
|
||
3. **JetStream** — la capa de **persistencia**: streams que almacenan los mensajes y permiten *replay*, a diferencia del core *fire-and-forget*.
|
||
|
||
> Requiere el broker `nats_demo` del notebook 01. La primera celda lo arranca de forma idempotente, así que este notebook también funciona de forma aislada."""),
|
||
|
||
("md", """## 0 · Setup: broker + conexión
|
||
|
||
Reutilizamos el mismo broker en Docker. `ensure_nats` es idempotente; si el contenedor sigue vivo del notebook 01, simplemente se reaprovecha."""),
|
||
|
||
("code", ENSURE_NATS + '''
|
||
|
||
import asyncio
|
||
import nats
|
||
|
||
print("Broker:", ensure_nats())
|
||
nc = await nats.connect(NATS_URL, name="notebook-02")
|
||
print("Conectado, client_id:", nc._server_info["client_id"])'''),
|
||
|
||
("md", """## 1 · Queue groups — reparto de carga entre workers
|
||
|
||
Un **queue group** convierte el fan-out en una **cola de trabajo**. Varios subscribers se suscriben al mismo subject pero declarando el mismo nombre de *queue*. El broker entonces entrega cada mensaje a **exactamente uno** de los miembros del grupo (balanceo de carga), en lugar de a todos.
|
||
|
||
Es el patrón de los *worker pools*: escalas el procesamiento añadiendo más workers al grupo, sin tocar al publisher. Si un worker cae, los demás siguen recibiendo. Aquí lanzamos 3 workers en el queue `procesadores` y publicamos 12 tareas."""),
|
||
|
||
("code", '''from collections import Counter
|
||
|
||
trabajo = Counter() # cuántas tareas procesó cada worker
|
||
orden = [] # traza temporal (worker, tarea)
|
||
|
||
def make_worker(nombre):
|
||
async def worker(msg):
|
||
tarea = msg.data.decode()
|
||
trabajo[nombre] += 1
|
||
orden.append((nombre, tarea))
|
||
return worker
|
||
|
||
# 3 workers, MISMO subject, MISMO queue group -> NATS reparte
|
||
workers = []
|
||
for nombre in ["worker-1", "worker-2", "worker-3"]:
|
||
s = await nc.subscribe("tareas", queue="procesadores", cb=make_worker(nombre))
|
||
workers.append(s)
|
||
|
||
# Publicar 12 tareas
|
||
for i in range(12):
|
||
await nc.publish("tareas", f"tarea-{i:02d}".encode())
|
||
await nc.flush()
|
||
await asyncio.sleep(0.5)
|
||
|
||
print("Reparto de carga (cada tarea fue a UN solo worker):")
|
||
for w, n in sorted(trabajo.items()):
|
||
print(f" {w}: {n} tareas")
|
||
print(f" TOTAL procesado: {sum(trabajo.values())} de 12 tareas")
|
||
|
||
for s in workers:
|
||
await s.unsubscribe()'''),
|
||
|
||
("md", """### Visualización del reparto
|
||
|
||
El total siempre suma 12 (ninguna tarea se duplica ni se pierde), repartido de forma aproximadamente equilibrada entre los workers."""),
|
||
|
||
("code", '''import matplotlib.pyplot as plt
|
||
|
||
nombres = [f"worker-{i}" for i in (1, 2, 3)]
|
||
valores = [trabajo.get(n, 0) for n in nombres]
|
||
|
||
fig, ax = plt.subplots(figsize=(7, 3))
|
||
barras = ax.bar(nombres, valores, color=["#2563eb", "#16a34a", "#db2777"])
|
||
ax.bar_label(barras, padding=3)
|
||
ax.set_ylabel("tareas procesadas")
|
||
ax.set_title(f"Queue group 'procesadores' — 12 tareas repartidas entre {len(nombres)} workers")
|
||
ax.set_ylim(0, max(valores) + 2)
|
||
plt.tight_layout(); plt.show()'''),
|
||
|
||
("md", """## 2 · Request/Reply — RPC sobre NATS
|
||
|
||
NATS implementa **petición/respuesta** sobre el mismo modelo pub/sub. El cliente usa `nc.request(subject, datos)`: por debajo, NATS crea un subject de respuesta temporal único (un *inbox*), lo adjunta al mensaje y espera la primera respuesta que llegue a ese inbox.
|
||
|
||
El servicio se suscribe al subject, procesa, y responde con `msg.respond(datos)`. Esto da RPC con descubrimiento automático (el cliente no conoce la dirección del servicio, solo el subject) y *timeouts* integrados. Si varios servicios escuchan en un queue group, además se balancea la carga de las peticiones."""),
|
||
|
||
("code", '''import nats.errors
|
||
|
||
# Servicio: convierte el texto recibido a mayúsculas y responde
|
||
async def servicio_mayusculas(msg):
|
||
respuesta = msg.data.decode().upper()
|
||
await msg.respond(respuesta.encode())
|
||
|
||
sub_svc = await nc.subscribe("servicio.mayusculas", cb=servicio_mayusculas)
|
||
|
||
# Cliente: pide y espera respuesta (con timeout)
|
||
peticiones = ["hola mundo", "nats request reply", "desacople total"]
|
||
for texto in peticiones:
|
||
resp = await nc.request("servicio.mayusculas", texto.encode(), timeout=1.0)
|
||
print(f" peticion : {texto!r}")
|
||
print(f" respuesta: {resp.data.decode()!r} (vino por el inbox {resp.subject})")
|
||
print()
|
||
|
||
# ¿Qué pasa si nadie escucha el subject? El broker avisa al instante con
|
||
# NoRespondersError (no hace falta esperar al timeout completo).
|
||
try:
|
||
await nc.request("servicio.inexistente", b"hay alguien?", timeout=0.5)
|
||
except nats.errors.NoRespondersError:
|
||
print("servicio.inexistente -> NoRespondersError: el broker confirma que no hay ningún servicio escuchando")
|
||
except (nats.errors.TimeoutError, asyncio.TimeoutError):
|
||
print("servicio.inexistente -> TimeoutError: nadie respondió a tiempo")
|
||
|
||
await sub_svc.unsubscribe()'''),
|
||
|
||
("md", """## 3 · JetStream — persistencia y replay
|
||
|
||
Todo lo anterior es **efímero**: si no hay nadie escuchando en el instante exacto de la publicación, el mensaje se pierde. **JetStream** añade una capa de almacenamiento:
|
||
|
||
- Un **stream** captura y persiste todos los mensajes de unos subjects dados.
|
||
- Los **consumers** leen del stream a su ritmo, pueden ser **durables** (recuerdan por dónde iban) y permiten **replay** de mensajes históricos.
|
||
|
||
Demostramos la diferencia clave con el core: publicamos a un stream **sin ningún consumidor activo** y, *después*, creamos un consumidor que recupera todos esos mensajes."""),
|
||
|
||
("code", '''js = nc.jetstream()
|
||
|
||
# Crear (o recrear limpio) un stream que persiste todo lo de 'eventos.>'
|
||
try:
|
||
await js.delete_stream("EVENTOS")
|
||
except Exception:
|
||
pass
|
||
stream = await js.add_stream(name="EVENTOS", subjects=["eventos.>"])
|
||
print(f"Stream creado: {stream.config.name} subjects={stream.config.subjects} storage={stream.config.storage}")
|
||
|
||
# Publicar 5 eventos SIN que haya ningún consumidor escuchando todavía
|
||
for i in range(5):
|
||
ack = await js.publish("eventos.click", f"evento #{i}".encode())
|
||
print(f" publicado eventos.click -> stream='{ack.stream}' seq={ack.seq}")
|
||
|
||
estado = (await js.stream_info("EVENTOS")).state
|
||
print()
|
||
print(f"Mensajes retenidos en el stream: {estado.messages} (siguen ahí aunque nadie los haya leído)")'''),
|
||
|
||
("md", """### Replay: leer los mensajes históricos
|
||
|
||
Ahora creamos un **consumer durable** (`lector-eventos`) y hacemos *fetch*. Recuperamos los 5 mensajes que se publicaron **antes** de que este consumidor existiera — algo imposible con el core NATS."""),
|
||
|
||
("code", '''# Pull consumer durable: pedimos los mensajes bajo demanda
|
||
psub = await js.pull_subscribe("eventos.>", durable="lector-eventos")
|
||
|
||
mensajes = await psub.fetch(5, timeout=2)
|
||
print(f"Recuperados {len(mensajes)} mensajes del stream (replay):")
|
||
for m in mensajes:
|
||
print(f" seq={m.metadata.sequence.stream} subject={m.subject} data={m.data.decode()!r}")
|
||
await m.ack() # confirmamos el procesado; el durable avanza su cursor
|
||
|
||
# Tras el ack, un segundo fetch no devuelve nada nuevo (cursor avanzado)
|
||
try:
|
||
extra = await psub.fetch(1, timeout=1)
|
||
except Exception:
|
||
extra = []
|
||
print()
|
||
print(f"Segundo fetch tras ack: {len(extra)} mensajes (el durable recuerda que ya los leyó)")'''),
|
||
|
||
("md", """## Resumen
|
||
|
||
| Patrón | Entrega | Persistencia | Caso de uso |
|
||
|---|---|---|---|
|
||
| Core pub/sub (nb 01) | a **todos** los subscribers | no (at-most-once) | eventos en vivo, telemetría |
|
||
| **Queue group** | a **uno** del grupo | no | *worker pool*, reparto de carga |
|
||
| **Request/Reply** | a uno, con respuesta | no | RPC, servicios |
|
||
| **JetStream** | configurable + **replay** | **sí** | event sourcing, colas durables, auditoría |
|
||
|
||
**Siguiente** → `03_procesos_reales.ipynb`: hasta ahora todo ha ocurrido dentro de un mismo kernel. Allí lanzamos publisher y subscribers como **procesos del sistema operativo independientes** para ver el desacople real."""),
|
||
]
|
||
|
||
|
||
# ----------------------------------------------------------------------------
|
||
# Notebook 03 — Procesos del sistema operativo reales
|
||
# ----------------------------------------------------------------------------
|
||
nb3 = [
|
||
("md", """# NATS pub/sub — 03 · Procesos del sistema operativo reales
|
||
|
||
En los notebooks 01 y 02 todo ocurrió dentro de un mismo kernel: varias conexiones `asyncio` simulaban procesos distintos. Eso es cómodo para explicar, pero NATS brilla precisamente cuando los participantes son **procesos del sistema operativo separados** —incluso en máquinas distintas— que solo comparten la dirección del broker y los nombres de subject.
|
||
|
||
Aquí lanzamos **procesos reales** con `subprocess`:
|
||
|
||
- un **publisher** (`procs/publisher.py`) que emite telemetría a `telemetria.cpu` y `telemetria.mem`;
|
||
- dos **subscribers** independientes (`procs/subscriber.py`), cada uno con su propio PID:
|
||
- `sub-todo` escucha `telemetria.>` (toda la telemetría),
|
||
- `sub-cpu` escucha solo `telemetria.cpu`.
|
||
|
||
Cada proceso abre su propia conexión al broker. El publisher **no sabe** cuántos subscribers hay ni qué escuchan: solo publica a un subject. Ese es el desacople real."""),
|
||
|
||
("md", """## 0 · Broker + scripts de los procesos
|
||
|
||
Arrancamos el broker (idempotente) y mostramos el código de los dos scripts que vamos a lanzar como procesos. Cada uno es un programa autónomo que se conecta a `nats://127.0.0.1:4222` y emite eventos como líneas JSON en su stdout, que el notebook recogerá."""),
|
||
|
||
("code", ENSURE_NATS + f'''
|
||
|
||
from pathlib import Path
|
||
|
||
PROCS = Path(r"{PROCS_ABS}")
|
||
print("Broker:", ensure_nats())
|
||
print("Scripts de proceso en:", PROCS)
|
||
print()
|
||
print("=== procs/publisher.py ===")
|
||
print(Path(PROCS / "publisher.py").read_text())'''),
|
||
|
||
("code", '''print("=== procs/subscriber.py ===")
|
||
print((PROCS / "subscriber.py").read_text())'''),
|
||
|
||
("md", """## 1 · Lanzar los procesos y orquestarlos
|
||
|
||
El notebook actúa de **orquestador**:
|
||
|
||
1. Lanza los dos subscribers como procesos (`subprocess.Popen`), cada uno con su PID. Les damos 1.5 s para que conecten y se suscriban.
|
||
2. Lanza el publisher, que emite 8 mensajes y termina.
|
||
3. Espera a que los subscribers terminen solos (su `--seconds`) y recoge su stdout.
|
||
|
||
Usamos `sys.executable` para que los procesos hijos usen el mismo intérprete (con `nats-py` instalado) que el kernel."""),
|
||
|
||
("code", '''import subprocess, sys, json, time
|
||
|
||
def lanzar_subscriber(nombre, subjects, seconds=4.5):
|
||
return subprocess.Popen(
|
||
[sys.executable, str(PROCS / "subscriber.py"),
|
||
"--name", nombre, "--subjects", subjects, "--seconds", str(seconds)],
|
||
stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True,
|
||
)
|
||
|
||
# 1. Subscribers: procesos OS independientes
|
||
procs = {
|
||
"sub-todo": lanzar_subscriber("sub-todo", "telemetria.>"),
|
||
"sub-cpu": lanzar_subscriber("sub-cpu", "telemetria.cpu"),
|
||
}
|
||
print("Subscribers lanzados (PIDs del SO):", {n: p.pid for n, p in procs.items()})
|
||
time.sleep(1.5) # que conecten y se suscriban antes de publicar
|
||
|
||
# 2. Publisher: otro proceso OS, publica 8 mensajes y termina
|
||
pub = subprocess.run(
|
||
[sys.executable, str(PROCS / "publisher.py"), "--count", "8", "--interval", "0.15"],
|
||
stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True,
|
||
)
|
||
pub_eventos = [json.loads(l) for l in pub.stdout.splitlines() if l.strip()]
|
||
print(f"Publisher (PID {pub_eventos[0]['pid']}) publicó {sum(1 for e in pub_eventos if e['event']=='published')} mensajes")
|
||
|
||
# 3. Recoger stdout de los subscribers (terminan solos por --seconds)
|
||
eventos = []
|
||
for nombre, p in procs.items():
|
||
out, err = p.communicate(timeout=10)
|
||
for l in out.splitlines():
|
||
if l.strip():
|
||
eventos.append(json.loads(l))
|
||
if err.strip():
|
||
print(f"[{nombre} stderr] {err.strip()[:200]}")
|
||
|
||
msgs = [e for e in eventos if e["event"] == "msg"]
|
||
print(f"\\nTotal de entregas recibidas entre todos los procesos: {len(msgs)}")'''),
|
||
|
||
("md", """## 2 · Qué recibió cada proceso
|
||
|
||
Cada subscriber es un PID distinto. `sub-todo` (suscrito a `telemetria.>`) recibe los 8 mensajes; `sub-cpu` (suscrito solo a `telemetria.cpu`) recibe únicamente los 4 de CPU. El broker filtró por subject sin que el publisher supiera nada de ello."""),
|
||
|
||
("code", '''import pandas as pd
|
||
|
||
df = pd.DataFrame(msgs)
|
||
# PID por nombre de proceso (demuestra que son procesos distintos)
|
||
pids = {e["name"]: e["pid"] for e in eventos if e["event"] == "ready"}
|
||
print("PID de cada proceso subscriber:", pids)
|
||
print()
|
||
|
||
# Conteo de mensajes por (proceso, subject)
|
||
tabla = df.groupby(["name", "subject"]).size().unstack(fill_value=0)
|
||
print("Mensajes recibidos por proceso y subject:")
|
||
print(tabla)
|
||
tabla'''),
|
||
|
||
("code", '''import matplotlib.pyplot as plt
|
||
|
||
resumen = df.groupby("name").size().reindex(["sub-todo", "sub-cpu"]).fillna(0).astype(int)
|
||
|
||
fig, ax = plt.subplots(figsize=(7, 3.2))
|
||
barras = ax.bar(resumen.index, resumen.values, color=["#7c3aed", "#0891b2"])
|
||
ax.bar_label(barras, padding=3)
|
||
ax.set_ylabel("mensajes recibidos")
|
||
ax.set_title("Telemetría recibida por cada PROCESO (8 publicados: 4 cpu + 4 mem)")
|
||
ax.set_ylim(0, 10)
|
||
for i, name in enumerate(resumen.index):
|
||
ax.text(i, -1.4, f"PID {pids.get(name, '?')}\\n{('telemetria.>' if name=='sub-todo' else 'telemetria.cpu')}",
|
||
ha="center", va="top", fontsize=8, color="#555")
|
||
plt.tight_layout(); plt.show()'''),
|
||
|
||
("md", """## 3 · Línea de tiempo de las entregas
|
||
|
||
Ordenando los mensajes por su marca temporal (`t`, segundos desde que cada proceso arrancó) se ve cómo ambos subscribers reciben los mensajes de CPU casi a la vez (fan-out), mientras que los de memoria solo llegan a `sub-todo`."""),
|
||
|
||
("code", '''fig, ax = plt.subplots(figsize=(9, 3))
|
||
colores = {"telemetria.cpu": "#ef4444", "telemetria.mem": "#3b82f6"}
|
||
y_de = {"sub-todo": 1, "sub-cpu": 0}
|
||
for e in msgs:
|
||
ax.scatter(e["t"], y_de[e["name"]], color=colores[e["subject"]], s=80, zorder=3)
|
||
ax.set_yticks([0, 1]); ax.set_yticklabels(["sub-cpu", "sub-todo"])
|
||
ax.set_xlabel("t (segundos desde el arranque de cada proceso)")
|
||
ax.set_title("Timeline de entregas — rojo: telemetria.cpu, azul: telemetria.mem")
|
||
ax.grid(axis="x", alpha=0.3)
|
||
from matplotlib.patches import Patch
|
||
ax.legend(handles=[Patch(color=c, label=s) for s, c in colores.items()], loc="upper right")
|
||
plt.tight_layout(); plt.show()'''),
|
||
|
||
("md", """## Resumen del análisis
|
||
|
||
A lo largo de los tres notebooks hemos visto cómo distintos procesos envían datos por pub/sub con NATS:
|
||
|
||
- **01** — el modelo base: publishers y subscribers desacoplados por un broker, *fan-out* y *wildcards*.
|
||
- **02** — patrones de orden superior: *queue groups* (reparto de carga), *request/reply* (RPC) y *JetStream* (persistencia y replay).
|
||
- **03** — **procesos del SO reales**: el desacople de verdad. El publisher no conoce a sus subscribers; el broker enruta por subject. Añadir o quitar procesos consumidores no cambia ni una línea del publisher.
|
||
|
||
Esa es la idea central de NATS: **los procesos se comunican por nombres de subject, no por direcciones**, y el broker se encarga del resto.
|
||
|
||
### Limpieza (opcional)
|
||
|
||
Para parar el broker cuando termines:
|
||
|
||
```python
|
||
import subprocess
|
||
subprocess.run(["docker", "stop", "nats_demo"]) # detener
|
||
subprocess.run(["docker", "rm", "nats_demo"]) # eliminar
|
||
```"""),
|
||
]
|
||
|
||
|
||
if __name__ == "__main__":
|
||
build("01_core_pubsub.ipynb", nb1)
|
||
build("02_queue_request_jetstream.ipynb", nb2)
|
||
build("03_procesos_reales.ipynb", nb3)
|
||
print("OK: 3 notebooks generados en", NBDIR)
|