62 lines
1.9 KiB
Python
62 lines
1.9 KiB
Python
#!/usr/bin/env python3
|
|
"""Subscriber NATS como proceso del sistema operativo independiente.
|
|
|
|
Se conecta al broker, se suscribe a uno o varios subjects y emite cada evento
|
|
como una linea JSON en stdout para que el proceso padre (el notebook) la lea.
|
|
Termina solo tras `--seconds` segundos.
|
|
"""
|
|
import argparse
|
|
import asyncio
|
|
import json
|
|
import os
|
|
import time
|
|
|
|
import nats
|
|
|
|
# Address of the NATS broker. The NATS_URL environment variable overrides it;
# when it is unset or empty, the local default broker address is used.
NATS_URL = os.environ.get("NATS_URL") or "nats://127.0.0.1:4222"
|
|
|
|
|
|
def emit(event: dict) -> None:
    """Serialize *event* to JSON and print it as one line on stdout.

    Stdout is flushed on every call so each event leaves this process as soon
    as it is produced instead of sitting in the output buffer.
    """
    line = json.dumps(event)
    print(line, flush=True)
|
|
|
|
|
|
async def main(name: str, subjects: list[str], seconds: float) -> None:
    """Listen on *subjects* for *seconds* seconds and report events via ``emit``.

    Connects to the broker at ``NATS_URL`` and emits, in this order:

    * one ``"ready"`` event once every subscription has been registered,
    * one ``"msg"`` event for each message delivered to the callback,
    * one ``"done"`` event carrying the total number of messages received.

    Args:
        name: Logical subscriber name; passed to the connection and included
            in every emitted event.
        subjects: Subjects to subscribe to.
        seconds: How long to listen before shutting down.
    """
    pid = os.getpid()
    nc = await nats.connect(NATS_URL, name=name)
    received = 0
    t0 = time.monotonic()

    async def handler(msg):
        """Count the message and emit it as a ``"msg"`` event."""
        nonlocal received
        received += 1
        emit({
            "event": "msg",
            "pid": pid,
            "name": name,
            "subject": msg.subject,
            # Payloads are raw bytes: substitute undecodable sequences instead
            # of raising inside the callback, which would lose the event.
            "data": msg.data.decode("utf-8", errors="replace"),
            # Seconds elapsed since t0 (taken right after connecting).
            "t": round(time.monotonic() - t0, 4),
        })

    try:
        for subject in subjects:
            await nc.subscribe(subject, cb=handler)

        # Signal that this process is now listening (the parent waits for it).
        emit({"event": "ready", "pid": pid, "name": name, "subjects": subjects})

        await asyncio.sleep(seconds)

        # Drain BEFORE reporting: draining still delivers pending messages to
        # the handler, so the final count is only known once it completes.
        await nc.drain()
    finally:
        # A successful drain already closes the connection; this only runs
        # when subscribing, sleeping or draining failed or was cancelled.
        if not nc.is_closed:
            await nc.close()

    emit({"event": "done", "pid": pid, "name": name, "received": received})
|
|
|
|
|
|
def parse_subjects(raw: str) -> list[str]:
    """Split a comma-separated subject list into clean subject names.

    Whitespace around each entry is stripped and empty entries (from stray or
    trailing commas) are dropped, so the result holds only non-empty subjects.
    """
    return [subject for part in raw.split(",") if (subject := part.strip())]


if __name__ == "__main__":
    # Command-line entry point: parse the arguments and run the subscriber.
    parser = argparse.ArgumentParser(description="Subscriber NATS de demostracion")
    parser.add_argument("--name", required=True, help="Nombre logico del subscriber")
    parser.add_argument("--subjects", required=True,
                        help="Subjects separados por coma (admite wildcards)")
    parser.add_argument("--seconds", type=float, default=4.0,
                        help="Tiempo de escucha antes de terminar")
    args = parser.parse_args()

    subjects = parse_subjects(args.subjects)
    if not subjects:
        # Fail with a usage error instead of subscribing to an empty subject.
        parser.error("--subjects debe incluir al menos un subject no vacio")

    asyncio.run(main(args.name, subjects, args.seconds))
|