#!/usr/bin/env python3 """Importa contactos de Google (vCard export) al vault OSINT como fichas de persona y organizacion, clasificando con LLM y creando relaciones persona <-> organizacion. Flujo: 1. Parsear el .vcf con split_vcards (grupo `dav`). Extraer FN, TEL*, EMAIL*, ORG, TITLE. 2. Filtrar ruido/servicio (numeros de operadora, recordatorios, sin >=3 letras). 3. Clasificar con ask_llm (grupo `claude-direct`) por lotes de ~40, pidiendo JSON estricto. 4. Dedup contra personas/*.md existentes (match por slug exacto o subconjunto de tokens). 5. Generar fichas siguiendo projects/osint/CONVENTIONS.md (frontmatter canonico 3b). Modos: --dry-run (DEFAULT) no escribe nada; imprime resumen + muestra de 15. --apply escribe de verdad usando funciones del grupo `obsidian`. Tool de PROYECTO (vive en projects/osint/tools/). NO es funcion del registry, NO se indexa. Idempotente: re-ejecutar no duplica (dedup por slug). """ import sys import os import re import json import argparse import datetime sys.path.insert(0, "/home/enmanuel/fn_registry/python/functions") from infra.split_vcards import split_vcards # noqa: E402 from core.ask_llm import ask_llm # noqa: E402 from obsidian import ( # noqa: E402 slugify_obsidian_name, list_obsidian_notes, read_obsidian_note, create_obsidian_note, update_obsidian_note, ) OSINT = "/home/enmanuel/Obsidian/osint" VCF_PATH = "/home/enmanuel/Downloads/contacts.vcf" FUENTE = "Google Contacts export 2026-06-11" LLM_MODEL = "claude-haiku-4-5-20251001" BATCH_SIZE = 40 # Topónimos locales que el LLM tiende a confundir con organizaciones cuando # vienen como sufijo del nombre del contacto (p.ej. "Adrian Quinto Almachar"). # Un lugar NUNCA se convierte en organizacion ni en relacion. (slugificados) _PLACE_BLOCKLIST = { "almachar", "barcelona", "madrid", "malaga", "velez-malaga", "velez", "aliaguilla", "chamana", "axarquia", "torre-del-mar", "torrox", "nerja", "comares", "benamargosa", "moclinejo", "iznate", "cutar", } # Frontmatter canonico de persona (CONVENTIONS.md seccion 3b), en orden. PERSON_CANON = [ "tipo", "nombre", "slug", "aliases", "sexo", "fecha_nacimiento", "dni", "telefono", "email", "direccion", "pais", "relaciones", "contexto", "fuente", "tags", ] # Frontmatter de organizacion (CONVENTIONS.md secciones 6 y 3b adaptado). ORG_CANON = [ "tipo", "nombre", "slug", "aliases", "telefono", "email", "direccion", "pais", "relaciones", "contexto", "fuente", "tags", ] # -------------------------------------------------------------------------- # 1. Parseo de vCards # -------------------------------------------------------------------------- def _unfold(vcard_text: str) -> str: """Deshace el folding de lineas de vCard (continuacion con espacio/tab).""" return re.sub(r"\r?\n[ \t]", "", vcard_text) def _vcard_values(vcard_text: str, prop: str) -> list: """Devuelve todos los valores de una propiedad (p.ej. TEL, EMAIL). Acepta la forma `PROP;PARAMS:valor` y `PROP:valor`. Decodifica escapes simples de vCard (\\, , \\;, \\n) en el valor. """ vals = [] for line in vcard_text.splitlines(): m = re.match(rf"^(?:item\d+\.)?{prop}(?:;[^:]*)?:(.*)$", line, re.IGNORECASE) if m: v = m.group(1).strip() v = v.replace("\\,", ",").replace("\\;", ";").replace("\\n", " ").replace("\\\\", "\\") v = v.strip() if v: vals.append(v) return vals def parse_vcard(vcard_text: str) -> dict: """Extrae FN, todos los TEL, todos los EMAIL, ORG y TITLE de una vCard.""" txt = _unfold(vcard_text) fn_vals = _vcard_values(txt, "FN") org_vals = _vcard_values(txt, "ORG") org = "" if org_vals: # ORG viene como `Empresa;Departamento`. Quitar componentes vacios. org = " ".join(p.strip() for p in org_vals[0].split(";") if p.strip()) return { "fn": fn_vals[0] if fn_vals else "", "tels": _dedup_keep_order(_vcard_values(txt, "TEL")), "emails": _dedup_keep_order(_vcard_values(txt, "EMAIL")), "org": org, "title": (_vcard_values(txt, "TITLE") or [""])[0], } def _dedup_keep_order(items: list) -> list: seen, out = set(), [] for it in items: key = it.strip().lower() if key and key not in seen: seen.add(key) out.append(it.strip()) return out # -------------------------------------------------------------------------- # 2. Filtro de ruido/servicio # -------------------------------------------------------------------------- # Patrones de nombre que delatan numeros de servicio / recordatorios. _SERVICE_NAME_RE = re.compile( r"^\*" # empieza por * r"|^\d{3,5}\b" # codigo corto al inicio (1200, 22122) r"|att\.?\s*cliente" r"|buz[oó]n|buzon" r"|voicemail|voice\s*mail" r"|gestiona|consulta\b|informaci[oó]n|recarga" r"|servicio\s+al\s+cliente", re.IGNORECASE, ) def is_service(name: str) -> bool: """True si el contacto es ruido de operadora / recordatorio / sin nombre real.""" n = (name or "").strip() if not n: return True if _SERVICE_NAME_RE.search(n): return True # menos de 3 letras = no es un nombre humano ni de negocio real letters = re.sub(r"[^A-Za-zÀ-ÿñÑ]", "", n) if len(letters) < 3: return True return False # -------------------------------------------------------------------------- # 4. Dedup contra fichas existentes # -------------------------------------------------------------------------- # Tokens demasiado comunes para fundamentar un match por subconjunto. _STOP_TOKENS = {"de", "del", "la", "las", "el", "los", "y", "san", "da", "do"} # Nombres de pila muy comunes: compartir SOLO estos no basta para deducir que # dos contactos son la misma persona (hay decenas de "Antonio", "Maria", "Jose"). # Un match por subconjunto exige al menos un token distintivo fuera de esta lista # (tipicamente un apellido). _COMMON_GIVEN = { "antonio", "jose", "juan", "maria", "manuel", "carlos", "francisco", "javier", "david", "miguel", "angel", "luis", "pedro", "pablo", "rafael", "fernando", "sergio", "alberto", "alejandro", "daniel", "jesus", "marcos", "ana", "carmen", "cristina", "laura", "marta", "lucia", "elena", "sara", "paula", "raquel", "gema", "lorena", "natalia", "silvia", "rosa", "isabel", "dani", "javi", "manolo", "paco", "pepe", "alex", "nacho", "mari", "lola", } def _name_tokens(name: str) -> set: slug = slugify_obsidian_name(name or "") return {t for t in slug.split("-") if t and t not in _STOP_TOKENS} def load_existing_persons() -> list: """Carga (slug, nombre, token_set) de cada ficha de persona del vault.""" out = [] for p in list_obsidian_notes(OSINT, subfolder="personas"): base = os.path.splitext(os.path.basename(p))[0] if base.startswith("_"): continue try: fm = read_obsidian_note(p)["frontmatter"] except Exception: fm = {} nombre = fm.get("nombre") or base.replace("-", " ") out.append({ "slug": base, "path": p, "nombre": nombre, "tokens": _name_tokens(nombre) or _name_tokens(base), }) return out def load_existing_orgs() -> dict: """Mapa slug -> path de las organizaciones existentes.""" out = {} for p in list_obsidian_notes(OSINT, subfolder="organizaciones"): base = os.path.splitext(os.path.basename(p))[0] if base.startswith("_"): continue out[base] = p return out def _distinctive(tokens: set) -> bool: """True si el conjunto de tokens incluye al menos uno distintivo (apellido): longitud >=4 y fuera de los nombres de pila ultra-comunes.""" return any(len(t) >= 4 and t not in _COMMON_GIVEN for t in tokens) def match_existing_person(name: str, existing: list): """Busca una persona existente que case con `name`. Conservador a proposito. Se considera la MISMA persona solo si: - slug exacto, o - los tokens del nombre de contacto son subconjunto de los de una ficha existente (forma menos especifica del mismo nombre), compartiendo >=2 tokens, ambos con >=2 tokens, y con al menos un token distintivo (apellido) en el solape. Esto cubre el caso del estandar ("Manuel Gutierrez" subset de "Manuel Gutierrez Gamez") y RECHAZA fusiones erroneas por nombre de pila comun ("Antonio", "Maria") o por dos given names compartidos ("Maria Jose" vs "Jose Maria ..."). Ante la duda, NO casa: se prefiere crear una ficha nueva (un duplicado es recuperable; una fusion erronea corrompe una investigacion existente). """ cand_slug = slugify_obsidian_name(name) cand_tokens = _name_tokens(name) if not cand_tokens: return None for ex in existing: if ex["slug"] == cand_slug and cand_slug: return ex for ex in existing: ex_tokens = ex["tokens"] if len(cand_tokens) < 2 or len(ex_tokens) < 2: continue if not (cand_tokens <= ex_tokens): continue shared = cand_tokens & ex_tokens if len(shared) >= 2 and _distinctive(shared): return ex return None # -------------------------------------------------------------------------- # 3. Clasificacion LLM por lotes # -------------------------------------------------------------------------- _LLM_SYSTEM = ( "Eres un clasificador de contactos telefonicos en espanol. Devuelves SOLO " "un array JSON valido, sin texto alrededor, sin markdown." ) _LLM_INSTRUCTIONS = """Clasifica cada contacto de la lista. Devuelve un array JSON con un objeto por contacto, en el MISMO orden, con estos campos: {"i": , "tipo": "persona"|"organizacion"|"servicio", "persona_nombre": , "org_nombre": , "rol": , "sexo": "hombre"|"mujer"|null} Reglas: - tipo="persona" si el contacto es un individuo (nombre de pila + apellidos). - tipo="organizacion" si es un negocio, empresa, comercio o servicio (fruteria, autoescuela, seguros, banco, taller, tienda, restaurante, clinica...). - tipo="servicio" si es un numero de operadora, recordatorio o automatismo (raro: ya filtramos la mayoria). - Si el contacto MEZCLA persona y organizacion, rellena persona_nombre Y org_nombre Y rol. Ej: "Emilio Villalba Gestor Orange" -> persona_nombre="Emilio Villalba", org_nombre="Orange", rol="gestor". Ej: "Abdul Fruteria Velez" -> tipo="organizacion", org_nombre="Fruteria Velez", persona_nombre="Abdul", rol="dueno". - persona_nombre: nombre LIMPIO de la persona (quita el rol y la empresa). null si no hay persona. - org_nombre: nombre del negocio/empresa asociado. null si no hay. - rol: gestor, comercial, dueno, empleado, contacto... null si no aplica. - sexo: deduce del nombre de pila ("hombre"|"mujer"); null si ambiguo o no hay persona. - Limpia emojis y typos al inferir, pero NO inventes datos. Contactos: """ def _extract_json_array(text: str): """Extrae el primer array JSON `[...]` de una respuesta, tolerando texto alrededor.""" if not text: return None # intento directo try: v = json.loads(text.strip()) if isinstance(v, list): return v except Exception: pass # buscar el primer '[' y casar corchetes balanceados start = text.find("[") if start == -1: return None depth = 0 in_str = False esc = False for i in range(start, len(text)): c = text[i] if in_str: if esc: esc = False elif c == "\\": esc = True elif c == '"': in_str = False continue if c == '"': in_str = True elif c == "[": depth += 1 elif c == "]": depth -= 1 if depth == 0: chunk = text[start:i + 1] try: v = json.loads(chunk) return v if isinstance(v, list) else None except Exception: return None return None def classify_batch(batch: list, llm_calls: list) -> list: """Clasifica un lote de contactos. batch = [(local_idx, contact_dict), ...]. Devuelve lista de dicts de clasificacion alineados por 'i' (local_idx). Reintenta una vez si el parseo falla; si vuelve a fallar, marca todos como persona por defecto y lo anota en llm_calls. """ lines = [] for idx, c in batch: extra = [] if c["org"]: extra.append(f"ORG={c['org']}") if c["title"]: extra.append(f"TITLE={c['title']}") suffix = f" [{'; '.join(extra)}]" if extra else "" lines.append(f"{idx}. {c['fn']}{suffix}") prompt = _LLM_INSTRUCTIONS + "\n".join(lines) for attempt in (1, 2): try: resp = ask_llm(prompt, model=LLM_MODEL, system=_LLM_SYSTEM, max_tokens=4096, echo=False) except Exception as e: # noqa: BLE001 llm_calls.append({"size": len(batch), "ok": False, "error": f"{type(e).__name__}: {e}", "attempt": attempt}) resp = "" if not resp: llm_calls.append({"size": len(batch), "ok": False, "error": "empty response (auth/token?)", "attempt": attempt}) if attempt == 2: break continue arr = _extract_json_array(resp) if arr is not None: llm_calls.append({"size": len(batch), "ok": True, "attempt": attempt}) return arr llm_calls.append({"size": len(batch), "ok": False, "error": "json parse failed", "attempt": attempt}) # fallback: todo persona return [{"i": idx, "tipo": "persona", "persona_nombre": c["fn"], "org_nombre": None, "rol": None, "sexo": None, "_fallback": True} for idx, c in batch] # -------------------------------------------------------------------------- # 5. Construccion de fichas (planificacion) # -------------------------------------------------------------------------- def _ordered_frontmatter(values: dict, canon: list) -> dict: """Devuelve un dict ordenado segun `canon`, con extras al final.""" fm = {} for k in canon: fm[k] = values.get(k) for k, v in values.items(): if k not in fm: fm[k] = v return fm def _contact_block(tels: list, emails: list) -> str: """Seccion ## Contacto con los telefonos/emails extra (mas alla del primero).""" lines = [] extra_tel = tels[1:] extra_mail = emails[1:] if extra_tel or extra_mail: lines.append("## Contacto") lines.append("") for t in extra_tel: lines.append(f"- telefono: {t}") for e in extra_mail: lines.append(f"- email: {e}") lines.append("") return "\n".join(lines) def plan_person(name, sexo, tels, emails, org_slug, org_nombre, rol, existing_persons, used_person_slugs): """Planifica crear o enriquecer una persona. Devuelve dict de plan.""" match = match_existing_person(name, existing_persons) nombre = name.strip() if match: return { "action": "enrich_person", "slug": match["slug"], "path": match["path"], "nombre_existente": match["nombre"], "alias_add": nombre, "tel": tels[0] if tels else None, "email": emails[0] if emails else None, "tels": tels, "emails": emails, "org_slug": org_slug, "org_nombre": org_nombre, "rol": rol, } # crear nueva slug = _resolve_slug(slugify_obsidian_name(nombre) or "contacto", used_person_slugs) rel = [] if org_slug: rel.append(f"[[{org_slug}]] — {rol or 'contacto'}") fm = _ordered_frontmatter({ "tipo": "persona", "nombre": nombre, "slug": slug, "aliases": [], "sexo": sexo if sexo in ("hombre", "mujer") else None, "fecha_nacimiento": None, "dni": None, "telefono": tels[0] if tels else None, "email": emails[0] if emails else None, "direccion": None, "pais": None, "relaciones": rel, "contexto": "google-contacts", "fuente": FUENTE, "tags": ["persona", "osint", "contacto"], }, PERSON_CANON) body_parts = [] contact = _contact_block(tels, emails) if contact: body_parts.append(contact) if org_slug: body_parts.append("## Relacionado") body_parts.append("") body_parts.append(f"- [[organizaciones/{org_slug}|{org_nombre}]] — {rol or 'contacto'}") body_parts.append("") body_parts.append("## Notas") body_parts.append("") return { "action": "create_person", "slug": slug, "nombre": nombre, "frontmatter": fm, "body": "\n".join(body_parts), "tel": tels[0] if tels else None, "email": emails[0] if emails else None, "org_slug": org_slug, "org_nombre": org_nombre, "rol": rol, } def _fuzzy_existing_org(slug: str, existing_orgs: dict): """Devuelve el slug de una org existente que sea casi-duplicado de `slug`. Casa cuando uno es prefijo del otro compartiendo >=5 chars de raiz comun (p.ej. "fenixfood" ~ "fenixfood-sl", "biorganic" ~ "biorganicfood-sl", "4geekss" ~ "4geeks"). None si no hay casi-duplicado. """ for ex in existing_orgs: a, b = slug, ex root = a if len(a) <= len(b) else b longer = b if root is a else a if len(root) >= 5 and longer.startswith(root): return ex # tolerar 1-2 chars de cola repetida ("4geekss" vs "4geeks") common = os.path.commonprefix([a, b]) if len(common) >= 5 and abs(len(a) - len(b)) <= 2 and ( a[len(common):].strip("s-") == "" or b[len(common):].strip("s-") == "" ): return ex return None def plan_org(org_nombre, tels, emails, existing_orgs, used_org_slugs, person_slug=None, person_nombre=None, rol=None): """Planifica crear (o reutilizar) una organizacion. Devuelve (slug, plan|None). plan=None si ya existe (en vault o ya planificada en este batch) o si el nombre es un toponimo (no se crea org de lugar). slug=None si debe ignorarse. """ slug = slugify_obsidian_name(org_nombre) if not slug: return None, None # Lugar -> no es organizacion: no crear, no enlazar. if slug in _PLACE_BLOCKLIST: return None, None if slug in existing_orgs or slug in used_org_slugs: # ya existe: solo enlazar (no crear). Devolvemos el slug, sin plan de creacion. return slug, None # Casi-duplicado de una org existente -> reutilizar la existente. fuzzy = _fuzzy_existing_org(slug, existing_orgs) if fuzzy: return fuzzy, None rel = [] if person_slug: rel.append(f"[[{person_slug}]] — {rol or 'contacto'}") fm = _ordered_frontmatter({ "tipo": "organizacion", "nombre": org_nombre.strip(), "slug": slug, "aliases": [], "telefono": tels[0] if tels else None, "email": emails[0] if emails else None, "direccion": None, "pais": None, "relaciones": rel, "contexto": "google-contacts", "fuente": FUENTE, "tags": ["organizacion", "osint", "contacto"], }, ORG_CANON) body_parts = [] contact = _contact_block(tels, emails) if contact: body_parts.append(contact) if person_slug: body_parts.append("## Relacionado") body_parts.append("") body_parts.append(f"- [[{person_slug}|{person_nombre}]] — {rol or 'contacto'}") body_parts.append("") body_parts.append("## Notas") body_parts.append("") plan = { "action": "create_org", "slug": slug, "nombre": org_nombre.strip(), "frontmatter": fm, "body": "\n".join(body_parts), } return slug, plan def _resolve_slug(base: str, used: set) -> str: """Resuelve colisiones de slug con sufijo -2, -3...""" if base not in used: used.add(base) return base k = 2 while f"{base}-{k}" in used: k += 1 s = f"{base}-{k}" used.add(s) return s # -------------------------------------------------------------------------- # Orquestacion # -------------------------------------------------------------------------- def build_plan(contacts, classifications, existing_persons, existing_orgs): """Construye la lista de acciones (crear/enriquecer) a partir de la clasificacion.""" by_idx = {} for c in classifications: if isinstance(c, dict) and "i" in c: by_idx[c["i"]] = c person_plans, org_plans, enrich_plans = [], [], [] relations = [] # (tipo_origen, slug_origen, slug_org, rol) used_person_slugs = {p["slug"] for p in existing_persons} used_org_slugs = set() skipped_service = 0 # indice de personas existentes mutable (para que dedup vea las recien creadas) persons_index = list(existing_persons) for idx, contact in contacts: cls = by_idx.get(idx) if not cls: cls = {"tipo": "persona", "persona_nombre": contact["fn"], "org_nombre": None, "rol": None, "sexo": None} tipo = (cls.get("tipo") or "persona").lower() tels = contact["tels"] emails = contact["emails"] rol = cls.get("rol") sexo = cls.get("sexo") persona_nombre = cls.get("persona_nombre") org_nombre = cls.get("org_nombre") or contact["org"] or None if tipo == "servicio": skipped_service += 1 continue if tipo == "organizacion": # crear la org (telefono al de la org); persona asociada si la hay person_slug = None person_disp = None if persona_nombre and len(_name_tokens(persona_nombre)) >= 1: pmatch = match_existing_person(persona_nombre, persons_index) if pmatch: person_slug = pmatch["slug"] person_disp = pmatch["nombre"] enrich_plans.append({ "action": "enrich_person", "slug": pmatch["slug"], "path": pmatch["path"], "nombre_existente": pmatch["nombre"], "alias_add": persona_nombre, "tel": None, "email": None, "tels": [], "emails": [], "org_slug": None, "org_nombre": None, "rol": None, }) else: pslug = _resolve_slug(slugify_obsidian_name(persona_nombre) or "contacto", used_person_slugs) person_slug = pslug person_disp = persona_nombre.strip() pfm = _ordered_frontmatter({ "tipo": "persona", "nombre": persona_nombre.strip(), "slug": pslug, "aliases": [], "sexo": sexo if sexo in ("hombre", "mujer") else None, "fecha_nacimiento": None, "dni": None, "telefono": None, "email": None, "direccion": None, "pais": None, "relaciones": [], # se completa abajo con el org slug "contexto": "google-contacts", "fuente": FUENTE, "tags": ["persona", "osint", "contacto"], }, PERSON_CANON) person_plans.append({ "action": "create_person", "slug": pslug, "nombre": persona_nombre.strip(), "frontmatter": pfm, "body": "## Notas\n", "tel": None, "email": None, "org_slug": None, "org_nombre": org_nombre, "rol": rol, "_pending_org_rel": True, }) persons_index.append({"slug": pslug, "path": None, "nombre": persona_nombre.strip(), "tokens": _name_tokens(persona_nombre)}) oslug, oplan = plan_org(org_nombre or contact["fn"], tels, emails, existing_orgs, used_org_slugs, person_slug=person_slug, person_nombre=person_disp, rol=rol) if oslug: used_org_slugs.add(oslug) if oplan: org_plans.append(oplan) if person_slug: relations.append(("persona->org", person_slug, oslug, rol)) # completar relacion en el person plan recien creado for pp in person_plans: if pp.get("_pending_org_rel") and pp["slug"] == person_slug: pp["frontmatter"]["relaciones"] = [f"[[{oslug}]] — {rol or 'contacto'}"] pp["org_slug"] = oslug pp["body"] = ( "## Relacionado\n\n" f"- [[organizaciones/{oslug}|{org_nombre}]] — {rol or 'contacto'}\n\n" "## Notas\n" ) pp.pop("_pending_org_rel", None) continue # tipo == persona name = persona_nombre or contact["fn"] org_slug = None # si la persona trae una org asociada, planificar la org y enlazar if org_nombre and len(_name_tokens(org_nombre)) >= 1: oslug, oplan = plan_org(org_nombre, [], [], existing_orgs, used_org_slugs) if oslug: used_org_slugs.add(oslug) org_slug = oslug if oplan: # la org no lleva tel/email del contacto (son de la persona) org_plans.append(oplan) pplan = plan_person(name, sexo, tels, emails, org_slug, org_nombre, rol, persons_index, used_person_slugs) if pplan["action"] == "create_person": person_plans.append(pplan) persons_index.append({"slug": pplan["slug"], "path": None, "nombre": pplan["nombre"], "tokens": _name_tokens(pplan["nombre"])}) if org_slug: # backref persona en la org recien planificada for op in org_plans: if op["slug"] == org_slug and not op["frontmatter"].get("relaciones"): op["frontmatter"]["relaciones"] = [f"[[{pplan['slug']}]] — {pplan['rol'] or 'contacto'}"] else: enrich_plans.append(pplan) if org_slug: relations.append(("persona->org", pplan["slug"], org_slug, rol)) return { "person_creates": person_plans, "org_creates": org_plans, "enriches": enrich_plans, "relations": relations, "skipped_service": skipped_service, } # -------------------------------------------------------------------------- # Aplicar (solo --apply) # -------------------------------------------------------------------------- def apply_plan(plan): """Escribe las fichas en disco usando funciones del grupo obsidian.""" created_p = created_o = enriched = 0 for pp in plan["person_creates"]: create_obsidian_note(OSINT, f"personas/{pp['slug']}", body=pp["body"], frontmatter=pp["frontmatter"], overwrite=True) created_p += 1 for op in plan["org_creates"]: create_obsidian_note(OSINT, f"organizaciones/{op['slug']}", body=op["body"], frontmatter=op["frontmatter"], overwrite=True) created_o += 1 for ep in plan["enriches"]: path = ep["path"] if not path or not os.path.exists(path): continue note = read_obsidian_note(path) fm = dict(note["frontmatter"]) # anadir alias del contacto aliases = fm.get("aliases") or [] if not isinstance(aliases, list): aliases = [aliases] if ep["alias_add"] and ep["alias_add"] not in aliases and ep["alias_add"] != fm.get("nombre"): aliases.append(ep["alias_add"]) # rellenar telefono/email si faltan if ep.get("tel") and not fm.get("telefono"): fm["telefono"] = ep["tel"] if ep.get("email") and not fm.get("email"): fm["email"] = ep["email"] update_obsidian_note(path, set_frontmatter={"aliases": aliases, "telefono": fm.get("telefono"), "email": fm.get("email")}) enriched += 1 return created_p, created_o, enriched # -------------------------------------------------------------------------- # Reporte dry-run # -------------------------------------------------------------------------- def report(plan, stats, llm_calls): n_create_p = len(plan["person_creates"]) n_enrich = len(plan["enriches"]) n_create_o = len(plan["org_creates"]) n_rel = len(plan["relations"]) print("=" * 64) print("DRY-RUN — import_google_contacts.py") print("=" * 64) print(f"vCards totales en el .vcf .................. {stats['total']}") print(f"descartados servicio/ruido ................ {stats['filtered']}") print(f"contactos clasificados con LLM ............ {stats['classified']}") print(f" de ellos sin telefono ni email .......... {stats['no_contact']}") print("-" * 64) print(f"PERSONAS a crear .......................... {n_create_p}") print(f"PERSONAS a enriquecer (ya existen) ........ {n_enrich}") print(f"ORGANIZACIONES a crear .................... {n_create_o}") print(f"RELACIONES persona<->organizacion ......... {n_rel}") print(f"contactos marcados como servicio (LLM) .... {plan['skipped_service']}") print(f"colisiones de slug resueltas (sufijo) ..... {stats['slug_collisions']}") print("-" * 64) print("Llamadas a ask_llm:") ok = sum(1 for c in llm_calls if c["ok"]) fail = sum(1 for c in llm_calls if not c["ok"]) print(f" exitosas={ok} fallidas={fail} total_intentos={len(llm_calls)}") for c in llm_calls: if not c["ok"]: print(f" FALLO lote size={c['size']} intento={c['attempt']}: {c.get('error')}") print("=" * 64) print("MUESTRA de 15 fichas (nombre -> tipo/accion -> tel/email -> relacion):") print("-" * 64) sample = [] for pp in plan["person_creates"]: rel = f" -> org {pp['org_slug']} ({pp['rol'] or 'contacto'})" if pp.get("org_slug") else "" sample.append(f"[crear persona] {pp['nombre']} | tel={pp['tel'] or '-'} email={pp['email'] or '-'}{rel}") for op in plan["org_creates"]: rels = op["frontmatter"].get("relaciones") or [] rel = f" -> {rels[0]}" if rels else "" tel = op["frontmatter"].get("telefono") eml = op["frontmatter"].get("email") sample.append(f"[crear org] {op['nombre']} | tel={tel or '-'} email={eml or '-'}{rel}") for ep in plan["enriches"]: sample.append(f"[enriquecer] {ep['nombre_existente']} (+alias '{ep['alias_add']}', +tel={ep.get('tel') or '-'})") for line in sample[:15]: print(" " + line) if len(sample) < 1: print(" (sin fichas planificadas)") print("=" * 64) # -------------------------------------------------------------------------- # main # -------------------------------------------------------------------------- def main(): ap = argparse.ArgumentParser(description="Importa contactos Google al vault OSINT.") ap.add_argument("--apply", action="store_true", help="Escribe las fichas en disco. Por defecto: dry-run (no escribe).") ap.add_argument("--vcf", default=VCF_PATH, help="Ruta al .vcf de contactos.") ap.add_argument("--limit", type=int, default=0, help="(debug) limita el numero de contactos clasificados.") args = ap.parse_args() if not os.path.exists(args.vcf): print(f"ERROR: no existe el .vcf: {args.vcf}", file=sys.stderr) return 1 with open(args.vcf, "r", encoding="utf-8", errors="replace") as f: vcf_text = f.read() cards = split_vcards(vcf_text) total = len(cards) contacts = [] filtered = 0 for raw in cards: c = parse_vcard(raw) if is_service(c["fn"]): filtered += 1 continue contacts.append(c) if args.limit and args.limit > 0: contacts = contacts[:args.limit] # indexar contactos indexed = list(enumerate(contacts)) # clasificar por lotes llm_calls = [] classifications = [] for start in range(0, len(indexed), BATCH_SIZE): batch = indexed[start:start + BATCH_SIZE] classifications.extend(classify_batch(batch, llm_calls)) existing_persons = load_existing_persons() existing_orgs = load_existing_orgs() # contar colisiones: comparar slugs base antes de resolver base_slugs = {} for _, c in indexed: s = slugify_obsidian_name(c["fn"]) if s: base_slugs[s] = base_slugs.get(s, 0) + 1 slug_collisions = sum(v - 1 for v in base_slugs.values() if v > 1) plan = build_plan(indexed, classifications, existing_persons, existing_orgs) no_contact = sum(1 for _, c in indexed if not c["tels"] and not c["emails"]) stats = { "total": total, "filtered": filtered, "classified": len(indexed), "no_contact": no_contact, "slug_collisions": slug_collisions, } report(plan, stats, llm_calls) if args.apply: cp, co, en = apply_plan(plan) print(f"\nAPLICADO: personas creadas={cp} orgs creadas={co} enriquecidas={en}") else: print("\n(dry-run: no se escribio nada. Usa --apply para aplicar.)") return 0 if __name__ == "__main__": sys.exit(main())