"""Enricher: Extract entities + relations from text using LLM (claude -p haiku).

Reads one entity as JSON on stdin, extracts entities and relations from
``entity["metadata"]["full_content"]`` with an LLM (one call per text chunk),
deduplicates them, and writes ``{"entities": [...], "relations": [...]}`` as
JSON to stdout. Diagnostics are written to stderr so they never corrupt the
JSON result on stdout.
"""
import json
import os
import subprocess
import sys
from concurrent.futures import ThreadPoolExecutor

# Registry functions
ROOT = os.environ.get("FN_REGISTRY_ROOT", "")
sys.path.insert(0, os.path.join(ROOT, "python", "functions", "core"))
sys.path.insert(0, os.path.join(ROOT, "python", "functions", "datascience"))
sys.path.insert(0, os.path.join(ROOT, "python", "functions", "cybersecurity"))
sys.path.insert(0, os.path.join(ROOT, "analysis", "ontology_graph", "lib"))

from core_functions import extract_json_from_llm, preprocess_text
from split_text_into_chunks import split_text_into_chunks
from deduplicate_entities import deduplicate_entities
from deduplicate_relations import deduplicate_relations

# ── Presets ────────────────────────────────────────────────────────────────────

ENTITY_PRESETS = [
    {"type_ref": "person", "label": "Person", "metadata_fields": ["full_name", "alias", "nationality", "dob", "gender", "risk_score"]},
    {"type_ref": "organization", "label": "Organization", "metadata_fields": ["legal_name", "country", "sector", "founded", "risk_score"]},
    {"type_ref": "location", "label": "Location", "metadata_fields": ["lat", "lon", "address", "country", "city"]},
    {"type_ref": "event", "label": "Event", "metadata_fields": ["event_type", "date", "location", "description", "severity"]},
    {"type_ref": "email", "label": "Email", "metadata_fields": ["address", "provider", "verified", "breached"]},
    {"type_ref": "domain", "label": "Domain", "metadata_fields": ["fqdn", "registrar", "created_date", "expires_date"]},
    {"type_ref": "ip_address", "label": "IP Address", "metadata_fields": ["ip", "asn", "country", "isp", "geolocation"]},
    {"type_ref": "phone", "label": "Phone", "metadata_fields": ["number", "country_code", "carrier", "phone_type"]},
    {"type_ref": "document", "label": "Document", "metadata_fields": ["title", "format", "classification", "source"]},
    {"type_ref": "url", "label": "URL/Link", "metadata_fields": ["url", "domain", "context"]},
    {"type_ref": "concept", "label": "Concept", "metadata_fields": ["name", "category", "definition"]},
    {"type_ref": "date_reference", "label": "Date/Time", "metadata_fields": ["date", "precision", "context"]},
    {"type_ref": "quantity", "label": "Quantity/Amount", "metadata_fields": ["value", "unit", "context"]},
]

RELATION_TYPES = [
    "employs", "works_for", "founded", "owns", "controls", "member_of",
    "affiliated_with", "collaborates_with", "communicates_with", "sent_to",
    "received_from", "located_in", "headquartered_in", "operates_in",
    "participated_in", "caused", "occurred_at", "occurred_on", "mentions",
    "references", "describes", "authored", "published", "funds",
    "transacted_with", "invested_in", "hosts", "resolves_to", "exploits",
    "targets", "related_to", "part_of", "instance_of", "has_attribute",
]

# ── Load custom presets ────────────────────────────────────────────────────────

CUSTOM_PRESETS_PATH = os.path.join(ROOT, "analysis", "ontology_graph", "data", "custom_presets.json")


def load_custom_presets():
    """Return the non-promoted custom presets stored in CUSTOM_PRESETS_PATH.

    Returns [] when the file does not exist. Entries that are not dicts or that
    lack "type_ref" or "label" are skipped, because build_prompt indexes both
    keys directly and a single malformed entry would otherwise abort the run.
    """
    if not os.path.exists(CUSTOM_PRESETS_PATH):
        return []
    with open(CUSTOM_PRESETS_PATH, encoding="utf-8") as f:
        data = json.load(f)
    return [
        p for p in data.get("presets", [])
        if isinstance(p, dict)
        and "type_ref" in p
        and "label" in p
        and not p.get("promoted", False)
    ]


# ── Small helpers for untrusted LLM output ─────────────────────────────────────

def _warn(message):
    """Write a diagnostic line to stderr (stdout carries the JSON result)."""
    print(f"[enricher] {message}", file=sys.stderr)


def _dict_items(value):
    """Return the dict elements of *value* when it is a list, otherwise []."""
    if not isinstance(value, list):
        return []
    return [item for item in value if isinstance(item, dict)]


