diff --git a/enrichers/web_search/run.py b/enrichers/web_search/run.py
index 3f4cbca..45fef87 100755
--- a/enrichers/web_search/run.py
+++ b/enrichers/web_search/run.py
@@ -32,11 +32,24 @@ import re
 import sqlite3
 import sys
 import time
+import uuid
 from datetime import datetime, timezone
 from html.parser import HTMLParser
 from urllib.parse import parse_qs, unquote, urlparse
 
 
+# Issue 0035c — automatic grouping of results.
+#
+# When an enricher yields >= DEFAULT_GROUP_THRESHOLD results, the first
+# GROUP_PREVIEW_K stay loose, hanging off the source (Twitter/Reddit
+# timeline style), and the remaining N-K go into a square Group node.
+# The manifest may declare `auto_group_threshold` to override the
+# default; later on a settings UI will allow a global override.
+# For now it is hardcoded.
+DEFAULT_GROUP_THRESHOLD = 50
+GROUP_PREVIEW_K = 10
+
+
 def progress(p: float, stage: str = "") -> None:
     sys.stderr.write(f"PROGRESS:{p:.2f} {stage}\n")
     sys.stderr.flush()
@@ -350,9 +363,34 @@ def find_url_entity(conn: sqlite3.Connection, url: str) -> str | None:
     return None
 
 
