cb7f6e92a0
- project.md - reports/ - tools/import_google_contacts.py Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
871 lines
34 KiB
Python
871 lines
34 KiB
Python
#!/usr/bin/env python3
|
|
"""Importa contactos de Google (vCard export) al vault OSINT como fichas de
|
|
persona y organizacion, clasificando con LLM y creando relaciones
|
|
persona <-> organizacion.
|
|
|
|
Flujo:
|
|
1. Parsear el .vcf con split_vcards (grupo `dav`). Extraer FN, TEL*, EMAIL*, ORG, TITLE.
|
|
2. Filtrar ruido/servicio (numeros de operadora, recordatorios, sin >=3 letras).
|
|
3. Clasificar con ask_llm (grupo `claude-direct`) por lotes de ~40, pidiendo JSON estricto.
|
|
4. Dedup contra personas/*.md existentes (match por slug exacto o subconjunto de tokens).
|
|
5. Generar fichas siguiendo projects/osint/CONVENTIONS.md (frontmatter canonico 3b).
|
|
|
|
Modos:
|
|
--dry-run (DEFAULT) no escribe nada; imprime resumen + muestra de 15.
|
|
--apply escribe de verdad usando funciones del grupo `obsidian`.
|
|
|
|
Tool de PROYECTO (vive en projects/osint/tools/). NO es funcion del registry,
|
|
NO se indexa. Idempotente: re-ejecutar no duplica (dedup por slug).
|
|
"""
|
|
import sys
|
|
import os
|
|
import re
|
|
import json
|
|
import argparse
|
|
import datetime
|
|
|
|
sys.path.insert(0, "/home/enmanuel/fn_registry/python/functions")
|
|
|
|
from infra.split_vcards import split_vcards # noqa: E402
|
|
from core.ask_llm import ask_llm # noqa: E402
|
|
from obsidian import ( # noqa: E402
|
|
slugify_obsidian_name,
|
|
list_obsidian_notes,
|
|
read_obsidian_note,
|
|
create_obsidian_note,
|
|
update_obsidian_note,
|
|
)
|
|
|
|
OSINT = "/home/enmanuel/Obsidian/osint"
|
|
VCF_PATH = "/home/enmanuel/Downloads/contacts.vcf"
|
|
FUENTE = "Google Contacts export 2026-06-11"
|
|
LLM_MODEL = "claude-haiku-4-5-20251001"
|
|
BATCH_SIZE = 40
|
|
|
|
# Topónimos locales que el LLM tiende a confundir con organizaciones cuando
|
|
# vienen como sufijo del nombre del contacto (p.ej. "Adrian Quinto Almachar").
|
|
# Un lugar NUNCA se convierte en organizacion ni en relacion. (slugificados)
|
|
_PLACE_BLOCKLIST = {
|
|
"almachar", "barcelona", "madrid", "malaga", "velez-malaga", "velez",
|
|
"aliaguilla", "chamana", "axarquia", "torre-del-mar", "torrox", "nerja",
|
|
"comares", "benamargosa", "moclinejo", "iznate", "cutar",
|
|
}
|
|
|
|
# Frontmatter canonico de persona (CONVENTIONS.md seccion 3b), en orden.
|
|
PERSON_CANON = [
|
|
"tipo", "nombre", "slug", "aliases", "sexo", "fecha_nacimiento", "dni",
|
|
"telefono", "email", "direccion", "pais", "relaciones", "contexto",
|
|
"fuente", "tags",
|
|
]
|
|
|
|
# Frontmatter de organizacion (CONVENTIONS.md secciones 6 y 3b adaptado).
|
|
ORG_CANON = [
|
|
"tipo", "nombre", "slug", "aliases", "telefono", "email", "direccion",
|
|
"pais", "relaciones", "contexto", "fuente", "tags",
|
|
]
|
|
|
|
|
|
# --------------------------------------------------------------------------
|
|
# 1. Parseo de vCards
|
|
# --------------------------------------------------------------------------
|
|
|
|
def _unfold(vcard_text: str) -> str:
|
|
"""Deshace el folding de lineas de vCard (continuacion con espacio/tab)."""
|
|
return re.sub(r"\r?\n[ \t]", "", vcard_text)
|
|
|
|
|
|
def _vcard_values(vcard_text: str, prop: str) -> list:
|
|
"""Devuelve todos los valores de una propiedad (p.ej. TEL, EMAIL).
|
|
|
|
Acepta la forma `PROP;PARAMS:valor` y `PROP:valor`. Decodifica escapes
|
|
simples de vCard (\\, , \\;, \\n) en el valor.
|
|
"""
|
|
vals = []
|
|
for line in vcard_text.splitlines():
|
|
m = re.match(rf"^(?:item\d+\.)?{prop}(?:;[^:]*)?:(.*)$", line, re.IGNORECASE)
|
|
if m:
|
|
v = m.group(1).strip()
|
|
v = v.replace("\\,", ",").replace("\\;", ";").replace("\\n", " ").replace("\\\\", "\\")
|
|
v = v.strip()
|
|
if v:
|
|
vals.append(v)
|
|
return vals
|
|
|
|
|
|
def parse_vcard(vcard_text: str) -> dict:
    """Extract FN, every TEL, every EMAIL, ORG and TITLE from one vCard.

    Returns a dict with keys `fn`, `tels`, `emails`, `org` and `title`.
    Missing single-valued fields are returned as empty strings; `tels` and
    `emails` are de-duplicated lists that keep their original order.
    """
    text = _unfold(vcard_text)

    names = _vcard_values(text, "FN")
    orgs = _vcard_values(text, "ORG")
    phones = _vcard_values(text, "TEL")
    mails = _vcard_values(text, "EMAIL")
    titles = _vcard_values(text, "TITLE")

    if orgs:
        # ORG arrives as `Company;Department`; drop empty components.
        pieces = [piece.strip() for piece in orgs[0].split(";")]
        org = " ".join(piece for piece in pieces if piece)
    else:
        org = ""

    return {
        "fn": names[0] if names else "",
        "tels": _dedup_keep_order(phones),
        "emails": _dedup_keep_order(mails),
        "org": org,
        "title": titles[0] if titles else "",
    }