def _as_float(value, default):
    """Coerce an LLM-supplied number to float, falling back to *default*."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return default


def _as_str(value):
    """Return *value* stripped when it is a string, otherwise ""."""
    return value.strip() if isinstance(value, str) else ""


# ── LLM ────────────────────────────────────────────────────────────────────────

def claude_haiku_json(messages):
    """Send chat-style messages to ``claude -p --model haiku`` and parse the JSON reply.

    System and user messages are flattened into one prompt under [SYSTEM] /
    [USER] headers; messages with any other role are ignored.

    Returns:
        Whatever extract_json_from_llm extracts from the CLI envelope's
        "result" field, or {} when the CLI exits with a non-zero status.

    Raises:
        subprocess.TimeoutExpired: if the CLI runs longer than 120 seconds.
        json.JSONDecodeError: if the CLI's stdout is not valid JSON.
        FileNotFoundError: if the ``claude`` executable is not on PATH.
    """
    headers = {"system": "[SYSTEM]", "user": "[USER]"}
    prompt = "\n\n".join(
        f"{headers[msg['role']]}\n{msg['content']}"
        for msg in messages
        if msg["role"] in headers
    )
    result = subprocess.run(
        ["claude", "-p", "--model", "haiku", "--output-format", "json", prompt],
        capture_output=True,
        text=True,
        timeout=120,
    )
    if result.returncode != 0:
        _warn(f"claude exited with code {result.returncode}: {(result.stderr or '').strip()[:500]}")
        return {}
    envelope = json.loads(result.stdout)
    return extract_json_from_llm(envelope.get("result", ""))


# ── Prompt ─────────────────────────────────────────────────────────────────────

def build_prompt(presets, rel_types):
    """Build the system prompt listing entity types (with fields) and relation types.

    Each preset must provide "label" and "type_ref"; "metadata_fields" is optional.
    """
    type_lines = []
    for p in presets:
        fields = ", ".join(p.get("metadata_fields", []))
        type_lines.append(f"- {p['label']} (type_ref: {p['type_ref']}): [{fields}]")

    return (
        "You are an entity and relation extraction expert. "
        "Given text, extract ALL entities and relations in a single pass.\n\n"
        "ENTITY TYPES:\n" + "\n".join(type_lines) + "\n\n"
        "RELATION TYPES: " + ", ".join(rel_types) + "\n\n"
        'OUTPUT FORMAT (strict JSON):\n'
        '{\n'
        ' "entities": [{"name": "...", "type_ref": "...", "attributes": {...}, "confidence": 0.9}],\n'
        ' "relations": [{"from_name": "...", "to_name": "...", "relation_type": "...", "confidence": 0.8, "description": "..."}]\n'
        '}\n\n'
        "RULES:\n"
        "- Extract ALL entities explicitly mentioned\n"
        "- Use exact type_ref from schema. Unknown attributes = null\n"
        "- Confidence: 1.0=explicit, 0.7=strongly implied, 0.5=weakly implied\n"
        "- Relations: from_name/to_name MUST match entity names exactly\n"
        "- Respond in the same language as the text for descriptions"
    )


# ── Process chunk ──────────────────────────────────────────────────────────────

def process_chunk(chunk_text, system_prompt):
    """Extract raw (entities, relations) dict lists from one chunk of text.

    Best-effort: any failure (CLI error, timeout, unparsable reply, unexpected
    reply shape) yields ([], []) so one bad chunk cannot abort the whole run.
    The failure is reported on stderr instead of being silently dropped.
    Only dict elements of the "entities" / "relations" lists are returned.
    """
    try:
        resp = claude_haiku_json([
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": chunk_text},
        ])
    except Exception as exc:  # deliberate best-effort boundary per chunk
        _warn(f"chunk extraction failed: {exc!r}")
        return [], []
    if not isinstance(resp, dict):
        _warn(f"unexpected LLM reply type: {type(resp).__name__}")
        return [], []
    return _dict_items(resp.get("entities")), _dict_items(resp.get("relations"))


# ── Candidate extraction ───────────────────────────────────────────────────────

def _extract_candidates(chunks, system_prompt, max_workers=4):
    """Run the LLM over *chunks* in parallel; return (entity, relation) candidates.

    Results are consumed in chunk order (not completion order) so the
    candidates, and therefore the final output, are deterministic. Entities
    without a name or with confidence < 0.5 are dropped; relations lacking
    either endpoint name are dropped.
    """
    # Project-local candidate types; imported before any LLM call so a missing
    # module fails fast instead of after the expensive extraction.
    from entity_candidate import EntityCandidate
    from relation_candidate import RelationCandidate

    entities = []
    relations = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        results = pool.map(lambda chunk: process_chunk(chunk, system_prompt), chunks)
        for chunk_idx, (ents, rels) in enumerate(results):
            for e in ents:
                name = _as_str(e.get("name"))
                # A missing or non-numeric confidence counts as 0 → filtered out.
                confidence = _as_float(e.get("confidence"), 0.0)
                if not name or not confidence >= 0.5:
                    continue
                attributes = e.get("attributes")
                entities.append(EntityCandidate(
                    name=name,
                    type_ref=_as_str(e.get("type_ref")) or "concept",
                    attributes=attributes if isinstance(attributes, dict) else {},
                    confidence=confidence,
                    source_chunk_indices=[chunk_idx],
                ))
            for r in rels:
                from_name = _as_str(r.get("from_name"))
                to_name = _as_str(r.get("to_name"))
                if not (from_name and to_name):
                    continue
                relations.append(RelationCandidate(
                    from_name=from_name,
                    to_name=to_name,
                    relation_type=_as_str(r.get("relation_type")) or "related_to",
                    confidence=_as_float(r.get("confidence"), 0.5),
                    description=_as_str(r.get("description")),
                    source_chunk_index=chunk_idx,
                ))
    return entities, relations


# ── Output formatting ──────────────────────────────────────────────────────────

def _format_output(final_entities, final_relations):
    """Convert deduplicated candidates into the enricher output document.

    New entities are referenced as ``__NEW_<index>__``. Each relation whose two
    endpoint names resolve to an output entity becomes a relation record.
    Every entity also gets an "extracted_from" relation to ``__SOURCE__``.
    """
    entities_out = [
        {
            "name": e.name,
            "type_ref": e.type_ref,
            "description": f"Extracted from text ({e.confidence:.0%} confidence)",
            "tags": ["extracted", "llm"],
            "metadata": {k: str(v) for k, v in (e.attributes or {}).items() if v is not None},
            "notes": "",
        }
        for e in final_entities
    ]

    # Exact names and case-folded names are kept in separate maps so a folded
    # key of one entity can never shadow the exact name of another.
    exact_idx = {}
    folded_idx = {}
    for i, e in enumerate(final_entities):
        exact_idx[e.name] = i
        folded_idx[e.name.lower().strip()] = i

    def resolve(name):
        # Explicit None check: index 0 is a valid result and must not fall through.
        idx = exact_idx.get(name)
        if idx is None:
            idx = folded_idx.get(name.lower().strip())
        return idx

    relations_out = []
    for r in final_relations:
        from_idx = resolve(r.from_name)
        to_idx = resolve(r.to_name)
        if from_idx is None or to_idx is None:
            continue
        relations_out.append({
            "name": r.relation_type,
            "from_entity": f"__NEW_{from_idx}__",
            "to_entity": f"__NEW_{to_idx}__",
            "description": r.description,
            "weight": r.confidence,
            "tags": ["extracted"],
            "notes": "",
        })

    # Also connect all entities to source text node
    relations_out.extend(
        {
            "name": "extracted_from",
            "from_entity": f"__NEW_{i}__",
            "to_entity": "__SOURCE__",
            "description": "Entity extracted from text",
            "weight": 1.0,
            "tags": [],
            "notes": "",
        }
        for i in range(len(entities_out))
    )
    return {"entities": entities_out, "relations": relations_out}


# ── Main ───────────────────────────────────────────────────────────────────────

def main():
    """Read an entity from stdin, extract entities/relations, write JSON to stdout.

    Writes {"error": ...} instead when the entity has no
    metadata.full_content text.
    """
    entity = json.load(sys.stdin)
    text = (entity.get("metadata") or {}).get("full_content", "")
    if not text:
        json.dump({"error": "No text content in entity metadata"}, sys.stdout)
        return

    text = preprocess_text(text)
    chunks = split_text_into_chunks(text, chunk_size=2000, overlap=200)
    all_presets = ENTITY_PRESETS + load_custom_presets()
    system_prompt = build_prompt(all_presets, RELATION_TYPES)

    # Parallel extraction
    all_entities, all_relations_raw = _extract_candidates(chunks, system_prompt)

    # Dedup
    if all_entities:
        dedup = deduplicate_entities(all_entities, name_threshold=0.85)
        final_entities = dedup.entities
        final_relations = deduplicate_relations(all_relations_raw, dedup.name_to_id)
    else:
        final_entities = []
        final_relations = []

    json.dump(_format_output(final_entities, final_relations), sys.stdout, ensure_ascii=False)


if __name__ == "__main__":
    main()