#!/usr/bin/env python3
"""Collector api_hn_top — issue 0066 MVP.

Wire protocol (mirrors the graph_explorer enrichers protocol, issue 0026):
  - stdin:  JSON object with `ops_db_path`, `app_dir`, `registry_root`,
            `params` (limit, timeout_s). `app_dir` is accepted but not read
            by this script.
  - stderr: `PROGRESS:<fraction> <stage>` lines for UI feedback, plus free
            form log lines.
  - stdout: a single JSON line at the end with the summary
            `{entities_added, items_total, duration_ms}`.
  - exit code 0 = ok, !=0 = error (2 = invalid input, 1 = runtime failure).

Standalone usage (without odr_console):
    cd projects/online_data_recopilation/apps/odr_console
    echo '{"ops_db_path":"operations.db","app_dir":".","params":{"limit":5}}' \\
      | python/.venv/bin/python3 collectors/api_hn_top/run.py
"""

from __future__ import annotations

import json
import os
import sqlite3
import sys
import time
import uuid
from datetime import datetime, timezone
from pathlib import Path

# Identifier used both as `executions.pipeline_id` and as `entities.source`.
_PIPELINE_ID = "api_hn_top"
_HN_API_BASE = "https://hacker-news.firebaseio.com/v0"
_DEFAULT_LIMIT = 30
_DEFAULT_TIMEOUT_S = 15


def progress(p: float, stage: str = "") -> None:
    """Write a `PROGRESS:<p> <stage>` line to stderr and flush it."""
    sys.stderr.write(f"PROGRESS:{p:.2f} {stage}\n")
    sys.stderr.flush()


def log(msg: str) -> None:
    """Write a free-form log line to stderr and flush it."""
    sys.stderr.write(f"{msg}\n")
    sys.stderr.flush()


def load_registry_funcs(registry_root: str):
    """Make the registry functions importable by extending `sys.path`.

    Prefers the `_vendored/` directory next to this file; otherwise falls back
    to `<registry_root>/python/functions/` (dev mode). Nothing is imported
    here and nothing is returned: the function only prepends a directory to
    `sys.path` when it exists and is not already listed.
    """
    vendored = Path(__file__).parent / "_vendored"
    if vendored.is_dir():
        if str(vendored) not in sys.path:
            sys.path.insert(0, str(vendored))
        return
    if registry_root:
        path = Path(registry_root) / "python" / "functions"
        if path.is_dir() and str(path) not in sys.path:
            sys.path.insert(0, str(path))


def now_utc_iso() -> str:
    """Return the current UTC time formatted as `YYYY-MM-DDTHH:MM:SSZ`."""
    return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def open_ops_db(path: str) -> sqlite3.Connection:
    """Open the operations SQLite database with foreign keys enforced."""
    conn = sqlite3.connect(path)
    conn.execute("PRAGMA foreign_keys=ON;")
    return conn


def insert_entity(
    conn: sqlite3.Connection,
    eid: str,
    name: str,
    type_ref: str,
    metadata: dict,
    source: str,
) -> None:
    """Insert (or replace) one row in `entities`; the caller commits.

    `metadata` is stored as a JSON string. `created_at` and `updated_at` are
    both set to the current UTC timestamp.
    """
    ts = now_utc_iso()
    # NOTE(review): INSERT OR REPLACE removes the conflicting row before
    # inserting, so `created_at` is reset on every re-collection and, with
    # foreign_keys=ON, any ON DELETE action on referencing tables may fire.
    # Confirm against the schema whether an upsert is wanted instead.
    conn.execute(
        """INSERT OR REPLACE INTO entities
           (id, name, type_ref, status, description, domain, tags,
            source, metadata, notes, created_at, updated_at)
           VALUES (?, ?, ?, 'active', '', '', '[]', ?, ?, '', ?, ?)""",
        (eid, name, type_ref, source, json.dumps(metadata), ts, ts),
    )