+def has_group_id_column(conn: sqlite3.Connection) -> bool:
+    """Detect whether the `group_id` column exists on `entities`.
+
+    The graph_explorer project adds it via a migration (issue 0035a),
+    but we may be running against an old DB. If it is missing, we insert
+    without that column (loose results, but with `batch_id` in metadata).
+    """
+    try:
+        cur = conn.execute("PRAGMA table_info(entities)")
+        for row in cur:
+            if row[1] == "group_id":
+                return True
+    except sqlite3.Error:
+        pass  # best effort: any SQLite error is reported as "column absent"
+    return False
+
+
 def insert_url_entity(conn: sqlite3.Connection, url: str, title: str,
-                      snippet: str, rank: int, query: str) -> str:
-    """Crea un nodo Url y devuelve su id. Si ya existe, lo reusa y refresca."""
+                      snippet: str, rank: int, query: str,
+                      batch_id: str = "",
+                      group_id: str | None = None,
+                      has_group_col: bool = False) -> str:
+    """Create a Url node and return its id; reuse and refresh it if it exists.
+
+    `batch_id` is injected into metadata when non-empty. `group_id` is
+    written to the column of the same name when the schema has it and a
+    value was passed; otherwise it stays NULL (loose node).
+    """
     existing = find_url_entity(conn, url)
     ts = now_iso()
     meta = {
@@ -364,8 +402,14 @@ def insert_url_entity(conn: sqlite3.Connection, url: str, title: str,
         "engine": "duckduckgo",
         "found_at": ts,
     }
+    if batch_id:
+        meta["batch_id"] = batch_id
     meta_json = json.dumps(meta, ensure_ascii=False)
     if existing:
+        # If the entity already existed, keep its current group_id (do
+        # not overwrite it): the same Url can show up in several searches
+        # and the first Group that captured it wins. We only update
+        # metadata + updated_at.
         conn.execute(
             "UPDATE entities SET metadata=?, updated_at=? WHERE id=?",
             (meta_json, ts, existing),
@@ -374,10 +418,43 @@ def insert_url_entity(conn: sqlite3.Connection, url: str, title: str,
 
     new_id = f"Url_{now_ms()}_{rank}_{abs(hash(url)) % 100000}"
     name = title[:200] if title else url[:200]
+    if has_group_col:
+        conn.execute(
+            "INSERT INTO entities (id, name, type_ref, source, metadata, "
+            " group_id, created_at, updated_at) "
+            "VALUES (?, ?, 'Url', 'enricher:web_search', ?, ?, ?, ?)",
+            (new_id, name, meta_json, group_id, ts, ts),
+        )
+    else:
+        conn.execute(
+            "INSERT INTO entities (id, name, type_ref, source, metadata, "
+            " created_at, updated_at) "
+            "VALUES (?, ?, 'Url', 'enricher:web_search', ?, ?, ?)",
+            (new_id, name, meta_json, ts, ts),
+        )
+    return new_id
+
+
+def insert_group_entity(conn: sqlite3.Connection, *, query: str,
+                        count: int, batch_id: str) -> str:
+    """Create a Group node for the remaining results of a search.
+
+    Return the id of the newly created Group.
+    """
+    ts = now_iso()
+    new_id = f"Group_{now_ms()}_{abs(hash(query + batch_id)) % 100000}"
+    name = f"web_search: {query} ({count})"
+    meta = {
+        "enricher": "web_search",
+        "query": query,
+        "count": count,
+        "batch_id": batch_id,
+    }
+    meta_json = json.dumps(meta, ensure_ascii=False)
     conn.execute(
         "INSERT INTO entities (id, name, type_ref, source, metadata, "
         " created_at, updated_at) "
-        "VALUES (?, ?, 'Url', 'enricher:web_search', ?, ?, ?)",
+        "VALUES (?, ?, 'Group', 'enricher:web_search', ?, ?, ?)",
         (new_id, name, meta_json, ts, ts),
     )
     return new_id
@@ -537,8 +614,31 @@ def main() -> int:
     conn.execute("PRAGMA foreign_keys=OFF")
     entities_added = 0
     relations_added = 0
+    group_id: str | None = None
+    batch_id = uuid.uuid4().hex
     try:
-        for r in results:
+        has_group_col = has_group_id_column(conn)
+        n_total = len(results)
+        # Threshold: hardcoded for now; reading it from the manifest
+        # will come in 0035e (settings UI / per-enricher overrides).
+        threshold = DEFAULT_GROUP_THRESHOLD
+
+        if n_total >= threshold and has_group_col:
+            # Twitter/Reddit mode: K loose results + a Group with N-K children.
+            group_id = insert_group_entity(
+                conn, query=query, count=n_total, batch_id=batch_id,
+            )
+            entities_added += 1
+            if insert_relation(conn, group_id, node_id, "SEARCH_RESULT_OF"):
+                relations_added += 1
+            preview = results[:GROUP_PREVIEW_K]
+            grouped = results[GROUP_PREVIEW_K:]
+        else:
+            # Classic behaviour: everything loose, no Group.
+            preview = results
+            grouped = []
+
+        for r in preview:
             existed = find_url_entity(conn, r["url"]) is not None
             url_id = insert_url_entity(
                 conn,
@@ -547,11 +647,34 @@ def main() -> int:
                 snippet=r["snippet"],
                 rank=r["rank"],
                 query=query,
+                batch_id=batch_id,
+                group_id=None,
+                has_group_col=has_group_col,
             )
             if not existed:
                 entities_added += 1
             if insert_relation(conn, url_id, node_id, "SEARCH_RESULT_OF"):
                 relations_added += 1
+
+        for r in grouped:
+            existed = find_url_entity(conn, r["url"]) is not None
+            url_id = insert_url_entity(
+                conn,
+                url=r["url"],
+                title=r["title"],
+                snippet=r["snippet"],
+                rank=r["rank"],
+                query=query,
+                batch_id=batch_id,
+                group_id=group_id,
+                has_group_col=has_group_col,
+            )
+            if not existed:
+                entities_added += 1
+            # Provenance is the relation to the original source, not to the
+            # group — the group is only a visual container.
+            if insert_relation(conn, url_id, node_id, "SEARCH_RESULT_OF"):
+                relations_added += 1
         conn.commit()
     finally:
         conn.close()
@@ -563,6 +686,9 @@ def main() -> int:
         "results": len(results),
         "entities_added": entities_added,
         "relations_added": relations_added,
+        "batch_id": batch_id,
+        "group_id": group_id or "",
+        "grouped": bool(group_id),
     }, ensure_ascii=False))
     return 0
 