|
|
|
|
|
|
def _dedup_keep_order(items: list) -> list:
|
|
seen, out = set(), []
|
|
for it in items:
|
|
key = it.strip().lower()
|
|
if key and key not in seen:
|
|
seen.add(key)
|
|
out.append(it.strip())
|
|
return out
|
|
|
|
|
|
# --------------------------------------------------------------------------
|
|
# 2. Filtro de ruido/servicio
|
|
# --------------------------------------------------------------------------
|
|
|
|
# Name patterns that give away carrier service numbers / reminders.
_SERVICE_NAME_RE = re.compile(
    r"^\*"            # starts with an asterisk
    r"|^\d{3,5}\b"    # short code at the start (1200, 22122)
    r"|att\.?\s*cliente"
    r"|buz[oó]n"
    r"|voicemail|voice\s*mail"
    r"|gestiona|consulta\b|informaci[oó]n|recarga"
    r"|servicio\s+al\s+cliente",
    re.IGNORECASE,
)


def is_service(name: str) -> bool:
    """Return True when a contact name is carrier noise, a reminder or has no real name.

    A name is treated as noise when it is empty/None, matches one of the
    service patterns in `_SERVICE_NAME_RE`, or contains fewer than three
    letters. Letters are counted with `str.isalpha`, so names written in any
    script are recognised and symbols such as `×` or `÷` do not count.
    """
    text = (name or "").strip()
    if not text:
        return True
    if _SERVICE_NAME_RE.search(text):
        return True
    # Fewer than 3 letters: not a real human or business name.
    letter_count = sum(1 for ch in text if ch.isalpha())
    return letter_count < 3
|
|
|
|
|
|
# --------------------------------------------------------------------------
|
|
# 4. Dedup contra fichas existentes
|
|
# --------------------------------------------------------------------------
|
|
|
|
# Tokens demasiado comunes para fundamentar un match por subconjunto.
|
|
_STOP_TOKENS = {"de", "del", "la", "las", "el", "los", "y", "san", "da", "do"}
|
|
|
|
# Nombres de pila muy comunes: compartir SOLO estos no basta para deducir que
|
|
# dos contactos son la misma persona (hay decenas de "Antonio", "Maria", "Jose").
|
|
# Un match por subconjunto exige al menos un token distintivo fuera de esta lista
|
|
# (tipicamente un apellido).
|
|
_COMMON_GIVEN = {
|
|
"antonio", "jose", "juan", "maria", "manuel", "carlos", "francisco",
|
|
"javier", "david", "miguel", "angel", "luis", "pedro", "pablo", "rafael",
|
|
"fernando", "sergio", "alberto", "alejandro", "daniel", "jesus", "marcos",
|
|
"ana", "carmen", "cristina", "laura", "marta", "lucia", "elena", "sara",
|
|
"paula", "raquel", "gema", "lorena", "natalia", "silvia", "rosa", "isabel",
|
|
"dani", "javi", "manolo", "paco", "pepe", "alex", "nacho", "mari", "lola",
|
|
}
|
|
|
|
|
|
def _name_tokens(name: str) -> set:
    """Return the set of slug tokens of `name`, minus blanks and stop tokens.

    The name is slugified with `slugify_obsidian_name` and split on "-".
    """
    pieces = slugify_obsidian_name(name or "").split("-")
    return set(pieces) - _STOP_TOKENS - {""}
|
|
|
|
|
|
def load_existing_persons() -> list:
    """Load one record per person note found under `personas` in the vault.

    Each record is a dict with `slug` (file name without extension), `path`,
    `nombre` (frontmatter `nombre`, or the slug with dashes turned into
    spaces) and `tokens` (name tokens, falling back to the slug's tokens).
    Notes whose file name starts with "_" are skipped. A note that cannot be
    read is still listed, using only what its file name provides.
    """
    records = []
    for note_path in list_obsidian_notes(OSINT, subfolder="personas"):
        stem, _ext = os.path.splitext(os.path.basename(note_path))
        if stem.startswith("_"):
            continue
        try:
            frontmatter = read_obsidian_note(note_path)["frontmatter"]
        except Exception:
            # Best effort: fall back to the file name alone.
            frontmatter = {}
        display_name = frontmatter.get("nombre") or stem.replace("-", " ")
        tokens = _name_tokens(display_name)
        if not tokens:
            tokens = _name_tokens(stem)
        records.append({
            "slug": stem,
            "path": note_path,
            "nombre": display_name,
            "tokens": tokens,
        })
    return records
