#!/usr/bin/env python3 """Subscriber NATS como proceso del sistema operativo independiente. Se conecta al broker, se suscribe a uno o varios subjects y emite cada evento como una linea JSON en stdout para que el proceso padre (el notebook) la lea. Termina solo tras `--seconds` segundos. """ import argparse import asyncio import json import os import time import nats NATS_URL = "nats://127.0.0.1:4222" def emit(event: dict) -> None: """Escribe un evento como linea JSON en stdout, con flush inmediato.""" print(json.dumps(event), flush=True) async def main(name: str, subjects: list[str], seconds: float) -> None: pid = os.getpid() nc = await nats.connect(NATS_URL, name=name) received = 0 t0 = time.monotonic() async def handler(msg): nonlocal received received += 1 emit({ "event": "msg", "pid": pid, "name": name, "subject": msg.subject, "data": msg.data.decode(), "t": round(time.monotonic() - t0, 4), }) for subject in subjects: await nc.subscribe(subject, cb=handler) # Senal de que este proceso ya esta escuchando (el padre la espera). emit({"event": "ready", "pid": pid, "name": name, "subjects": subjects}) await asyncio.sleep(seconds) emit({"event": "done", "pid": pid, "name": name, "received": received}) await nc.drain() if __name__ == "__main__": parser = argparse.ArgumentParser(description="Subscriber NATS de demostracion") parser.add_argument("--name", required=True, help="Nombre logico del subscriber") parser.add_argument("--subjects", required=True, help="Subjects separados por coma (admite wildcards)") parser.add_argument("--seconds", type=float, default=4.0, help="Tiempo de escucha antes de terminar") args = parser.parse_args() asyncio.run(main(args.name, args.subjects.split(","), args.seconds))