diff --git a/tests/conftest.py b/tests/conftest.py
index 8d3ec5a..5effcf4 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -139,6 +139,7 @@ CREATE TABLE entities (
     source TEXT NOT NULL,
     metadata TEXT NOT NULL DEFAULT '{}',
     notes TEXT NOT NULL DEFAULT '',
+    group_id TEXT,
     created_at TEXT NOT NULL,
     updated_at TEXT NOT NULL
 );
@@ -229,11 +230,11 @@ def list_entities(ops_db: Path, type_ref: str | None = None) -> list[dict]:
     try:
         if type_ref:
             cur = conn.execute(
-                "SELECT id, name, type_ref, source, metadata "
+                "SELECT id, name, type_ref, source, metadata, group_id "
                 "FROM entities WHERE type_ref=? ORDER BY id", (type_ref,))
         else:
             cur = conn.execute(
-                "SELECT id, name, type_ref, source, metadata "
+                "SELECT id, name, type_ref, source, metadata, group_id "
                 "FROM entities ORDER BY id")
         rows = cur.fetchall()
     finally:
@@ -245,7 +246,7 @@ def list_entities(ops_db: Path, type_ref: str | None = None) -> list[dict]:
         except Exception:
             md = {}
         out.append({"id": r[0], "name": r[1], "type_ref": r[2],
-                    "source": r[3], "metadata": md})
+                    "source": r[3], "metadata": md, "group_id": r[5]})
     return out
 
 