|
|
|
|
|
|
def load_existing_orgs() -> dict:
    """Map slug -> path for every organisation note in the vault.

    The slug is the file name without extension; notes whose file name
    starts with "_" are left out.
    """
    named_paths = (
        (os.path.splitext(os.path.basename(note_path))[0], note_path)
        for note_path in list_obsidian_notes(OSINT, subfolder="organizaciones")
    )
    return {
        stem: note_path
        for stem, note_path in named_paths
        if not stem.startswith("_")
    }
|
|
|
|
|
|
def _distinctive(tokens: set) -> bool:
    """Tell whether `tokens` holds at least one distinctive token (a surname).

    A token is distinctive when it has 4 or more characters and is not one
    of the very common given names in `_COMMON_GIVEN`.
    """
    for token in tokens:
        if len(token) < 4:
            continue
        if token not in _COMMON_GIVEN:
            return True
    return False
|
|
|
|
|
|
def match_existing_person(name: str, existing: list):
    """Find an existing person record that matches `name`. Deliberately conservative.

    Two names are the SAME person only if:
      - their slugs are identical, or
      - the contact's tokens are a subset of an existing record's tokens (a
        less specific form of the same name), both sides have >=2 tokens,
        they share >=2 tokens, and the overlap holds at least one
        distinctive token (a surname).

    This accepts "Manuel Gutierrez" against "Manuel Gutierrez Gamez" and
    REJECTS merges based on a common given name ("Antonio", "Maria") or on
    two shared given names ("Maria Jose" vs "Jose Maria ..."). When in doubt
    it does NOT match: a duplicate note is recoverable, a wrong merge
    corrupts an existing investigation.

    Returns the matching record from `existing`, or None.
    """
    wanted_slug = slugify_obsidian_name(name)
    wanted = _name_tokens(name)
    if not wanted:
        return None

    # Pass 1: exact slug match takes priority over any token heuristic.
    if wanted_slug:
        for record in existing:
            if record["slug"] == wanted_slug:
                return record

    # Pass 2: subset match, which needs at least two tokens on our side.
    if len(wanted) < 2:
        return None
    for record in existing:
        known = record["tokens"]
        if len(known) < 2 or not wanted <= known:
            continue
        overlap = wanted & known
        if len(overlap) >= 2 and _distinctive(overlap):
            return record
    return None
