ec9b70a72a
Backfill de la clave de importacion (contact_import_key del registry) de los contactos existentes + enriquecimiento aditivo desde un .vcf de Google (telefonos/emails faltantes en contacts, direcciones en la persona enlazada). Match por import_key con fallback por telefono. No destructivo: solo INSERT/UPDATE, con assert de conteo intacto. Recupero los campos que el import original descarto. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
313 lines
12 KiB
Python
313 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
"""Importación/enriquecimiento idempotente de contactos desde un .vcf.
|
|
|
|
Dos operaciones, ambas idempotentes y NO destructivas (solo INSERT/UPDATE
|
|
aditivo, nunca DELETE):
|
|
|
|
1. backfill — calcula la clave de importación determinística (import_key) de
|
|
los contactos ya presentes en la DB y la guarda. La clave la
|
|
genera la función del registry ``contact_import_key`` a partir
|
|
de la identidad estable del contacto (teléfonos normalizados >
|
|
emails > nombre normalizado).
|
|
|
|
2. enrich — lee un .vcf (p.ej. el export de Google) y, para cada tarjeta,
|
|
localiza el contacto existente por import_key (rápido) con
|
|
fallback por teléfono compartido, y le AÑADE lo que falte:
|
|
teléfonos y emails nuevos (en contacts) y direcciones (en la
|
|
persona enlazada por note_path, que es de donde el push de
|
|
agenda las propaga al móvil). Nunca pisa ni borra datos.
|
|
|
|
La DB osint.duckdb es single-writer (la posee el service osint_db). Este tool
|
|
abre una conexión de lectura para el plan y, solo con --apply, una conexión de
|
|
escritura breve mientras el service está inactivo. Hacer backup antes de --apply.
|
|
|
|
Uso:
|
|
python3 import_contacts_vcf.py backfill --dry-run
|
|
python3 import_contacts_vcf.py backfill --apply
|
|
python3 import_contacts_vcf.py enrich --vcf ~/Downloads/contacts.vcf --dry-run
|
|
python3 import_contacts_vcf.py enrich --vcf ~/Downloads/contacts.vcf --apply
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import re
|
|
import sys
|
|
|
|
import duckdb
|
|
|
|
# --- Access to the registry function contact_import_key ----------------------
# Directory that contains this script; every relative path below hangs off it.
_THIS = os.path.dirname(os.path.abspath(__file__))
# First candidate for the registry functions directory: three levels above
# this script, under python/functions.
_FN_DIR = os.path.normpath(os.path.join(_THIS, "..", "..", "..", "python", "functions"))
# If that candidate has no "core" subdirectory, fall back to the copy under
# the user's home directory.
if not os.path.isdir(os.path.join(_FN_DIR, "core")):
    _FN_DIR = os.path.expanduser("~/fn_registry/python/functions")
# Must run before the import below so that the "core" package can be resolved.
sys.path.insert(0, _FN_DIR)
from core.contact_import_key import contact_import_key  # noqa: E402

# Default database path, relative to this script (not normalised here; main()
# applies abspath/expanduser to whatever --db value ends up being used).
DEFAULT_DB = os.path.join(_THIS, "..", "apps", "osint_db", "data", "osint.duckdb")
|
|
|
|
|
|
# --- Normalización y parseo --------------------------------------------------
|
|
|
|
|
|
def norm_phone(p: str) -> str:
    """Return the comparison key of a phone number.

    All non-digit characters are discarded. When nine or more digits remain,
    only the last nine are kept; a shorter digit run is returned whole. Falsy
    input (``None``, ``""``, ``0``) yields ``""``.

    NOTE(review): the original author states this mirrors the criterion used
    by the DB/registry; that cannot be confirmed from this file.
    """
    digits = "".join(re.findall(r"\d", str(p or "")))
    if len(digits) < 9:
        return digits
    return digits[-9:]
|
|
|
|
|
|
def _unfold(text: str) -> str:
    """Join folded lines: drop every line break (LF or CRLF) that is
    immediately followed by a space or a tab, together with that character."""
    folded_break = re.compile(r"\r?\n[ \t]")
    return folded_break.sub("", text)
|
|
|
|
|
|
def _adr_to_text(raw: str) -> str:
    """Build a readable one-line address from a structured ADR value.

    The value is split into its ``;``-separated components. vCard backslash
    escapes are decoded while splitting: ``\\,``, ``\\;`` and ``\\\\`` become
    the literal character (so an escaped ``;`` is NOT treated as a component
    separator) and ``\\n``/``\\N`` becomes a single space so the result stays
    on one line. Unknown escape sequences are kept verbatim.

    If the third component (index 2) is non-empty, the result is that
    component followed by the non-empty components after it; the first two
    components are ignored in that case. Otherwise all non-empty components
    are joined. Components are joined with ``", "``.

    Args:
        raw: the text after the ``:`` of an ADR line.

    Returns:
        The readable address, or ``""`` when every component is empty.
    """
    parts = []
    current = []
    chars = iter(raw)
    for ch in chars:
        if ch == "\\":
            nxt = next(chars, "")
            if nxt in ("n", "N"):
                # Escaped newline inside a component: keep the text single-line.
                current.append(" ")
            elif nxt in (",", ";", "\\"):
                current.append(nxt)
            else:
                # Not a recognised escape: preserve it untouched.
                current.append(ch + nxt)
        elif ch == ";":
            parts.append("".join(current).strip())
            current = []
        else:
            current.append(ch)
    parts.append("".join(current).strip())

    if len(parts) >= 3 and parts[2]:
        tail = [p for p in parts[3:] if p]
        return ", ".join([parts[2], *tail])
    return ", ".join(p for p in parts if p)
|
|
|
|
|
|
def parse_vcf(path: str) -> list:
    """Parse a .vcf file into a list of dicts, one per card.

    Each dict has the keys ``fn`` (first FN value, stripped, or ``""``),
    ``tels`` and ``emails`` (original, non-normalised values, stripped),
    ``adrs`` (readable text produced by ``_adr_to_text``; empty ones dropped)
    and ``bdays``. EMAIL values without an ``@`` are discarded.

    The file is read as UTF-8 with undecodable bytes replaced, folded lines
    are joined with ``_unfold`` and the text is split before every
    ``BEGIN:VCARD``.

    Property lines are matched case-insensitively and may carry a group
    prefix (``item1.TEL:...``) and/or parameters (``TEL;TYPE=CELL:...``).

    Args:
        path: location of the .vcf file.

    Returns:
        List of card dicts in file order.
    """
    # Context manager so the handle is closed deterministically.
    with open(path, encoding="utf-8", errors="replace") as fh:
        text = _unfold(fh.read())

    cards = []
    for block in re.split(r"(?=BEGIN:VCARD)", text):
        if "BEGIN:VCARD" not in block:
            # Leading residue before the first card (e.g. a BOM or blank text).
            continue
        names = _vcf_values("FN", block)
        tels = [t.strip() for t in _vcf_values("TEL", block) if t.strip()]
        emails = [e.strip() for e in _vcf_values("EMAIL", block) if "@" in e]
        adrs = [a for a in map(_adr_to_text, _vcf_values("ADR", block)) if a]
        bdays = [b.strip() for b in _vcf_values("BDAY", block) if b.strip()]
        cards.append(
            {
                "fn": names[0].strip() if names else "",
                "tels": tels,
                "emails": emails,
                "adrs": adrs,
                "bdays": bdays,
            }
        )
    return cards


def _vcf_values(prop: str, block: str) -> list:
    """Return the raw values of every ``prop`` line in one card's text.

    Accepts an optional ``group.`` prefix and optional ``;param`` section
    before the ``:``; the parameter section cannot span lines.
    """
    pattern = rf"^(?:[A-Za-z0-9-]+\.)?{prop}(?:;[^:\r\n]*)?:(.+)$"
    return re.findall(pattern, block, re.M | re.I)
|
|
|
|
|
|
# --- Carga de la DB ----------------------------------------------------------
|
|
|
|
|
|
def load_contacts(con) -> list:
    """Read every row of ``contacts`` and return them as dicts.

    The ``tels`` and ``emails`` columns are decoded from JSON text; a NULL or
    empty value becomes an empty list. The remaining columns (``uid``, ``fn``,
    ``note_path``, ``import_key``) are passed through unchanged.
    """
    columns = ("uid", "fn", "tels", "emails", "note_path", "import_key")
    cursor = con.execute(
        "SELECT uid, fn, tels, emails, note_path, import_key FROM contacts"
    )
    contacts = []
    for record in cursor.fetchall():
        contact = dict(zip(columns, record))
        for json_field in ("tels", "emails"):
            contact[json_field] = json.loads(contact[json_field] or "[]")
        contacts.append(contact)
    return contacts
|
|
|
|
|
|
def key_of(contact: dict) -> str:
    """Compute the import key of a contact dict via ``contact_import_key``.

    Missing or falsy ``fn``/``tels``/``emails`` entries are passed as ``""``
    and empty lists respectively.
    """
    name = contact.get("fn") or ""
    phones = contact.get("tels") or []
    mails = contact.get("emails") or []
    return contact_import_key(name, phones, mails)
|
|
|
|
|
|
# --- backfill ----------------------------------------------------------------
|
|
|
|
|
|
def cmd_backfill(db_path: str, apply: bool) -> int:
    """Recompute the import key of every contact and optionally store it.

    Prints the number of contacts, how many keys differ from the stored value
    and how many keys are shared by more than one contact (showing at most
    five of them). Writes happen only when ``apply`` is true; otherwise the
    database is opened read-only and nothing is changed.

    Args:
        db_path: path of the DuckDB file.
        apply: when true, write the recomputed keys.

    Returns:
        Always 0.
    """
    con = duckdb.connect(db_path, read_only=not apply)
    try:
        contacts = load_contacts(con)
        uids_by_key = {}
        pending = []
        for contact in contacts:
            fresh_key = key_of(contact)
            uids_by_key.setdefault(fresh_key, []).append(contact["uid"])
            if contact["import_key"] != fresh_key:
                pending.append((fresh_key, contact["uid"]))
        shared = {key: uids for key, uids in uids_by_key.items() if len(uids) > 1}

        print(f"contactos: {len(contacts)}")
        print(f"import_key a (re)calcular: {len(pending)}")
        print(f"claves con colisión (>1 contacto): {len(shared)}")
        # Only a sample of the collisions is shown.
        for key, uids in list(shared.items())[:5]:
            print(f" {key}: {uids}")

        if not apply:
            print("(dry-run: nada escrito; usa --apply)")
            return 0

        for fresh_key, uid in pending:
            con.execute(
                "UPDATE contacts SET import_key = ? WHERE uid = ?", [fresh_key, uid]
            )
        print(f"APLICADO: {len(pending)} import_key escritas")
        return 0
    finally:
        con.close()
|
|
|
|
|
|
# --- enrich ------------------------------------------------------------------
|
|
|
|
|
|
def build_plan(contacts: list, cards: list) -> list:
    """Build the enrichment plan: what to add to each existing contact.

    Each card is matched to a contact first by import key (the stored
    ``import_key`` or, when missing, the one computed by ``key_of``) and then,
    as a fallback, by the first card phone whose normalised form belongs to a
    contact. Phones that normalise to an empty string are never used as a
    match key.

    The plan holds at most ONE entry per contact ``uid``. When several cards
    match the same contact their additions are accumulated in that entry, so
    the ``new_tels``/``new_emails`` lists written later contain every
    addition instead of only those of the last matching card. Additions are
    de-duplicated by normalised value (``norm_phone`` for phones, stripped
    lower-case for emails, whitespace-collapsed lower-case for addresses)
    against the contact's current data and against earlier cards.

    Args:
        contacts: dicts as returned by ``load_contacts``.
        cards: dicts as returned by ``parse_vcf``.

    Returns:
        List of dicts with keys ``uid``, ``fn``, ``note_path``, ``match``
        (``"import_key"`` or ``"phone"``, taken from the first card that
        produced a change), ``add_tel``, ``add_email``, ``add_adr``,
        ``new_tels`` and ``new_emails``. Only contacts with at least one
        addition are included.
    """
    by_key = {}
    by_phone = {}
    for c in contacts:
        k = c["import_key"] or key_of(c)
        by_key.setdefault(k, c)
        for t in c["tels"]:
            n = norm_phone(t)
            if n:  # an empty key would match unrelated junk numbers
                by_phone.setdefault(n, c)

    entries = {}  # uid -> plan entry, in order of first change
    known = {}  # uid -> normalised values already present or already planned
    for card in cards:
        ck = contact_import_key(card["fn"], card["tels"], card["emails"])
        hit = by_key.get(ck)
        how = "import_key"
        if hit is None:
            for t in card["tels"]:
                n = norm_phone(t)
                if n and n in by_phone:
                    hit = by_phone[n]
                    how = "phone"
                    break
        if hit is None:
            continue

        uid = hit["uid"]
        state = known.get(uid)
        if state is None:
            state = known[uid] = {
                "tels": {norm_phone(t) for t in hit["tels"]},
                "emails": {e.strip().lower() for e in hit["emails"]},
                # Stored addresses live in another table and are checked at
                # write time; here only repeats across cards are filtered.
                "adrs": set(),
            }

        add_tel = _unseen(card["tels"], norm_phone, state["tels"])
        add_em = _unseen(card["emails"], lambda e: e.strip().lower(), state["emails"])
        add_adr = _unseen(
            card["adrs"],
            lambda a: re.sub(r"\s+", " ", a).strip().lower(),
            state["adrs"],
        )
        if not (add_tel or add_em or add_adr):
            continue

        entry = entries.get(uid)
        if entry is None:
            entry = entries[uid] = {
                "uid": uid,
                "fn": hit["fn"],
                "note_path": hit["note_path"],
                "match": how,
                "add_tel": [],
                "add_email": [],
                "add_adr": [],
                "new_tels": list(hit["tels"]),
                "new_emails": list(hit["emails"]),
            }
        entry["add_tel"].extend(add_tel)
        entry["add_email"].extend(add_em)
        entry["add_adr"].extend(add_adr)
        entry["new_tels"].extend(add_tel)
        entry["new_emails"].extend(add_em)
    return list(entries.values())


def _unseen(values: list, normalize, seen: set) -> list:
    """Return the values whose non-empty normalised form is not in ``seen``,
    preserving order, and record those forms in ``seen``."""
    fresh = []
    for value in values:
        norm = normalize(value)
        if norm and norm not in seen:
            seen.add(norm)
            fresh.append(value)
    return fresh
|
|
|
|
|
|
def _person_add_adrs(con, note_path: str, adrs: list) -> int:
    """Append addresses to the person row identified by ``note_path``.

    The current list is read from the ``direcciones`` JSON column; when that
    is empty, the single ``direccion`` value (if any) seeds the list. An
    address is added only when its normalised form (whitespace collapsed,
    stripped, lower-cased) is non-empty and not already present, either in
    the stored list or earlier in ``adrs``.

    On write, ``direcciones`` receives the merged list. ``direccion`` keeps
    its existing value when it has one and otherwise receives the first
    element of the merged list, so an existing value is never replaced.

    Args:
        con: open connection exposing ``execute(sql, params)``.
        note_path: key of the person row.
        adrs: candidate readable addresses.

    Returns:
        Number of addresses actually added (0 when the person does not exist
        or nothing is new).
    """
    row = con.execute(
        "SELECT direcciones, direccion FROM persons WHERE note_path = ?", [note_path]
    ).fetchone()
    if row is None:
        return 0
    stored_json, primary = row
    # "or []" also covers a stored JSON null.
    current = (json.loads(stored_json) or []) if stored_json else []
    if not current and primary:
        current = [primary]

    def _norm(text: str) -> str:
        return re.sub(r"\s+", " ", text).strip().lower()

    seen = {_norm(x) for x in current}
    added = []
    for candidate in adrs:
        key = _norm(candidate)
        if key and key not in seen:
            seen.add(key)  # also filters repeats within adrs itself
            added.append(candidate)
    if not added:
        return 0

    merged = current + added
    con.execute(
        "UPDATE persons SET direcciones = ?, direccion = ? WHERE note_path = ?",
        [json.dumps(merged, ensure_ascii=False), primary or merged[0], note_path],
    )
    return len(added)
|
|
|
|
|
|
def cmd_enrich(db_path: str, vcf: str, apply: bool) -> int:
    """Enrich existing contacts with data from a .vcf file.

    Parses the file, builds the plan with ``build_plan`` and prints a summary
    plus the first 30 plan entries. Without ``apply`` the database is opened
    read-only and nothing is written.

    With ``apply`` all writes run inside a single transaction: phones/emails
    are stored on ``contacts`` (only for entries that actually add some) and
    addresses are added to the linked person through ``_person_add_adrs``.
    Before committing, the number of rows in ``contacts`` is compared with
    the count taken at the start; on mismatch, or on any other error, the
    transaction is rolled back and the error propagates.

    Args:
        db_path: path of the DuckDB file.
        vcf: path of the .vcf file.
        apply: when true, write the changes.

    Returns:
        Always 0.

    Raises:
        AssertionError: if the contact count changed during the write phase.
    """
    cards = parse_vcf(vcf)
    con = duckdb.connect(db_path, read_only=not apply)
    try:
        before = con.execute("SELECT count(*) FROM contacts").fetchone()[0]
        contacts = load_contacts(con)
        plan = build_plan(contacts, cards)
        n_tel = sum(len(p["add_tel"]) for p in plan)
        n_em = sum(len(p["add_email"]) for p in plan)
        n_adr_targets = sum(1 for p in plan if p["add_adr"] and p["note_path"])
        print(f".vcf tarjetas: {len(cards)} contactos DB: {before}")
        print(f"contactos a enriquecer: {len(plan)} (+{n_tel} tel, +{n_em} email, "
              f"direcciones a {n_adr_targets} personas enlazadas)")
        for p in plan[:30]:
            ch = []
            if p["add_tel"]:
                ch.append(f"+{len(p['add_tel'])}tel")
            if p["add_email"]:
                ch.append(f"+{len(p['add_email'])}email")
            if p["add_adr"]:
                # Addresses can only be stored when a person is linked.
                ch.append("+adr" if p["note_path"] else "+adr(SIN persona)")
            print(f" [{p['match']:10}] {(p['fn'] or '?')[:34]:34} {' '.join(ch)}")
        if not apply:
            print("(dry-run: nada escrito; usa --apply)")
            return 0

        adr_added = 0
        # One transaction for the whole apply so a failure leaves no partial
        # enrichment behind and the count check can still undo the writes.
        con.execute("BEGIN TRANSACTION")
        try:
            for p in plan:
                if p["add_tel"] or p["add_email"]:
                    con.execute(
                        "UPDATE contacts SET tels = ?, emails = ? WHERE uid = ?",
                        [
                            json.dumps(p["new_tels"], ensure_ascii=False),
                            json.dumps(p["new_emails"], ensure_ascii=False),
                            p["uid"],
                        ],
                    )
                if p["add_adr"] and p["note_path"]:
                    adr_added += _person_add_adrs(con, p["note_path"], p["add_adr"])
            after = con.execute("SELECT count(*) FROM contacts").fetchone()[0]
            # Raised explicitly (not via `assert`) so the safety check also
            # runs when Python is started with -O.
            if after != before:
                raise AssertionError(f"PÉRDIDA: contactos {before} -> {after}")
            con.execute("COMMIT")
        except BaseException:
            con.execute("ROLLBACK")
            raise
        print(f"APLICADO: {len(plan)} contactos enriquecidos, {adr_added} direcciones "
              f"añadidas a personas. Conteo intacto: {before} == {after}")
        return 0
    finally:
        con.close()
|
|
|
|
|
|
def main(argv=None) -> int:
    """Command-line entry point.

    Sub-commands: ``backfill`` and ``enrich`` (the latter requires ``--vcf``).
    Both accept ``--db`` (defaults to ``DEFAULT_DB``) and the mutually
    exclusive flags ``--apply`` and ``--dry-run``. Dry-run is the default
    behaviour; the flag exists so the invocations shown in the module
    docstring are accepted.

    Args:
        argv: argument list; ``None`` makes argparse read ``sys.argv``.

    Returns:
        2 when the database or the .vcf file does not exist, otherwise the
        return value of the selected command.
    """
    # __doc__ is None when Python runs with -OO, so guard before splitting.
    ap = argparse.ArgumentParser(description=(__doc__ or "").split("\n")[0])
    sub = ap.add_subparsers(dest="cmd", required=True)
    pb = sub.add_parser("backfill", help="calcula y guarda import_key de los contactos")
    pe = sub.add_parser("enrich", help="enriquece contactos desde un .vcf")
    pe.add_argument("--vcf", required=True)
    for parser in (pb, pe):
        parser.add_argument("--db", default=DEFAULT_DB)
        mode = parser.add_mutually_exclusive_group()
        mode.add_argument("--apply", action="store_true")
        mode.add_argument(
            "--dry-run",
            action="store_true",
            help="solo muestra el plan, sin escribir (por defecto)",
        )
    args = ap.parse_args(argv)

    db = os.path.abspath(os.path.expanduser(args.db))
    if not os.path.exists(db):
        print(f"ERROR: DB no existe: {db}", file=sys.stderr)
        return 2
    if args.cmd == "backfill":
        return cmd_backfill(db, args.apply)
    if args.cmd == "enrich":
        vcf = os.path.expanduser(args.vcf)
        if not os.path.exists(vcf):
            print(f"ERROR: .vcf no existe: {vcf}", file=sys.stderr)
            return 2
        return cmd_enrich(db, vcf, args.apply)
    return 1
|
|
|
|
|
|
# When executed as a script, run main() and use its return value as the
# process exit status.
if __name__ == "__main__":
    sys.exit(main())
|