feat(0035c): web_search crea Group cuando excede umbral

Cuando un enricher web_search produce >= 50 resultados, los primeros 10
quedan sueltos colgando del source (preview Twitter/Reddit) y los
restantes entran como hijos de un nuevo nodo Group cuadrado.

Cambios:
- enrichers/web_search/run.py:
  - DEFAULT_GROUP_THRESHOLD=50, GROUP_PREVIEW_K=10 (constantes globales).
  - has_group_id_column(): detecta si el schema soporta agrupacion.
  - insert_group_entity(): crea nodo Group con metadata
    {enricher, query, count, batch_id}.
  - insert_url_entity() acepta batch_id y group_id; los inyecta en
    metadata/columna respectivamente. Nodos existentes mantienen su
    group_id actual (no se machaca).
  - Generacion de batch_id (UUID4 hex) por ejecucion, compartido por
    todos los nodos creados (group + sueltos + agrupados).
  - Cada hijo del grupo conserva su relacion individual SEARCH_RESULT_OF
    al source original — la procedencia es la relacion real, no el
    contenedor.
  - El JSON de salida añade batch_id, group_id, grouped.

- tests/conftest.py: añade columna entities.group_id al SCHEMA_SQL y
  expone group_id en list_entities() para que los tests lo verifiquen.

- tests/test_web_search.py: 3 tests nuevos
  - below_threshold_no_group: 5 resultados → 0 Groups, comportamiento clasico.
  - above_threshold_creates_group_and_preview: 100 resultados → 1 Group +
    10 sueltos + 90 con group_id, todos con SEARCH_RESULT_OF al source.
  - batch_id_shared_across_outputs: group + preview + hijos comparten
    batch_id.
  - _build_lite_html() genera HTML sintetico con N resultados sin
    necesidad de fixture estatico grande.

Tests: 35 passed (32 previos + 3 nuevos) en WSL.
       24 passed + 11 skipped en Windows.