|
|
|
|
|
|
# --------------------------------------------------------------------------
|
|
# 3. Clasificacion LLM por lotes
|
|
# --------------------------------------------------------------------------
|
|
|
|
_LLM_SYSTEM = (
|
|
"Eres un clasificador de contactos telefonicos en espanol. Devuelves SOLO "
|
|
"un array JSON valido, sin texto alrededor, sin markdown."
|
|
)
|
|
|
|
_LLM_INSTRUCTIONS = """Clasifica cada contacto de la lista. Devuelve un array JSON con un objeto por contacto, en el MISMO orden, con estos campos:
|
|
{"i": <indice entero>, "tipo": "persona"|"organizacion"|"servicio", "persona_nombre": <string|null>, "org_nombre": <string|null>, "rol": <string|null>, "sexo": "hombre"|"mujer"|null}
|
|
|
|
Reglas:
|
|
- tipo="persona" si el contacto es un individuo (nombre de pila + apellidos).
|
|
- tipo="organizacion" si es un negocio, empresa, comercio o servicio (fruteria, autoescuela, seguros, banco, taller, tienda, restaurante, clinica...).
|
|
- tipo="servicio" si es un numero de operadora, recordatorio o automatismo (raro: ya filtramos la mayoria).
|
|
- Si el contacto MEZCLA persona y organizacion, rellena persona_nombre Y org_nombre Y rol.
|
|
Ej: "Emilio Villalba Gestor Orange" -> persona_nombre="Emilio Villalba", org_nombre="Orange", rol="gestor".
|
|
Ej: "Abdul Fruteria Velez" -> tipo="organizacion", org_nombre="Fruteria Velez", persona_nombre="Abdul", rol="dueno".
|
|
- persona_nombre: nombre LIMPIO de la persona (quita el rol y la empresa). null si no hay persona.
|
|
- org_nombre: nombre del negocio/empresa asociado. null si no hay.
|
|
- rol: gestor, comercial, dueno, empleado, contacto... null si no aplica.
|
|
- sexo: deduce del nombre de pila ("hombre"|"mujer"); null si ambiguo o no hay persona.
|
|
- Limpia emojis y typos al inferir, pero NO inventes datos.
|
|
|
|
Contactos:
|
|
"""
|
|
|
|
|
|
def _extract_json_array(text: str):
|
|
"""Extrae el primer array JSON `[...]` de una respuesta, tolerando texto alrededor."""
|
|
if not text:
|
|
return None
|
|
# intento directo
|
|
try:
|
|
v = json.loads(text.strip())
|
|
if isinstance(v, list):
|
|
return v
|
|
except Exception:
|
|
pass
|
|
# buscar el primer '[' y casar corchetes balanceados
|
|
start = text.find("[")
|
|
if start == -1:
|
|
return None
|
|
depth = 0
|
|
in_str = False
|
|
esc = False
|
|
for i in range(start, len(text)):
|
|
c = text[i]
|
|
if in_str:
|
|
if esc:
|
|
esc = False
|
|
elif c == "\\":
|
|
esc = True
|
|
elif c == '"':
|
|
in_str = False
|
|
continue
|
|
if c == '"':
|
|
in_str = True
|
|
elif c == "[":
|
|
depth += 1
|
|
elif c == "]":
|
|
depth -= 1
|
|
if depth == 0:
|
|
chunk = text[start:i + 1]
|
|
try:
|
|
v = json.loads(chunk)
|
|
return v if isinstance(v, list) else None
|
|
except Exception:
|
|
return None
|
|
return None
|
|
|
|
|
|
def classify_batch(batch: list, llm_calls: list) -> list:
    """Classify one batch of contacts with the LLM.

    Args:
        batch: list of `(index, contact_dict)` pairs; each contact provides
            `fn`, `org` and `title`.
        llm_calls: log list, mutated in place. Exactly ONE entry is appended
            per attempt: `{"size", "ok": True, "attempt"}` on success, or
            `{"size", "ok": False, "error", "attempt"}` on failure.

    Returns:
        The JSON array returned by the model (items carry the contact index
        in `i`). Up to two attempts are made; if both fail, every contact is
        returned as a default `persona` entry flagged with `_fallback`.
    """
    lines = []
    for idx, contact in batch:
        extra = []
        if contact["org"]:
            extra.append(f"ORG={contact['org']}")
        if contact["title"]:
            extra.append(f"TITLE={contact['title']}")
        suffix = f" [{'; '.join(extra)}]" if extra else ""
        lines.append(f"{idx}. {contact['fn']}{suffix}")
    prompt = _LLM_INSTRUCTIONS + "\n".join(lines)

    size = len(batch)
    for attempt in (1, 2):
        try:
            resp = ask_llm(prompt, model=LLM_MODEL, system=_LLM_SYSTEM,
                           max_tokens=4096, echo=False)
        except Exception as exc:  # noqa: BLE001
            # Log the exception once and move on; do not also count it as an
            # "empty response" for the same attempt.
            llm_calls.append({"size": size, "ok": False,
                              "error": f"{type(exc).__name__}: {exc}",
                              "attempt": attempt})
            continue
        if not resp:
            llm_calls.append({"size": size, "ok": False,
                              "error": "empty response (auth/token?)",
                              "attempt": attempt})
            continue
        arr = _extract_json_array(resp)
        if arr is not None:
            llm_calls.append({"size": size, "ok": True, "attempt": attempt})
            return arr
        llm_calls.append({"size": size, "ok": False,
                          "error": "json parse failed", "attempt": attempt})

    # Fallback: treat every contact in the batch as a person.
    return [{"i": idx, "tipo": "persona", "persona_nombre": contact["fn"],
             "org_nombre": None, "rol": None, "sexo": None,
             "_fallback": True} for idx, contact in batch]
|
|
|
|
|
|
# --------------------------------------------------------------------------
|
|
# 5. Construccion de fichas (planificacion)
|
|
# --------------------------------------------------------------------------
|
|
|
|
def _ordered_frontmatter(values: dict, canon: list) -> dict:
|
|
"""Devuelve un dict ordenado segun `canon`, con extras al final."""
|
|
fm = {}
|
|
for k in canon:
|
|
fm[k] = values.get(k)
|
|
for k, v in values.items():
|
|
if k not in fm:
|
|
fm[k] = v
|
|
return fm
|
|
|
|
|
|
def _contact_block(tels: list, emails: list) -> str:
|
|
"""Seccion ## Contacto con los telefonos/emails extra (mas alla del primero)."""
|
|
lines = []
|
|
extra_tel = tels[1:]
|
|
extra_mail = emails[1:]
|
|
if extra_tel or extra_mail:
|
|
lines.append("## Contacto")
|
|
lines.append("")
|
|
for t in extra_tel:
|
|
lines.append(f"- telefono: {t}")
|
|
for e in extra_mail:
|
|
lines.append(f"- email: {e}")
|
|
lines.append("")
|
|
return "\n".join(lines)
|
|
|
|
|
|
def plan_person(name, sexo, tels, emails, org_slug, org_nombre, rol,
                existing_persons, used_person_slugs):
    """Plan the creation of a person note, or the enrichment of an existing one.

    When `name` matches a record in `existing_persons` an `enrich_person`
    plan is returned (the contact name becomes an alias candidate).
    Otherwise a `create_person` plan is returned with a fresh slug reserved
    in `used_person_slugs`, canonical frontmatter and a markdown body.
    """
    existing = match_existing_person(name, existing_persons)
    nombre = name.strip()
    main_tel = tels[0] if tels else None
    main_email = emails[0] if emails else None

    if existing:
        return {
            "action": "enrich_person",
            "slug": existing["slug"],
            "path": existing["path"],
            "nombre_existente": existing["nombre"],
            "alias_add": nombre,
            "tel": main_tel,
            "email": main_email,
            "tels": tels,
            "emails": emails,
            "org_slug": org_slug,
            "org_nombre": org_nombre,
            "rol": rol,
        }

    # No match: create a new note.
    slug = _resolve_slug(slugify_obsidian_name(nombre) or "contacto", used_person_slugs)
    role_label = rol or "contacto"
    relaciones = [f"[[{org_slug}]] — {role_label}"] if org_slug else []
    frontmatter = _ordered_frontmatter({
        "tipo": "persona",
        "nombre": nombre,
        "slug": slug,
        "aliases": [],
        "sexo": sexo if sexo in ("hombre", "mujer") else None,
        "fecha_nacimiento": None,
        "dni": None,
        "telefono": main_tel,
        "email": main_email,
        "direccion": None,
        "pais": None,
        "relaciones": relaciones,
        "contexto": "google-contacts",
        "fuente": FUENTE,
        "tags": ["persona", "osint", "contacto"],
    }, PERSON_CANON)

    sections = []
    contact_section = _contact_block(tels, emails)
    if contact_section:
        sections.append(contact_section)
    if org_slug:
        sections += [
            "## Relacionado",
            "",
            f"- [[organizaciones/{org_slug}|{org_nombre}]] — {role_label}",
            "",
        ]
    sections += ["## Notas", ""]

    return {
        "action": "create_person",
        "slug": slug,
        "nombre": nombre,
        "frontmatter": frontmatter,
        "body": "\n".join(sections),
        "tel": main_tel,
        "email": main_email,
        "org_slug": org_slug,
        "org_nombre": org_nombre,
        "rol": rol,
    }
