837563c3ba
Datascience: aggregate_by_group, deduplicate_entities/relations, detect_drift, diff_entities/relations, extract_entities/relations_llm, hotness_score, melt, merge_graphs, pivot, build_entity/relation_schema_prompt. Finance: avellaneda_stoikov_quotes, generate_gbm_prices, generate_taker_order, hawkes_intensity + módulo finance.py. Cybersecurity: envelope_encrypt/decrypt + módulo cybersecurity.py. Pipelines: extraction_pipeline, monte_carlo_market, run_market_sim. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
146 lines
5.1 KiB
Python
146 lines
5.1 KiB
Python
"""Extrae entidades de un chunk de texto usando un LLM inyectado."""
|
|
|
|
import sys
|
|
import os
|
|
import warnings
|
|
from typing import Callable
|
|
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", ".."))
|
|
|
|
from python.types.datascience.entity_candidate import EntityCandidate
|
|
|
|
|
|
def _build_system_prompt(entity_schema: list[dict], language_instruction: str) -> str:
|
|
"""Construye el system prompt para extraccion de entidades."""
|
|
lines = [
|
|
"You are an entity extraction expert. Given text, extract all entities",
|
|
"matching these types. For each entity, provide: name, type_ref,",
|
|
"attributes (matching the metadata_fields for that type), and a",
|
|
"confidence score (0.0-1.0).",
|
|
"",
|
|
"Entity types:",
|
|
]
|
|
|
|
for schema_entry in entity_schema:
|
|
label = schema_entry.get("label", "Unknown")
|
|
type_ref = schema_entry.get("type_ref", "")
|
|
metadata_fields = schema_entry.get("metadata_fields", [])
|
|
lines.append(f"- {label} (type_ref: {type_ref})")
|
|
if metadata_fields:
|
|
lines.append(f" fields: {', '.join(metadata_fields)}")
|
|
|
|
lines += [
|
|
"",
|
|
'Output JSON: {"entities": [{"name": "...", "type_ref": "...", "attributes": {...}, "confidence": 0.9}]}',
|
|
"",
|
|
"Rules:",
|
|
"- Only extract entities explicitly mentioned in the text",
|
|
"- Use the exact type_ref from the schema",
|
|
"- Leave unknown attributes as null",
|
|
"- Confidence: 1.0 = explicitly named, 0.7 = strongly implied, 0.5 = weakly implied",
|
|
f"- {language_instruction}",
|
|
]
|
|
|
|
return "\n".join(lines)
|
|
|
|
|
|
def extract_entities_llm(
    text: str,
    entity_schema: list[dict],
    llm_chat_json: Callable[[list[dict]], dict],
    language_instruction: str = "Respond in English.",
) -> list[EntityCandidate]:
    """Extract entities from a text chunk using an injected LLM.

    Builds a system prompt from the entity-type schema, calls the LLM and
    validates the response, returning a list of EntityCandidate.

    Args:
        text: Text chunk to analyse; sent as the user message.
        entity_schema: Entity types with their metadata fields. Each entry is
            a dict with the keys 'type_ref', 'label' and optionally
            'metadata_fields'. Example:
            [{"type_ref": "osint_person_go_cybersecurity", "label": "Person",
              "metadata_fields": ["full_name", "alias"]}]
        llm_chat_json: Callable that receives a list of OpenAI-style messages
            and returns a dict with the LLM's JSON response. Interface:
            llm_chat_json([{"role": "system", "content": "..."}, ...]) -> dict
        language_instruction: Language instruction for the LLM. Defaults to
            "Respond in English."

    Returns:
        The extracted EntityCandidate objects. An empty list is returned (with
        a warning) when the LLM call raises, when the response is not a dict,
        or when its 'entities' value is not a list; also when no valid
        entities are found. Individual items are skipped when they are not
        dicts, have an empty/blank name, or carry a type_ref that is not in
        the schema.

    Raises:
        ValueError: If entity_schema is empty.
    """
    if not entity_schema:
        raise ValueError("entity_schema no puede estar vacio")

    type_ref_to_label = {
        entry.get("type_ref", ""): entry.get("label", "") for entry in entity_schema
    }
    valid_type_refs = set(type_ref_to_label)

    messages = [
        {
            "role": "system",
            "content": _build_system_prompt(entity_schema, language_instruction),
        },
        {"role": "user", "content": text},
    ]

    # The LLM client is an injected boundary: any failure degrades to "no
    # entities" with a warning instead of aborting the caller's pipeline.
    try:
        response = llm_chat_json(messages)
    except Exception as exc:
        warnings.warn(f"extract_entities_llm: error llamando al LLM: {exc}", stacklevel=2)
        return []

    # A client may hand back None / a list / a string on malformed output;
    # without this guard `.get` below would raise AttributeError.
    if not isinstance(response, dict):
        warnings.warn(
            "extract_entities_llm: la respuesta del LLM no es un dict",
            stacklevel=2,
        )
        return []

    raw_entities = response.get("entities", [])
    if not isinstance(raw_entities, list):
        warnings.warn(
            "extract_entities_llm: la respuesta del LLM no contiene 'entities' como lista",
            stacklevel=2,
        )
        return []

    candidates: list[EntityCandidate] = []
    for item in raw_entities:
        if not isinstance(item, dict):
            continue

        name = item.get("name", "")
        # Skip missing names and names that are only whitespace.
        if not name or (isinstance(name, str) and not name.strip()):
            continue

        type_ref = item.get("type_ref", "")
        try:
            is_known_type = type_ref in valid_type_refs
        except TypeError:
            # Unhashable value (e.g. a list or dict) can never match a schema key.
            is_known_type = False
        if not is_known_type:
            warnings.warn(
                f"extract_entities_llm: type_ref '{type_ref}' no esta en el schema, descartando entidad '{name}'",
                stacklevel=2,
            )
            continue

        attributes = item.get("attributes", {})
        if not isinstance(attributes, dict):
            attributes = {}
        # Drop attributes the LLM left as null so only known values remain.
        attributes = {key: value for key, value in attributes.items() if value is not None}

        confidence = item.get("confidence", 0.0)
        if not isinstance(confidence, (int, float)):
            confidence = 0.0
        # Clamp to [0.0, 1.0] with explicit comparisons rather than min/max:
        # every comparison against NaN is False, so NaN lands in the 0.0
        # branch instead of being turned into 1.0 by min(1.0, nan).
        if confidence >= 1.0:
            confidence = 1.0
        elif not confidence > 0.0:
            confidence = 0.0
        confidence = float(confidence)

        candidates.append(
            EntityCandidate(
                name=name,
                type_ref=type_ref,
                type_label=type_ref_to_label.get(type_ref, ""),
                attributes=attributes,
                confidence=confidence,
            )
        )

    return candidates
|