Refs: issues/0035c-web-search-creates-groups.md
This commit is contained in:
2026-05-03 14:52:29 +02:00
parent 784b56ba10
commit 67f10a8afd
3 changed files with 275 additions and 7 deletions
+130 -4
View File
@@ -32,11 +32,24 @@ import re
import sqlite3 import sqlite3
import sys import sys
import time import time
import uuid
from datetime import datetime, timezone from datetime import datetime, timezone
from html.parser import HTMLParser from html.parser import HTMLParser
from urllib.parse import parse_qs, unquote, urlparse from urllib.parse import parse_qs, unquote, urlparse
# Issue 0035c — agrupacion automatica de resultados.
#
# Cuando un enricher produce >= GROUP_THRESHOLD resultados, los primeros
# GROUP_PREVIEW_K quedan sueltos colgando del source (estilo
# Twitter/Reddit timeline) y los N-K restantes entran en un nodo Group
# cuadrado. El manifest puede declarar `auto_group_threshold` para
# overridear el default; mas adelante settings UI permitira override
# global. Por ahora esta hardcoded.
DEFAULT_GROUP_THRESHOLD = 50
GROUP_PREVIEW_K = 10
def progress(p: float, stage: str = "") -> None: def progress(p: float, stage: str = "") -> None:
sys.stderr.write(f"PROGRESS:{p:.2f} {stage}\n") sys.stderr.write(f"PROGRESS:{p:.2f} {stage}\n")
sys.stderr.flush() sys.stderr.flush()
@@ -350,9 +363,34 @@ def find_url_entity(conn: sqlite3.Connection, url: str) -> str | None:
return None return None
def has_group_id_column(conn: sqlite3.Connection) -> bool:
"""Detecta si la columna `group_id` existe en `entities`.
El proyecto graph_explorer la añade via migracion (issue 0035a),
pero podriamos correr contra una BD vieja. Si no esta, insertamos
sin esa columna (resultados sueltos pero con `batch_id` en metadata).
"""
try:
cur = conn.execute("PRAGMA table_info(entities)")
for row in cur:
if row[1] == "group_id":
return True
except sqlite3.Error:
pass
return False
def insert_url_entity(conn: sqlite3.Connection, url: str, title: str, def insert_url_entity(conn: sqlite3.Connection, url: str, title: str,
snippet: str, rank: int, query: str) -> str: snippet: str, rank: int, query: str,
"""Crea un nodo Url y devuelve su id. Si ya existe, lo reusa y refresca.""" batch_id: str = "",
group_id: str | None = None,
has_group_col: bool = False) -> str:
"""Crea un nodo Url y devuelve su id. Si ya existe, lo reusa y refresca.
`batch_id` se inyecta en metadata si no esta vacio. `group_id` se
escribe en la columna homonima cuando existe en el schema y se ha
pasado un valor; si no, queda NULL (nodo suelto).
"""
existing = find_url_entity(conn, url) existing = find_url_entity(conn, url)
ts = now_iso() ts = now_iso()
meta = { meta = {
@@ -364,8 +402,14 @@ def insert_url_entity(conn: sqlite3.Connection, url: str, title: str,
"engine": "duckduckgo", "engine": "duckduckgo",
"found_at": ts, "found_at": ts,
} }
if batch_id:
meta["batch_id"] = batch_id
meta_json = json.dumps(meta, ensure_ascii=False) meta_json = json.dumps(meta, ensure_ascii=False)
if existing: if existing:
# Si la entidad ya existia, mantenemos su group_id actual (no
# lo machacamos): un mismo Url puede aparecer en multiples
# busquedas y el primer Group que lo capturo gana. Solo
# actualizamos metadata + updated_at.
conn.execute( conn.execute(
"UPDATE entities SET metadata=?, updated_at=? WHERE id=?", "UPDATE entities SET metadata=?, updated_at=? WHERE id=?",
(meta_json, ts, existing), (meta_json, ts, existing),
@@ -374,10 +418,43 @@ def insert_url_entity(conn: sqlite3.Connection, url: str, title: str,
new_id = f"Url_{now_ms()}_{rank}_{abs(hash(url)) % 100000}" new_id = f"Url_{now_ms()}_{rank}_{abs(hash(url)) % 100000}"
name = title[:200] if title else url[:200] name = title[:200] if title else url[:200]
if has_group_col:
conn.execute(
"INSERT INTO entities (id, name, type_ref, source, metadata, "
" group_id, created_at, updated_at) "
"VALUES (?, ?, 'Url', 'enricher:web_search', ?, ?, ?, ?)",
(new_id, name, meta_json, group_id, ts, ts),
)
else:
conn.execute(
"INSERT INTO entities (id, name, type_ref, source, metadata, "
" created_at, updated_at) "
"VALUES (?, ?, 'Url', 'enricher:web_search', ?, ?, ?)",
(new_id, name, meta_json, ts, ts),
)
return new_id
def insert_group_entity(conn: sqlite3.Connection, *, query: str,
count: int, batch_id: str) -> str:
"""Crea un nodo Group para los resultados restantes de una busqueda.
Devuelve el id del Group recien creado.
"""
ts = now_iso()
new_id = f"Group_{now_ms()}_{abs(hash(query + batch_id)) % 100000}"
name = f"web_search: {query} ({count})"
meta = {
"enricher": "web_search",
"query": query,
"count": count,
"batch_id": batch_id,
}
meta_json = json.dumps(meta, ensure_ascii=False)
conn.execute( conn.execute(
"INSERT INTO entities (id, name, type_ref, source, metadata, " "INSERT INTO entities (id, name, type_ref, source, metadata, "
" created_at, updated_at) " " created_at, updated_at) "
"VALUES (?, ?, 'Url', 'enricher:web_search', ?, ?, ?)", "VALUES (?, ?, 'Group', 'enricher:web_search', ?, ?, ?)",
(new_id, name, meta_json, ts, ts), (new_id, name, meta_json, ts, ts),
) )
return new_id return new_id
@@ -537,8 +614,31 @@ def main() -> int:
conn.execute("PRAGMA foreign_keys=OFF") conn.execute("PRAGMA foreign_keys=OFF")
entities_added = 0 entities_added = 0
relations_added = 0 relations_added = 0
group_id: str | None = None
batch_id = uuid.uuid4().hex
try: try:
for r in results: has_group_col = has_group_id_column(conn)
n_total = len(results)
# Threshold: por ahora hardcoded; la lectura del manifest
# vendra en 0035e (settings UI / overrides por enricher).
threshold = DEFAULT_GROUP_THRESHOLD
if n_total >= threshold and has_group_col:
# Modo Twitter/Reddit: K sueltos + Group con N-K hijos.
group_id = insert_group_entity(
conn, query=query, count=n_total, batch_id=batch_id,
)
entities_added += 1
if insert_relation(conn, group_id, node_id, "SEARCH_RESULT_OF"):
relations_added += 1
preview = results[:GROUP_PREVIEW_K]
grouped = results[GROUP_PREVIEW_K:]
else:
# Comportamiento clasico: todo suelto, sin Group.
preview = results
grouped = []
for r in preview:
existed = find_url_entity(conn, r["url"]) is not None existed = find_url_entity(conn, r["url"]) is not None
url_id = insert_url_entity( url_id = insert_url_entity(
conn, conn,
@@ -547,11 +647,34 @@ def main() -> int:
snippet=r["snippet"], snippet=r["snippet"],
rank=r["rank"], rank=r["rank"],
query=query, query=query,
batch_id=batch_id,
group_id=None,
has_group_col=has_group_col,
) )
if not existed: if not existed:
entities_added += 1 entities_added += 1
if insert_relation(conn, url_id, node_id, "SEARCH_RESULT_OF"): if insert_relation(conn, url_id, node_id, "SEARCH_RESULT_OF"):
relations_added += 1 relations_added += 1
for r in grouped:
existed = find_url_entity(conn, r["url"]) is not None
url_id = insert_url_entity(
conn,
url=r["url"],
title=r["title"],
snippet=r["snippet"],
rank=r["rank"],
query=query,
batch_id=batch_id,
group_id=group_id,
has_group_col=has_group_col,
)
if not existed:
entities_added += 1
# La procedencia es la relacion al source original, no al
# grupo — el grupo es solo un contenedor visual.
if insert_relation(conn, url_id, node_id, "SEARCH_RESULT_OF"):
relations_added += 1
conn.commit() conn.commit()
finally: finally:
conn.close() conn.close()
@@ -563,6 +686,9 @@ def main() -> int:
"results": len(results), "results": len(results),
"entities_added": entities_added, "entities_added": entities_added,
"relations_added": relations_added, "relations_added": relations_added,
"batch_id": batch_id,
"group_id": group_id or "",
"grouped": bool(group_id),
}, ensure_ascii=False)) }, ensure_ascii=False))
return 0 return 0
+4 -3
View File
@@ -139,6 +139,7 @@ CREATE TABLE entities (
source TEXT NOT NULL, source TEXT NOT NULL,
metadata TEXT NOT NULL DEFAULT '{}', metadata TEXT NOT NULL DEFAULT '{}',
notes TEXT NOT NULL DEFAULT '', notes TEXT NOT NULL DEFAULT '',
group_id TEXT,
created_at TEXT NOT NULL, created_at TEXT NOT NULL,
updated_at TEXT NOT NULL updated_at TEXT NOT NULL
); );
@@ -229,11 +230,11 @@ def list_entities(ops_db: Path, type_ref: str | None = None) -> list[dict]:
try: try:
if type_ref: if type_ref:
cur = conn.execute( cur = conn.execute(
"SELECT id, name, type_ref, source, metadata " "SELECT id, name, type_ref, source, metadata, group_id "
"FROM entities WHERE type_ref=? ORDER BY id", (type_ref,)) "FROM entities WHERE type_ref=? ORDER BY id", (type_ref,))
else: else:
cur = conn.execute( cur = conn.execute(
"SELECT id, name, type_ref, source, metadata " "SELECT id, name, type_ref, source, metadata, group_id "
"FROM entities ORDER BY id") "FROM entities ORDER BY id")
rows = cur.fetchall() rows = cur.fetchall()
finally: finally:
@@ -245,7 +246,7 @@ def list_entities(ops_db: Path, type_ref: str | None = None) -> list[dict]:
except Exception: except Exception:
md = {} md = {}
out.append({"id": r[0], "name": r[1], "type_ref": r[2], out.append({"id": r[0], "name": r[1], "type_ref": r[2],
"source": r[3], "metadata": md}) "source": r[3], "metadata": md, "group_id": r[5]})
return out return out
+141
View File
@@ -95,3 +95,144 @@ def test_web_search_no_query_fails_clean(ops_db, app_dir, registry_root,
rc, out, err = run_enricher("web_search", ctx, env=env) rc, out, err = run_enricher("web_search", ctx, env=env)
assert rc == 2 assert rc == 2
assert "sin query" in err assert "sin query" in err
# ---------------------------------------------------------------------------
# 0035c — agrupacion automatica cuando se excede el threshold
# ---------------------------------------------------------------------------
def _build_lite_html(n: int) -> str:
"""Genera un HTML estilo lite.duckduckgo.com con N resultados.
Estructura minima que `_DDGLiteParser` consume: por cada resultado
un anchor con `class='result-link'` y un `<td class='result-snippet'>`
con el snippet inmediatamente despues. URLs unicas para evitar la
deduplicacion del parser.
"""
rows = []
for i in range(n):
url = f"https://example.com/result-{i:03d}"
rows.append(
f"<a rel='nofollow' href='{url}' class='result-link'>"
f"Result {i:03d}</a>"
f"<td class='result-snippet'>snippet for result {i:03d}</td>"
)
return (
"<!DOCTYPE html><html><body>"
+ "".join(rows)
+ "</body></html>"
)
def test_web_search_below_threshold_no_group(ops_db, app_dir, registry_root,
tmp_path):
"""5 resultados < threshold → ningun Group, comportamiento clasico."""
make_node(ops_db, node_id="t1", name="tomate",
type_ref="text", metadata={})
plan = {
"match": [
{"contains": "duckduckgo.com",
"text": _build_lite_html(5),
"status": 200},
],
}
env = stub_requests(tmp_path, plan)
ctx = base_ctx(ops_db=ops_db, app_dir=app_dir, registry_root=registry_root,
node_id="t1", node_name="tomate", node_type="text",
params={"limit": 0}) # 0 = sin truncar
rc, out, err = run_enricher("web_search", ctx, env=env)
assert rc == 0, err
assert out["results"] == 5
assert out["grouped"] is False
assert out["group_id"] == ""
# Sin Group creado.
groups = list_entities(ops_db, type_ref="Group")
assert groups == []
# 5 Urls sueltos.
urls = list_entities(ops_db, type_ref="Url")
assert len(urls) == 5
assert all(u["group_id"] is None for u in urls)
# Todos con la misma batch_id en metadata.
batch_ids = {u["metadata"].get("batch_id") for u in urls}
assert len(batch_ids) == 1 and "" not in batch_ids
def test_web_search_above_threshold_creates_group_and_preview(
ops_db, app_dir, registry_root, tmp_path):
"""100 resultados → 1 Group + 10 sueltos + 90 con group_id."""
make_node(ops_db, node_id="t1", name="tomate",
type_ref="text", metadata={})
plan = {
"match": [
{"contains": "duckduckgo.com",
"text": _build_lite_html(100),
"status": 200},
],
}
env = stub_requests(tmp_path, plan)
ctx = base_ctx(ops_db=ops_db, app_dir=app_dir, registry_root=registry_root,
node_id="t1", node_name="tomate", node_type="text",
params={"limit": 0})
rc, out, err = run_enricher("web_search", ctx, env=env)
assert rc == 0, err
assert out["results"] == 100
assert out["grouped"] is True
assert out["group_id"]
# Exactamente 1 Group.
groups = list_entities(ops_db, type_ref="Group")
assert len(groups) == 1
g = groups[0]
assert g["metadata"]["count"] == 100
assert g["metadata"]["query"] == "tomate"
assert g["metadata"]["enricher"] == "web_search"
assert g["metadata"].get("batch_id")
# 10 Urls sin group_id (preview) + 90 con group_id.
urls = list_entities(ops_db, type_ref="Url")
assert len(urls) == 100
sueltos = [u for u in urls if u["group_id"] is None]
children = [u for u in urls if u["group_id"] == g["id"]]
assert len(sueltos) == 10
assert len(children) == 90
# Todos los Urls con relacion SEARCH_RESULT_OF al source original.
rels = list_relations(ops_db, name="SEARCH_RESULT_OF")
# 100 Urls + 1 Group = 101 relaciones al source.
from_to_t1 = [r for r in rels if r["to_entity"] == "t1"]
assert len(from_to_t1) == 101
# Group → t1.
assert any(r["from_entity"] == g["id"] for r in from_to_t1)
def test_web_search_batch_id_shared_across_outputs(
ops_db, app_dir, registry_root, tmp_path):
"""Tras un run con grouping, group + preview + hijos comparten batch_id."""
make_node(ops_db, node_id="t1", name="tomate",
type_ref="text", metadata={})
plan = {
"match": [
{"contains": "duckduckgo.com",
"text": _build_lite_html(100),
"status": 200},
],
}
env = stub_requests(tmp_path, plan)
ctx = base_ctx(ops_db=ops_db, app_dir=app_dir, registry_root=registry_root,
node_id="t1", node_name="tomate", node_type="text",
params={"limit": 0})
rc, out, err = run_enricher("web_search", ctx, env=env)
assert rc == 0, err
expected_batch = out["batch_id"]
assert expected_batch
groups = list_entities(ops_db, type_ref="Group")
urls = list_entities(ops_db, type_ref="Url")
all_nodes = groups + urls
assert len(all_nodes) == 101 # 1 Group + 100 Urls
batch_ids = {n["metadata"].get("batch_id") for n in all_nodes}
assert batch_ids == {expected_batch}, (
f"batch_ids inconsistentes: {batch_ids}")