|
|
|
|
|
|
def _fuzzy_existing_org(slug: str, existing_orgs: dict):
|
|
"""Devuelve el slug de una org existente que sea casi-duplicado de `slug`.
|
|
|
|
Casa cuando uno es prefijo del otro compartiendo >=5 chars de raiz comun
|
|
(p.ej. "fenixfood" ~ "fenixfood-sl", "biorganic" ~ "biorganicfood-sl",
|
|
"4geekss" ~ "4geeks"). None si no hay casi-duplicado.
|
|
"""
|
|
for ex in existing_orgs:
|
|
a, b = slug, ex
|
|
root = a if len(a) <= len(b) else b
|
|
longer = b if root is a else a
|
|
if len(root) >= 5 and longer.startswith(root):
|
|
return ex
|
|
# tolerar 1-2 chars de cola repetida ("4geekss" vs "4geeks")
|
|
common = os.path.commonprefix([a, b])
|
|
if len(common) >= 5 and abs(len(a) - len(b)) <= 2 and (
|
|
a[len(common):].strip("s-") == "" or b[len(common):].strip("s-") == ""
|
|
):
|
|
return ex
|
|
return None
|
|
|
|
|
|
def plan_org(org_nombre, tels, emails, existing_orgs, used_org_slugs,
             person_slug=None, person_nombre=None, rol=None):
    """Plan the creation (or reuse) of an organisation. Returns `(slug, plan)`.

    - `(None, None)`: the name yields no slug or is a blocklisted place, so
      it must be ignored (no note, no link).
    - `(slug, None)`: the org already exists in the vault, was already
      planned in this run, or is a near-duplicate of an existing one; only
      link to it.
    - `(slug, plan)`: a new `create_org` plan with frontmatter and body.
    """
    slug = slugify_obsidian_name(org_nombre)
    # A place is not an organisation: do not create it, do not link to it.
    if not slug or slug in _PLACE_BLOCKLIST:
        return None, None

    # Already known: link only.
    if slug in existing_orgs or slug in used_org_slugs:
        return slug, None

    # Near-duplicate of an existing org: reuse the existing one.
    near_duplicate = _fuzzy_existing_org(slug, existing_orgs)
    if near_duplicate:
        return near_duplicate, None

    nombre = org_nombre.strip()
    role_label = rol or "contacto"
    relaciones = [f"[[{person_slug}]] — {role_label}"] if person_slug else []
    frontmatter = _ordered_frontmatter({
        "tipo": "organizacion",
        "nombre": nombre,
        "slug": slug,
        "aliases": [],
        "telefono": tels[0] if tels else None,
        "email": emails[0] if emails else None,
        "direccion": None,
        "pais": None,
        "relaciones": relaciones,
        "contexto": "google-contacts",
        "fuente": FUENTE,
        "tags": ["organizacion", "osint", "contacto"],
    }, ORG_CANON)

    sections = []
    contact_section = _contact_block(tels, emails)
    if contact_section:
        sections.append(contact_section)
    if person_slug:
        sections += [
            "## Relacionado",
            "",
            f"- [[{person_slug}|{person_nombre}]] — {role_label}",
            "",
        ]
    sections += ["## Notas", ""]

    return slug, {
        "action": "create_org",
        "slug": slug,
        "nombre": nombre,
        "frontmatter": frontmatter,
        "body": "\n".join(sections),
    }
