Files
fn_registry/python/functions/datascience/extract_entities_llm.py
egutierrez 63a9cb5273 feat: funciones Python datascience, finance, cybersecurity y pipelines
Datascience: aggregate_by_group, deduplicate_entities/relations, detect_drift,
diff_entities/relations, extract_entities/relations_llm, hotness_score, melt,
merge_graphs, pivot, build_entity/relation_schema_prompt.
Finance: avellaneda_stoikov_quotes, generate_gbm_prices, generate_taker_order,
hawkes_intensity + módulo finance.py.
Cybersecurity: envelope_encrypt/decrypt + módulo cybersecurity.py.
Pipelines: extraction_pipeline, monte_carlo_market, run_market_sim.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-05 17:11:32 +02:00

146 lines
5.1 KiB
Python

"""Extrae entidades de un chunk de texto usando un LLM inyectado."""
import sys
import os
import warnings
from typing import Callable
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", ".."))
from python.types.datascience.entity_candidate import EntityCandidate
def _build_system_prompt(entity_schema: list[dict], language_instruction: str) -> str:
"""Construye el system prompt para extraccion de entidades."""
lines = [
"You are an entity extraction expert. Given text, extract all entities",
"matching these types. For each entity, provide: name, type_ref,",
"attributes (matching the metadata_fields for that type), and a",
"confidence score (0.0-1.0).",
"",
"Entity types:",
]
for schema_entry in entity_schema:
label = schema_entry.get("label", "Unknown")
type_ref = schema_entry.get("type_ref", "")
metadata_fields = schema_entry.get("metadata_fields", [])
lines.append(f"- {label} (type_ref: {type_ref})")
if metadata_fields:
lines.append(f" fields: {', '.join(metadata_fields)}")
lines += [
"",
'Output JSON: {"entities": [{"name": "...", "type_ref": "...", "attributes": {...}, "confidence": 0.9}]}',
"",
"Rules:",
"- Only extract entities explicitly mentioned in the text",
"- Use the exact type_ref from the schema",
"- Leave unknown attributes as null",
"- Confidence: 1.0 = explicitly named, 0.7 = strongly implied, 0.5 = weakly implied",
f"- {language_instruction}",
]
return "\n".join(lines)
def extract_entities_llm(
text: str,
entity_schema: list[dict],
llm_chat_json: Callable[[list[dict]], dict],
language_instruction: str = "Respond in English.",
) -> list[EntityCandidate]:
"""Extrae entidades de un chunk de texto usando un LLM inyectado.
Construye un system prompt con el schema de entity types, llama al LLM
y valida la respuesta retornando una lista de EntityCandidate.
Args:
text: Chunk de texto a analizar.
entity_schema: Lista de tipos con metadata fields. Cada entrada es un
dict con las claves 'type_ref', 'label' y opcionalmente
'metadata_fields'. Ejemplo:
[{"type_ref": "osint_person_go_cybersecurity", "label": "Person",
"metadata_fields": ["full_name", "alias"]}]
llm_chat_json: Funcion que recibe una lista de mensajes OpenAI-style
y retorna un dict con la respuesta JSON del LLM. Interfaz:
llm_chat_json([{"role": "system", "content": "..."}, ...]) -> dict
language_instruction: Instruccion de idioma para el LLM. Por defecto
"Respond in English."
Returns:
Lista de EntityCandidate extraidos. Retorna lista vacia si el LLM
no retorna JSON valido o si no se encuentran entidades.
Raises:
ValueError: Si entity_schema esta vacio.
"""
if not entity_schema:
raise ValueError("entity_schema no puede estar vacio")
valid_type_refs = {entry.get("type_ref", "") for entry in entity_schema}
type_ref_to_label = {
entry.get("type_ref", ""): entry.get("label", "") for entry in entity_schema
}
system_prompt = _build_system_prompt(entity_schema, language_instruction)
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": text},
]
try:
response = llm_chat_json(messages)
except Exception as exc:
warnings.warn(f"extract_entities_llm: error llamando al LLM: {exc}", stacklevel=2)
return []
raw_entities = response.get("entities", [])
if not isinstance(raw_entities, list):
warnings.warn(
"extract_entities_llm: la respuesta del LLM no contiene 'entities' como lista",
stacklevel=2,
)
return []
candidates: list[EntityCandidate] = []
for item in raw_entities:
if not isinstance(item, dict):
continue
name = item.get("name", "")
if not name:
continue
type_ref = item.get("type_ref", "")
if type_ref not in valid_type_refs:
warnings.warn(
f"extract_entities_llm: type_ref '{type_ref}' no esta en el schema, descartando entidad '{name}'",
stacklevel=2,
)
continue
attributes = item.get("attributes", {})
if not isinstance(attributes, dict):
attributes = {}
# Normalizar null values a None
attributes = {k: v for k, v in attributes.items() if v is not None}
confidence = item.get("confidence", 0.0)
if not isinstance(confidence, (int, float)):
confidence = 0.0
confidence = float(max(0.0, min(1.0, confidence)))
candidates.append(
EntityCandidate(
name=name,
type_ref=type_ref,
type_label=type_ref_to_label.get(type_ref, ""),
attributes=attributes,
confidence=confidence,
)
)
return candidates