def insert_execution(
    conn: sqlite3.Connection,
    pipeline_id: str,
    started_at: str,
    ended_at: str,
    duration_ms: int,
    records_in: int,
    records_out: int,
    status: str,
    error: str,
    metrics: dict,
) -> None:
    """Insert one row in `executions` with a fresh `e_<12 hex>` id.

    `relation_id` is always stored as an empty string and `metrics` as a JSON
    string. The caller commits.
    """
    ts = now_utc_iso()
    conn.execute(
        """INSERT INTO executions
           (id, pipeline_id, relation_id, status, started_at, ended_at,
            duration_ms, records_in, records_out, error, metrics, created_at)
           VALUES (?, ?, '', ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
        (
            f"e_{uuid.uuid4().hex[:12]}",
            pipeline_id,
            status,
            started_at,
            ended_at,
            duration_ms,
            records_in,
            records_out,
            error,
            json.dumps(metrics),
            ts,
        ),
    )


def _int_param(params: dict, key: str, default: int, minimum: int) -> int:
    """Read an integer parameter, applying `default` when absent or null.

    Raises:
        ValueError: if the value is not convertible to int or is < `minimum`.
    """
    value = params.get(key)
    if value is None:
        return default
    try:
        number = int(value)
    except (TypeError, ValueError, OverflowError):
        raise ValueError(f"{key} must be an integer, got {value!r}") from None
    if number < minimum:
        raise ValueError(f"{key} must be >= {minimum}, got {number}")
    return number


def _resolve_http_get_json():
    """Return the registry `http_get_json`, or a urllib-based stand-in."""
    try:
        from infra.http_get_json import http_get_json

        return http_get_json
    except ImportError:
        # Fallback: plain stdlib urllib when the registry is not available.
        log("registry funcs unavailable; falling back to urllib")

    import urllib.request

    def http_get_json(url, headers=None, params=None, timeout=30.0):
        # `headers` and `params` exist only for signature compatibility with
        # the registry function; this fallback ignores them.
        req = urllib.request.Request(url, headers={"Accept": "application/json"})
        with urllib.request.urlopen(req, timeout=timeout) as r:
            return json.loads(r.read().decode("utf-8"))

    return http_get_json


def _record_failure(
    ops_db_path: str, started_iso: str, started: float, error: str
) -> None:
    """Best-effort: store a 'failure' execution row; never raises."""
    ended_iso = now_utc_iso()
    duration_ms = int((time.time() - started) * 1000)
    try:
        conn = open_ops_db(ops_db_path)
        try:
            insert_execution(
                conn,
                _PIPELINE_ID,
                started_iso,
                ended_iso,
                duration_ms,
                0,
                0,
                "failure",
                error,
                {},
            )
            conn.commit()
        finally:
            # Close even when the insert fails so the handle is not leaked.
            conn.close()
    except Exception as db_err:
        # Still best-effort, but leave a trace instead of swallowing silently.
        log(f"could not record failed execution: {db_err}")


def _store_story(conn: sqlite3.Connection, http_get_json, sid, timeout_s: int) -> bool:
    """Fetch one HN item and upsert it as an entity; return True if stored."""
    try:
        story = http_get_json(f"{_HN_API_BASE}/item/{sid}.json", timeout=timeout_s)
    except Exception as e:
        log(f"item {sid}: {e}")
        return False
    if not isinstance(story, dict):
        # Non-object payloads are skipped without logging.
        return False

    eid = f"hn_{sid}"
    title = story.get("title") or "(untitled)"
    meta = {
        "hn_id": sid,
        "title": title,
        "url": story.get("url") or "",
        "score": story.get("score"),
        "by": story.get("by"),
        "time": story.get("time"),
        "type": story.get("type"),
        "descendants": story.get("descendants"),
    }
    try:
        insert_entity(conn, eid, title, "HnStory", meta, _PIPELINE_ID)
    except Exception as e:
        log(f"insert {eid}: {e}")
        return False
    return True


def main() -> int:
    """Run the collector following the wire protocol in the module docstring.

    Returns:
        0 on success, 2 when stdin or `params` are invalid, 1 when fetching
        the top-story ids or writing to the operations DB fails.
    """
    raw = sys.stdin.read()
    try:
        ctx = json.loads(raw) if raw.strip() else {}
    except (ValueError, RecursionError) as e:
        log(f"stdin not valid JSON: {e}")
        return 2
    if not isinstance(ctx, dict):
        log(f"stdin JSON must be an object, got {type(ctx).__name__}")
        return 2

    ops_db_path = ctx.get("ops_db_path") or "operations.db"
    registry_root = ctx.get("registry_root") or os.environ.get("FN_REGISTRY_ROOT", "")
    params = ctx.get("params") or {}
    if not isinstance(params, dict):
        log(f"params must be an object, got {type(params).__name__}")
        return 2
    try:
        # A negative limit would make `ids[:limit]` drop items from the tail
        # instead of limiting, so it is rejected up front.
        limit = _int_param(params, "limit", _DEFAULT_LIMIT, minimum=0)
        timeout_s = _int_param(params, "timeout_s", _DEFAULT_TIMEOUT_S, minimum=1)
    except ValueError as e:
        log(f"invalid params: {e}")
        return 2

    load_registry_funcs(registry_root)
    http_get_json = _resolve_http_get_json()

    started = time.time()
    started_iso = now_utc_iso()
    progress(0.05, "fetch top ids")
    try:
        ids = http_get_json(f"{_HN_API_BASE}/topstories.json", timeout=timeout_s)
        if not isinstance(ids, list):
            raise RuntimeError(f"expected list, got {type(ids).__name__}")
    except Exception as e:
        log(f"fetch topstories failed: {e}")
        _record_failure(ops_db_path, started_iso, started, str(e))
        return 1

    ids = ids[:limit]
    total = len(ids)
    progress(0.2, f"fetch {total} stories")

    try:
        conn = open_ops_db(ops_db_path)
    except Exception as e:
        log(f"open ops_db failed: {e}")
        return 1

    n_added = 0
    try:
        for i, sid in enumerate(ids, start=1):
            if _store_story(conn, http_get_json, sid, timeout_s):
                n_added += 1
            # Report after every item, stored or not, so the UI never stalls.
            progress(0.2 + 0.7 * i / total, f"{i}/{total}")

        ended_iso = now_utc_iso()
        duration_ms = int((time.time() - started) * 1000)
        insert_execution(
            conn,
            _PIPELINE_ID,
            started_iso,
            ended_iso,
            duration_ms,
            total,
            n_added,
            "success",
            "",
            {"limit": limit, "fetched": total, "stored": n_added},
        )
        # Single commit: entities and the execution row land together.
        conn.commit()
    except sqlite3.Error as e:
        log(f"record execution failed: {e}")
        return 1
    finally:
        conn.close()

    progress(1.0, "done")
    summary = {
        "entities_added": n_added,
        "items_total": total,
        "duration_ms": duration_ms,
    }
    sys.stdout.write(json.dumps(summary) + "\n")
    sys.stdout.flush()
    return 0


if __name__ == "__main__":
    sys.exit(main())