|
|
|
|
|
|
def _resolve_slug(base: str, used: set) -> str:
|
|
"""Resuelve colisiones de slug con sufijo -2, -3..."""
|
|
if base not in used:
|
|
used.add(base)
|
|
return base
|
|
k = 2
|
|
while f"{base}-{k}" in used:
|
|
k += 1
|
|
s = f"{base}-{k}"
|
|
used.add(s)
|
|
return s
|
|
|
|
|
|
# --------------------------------------------------------------------------
|
|
# Orquestacion
|
|
# --------------------------------------------------------------------------
|
|
|
|
def build_plan(contacts, classifications, existing_persons, existing_orgs):
    """Build the list of create/enrich actions from the LLM classification.

    Args:
        contacts: list of `(index, contact_dict)` pairs.
        classifications: LLM output items; each dict carries the contact
            index in `i`. An index given as a numeric string is accepted.
            Contacts without a classification default to `persona`.
        existing_persons: records from `load_existing_persons`.
        existing_orgs: slug -> path map from `load_existing_orgs`.

    Returns:
        A dict with `person_creates`, `org_creates`, `enriches`,
        `relations` (tuples `("persona->org", person_slug, org_slug, rol)`)
        and `skipped_service` (count of contacts classified as `servicio`).

    A relation and the matching wiki links are only produced when the
    organisation was actually resolved to a slug; an ignored organisation
    (for example a place name) never yields a link.
    """
    by_idx = {}
    for c in classifications:
        if not isinstance(c, dict) or "i" not in c:
            continue
        try:
            # The model sometimes returns the index as a string ("12").
            by_idx[int(c["i"])] = c
        except (TypeError, ValueError, OverflowError):
            continue

    person_plans, org_plans, enrich_plans = [], [], []
    relations = []  # (kind, person_slug, org_slug, rol)
    used_person_slugs = {p["slug"] for p in existing_persons}
    used_org_slugs = set()
    skipped_service = 0

    # Mutable person index so that dedup also sees persons planned in this run.
    persons_index = list(existing_persons)

    for idx, contact in contacts:
        cls = by_idx.get(idx)
        if not cls:
            cls = {"tipo": "persona", "persona_nombre": contact["fn"],
                   "org_nombre": None, "rol": None, "sexo": None}
        tipo = str(cls.get("tipo") or "persona").lower()
        tels = contact["tels"]
        emails = contact["emails"]
        rol = cls.get("rol")
        sexo = cls.get("sexo")
        persona_nombre = cls.get("persona_nombre")
        org_nombre = cls.get("org_nombre") or contact["org"] or None

        if tipo == "servicio":
            skipped_service += 1
            continue

        if tipo == "organizacion":
            # The phone/email belong to the org; the person, if any, gets none.
            person_slug = None
            person_disp = None
            new_person = None  # plan created in this iteration, if any
            # Name actually used for the org (falls back to the contact FN).
            org_label = org_nombre or contact["fn"]

            if persona_nombre and len(_name_tokens(persona_nombre)) >= 1:
                pmatch = match_existing_person(persona_nombre, persons_index)
                if pmatch:
                    person_slug = pmatch["slug"]
                    person_disp = pmatch["nombre"]
                    enrich_plans.append({
                        "action": "enrich_person", "slug": pmatch["slug"],
                        "path": pmatch["path"], "nombre_existente": pmatch["nombre"],
                        "alias_add": persona_nombre, "tel": None, "email": None,
                        "tels": [], "emails": [],
                        "org_slug": None, "org_nombre": None, "rol": None,
                    })
                else:
                    pslug = _resolve_slug(
                        slugify_obsidian_name(persona_nombre) or "contacto",
                        used_person_slugs)
                    person_slug = pslug
                    person_disp = persona_nombre.strip()
                    pfm = _ordered_frontmatter({
                        "tipo": "persona", "nombre": person_disp, "slug": pslug,
                        "aliases": [],
                        "sexo": sexo if sexo in ("hombre", "mujer") else None,
                        "fecha_nacimiento": None, "dni": None,
                        "telefono": None, "email": None,
                        "direccion": None, "pais": None,
                        "relaciones": [],  # filled in below once the org slug is known
                        "contexto": "google-contacts", "fuente": FUENTE,
                        "tags": ["persona", "osint", "contacto"],
                    }, PERSON_CANON)
                    new_person = {
                        "action": "create_person", "slug": pslug,
                        "nombre": person_disp, "frontmatter": pfm,
                        "body": "## Notas\n", "tel": None, "email": None,
                        "org_slug": None, "org_nombre": org_nombre, "rol": rol,
                    }
                    person_plans.append(new_person)
                    persons_index.append({"slug": pslug, "path": None,
                                          "nombre": person_disp,
                                          "tokens": _name_tokens(persona_nombre)})

            oslug, oplan = plan_org(org_label, tels, emails,
                                    existing_orgs, used_org_slugs,
                                    person_slug=person_slug,
                                    person_nombre=person_disp, rol=rol)
            if oslug:
                used_org_slugs.add(oslug)
            if oplan:
                org_plans.append(oplan)

            # Only link when the org was resolved; otherwise the note would
            # end up with a "[[None]]" link and a relation to None.
            if person_slug and oslug:
                relations.append(("persona->org", person_slug, oslug, rol))
                if new_person is not None:
                    role_label = rol or "contacto"
                    new_person["frontmatter"]["relaciones"] = [
                        f"[[{oslug}]] — {role_label}"
                    ]
                    new_person["org_slug"] = oslug
                    new_person["org_nombre"] = org_label
                    new_person["body"] = (
                        "## Relacionado\n\n"
                        f"- [[organizaciones/{oslug}|{org_label}]] — {role_label}\n\n"
                        "## Notas\n"
                    )
            continue

        # tipo == persona (or any unrecognised value)
        name = persona_nombre or contact["fn"]
        org_slug = None
        # If the person comes with an org, plan the org and link to it.
        if org_nombre and len(_name_tokens(org_nombre)) >= 1:
            # The org gets no phone/email: those belong to the person.
            oslug, oplan = plan_org(org_nombre, [], [], existing_orgs, used_org_slugs)
            if oslug:
                used_org_slugs.add(oslug)
                org_slug = oslug
            if oplan:
                org_plans.append(oplan)

        pplan = plan_person(name, sexo, tels, emails, org_slug, org_nombre, rol,
                            persons_index, used_person_slugs)
        if pplan["action"] == "create_person":
            person_plans.append(pplan)
            persons_index.append({"slug": pplan["slug"], "path": None,
                                  "nombre": pplan["nombre"],
                                  "tokens": _name_tokens(pplan["nombre"])})
            if org_slug:
                # Back-reference to the person on an org planned in this run
                # that has no relation yet.
                backref = f"[[{pplan['slug']}]] — {pplan['rol'] or 'contacto'}"
                for op in org_plans:
                    if op["slug"] == org_slug and not op["frontmatter"].get("relaciones"):
                        op["frontmatter"]["relaciones"] = [backref]
        else:
            enrich_plans.append(pplan)
        if org_slug:
            relations.append(("persona->org", pplan["slug"], org_slug, rol))

    return {
        "person_creates": person_plans,
        "org_creates": org_plans,
        "enriches": enrich_plans,
        "relations": relations,
        "skipped_service": skipped_service,
    }
