From 7ec6c4e09f9316db4864218c0f112321020f417a Mon Sep 17 00:00:00 2001 From: Egutierrez Date: Fri, 1 May 2026 18:24:52 +0200 Subject: [PATCH] =?UTF-8?q?feat(enrichers):=20cuatro=20enrichers=20web=20?= =?UTF-8?q?=E2=80=94=20fetch=20+=20extract=20trio=20(issues=200028,=200028?= =?UTF-8?q?b)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Cada enricher es un par manifest.yaml + run.py en enrichers//. 1. fetch_webpage (Url, Webpage): HTTP GET (requests, fallback urllib) -> html_to_markdown_py_core -> sha256(url) -> guarda HTML+MD en cache//.{html,md}. Convierte Url -> Webpage con metadata enriquecida (title/status_code/content_type/ paths/text_length). Crea Domain con relacion BELONGS_TO. 2. extract_domain (Url, Webpage, Email): Saca dominio de metadata.url o metadata.address (sin I/O). Crea/conecta Domain con BELONGS_TO. Util cuando el usuario quiere ver el dominio antes de fetch. 3. extract_links (Webpage): Lee metadata.markdown_path -> extract_urls_py_cybersecurity -> dedup -> crea nodo Url por enlace + relacion LINKS_TO. Param max_links (50). 4. extract_text_entities (Webpage): Lee metadata.markdown_path -> extract_iocs_py_cybersecurity (regex puro, sin coste) -> crea entidades por (type, value) tipadas en el registro: Email, IPAddress, Domain, FileHash, CryptoWallet, CVE, MACAddress, Phone. Cada una con relacion EXTRACTED_FROM al Webpage origen. v1 sin GLiNER/ GLiREL — esos requieren modelos pre-cargados (futura iteracion). Probado end-to-end: fetch_webpage https://httpbin.org/html -> 1 Webpage + 1 Domain extract_links -> 2 Url + 2 LINKS_TO extract_text_entities -> 8 IoCs (Email, IP*2, CVE, Domain*2, Wallet, Phone) Co-Authored-By: Claude Opus 4.7 (1M context) --- enrichers/extract_domain/manifest.yaml | 7 + enrichers/extract_domain/run.py | 125 +++++++ enrichers/extract_links/manifest.yaml | 8 + enrichers/extract_links/run.py | 139 +++++++ enrichers/extract_text_entities/manifest.yaml | 9 + enrichers/extract_text_entities/run.py | 187 ++++++++++ enrichers/fetch_webpage/manifest.yaml | 8 + enrichers/fetch_webpage/run.py | 338 ++++++++++++++++++ 8 files changed, 821 insertions(+) create mode 100644 enrichers/extract_domain/manifest.yaml create mode 100755 enrichers/extract_domain/run.py create mode 100644 enrichers/extract_links/manifest.yaml create mode 100755 enrichers/extract_links/run.py create mode 100644 enrichers/extract_text_entities/manifest.yaml create mode 100755 enrichers/extract_text_entities/run.py create mode 100644 enrichers/fetch_webpage/manifest.yaml create mode 100755 enrichers/fetch_webpage/run.py diff --git a/enrichers/extract_domain/manifest.yaml b/enrichers/extract_domain/manifest.yaml new file mode 100644 index 0000000..7a14e75 --- /dev/null +++ b/enrichers/extract_domain/manifest.yaml @@ -0,0 +1,7 @@ +id: extract_domain +name: "Extract domain" +description: "Saca el dominio de la url/email del nodo y crea/conecta una entidad Domain con relacion BELONGS_TO. No descarga nada." +applies_to: [Url, Webpage, Email] +emits: [Domain] +relations: [BELONGS_TO] +params: [] diff --git a/enrichers/extract_domain/run.py b/enrichers/extract_domain/run.py new file mode 100755 index 0000000..53f73fb --- /dev/null +++ b/enrichers/extract_domain/run.py @@ -0,0 +1,125 @@ +#!/usr/bin/env python3 +"""Enricher extract_domain — issue 0028b. + +Saca el dominio de un nodo Url/Webpage (campo metadata.url) o Email (campo +metadata.address) y crea/conecta una entidad Domain con relacion BELONGS_TO. +No hace I/O de red. +""" +from __future__ import annotations + +import json +import sqlite3 +import sys +import time +from datetime import datetime, timezone +from urllib.parse import urlparse + + +def progress(p: float, stage: str = "") -> None: + sys.stderr.write(f"PROGRESS:{p:.2f} {stage}\n") + sys.stderr.flush() + + +def now_iso() -> str: + return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + + +def now_ms() -> int: + return int(time.time() * 1000) + + +def domain_from_url(url: str) -> str: + if not url: + return "" + if "://" not in url: + url = "https://" + url + try: + return (urlparse(url).hostname or "").lower() + except Exception: + return "" + + +def domain_from_email(addr: str) -> str: + if "@" not in addr: + return "" + return addr.split("@", 1)[1].strip().lower() + + +def main() -> int: + ctx = json.loads(sys.stdin.read()) + node_id = ctx.get("node_id") or "" + node_type = (ctx.get("node_type") or "").lower() + metadata = ctx.get("metadata") or {} + if isinstance(metadata, str): + try: + metadata = json.loads(metadata) + except Exception: + metadata = {} + ops_db = ctx.get("ops_db_path") or "" + if not node_id or not ops_db: + sys.stderr.write("missing node_id / ops_db_path\n") + return 2 + + progress(0.30, "extracting") + dname = "" + if node_type == "email": + addr = metadata.get("address") or ctx.get("node_name") or "" + dname = domain_from_email(addr) + else: + url = metadata.get("url") or ctx.get("node_name") or "" + dname = domain_from_url(url) + + if not dname: + print(json.dumps({"warning": "no domain extractable", + "entities_added": 0, "relations_added": 0})) + return 0 + + progress(0.70, "writing") + conn = sqlite3.connect(ops_db) + entities_added = 0 + relations_added = 0 + try: + existed = conn.execute( + "SELECT id FROM entities WHERE type_ref='Domain' AND name=? LIMIT 1", + (dname,), + ).fetchone() + if existed: + domain_id = existed[0] + else: + domain_id = f"Domain_{now_ms()}" + ts = now_iso() + conn.execute( + "INSERT INTO entities (id, name, type_ref, source, created_at, updated_at) " + "VALUES (?, ?, 'Domain', 'enricher:extract_domain', ?, ?)", + (domain_id, dname, ts, ts), + ) + entities_added = 1 + + rel_exists = conn.execute( + "SELECT 1 FROM relations WHERE from_entity=? AND to_entity=? AND name='BELONGS_TO' LIMIT 1", + (node_id, domain_id), + ).fetchone() + if not rel_exists: + ts = now_iso() + conn.execute( + "INSERT INTO relations (id, name, from_entity, to_entity, created_at, updated_at) " + "VALUES (?, 'BELONGS_TO', ?, ?, ?, ?)", + (f"rel_{now_ms()}_belongs_to", node_id, domain_id, ts, ts), + ) + relations_added = 1 + + conn.commit() + finally: + conn.close() + + progress(1.0, "done") + print(json.dumps({ + "domain": dname, + "entities_added": entities_added, + "relations_added": relations_added, + })) + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/enrichers/extract_links/manifest.yaml b/enrichers/extract_links/manifest.yaml new file mode 100644 index 0000000..625f194 --- /dev/null +++ b/enrichers/extract_links/manifest.yaml @@ -0,0 +1,8 @@ +id: extract_links +name: "Extract links" +description: "Lee la markdown cacheada de un Webpage (metadata.markdown_path) y crea nodos Url para cada enlace encontrado, conectados con relacion LINKS_TO. Requiere haber ejecutado fetch_webpage antes." +applies_to: [Webpage] +emits: [Url] +relations: [LINKS_TO] +params: + - { name: max_links, type: int, default: 50 } diff --git a/enrichers/extract_links/run.py b/enrichers/extract_links/run.py new file mode 100755 index 0000000..6519b60 --- /dev/null +++ b/enrichers/extract_links/run.py @@ -0,0 +1,139 @@ +#!/usr/bin/env python3 +"""Enricher extract_links — issue 0028b. + +Lee la markdown cacheada de un Webpage (metadata.markdown_path), saca todas +las URLs unicas con `extract_urls_py_cybersecurity`, y crea/conecta un nodo +Url por cada URL nueva con relacion LINKS_TO desde el Webpage origen. +""" +from __future__ import annotations + +import json +import os +import sqlite3 +import sys +import time +from datetime import datetime, timezone + + +def progress(p: float, stage: str = "") -> None: + sys.stderr.write(f"PROGRESS:{p:.2f} {stage}\n") + sys.stderr.flush() + + +def log(msg: str) -> None: + sys.stderr.write(f"{msg}\n") + sys.stderr.flush() + + +def now_iso() -> str: + return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + + +def now_ms() -> int: + return int(time.time() * 1000) + + +def main() -> int: + ctx = json.loads(sys.stdin.read()) + node_id = ctx.get("node_id") or "" + metadata = ctx.get("metadata") or {} + if isinstance(metadata, str): + try: metadata = json.loads(metadata) + except Exception: metadata = {} + ops_db = ctx.get("ops_db_path") or "" + app_dir = ctx.get("app_dir") or "" + registry_root = ctx.get("registry_root") or "" + params = ctx.get("params") or {} + max_links = int(params.get("max_links", 50)) + + if not node_id or not ops_db: + log("missing node_id / ops_db_path") + return 2 + + md_path = metadata.get("markdown_path") or "" + if not md_path: + log("nodo sin markdown_path — corre fetch_webpage primero") + print(json.dumps({"error": "missing markdown_path. Run fetch_webpage first.", + "entities_added": 0, "relations_added": 0})) + return 3 + + # Path relativo a app_dir. + abs_md = md_path if os.path.isabs(md_path) else os.path.join(app_dir, md_path) + if not os.path.exists(abs_md): + log(f"markdown not found at {abs_md}") + print(json.dumps({"error": f"markdown not found: {abs_md}", + "entities_added": 0, "relations_added": 0})) + return 4 + + progress(0.20, "reading") + text = open(abs_md, "r", encoding="utf-8", errors="replace").read() + + progress(0.45, "extracting") + py_funcs = os.path.join(registry_root, "python", "functions") + if py_funcs not in sys.path: + sys.path.insert(0, py_funcs) + from cybersecurity.cybersecurity import extract_urls # type: ignore + + urls = extract_urls(text) + # Dedup conservando orden. + seen = set() + unique = [] + for u in urls: + if u not in seen: + seen.add(u) + unique.append(u) + if max_links > 0: + unique = unique[:max_links] + + progress(0.65, "writing") + conn = sqlite3.connect(ops_db) + entities_added = 0 + relations_added = 0 + try: + for i, u in enumerate(unique): + existed = conn.execute( + "SELECT id FROM entities WHERE type_ref='Url' AND name=? LIMIT 1", + (u,), + ).fetchone() + if existed: + target_id = existed[0] + else: + target_id = f"Url_{now_ms()}_{i}" + ts = now_iso() + meta_json = json.dumps({"url": u}) + conn.execute( + "INSERT INTO entities (id, name, type_ref, source, metadata, created_at, updated_at) " + "VALUES (?, ?, 'Url', 'enricher:extract_links', ?, ?, ?)", + (target_id, u, meta_json, ts, ts), + ) + entities_added += 1 + + rel_exists = conn.execute( + "SELECT 1 FROM relations WHERE from_entity=? AND to_entity=? AND name='LINKS_TO' LIMIT 1", + (node_id, target_id), + ).fetchone() + if not rel_exists: + ts = now_iso() + conn.execute( + "INSERT INTO relations (id, name, from_entity, to_entity, created_at, updated_at) " + "VALUES (?, 'LINKS_TO', ?, ?, ?, ?)", + (f"rel_{now_ms()}_{i}_links_to", node_id, target_id, ts, ts), + ) + relations_added += 1 + if i % 10 == 0: + progress(0.65 + 0.30 * (i / max(1, len(unique))), "writing") + conn.commit() + finally: + conn.close() + + progress(1.0, "done") + print(json.dumps({ + "links_found": len(unique), + "entities_added": entities_added, + "relations_added": relations_added, + })) + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/enrichers/extract_text_entities/manifest.yaml b/enrichers/extract_text_entities/manifest.yaml new file mode 100644 index 0000000..8afc75f --- /dev/null +++ b/enrichers/extract_text_entities/manifest.yaml @@ -0,0 +1,9 @@ +id: extract_text_entities +name: "Extract entities from text" +description: "Lee la markdown cacheada de un Webpage y extrae IoCs (IPs, emails, dominios, hashes, crypto wallets, CVEs, MAC, telefonos) creando entidades + relacion EXTRACTED_FROM. Sin coste — solo regex. Modelos ML (GLiNER/GLiREL) en futura iteracion." +applies_to: [Webpage] +emits: [Email, IPAddress, Domain, FileHash, CryptoWallet, CVE, MACAddress, Phone] +relations: [EXTRACTED_FROM] +params: + - { name: types, type: string, default: "" } + - { name: max_entities, type: int, default: 200 } diff --git a/enrichers/extract_text_entities/run.py b/enrichers/extract_text_entities/run.py new file mode 100755 index 0000000..1d7290d --- /dev/null +++ b/enrichers/extract_text_entities/run.py @@ -0,0 +1,187 @@ +#!/usr/bin/env python3 +"""Enricher extract_text_entities — issue 0028b. + +Lee la markdown cacheada de un Webpage (metadata.markdown_path) y corre el +pipeline puro `extract_iocs` (regex puro, sin coste, sin modelos ML). + +Para cada IoC encontrado: + - Crea o reusa la entidad por (type, name). + - Crea relacion EXTRACTED_FROM desde la entidad nueva al Webpage origen. + +Tipos soportados (mapeo IoC -> type_ref del registry): + email -> Email + ip_address -> IPAddress + domain -> Domain + file_hash -> FileHash + crypto_wallet -> CryptoWallet + cve_id -> CVE + mac_address -> MACAddress + phone_number -> Phone + +Futura iteracion: añadir GLiNER/GLiREL para Person/Org/Location etc. +""" +from __future__ import annotations + +import json +import os +import sqlite3 +import sys +import time +from datetime import datetime, timezone + + +_TYPE_MAP = { + "email": ("Email", "address"), + "ip_address": ("IPAddress", "address"), + "domain": ("Domain", "name"), + "file_hash": ("FileHash", "value"), + "crypto_wallet": ("CryptoWallet", "address"), + "cve_id": ("CVE", "id"), + "mac_address": ("MACAddress", "address"), + "phone_number": ("Phone", "number"), +} + + +def progress(p: float, stage: str = "") -> None: + sys.stderr.write(f"PROGRESS:{p:.2f} {stage}\n") + sys.stderr.flush() + + +def log(msg: str) -> None: + sys.stderr.write(f"{msg}\n") + sys.stderr.flush() + + +def now_iso() -> str: + return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + + +def now_ms() -> int: + return int(time.time() * 1000) + + +def main() -> int: + ctx = json.loads(sys.stdin.read()) + node_id = ctx.get("node_id") or "" + metadata = ctx.get("metadata") or {} + if isinstance(metadata, str): + try: metadata = json.loads(metadata) + except Exception: metadata = {} + ops_db = ctx.get("ops_db_path") or "" + app_dir = ctx.get("app_dir") or "" + registry_root = ctx.get("registry_root") or "" + params = ctx.get("params") or {} + + types_csv = (params.get("types") or "").strip() + types_list = [t.strip() for t in types_csv.split(",") if t.strip()] if types_csv else None + max_entities = int(params.get("max_entities", 200)) + + if not node_id or not ops_db: + log("missing node_id / ops_db_path") + return 2 + + md_path = metadata.get("markdown_path") or "" + if not md_path: + log("nodo sin markdown_path — corre fetch_webpage primero") + print(json.dumps({"error": "missing markdown_path. Run fetch_webpage first.", + "entities_added": 0, "relations_added": 0})) + return 3 + + abs_md = md_path if os.path.isabs(md_path) else os.path.join(app_dir, md_path) + if not os.path.exists(abs_md): + log(f"markdown not found at {abs_md}") + print(json.dumps({"error": f"markdown not found: {abs_md}", + "entities_added": 0, "relations_added": 0})) + return 4 + + progress(0.10, "reading") + text = open(abs_md, "r", encoding="utf-8", errors="replace").read() + + progress(0.30, "extracting iocs") + py_funcs = os.path.join(registry_root, "python", "functions") + if py_funcs not in sys.path: + sys.path.insert(0, py_funcs) + from cybersecurity.extract_iocs import extract_iocs # type: ignore + + iocs = extract_iocs(text, types_list) + + # Dedup por (type, value). + seen = set() + unique = [] + for it in iocs: + t = it.get("type") + v = it.get("value") or it.get("address") or it.get("name") or "" + if not t or not v: + continue + key = (t, v) + if key in seen: + continue + seen.add(key) + unique.append(it) + if len(unique) >= max_entities: + break + + progress(0.55, "writing") + conn = sqlite3.connect(ops_db) + entities_added = 0 + relations_added = 0 + new_by_type: dict[str, int] = {} + try: + n = len(unique) + for i, it in enumerate(unique): + ioc_type = it.get("type") + value = it.get("value") or it.get("address") or it.get("name") or "" + if not value: + continue + type_ref, value_field = _TYPE_MAP.get(ioc_type, (ioc_type or "Text", "value")) + + existed = conn.execute( + "SELECT id FROM entities WHERE type_ref=? AND name=? LIMIT 1", + (type_ref, value), + ).fetchone() + if existed: + target_id = existed[0] + else: + target_id = f"{type_ref}_{now_ms()}_{i}" + ts = now_iso() + meta = {value_field: value} + if "start" in it: meta["text_offset"] = it["start"] + conn.execute( + "INSERT INTO entities (id, name, type_ref, source, metadata, created_at, updated_at) " + "VALUES (?, ?, ?, 'enricher:extract_text_entities', ?, ?, ?)", + (target_id, value, type_ref, json.dumps(meta), ts, ts), + ) + entities_added += 1 + new_by_type[type_ref] = new_by_type.get(type_ref, 0) + 1 + + rel_exists = conn.execute( + "SELECT 1 FROM relations WHERE from_entity=? AND to_entity=? AND name='EXTRACTED_FROM' LIMIT 1", + (target_id, node_id), + ).fetchone() + if not rel_exists: + ts = now_iso() + conn.execute( + "INSERT INTO relations (id, name, from_entity, to_entity, created_at, updated_at) " + "VALUES (?, 'EXTRACTED_FROM', ?, ?, ?, ?)", + (f"rel_{now_ms()}_{i}_extracted", target_id, node_id, ts, ts), + ) + relations_added += 1 + + if i % 20 == 0 and n > 0: + progress(0.55 + 0.40 * (i / n), "writing") + conn.commit() + finally: + conn.close() + + progress(1.0, "done") + print(json.dumps({ + "iocs_found": len(unique), + "by_type": new_by_type, + "entities_added": entities_added, + "relations_added": relations_added, + })) + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/enrichers/fetch_webpage/manifest.yaml b/enrichers/fetch_webpage/manifest.yaml new file mode 100644 index 0000000..d2a7535 --- /dev/null +++ b/enrichers/fetch_webpage/manifest.yaml @@ -0,0 +1,8 @@ +id: fetch_webpage +name: "Fetch web page" +description: "Descarga HTML de una URL, extrae markdown limpio (readabilipy) y guarda los blobs en cache. Crea/actualiza el nodo Webpage con title/status_code/paths y crea el Domain con relacion BELONGS_TO." +applies_to: [Url, Webpage] +emits: [Domain] +relations: [BELONGS_TO] +params: + - { name: timeout_s, type: int, default: 15 } diff --git a/enrichers/fetch_webpage/run.py b/enrichers/fetch_webpage/run.py new file mode 100755 index 0000000..b23d7d3 --- /dev/null +++ b/enrichers/fetch_webpage/run.py @@ -0,0 +1,338 @@ +#!/usr/bin/env python3 +"""Enricher fetch_webpage — issue 0028. + +Lee JSON de stdin, descarga la URL del nodo, convierte HTML a markdown, +guarda blobs en `//.{html,md}`, actualiza el +nodo a tipo Webpage con metadata enriquecida y crea/conecta el Domain. + +Wire protocol (issue 0026): + - stdin: JSON con node_id, metadata, ops_db_path, app_dir, cache_dir, + registry_root, params. + - stderr: lineas `PROGRESS: ` para feedback de UI. + - stdout: una linea JSON al final con resumen `{entities_added, ...}`. + - exit code 0 = ok, !=0 = error (stderr capturado se muestra en panel). +""" +from __future__ import annotations + +import hashlib +import json +import os +import re +import sqlite3 +import sys +import time +from datetime import datetime, timezone +from pathlib import Path +from urllib.parse import urlparse + + +def progress(p: float, stage: str = "") -> None: + """Emite linea PROGRESS al stderr para que C++ actualice la UI.""" + sys.stderr.write(f"PROGRESS:{p:.2f} {stage}\n") + sys.stderr.flush() + + +def log(msg: str) -> None: + sys.stderr.write(f"{msg}\n") + sys.stderr.flush() + + +def load_registry_funcs(registry_root: str): + """Anade el registry al sys.path e importa funciones que usamos.""" + py_funcs = os.path.join(registry_root, "python", "functions") + if py_funcs not in sys.path: + sys.path.insert(0, py_funcs) + from cybersecurity.cybersecurity import normalize_url # type: ignore + from core.html_to_markdown import html_to_markdown # type: ignore + return normalize_url, html_to_markdown + + +def now_iso() -> str: + return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + + +def now_ms() -> int: + return int(time.time() * 1000) + + +def fetch_with_requests(url: str, timeout: int): + """Descarga la URL y retorna (status_code, content_type, html, encoding). + + Usa `requests` si esta disponible, fallback a urllib. + """ + try: + import requests # type: ignore + headers = { + "User-Agent": ( + "Mozilla/5.0 (graph_explorer/0.1; " + "https://gitea-dgg044oo04woo4ggcsws4gk0.organic-machine.com/dataforge/graph_explorer) " + "Chrome/120 Safari/537.36" + ), + "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", + "Accept-Language": "en-US,en;q=0.5", + } + r = requests.get(url, timeout=timeout, headers=headers, allow_redirects=True) + ct = r.headers.get("Content-Type", "text/html") + # `requests` decodifica por encoding HTTP/charset; si falla cae a apparent. + return r.status_code, ct, r.text, r.encoding or "utf-8" + except ImportError: + from urllib.request import Request, urlopen + req = Request(url, headers={"User-Agent": "graph_explorer/0.1"}) + with urlopen(req, timeout=timeout) as resp: # type: ignore + data = resp.read() + ct = resp.headers.get("Content-Type", "text/html") + enc = "utf-8" + m = re.search(r"charset=([\w-]+)", ct, re.I) + if m: + enc = m.group(1).lower() + return resp.status, ct, data.decode(enc, errors="replace"), enc + + +_TITLE_RE = re.compile(r"]*>(.*?)", re.I | re.S) + + +def extract_title(html: str) -> str: + m = _TITLE_RE.search(html) + if not m: + return "" + title = re.sub(r"\s+", " ", m.group(1)).strip() + if len(title) > 300: + title = title[:300] + "…" + return title + + +def domain_of(url: str) -> str: + try: + host = urlparse(url).hostname or "" + return host.lower() + except Exception: + return "" + + +def cache_paths(cache_dir: str, key: str) -> tuple[Path, Path]: + """Devuelve (html_path, md_path) y crea el dir intermedio.""" + sub = key[:2] + base = Path(cache_dir) / sub + base.mkdir(parents=True, exist_ok=True) + return base / f"{key}.html", base / f"{key}.md" + + +def merge_metadata_json(existing: str, patch: dict) -> str: + """Fusiona patch sobre el JSON existente (string) y devuelve nuevo string.""" + try: + cur = json.loads(existing) if existing else {} + if not isinstance(cur, dict): + cur = {} + except Exception: + cur = {} + cur.update(patch) + return json.dumps(cur, ensure_ascii=False) + + +def upsert_domain(conn: sqlite3.Connection, name: str) -> str: + """Crea o reusa entidad Domain por nombre. Retorna su id.""" + cur = conn.execute( + "SELECT id FROM entities WHERE type_ref='Domain' AND name=? LIMIT 1", + (name,), + ) + row = cur.fetchone() + if row: + return row[0] + new_id = f"Domain_{now_ms()}" + ts = now_iso() + conn.execute( + "INSERT INTO entities (id, name, type_ref, source, created_at, updated_at) " + "VALUES (?, ?, 'Domain', 'enricher:fetch_webpage', ?, ?)", + (new_id, name, ts, ts), + ) + return new_id + + +def relation_exists(conn: sqlite3.Connection, from_id: str, to_id: str, name: str) -> bool: + cur = conn.execute( + "SELECT 1 FROM relations WHERE from_entity=? AND to_entity=? AND name=? LIMIT 1", + (from_id, to_id, name), + ) + return cur.fetchone() is not None + + +def insert_relation(conn: sqlite3.Connection, from_id: str, to_id: str, name: str) -> bool: + if relation_exists(conn, from_id, to_id, name): + return False + ts = now_iso() + rel_id = f"rel_{now_ms()}_{name.lower()}" + conn.execute( + "INSERT INTO relations (id, name, from_entity, to_entity, created_at, updated_at) " + "VALUES (?, ?, ?, ?, ?, ?)", + (rel_id, name, from_id, to_id, ts, ts), + ) + return True + + +def main() -> int: + raw = sys.stdin.read() + try: + ctx = json.loads(raw) + except Exception as e: + log(f"stdin not valid JSON: {e}") + return 2 + + node_id = ctx.get("node_id") or "" + node_type = ctx.get("node_type") or "" + metadata = ctx.get("metadata") or {} + if isinstance(metadata, str): + try: + metadata = json.loads(metadata) + except Exception: + metadata = {} + ops_db_path = ctx.get("ops_db_path") or "" + cache_dir = ctx.get("cache_dir") or "" + registry_root = ctx.get("registry_root") or "" + params = ctx.get("params") or {} + timeout_s = int(params.get("timeout_s", 15)) + + if not node_id or not ops_db_path: + log("missing node_id / ops_db_path") + return 2 + + # URL puede estar en `url` (Url/Webpage) o `address` (Url legacy). + raw_url = (metadata.get("url") or metadata.get("address") or "").strip() + if not raw_url: + # Fallback: si el nodo no tiene url en metadata, mira el name. + raw_url = (ctx.get("node_name") or "").strip() + if not raw_url: + log("nodo sin url en metadata ni name") + return 2 + + progress(0.05, "normalize") + try: + normalize_url, html_to_markdown = load_registry_funcs(registry_root) + except Exception as e: + log(f"registry imports failed: {e}") + return 3 + + try: + url = normalize_url(raw_url) + except Exception: + url = raw_url + if not url.startswith(("http://", "https://")): + url = "https://" + url + + progress(0.20, "fetching") + try: + status, content_type, html, _enc = fetch_with_requests(url, timeout=timeout_s) + except Exception as e: + log(f"fetch failed: {e}") + # Marcamos node con status=-1 para evidencia. + conn = sqlite3.connect(ops_db_path) + try: + cur = conn.execute("SELECT metadata FROM entities WHERE id=?", (node_id,)) + row = cur.fetchone() + existing_meta = row[0] if row and row[0] else "{}" + patch = {"url": url, "fetched_at": now_iso(), "status_code": -1} + new_meta = merge_metadata_json(existing_meta, patch) + conn.execute( + "UPDATE entities SET metadata=?, updated_at=? WHERE id=?", + (new_meta, now_iso(), node_id), + ) + conn.commit() + finally: + conn.close() + print(json.dumps({"error": str(e), "url": url, "entities_added": 0, + "relations_added": 0})) + return 4 + + progress(0.55, "parsing") + try: + markdown = html_to_markdown(html) + except Exception as e: + log(f"html_to_markdown failed (will save raw): {e}") + markdown = "" + + title = extract_title(html) + text_length = len(markdown) if markdown else len(html) + + progress(0.80, "writing") + key = hashlib.sha256(url.encode("utf-8")).hexdigest() + html_path, md_path = cache_paths(cache_dir, key) + try: + html_path.write_text(html, encoding="utf-8", errors="replace") + if markdown: + md_path.write_text(markdown, encoding="utf-8") + except Exception as e: + log(f"cache write failed: {e}") + return 5 + + # Paths en metadata se guardan relativos al app_dir para portabilidad. + rel_html = os.path.relpath(html_path, ctx.get("app_dir") or cache_dir) + rel_md = os.path.relpath(md_path, ctx.get("app_dir") or cache_dir) + + progress(0.92, "applying") + conn = sqlite3.connect(ops_db_path) + conn.execute("PRAGMA foreign_keys=OFF") + entities_added = 0 + relations_added = 0 + node_updated = False + try: + # 1. Update del nodo: convertir Url -> Webpage si aplica + parche meta. + cur = conn.execute( + "SELECT type_ref, metadata FROM entities WHERE id=?", (node_id,) + ) + row = cur.fetchone() + if not row: + log(f"node {node_id} disappeared") + return 6 + cur_type, cur_meta = row[0], row[1] or "{}" + new_type = "Webpage" if cur_type.lower() == "url" else cur_type or "Webpage" + + patch = { + "url": url, + "title": title, + "status_code": status, + "content_type": content_type, + "fetched_at": now_iso(), + "html_path": rel_html, + "markdown_path": rel_md if markdown else "", + "text_length": text_length, + } + new_meta = merge_metadata_json(cur_meta, patch) + conn.execute( + "UPDATE entities SET type_ref=?, metadata=?, updated_at=? WHERE id=?", + (new_type, new_meta, now_iso(), node_id), + ) + node_updated = True + + # 2. Crear/conectar Domain. + dname = domain_of(url) + if dname: + existed_before = conn.execute( + "SELECT 1 FROM entities WHERE type_ref='Domain' AND name=? LIMIT 1", + (dname,), + ).fetchone() is not None + domain_id = upsert_domain(conn, dname) + if not existed_before: + entities_added += 1 + if insert_relation(conn, node_id, domain_id, "BELONGS_TO"): + relations_added += 1 + + conn.commit() + finally: + conn.close() + + progress(1.0, "done") + print(json.dumps({ + "url": url, + "status_code": status, + "title": title, + "text_length": text_length, + "html_path": rel_html, + "markdown_path": rel_md if markdown else "", + "entities_added": entities_added, + "relations_added": relations_added, + "node_updated": node_updated, + }, ensure_ascii=False)) + return 0 + + +if __name__ == "__main__": + sys.exit(main())