diff --git a/tests/test_web_search.py b/tests/test_web_search.py
index 778098e..c0b9174 100644
--- a/tests/test_web_search.py
+++ b/tests/test_web_search.py
@@ -95,3 +95,144 @@ def test_web_search_no_query_fails_clean(ops_db, app_dir, registry_root,
     rc, out, err = run_enricher("web_search", ctx, env=env)
     assert rc == 2
     assert "sin query" in err
+
+
+# ---------------------------------------------------------------------------
+# 0035c — automatic grouping when the threshold is exceeded
+# ---------------------------------------------------------------------------
+
+def _build_lite_html(n: int) -> str:
+    """Build lite.duckduckgo.com-style HTML containing N results.
+
+    Minimal structure for `_DDGLiteParser`: per result, an anchor with
+    `class='result-link'` followed immediately by the snippet element.
+    URLs are unique so the parser's deduplication does not drop any.
+    NOTE(review): markup seems missing from the literals below — confirm.
+    """
+    rows = []
+    for i in range(n):
+        url = f"https://example.com/result-{i:03d}"  # NOTE(review): unused below
+        rows.append(
+            f""
+            f"Result {i:03d}"
+            f"snippet for result {i:03d}"
+        )
+    return (
+        ""
+        + "".join(rows)
+        + ""
+    )
+
+
+def test_web_search_below_threshold_no_group(ops_db, app_dir, registry_root,
+                                             tmp_path):
+    """5 results < threshold → no Group, classic behaviour."""
+    make_node(ops_db, node_id="t1", name="tomate",
+              type_ref="text", metadata={})
+    plan = {
+        "match": [
+            {"contains": "duckduckgo.com",
+             "text": _build_lite_html(5),
+             "status": 200},
+        ],
+    }
+    env = stub_requests(tmp_path, plan)
+    ctx = base_ctx(ops_db=ops_db, app_dir=app_dir, registry_root=registry_root,
+                   node_id="t1", node_name="tomate", node_type="text",
+                   params={"limit": 0})  # 0 = no truncation
+    rc, out, err = run_enricher("web_search", ctx, env=env)
+    assert rc == 0, err
+    assert out["results"] == 5
+    assert out["grouped"] is False
+    assert out["group_id"] == ""
+
+    # No Group created.
+    groups = list_entities(ops_db, type_ref="Group")
+    assert groups == []
+    # 5 loose Urls.
+    urls = list_entities(ops_db, type_ref="Url")
+    assert len(urls) == 5
+    assert all(u["group_id"] is None for u in urls)
+    # All of them carry the same batch_id in metadata.
+    batch_ids = {u["metadata"].get("batch_id") for u in urls}
+    assert len(batch_ids) == 1 and "" not in batch_ids
+
+
+def test_web_search_above_threshold_creates_group_and_preview(
+        ops_db, app_dir, registry_root, tmp_path):
+    """100 results → 1 Group + 10 loose + 90 with group_id."""
+    make_node(ops_db, node_id="t1", name="tomate",
+              type_ref="text", metadata={})
+    plan = {
+        "match": [
+            {"contains": "duckduckgo.com",
+             "text": _build_lite_html(100),
+             "status": 200},
+        ],
+    }
+    env = stub_requests(tmp_path, plan)
+    ctx = base_ctx(ops_db=ops_db, app_dir=app_dir, registry_root=registry_root,
+                   node_id="t1", node_name="tomate", node_type="text",
+                   params={"limit": 0})
+    rc, out, err = run_enricher("web_search", ctx, env=env)
+    assert rc == 0, err
+    assert out["results"] == 100
+    assert out["grouped"] is True
+    assert out["group_id"]
+
+    # Exactly 1 Group.
+    groups = list_entities(ops_db, type_ref="Group")
+    assert len(groups) == 1
+    g = groups[0]
+    assert g["metadata"]["count"] == 100
+    assert g["metadata"]["query"] == "tomate"
+    assert g["metadata"]["enricher"] == "web_search"
+    assert g["metadata"].get("batch_id")
+
+    # 10 Urls without group_id (preview) + 90 with group_id.
+    urls = list_entities(ops_db, type_ref="Url")
+    assert len(urls) == 100
+    sueltos = [u for u in urls if u["group_id"] is None]
+    children = [u for u in urls if u["group_id"] == g["id"]]
+    assert len(sueltos) == 10
+    assert len(children) == 90
+
+    # Every Url has a SEARCH_RESULT_OF relation to the original source.
+    rels = list_relations(ops_db, name="SEARCH_RESULT_OF")
+    # 100 Urls + 1 Group = 101 relations to the source.
+    from_to_t1 = [r for r in rels if r["to_entity"] == "t1"]
+    assert len(from_to_t1) == 101
+    # Group → t1.
+    assert any(r["from_entity"] == g["id"] for r in from_to_t1)
+
+
+def test_web_search_batch_id_shared_across_outputs(
+        ops_db, app_dir, registry_root, tmp_path):
+    """After a grouped run, group + preview + children share batch_id."""
+    make_node(ops_db, node_id="t1", name="tomate",
+              type_ref="text", metadata={})
+    plan = {
+        "match": [
+            {"contains": "duckduckgo.com",
+             "text": _build_lite_html(100),
+             "status": 200},
+        ],
+    }
+    env = stub_requests(tmp_path, plan)
+    ctx = base_ctx(ops_db=ops_db, app_dir=app_dir, registry_root=registry_root,
+                   node_id="t1", node_name="tomate", node_type="text",
+                   params={"limit": 0})
+    rc, out, err = run_enricher("web_search", ctx, env=env)
+    assert rc == 0, err
+
+    expected_batch = out["batch_id"]
+    assert expected_batch
+
+    groups = list_entities(ops_db, type_ref="Group")
+    urls = list_entities(ops_db, type_ref="Url")
+    all_nodes = groups + urls
+    assert len(all_nodes) == 101  # 1 Group + 100 Urls
+
+    batch_ids = {n["metadata"].get("batch_id") for n in all_nodes}
+    assert batch_ids == {expected_batch}, (
+        f"batch_ids inconsistentes: {batch_ids}")