|
|
|
|
|
|
# --------------------------------------------------------------------------
|
|
# Aplicar (solo --apply)
|
|
# --------------------------------------------------------------------------
|
|
|
|
def apply_plan(plan):
    """Write the planned notes to disk through the `obsidian` helper functions.

    Creates every planned person and organisation note, then enriches the
    existing person notes: the contact name is added as an alias when new,
    and `telefono` / `email` are filled in only when the note has none.
    Enrich plans whose path is missing or does not exist are skipped.

    Returns `(persons_created, orgs_created, persons_enriched)`.
    """
    for person in plan["person_creates"]:
        create_obsidian_note(OSINT, f"personas/{person['slug']}",
                             body=person["body"],
                             frontmatter=person["frontmatter"],
                             overwrite=True)
    created_p = len(plan["person_creates"])

    for org in plan["org_creates"]:
        create_obsidian_note(OSINT, f"organizaciones/{org['slug']}",
                             body=org["body"],
                             frontmatter=org["frontmatter"],
                             overwrite=True)
    created_o = len(plan["org_creates"])

    enriched = 0
    for item in plan["enriches"]:
        path = item["path"]
        if not (path and os.path.exists(path)):
            continue
        current = dict(read_obsidian_note(path)["frontmatter"])

        # Add the contact name as an alias.
        aliases = current.get("aliases") or []
        if not isinstance(aliases, list):
            aliases = [aliases]
        alias = item["alias_add"]
        if alias and alias not in aliases and alias != current.get("nombre"):
            aliases.append(alias)

        # Fill in phone/email only when the note has none.
        telefono = current.get("telefono")
        if not telefono and item.get("tel"):
            telefono = item["tel"]
        email = current.get("email")
        if not email and item.get("email"):
            email = item["email"]

        update_obsidian_note(path, set_frontmatter={"aliases": aliases,
                                                    "telefono": telefono,
                                                    "email": email})
        enriched += 1
    return created_p, created_o, enriched
