feat(enrichers): cuatro enrichers web — fetch + extract trio (issues 0028, 0028b)

Cada enricher es un par manifest.yaml + run.py en enrichers/<id>/.

1. fetch_webpage (Url, Webpage):
   HTTP GET (requests, fallback urllib) -> html_to_markdown_py_core ->
   sha256(url) -> guarda HTML+MD en cache/<aa>/<sha>.{html,md}. Convierte
   Url -> Webpage con metadata enriquecida (title/status_code/content_type/
   paths/text_length). Crea Domain con relacion BELONGS_TO.

2. extract_domain (Url, Webpage, Email):
   Saca dominio de metadata.url o metadata.address (sin I/O). Crea/conecta
   Domain con BELONGS_TO. Util cuando el usuario quiere ver el dominio
   antes de fetch.

3. extract_links (Webpage):
   Lee metadata.markdown_path -> extract_urls_py_cybersecurity -> dedup ->
   crea nodo Url por enlace + relacion LINKS_TO. Param max_links (50).

4. extract_text_entities (Webpage):
   Lee metadata.markdown_path -> extract_iocs_py_cybersecurity (regex puro,
   sin coste) -> crea entidades por (type, value) tipadas en el registro:
   Email, IPAddress, Domain, FileHash, CryptoWallet, CVE, MACAddress, Phone.
   Cada una con relacion EXTRACTED_FROM al Webpage origen. v1 sin GLiNER/
   GLiREL — esos requieren modelos pre-cargados (futura iteracion).

Probado end-to-end:
  fetch_webpage  https://httpbin.org/html -> 1 Webpage + 1 Domain
  extract_links  -> 2 Url + 2 LINKS_TO
  extract_text_entities -> 8 IoCs (Email, IP*2, CVE, Domain*2, Wallet, Phone)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-01 18:24:52 +02:00
