{ "cells": [ { "cell_type": "markdown", "id": "05860b9f", "metadata": {}, "source": [ "# NATS pub/sub — 03 · Procesos del sistema operativo reales\n", "\n", "En los notebooks 01 y 02 todo ocurrió dentro de un mismo kernel: varias conexiones `asyncio` simulaban procesos distintos. Eso es cómodo para explicar, pero NATS brilla precisamente cuando los participantes son **procesos del sistema operativo separados** —incluso en máquinas distintas— que solo comparten la dirección del broker y los nombres de subject.\n", "\n", "Aquí lanzamos **procesos reales** con `subprocess`:\n", "\n", "- un **publisher** (`procs/publisher.py`) que emite telemetría a `telemetria.cpu` y `telemetria.mem`;\n", "- dos **subscribers** independientes (`procs/subscriber.py`), cada uno con su propio PID:\n", " - `sub-todo` escucha `telemetria.>` (toda la telemetría),\n", " - `sub-cpu` escucha solo `telemetria.cpu`.\n", "\n", "Cada proceso abre su propia conexión al broker. El publisher **no sabe** cuántos subscribers hay ni qué escuchan: solo publica a un subject. Ese es el desacople real." ] }, { "cell_type": "markdown", "id": "c5127085", "metadata": {}, "source": [ "## 0 · Broker + scripts de los procesos\n", "\n", "Arrancamos el broker (idempotente) y mostramos el código de los dos scripts que vamos a lanzar como procesos. Cada uno es un programa autónomo que se conecta a `nats://127.0.0.1:4222` y emite eventos como líneas JSON en su stdout, que el notebook recogerá." ] }, { "cell_type": "code", "execution_count": 1, "id": "bb720c29", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Broker: already-running\n", "Scripts de proceso en: /home/enmanuel/fn_registry/analysis/nats/notebooks/procs\n", "\n", "=== procs/publisher.py ===\n", "#!/usr/bin/env python3\n", "\"\"\"Publisher NATS como proceso del sistema operativo independiente.\n", "\n", "Se conecta al broker y publica una rafaga de mensajes de telemetria,\n", "alternando entre los subjects `telemetria.cpu` y `telemetria.mem`.\n", "No sabe ni le importa cuantos subscribers hay escuchando: solo conoce el\n", "subject. Emite cada publicacion como linea JSON en stdout.\n", "\"\"\"\n", "import argparse\n", "import asyncio\n", "import json\n", "import os\n", "import random\n", "import time\n", "\n", "import nats\n", "\n", "NATS_URL = \"nats://127.0.0.1:4222\"\n", "\n", "\n", "def emit(event: dict) -> None:\n", " print(json.dumps(event), flush=True)\n", "\n", "\n", "async def main(count: int, interval: float) -> None:\n", " pid = os.getpid()\n", " nc = await nats.connect(NATS_URL, name=\"publisher\")\n", " emit({\"event\": \"ready\", \"pid\": pid, \"name\": \"publisher\"})\n", "\n", " for i in range(count):\n", " subject = \"telemetria.cpu\" if i % 2 == 0 else \"telemetria.mem\"\n", " payload = json.dumps({\"i\": i, \"valor\": round(random.uniform(0, 100), 1)})\n", " await nc.publish(subject, payload.encode())\n", " emit({\"event\": \"published\", \"pid\": pid, \"subject\": subject, \"i\": i})\n", " await asyncio.sleep(interval)\n", "\n", " await nc.flush()\n", " emit({\"event\": \"done\", \"pid\": pid, \"name\": \"publisher\", \"published\": count})\n", " await nc.drain()\n", "\n", "\n", "if __name__ == \"__main__\":\n", " parser = argparse.ArgumentParser(description=\"Publisher NATS de demostracion\")\n", " parser.add_argument(\"--count\", type=int, default=8, help=\"Numero de mensajes a publicar\")\n", " parser.add_argument(\"--interval\", type=float, default=0.15,\n", " help=\"Segundos entre publicaciones\")\n", " args = parser.parse_args()\n", " asyncio.run(main(args.count, args.interval))\n", "\n" ] } ], "source": [ "import subprocess, time, json\n", "\n", "NATS_CONTAINER = \"nats_demo\"\n", "NATS_PORT = 4222\n", "NATS_URL = f\"nats://127.0.0.1:{NATS_PORT}\"\n", "\n", "def _docker(*args, check=True):\n", " return subprocess.run([\"docker\", *args], capture_output=True, text=True, check=check)\n", "\n", "def ensure_nats(name=NATS_CONTAINER, port=NATS_PORT):\n", " \"\"\"Arranca un broker NATS en Docker de forma idempotente. Devuelve el estado.\"\"\"\n", " out = _docker(\"ps\", \"-a\", \"--filter\", f\"name=^{name}$\", \"--format\", \"{{.State}}\", check=False).stdout.strip()\n", " if out == \"running\":\n", " state = \"already-running\"\n", " elif out in (\"exited\", \"created\", \"paused\"):\n", " _docker(\"start\", name)\n", " state = \"started\"\n", " else:\n", " _docker(\"run\", \"-d\", \"--name\", name, \"-p\", f\"{port}:4222\", \"-p\", \"8222:8222\",\n", " \"nats:latest\", \"-js\", \"-m\", \"8222\")\n", " state = \"created\"\n", " time.sleep(1.0)\n", " return state\n", "\n", "from pathlib import Path\n", "\n", "PROCS = Path(r\"/home/enmanuel/fn_registry/analysis/nats/notebooks/procs\")\n", "print(\"Broker:\", ensure_nats())\n", "print(\"Scripts de proceso en:\", PROCS)\n", "print()\n", "print(\"=== procs/publisher.py ===\")\n", "print(Path(PROCS / \"publisher.py\").read_text())" ] }, { "cell_type": "code", "execution_count": null, "id": "3412b705", "metadata": {}, "outputs": [], "source": [ "print(\"=== procs/subscriber.py ===\")\n", "print((PROCS / \"subscriber.py\").read_text())" ] }, { "cell_type": "markdown", "id": "e17dd705", "metadata": {}, "source": [ "## 1 · Lanzar los procesos y orquestarlos\n", "\n", "El notebook actúa de **orquestador**:\n", "\n", "1. Lanza los dos subscribers como procesos (`subprocess.Popen`), cada uno con su PID. Les damos 1.5 s para que conecten y se suscriban.\n", "2. Lanza el publisher, que emite 8 mensajes y termina.\n", "3. Espera a que los subscribers terminen solos (su `--seconds`) y recoge su stdout.\n", "\n", "Usamos `sys.executable` para que los procesos hijos usen el mismo intérprete (con `nats-py` instalado) que el kernel." ] }, { "cell_type": "code", "execution_count": null, "id": "f647029e", "metadata": {}, "outputs": [], "source": [ "import subprocess, sys, json, time\n", "\n", "def lanzar_subscriber(nombre, subjects, seconds=4.5):\n", " return subprocess.Popen(\n", " [sys.executable, str(PROCS / \"subscriber.py\"),\n", " \"--name\", nombre, \"--subjects\", subjects, \"--seconds\", str(seconds)],\n", " stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True,\n", " )\n", "\n", "# 1. Subscribers: procesos OS independientes\n", "procs = {\n", " \"sub-todo\": lanzar_subscriber(\"sub-todo\", \"telemetria.>\"),\n", " \"sub-cpu\": lanzar_subscriber(\"sub-cpu\", \"telemetria.cpu\"),\n", "}\n", "print(\"Subscribers lanzados (PIDs del SO):\", {n: p.pid for n, p in procs.items()})\n", "time.sleep(1.5) # que conecten y se suscriban antes de publicar\n", "\n", "# 2. Publisher: otro proceso OS, publica 8 mensajes y termina\n", "pub = subprocess.run(\n", " [sys.executable, str(PROCS / \"publisher.py\"), \"--count\", \"8\", \"--interval\", \"0.15\"],\n", " stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True,\n", ")\n", "pub_eventos = [json.loads(l) for l in pub.stdout.splitlines() if l.strip()]\n", "print(f\"Publisher (PID {pub_eventos[0]['pid']}) publicó {sum(1 for e in pub_eventos if e['event']=='published')} mensajes\")\n", "\n", "# 3. Recoger stdout de los subscribers (terminan solos por --seconds)\n", "eventos = []\n", "for nombre, p in procs.items():\n", " out, err = p.communicate(timeout=10)\n", " for l in out.splitlines():\n", " if l.strip():\n", " eventos.append(json.loads(l))\n", " if err.strip():\n", " print(f\"[{nombre} stderr] {err.strip()[:200]}\")\n", "\n", "msgs = [e for e in eventos if e[\"event\"] == \"msg\"]\n", "print(f\"\\nTotal de entregas recibidas entre todos los procesos: {len(msgs)}\")" ] }, { "cell_type": "markdown", "id": "33dcf1f4", "metadata": {}, "source": [ "## 2 · Qué recibió cada proceso\n", "\n", "Cada subscriber es un PID distinto. `sub-todo` (suscrito a `telemetria.>`) recibe los 8 mensajes; `sub-cpu` (suscrito solo a `telemetria.cpu`) recibe únicamente los 4 de CPU. El broker filtró por subject sin que el publisher supiera nada de ello." ] }, { "cell_type": "code", "execution_count": null, "id": "01ae57ed", "metadata": {}, "outputs": [], "source": [ "import pandas as pd\n", "\n", "df = pd.DataFrame(msgs)\n", "# PID por nombre de proceso (demuestra que son procesos distintos)\n", "pids = {e[\"name\"]: e[\"pid\"] for e in eventos if e[\"event\"] == \"ready\"}\n", "print(\"PID de cada proceso subscriber:\", pids)\n", "print()\n", "\n", "# Conteo de mensajes por (proceso, subject)\n", "tabla = df.groupby([\"name\", \"subject\"]).size().unstack(fill_value=0)\n", "print(\"Mensajes recibidos por proceso y subject:\")\n", "print(tabla)\n", "tabla" ] }, { "cell_type": "code", "execution_count": null, "id": "9a5ee65b", "metadata": {}, "outputs": [], "source": [ "import matplotlib.pyplot as plt\n", "\n", "resumen = df.groupby(\"name\").size().reindex([\"sub-todo\", \"sub-cpu\"]).fillna(0).astype(int)\n", "\n", "fig, ax = plt.subplots(figsize=(7, 3.2))\n", "barras = ax.bar(resumen.index, resumen.values, color=[\"#7c3aed\", \"#0891b2\"])\n", "ax.bar_label(barras, padding=3)\n", "ax.set_ylabel(\"mensajes recibidos\")\n", "ax.set_title(\"Telemetría recibida por cada PROCESO (8 publicados: 4 cpu + 4 mem)\")\n", "ax.set_ylim(0, 10)\n", "for i, name in enumerate(resumen.index):\n", " ax.text(i, -1.4, f\"PID {pids.get(name, '?')}\\n{('telemetria.>' if name=='sub-todo' else 'telemetria.cpu')}\",\n", " ha=\"center\", va=\"top\", fontsize=8, color=\"#555\")\n", "plt.tight_layout(); plt.show()" ] }, { "cell_type": "markdown", "id": "b8d60d73", "metadata": {}, "source": [ "## 3 · Línea de tiempo de las entregas\n", "\n", "Ordenando los mensajes por su marca temporal (`t`, segundos desde que cada proceso arrancó) se ve cómo ambos subscribers reciben los mensajes de CPU casi a la vez (fan-out), mientras que los de memoria solo llegan a `sub-todo`." ] }, { "cell_type": "code", "execution_count": null, "id": "1656576f", "metadata": {}, "outputs": [], "source": [ "fig, ax = plt.subplots(figsize=(9, 3))\n", "colores = {\"telemetria.cpu\": \"#ef4444\", \"telemetria.mem\": \"#3b82f6\"}\n", "y_de = {\"sub-todo\": 1, \"sub-cpu\": 0}\n", "for e in msgs:\n", " ax.scatter(e[\"t\"], y_de[e[\"name\"]], color=colores[e[\"subject\"]], s=80, zorder=3)\n", "ax.set_yticks([0, 1]); ax.set_yticklabels([\"sub-cpu\", \"sub-todo\"])\n", "ax.set_xlabel(\"t (segundos desde el arranque de cada proceso)\")\n", "ax.set_title(\"Timeline de entregas — rojo: telemetria.cpu, azul: telemetria.mem\")\n", "ax.grid(axis=\"x\", alpha=0.3)\n", "from matplotlib.patches import Patch\n", "ax.legend(handles=[Patch(color=c, label=s) for s, c in colores.items()], loc=\"upper right\")\n", "plt.tight_layout(); plt.show()" ] }, { "cell_type": "markdown", "id": "5914c849", "metadata": {}, "source": [ "## Resumen del análisis\n", "\n", "A lo largo de los tres notebooks hemos visto cómo distintos procesos envían datos por pub/sub con NATS:\n", "\n", "- **01** — el modelo base: publishers y subscribers desacoplados por un broker, *fan-out* y *wildcards*.\n", "- **02** — patrones de orden superior: *queue groups* (reparto de carga), *request/reply* (RPC) y *JetStream* (persistencia y replay).\n", "- **03** — **procesos del SO reales**: el desacople de verdad. El publisher no conoce a sus subscribers; el broker enruta por subject. Añadir o quitar procesos consumidores no cambia ni una línea del publisher.\n", "\n", "Esa es la idea central de NATS: **los procesos se comunican por nombres de subject, no por direcciones**, y el broker se encarga del resto.\n", "\n", "### Limpieza (opcional)\n", "\n", "Para parar el broker cuando termines:\n", "\n", "```python\n", "import subprocess\n", "subprocess.run([\"docker\", \"stop\", \"nats_demo\"]) # detener\n", "subprocess.run([\"docker\", \"rm\", \"nats_demo\"]) # eliminar\n", "```" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "name": "" } }, "nbformat": 4, "nbformat_minor": 5 }