diff --git a/tools/import_contacts_vcf.py b/tools/import_contacts_vcf.py new file mode 100644 index 0000000..d03bab8 --- /dev/null +++ b/tools/import_contacts_vcf.py @@ -0,0 +1,312 @@ +#!/usr/bin/env python3 +"""Importación/enriquecimiento idempotente de contactos desde un .vcf. + +Dos operaciones, ambas idempotentes y NO destructivas (solo INSERT/UPDATE +aditivo, nunca DELETE): + + 1. backfill — calcula la clave de importación determinística (import_key) de + los contactos ya presentes en la DB y la guarda. La clave la + genera la función del registry ``contact_import_key`` a partir + de la identidad estable del contacto (teléfonos normalizados > + emails > nombre normalizado). + + 2. enrich — lee un .vcf (p.ej. el export de Google) y, para cada tarjeta, + localiza el contacto existente por import_key (rápido) con + fallback por teléfono compartido, y le AÑADE lo que falte: + teléfonos y emails nuevos (en contacts) y direcciones (en la + persona enlazada por note_path, que es de donde el push de + agenda las propaga al móvil). Nunca pisa ni borra datos. + +La DB osint.duckdb es single-writer (la posee el service osint_db). Este tool +abre una conexión de lectura para el plan y, solo con --apply, una conexión de +escritura breve mientras el service está inactivo. Hacer backup antes de --apply. + +Uso: + python3 import_contacts_vcf.py backfill --dry-run + python3 import_contacts_vcf.py backfill --apply + python3 import_contacts_vcf.py enrich --vcf ~/Downloads/contacts.vcf --dry-run + python3 import_contacts_vcf.py enrich --vcf ~/Downloads/contacts.vcf --apply +""" + +from __future__ import annotations + +import argparse +import json +import os +import re +import sys + +import duckdb + +# --- Acceso a la función del registry contact_import_key --------------------- +_THIS = os.path.dirname(os.path.abspath(__file__)) +_FN_DIR = os.path.normpath(os.path.join(_THIS, "..", "..", "..", "python", "functions")) +if not os.path.isdir(os.path.join(_FN_DIR, "core")): + _FN_DIR = os.path.expanduser("~/fn_registry/python/functions") +sys.path.insert(0, _FN_DIR) +from core.contact_import_key import contact_import_key # noqa: E402 + +DEFAULT_DB = os.path.join(_THIS, "..", "apps", "osint_db", "data", "osint.duckdb") + + +# --- Normalización y parseo -------------------------------------------------- + + +def norm_phone(p: str) -> str: + """Últimos 9 dígitos de un teléfono (mismo criterio que la DB/registry).""" + d = re.sub(r"\D", "", str(p or "")) + return d[-9:] if len(d) >= 9 else d + + +def _unfold(text: str) -> str: + return re.sub(r"\r?\n[ \t]", "", text) + + +def _adr_to_text(raw: str) -> str: + """Dirección legible desde un valor ADR estructurado (7 componentes ';').""" + parts = [p.strip() for p in raw.split(";")] + nonempty = [p for p in parts if p] + if len(parts) >= 3 and parts[2]: + tail = [p for p in parts[3:] if p] + return ", ".join([parts[2]] + tail) if tail else parts[2] + return ", ".join(nonempty) + + +def parse_vcf(path: str) -> list: + """Parsea un .vcf a una lista de dicts con los campos de interés por tarjeta. + + Cada dict: {fn, tels (valores originales), emails, adrs (texto legible), + bdays}. Los teléfonos/emails se devuelven en su forma original (no + normalizada) para preservar el formato legible al añadirlos. + """ + text = _unfold(open(path, encoding="utf-8", errors="replace").read()) + cards = [] + for block in re.split(r"(?=BEGIN:VCARD)", text): + if "BEGIN:VCARD" not in block: + continue + fn = re.search(r"^FN:(.+)$", block, re.M) + tels = [t.strip() for t in re.findall(r"^TEL[^:]*:(.+)$", block, re.M) if t.strip()] + emails = [e.strip() for e in re.findall(r"^EMAIL[^:]*:(.+)$", block, re.M) if "@" in e] + adrs = [_adr_to_text(a) for a in re.findall(r"^ADR[^:]*:(.+)$", block, re.M)] + adrs = [a for a in adrs if a] + bdays = [b.strip() for b in re.findall(r"^BDAY[^:]*:(.+)$", block, re.M) if b.strip()] + cards.append( + { + "fn": fn.group(1).strip() if fn else "", + "tels": tels, + "emails": emails, + "adrs": adrs, + "bdays": bdays, + } + ) + return cards + + +# --- Carga de la DB ---------------------------------------------------------- + + +def load_contacts(con) -> list: + """Filas de contacts como dicts con tels/emails decodificados.""" + rows = con.execute( + "SELECT uid, fn, tels, emails, note_path, import_key FROM contacts" + ).fetchall() + out = [] + for uid, fn, tels, emails, note_path, import_key in rows: + out.append( + { + "uid": uid, + "fn": fn, + "tels": json.loads(tels or "[]"), + "emails": json.loads(emails or "[]"), + "note_path": note_path, + "import_key": import_key, + } + ) + return out + + +def key_of(contact: dict) -> str: + return contact_import_key(contact.get("fn") or "", contact.get("tels") or [], contact.get("emails") or []) + + +# --- backfill ---------------------------------------------------------------- + + +def cmd_backfill(db_path: str, apply: bool) -> int: + con = duckdb.connect(db_path, read_only=not apply) + try: + contacts = load_contacts(con) + updates = [] + collisions = {} + for c in contacts: + k = key_of(c) + collisions.setdefault(k, []).append(c["uid"]) + if c["import_key"] != k: + updates.append((k, c["uid"])) + dup = {k: v for k, v in collisions.items() if len(v) > 1} + print(f"contactos: {len(contacts)}") + print(f"import_key a (re)calcular: {len(updates)}") + print(f"claves con colisión (>1 contacto): {len(dup)}") + for k, uids in list(dup.items())[:5]: + print(f" {k}: {uids}") + if apply: + for k, uid in updates: + con.execute("UPDATE contacts SET import_key = ? WHERE uid = ?", [k, uid]) + print(f"APLICADO: {len(updates)} import_key escritas") + else: + print("(dry-run: nada escrito; usa --apply)") + return 0 + finally: + con.close() + + +# --- enrich ------------------------------------------------------------------ + + +def build_plan(contacts: list, cards: list) -> list: + """Plan de enriquecimiento: por contacto existente, qué tel/email/adr añadir. + + Match por import_key (rápido) con fallback por teléfono normalizado + compartido. Solo genera entradas con algún cambio real. + """ + by_key = {} + by_phone = {} + for c in contacts: + k = c["import_key"] or key_of(c) + by_key.setdefault(k, c) + for t in c["tels"]: + by_phone.setdefault(norm_phone(t), c) + + plan = [] + for card in cards: + ck = contact_import_key(card["fn"], card["tels"], card["emails"]) + hit = by_key.get(ck) + how = "import_key" + if hit is None: + for t in card["tels"]: + hit = by_phone.get(norm_phone(t)) + if hit: + how = "phone" + break + if hit is None: + continue + db_tel_norm = {norm_phone(t) for t in hit["tels"]} + db_em = {e.strip().lower() for e in hit["emails"]} + add_tel = [t for t in card["tels"] if norm_phone(t) and norm_phone(t) not in db_tel_norm] + add_em = [e for e in card["emails"] if e.strip().lower() not in db_em] + # dedup interno preservando orden + add_tel = list(dict.fromkeys(add_tel)) + add_em = list(dict.fromkeys(add_em)) + if add_tel or add_em or card["adrs"]: + plan.append( + { + "uid": hit["uid"], + "fn": hit["fn"], + "note_path": hit["note_path"], + "match": how, + "add_tel": add_tel, + "add_email": add_em, + "add_adr": card["adrs"], + "new_tels": hit["tels"] + add_tel, + "new_emails": hit["emails"] + add_em, + } + ) + return plan + + +def _person_add_adrs(con, note_path: str, adrs: list) -> int: + """Añade direcciones a la persona enlazada (sin duplicar). Devuelve cuántas.""" + row = con.execute( + "SELECT direcciones, direccion FROM persons WHERE note_path = ?", [note_path] + ).fetchone() + if row is None: + return 0 + current = json.loads(row[0] or "[]") if row[0] else [] + if not current and row[1]: + current = [row[1]] + existing_norm = {re.sub(r"\s+", " ", x).strip().lower() for x in current} + added = [a for a in adrs if re.sub(r"\s+", " ", a).strip().lower() not in existing_norm] + if not added: + return 0 + merged = current + added + con.execute( + "UPDATE persons SET direcciones = ?, direccion = ? WHERE note_path = ?", + [json.dumps(merged, ensure_ascii=False), merged[0], note_path], + ) + return len(added) + + +def cmd_enrich(db_path: str, vcf: str, apply: bool) -> int: + cards = parse_vcf(vcf) + con = duckdb.connect(db_path, read_only=not apply) + try: + before = con.execute("SELECT count(*) FROM contacts").fetchone()[0] + contacts = load_contacts(con) + plan = build_plan(contacts, cards) + n_tel = sum(len(p["add_tel"]) for p in plan) + n_em = sum(len(p["add_email"]) for p in plan) + n_adr_targets = sum(1 for p in plan if p["add_adr"] and p["note_path"]) + print(f".vcf tarjetas: {len(cards)} contactos DB: {before}") + print(f"contactos a enriquecer: {len(plan)} (+{n_tel} tel, +{n_em} email, " + f"direcciones a {n_adr_targets} personas enlazadas)") + for p in plan[:30]: + ch = [] + if p["add_tel"]: + ch.append(f"+{len(p['add_tel'])}tel") + if p["add_email"]: + ch.append(f"+{len(p['add_email'])}email") + if p["add_adr"]: + ch.append("+adr" if p["note_path"] else "+adr(SIN persona)") + print(f" [{p['match']:10}] {(p['fn'] or '?')[:34]:34} {' '.join(ch)}") + if not apply: + print("(dry-run: nada escrito; usa --apply)") + return 0 + adr_added = 0 + for p in plan: + con.execute( + "UPDATE contacts SET tels = ?, emails = ? WHERE uid = ?", + [ + json.dumps(p["new_tels"], ensure_ascii=False), + json.dumps(p["new_emails"], ensure_ascii=False), + p["uid"], + ], + ) + if p["add_adr"] and p["note_path"]: + adr_added += _person_add_adrs(con, p["note_path"], p["add_adr"]) + after = con.execute("SELECT count(*) FROM contacts").fetchone()[0] + assert after == before, f"PÉRDIDA: contactos {before} -> {after}" + print(f"APLICADO: {len(plan)} contactos enriquecidos, {adr_added} direcciones " + f"añadidas a personas. Conteo intacto: {before} == {after}") + return 0 + finally: + con.close() + + +def main(argv=None) -> int: + ap = argparse.ArgumentParser(description=__doc__.split("\n")[0]) + sub = ap.add_subparsers(dest="cmd", required=True) + pb = sub.add_parser("backfill", help="calcula y guarda import_key de los contactos") + pb.add_argument("--db", default=DEFAULT_DB) + pb.add_argument("--apply", action="store_true") + pe = sub.add_parser("enrich", help="enriquece contactos desde un .vcf") + pe.add_argument("--vcf", required=True) + pe.add_argument("--db", default=DEFAULT_DB) + pe.add_argument("--apply", action="store_true") + args = ap.parse_args(argv) + db = os.path.abspath(os.path.expanduser(args.db)) + if not os.path.exists(db): + print(f"ERROR: DB no existe: {db}", file=sys.stderr) + return 2 + if args.cmd == "backfill": + return cmd_backfill(db, args.apply) + if args.cmd == "enrich": + vcf = os.path.expanduser(args.vcf) + if not os.path.exists(vcf): + print(f"ERROR: .vcf no existe: {vcf}", file=sys.stderr) + return 2 + return cmd_enrich(db, vcf, args.apply) + return 1 + + +if __name__ == "__main__": + sys.exit(main())