parent 6df04652d8
commit 7ec6c4e09f
8 changed files with 821 additions and 0 deletions
+7
View File
@@ -0,0 +1,7 @@
id: extract_domain
name: "Extract domain"
description: "Saca el dominio de la url/email del nodo y crea/conecta una entidad Domain con relacion BELONGS_TO. No descarga nada."
applies_to: [Url, Webpage, Email]
emits: [Domain]
relations: [BELONGS_TO]
params: []
+125
View File
@@ -0,0 +1,125 @@
#!/usr/bin/env python3
"""Enricher extract_domain — issue 0028b.
Saca el dominio de un nodo Url/Webpage (campo metadata.url) o Email (campo
metadata.address) y crea/conecta una entidad Domain con relacion BELONGS_TO.
No hace I/O de red.
"""
from __future__ import annotations
import json
import sqlite3
import sys
import time
from datetime import datetime, timezone
from urllib.parse import urlparse
def progress(p: float, stage: str = "") -> None:
sys.stderr.write(f"PROGRESS:{p:.2f} {stage}\n")
sys.stderr.flush()
def now_iso() -> str:
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
def now_ms() -> int:
return int(time.time() * 1000)
def domain_from_url(url: str) -> str:
if not url:
return ""
if "://" not in url:
url = "https://" + url
try:
return (urlparse(url).hostname or "").lower()
except Exception:
return ""
def domain_from_email(addr: str) -> str:
if "@" not in addr:
return ""
return addr.split("@", 1)[1].strip().lower()
def main() -> int:
ctx = json.loads(sys.stdin.read())
node_id = ctx.get("node_id") or ""
node_type = (ctx.get("node_type") or "").lower()
metadata = ctx.get("metadata") or {}
if isinstance(metadata, str):
try:
metadata = json.loads(metadata)
except Exception:
metadata = {}
ops_db = ctx.get("ops_db_path") or ""
if not node_id or not ops_db:
sys.stderr.write("missing node_id / ops_db_path\n")
return 2
progress(0.30, "extracting")
dname = ""
if node_type == "email":
addr = metadata.get("address") or ctx.get("node_name") or ""
dname = domain_from_email(addr)
else:
url = metadata.get("url") or ctx.get("node_name") or ""
dname = domain_from_url(url)
if not dname:
print(json.dumps({"warning": "no domain extractable",
"entities_added": 0, "relations_added": 0}))
return 0
progress(0.70, "writing")
conn = sqlite3.connect(ops_db)
entities_added = 0
relations_added = 0
try:
existed = conn.execute(
"SELECT id FROM entities WHERE type_ref='Domain' AND name=? LIMIT 1",
(dname,),
).fetchone()
if existed:
domain_id = existed[0]
else:
domain_id = f"Domain_{now_ms()}"
ts = now_iso()
conn.execute(
"INSERT INTO entities (id, name, type_ref, source, created_at, updated_at) "
"VALUES (?, ?, 'Domain', 'enricher:extract_domain', ?, ?)",
(domain_id, dname, ts, ts),
)
entities_added = 1
rel_exists = conn.execute(
"SELECT 1 FROM relations WHERE from_entity=? AND to_entity=? AND name='BELONGS_TO' LIMIT 1",
(node_id, domain_id),
).fetchone()
if not rel_exists:
ts = now_iso()
conn.execute(
"INSERT INTO relations (id, name, from_entity, to_entity, created_at, updated_at) "
"VALUES (?, 'BELONGS_TO', ?, ?, ?, ?)",
(f"rel_{now_ms()}_belongs_to", node_id, domain_id, ts, ts),
)
relations_added = 1
conn.commit()
finally:
conn.close()
progress(1.0, "done")
print(json.dumps({
"domain": dname,
"entities_added": entities_added,
"relations_added": relations_added,
}))
return 0
if __name__ == "__main__":
sys.exit(main())
+8
View File
@@ -0,0 +1,8 @@
id: extract_links
name: "Extract links"
description: "Lee la markdown cacheada de un Webpage (metadata.markdown_path) y crea nodos Url para cada enlace encontrado, conectados con relacion LINKS_TO. Requiere haber ejecutado fetch_webpage antes."
applies_to: [Webpage]
emits: [Url]
relations: [LINKS_TO]
params:
- { name: max_links, type: int, default: 50 }
+139
View File
@@ -0,0 +1,139 @@
#!/usr/bin/env python3
"""Enricher extract_links — issue 0028b.
Lee la markdown cacheada de un Webpage (metadata.markdown_path), saca todas
las URLs unicas con `extract_urls_py_cybersecurity`, y crea/conecta un nodo
Url por cada URL nueva con relacion LINKS_TO desde el Webpage origen.
"""
from __future__ import annotations
import json
import os
import sqlite3
import sys
import time
from datetime import datetime, timezone
def progress(p: float, stage: str = "") -> None:
sys.stderr.write(f"PROGRESS:{p:.2f} {stage}\n")
sys.stderr.flush()
def log(msg: str) -> None:
sys.stderr.write(f"{msg}\n")
sys.stderr.flush()
def now_iso() -> str:
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
def now_ms() -> int:
return int(time.time() * 1000)
def main() -> int:
ctx = json.loads(sys.stdin.read())
node_id = ctx.get("node_id") or ""
metadata = ctx.get("metadata") or {}
if isinstance(metadata, str):
try: metadata = json.loads(metadata)
except Exception: metadata = {}
ops_db = ctx.get("ops_db_path") or ""
app_dir = ctx.get("app_dir") or ""
registry_root = ctx.get("registry_root") or ""
params = ctx.get("params") or {}
max_links = int(params.get("max_links", 50))
if not node_id or not ops_db:
log("missing node_id / ops_db_path")
return 2
md_path = metadata.get("markdown_path") or ""
if not md_path:
log("nodo sin markdown_path — corre fetch_webpage primero")
print(json.dumps({"error": "missing markdown_path. Run fetch_webpage first.",
"entities_added": 0, "relations_added": 0}))
return 3
# Path relativo a app_dir.
abs_md = md_path if os.path.isabs(md_path) else os.path.join(app_dir, md_path)
if not os.path.exists(abs_md):
log(f"markdown not found at {abs_md}")
print(json.dumps({"error": f"markdown not found: {abs_md}",
"entities_added": 0, "relations_added": 0}))
return 4
progress(0.20, "reading")
text = open(abs_md, "r", encoding="utf-8", errors="replace").read()
progress(0.45, "extracting")
py_funcs = os.path.join(registry_root, "python", "functions")
if py_funcs not in sys.path:
sys.path.insert(0, py_funcs)
from cybersecurity.cybersecurity import extract_urls # type: ignore
urls = extract_urls(text)
# Dedup conservando orden.
seen = set()
unique = []
for u in urls:
if u not in seen:
seen.add(u)
unique.append(u)
if max_links > 0:
unique = unique[:max_links]
progress(0.65, "writing")
conn = sqlite3.connect(ops_db)
entities_added = 0
relations_added = 0
try:
for i, u in enumerate(unique):
existed = conn.execute(
"SELECT id FROM entities WHERE type_ref='Url' AND name=? LIMIT 1",
(u,),
).fetchone()
if existed:
target_id = existed[0]
else:
target_id = f"Url_{now_ms()}_{i}"
ts = now_iso()
meta_json = json.dumps({"url": u})
conn.execute(
"INSERT INTO entities (id, name, type_ref, source, metadata, created_at, updated_at) "
"VALUES (?, ?, 'Url', 'enricher:extract_links', ?, ?, ?)",
(target_id, u, meta_json, ts, ts),
)
entities_added += 1
rel_exists = conn.execute(
"SELECT 1 FROM relations WHERE from_entity=? AND to_entity=? AND name='LINKS_TO' LIMIT 1",
(node_id, target_id),
).fetchone()
if not rel_exists:
ts = now_iso()
conn.execute(
"INSERT INTO relations (id, name, from_entity, to_entity, created_at, updated_at) "
"VALUES (?, 'LINKS_TO', ?, ?, ?, ?)",
(f"rel_{now_ms()}_{i}_links_to", node_id, target_id, ts, ts),
)
relations_added += 1
if i % 10 == 0:
progress(0.65 + 0.30 * (i / max(1, len(unique))), "writing")
conn.commit()
finally:
conn.close()
progress(1.0, "done")
print(json.dumps({
"links_found": len(unique),
"entities_added": entities_added,
"relations_added": relations_added,
}))
return 0
if __name__ == "__main__":
sys.exit(main())
@@ -0,0 +1,9 @@
id: extract_text_entities
name: "Extract entities from text"
description: "Lee la markdown cacheada de un Webpage y extrae IoCs (IPs, emails, dominios, hashes, crypto wallets, CVEs, MAC, telefonos) creando entidades + relacion EXTRACTED_FROM. Sin coste — solo regex. Modelos ML (GLiNER/GLiREL) en futura iteracion."
applies_to: [Webpage]
emits: [Email, IPAddress, Domain, FileHash, CryptoWallet, CVE, MACAddress, Phone]
relations: [EXTRACTED_FROM]
params:
- { name: types, type: string, default: "" }
- { name: max_entities, type: int, default: 200 }
+187
View File
@@ -0,0 +1,187 @@
#!/usr/bin/env python3
"""Enricher extract_text_entities — issue 0028b.
Lee la markdown cacheada de un Webpage (metadata.markdown_path) y corre el
pipeline puro `extract_iocs` (regex puro, sin coste, sin modelos ML).
Para cada IoC encontrado:
- Crea o reusa la entidad por (type, name).
- Crea relacion EXTRACTED_FROM desde la entidad nueva al Webpage origen.
Tipos soportados (mapeo IoC -> type_ref del registry):
email -> Email
ip_address -> IPAddress
domain -> Domain
file_hash -> FileHash
crypto_wallet -> CryptoWallet
cve_id -> CVE
mac_address -> MACAddress
phone_number -> Phone
Futura iteracion: añadir GLiNER/GLiREL para Person/Org/Location etc.
"""
from __future__ import annotations
import json
import os
import sqlite3
import sys
import time
from datetime import datetime, timezone
_TYPE_MAP = {
"email": ("Email", "address"),
"ip_address": ("IPAddress", "address"),
"domain": ("Domain", "name"),
"file_hash": ("FileHash", "value"),
"crypto_wallet": ("CryptoWallet", "address"),
"cve_id": ("CVE", "id"),
"mac_address": ("MACAddress", "address"),
"phone_number": ("Phone", "number"),
}
def progress(p: float, stage: str = "") -> None:
sys.stderr.write(f"PROGRESS:{p:.2f} {stage}\n")
sys.stderr.flush()
def log(msg: str) -> None:
sys.stderr.write(f"{msg}\n")
sys.stderr.flush()
def now_iso() -> str:
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
def now_ms() -> int:
return int(time.time() * 1000)
def main() -> int:
ctx = json.loads(sys.stdin.read())
node_id = ctx.get("node_id") or ""
metadata = ctx.get("metadata") or {}
if isinstance(metadata, str):
try: metadata = json.loads(metadata)
except Exception: metadata = {}
ops_db = ctx.get("ops_db_path") or ""
app_dir = ctx.get("app_dir") or ""
registry_root = ctx.get("registry_root") or ""
params = ctx.get("params") or {}
types_csv = (params.get("types") or "").strip()
types_list = [t.strip() for t in types_csv.split(",") if t.strip()] if types_csv else None
max_entities = int(params.get("max_entities", 200))
if not node_id or not ops_db:
log("missing node_id / ops_db_path")
return 2
md_path = metadata.get("markdown_path") or ""
if not md_path:
log("nodo sin markdown_path — corre fetch_webpage primero")
print(json.dumps({"error": "missing markdown_path. Run fetch_webpage first.",
"entities_added": 0, "relations_added": 0}))
return 3
abs_md = md_path if os.path.isabs(md_path) else os.path.join(app_dir, md_path)
if not os.path.exists(abs_md):
log(f"markdown not found at {abs_md}")
print(json.dumps({"error": f"markdown not found: {abs_md}",
"entities_added": 0, "relations_added": 0}))
return 4
progress(0.10, "reading")
text = open(abs_md, "r", encoding="utf-8", errors="replace").read()
progress(0.30, "extracting iocs")
py_funcs = os.path.join(registry_root, "python", "functions")
if py_funcs not in sys.path:
sys.path.insert(0, py_funcs)
from cybersecurity.extract_iocs import extract_iocs # type: ignore
iocs = extract_iocs(text, types_list)
# Dedup por (type, value).
seen = set()
unique = []
for it in iocs:
t = it.get("type")
v = it.get("value") or it.get("address") or it.get("name") or ""
if not t or not v:
continue
key = (t, v)
if key in seen:
continue
seen.add(key)
unique.append(it)
if len(unique) >= max_entities:
break
progress(0.55, "writing")
conn = sqlite3.connect(ops_db)
entities_added = 0
relations_added = 0
new_by_type: dict[str, int] = {}
try:
n = len(unique)
for i, it in enumerate(unique):
ioc_type = it.get("type")
value = it.get("value") or it.get("address") or it.get("name") or ""
if not value:
continue
type_ref, value_field = _TYPE_MAP.get(ioc_type, (ioc_type or "Text", "value"))
existed = conn.execute(
"SELECT id FROM entities WHERE type_ref=? AND name=? LIMIT 1",
(type_ref, value),
).fetchone()
if existed:
target_id = existed[0]
else:
target_id = f"{type_ref}_{now_ms()}_{i}"
ts = now_iso()
meta = {value_field: value}
if "start" in it: meta["text_offset"] = it["start"]
conn.execute(
"INSERT INTO entities (id, name, type_ref, source, metadata, created_at, updated_at) "
"VALUES (?, ?, ?, 'enricher:extract_text_entities', ?, ?, ?)",
(target_id, value, type_ref, json.dumps(meta), ts, ts),
)
entities_added += 1
new_by_type[type_ref] = new_by_type.get(type_ref, 0) + 1
rel_exists = conn.execute(
"SELECT 1 FROM relations WHERE from_entity=? AND to_entity=? AND name='EXTRACTED_FROM' LIMIT 1",
(target_id, node_id),
).fetchone()
if not rel_exists:
ts = now_iso()
conn.execute(
"INSERT INTO relations (id, name, from_entity, to_entity, created_at, updated_at) "
"VALUES (?, 'EXTRACTED_FROM', ?, ?, ?, ?)",
(f"rel_{now_ms()}_{i}_extracted", target_id, node_id, ts, ts),
)
relations_added += 1
if i % 20 == 0 and n > 0:
progress(0.55 + 0.40 * (i / n), "writing")
conn.commit()
finally:
conn.close()
progress(1.0, "done")
print(json.dumps({
"iocs_found": len(unique),
"by_type": new_by_type,
"entities_added": entities_added,
"relations_added": relations_added,
}))
return 0
if __name__ == "__main__":
sys.exit(main())
+8
View File
@@ -0,0 +1,8 @@
id: fetch_webpage
name: "Fetch web page"
description: "Descarga HTML de una URL, extrae markdown limpio (readabilipy) y guarda los blobs en cache. Crea/actualiza el nodo Webpage con title/status_code/paths y crea el Domain con relacion BELONGS_TO."
applies_to: [Url, Webpage]
emits: [Domain]
relations: [BELONGS_TO]
params:
- { name: timeout_s, type: int, default: 15 }
+338
View File
@@ -0,0 +1,338 @@
#!/usr/bin/env python3
"""Enricher fetch_webpage — issue 0028.
Lee JSON de stdin, descarga la URL del nodo, convierte HTML a markdown,
guarda blobs en `<cache_dir>/<sha256[0:2]>/<sha256>.{html,md}`, actualiza el
nodo a tipo Webpage con metadata enriquecida y crea/conecta el Domain.
Wire protocol (issue 0026):
- stdin: JSON con node_id, metadata, ops_db_path, app_dir, cache_dir,
registry_root, params.
- stderr: lineas `PROGRESS:<float> <stage>` para feedback de UI.
- stdout: una linea JSON al final con resumen `{entities_added, ...}`.
- exit code 0 = ok, !=0 = error (stderr capturado se muestra en panel).
"""
from __future__ import annotations
import hashlib
import json
import os
import re
import sqlite3
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from urllib.parse import urlparse
def progress(p: float, stage: str = "") -> None:
"""Emite linea PROGRESS al stderr para que C++ actualice la UI."""
sys.stderr.write(f"PROGRESS:{p:.2f} {stage}\n")
sys.stderr.flush()
def log(msg: str) -> None:
sys.stderr.write(f"{msg}\n")
sys.stderr.flush()
def load_registry_funcs(registry_root: str):
"""Anade el registry al sys.path e importa funciones que usamos."""
py_funcs = os.path.join(registry_root, "python", "functions")
if py_funcs not in sys.path:
sys.path.insert(0, py_funcs)
from cybersecurity.cybersecurity import normalize_url # type: ignore
from core.html_to_markdown import html_to_markdown # type: ignore
return normalize_url, html_to_markdown
def now_iso() -> str:
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
def now_ms() -> int:
return int(time.time() * 1000)
def fetch_with_requests(url: str, timeout: int):
"""Descarga la URL y retorna (status_code, content_type, html, encoding).
Usa `requests` si esta disponible, fallback a urllib.
"""
try:
import requests # type: ignore
headers = {
"User-Agent": (
"Mozilla/5.0 (graph_explorer/0.1; "
"https://gitea-dgg044oo04woo4ggcsws4gk0.organic-machine.com/dataforge/graph_explorer) "
"Chrome/120 Safari/537.36"
),
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.5",
}
r = requests.get(url, timeout=timeout, headers=headers, allow_redirects=True)
ct = r.headers.get("Content-Type", "text/html")
# `requests` decodifica por encoding HTTP/charset; si falla cae a apparent.
return r.status_code, ct, r.text, r.encoding or "utf-8"
except ImportError:
from urllib.request import Request, urlopen
req = Request(url, headers={"User-Agent": "graph_explorer/0.1"})
with urlopen(req, timeout=timeout) as resp: # type: ignore
data = resp.read()
ct = resp.headers.get("Content-Type", "text/html")
enc = "utf-8"
m = re.search(r"charset=([\w-]+)", ct, re.I)
if m:
enc = m.group(1).lower()
return resp.status, ct, data.decode(enc, errors="replace"), enc
_TITLE_RE = re.compile(r"<title[^>]*>(.*?)</title>", re.I | re.S)
def extract_title(html: str) -> str:
m = _TITLE_RE.search(html)
if not m:
return ""
title = re.sub(r"\s+", " ", m.group(1)).strip()
if len(title) > 300:
title = title[:300] + ""
return title
def domain_of(url: str) -> str:
try:
host = urlparse(url).hostname or ""
return host.lower()
except Exception:
return ""
def cache_paths(cache_dir: str, key: str) -> tuple[Path, Path]:
"""Devuelve (html_path, md_path) y crea el dir intermedio."""
sub = key[:2]
base = Path(cache_dir) / sub
base.mkdir(parents=True, exist_ok=True)
return base / f"{key}.html", base / f"{key}.md"
def merge_metadata_json(existing: str, patch: dict) -> str:
"""Fusiona patch sobre el JSON existente (string) y devuelve nuevo string."""
try:
cur = json.loads(existing) if existing else {}
if not isinstance(cur, dict):
cur = {}
except Exception:
cur = {}
cur.update(patch)
return json.dumps(cur, ensure_ascii=False)
def upsert_domain(conn: sqlite3.Connection, name: str) -> str:
"""Crea o reusa entidad Domain por nombre. Retorna su id."""
cur = conn.execute(
"SELECT id FROM entities WHERE type_ref='Domain' AND name=? LIMIT 1",
(name,),
)
row = cur.fetchone()
if row:
return row[0]
new_id = f"Domain_{now_ms()}"
ts = now_iso()
conn.execute(
"INSERT INTO entities (id, name, type_ref, source, created_at, updated_at) "
"VALUES (?, ?, 'Domain', 'enricher:fetch_webpage', ?, ?)",
(new_id, name, ts, ts),
)
return new_id
def relation_exists(conn: sqlite3.Connection, from_id: str, to_id: str, name: str) -> bool:
cur = conn.execute(
"SELECT 1 FROM relations WHERE from_entity=? AND to_entity=? AND name=? LIMIT 1",
(from_id, to_id, name),
)
return cur.fetchone() is not None
def insert_relation(conn: sqlite3.Connection, from_id: str, to_id: str, name: str) -> bool:
if relation_exists(conn, from_id, to_id, name):
return False
ts = now_iso()
rel_id = f"rel_{now_ms()}_{name.lower()}"
conn.execute(
"INSERT INTO relations (id, name, from_entity, to_entity, created_at, updated_at) "
"VALUES (?, ?, ?, ?, ?, ?)",
(rel_id, name, from_id, to_id, ts, ts),
)
return True
def main() -> int:
raw = sys.stdin.read()
try:
ctx = json.loads(raw)
except Exception as e:
log(f"stdin not valid JSON: {e}")
return 2
node_id = ctx.get("node_id") or ""
node_type = ctx.get("node_type") or ""
metadata = ctx.get("metadata") or {}
if isinstance(metadata, str):
try:
metadata = json.loads(metadata)
except Exception:
metadata = {}
ops_db_path = ctx.get("ops_db_path") or ""
cache_dir = ctx.get("cache_dir") or ""
registry_root = ctx.get("registry_root") or ""
params = ctx.get("params") or {}
timeout_s = int(params.get("timeout_s", 15))
if not node_id or not ops_db_path:
log("missing node_id / ops_db_path")
return 2
# URL puede estar en `url` (Url/Webpage) o `address` (Url legacy).
raw_url = (metadata.get("url") or metadata.get("address") or "").strip()
if not raw_url:
# Fallback: si el nodo no tiene url en metadata, mira el name.
raw_url = (ctx.get("node_name") or "").strip()
if not raw_url:
log("nodo sin url en metadata ni name")
return 2
progress(0.05, "normalize")
try:
normalize_url, html_to_markdown = load_registry_funcs(registry_root)
except Exception as e:
log(f"registry imports failed: {e}")
return 3
try:
url = normalize_url(raw_url)
except Exception:
url = raw_url
if not url.startswith(("http://", "https://")):
url = "https://" + url
progress(0.20, "fetching")
try:
status, content_type, html, _enc = fetch_with_requests(url, timeout=timeout_s)
except Exception as e:
log(f"fetch failed: {e}")
# Marcamos node con status=-1 para evidencia.
conn = sqlite3.connect(ops_db_path)
try:
cur = conn.execute("SELECT metadata FROM entities WHERE id=?", (node_id,))
row = cur.fetchone()
existing_meta = row[0] if row and row[0] else "{}"
patch = {"url": url, "fetched_at": now_iso(), "status_code": -1}
new_meta = merge_metadata_json(existing_meta, patch)
conn.execute(
"UPDATE entities SET metadata=?, updated_at=? WHERE id=?",
(new_meta, now_iso(), node_id),
)
conn.commit()
finally:
conn.close()
print(json.dumps({"error": str(e), "url": url, "entities_added": 0,
"relations_added": 0}))
return 4
progress(0.55, "parsing")
try:
markdown = html_to_markdown(html)
except Exception as e:
log(f"html_to_markdown failed (will save raw): {e}")
markdown = ""
title = extract_title(html)
text_length = len(markdown) if markdown else len(html)
progress(0.80, "writing")
key = hashlib.sha256(url.encode("utf-8")).hexdigest()
html_path, md_path = cache_paths(cache_dir, key)
try:
html_path.write_text(html, encoding="utf-8", errors="replace")
if markdown:
md_path.write_text(markdown, encoding="utf-8")
except Exception as e:
log(f"cache write failed: {e}")
return 5
# Paths en metadata se guardan relativos al app_dir para portabilidad.
rel_html = os.path.relpath(html_path, ctx.get("app_dir") or cache_dir)
rel_md = os.path.relpath(md_path, ctx.get("app_dir") or cache_dir)
progress(0.92, "applying")
conn = sqlite3.connect(ops_db_path)
conn.execute("PRAGMA foreign_keys=OFF")
entities_added = 0
relations_added = 0
node_updated = False
try:
# 1. Update del nodo: convertir Url -> Webpage si aplica + parche meta.
cur = conn.execute(
"SELECT type_ref, metadata FROM entities WHERE id=?", (node_id,)
)
row = cur.fetchone()
if not row:
log(f"node {node_id} disappeared")
return 6
cur_type, cur_meta = row[0], row[1] or "{}"
new_type = "Webpage" if cur_type.lower() == "url" else cur_type or "Webpage"
patch = {
"url": url,
"title": title,
"status_code": status,
"content_type": content_type,
"fetched_at": now_iso(),
"html_path": rel_html,
"markdown_path": rel_md if markdown else "",
"text_length": text_length,
}
new_meta = merge_metadata_json(cur_meta, patch)
conn.execute(
"UPDATE entities SET type_ref=?, metadata=?, updated_at=? WHERE id=?",
(new_type, new_meta, now_iso(), node_id),
)
node_updated = True
# 2. Crear/conectar Domain.
dname = domain_of(url)
if dname:
existed_before = conn.execute(
"SELECT 1 FROM entities WHERE type_ref='Domain' AND name=? LIMIT 1",
(dname,),
).fetchone() is not None
domain_id = upsert_domain(conn, dname)
if not existed_before:
entities_added += 1
if insert_relation(conn, node_id, domain_id, "BELONGS_TO"):
relations_added += 1
conn.commit()
finally:
conn.close()
progress(1.0, "done")
print(json.dumps({
"url": url,
"status_code": status,
"title": title,
"text_length": text_length,
"html_path": rel_html,
"markdown_path": rel_md if markdown else "",
"entities_added": entities_added,
"relations_added": relations_added,
"node_updated": node_updated,
}, ensure_ascii=False))
return 0
if __name__ == "__main__":
sys.exit(main())