"""End-to-end pipeline: text -> entities + relations + graph nodes/edges.

Composes the registry functions:
  - chunk_with_overlap (when len(text) > max_chars_per_chunk)
  - extract_graph_gliner2 (per chunk)
  - aggregate_extraction_results
  - filter_relations_by_entity_types
  - merge_entity_aliases

This is the complete playground flow (server.py from the
gliner_glirel_tuning analysis) refactored as a composable function.
"""

from __future__ import annotations

import os
import sys
import time
from typing import Any

# Directory four levels above this file's directory; it is put on sys.path
# before the absolute ``python.functions...`` imports below are executed.
_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "..", ".."))
if _ROOT not in sys.path:
    sys.path.insert(0, _ROOT)

from python.functions.core.chunk_with_overlap import chunk_with_overlap
from python.functions.core.aggregate_extraction_results import aggregate_extraction_results
from python.functions.core.filter_relations_by_entity_types import filter_relations_by_entity_types
from python.functions.core.merge_entity_aliases import merge_entity_aliases
from python.functions.datascience.extract_graph_gliner2 import extract_graph_gliner2


def extract_graph_from_text(
    text: str,
    entity_labels: list[str],
    relation_labels: list | dict,
    allowed: dict,
    model: Any,
    threshold: float = 0.3,
    max_chars_per_chunk: int = 1500,
    overlap_sentences: int = 2,
) -> dict:
    """Full pipeline: text -> graph (nodes + edges).

    Orchestrates chunking, per-chunk extraction, aggregation, typed
    filtering and alias resolution. Returns a graph ready for visualization.

    Args:
        text: Input text (any length). Auto-chunked if longer than
            ``max_chars_per_chunk``.
        entity_labels: E.g. ["person", "organization", "location"].
        relation_labels: E.g. ["works_at", "ceo_of", "located_in"] or a
            dict with descriptions per label.
        allowed: Typed filter rules {rel_type: (head_types, tail_types)}.
            Pass {} to skip typed filtering.
        model: GLiNER2 model instance from gliner2_load_model.
        threshold: Confidence threshold (0.3 validated empirically).
        max_chars_per_chunk: Max chars per chunk before splitting.
        overlap_sentences: Sentence overlap between consecutive chunks.

    Returns:
        {
          "nodes": [{"id": str, "type": str, "count": int}, ...],
          "edges": [{"from": str, "to": str, "kind": str}, ...],
          "stats": {
            "n_chunks": int, "n_nodes": int, "n_edges": int,
            "n_dropped_typed": int, "elapsed_s": float
          }
        }

        Nodes are keyed by canonical (alias-resolved) name; when several
        entities collapse into one node the first one seen sets its type
        and their counts are summed. Edges are de-duplicated and listed in
        the order they first appear among the kept relations, so the
        output is reproducible across runs.
    """
    # perf_counter is monotonic, so the reported duration cannot be skewed
    # by wall-clock adjustments happening during the run.
    started = time.perf_counter()

    # 1. Chunking: short texts are processed as a single chunk.
    if len(text) <= max_chars_per_chunk:
        chunks = [text]
    else:
        chunks = [
            c["text"]
            for c in chunk_with_overlap(
                text,
                max_chars=max_chars_per_chunk,
                overlap_sentences=overlap_sentences,
            )
        ]

    # 2. Per-chunk extraction.
    results = [
        extract_graph_gliner2(
            chunk,
            entity_labels=entity_labels,
            relation_labels=relation_labels,
            model=model,
            threshold=threshold,
        )
        for chunk in chunks
    ]

    # 3. Aggregation across chunks.
    agg = aggregate_extraction_results(results)
    entities = agg["entities"]

    # 4. name -> type lookup used by the typed filter. Entity keys are
    #    2-tuples whose second item is used as the name here.
    name_to_type = {key[1]: data["type"] for key, data in entities.items()}

    # 5. Regroup the (head, rel_type, tail) relation keys as
    #    {rel_type: [(head, tail), ...]}; the per-key counts are not needed.
    raw_relations: dict[str, list] = {}
    for head, rel_type, tail in agg["relations"]:
        raw_relations.setdefault(rel_type, []).append((head, tail))

    # 6. Typed filtering.
    keep, drop = filter_relations_by_entity_types(raw_relations, name_to_type, allowed)

    # 7. Coreference / alias resolution over the original entity names.
    original_names = [data["name"] for data in entities.values()]
    alias = merge_entity_aliases(original_names)

    # 8. Build nodes with aliases applied, merging counts of entities that
    #    resolve to the same canonical name.
    nodes_dict: dict[str, dict] = {}
    for (typ, _key), data in entities.items():
        canon = alias.get(data["name"], data["name"])
        node = nodes_dict.get(canon)
        if node is None:
            nodes_dict[canon] = {"type": typ, "count": data["count"]}
        else:
            node["count"] += data["count"]

    # 9. Build de-duplicated edges with aliases applied. A dict is used as
    #    an insertion-ordered set: iterating a plain set of string tuples
    #    depends on per-process hash randomization, which made the order of
    #    the returned edge list vary between runs.
    edges: dict[tuple[str, str, str], None] = {}
    for rel in keep:
        h_canon = alias.get(rel["from"], rel["from"])
        t_canon = alias.get(rel["to"], rel["to"])
        if h_canon == t_canon:
            # Both endpoints resolved to the same node; skip the self-loop.
            continue
        edges[(h_canon, rel["kind"], t_canon)] = None

    elapsed = round(time.perf_counter() - started, 2)

    return {
        "nodes": [
            {"id": name, "type": info["type"], "count": info["count"]}
            for name, info in nodes_dict.items()
        ],
        "edges": [{"from": h, "to": t, "kind": k} for h, k, t in edges],
        "stats": {
            "n_chunks": len(chunks),
            "n_nodes": len(nodes_dict),
            "n_edges": len(edges),
            "n_dropped_typed": len(drop),
            "elapsed_s": elapsed,
        },
    }