#!/usr/bin/env python3 """Importación/enriquecimiento idempotente de contactos desde un .vcf. Dos operaciones, ambas idempotentes y NO destructivas (solo INSERT/UPDATE aditivo, nunca DELETE): 1. backfill — calcula la clave de importación determinística (import_key) de los contactos ya presentes en la DB y la guarda. La clave la genera la función del registry ``contact_import_key`` a partir de la identidad estable del contacto (teléfonos normalizados > emails > nombre normalizado). 2. enrich — lee un .vcf (p.ej. el export de Google) y, para cada tarjeta, localiza el contacto existente por import_key (rápido) con fallback por teléfono compartido, y le AÑADE lo que falte: teléfonos y emails nuevos (en contacts) y direcciones (en la persona enlazada por note_path, que es de donde el push de agenda las propaga al móvil). Nunca pisa ni borra datos. La DB osint.duckdb es single-writer (la posee el service osint_db). Este tool abre una conexión de lectura para el plan y, solo con --apply, una conexión de escritura breve mientras el service está inactivo. Hacer backup antes de --apply. Uso: python3 import_contacts_vcf.py backfill --dry-run python3 import_contacts_vcf.py backfill --apply python3 import_contacts_vcf.py enrich --vcf ~/Downloads/contacts.vcf --dry-run python3 import_contacts_vcf.py enrich --vcf ~/Downloads/contacts.vcf --apply """ from __future__ import annotations import argparse import json import os import re import sys import duckdb # --- Acceso a la función del registry contact_import_key --------------------- _THIS = os.path.dirname(os.path.abspath(__file__)) _FN_DIR = os.path.normpath(os.path.join(_THIS, "..", "..", "..", "python", "functions")) if not os.path.isdir(os.path.join(_FN_DIR, "core")): _FN_DIR = os.path.expanduser("~/fn_registry/python/functions") sys.path.insert(0, _FN_DIR) from core.contact_import_key import contact_import_key # noqa: E402 DEFAULT_DB = os.path.join(_THIS, "..", "apps", "osint_db", "data", "osint.duckdb") # --- Normalización y parseo -------------------------------------------------- def norm_phone(p: str) -> str: """Últimos 9 dígitos de un teléfono (mismo criterio que la DB/registry).""" d = re.sub(r"\D", "", str(p or "")) return d[-9:] if len(d) >= 9 else d def _unfold(text: str) -> str: return re.sub(r"\r?\n[ \t]", "", text) def _adr_to_text(raw: str) -> str: """Dirección legible desde un valor ADR estructurado (7 componentes ';').""" parts = [p.strip() for p in raw.split(";")] nonempty = [p for p in parts if p] if len(parts) >= 3 and parts[2]: tail = [p for p in parts[3:] if p] return ", ".join([parts[2]] + tail) if tail else parts[2] return ", ".join(nonempty) def parse_vcf(path: str) -> list: """Parsea un .vcf a una lista de dicts con los campos de interés por tarjeta. Cada dict: {fn, tels (valores originales), emails, adrs (texto legible), bdays}. Los teléfonos/emails se devuelven en su forma original (no normalizada) para preservar el formato legible al añadirlos. """ text = _unfold(open(path, encoding="utf-8", errors="replace").read()) cards = [] for block in re.split(r"(?=BEGIN:VCARD)", text): if "BEGIN:VCARD" not in block: continue fn = re.search(r"^FN:(.+)$", block, re.M) tels = [t.strip() for t in re.findall(r"^TEL[^:]*:(.+)$", block, re.M) if t.strip()] emails = [e.strip() for e in re.findall(r"^EMAIL[^:]*:(.+)$", block, re.M) if "@" in e] adrs = [_adr_to_text(a) for a in re.findall(r"^ADR[^:]*:(.+)$", block, re.M)] adrs = [a for a in adrs if a] bdays = [b.strip() for b in re.findall(r"^BDAY[^:]*:(.+)$", block, re.M) if b.strip()] cards.append( { "fn": fn.group(1).strip() if fn else "", "tels": tels, "emails": emails, "adrs": adrs, "bdays": bdays, } ) return cards # --- Carga de la DB ---------------------------------------------------------- def load_contacts(con) -> list: """Filas de contacts como dicts con tels/emails decodificados.""" rows = con.execute( "SELECT uid, fn, tels, emails, note_path, import_key FROM contacts" ).fetchall() out = [] for uid, fn, tels, emails, note_path, import_key in rows: out.append( { "uid": uid, "fn": fn, "tels": json.loads(tels or "[]"), "emails": json.loads(emails or "[]"), "note_path": note_path, "import_key": import_key, } ) return out def key_of(contact: dict) -> str: return contact_import_key(contact.get("fn") or "", contact.get("tels") or [], contact.get("emails") or []) # --- backfill ---------------------------------------------------------------- def cmd_backfill(db_path: str, apply: bool) -> int: con = duckdb.connect(db_path, read_only=not apply) try: contacts = load_contacts(con) updates = [] collisions = {} for c in contacts: k = key_of(c) collisions.setdefault(k, []).append(c["uid"]) if c["import_key"] != k: updates.append((k, c["uid"])) dup = {k: v for k, v in collisions.items() if len(v) > 1} print(f"contactos: {len(contacts)}") print(f"import_key a (re)calcular: {len(updates)}") print(f"claves con colisión (>1 contacto): {len(dup)}") for k, uids in list(dup.items())[:5]: print(f" {k}: {uids}") if apply: for k, uid in updates: con.execute("UPDATE contacts SET import_key = ? WHERE uid = ?", [k, uid]) print(f"APLICADO: {len(updates)} import_key escritas") else: print("(dry-run: nada escrito; usa --apply)") return 0 finally: con.close() # --- enrich ------------------------------------------------------------------ def build_plan(contacts: list, cards: list) -> list: """Plan de enriquecimiento: por contacto existente, qué tel/email/adr añadir. Match por import_key (rápido) con fallback por teléfono normalizado compartido. Solo genera entradas con algún cambio real. """ by_key = {} by_phone = {} for c in contacts: k = c["import_key"] or key_of(c) by_key.setdefault(k, c) for t in c["tels"]: by_phone.setdefault(norm_phone(t), c) plan = [] for card in cards: ck = contact_import_key(card["fn"], card["tels"], card["emails"]) hit = by_key.get(ck) how = "import_key" if hit is None: for t in card["tels"]: hit = by_phone.get(norm_phone(t)) if hit: how = "phone" break if hit is None: continue db_tel_norm = {norm_phone(t) for t in hit["tels"]} db_em = {e.strip().lower() for e in hit["emails"]} add_tel = [t for t in card["tels"] if norm_phone(t) and norm_phone(t) not in db_tel_norm] add_em = [e for e in card["emails"] if e.strip().lower() not in db_em] # dedup interno preservando orden add_tel = list(dict.fromkeys(add_tel)) add_em = list(dict.fromkeys(add_em)) if add_tel or add_em or card["adrs"]: plan.append( { "uid": hit["uid"], "fn": hit["fn"], "note_path": hit["note_path"], "match": how, "add_tel": add_tel, "add_email": add_em, "add_adr": card["adrs"], "new_tels": hit["tels"] + add_tel, "new_emails": hit["emails"] + add_em, } ) return plan def _person_add_adrs(con, note_path: str, adrs: list) -> int: """Añade direcciones a la persona enlazada (sin duplicar). Devuelve cuántas.""" row = con.execute( "SELECT direcciones, direccion FROM persons WHERE note_path = ?", [note_path] ).fetchone() if row is None: return 0 current = json.loads(row[0] or "[]") if row[0] else [] if not current and row[1]: current = [row[1]] existing_norm = {re.sub(r"\s+", " ", x).strip().lower() for x in current} added = [a for a in adrs if re.sub(r"\s+", " ", a).strip().lower() not in existing_norm] if not added: return 0 merged = current + added con.execute( "UPDATE persons SET direcciones = ?, direccion = ? WHERE note_path = ?", [json.dumps(merged, ensure_ascii=False), merged[0], note_path], ) return len(added) def cmd_enrich(db_path: str, vcf: str, apply: bool) -> int: cards = parse_vcf(vcf) con = duckdb.connect(db_path, read_only=not apply) try: before = con.execute("SELECT count(*) FROM contacts").fetchone()[0] contacts = load_contacts(con) plan = build_plan(contacts, cards) n_tel = sum(len(p["add_tel"]) for p in plan) n_em = sum(len(p["add_email"]) for p in plan) n_adr_targets = sum(1 for p in plan if p["add_adr"] and p["note_path"]) print(f".vcf tarjetas: {len(cards)} contactos DB: {before}") print(f"contactos a enriquecer: {len(plan)} (+{n_tel} tel, +{n_em} email, " f"direcciones a {n_adr_targets} personas enlazadas)") for p in plan[:30]: ch = [] if p["add_tel"]: ch.append(f"+{len(p['add_tel'])}tel") if p["add_email"]: ch.append(f"+{len(p['add_email'])}email") if p["add_adr"]: ch.append("+adr" if p["note_path"] else "+adr(SIN persona)") print(f" [{p['match']:10}] {(p['fn'] or '?')[:34]:34} {' '.join(ch)}") if not apply: print("(dry-run: nada escrito; usa --apply)") return 0 adr_added = 0 for p in plan: con.execute( "UPDATE contacts SET tels = ?, emails = ? WHERE uid = ?", [ json.dumps(p["new_tels"], ensure_ascii=False), json.dumps(p["new_emails"], ensure_ascii=False), p["uid"], ], ) if p["add_adr"] and p["note_path"]: adr_added += _person_add_adrs(con, p["note_path"], p["add_adr"]) after = con.execute("SELECT count(*) FROM contacts").fetchone()[0] assert after == before, f"PÉRDIDA: contactos {before} -> {after}" print(f"APLICADO: {len(plan)} contactos enriquecidos, {adr_added} direcciones " f"añadidas a personas. Conteo intacto: {before} == {after}") return 0 finally: con.close() def main(argv=None) -> int: ap = argparse.ArgumentParser(description=__doc__.split("\n")[0]) sub = ap.add_subparsers(dest="cmd", required=True) pb = sub.add_parser("backfill", help="calcula y guarda import_key de los contactos") pb.add_argument("--db", default=DEFAULT_DB) pb.add_argument("--apply", action="store_true") pe = sub.add_parser("enrich", help="enriquece contactos desde un .vcf") pe.add_argument("--vcf", required=True) pe.add_argument("--db", default=DEFAULT_DB) pe.add_argument("--apply", action="store_true") args = ap.parse_args(argv) db = os.path.abspath(os.path.expanduser(args.db)) if not os.path.exists(db): print(f"ERROR: DB no existe: {db}", file=sys.stderr) return 2 if args.cmd == "backfill": return cmd_backfill(db, args.apply) if args.cmd == "enrich": vcf = os.path.expanduser(args.vcf) if not os.path.exists(vcf): print(f"ERROR: .vcf no existe: {vcf}", file=sys.stderr) return 2 return cmd_enrich(db, vcf, args.apply) return 1 if __name__ == "__main__": sys.exit(main())