|
|
|
|
|
|
# --------------------------------------------------------------------------
|
|
# Reporte dry-run
|
|
# --------------------------------------------------------------------------
|
|
|
|
def report(plan, stats, llm_calls):
    """Print the dry-run summary: counters, LLM call outcomes and a 15-line sample."""
    heavy_rule = "=" * 64
    light_rule = "-" * 64

    input_rows = [
        ("vCards totales en el .vcf ..................", stats['total']),
        ("descartados servicio/ruido ................", stats['filtered']),
        ("contactos clasificados con LLM ............", stats['classified']),
        (" de ellos sin telefono ni email ..........", stats['no_contact']),
    ]
    plan_rows = [
        ("PERSONAS a crear ..........................", len(plan["person_creates"])),
        ("PERSONAS a enriquecer (ya existen) ........", len(plan["enriches"])),
        ("ORGANIZACIONES a crear ....................", len(plan["org_creates"])),
        ("RELACIONES persona<->organizacion .........", len(plan["relations"])),
        ("contactos marcados como servicio (LLM) ....", plan['skipped_service']),
        ("colisiones de slug resueltas (sufijo) .....", stats['slug_collisions']),
    ]

    print(heavy_rule)
    print("DRY-RUN — import_google_contacts.py")
    print(heavy_rule)
    for label, value in input_rows:
        print(f"{label} {value}")
    print(light_rule)
    for label, value in plan_rows:
        print(f"{label} {value}")
    print(light_rule)

    print("Llamadas a ask_llm:")
    failures = [call for call in llm_calls if not call["ok"]]
    ok = len(llm_calls) - len(failures)
    fail = len(failures)
    print(f" exitosas={ok} fallidas={fail} total_intentos={len(llm_calls)}")
    for call in failures:
        print(f" FALLO lote size={call['size']} intento={call['attempt']}: {call.get('error')}")

    print(heavy_rule)
    print("MUESTRA de 15 fichas (nombre -> tipo/accion -> tel/email -> relacion):")
    print(light_rule)

    sample = []
    for person in plan["person_creates"]:
        rel = ""
        if person.get("org_slug"):
            rel = f" -> org {person['org_slug']} ({person['rol'] or 'contacto'})"
        tel = person['tel'] or '-'
        eml = person['email'] or '-'
        sample.append(f"[crear persona] {person['nombre']} | tel={tel} email={eml}{rel}")
    for org in plan["org_creates"]:
        fm = org["frontmatter"]
        rels = fm.get("relaciones") or []
        rel = f" -> {rels[0]}" if rels else ""
        tel = fm.get("telefono") or '-'
        eml = fm.get("email") or '-'
        sample.append(f"[crear org] {org['nombre']} | tel={tel} email={eml}{rel}")
    for item in plan["enriches"]:
        tel = item.get('tel') or '-'
        sample.append(f"[enriquecer] {item['nombre_existente']} (+alias '{item['alias_add']}', +tel={tel})")

    for line in sample[:15]:
        print(" " + line)
    if not sample:
        print(" (sin fichas planificadas)")
    print(heavy_rule)
|
|
|
|
|
|
# --------------------------------------------------------------------------
|
|
# main
|
|
# --------------------------------------------------------------------------
|
|
|
|
def main():
    """Command-line entry point: parse, filter, classify, plan, report and optionally apply.

    Returns the process exit code: 1 when the .vcf file does not exist,
    0 otherwise. Nothing is written to the vault unless `--apply` is given.
    """
    parser = argparse.ArgumentParser(description="Importa contactos Google al vault OSINT.")
    parser.add_argument("--apply", action="store_true",
                        help="Escribe las fichas en disco. Por defecto: dry-run (no escribe).")
    parser.add_argument("--vcf", default=VCF_PATH, help="Ruta al .vcf de contactos.")
    parser.add_argument("--limit", type=int, default=0,
                        help="(debug) limita el numero de contactos clasificados.")
    args = parser.parse_args()

    if not os.path.exists(args.vcf):
        print(f"ERROR: no existe el .vcf: {args.vcf}", file=sys.stderr)
        return 1

    with open(args.vcf, "r", encoding="utf-8", errors="replace") as handle:
        vcf_text = handle.read()

    cards = split_vcards(vcf_text)
    total = len(cards)

    # Parse every card and drop service/noise entries.
    parsed = [parse_vcard(raw) for raw in cards]
    contacts = [entry for entry in parsed if not is_service(entry["fn"])]
    filtered = len(parsed) - len(contacts)

    if args.limit > 0:
        contacts = contacts[:args.limit]

    # Give each contact a stable index shared with the LLM output.
    indexed = list(enumerate(contacts))

    # Classify in batches.
    llm_calls = []
    classifications = []
    for offset in range(0, len(indexed), BATCH_SIZE):
        chunk = indexed[offset:offset + BATCH_SIZE]
        classifications.extend(classify_batch(chunk, llm_calls))

    existing_persons = load_existing_persons()
    existing_orgs = load_existing_orgs()

    # Count collisions by comparing base slugs before they are resolved:
    # every repeat of a slug beyond its first occurrence is one collision.
    slug_counts = {}
    for _, entry in indexed:
        base = slugify_obsidian_name(entry["fn"])
        if base:
            slug_counts[base] = slug_counts.get(base, 0) + 1
    slug_collisions = sum(slug_counts.values()) - len(slug_counts)

    plan = build_plan(indexed, classifications, existing_persons, existing_orgs)

    no_contact = sum(
        1 for _, entry in indexed if not entry["tels"] and not entry["emails"]
    )
    stats = {
        "total": total,
        "filtered": filtered,
        "classified": len(indexed),
        "no_contact": no_contact,
        "slug_collisions": slug_collisions,
    }

    report(plan, stats, llm_calls)

    if not args.apply:
        print("\n(dry-run: no se escribio nada. Usa --apply para aplicar.)")
        return 0

    cp, co, en = apply_plan(plan)
    print(f"\nAPLICADO: personas creadas={cp} orgs creadas={co} enriquecidas={en}")
    return 0


if __name__ == "__main__":
    sys.exit(main())
|