Files
odr_console/collectors/api_hn_top/run.py
T
2026-05-09 18:11:22 +02:00

238 lines
6.9 KiB
Python

#!/usr/bin/env python3
"""Collector api_hn_top — issue 0066 MVP.
Wire protocol (espejo del de graph_explorer enrichers, issue 0026):
- stdin: JSON con `ops_db_path`, `app_dir`, `registry_root`, `params` (limit, timeout_s).
- stderr: lineas `PROGRESS:<float> <stage>` para feedback de UI.
- stdout: una linea JSON al final con resumen `{entities_added, items}`.
- exit code 0 = ok, !=0 = error.
Uso standalone (sin odr_console):
cd projects/online_data_recopilation/apps/odr_console
echo '{"ops_db_path":"operations.db","app_dir":".","params":{"limit":5}}' \
| python/.venv/bin/python3 collectors/api_hn_top/run.py
"""
from __future__ import annotations
import json
import os
import sqlite3
import sys
import time
import uuid
from datetime import datetime, timezone
from pathlib import Path
def progress(p: float, stage: str = "") -> None:
sys.stderr.write(f"PROGRESS:{p:.2f} {stage}\n")
sys.stderr.flush()
def log(msg: str) -> None:
sys.stderr.write(f"{msg}\n")
sys.stderr.flush()
def load_registry_funcs(registry_root: str):
"""Importa funciones del registry. Prefiere `_vendored/`, fallback a
`<registry_root>/python/functions/` (modo dev)."""
vendored = Path(__file__).parent / "_vendored"
if vendored.is_dir():
if str(vendored) not in sys.path:
sys.path.insert(0, str(vendored))
return
if registry_root:
path = Path(registry_root) / "python" / "functions"
if path.is_dir() and str(path) not in sys.path:
sys.path.insert(0, str(path))
def now_utc_iso() -> str:
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
def open_ops_db(path: str) -> sqlite3.Connection:
conn = sqlite3.connect(path)
conn.execute("PRAGMA foreign_keys=ON;")
return conn
def insert_entity(
conn: sqlite3.Connection,
eid: str,
name: str,
type_ref: str,
metadata: dict,
source: str,
) -> None:
ts = now_utc_iso()
conn.execute(
"""INSERT OR REPLACE INTO entities
(id, name, type_ref, status, description, domain, tags,
source, metadata, notes, created_at, updated_at)
VALUES (?, ?, ?, 'active', '', '', '[]', ?, ?, '', ?, ?)""",
(eid, name, type_ref, source, json.dumps(metadata), ts, ts),
)
def insert_execution(
conn: sqlite3.Connection,
pipeline_id: str,
started_at: str,
ended_at: str,
duration_ms: int,
records_in: int,
records_out: int,
status: str,
error: str,
metrics: dict,
) -> None:
ts = now_utc_iso()
conn.execute(
"""INSERT INTO executions
(id, pipeline_id, relation_id, status, started_at, ended_at,
duration_ms, records_in, records_out, error, metrics, created_at)
VALUES (?, ?, '', ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(
f"e_{uuid.uuid4().hex[:12]}",
pipeline_id,
status,
started_at,
ended_at,
duration_ms,
records_in,
records_out,
error,
json.dumps(metrics),
ts,
),
)
def main() -> int:
raw = sys.stdin.read()
try:
ctx = json.loads(raw) if raw.strip() else {}
except Exception as e:
log(f"stdin not valid JSON: {e}")
return 2
ops_db_path = ctx.get("ops_db_path") or "operations.db"
registry_root = ctx.get("registry_root") or os.environ.get("FN_REGISTRY_ROOT", "")
params = ctx.get("params") or {}
limit = int(params.get("limit", 30))
timeout_s = int(params.get("timeout_s", 15))
load_registry_funcs(registry_root)
try:
from infra.http_get_json import http_get_json
except ImportError:
# Fallback: stdlib urllib directo si el registry no esta disponible.
log("registry funcs unavailable; falling back to urllib")
import urllib.request
def http_get_json(url, headers=None, params=None, timeout=30.0):
req = urllib.request.Request(url, headers={"Accept": "application/json"})
with urllib.request.urlopen(req, timeout=timeout) as r:
return json.loads(r.read().decode("utf-8"))
started = time.time()
started_iso = now_utc_iso()
progress(0.05, "fetch top ids")
try:
ids = http_get_json(
"https://hacker-news.firebaseio.com/v0/topstories.json",
timeout=timeout_s,
)
if not isinstance(ids, list):
raise RuntimeError(f"expected list, got {type(ids).__name__}")
except Exception as e:
log(f"fetch topstories failed: {e}")
ended_iso = now_utc_iso()
try:
conn = open_ops_db(ops_db_path)
insert_execution(
conn, "api_hn_top", started_iso, ended_iso,
int((time.time() - started) * 1000), 0, 0, "failure",
str(e), {},
)
conn.commit()
conn.close()
except Exception:
pass
return 1
ids = ids[:limit]
progress(0.2, f"fetch {len(ids)} stories")
items = []
n_added = 0
try:
conn = open_ops_db(ops_db_path)
except Exception as e:
log(f"open ops_db failed: {e}")
return 1
for i, sid in enumerate(ids):
try:
story = http_get_json(
f"https://hacker-news.firebaseio.com/v0/item/{sid}.json",
timeout=timeout_s,
)
except Exception as e:
log(f"item {sid}: {e}")
continue
if not isinstance(story, dict):
continue
eid = f"hn_{sid}"
title = story.get("title") or "(untitled)"
meta = {
"hn_id": sid,
"title": title,
"url": story.get("url") or "",
"score": story.get("score"),
"by": story.get("by"),
"time": story.get("time"),
"type": story.get("type"),
"descendants": story.get("descendants"),
}
try:
insert_entity(conn, eid, title, "HnStory", meta, "api_hn_top")
n_added += 1
items.append({"id": eid, "title": title})
except Exception as e:
log(f"insert {eid}: {e}")
continue
progress(0.2 + 0.7 * (i + 1) / len(ids), f"{i+1}/{len(ids)}")
ended_iso = now_utc_iso()
duration_ms = int((time.time() - started) * 1000)
try:
insert_execution(
conn, "api_hn_top", started_iso, ended_iso, duration_ms,
len(ids), n_added, "success", "",
{"limit": limit, "fetched": len(ids), "stored": n_added},
)
conn.commit()
finally:
conn.close()
progress(1.0, "done")
summary = {
"entities_added": n_added,
"items_total": len(ids),
"duration_ms": duration_ms,
}
sys.stdout.write(json.dumps(summary) + "\n")
sys.stdout.flush()
return 0
if __name__ == "__main__":
sys.exit(main())