Files
osint/tools/import_contacts_vcf.py
T
egutierrez ec9b70a72a feat(tools): import_contacts_vcf — backfill de import_key + enriquecimiento idempotente desde .vcf
Backfill de la clave de importacion (contact_import_key del registry) de los
contactos existentes + enriquecimiento aditivo desde un .vcf de Google
(telefonos/emails faltantes en contacts, direcciones en la persona enlazada).
Match por import_key con fallback por telefono. No destructivo: solo INSERT/UPDATE,
con assert de conteo intacto. Recupero los campos que el import original descarto.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-13 11:48:03 +02:00

313 lines
12 KiB
Python

#!/usr/bin/env python3
"""Importación/enriquecimiento idempotente de contactos desde un .vcf.
Dos operaciones, ambas idempotentes y NO destructivas (solo INSERT/UPDATE
aditivo, nunca DELETE):
1. backfill — calcula la clave de importación determinística (import_key) de
los contactos ya presentes en la DB y la guarda. La clave la
genera la función del registry ``contact_import_key`` a partir
de la identidad estable del contacto (teléfonos normalizados >
emails > nombre normalizado).
2. enrich — lee un .vcf (p.ej. el export de Google) y, para cada tarjeta,
localiza el contacto existente por import_key (rápido) con
fallback por teléfono compartido, y le AÑADE lo que falte:
teléfonos y emails nuevos (en contacts) y direcciones (en la
persona enlazada por note_path, que es de donde el push de
agenda las propaga al móvil). Nunca pisa ni borra datos.
La DB osint.duckdb es single-writer (la posee el service osint_db). Este tool
abre una conexión de lectura para el plan y, solo con --apply, una conexión de
escritura breve mientras el service está inactivo. Hacer backup antes de --apply.
Uso:
python3 import_contacts_vcf.py backfill --dry-run
python3 import_contacts_vcf.py backfill --apply
python3 import_contacts_vcf.py enrich --vcf ~/Downloads/contacts.vcf --dry-run
python3 import_contacts_vcf.py enrich --vcf ~/Downloads/contacts.vcf --apply
"""
from __future__ import annotations
import argparse
import json
import os
import re
import sys
import duckdb
# --- Acceso a la función del registry contact_import_key ---------------------
_THIS = os.path.dirname(os.path.abspath(__file__))
_FN_DIR = os.path.normpath(os.path.join(_THIS, "..", "..", "..", "python", "functions"))
if not os.path.isdir(os.path.join(_FN_DIR, "core")):
_FN_DIR = os.path.expanduser("~/fn_registry/python/functions")
sys.path.insert(0, _FN_DIR)
from core.contact_import_key import contact_import_key # noqa: E402
DEFAULT_DB = os.path.join(_THIS, "..", "apps", "osint_db", "data", "osint.duckdb")
# --- Normalización y parseo --------------------------------------------------
def norm_phone(p: str) -> str:
"""Últimos 9 dígitos de un teléfono (mismo criterio que la DB/registry)."""
d = re.sub(r"\D", "", str(p or ""))
return d[-9:] if len(d) >= 9 else d
def _unfold(text: str) -> str:
return re.sub(r"\r?\n[ \t]", "", text)
def _adr_to_text(raw: str) -> str:
"""Dirección legible desde un valor ADR estructurado (7 componentes ';')."""
parts = [p.strip() for p in raw.split(";")]
nonempty = [p for p in parts if p]
if len(parts) >= 3 and parts[2]:
tail = [p for p in parts[3:] if p]
return ", ".join([parts[2]] + tail) if tail else parts[2]
return ", ".join(nonempty)
def parse_vcf(path: str) -> list:
"""Parsea un .vcf a una lista de dicts con los campos de interés por tarjeta.
Cada dict: {fn, tels (valores originales), emails, adrs (texto legible),
bdays}. Los teléfonos/emails se devuelven en su forma original (no
normalizada) para preservar el formato legible al añadirlos.
"""
text = _unfold(open(path, encoding="utf-8", errors="replace").read())
cards = []
for block in re.split(r"(?=BEGIN:VCARD)", text):
if "BEGIN:VCARD" not in block:
continue
fn = re.search(r"^FN:(.+)$", block, re.M)
tels = [t.strip() for t in re.findall(r"^TEL[^:]*:(.+)$", block, re.M) if t.strip()]
emails = [e.strip() for e in re.findall(r"^EMAIL[^:]*:(.+)$", block, re.M) if "@" in e]
adrs = [_adr_to_text(a) for a in re.findall(r"^ADR[^:]*:(.+)$", block, re.M)]
adrs = [a for a in adrs if a]
bdays = [b.strip() for b in re.findall(r"^BDAY[^:]*:(.+)$", block, re.M) if b.strip()]
cards.append(
{
"fn": fn.group(1).strip() if fn else "",
"tels": tels,
"emails": emails,
"adrs": adrs,
"bdays": bdays,
}
)
return cards
# --- Carga de la DB ----------------------------------------------------------
def load_contacts(con) -> list:
"""Filas de contacts como dicts con tels/emails decodificados."""
rows = con.execute(
"SELECT uid, fn, tels, emails, note_path, import_key FROM contacts"
).fetchall()
out = []
for uid, fn, tels, emails, note_path, import_key in rows:
out.append(
{
"uid": uid,
"fn": fn,
"tels": json.loads(tels or "[]"),
"emails": json.loads(emails or "[]"),
"note_path": note_path,
"import_key": import_key,
}
)
return out
def key_of(contact: dict) -> str:
return contact_import_key(contact.get("fn") or "", contact.get("tels") or [], contact.get("emails") or [])
# --- backfill ----------------------------------------------------------------
def cmd_backfill(db_path: str, apply: bool) -> int:
con = duckdb.connect(db_path, read_only=not apply)
try:
contacts = load_contacts(con)
updates = []
collisions = {}
for c in contacts:
k = key_of(c)
collisions.setdefault(k, []).append(c["uid"])
if c["import_key"] != k:
updates.append((k, c["uid"]))
dup = {k: v for k, v in collisions.items() if len(v) > 1}
print(f"contactos: {len(contacts)}")
print(f"import_key a (re)calcular: {len(updates)}")
print(f"claves con colisión (>1 contacto): {len(dup)}")
for k, uids in list(dup.items())[:5]:
print(f" {k}: {uids}")
if apply:
for k, uid in updates:
con.execute("UPDATE contacts SET import_key = ? WHERE uid = ?", [k, uid])
print(f"APLICADO: {len(updates)} import_key escritas")
else:
print("(dry-run: nada escrito; usa --apply)")
return 0
finally:
con.close()
# --- enrich ------------------------------------------------------------------
def build_plan(contacts: list, cards: list) -> list:
"""Plan de enriquecimiento: por contacto existente, qué tel/email/adr añadir.
Match por import_key (rápido) con fallback por teléfono normalizado
compartido. Solo genera entradas con algún cambio real.
"""
by_key = {}
by_phone = {}
for c in contacts:
k = c["import_key"] or key_of(c)
by_key.setdefault(k, c)
for t in c["tels"]:
by_phone.setdefault(norm_phone(t), c)
plan = []
for card in cards:
ck = contact_import_key(card["fn"], card["tels"], card["emails"])
hit = by_key.get(ck)
how = "import_key"
if hit is None:
for t in card["tels"]:
hit = by_phone.get(norm_phone(t))
if hit:
how = "phone"
break
if hit is None:
continue
db_tel_norm = {norm_phone(t) for t in hit["tels"]}
db_em = {e.strip().lower() for e in hit["emails"]}
add_tel = [t for t in card["tels"] if norm_phone(t) and norm_phone(t) not in db_tel_norm]
add_em = [e for e in card["emails"] if e.strip().lower() not in db_em]
# dedup interno preservando orden
add_tel = list(dict.fromkeys(add_tel))
add_em = list(dict.fromkeys(add_em))
if add_tel or add_em or card["adrs"]:
plan.append(
{
"uid": hit["uid"],
"fn": hit["fn"],
"note_path": hit["note_path"],
"match": how,
"add_tel": add_tel,
"add_email": add_em,
"add_adr": card["adrs"],
"new_tels": hit["tels"] + add_tel,
"new_emails": hit["emails"] + add_em,
}
)
return plan
def _person_add_adrs(con, note_path: str, adrs: list) -> int:
"""Añade direcciones a la persona enlazada (sin duplicar). Devuelve cuántas."""
row = con.execute(
"SELECT direcciones, direccion FROM persons WHERE note_path = ?", [note_path]
).fetchone()
if row is None:
return 0
current = json.loads(row[0] or "[]") if row[0] else []
if not current and row[1]:
current = [row[1]]
existing_norm = {re.sub(r"\s+", " ", x).strip().lower() for x in current}
added = [a for a in adrs if re.sub(r"\s+", " ", a).strip().lower() not in existing_norm]
if not added:
return 0
merged = current + added
con.execute(
"UPDATE persons SET direcciones = ?, direccion = ? WHERE note_path = ?",
[json.dumps(merged, ensure_ascii=False), merged[0], note_path],
)
return len(added)
def cmd_enrich(db_path: str, vcf: str, apply: bool) -> int:
cards = parse_vcf(vcf)
con = duckdb.connect(db_path, read_only=not apply)
try:
before = con.execute("SELECT count(*) FROM contacts").fetchone()[0]
contacts = load_contacts(con)
plan = build_plan(contacts, cards)
n_tel = sum(len(p["add_tel"]) for p in plan)
n_em = sum(len(p["add_email"]) for p in plan)
n_adr_targets = sum(1 for p in plan if p["add_adr"] and p["note_path"])
print(f".vcf tarjetas: {len(cards)} contactos DB: {before}")
print(f"contactos a enriquecer: {len(plan)} (+{n_tel} tel, +{n_em} email, "
f"direcciones a {n_adr_targets} personas enlazadas)")
for p in plan[:30]:
ch = []
if p["add_tel"]:
ch.append(f"+{len(p['add_tel'])}tel")
if p["add_email"]:
ch.append(f"+{len(p['add_email'])}email")
if p["add_adr"]:
ch.append("+adr" if p["note_path"] else "+adr(SIN persona)")
print(f" [{p['match']:10}] {(p['fn'] or '?')[:34]:34} {' '.join(ch)}")
if not apply:
print("(dry-run: nada escrito; usa --apply)")
return 0
adr_added = 0
for p in plan:
con.execute(
"UPDATE contacts SET tels = ?, emails = ? WHERE uid = ?",
[
json.dumps(p["new_tels"], ensure_ascii=False),
json.dumps(p["new_emails"], ensure_ascii=False),
p["uid"],
],
)
if p["add_adr"] and p["note_path"]:
adr_added += _person_add_adrs(con, p["note_path"], p["add_adr"])
after = con.execute("SELECT count(*) FROM contacts").fetchone()[0]
assert after == before, f"PÉRDIDA: contactos {before} -> {after}"
print(f"APLICADO: {len(plan)} contactos enriquecidos, {adr_added} direcciones "
f"añadidas a personas. Conteo intacto: {before} == {after}")
return 0
finally:
con.close()
def main(argv=None) -> int:
ap = argparse.ArgumentParser(description=__doc__.split("\n")[0])
sub = ap.add_subparsers(dest="cmd", required=True)
pb = sub.add_parser("backfill", help="calcula y guarda import_key de los contactos")
pb.add_argument("--db", default=DEFAULT_DB)
pb.add_argument("--apply", action="store_true")
pe = sub.add_parser("enrich", help="enriquece contactos desde un .vcf")
pe.add_argument("--vcf", required=True)
pe.add_argument("--db", default=DEFAULT_DB)
pe.add_argument("--apply", action="store_true")
args = ap.parse_args(argv)
db = os.path.abspath(os.path.expanduser(args.db))
if not os.path.exists(db):
print(f"ERROR: DB no existe: {db}", file=sys.stderr)
return 2
if args.cmd == "backfill":
return cmd_backfill(db, args.apply)
if args.cmd == "enrich":
vcf = os.path.expanduser(args.vcf)
if not os.path.exists(vcf):
print(f"ERROR: .vcf no existe: {vcf}", file=sys.stderr)
return 2
return cmd_enrich(db, vcf, args.apply)
return 1
if __name__ == "__main__":
sys.exit(main())