Compare commits

..

2 Commits

Author SHA1 Message Date
egutierrez 0e7b615a1e chore: auto-commit (2 archivos)
- CONVENTIONS.md
- tools/gen_osint_tools.py

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-06-14 23:55:18 +02:00
egutierrez cb7f6e92a0 chore: auto-commit (3 archivos)
- project.md
- reports/
- tools/import_google_contacts.py

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-06-13 21:56:57 +02:00
5 changed files with 1862 additions and 0 deletions
+110
View File
@@ -167,3 +167,113 @@ usa `tipo: organizacion` / `tipo: lugar` en vez de `tipo: persona`.
El migrador debe ser re-ejecutable: si una persona ya existe en osint con su slug, se El migrador debe ser re-ejecutable: si una persona ya existe en osint con su slug, se
actualiza (no se duplica). Los attachments ya movidos no se vuelven a mover. actualiza (no se duplica). Los attachments ya movidos no se vuelven a mover.
## 9. Scans de red (recon)
Todo escaneo de red de una investigación —WHOIS, RDAP, DNS, nmap, traceroute, ping— se
**archiva SIEMPRE en OSINT**. No existen scans sueltos: el resultado queda como nota navegable
en el vault y como fila consultable en la base de datos. Lo gestionan las funciones del grupo
de capacidad `recon` del registry (dominio `cybersecurity`); ver `docs/capabilities/recon.md`.
### 9.1 Nota del scan en el vault
Cada scan produce una nota Markdown bajo la carpeta del dominio escaneado:
```
dominios/<slug>/recon/<scan_type>-<YYYYMMDD-HHMM>.md
```
donde `<scan_type>` es uno de `whois | rdap | dns | nmap | traceroute | ping` y el timestamp
tiene granularidad de minuto. Frontmatter de la nota:
```yaml
tipo: scan-red
scan_tipo: whois # whois|rdap|dns|nmap|traceroute|ping
target: "ejemplo.com" # objetivo original (dominio, host o IP)
slug: ejemplo.com # slug del target (clave de la carpeta)
fecha: 2026-06-14T13:18:00 # ISO, momento del scan
herramienta: whois # CLI usada (whois, dig, nmap, ...)
tags: [scan-red, whois, recon]
```
Body: cabecera con target/tipo/herramienta/fecha, un `## Resumen` opcional con los campos
destacados del scan, y la salida cruda completa (`raw`) dentro de un bloque de código. La nota
es la **capa crítica**: si no se puede escribir, el guardado falla.
### 9.2 Tabla `network_scans` (DuckDB, service osint_db)
Además de la nota, cada scan se registra en la tabla `network_scans` (schema `main`) de la
base DuckDB que posee el service `osint_db` (single-writer), vía
`POST http://127.0.0.1:8771/api/scan`. Columnas:
| Columna | Qué |
|---|---|
| `id` | Identificador del scan |
| `target` | Objetivo original (dominio/host/IP) |
| `target_slug` | Slug del target (clave de agrupación) |
| `scan_type` | `whois \| rdap \| dns \| nmap \| traceroute \| ping` |
| `tool` | CLI usada (whois, dig, nmap, ...) |
| `scan_ts` | Timestamp ISO del scan |
| `note_path` | Ruta relativa de la nota en el vault |
| `summary` | JSON con los campos resumidos del scan |
| `created_at` | Timestamp de inserción |
Es la **capa best-effort**: si `osint_db` está caído o no expone el endpoint, el guardado
degrada a solo-nota (`registered=False` + aviso) sin fallar. El re-ingest del vault NO borra
`network_scans` —es una tabla de datos vivos, no derivada de las notas.
### 9.3 Cómo lanzar y guardar
El camino canónico es el pipeline one-shot del registry, que escanea y archiva en una sola
llamada:
```bash
cd /home/enmanuel/fn_registry
./fn run recon_osint <target> <scan_type> # p.ej. ./fn run recon_osint ejemplo.com whois
```
Para un nmap pesado (full-tcp, vuln, udp-top) lanzar en segundo plano por la duración:
```bash
nohup ./fn run recon_osint scanme.nmap.org nmap --profile full-tcp --timeout-s 7200 \
> /tmp/recon-fulltcp.log 2>&1 &
```
Alternativa atómica (controlas el scan y lo guardas aparte) desde Python, importando las
funciones del registry —no se reescriben:
```python
import sys; sys.path.insert(0, "python/functions")
from cybersecurity import dns_records
from cybersecurity.save_scan_to_osint import save_scan_to_osint
scan = dns_records("ejemplo.com")
if scan["status"] == "ok":
save_scan_to_osint("ejemplo.com", "dns", scan["raw"],
summary={"A": scan["records"].get("A")}, tool="dig")
```
### 9.4 Cómo consultar scans guardados
Desde una nota del vault, con un bloque `osintdb` (plugin osint-db) que consulta la tabla:
````markdown
```osintdb
SELECT scan_type, tool, scan_ts, note_path
FROM network_scans
WHERE target_slug = 'ejemplo.com'
ORDER BY scan_ts DESC
```
````
O contra el service directamente vía `/api/query` (mismo SQL). El slug del target se deriva
igual que en todo el vault:
```python
import re
slug = re.sub(r"[^a-z0-9._-]+", "-", target.lower())
```
> Nota: el `slug` de un dominio/host (p.ej. `ejemplo.com`, `192.168.1.10`) conserva puntos y
> guiones porque el set permitido es `[a-z0-9._-]`; difiere del slug de persona de la sección 2,
> que solo admite `[a-z0-9-]`.
+10
View File
@@ -25,6 +25,16 @@ El CRUD del vault se hace con el grupo de funciones del registry `obsidian`
alimenta las investigaciones, ver el grupo `web-proxy` y el tooling de browser del project alimenta las investigaciones, ver el grupo `web-proxy` y el tooling de browser del project
`web_scraping`. `web_scraping`.
### Stack DuckDB (fuente de verdad estructurada)
Desde el 12/06/2026 los datos estructurados del project (entidades del vault + contactos y
eventos de Xandikos) viven en una base DuckDB que es la fuente de verdad, con el vault como
capa de prosa + vista. Tres piezas: service `apps/osint_db` (FastAPI 127.0.0.1:8771, dueño
único de la base), plugin de Obsidian `apps/osint_obsidian_plugin` (bloques ```osintdb con
queries en vivo dentro de notas) y render headless de tablas Markdown congeladas via bloques
sentinel. Arquitectura, contrato API, modelo de tablas (maestras con `note_path`, maestras
DAV y derivadas sin referencias a notas) y operacion: ver `DUCKDB_STACK.md`.
### Relacion con web_scraping ### Relacion con web_scraping
`web_scraping` aporta la captura/automatizacion (perfiles Chromium, CDP, proxy, flow replay). `web_scraping` aporta la captura/automatizacion (perfiles Chromium, CDP, proxy, flow replay).
@@ -0,0 +1,9 @@
# Report — Migración persons multi-valor (20260613-0046)
- Fichas a migrar (con tel/email/direccion): 634
- Render DB→nota OK: 634
- Fallos: 0
- Duración: 7.1s
- Backup: projects/osint/apps/osint_db/data/backups/vault-md-20260613*.tgz
Cada ficha gana `telefonos: [...]`, `emails: [...]`, `direcciones: [...]` en el frontmatter (singulares mantenidos por compat); el cuerpo (prosa) se preserva.
+863
View File
@@ -0,0 +1,863 @@
#!/usr/bin/env python3
"""Genera las fichas de herramientas OSINT en el vault osint + el MOC indice.
Usa la funcion del registry create_obsidian_note_py_obsidian para escribir cada
ficha como .md plano con frontmatter YAML (sin abrir la GUI de Obsidian).
Fuente de verdad: la lista TOOLS de abajo. Idempotente con overwrite=True.
"""
import os
import sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "python", "functions"))
# Permitir ejecucion desde la raiz del repo tambien:
sys.path.insert(0, os.path.join("/home/enmanuel/fn_registry", "python", "functions"))
from obsidian import create_obsidian_note # noqa: E402
VAULT = "/home/enmanuel/Obsidian/osint"
# Etiquetas de categoria legibles para el MOC.
CAT_LABELS = {
"buscador": "Buscadores y meta-buscadores",
"identidad": "Identidad — username, email, telefono, brechas",
"social": "Redes sociales y rostros",
"dominio": "Dominios e infraestructura (pasivo)",
"geolocalizacion": "Geolocalizacion (imagen, mapas, satelite, metadatos, IP)",
"imagen-forense": "Verificacion forense de imagen y video",
"archivo": "Archivo e historico web",
"empresa-es": "Empresas y registros (España)",
"framework": "Frameworks y portales de referencia",
}
CAT_ORDER = list(CAT_LABELS.keys())
# Cada tool: slug, nombre, url, cat, coste, reg (requiere cuenta), amb, antibot
# (bloquea curl / WAF, necesita navegador real), tags extra, para, como, gotchas.
TOOLS = [
# ---------- Buscadores ----------
dict(slug="google-dorking", nombre="Google Dorking", url="https://www.google.com/advanced_search",
cat="buscador", coste="free", reg=False, amb="global", antibot=False,
tags=["dorks", "buscador"],
para="Busqueda avanzada con operadores (site:, filetype:, intext:, inurl:) para encontrar documentos, perfiles y filtraciones indexadas.",
como="Combina operadores: `site:linkedin.com \"nombre\"`, `filetype:pdf \"empresa\"`, `intext:\"email@dominio\"`.",
gotchas=["Google limita resultados y muestra captcha si detecta scraping.",
"Los dorks no son ilegales pero acceder a lo que exponen puede serlo segun contexto."]),
dict(slug="bing", nombre="Bing Search", url="https://www.bing.com",
cat="buscador", coste="free", reg=False, amb="global", antibot=False,
tags=["buscador"],
para="Segundo indice: a veces conserva resultados que Google ya purgo y soporta operadores propios.",
como="Usa operadores `site:`, `ip:`, `feed:`. Util como contraste de Google.",
gotchas=["Indice menor que Google; complementario, no sustituto."]),
dict(slug="yandex-search", nombre="Yandex Search", url="https://yandex.com",
cat="buscador", coste="free", reg=False, amb="global", antibot=False,
tags=["buscador", "ruso"],
para="Buscador ruso con cobertura fuerte de contenido del este de Europa y Asia que Google indexa peor.",
como="Busqueda normal + su reverse image (ver ficha yandex-images) es la mejor del mercado.",
gotchas=["Interfaz/resultados sesgados a region; usa yandex.com (no .ru) para ingles."]),
dict(slug="duckduckgo", nombre="DuckDuckGo", url="https://duckduckgo.com",
cat="buscador", coste="free", reg=False, amb="global", antibot=False,
tags=["buscador", "privacidad"],
para="Buscador sin tracking; sus bangs (!g, !w) saltan a otros indices rapido.",
como="Usa `!` bangs para redirigir busquedas; util para consultas sin personalizacion.",
gotchas=["Indice propio limitado; mezcla resultados de Bing."]),
dict(slug="brave-search", nombre="Brave Search", url="https://search.brave.com",
cat="buscador", coste="free", reg=False, amb="global", antibot=False,
tags=["buscador", "privacidad"],
para="Indice independiente (no depende de Google/Bing); buen contraste para descubrir fuentes distintas.",
como="Busqueda directa; ofrece API de pago para automatizar.",
gotchas=["Indice mas joven; cobertura desigual por idioma."]),
dict(slug="intelligence-x", nombre="Intelligence X", url="https://intelx.io",
cat="buscador", coste="freemium", reg=True, amb="global", antibot=False,
tags=["leaks", "pastes", "darkweb"],
para="Motor que indexa filtraciones, pastes, documentos, darkweb y datos historicos por selector (email, dominio, BTC, IP).",
como="Busca un selector (email/dominio); la vista previa es gratis, el contenido completo requiere creditos.",
gotchas=["Manejar datos de brechas puede tener implicaciones legales segun jurisdiccion.",
"Cuenta gratuita muy limitada."]),
# ---------- Identidad ----------
dict(slug="whatsmyname", nombre="WhatsMyName", url="https://whatsmyname.app",
cat="identidad", coste="free", reg=False, amb="global", antibot=False,
tags=["username", "enumeracion"],
para="Enumera en que cientos de sitios existe un nombre de usuario concreto.",
como="Escribe el username; devuelve presencia por sitio. Mismo dataset que usan herramientas CLI.",
gotchas=["Falsos positivos: algunos sitios devuelven 200 para cualquier usuario.",
"No confirma que sea la MISMA persona, solo que el handle existe."]),
dict(slug="sherlock", nombre="Sherlock (CLI)", url="https://github.com/sherlock-project/sherlock",
cat="identidad", coste="free", reg=False, amb="global", antibot=False,
tags=["username", "cli"],
para="Herramienta CLI que busca un username en 400+ redes sociales.",
como="`sherlock <username>`; genera lista de URLs donde el handle existe.",
gotchas=["Falsos positivos frecuentes; verificar a mano.",
"Requiere instalacion local (pip/docker)."]),
dict(slug="maigret", nombre="Maigret (CLI)", url="https://github.com/soxoj/maigret",
cat="identidad", coste="free", reg=False, amb="global", antibot=False,
tags=["username", "cli"],
para="Sucesor mas potente de Sherlock: 2500+ sitios y extrae datos del perfil (no solo presencia).",
como="`maigret <username> --html`; genera informe con perfiles y metadatos extraidos.",
gotchas=["Mas lento por la cobertura; usar --top-sites para acotar.",
"Instalacion local requerida."]),
dict(slug="namechk", nombre="Namechk", url="https://namechk.com",
cat="identidad", coste="free", reg=False, amb="global", antibot=True,
tags=["username", "dominio"],
para="Comprueba disponibilidad de un username (y dominios) en muchas plataformas a la vez.",
como="Escribe el handle; marca ocupado/libre por servicio.",
gotchas=["Pensado para branding, no para OSINT: no enlaza al perfil.",
"Protegido por WAF; necesita navegador real (curl da 403)."]),
dict(slug="instantusername", nombre="Instant Username Search", url="https://instantusername.com",
cat="identidad", coste="free", reg=False, amb="global", antibot=True,
tags=["username"],
para="Chequeo de username en tiempo real sobre decenas de servicios.",
como="Escribe el handle; resultados incrementales segun teclea.",
gotchas=["Cobertura menor que Maigret.",
"WAF: necesita navegador real."]),
dict(slug="hunter-io", nombre="Hunter.io", url="https://hunter.io",
cat="identidad", coste="freemium", reg=True, amb="global", antibot=False,
tags=["email", "empresa"],
para="Encuentra y verifica direcciones de correo asociadas a un dominio corporativo, con el patron (nombre.apellido@).",
como="Introduce un dominio; devuelve correos conocidos + patron. Verificador de entregabilidad incluido.",
gotchas=["Plan gratis pocas busquedas/mes.",
"Solo correos corporativos publicos, no personales."]),
dict(slug="emailrep", nombre="EmailRep", url="https://emailrep.io",
cat="identidad", coste="freemium", reg=True, amb="global", antibot=False,
tags=["email", "reputacion"],
para="Perfil de reputacion de un email: antiguedad, perfiles sociales ligados, si aparece en brechas, si es desechable.",
como="Consulta el email via web o API; devuelve senales de riesgo y presencia.",
gotchas=["API key gratuita con cuota baja.",
"Datos agregados, no siempre actuales."]),
dict(slug="epieos", nombre="Epieos", url="https://epieos.com",
cat="identidad", coste="freemium", reg=False, amb="global", antibot=True,
tags=["email", "telefono", "google-account"],
para="A partir de un email o telefono revela cuenta de Google asociada (nombre, foto, reviews, calendario publico) y servicios vinculados.",
como="Introduce email o telefono; muestra Google account, Gravatar y servicios donde esta registrado.",
gotchas=["Protegido por DataDome (captcha); necesita navegador real.",
"Funciones avanzadas de pago."]),
dict(slug="holehe", nombre="holehe (CLI)", url="https://github.com/megadose/holehe",
cat="identidad", coste="free", reg=False, amb="global", antibot=False,
tags=["email", "cli"],
para="Comprueba en que 120+ servicios esta registrado un email, sin alertar al titular.",
como="`holehe email@dominio`; marca used/not-used por servicio via flujo de recuperacion.",
gotchas=["Algunos modulos quedan obsoletos cuando los sitios cambian su login.",
"Instalacion local requerida."]),
dict(slug="haveibeenpwned", nombre="Have I Been Pwned", url="https://haveibeenpwned.com",
cat="identidad", coste="freemium", reg=False, amb="global", antibot=False,
tags=["brechas", "email"],
para="Indica si un email aparece en brechas de datos publicas conocidas y en cuales.",
como="Introduce el email en la web; la API (para automatizar) es de pago.",
gotchas=["No muestra la contraseña, solo la brecha.",
"API key de pago; el chequeo web es gratis."]),
dict(slug="dehashed", nombre="DeHashed", url="https://dehashed.com",
cat="identidad", coste="pago", reg=True, amb="global", antibot=True,
tags=["brechas", "credenciales"],
para="Buscador de credenciales filtradas por email, usuario, nombre, telefono o IP, con el dato concreto de la brecha.",
como="Busca un selector; muestra registros (a veces con contraseña en claro/hash). Requiere suscripcion.",
gotchas=["Uso de credenciales filtradas: cuidado legal y etico.",
"De pago; WAF, navegador real."]),
dict(slug="leakcheck", nombre="LeakCheck", url="https://leakcheck.io",
cat="identidad", coste="freemium", reg=True, amb="global", antibot=False,
tags=["brechas", "credenciales"],
para="Alternativa a DeHashed: busca apariciones en brechas por email, usuario, telefono o dominio.",
como="Introduce el selector; vista parcial gratis, detalle con plan.",
gotchas=["Mismas precauciones legales que cualquier base de brechas."]),
dict(slug="phoneinfoga", nombre="PhoneInfoga (CLI)", url="https://github.com/sundowndev/phoneinfoga",
cat="identidad", coste="free", reg=False, amb="global", antibot=False,
tags=["telefono", "cli"],
para="Recon pasivo de numeros de telefono: operador, tipo de linea, pais y dorks automaticos para rastrear el numero.",
como="`phoneinfoga scan -n +34...`; genera footprint y enlaces de busqueda.",
gotchas=["No 'hackea' el numero; solo agrega datos publicos.",
"Instalacion local; algunos scanners requieren API keys."]),
dict(slug="truecaller", nombre="Truecaller", url="https://www.truecaller.com",
cat="identidad", coste="freemium", reg=True, amb="global", antibot=False,
tags=["telefono", "identificacion"],
para="Identifica el nombre asociado a un numero de telefono via su base colaborativa.",
como="Buscar el numero en web/app (requiere login). Util para name lookup.",
gotchas=["Requiere cuenta y a veces app movil.",
"Datos aportados por usuarios: pueden ser erroneos o estar desactualizados."]),
# ---------- Social ----------
dict(slug="social-searcher", nombre="Social Searcher", url="https://www.social-searcher.com",
cat="social", coste="freemium", reg=False, amb="global", antibot=False,
tags=["redes", "monitorizacion"],
para="Busca menciones publicas de una palabra/usuario en varias redes a la vez y analiza sentimiento.",
como="Introduce termino/handle; agrega posts publicos recientes.",
gotchas=["Solo contenido publico e indexado recientemente.",
"Cuota gratuita limitada por dia."]),
dict(slug="instaloader", nombre="Instaloader (CLI)", url="https://instaloader.github.io",
cat="social", coste="free", reg=False, amb="global", antibot=False,
tags=["instagram", "cli"],
para="Descarga posts, stories, bio y metadatos publicos de perfiles de Instagram de forma reproducible.",
como="`instaloader profile <usuario>`; baja media + JSON de metadatos.",
gotchas=["Instagram limita por rate y puede pedir login; usar con moderacion.",
"Loguearse con tu cuenta deja de ser pasivo y arriesga baneo."]),
dict(slug="x-advanced-search", nombre="X (Twitter) Advanced Search", url="https://x.com/search-advanced",
cat="social", coste="free", reg=True, amb="global", antibot=False,
tags=["twitter", "x"],
para="Filtra tweets por usuario, fecha, palabras exactas, ubicacion y tipo, sin scrapear.",
como="Usa operadores `from:`, `since:`, `until:`, `near:`, `geocode:`.",
gotchas=["X ahora exige login para ver resultados.",
"Busqueda geo (`geocode:`) cada vez mas limitada."]),
dict(slug="pimeyes", nombre="PimEyes", url="https://pimeyes.com",
cat="social", coste="pago", reg=True, amb="global", antibot=False,
tags=["rostro", "reverse-face"],
para="Buscador de caras: sube una foto y encuentra otras apariciones del mismo rostro en la web.",
como="Sube la imagen del rostro; muestra coincidencias. Ver donde aparecen requiere plan de pago.",
gotchas=["Implicaciones de privacidad serias; uso responsable.",
"Ver URLs de las coincidencias es de pago."]),
dict(slug="facecheck-id", nombre="FaceCheck.ID", url="https://facecheck.id",
cat="social", coste="freemium", reg=True, amb="global", antibot=False,
tags=["rostro", "reverse-face"],
para="Reverse face search orientado a redes y noticias; alternativa a PimEyes.",
como="Sube el rostro; devuelve coincidencias con score. Detalle completo con creditos.",
gotchas=["Resultados con ruido; verificar siempre.",
"Mismas cautelas de privacidad que PimEyes."]),
# ---------- Dominio ----------
dict(slug="shodan", nombre="Shodan", url="https://www.shodan.io",
cat="dominio", coste="freemium", reg=True, amb="global", antibot=False,
tags=["infra", "puertos", "iot"],
para="Motor de busqueda de dispositivos conectados: puertos, servicios, banners, camaras, ICS, ya indexados (no escaneas tu).",
como="Busca `hostname:`, `org:`, `port:`, `country:`. La consulta lee el indice de Shodan, es pasivo.",
gotchas=["Cuenta gratis con filtros/resultados limitados.",
"El dato puede estar cacheado/desactualizado."]),
dict(slug="censys", nombre="Censys Search", url="https://search.censys.io",
cat="dominio", coste="freemium", reg=True, amb="global", antibot=True,
tags=["infra", "certificados", "hosts"],
para="Inventario de hosts y certificados en internet; muy fuerte para mapear infraestructura y certificados de una org.",
como="Busca por host, certificado o dominio; pivota por fingerprint de cert.",
gotchas=["WAF: navegador real.",
"Free tier con cuota de consultas."]),
dict(slug="fofa", nombre="FOFA", url="https://fofa.info",
cat="dominio", coste="freemium", reg=True, amb="global", antibot=False,
tags=["infra", "ciberespacio"],
para="Buscador chino tipo Shodan/Censys; util para encontrar assets que los otros no indexan.",
como="Sintaxis `domain=`, `ip=`, `title=`. Resultados del indice (pasivo).",
gotchas=["Interfaz parcialmente en chino.",
"Free tier limitado."]),
dict(slug="crtsh", nombre="crt.sh", url="https://crt.sh",
cat="dominio", coste="free", reg=False, amb="global", antibot=True,
tags=["certificados", "subdominios"],
para="Consulta Certificate Transparency: revela subdominios de un dominio a partir de los certificados emitidos.",
como="`https://crt.sh/?q=%25.dominio.com` lista todos los subdominios certificados.",
gotchas=["Servidor a veces lento o caido (curl da timeout); reintentar.",
"Solo ve dominios con certificado emitido."]),
dict(slug="securitytrails", nombre="SecurityTrails", url="https://securitytrails.com",
cat="dominio", coste="freemium", reg=True, amb="global", antibot=True,
tags=["dns", "historico", "whois"],
para="DNS e historico WHOIS pasivo: registros DNS actuales e historicos, subdominios, cambios de propietario.",
como="Busca el dominio; ve historico A/MX/NS y subdominios. API para automatizar.",
gotchas=["Free tier muy limitado.",
"WAF: navegador real."]),
dict(slug="dnsdumpster", nombre="DNSDumpster", url="https://dnsdumpster.com",
cat="dominio", coste="free", reg=False, amb="global", antibot=False,
tags=["dns", "subdominios", "mapa"],
para="Mapa DNS gratuito de un dominio: subdominios, registros MX/NS y grafo de hosts.",
como="Introduce el dominio; genera tabla + grafo de la superficie DNS.",
gotchas=["Datos pasivos, no exhaustivos.",
"Limite de consultas por dia."]),
dict(slug="viewdns", nombre="ViewDNS.info", url="https://viewdns.info",
cat="dominio", coste="freemium", reg=False, amb="global", antibot=False,
tags=["dns", "whois", "reverse-ip"],
para="Caja de herramientas DNS/WHOIS: reverse IP (que dominios comparten IP), whois, propagacion, historico.",
como="Elige la herramienta (reverse IP lookup, whois history) e introduce dominio/IP.",
gotchas=["API de pago; web gratis con limites.",
"Reverse IP incompleto en hostings compartidos grandes."]),
dict(slug="urlscan-io", nombre="urlscan.io", url="https://urlscan.io",
cat="dominio", coste="freemium", reg=False, amb="global", antibot=False,
tags=["url", "scan", "screenshot"],
para="Analiza una URL en sandbox: capturas, recursos cargados, dominios contactados, tecnologias. La busqueda de scans previos es 100% pasiva.",
como="Busca el dominio en la pestaña Search para ver scans publicos previos sin lanzar uno nuevo.",
gotchas=["Lanzar un scan publico es visible para terceros; usa 'unlisted/private' si no quieres alertar.",
"Un scan activo toca el sitio objetivo (deja de ser pasivo)."]),
# ---------- Geolocalizacion ----------
dict(slug="yandex-images", nombre="Yandex Images (reverse)", url="https://yandex.com/images/",
cat="geolocalizacion", coste="free", reg=False, amb="global", antibot=False,
tags=["reverse-image", "rostro", "lugar"],
para="La mejor busqueda inversa de imagenes para lugares y rostros; clave para geolocalizar fotos.",
como="Sube la foto o pega su URL; Yandex sugiere imagenes y lugares visualmente similares.",
gotchas=["Sesgo a contenido del este; usar junto a Google Lens.",
"Puede pedir captcha tras varias subidas."]),
dict(slug="google-lens", nombre="Google Lens", url="https://lens.google.com",
cat="geolocalizacion", coste="free", reg=False, amb="global", antibot=False,
tags=["reverse-image", "objetos", "texto"],
para="Reconoce objetos, texto, productos y lugares en una imagen; bueno para identificar carteles, logos, edificios.",
como="Sube la imagen; recorta la zona de interes (cartel, fachada) para afinar.",
gotchas=["Mejor para objetos/texto que para coincidencia exacta de foto.",
"Combinar con Yandex para lugares."]),
dict(slug="bing-visual-search", nombre="Bing Visual Search", url="https://www.bing.com/visualsearch",
cat="geolocalizacion", coste="free", reg=False, amb="global", antibot=False,
tags=["reverse-image"],
para="Tercera busqueda inversa para contrastar Yandex y Google.",
como="Sube la imagen; revisa coincidencias y paginas que la contienen.",
gotchas=["Cobertura menor; usar como tercera opinion."]),
dict(slug="tineye", nombre="TinEye", url="https://tineye.com",
cat="geolocalizacion", coste="freemium", reg=False, amb="global", antibot=False,
tags=["reverse-image", "origen"],
para="Reverse image enfocado al ORIGEN y primera aparicion de una imagen (no a contenido similar).",
como="Sube la imagen; ordena por 'oldest' para encontrar la fuente original.",
gotchas=["No busca caras/objetos similares, solo la misma imagen y ediciones.",
"Indice mas pequeño que Yandex."]),
dict(slug="geospy", nombre="GeoSpy AI", url="https://geospy.ai",
cat="geolocalizacion", coste="freemium", reg=True, amb="global", antibot=False,
tags=["geolocalizacion", "ia", "lugar"],
para="Estima la ubicacion de una foto mediante IA, incluso sin metadatos, a partir de pistas visuales.",
como="Sube la imagen; devuelve una hipotesis de pais/ciudad con probabilidad.",
gotchas=["Estimacion probabilistica: verificar siempre con mapas/street view.",
"Acceso completo de pago."]),
dict(slug="google-earth", nombre="Google Earth Pro", url="https://earth.google.com",
cat="geolocalizacion", coste="free", reg=False, amb="global", antibot=False,
tags=["satelite", "historico", "3d"],
para="Imagenes satelitales con HISTORICO temporal y vista 3D; clave para datar cambios y medir.",
como="Usa la barra de tiempo para ver la misma zona en distintas fechas; mide distancias y alturas.",
gotchas=["Resolucion y fechas varian mucho por zona.",
"Version Pro de escritorio tiene mas herramientas que la web."]),
dict(slug="google-maps", nombre="Google Maps + Street View", url="https://maps.google.com",
cat="geolocalizacion", coste="free", reg=False, amb="global", antibot=False,
tags=["mapa", "street-view", "negocios"],
para="Mapa, vista de calle con historico, reseñas y fotos de usuarios para confirmar y datar una ubicacion.",
como="En Street View usa el reloj para ver capturas antiguas; revisa fotos de reseñas para interiores.",
gotchas=["Street View no cubre todo; combinar con Mapillary.",
"Las fotos de usuarios pueden estar mal ubicadas."]),
dict(slug="bing-maps", nombre="Bing Maps (Bird's Eye)", url="https://www.bing.com/maps",
cat="geolocalizacion", coste="free", reg=False, amb="global", antibot=False,
tags=["mapa", "oblicua", "satelite"],
para="Vista oblicua 'Bird's Eye' (45 grados) que muestra fachadas que el cenital no ve.",
como="Activa 'Bird's Eye'; rota la vista para ver los cuatro lados de un edificio.",
gotchas=["Cobertura oblicua limitada a ciertas ciudades."]),
dict(slug="yandex-maps", nombre="Yandex Maps", url="https://yandex.com/maps",
cat="geolocalizacion", coste="free", reg=False, amb="global", antibot=False,
tags=["mapa", "street-view"],
para="Street view (panoramas) y mapas con cobertura fuerte en Rusia, Turquia, Asia central y este de Europa.",
como="Activa panoramas; util donde Google Street View no llega.",
gotchas=["Cobertura floja en Europa occidental.",
"Interfaz parcialmente en ruso."]),
dict(slug="mapillary", nombre="Mapillary", url="https://www.mapillary.com",
cat="geolocalizacion", coste="free", reg=False, amb="global", antibot=True,
tags=["street-level", "crowdsourced"],
para="Imagenes a nivel de calle aportadas por usuarios; cubre rutas y zonas que Street View ignora.",
como="Navega el mapa; las lineas verdes son tramos con fotos. Filtra por fecha.",
gotchas=["Calidad y cobertura desiguales (depende de aportaciones).",
"WAF en la home; navegador real."]),
dict(slug="kartaview", nombre="KartaView", url="https://kartaview.org",
cat="geolocalizacion", coste="free", reg=False, amb="global", antibot=False,
tags=["street-level", "crowdsourced", "open"],
para="Alternativa abierta a Mapillary con imagenes a nivel de calle crowdsourced.",
como="Explora el mapa para tramos fotografiados; descarga libre.",
gotchas=["Cobertura menor que Mapillary."]),
dict(slug="openstreetmap", nombre="OpenStreetMap", url="https://www.openstreetmap.org",
cat="geolocalizacion", coste="free", reg=False, amb="global", antibot=False,
tags=["mapa", "open", "datos"],
para="Mapa libre con datos crudos etiquetados (tipo de edificio, comercio, altura) consultables.",
como="Usa el editor/inspector para ver tags de un elemento; base para Overpass.",
gotchas=["Detalle depende de la comunidad local."]),
dict(slug="overpass-turbo", nombre="Overpass Turbo", url="https://overpass-turbo.eu",
cat="geolocalizacion", coste="free", reg=False, amb="global", antibot=False,
tags=["consulta", "osm", "features"],
para="Consulta los datos de OSM por tipo de objeto: 'todas las gasolineras / antenas / iglesias en esta zona'.",
como="Escribe una query Overpass QL y ejecutala sobre el area visible; ideal para acotar candidatos al geolocalizar.",
gotchas=["Curva de aprendizaje de la sintaxis QL.",
"Solo encuentra lo que esta etiquetado en OSM."]),
dict(slug="wikimapia", nombre="Wikimapia", url="https://wikimapia.org",
cat="geolocalizacion", coste="free", reg=False, amb="global", antibot=True,
tags=["mapa", "crowdsourced", "etiquetas"],
para="Mapa colaborativo con descripciones de lugares y edificios que otros mapas no etiquetan.",
como="Navega y lee las descripciones de los poligonos dibujados por usuarios.",
gotchas=["Informacion sin verificar; tratar como pista.",
"WAF; navegador real."]),
dict(slug="sentinel-eo-browser", nombre="Sentinel Hub EO Browser", url="https://apps.sentinel-hub.com/eo-browser/",
cat="geolocalizacion", coste="free", reg=True, amb="global", antibot=False,
tags=["satelite", "sentinel", "multiespectral"],
para="Imagenes Sentinel/Landsat recientes y multiespectrales; ver cambios, incendios, agua, vegetacion por fecha.",
como="Selecciona zona y fecha; cambia entre bandas (true color, NDVI, etc.).",
gotchas=["Resolucion ~10m (no ve coches/personas).",
"Requiere cuenta gratuita para algunas capas."]),
dict(slug="zoom-earth", nombre="Zoom Earth", url="https://zoom.earth",
cat="geolocalizacion", coste="free", reg=False, amb="global", antibot=False,
tags=["satelite", "tiempo-real", "clima"],
para="Vista satelital casi en tiempo real con capas meteorologicas (nubes, tormentas, incendios).",
como="Navega el globo; util para contexto temporal/meteo de un evento.",
gotchas=["Resolucion baja; para contexto, no detalle."]),
dict(slug="nasa-worldview", nombre="NASA Worldview", url="https://worldview.earthdata.nasa.gov",
cat="geolocalizacion", coste="free", reg=False, amb="global", antibot=False,
tags=["satelite", "historico", "nasa"],
para="Imagenes satelitales diarias historicas de la NASA (MODIS/VIIRS) con barra temporal global.",
como="Elige fecha y capa; descarga la imagen del dia para una region.",
gotchas=["Resolucion baja (cientos de m); para fenomenos grandes."]),
dict(slug="suncalc", nombre="SunCalc", url="https://www.suncalc.org",
cat="geolocalizacion", coste="free", reg=False, amb="global", antibot=False,
tags=["chronolocation", "sol", "sombras"],
para="Calcula la posicion del sol y direccion de las sombras para una ubicacion y hora dadas (chronolocation).",
como="Fija el punto en el mapa y la fecha; ajusta la hora hasta que la sombra coincida con la de la foto.",
gotchas=["Necesitas estimar bien la fecha/estacion.",
"Funciona al reves: deducir la hora a partir de la sombra observada."]),
dict(slug="shadowmap", nombre="Shadowmap", url="https://shadowmap.org",
cat="geolocalizacion", coste="freemium", reg=False, amb="global", antibot=False,
tags=["chronolocation", "sombras", "3d"],
para="Simula sombras de edificios en 3D segun hora/fecha; util para chronolocation en ciudad.",
como="Coloca la vista en la zona y desliza la hora para comparar sombras de edificios.",
gotchas=["Modelo 3D solo en ciudades grandes.",
"Funciones avanzadas de pago."]),
dict(slug="exiftool", nombre="ExifTool (CLI)", url="https://exiftool.org",
cat="geolocalizacion", coste="free", reg=False, amb="global", antibot=False,
tags=["metadatos", "exif", "cli"],
para="Lee todos los metadatos de una imagen/archivo: GPS, fecha, camara, software. El estandar para EXIF.",
como="`exiftool foto.jpg`; busca GPSLatitude/Longitude y DateTimeOriginal.",
gotchas=["Las redes sociales borran EXIF al subir; suele haber GPS solo en originales.",
"Instalacion local (perl)."]),
dict(slug="metadata2go", nombre="Metadata2Go", url="https://www.metadata2go.com",
cat="geolocalizacion", coste="free", reg=False, amb="global", antibot=False,
tags=["metadatos", "exif", "online"],
para="Visor EXIF online sin instalar nada: sube el archivo y lee sus metadatos.",
como="Sube la imagen; muestra GPS, fecha y datos de camara.",
gotchas=["Subes el archivo a un tercero: no usar con material sensible.",
"Preferir ExifTool local para evidencia."]),
dict(slug="jimpl", nombre="Jimpl", url="https://jimpl.com",
cat="geolocalizacion", coste="free", reg=False, amb="global", antibot=False,
tags=["metadatos", "exif", "gps", "online"],
para="Visor EXIF online que situa la coordenada GPS de la foto directamente en un mapa.",
como="Sube la imagen; si tiene GPS, lo pinta en el mapa.",
gotchas=["Mismo aviso de privacidad: subes a un tercero."]),
dict(slug="ipinfo", nombre="IPinfo", url="https://ipinfo.io",
cat="geolocalizacion", coste="freemium", reg=True, amb="global", antibot=False,
tags=["ip-geo", "asn"],
para="Geolocalizacion de IP, ASN, organizacion y tipo de conexion (hosting/movil/vpn).",
como="`ipinfo.io/<ip>` en web o API; util para ubicar el origen aproximado de una IP.",
gotchas=["Geo de IP es aproximada (ciudad/region, no direccion).",
"VPN/proxy falsean la ubicacion."]),
dict(slug="ipgeolocation", nombre="ipgeolocation.io", url="https://ipgeolocation.io",
cat="geolocalizacion", coste="freemium", reg=True, amb="global", antibot=False,
tags=["ip-geo", "api"],
para="API de geolocalizacion de IP con zona horaria, moneda y datos de red.",
como="Consulta por IP via API; devuelve pais, region, ciudad aproximada.",
gotchas=["Precision limitada; varias fuentes discrepan.",
"Requiere API key."]),
dict(slug="maxmind", nombre="MaxMind GeoIP", url="https://www.maxmind.com",
cat="geolocalizacion", coste="freemium", reg=True, amb="global", antibot=False,
tags=["ip-geo", "base-datos"],
para="Base de datos GeoIP estandar de la industria (GeoLite2 gratis) para resolver IP a ubicacion offline.",
como="Descarga GeoLite2 (cuenta gratis) y consulta localmente; o usa su web demo.",
gotchas=["GeoLite2 menos precisa que la version de pago.",
"Requiere cuenta para descargar las DB."]),
# ---------- Imagen forense ----------
dict(slug="invid-weverify", nombre="InVID / WeVerify", url="https://www.invid-project.eu",
cat="imagen-forense", coste="free", reg=False, amb="global", antibot=False,
tags=["video", "verificacion", "frames", "extension"],
para="Extension del navegador para verificar videos/imagenes: extrae frames, hace reverse search, lee metadatos y detecta manipulaciones.",
como="Instala la extension (Fake News Debunker); pega la URL del video para sacar keyframes y buscarlos.",
gotchas=["Es una extension de navegador, no una web de consulta directa.",
"Algunas integraciones de redes se rompen cuando cambian sus APIs."]),
dict(slug="forensically", nombre="Forensically", url="https://29a.ch/photo-forensics/",
cat="imagen-forense", coste="free", reg=False, amb="global", antibot=False,
tags=["forense", "ela", "clonado"],
para="Suite forense de imagen en el navegador: ELA, deteccion de clonado, analisis de ruido, lupa de detalle.",
como="Sube la foto; usa Clone Detection y Error Level Analysis para detectar zonas editadas.",
gotchas=["ELA da pistas, no pruebas; interpretar con cuidado.",
"Funciona en local en el navegador (no sube la imagen)."]),
dict(slug="fotoforensics", nombre="FotoForensics", url="https://fotoforensics.com",
cat="imagen-forense", coste="free", reg=False, amb="global", antibot=False,
tags=["forense", "ela"],
para="Analisis ELA online clasico para detectar montajes en imagenes JPEG.",
como="Sube la imagen; las zonas con distinto nivel de error sugieren edicion.",
gotchas=["Subes la imagen a un tercero (queda publica un tiempo).",
"ELA tiene muchos falsos positivos en imagenes recomprimidas."]),
# ---------- Archivo ----------
dict(slug="wayback-machine", nombre="Wayback Machine", url="https://web.archive.org",
cat="archivo", coste="free", reg=False, amb="global", antibot=False,
tags=["archivo", "historico", "web"],
para="Archivo historico de paginas web: ver como era un sitio/perfil en una fecha pasada.",
como="Pega la URL; navega por las capturas en la linea de tiempo. Permite guardar una captura nueva.",
gotchas=["No todo esta archivado ni en todas las fechas.",
"robots.txt o peticiones de borrado pueden ocultar capturas."]),
dict(slug="archive-today", nombre="archive.today", url="https://archive.ph",
cat="archivo", coste="free", reg=False, amb="global", antibot=False,
tags=["archivo", "snapshot"],
para="Captura puntual de una pagina (incluye contenido dinamico) que queda congelada e inmune a borrados.",
como="Pega la URL para crear o buscar una captura; util para preservar evidencia antes de que se borre.",
gotchas=["Capturas puntuales, no rastreo continuo como Wayback.",
"Dominios espejo varios (.ph/.is/.today)."]),
dict(slug="cachedview", nombre="CachedView", url="https://cachedview.nl",
cat="archivo", coste="free", reg=False, amb="global", antibot=False,
tags=["cache", "google"],
para="Atajo para ver versiones cacheadas de una pagina (Google Cache, Wayback) en un sitio.",
como="Pega la URL; ofrece enlaces a las caches disponibles.",
gotchas=["Google Cache esta practicamente retirado; cada vez menos util.",
"Depende de que el tercero conserve la copia."]),
# ---------- Empresa ES ----------
dict(slug="catastro", nombre="Sede Catastro", url="https://www.sedecatastro.gob.es",
cat="empresa-es", coste="free", reg=False, amb="espana", antibot=False,
tags=["inmueble", "catastro", "geo"],
para="Catastro español: localiza inmuebles por direccion o referencia catastral, con superficie, uso, año y geometria sobre mapa.",
como="Busca por direccion; obtiene referencia catastral, plano y datos del inmueble (titular parcial sin certificado).",
gotchas=["El titular completo requiere certificado/identificacion y acreditar interes.",
"Cruzar referencia catastral con la direccion del objetivo."]),
dict(slug="borme", nombre="BORME (BOE)", url="https://www.boe.es/diario_borme/",
cat="empresa-es", coste="free", reg=False, amb="espana", antibot=False,
tags=["mercantil", "empresa", "boletin"],
para="Boletin Oficial del Registro Mercantil: constituciones, nombramientos, ceses y cambios de empresas españolas.",
como="Busca por nombre de persona o sociedad; revela en que empresas figura alguien como administrador.",
gotchas=["URL correcta es /diario_borme/ (la /borme/ da 404).",
"Datos historicos por fecha de publicacion."]),
dict(slug="libreborme", nombre="Libreborme", url="https://libreborme.net",
cat="empresa-es", coste="free", reg=False, amb="espana", antibot=True,
tags=["mercantil", "empresa", "buscador"],
para="Interfaz buscable sobre los datos del BORME: ficha de persona/empresa con sus cargos y relaciones.",
como="Busca el nombre; ve sus sociedades, cargos y co-administradores de un vistazo.",
gotchas=["Cobertura desde ~2009; datos antiguos pueden faltar.",
"WAF; navegador real."]),
dict(slug="einforma", nombre="eInforma", url="https://www.einforma.com",
cat="empresa-es", coste="freemium", reg=True, amb="espana", antibot=False,
tags=["mercantil", "empresa", "informe"],
para="Informes mercantiles de empresas españolas: cargos, cuentas, CIF, vinculaciones.",
como="Busca la empresa o el administrador; datos basicos gratis, informe completo de pago.",
gotchas=["Lo detallado es de pago.",
"Requiere cuenta."]),
dict(slug="axesor", nombre="Axesor", url="https://www.axesor.es",
cat="empresa-es", coste="freemium", reg=True, amb="espana", antibot=False,
tags=["mercantil", "empresa", "rating"],
para="Informacion mercantil y de riesgo de empresas españolas; alternativa a eInforma.",
como="Busca empresa/persona; ficha basica gratis, informe financiero de pago.",
gotchas=["Datos detallados de pago.",
"Requiere cuenta."]),
dict(slug="infoempresa", nombre="InfoEmpresa", url="https://www.infoempresa.com",
cat="empresa-es", coste="freemium", reg=False, amb="espana", antibot=False,
tags=["mercantil", "empresa"],
para="Buscador de empresas españolas con administradores, objeto social y datos de contacto.",
como="Busca por nombre de empresa o de persona para ver sus cargos.",
gotchas=["Parte de la ficha de pago.",
"Datos a veces desactualizados."]),
dict(slug="idealista", nombre="Idealista", url="https://www.idealista.com",
cat="empresa-es", coste="free", reg=False, amb="espana", antibot=True,
tags=["inmueble", "fotos", "geo"],
para="Portal inmobiliario: fotos y planos de viviendas que permiten cruzar una direccion con el interior y el entorno.",
como="Busca por zona/direccion; las fotos del anuncio revelan interior, vistas y referencias del lugar.",
gotchas=["Anuncios caducan; usar Wayback para los retirados.",
"WAF (DataDome); navegador real."]),
dict(slug="fotocasa", nombre="Fotocasa", url="https://www.fotocasa.es",
cat="empresa-es", coste="free", reg=False, amb="espana", antibot=False,
tags=["inmueble", "fotos", "geo"],
para="Segundo portal inmobiliario español; util para contrastar fotos/anuncios con Idealista.",
como="Busca por zona; cruza fotos y precios con el otro portal.",
gotchas=["Cobertura solapada con Idealista; usar ambos."]),
dict(slug="infobel", nombre="Infobel", url="https://www.infobel.com/es",
cat="empresa-es", coste="free", reg=False, amb="espana", antibot=True,
tags=["telefono", "directorio", "paginas-blancas"],
para="Paginas blancas/amarillas online: cruza nombre, telefono y direccion en España y otros paises.",
como="Busca por nombre o telefono para resolver el otro dato.",
gotchas=["Cobertura desigual; muchos numeros no listados (LOPD).",
"WAF; navegador real."]),
# ---------- Frameworks ----------
dict(slug="osint-framework", nombre="OSINT Framework", url="https://osintframework.com",
cat="framework", coste="free", reg=False, amb="global", antibot=False,
tags=["indice", "arbol", "recursos"],
para="Arbol navegable de cientos de recursos OSINT clasificados por tipo de dato (email, username, dominio, geo...).",
como="Navega el arbol por categoria para descubrir herramientas especificas para cada dato.",
gotchas=["Algunos enlaces estan muertos o desactualizados.",
"Es un indice, no ejecuta nada."]),
dict(slug="bellingcat-toolkit", nombre="Bellingcat's Online Toolkit", url="https://bellingcat.gitbook.io/toolkit",
cat="framework", coste="free", reg=False, amb="global", antibot=False,
tags=["indice", "investigacion", "geo"],
para="Coleccion curada por Bellingcat de herramientas para investigacion, con fuerte enfasis en geolocalizacion y verificacion.",
como="Busca por categoria (maps, satellite, social) la herramienta recomendada y probada por investigadores.",
gotchas=["Curada pero amplia; empezar por las marcadas como favoritas."]),
dict(slug="inteltechniques", nombre="IntelTechniques Tools", url="https://inteltechniques.com/tools/",
cat="framework", coste="free", reg=False, amb="global", antibot=False,
tags=["indice", "formularios", "bazzell"],
para="Conjunto de formularios de busqueda (Michael Bazzell) que automatizan consultas por email, username, telefono, etc.",
como="Abre la herramienta del dato que tengas; rellena y lanza busquedas en multiples servicios.",
gotchas=["Algunas herramientas se rompen cuando los servicios cambian sus URLs.",
"Parte del contenido premium esta en sus libros."]),
dict(slug="start-me-osint", nombre="Start.me — OSINT Collection", url="https://start.me/p/DPYPMz/the-ultimate-osint-collection",
cat="framework", coste="free", reg=False, amb="global", antibot=True,
tags=["indice", "enlaces", "dashboard"],
para="Dashboard comunitario con cientos de enlaces OSINT agrupados por tema; bueno para descubrir fuentes nuevas.",
como="Navega los paneles por categoria; marca los enlaces utiles.",
gotchas=["Calidad variable (es comunitario).",
"WAF; navegador real."]),
]
# ============================================================================
# Grupo recon: funciones del registry (NO sitios web) que se ejecutan con
# `fn run` y archivan su resultado por objetivo en el vault + DuckDB.
# docs/capabilities/recon.md + projects/osint/CONVENTIONS.md §9.
# ============================================================================
RECON_TOOLS = [
dict(slug="whois-lookup", nombre="whois_lookup", rid="whois_lookup_py_cybersecurity",
modo="pasivo", sudo=False, persiste=True,
comando="fn run recon_osint <target> whois",
para="Lookup WHOIS de un dominio o IP: registrar, pais del registrante, fechas (creacion/expiracion/actualizacion) y name servers, parseados best-effort sobre el raw.",
gotchas=["Pasivo (consulta el registro, no toca al objetivo).",
"El parseo depende del formato del registrar; el `raw` completo siempre se guarda."]),
dict(slug="rdap-lookup", nombre="rdap_lookup", rid="rdap_lookup_py_cybersecurity",
modo="pasivo", sudo=False, persiste=True,
comando="fn run recon_osint <target> rdap",
para="Lookup RDAP (sustituto JSON moderno de WHOIS) de dominio, IP o ASN (p.ej. AS15169); devuelve datos estructurados.",
gotchas=["Pasivo.",
"Mas fiable de parsear que WHOIS; preferir cuando el TLD lo soporta."]),
dict(slug="dns-records", nombre="dns_records", rid="dns_records_py_cybersecurity",
modo="pasivo", sudo=False, persiste=True,
comando="fn run recon_osint <target> dns",
para="Registros DNS via `dig +short` (A, AAAA, MX, NS, SOA, TXT, CNAME por defecto) en un dict por tipo.",
gotchas=["Pasivo (consulta a resolvers DNS, no al objetivo).",
"Resultados dependen del resolver/cache local."]),
dict(slug="ping-host", nombre="ping_host", rid="ping_host_py_cybersecurity",
modo="activo", sudo=False, persiste=True,
comando="fn run recon_osint <target> ping",
para="Sondeo ICMP: porcentaje de perdida y RTT (avg/min/max) hacia el host.",
gotchas=["ACTIVO: envia paquetes al objetivo.",
"Host filtrado por firewall = loss 100% pero status:ok (no es error)."]),
dict(slug="traceroute-host", nombre="traceroute_host", rid="traceroute_host_py_cybersecurity",
modo="activo", sudo=False, persiste=True,
comando="fn run recon_osint <target> traceroute",
para="Traza la ruta de red hasta el host: lista de hops con nombre/IP/RTT.",
gotchas=["ACTIVO: genera trafico hacia el objetivo y la ruta intermedia.",
"Hops sin respuesta (`* * *`) salen con hosts vacios."]),
dict(slug="nmap-scan", nombre="nmap_scan", rid="nmap_scan_py_cybersecurity",
modo="activo", sudo="perfiles os/udp-top/aggressive", persiste=True,
comando="fn run recon_osint <target> nmap # perfil quick por defecto",
para="Escaneo de puertos y servicios con nmap por perfiles (quick, top1000, service -sV -sC, vuln, udp-top, aggressive -A, discovery CIDR, os). Salida XML parseada a open_ports/hosts_up.",
gotchas=["ACTIVO E INTRUSIVO: solo contra objetivos PROPIOS o con autorizacion explicita. Escanear infra ajena puede ser ilegal.",
"Perfiles os/udp-top/aggressive requieren sudo.",
"Perfiles largos (vuln, aggressive) → lanzar en segundo plano (&/background).",
"discovery acepta CIDR: cuidado de no barrer rangos que no son tuyos."]),
dict(slug="save-scan-to-osint", nombre="save_scan_to_osint", rid="save_scan_to_osint_py_cybersecurity",
modo="sink", sudo=False, persiste=True,
comando="# se invoca dentro del pipeline; uso directo para archivar un raw externo",
para="Sink comun: archiva el resultado de CUALQUIER scan en el ecosistema OSINT. Dos capas: nota Markdown tipada en el vault (siempre) + POST a osint_db para registro DuckDB (best-effort).",
gotchas=["Si el service osint_db esta caido o el endpoint da 404, degrada a solo-nota (register_warning) sin fallar.",
"No lanza excepciones: devuelve dict de estado con note_path/registered/scan_id."]),
dict(slug="recon-osint", nombre="recon_osint (pipeline)", rid="recon_osint_py_pipelines",
modo="segun scan", sudo="si el scan lo pide", persiste=True,
comando="fn run recon_osint <target> <whois|rdap|dns|ping|traceroute|nmap>",
para="Pipeline one-shot: ejecuta un scan del tipo pedido y lo archiva en OSINT en una sola llamada. El camino canonico para recon + archivado por objetivo.",
gotchas=["Hereda el modo del scan elegido (whois/rdap/dns pasivos; ping/traceroute/nmap activos).",
"`save=False` ejecuta sin archivar (raro; por defecto archiva)."]),
]
RECON_PERSIST_LINES = [
"- **Nota** (siempre, fuente de verdad): `~/Obsidian/osint/dominios/<slug>/recon/<tipo>-<ts>.md` (`tipo: scan-red`, raw en bloque de codigo).",
"- **DuckDB** (best-effort): tabla `network_scans` en `osint_db` via `POST 127.0.0.1:8771/api/scan`.",
"- **Consultar** (code block `osintdb` del plugin): `SELECT * FROM network_scans WHERE target_slug='<slug>';`",
"- Service `osint_db` caido → degrada a solo-nota sin fallar.",
]
def build_recon_frontmatter(t):
return {
"tipo": "herramienta",
"nombre": t["nombre"],
"slug": t["slug"],
"registry_id": t["rid"],
"categoria": "recon",
"osint_modo": t["modo"],
"coste": "free",
"comando": t["comando"],
"requiere_sudo": t["sudo"],
"persiste_en_vault": bool(t["persiste"]),
"tags": ["herramienta", "osint", "recon", "registry", "red"],
}
def build_recon_body(t):
lines = []
lines.append("> Funcion del registry (no es un sitio web): se ejecuta con `fn run` y archiva el resultado por objetivo.")
lines.append("")
lines.append("## Para que")
lines.append("")
lines.append(t["para"])
lines.append("")
lines.append("## Como llamar")
lines.append("")
lines.append("```bash")
lines.append(t["comando"])
lines.append("```")
lines.append("")
lines.append(f"Registry ID: `{t['rid']}`. Inspeccionar: `mcp__registry__fn_show id=\"{t['rid']}\"`.")
lines.append("")
lines.append("## Persistencia (resultados por objetivo)")
lines.append("")
if t["modo"] == "sink":
lines.append("Es el sink que escribe la nota y registra en DuckDB:")
else:
lines.append("Al pasar por el pipeline `recon_osint` (o por `save_scan_to_osint`):")
lines.append("")
lines.extend(RECON_PERSIST_LINES)
lines.append("")
lines.append("## Gotchas")
lines.append("")
for g in t["gotchas"]:
lines.append(f"- {g}")
return "\n".join(lines)
def build_body(t):
lines = []
lines.append("## Para que")
lines.append("")
lines.append(t["para"])
lines.append("")
lines.append("## Como usar")
lines.append("")
lines.append(t["como"])
lines.append("")
lines.append("## Gotchas")
lines.append("")
for g in t["gotchas"]:
lines.append(f"- {g}")
return "\n".join(lines)
def build_frontmatter(t):
fm = {
"tipo": "herramienta",
"nombre": t["nombre"],
"slug": t["slug"],
"url": t["url"],
"categoria": t["cat"],
"osint_modo": "pasivo",
"coste": t["coste"],
"registro": bool(t["reg"]),
"ambito": t["amb"],
"anti_bot": bool(t["antibot"]),
"tags": ["herramienta", "osint", t["cat"]] + t["tags"],
}
return fm
def main():
created = []
# Validar slugs unicos (web + recon no colisionan).
slugs = [t["slug"] for t in TOOLS] + [t["slug"] for t in RECON_TOOLS]
assert len(slugs) == len(set(slugs)), "slugs duplicados: " + str(
[s for s in slugs if slugs.count(s) > 1]
)
for t in TOOLS:
rel = f"herramientas/{t['slug']}.md"
path = create_obsidian_note(
vault_dir=VAULT,
rel_path=rel,
body=build_body(t),
frontmatter=build_frontmatter(t),
overwrite=True,
)
created.append((t["cat"], t["slug"], path))
for t in RECON_TOOLS:
rel = f"herramientas/{t['slug']}.md"
path = create_obsidian_note(
vault_dir=VAULT,
rel_path=rel,
body=build_recon_body(t),
frontmatter=build_recon_frontmatter(t),
overwrite=True,
)
created.append(("recon", t["slug"], path))
# ----- MOC index -----
moc_lines = []
moc_lines.append("Indice de herramientas online para investigaciones OSINT pasivo.")
moc_lines.append("")
moc_lines.append(
"Cada herramienta tiene su ficha con `## Para que`, `## Como usar` y `## Gotchas`. "
"El campo `anti_bot: true` marca las que bloquean clientes simples y necesitan un navegador real "
"(perfil Chromium dedicado). Modo de uso por defecto: **pasivo** (no se interactua con los sistemas del objetivo)."
)
moc_lines.append("")
total = len(TOOLS) + len(RECON_TOOLS)
moc_lines.append(
f"**Total: {total} herramientas** ({len(TOOLS)} sitios/CLIs externos pasivos "
f"+ {len(RECON_TOOLS)} funciones del registry del grupo `recon`, estas ultimas ejecutables "
f"con `fn run` y con archivado por objetivo)."
)
moc_lines.append("")
for cat in CAT_ORDER:
items = [t for t in TOOLS if t["cat"] == cat]
if not items:
continue
moc_lines.append(f"## {CAT_LABELS[cat]}")
moc_lines.append("")
moc_lines.append("| Herramienta | Coste | Cuenta | Ambito | Para que |")
moc_lines.append("|---|---|---|---|---|")
for t in items:
cuenta = "si" if t["reg"] else "no"
ab = " ⚠️navegador" if t["antibot"] else ""
link = f"[[{t['slug']}\\|{t['nombre']}]]"
resumen = t["para"].strip()
moc_lines.append(
f"| {link}{ab} | {t['coste']} | {cuenta} | {t['amb']} | {resumen} |"
)
moc_lines.append("")
# ----- Seccion recon (funciones del registry, no sitios web) -----
moc_lines.append("## Recon de red (registry, ejecutable)")
moc_lines.append("")
moc_lines.append(
"Estas NO son sitios web: son funciones del registry (grupo `recon`, dominio `cybersecurity`) "
"que se ejecutan con `fn run` y que **archivan su resultado por objetivo** en el vault + DuckDB. "
"Doctrina: *todo escaneo se guarda siempre*. Pagina madre: `docs/capabilities/recon.md`."
)
moc_lines.append("")
moc_lines.append("| Herramienta | Registry ID | Modo | Sudo | Para que |")
moc_lines.append("|---|---|---|---|---|")
for t in RECON_TOOLS:
sudo = t["sudo"] if isinstance(t["sudo"], str) else ("si" if t["sudo"] else "no")
modo_tag = " ⚠️activo" if t["modo"] == "activo" else ""
link = f"[[{t['slug']}\\|{t['nombre']}]]"
moc_lines.append(
f"| {link}{modo_tag} | `{t['rid']}` | {t['modo']} | {sudo} | {t['para'].strip()} |"
)
moc_lines.append("")
moc_lines.append("**Patron canonico (recon + archivado por objetivo en 1 call):**")
moc_lines.append("")
moc_lines.append("```bash")
moc_lines.append("fn run recon_osint <target> <whois|rdap|dns|ping|traceroute|nmap>")
moc_lines.append("```")
moc_lines.append("")
moc_lines.append("Cada call deja:")
moc_lines.extend(RECON_PERSIST_LINES)
moc_lines.append("")
moc_lines.append("## Notas de uso")
moc_lines.append("")
moc_lines.append(
"- **Pasivo vs activo:** todo este indice es OSINT pasivo (fuentes publicas y de terceros). "
"Lanzar un scan que toque el sistema del objetivo (p.ej. un scan nuevo en urlscan, loguearse en una cuenta) "
"deja de ser pasivo."
)
moc_lines.append(
"- **Geolocalizacion de una foto (flujo tipico):** 1) `[[exiftool]]`/`[[jimpl]]` por si hay GPS en metadatos; "
"2) reverse image con `[[yandex-images]]` + `[[google-lens]]`; 3) confirmar en `[[google-maps]]`/`[[google-earth]]` + `[[mapillary]]`; "
"4) datar por sombras con `[[suncalc]]`/`[[shadowmap]]`; 5) acotar features con `[[overpass-turbo]]`."
)
moc_lines.append(
"- **España:** para inmuebles/direcciones cruzar `[[catastro]]` + `[[idealista]]`/`[[fotocasa]]`; "
"para empresas y cargos `[[borme]]`/`[[libreborme]]`."
)
moc_lines.append(
"- **Brechas y credenciales** (`[[dehashed]]`, `[[leakcheck]]`, `[[intelligence-x]]`): manejar con cautela legal/etica."
)
moc_lines.append(
"- **Recon de red:** `[[ping-host]]`, `[[traceroute-host]]` y sobre todo `[[nmap-scan]]` son ACTIVOS "
"(tocan al objetivo). Solo contra infra PROPIA o con autorizacion explicita. `[[whois-lookup]]`/`[[rdap-lookup]]`/`[[dns-records]]` "
"son pasivos. Todo lo que ejecutes con `[[recon-osint]]` queda archivado por objetivo en el vault."
)
moc_fm = {
"tipo": "index",
"tags": ["osint", "moc", "herramientas"],
}
moc_path = create_obsidian_note(
vault_dir=VAULT,
rel_path="herramientas/_indice.md",
body="\n".join(moc_lines),
frontmatter=moc_fm,
overwrite=True,
)
print(f"Fichas creadas: {len(created)}")
print(f"MOC: {moc_path}")
# Resumen por categoria
from collections import Counter
c = Counter(cat for cat, _, _ in created)
for cat in CAT_ORDER:
if c.get(cat):
print(f" {cat:18} {c[cat]}")
if __name__ == "__main__":
main()
+870
View File
@@ -0,0 +1,870 @@
#!/usr/bin/env python3
"""Importa contactos de Google (vCard export) al vault OSINT como fichas de
persona y organizacion, clasificando con LLM y creando relaciones
persona <-> organizacion.
Flujo:
1. Parsear el .vcf con split_vcards (grupo `dav`). Extraer FN, TEL*, EMAIL*, ORG, TITLE.
2. Filtrar ruido/servicio (numeros de operadora, recordatorios, sin >=3 letras).
3. Clasificar con ask_llm (grupo `claude-direct`) por lotes de ~40, pidiendo JSON estricto.
4. Dedup contra personas/*.md existentes (match por slug exacto o subconjunto de tokens).
5. Generar fichas siguiendo projects/osint/CONVENTIONS.md (frontmatter canonico 3b).
Modos:
--dry-run (DEFAULT) no escribe nada; imprime resumen + muestra de 15.
--apply escribe de verdad usando funciones del grupo `obsidian`.
Tool de PROYECTO (vive en projects/osint/tools/). NO es funcion del registry,
NO se indexa. Idempotente: re-ejecutar no duplica (dedup por slug).
"""
import sys
import os
import re
import json
import argparse
import datetime
sys.path.insert(0, "/home/enmanuel/fn_registry/python/functions")
from infra.split_vcards import split_vcards # noqa: E402
from core.ask_llm import ask_llm # noqa: E402
from obsidian import ( # noqa: E402
slugify_obsidian_name,
list_obsidian_notes,
read_obsidian_note,
create_obsidian_note,
update_obsidian_note,
)
OSINT = "/home/enmanuel/Obsidian/osint"
VCF_PATH = "/home/enmanuel/Downloads/contacts.vcf"
FUENTE = "Google Contacts export 2026-06-11"
LLM_MODEL = "claude-haiku-4-5-20251001"
BATCH_SIZE = 40
# Topónimos locales que el LLM tiende a confundir con organizaciones cuando
# vienen como sufijo del nombre del contacto (p.ej. "Adrian Quinto Almachar").
# Un lugar NUNCA se convierte en organizacion ni en relacion. (slugificados)
_PLACE_BLOCKLIST = {
"almachar", "barcelona", "madrid", "malaga", "velez-malaga", "velez",
"aliaguilla", "chamana", "axarquia", "torre-del-mar", "torrox", "nerja",
"comares", "benamargosa", "moclinejo", "iznate", "cutar",
}
# Frontmatter canonico de persona (CONVENTIONS.md seccion 3b), en orden.
PERSON_CANON = [
"tipo", "nombre", "slug", "aliases", "sexo", "fecha_nacimiento", "dni",
"telefono", "email", "direccion", "pais", "relaciones", "contexto",
"fuente", "tags",
]
# Frontmatter de organizacion (CONVENTIONS.md secciones 6 y 3b adaptado).
ORG_CANON = [
"tipo", "nombre", "slug", "aliases", "telefono", "email", "direccion",
"pais", "relaciones", "contexto", "fuente", "tags",
]
# --------------------------------------------------------------------------
# 1. Parseo de vCards
# --------------------------------------------------------------------------
def _unfold(vcard_text: str) -> str:
"""Deshace el folding de lineas de vCard (continuacion con espacio/tab)."""
return re.sub(r"\r?\n[ \t]", "", vcard_text)
def _vcard_values(vcard_text: str, prop: str) -> list:
"""Devuelve todos los valores de una propiedad (p.ej. TEL, EMAIL).
Acepta la forma `PROP;PARAMS:valor` y `PROP:valor`. Decodifica escapes
simples de vCard (\\, , \\;, \\n) en el valor.
"""
vals = []
for line in vcard_text.splitlines():
m = re.match(rf"^(?:item\d+\.)?{prop}(?:;[^:]*)?:(.*)$", line, re.IGNORECASE)
if m:
v = m.group(1).strip()
v = v.replace("\\,", ",").replace("\\;", ";").replace("\\n", " ").replace("\\\\", "\\")
v = v.strip()
if v:
vals.append(v)
return vals
def parse_vcard(vcard_text: str) -> dict:
"""Extrae FN, todos los TEL, todos los EMAIL, ORG y TITLE de una vCard."""
txt = _unfold(vcard_text)
fn_vals = _vcard_values(txt, "FN")
org_vals = _vcard_values(txt, "ORG")
org = ""
if org_vals:
# ORG viene como `Empresa;Departamento`. Quitar componentes vacios.
org = " ".join(p.strip() for p in org_vals[0].split(";") if p.strip())
return {
"fn": fn_vals[0] if fn_vals else "",
"tels": _dedup_keep_order(_vcard_values(txt, "TEL")),
"emails": _dedup_keep_order(_vcard_values(txt, "EMAIL")),
"org": org,
"title": (_vcard_values(txt, "TITLE") or [""])[0],
}
def _dedup_keep_order(items: list) -> list:
seen, out = set(), []
for it in items:
key = it.strip().lower()
if key and key not in seen:
seen.add(key)
out.append(it.strip())
return out
# --------------------------------------------------------------------------
# 2. Filtro de ruido/servicio
# --------------------------------------------------------------------------
# Patrones de nombre que delatan numeros de servicio / recordatorios.
_SERVICE_NAME_RE = re.compile(
r"^\*" # empieza por *
r"|^\d{3,5}\b" # codigo corto al inicio (1200, 22122)
r"|att\.?\s*cliente"
r"|buz[oó]n|buzon"
r"|voicemail|voice\s*mail"
r"|gestiona|consulta\b|informaci[oó]n|recarga"
r"|servicio\s+al\s+cliente",
re.IGNORECASE,
)
def is_service(name: str) -> bool:
"""True si el contacto es ruido de operadora / recordatorio / sin nombre real."""
n = (name or "").strip()
if not n:
return True
if _SERVICE_NAME_RE.search(n):
return True
# menos de 3 letras = no es un nombre humano ni de negocio real
letters = re.sub(r"[^A-Za-zÀ-ÿñÑ]", "", n)
if len(letters) < 3:
return True
return False
# --------------------------------------------------------------------------
# 4. Dedup contra fichas existentes
# --------------------------------------------------------------------------
# Tokens demasiado comunes para fundamentar un match por subconjunto.
_STOP_TOKENS = {"de", "del", "la", "las", "el", "los", "y", "san", "da", "do"}
# Nombres de pila muy comunes: compartir SOLO estos no basta para deducir que
# dos contactos son la misma persona (hay decenas de "Antonio", "Maria", "Jose").
# Un match por subconjunto exige al menos un token distintivo fuera de esta lista
# (tipicamente un apellido).
_COMMON_GIVEN = {
"antonio", "jose", "juan", "maria", "manuel", "carlos", "francisco",
"javier", "david", "miguel", "angel", "luis", "pedro", "pablo", "rafael",
"fernando", "sergio", "alberto", "alejandro", "daniel", "jesus", "marcos",
"ana", "carmen", "cristina", "laura", "marta", "lucia", "elena", "sara",
"paula", "raquel", "gema", "lorena", "natalia", "silvia", "rosa", "isabel",
"dani", "javi", "manolo", "paco", "pepe", "alex", "nacho", "mari", "lola",
}
def _name_tokens(name: str) -> set:
slug = slugify_obsidian_name(name or "")
return {t for t in slug.split("-") if t and t not in _STOP_TOKENS}
def load_existing_persons() -> list:
"""Carga (slug, nombre, token_set) de cada ficha de persona del vault."""
out = []
for p in list_obsidian_notes(OSINT, subfolder="personas"):
base = os.path.splitext(os.path.basename(p))[0]
if base.startswith("_"):
continue
try:
fm = read_obsidian_note(p)["frontmatter"]
except Exception:
fm = {}
nombre = fm.get("nombre") or base.replace("-", " ")
out.append({
"slug": base,
"path": p,
"nombre": nombre,
"tokens": _name_tokens(nombre) or _name_tokens(base),
})
return out
def load_existing_orgs() -> dict:
"""Mapa slug -> path de las organizaciones existentes."""
out = {}
for p in list_obsidian_notes(OSINT, subfolder="organizaciones"):
base = os.path.splitext(os.path.basename(p))[0]
if base.startswith("_"):
continue
out[base] = p
return out
def _distinctive(tokens: set) -> bool:
"""True si el conjunto de tokens incluye al menos uno distintivo (apellido):
longitud >=4 y fuera de los nombres de pila ultra-comunes."""
return any(len(t) >= 4 and t not in _COMMON_GIVEN for t in tokens)
def match_existing_person(name: str, existing: list):
"""Busca una persona existente que case con `name`. Conservador a proposito.
Se considera la MISMA persona solo si:
- slug exacto, o
- los tokens del nombre de contacto son subconjunto de los de una ficha
existente (forma menos especifica del mismo nombre), compartiendo
>=2 tokens, ambos con >=2 tokens, y con al menos un token distintivo
(apellido) en el solape.
Esto cubre el caso del estandar ("Manuel Gutierrez" subset de "Manuel
Gutierrez Gamez") y RECHAZA fusiones erroneas por nombre de pila comun
("Antonio", "Maria") o por dos given names compartidos ("Maria Jose" vs
"Jose Maria ..."). Ante la duda, NO casa: se prefiere crear una ficha
nueva (un duplicado es recuperable; una fusion erronea corrompe una
investigacion existente).
"""
cand_slug = slugify_obsidian_name(name)
cand_tokens = _name_tokens(name)
if not cand_tokens:
return None
for ex in existing:
if ex["slug"] == cand_slug and cand_slug:
return ex
for ex in existing:
ex_tokens = ex["tokens"]
if len(cand_tokens) < 2 or len(ex_tokens) < 2:
continue
if not (cand_tokens <= ex_tokens):
continue
shared = cand_tokens & ex_tokens
if len(shared) >= 2 and _distinctive(shared):
return ex
return None
# --------------------------------------------------------------------------
# 3. Clasificacion LLM por lotes
# --------------------------------------------------------------------------
_LLM_SYSTEM = (
"Eres un clasificador de contactos telefonicos en espanol. Devuelves SOLO "
"un array JSON valido, sin texto alrededor, sin markdown."
)
_LLM_INSTRUCTIONS = """Clasifica cada contacto de la lista. Devuelve un array JSON con un objeto por contacto, en el MISMO orden, con estos campos:
{"i": <indice entero>, "tipo": "persona"|"organizacion"|"servicio", "persona_nombre": <string|null>, "org_nombre": <string|null>, "rol": <string|null>, "sexo": "hombre"|"mujer"|null}
Reglas:
- tipo="persona" si el contacto es un individuo (nombre de pila + apellidos).
- tipo="organizacion" si es un negocio, empresa, comercio o servicio (fruteria, autoescuela, seguros, banco, taller, tienda, restaurante, clinica...).
- tipo="servicio" si es un numero de operadora, recordatorio o automatismo (raro: ya filtramos la mayoria).
- Si el contacto MEZCLA persona y organizacion, rellena persona_nombre Y org_nombre Y rol.
Ej: "Emilio Villalba Gestor Orange" -> persona_nombre="Emilio Villalba", org_nombre="Orange", rol="gestor".
Ej: "Abdul Fruteria Velez" -> tipo="organizacion", org_nombre="Fruteria Velez", persona_nombre="Abdul", rol="dueno".
- persona_nombre: nombre LIMPIO de la persona (quita el rol y la empresa). null si no hay persona.
- org_nombre: nombre del negocio/empresa asociado. null si no hay.
- rol: gestor, comercial, dueno, empleado, contacto... null si no aplica.
- sexo: deduce del nombre de pila ("hombre"|"mujer"); null si ambiguo o no hay persona.
- Limpia emojis y typos al inferir, pero NO inventes datos.
Contactos:
"""
def _extract_json_array(text: str):
"""Extrae el primer array JSON `[...]` de una respuesta, tolerando texto alrededor."""
if not text:
return None
# intento directo
try:
v = json.loads(text.strip())
if isinstance(v, list):
return v
except Exception:
pass
# buscar el primer '[' y casar corchetes balanceados
start = text.find("[")
if start == -1:
return None
depth = 0
in_str = False
esc = False
for i in range(start, len(text)):
c = text[i]
if in_str:
if esc:
esc = False
elif c == "\\":
esc = True
elif c == '"':
in_str = False
continue
if c == '"':
in_str = True
elif c == "[":
depth += 1
elif c == "]":
depth -= 1
if depth == 0:
chunk = text[start:i + 1]
try:
v = json.loads(chunk)
return v if isinstance(v, list) else None
except Exception:
return None
return None
def classify_batch(batch: list, llm_calls: list) -> list:
"""Clasifica un lote de contactos. batch = [(local_idx, contact_dict), ...].
Devuelve lista de dicts de clasificacion alineados por 'i' (local_idx).
Reintenta una vez si el parseo falla; si vuelve a fallar, marca todos como
persona por defecto y lo anota en llm_calls.
"""
lines = []
for idx, c in batch:
extra = []
if c["org"]:
extra.append(f"ORG={c['org']}")
if c["title"]:
extra.append(f"TITLE={c['title']}")
suffix = f" [{'; '.join(extra)}]" if extra else ""
lines.append(f"{idx}. {c['fn']}{suffix}")
prompt = _LLM_INSTRUCTIONS + "\n".join(lines)
for attempt in (1, 2):
try:
resp = ask_llm(prompt, model=LLM_MODEL, system=_LLM_SYSTEM,
max_tokens=4096, echo=False)
except Exception as e: # noqa: BLE001
llm_calls.append({"size": len(batch), "ok": False, "error": f"{type(e).__name__}: {e}", "attempt": attempt})
resp = ""
if not resp:
llm_calls.append({"size": len(batch), "ok": False, "error": "empty response (auth/token?)", "attempt": attempt})
if attempt == 2:
break
continue
arr = _extract_json_array(resp)
if arr is not None:
llm_calls.append({"size": len(batch), "ok": True, "attempt": attempt})
return arr
llm_calls.append({"size": len(batch), "ok": False, "error": "json parse failed", "attempt": attempt})
# fallback: todo persona
return [{"i": idx, "tipo": "persona", "persona_nombre": c["fn"],
"org_nombre": None, "rol": None, "sexo": None,
"_fallback": True} for idx, c in batch]
# --------------------------------------------------------------------------
# 5. Construccion de fichas (planificacion)
# --------------------------------------------------------------------------
def _ordered_frontmatter(values: dict, canon: list) -> dict:
"""Devuelve un dict ordenado segun `canon`, con extras al final."""
fm = {}
for k in canon:
fm[k] = values.get(k)
for k, v in values.items():
if k not in fm:
fm[k] = v
return fm
def _contact_block(tels: list, emails: list) -> str:
"""Seccion ## Contacto con los telefonos/emails extra (mas alla del primero)."""
lines = []
extra_tel = tels[1:]
extra_mail = emails[1:]
if extra_tel or extra_mail:
lines.append("## Contacto")
lines.append("")
for t in extra_tel:
lines.append(f"- telefono: {t}")
for e in extra_mail:
lines.append(f"- email: {e}")
lines.append("")
return "\n".join(lines)
def plan_person(name, sexo, tels, emails, org_slug, org_nombre, rol,
existing_persons, used_person_slugs):
"""Planifica crear o enriquecer una persona. Devuelve dict de plan."""
match = match_existing_person(name, existing_persons)
nombre = name.strip()
if match:
return {
"action": "enrich_person",
"slug": match["slug"],
"path": match["path"],
"nombre_existente": match["nombre"],
"alias_add": nombre,
"tel": tels[0] if tels else None,
"email": emails[0] if emails else None,
"tels": tels,
"emails": emails,
"org_slug": org_slug,
"org_nombre": org_nombre,
"rol": rol,
}
# crear nueva
slug = _resolve_slug(slugify_obsidian_name(nombre) or "contacto", used_person_slugs)
rel = []
if org_slug:
rel.append(f"[[{org_slug}]] — {rol or 'contacto'}")
fm = _ordered_frontmatter({
"tipo": "persona",
"nombre": nombre,
"slug": slug,
"aliases": [],
"sexo": sexo if sexo in ("hombre", "mujer") else None,
"fecha_nacimiento": None,
"dni": None,
"telefono": tels[0] if tels else None,
"email": emails[0] if emails else None,
"direccion": None,
"pais": None,
"relaciones": rel,
"contexto": "google-contacts",
"fuente": FUENTE,
"tags": ["persona", "osint", "contacto"],
}, PERSON_CANON)
body_parts = []
contact = _contact_block(tels, emails)
if contact:
body_parts.append(contact)
if org_slug:
body_parts.append("## Relacionado")
body_parts.append("")
body_parts.append(f"- [[organizaciones/{org_slug}|{org_nombre}]] — {rol or 'contacto'}")
body_parts.append("")
body_parts.append("## Notas")
body_parts.append("")
return {
"action": "create_person",
"slug": slug,
"nombre": nombre,
"frontmatter": fm,
"body": "\n".join(body_parts),
"tel": tels[0] if tels else None,
"email": emails[0] if emails else None,
"org_slug": org_slug,
"org_nombre": org_nombre,
"rol": rol,
}
def _fuzzy_existing_org(slug: str, existing_orgs: dict):
"""Devuelve el slug de una org existente que sea casi-duplicado de `slug`.
Casa cuando uno es prefijo del otro compartiendo >=5 chars de raiz comun
(p.ej. "fenixfood" ~ "fenixfood-sl", "biorganic" ~ "biorganicfood-sl",
"4geekss" ~ "4geeks"). None si no hay casi-duplicado.
"""
for ex in existing_orgs:
a, b = slug, ex
root = a if len(a) <= len(b) else b
longer = b if root is a else a
if len(root) >= 5 and longer.startswith(root):
return ex
# tolerar 1-2 chars de cola repetida ("4geekss" vs "4geeks")
common = os.path.commonprefix([a, b])
if len(common) >= 5 and abs(len(a) - len(b)) <= 2 and (
a[len(common):].strip("s-") == "" or b[len(common):].strip("s-") == ""
):
return ex
return None
def plan_org(org_nombre, tels, emails, existing_orgs, used_org_slugs,
person_slug=None, person_nombre=None, rol=None):
"""Planifica crear (o reutilizar) una organizacion. Devuelve (slug, plan|None).
plan=None si ya existe (en vault o ya planificada en este batch) o si el
nombre es un toponimo (no se crea org de lugar). slug=None si debe ignorarse.
"""
slug = slugify_obsidian_name(org_nombre)
if not slug:
return None, None
# Lugar -> no es organizacion: no crear, no enlazar.
if slug in _PLACE_BLOCKLIST:
return None, None
if slug in existing_orgs or slug in used_org_slugs:
# ya existe: solo enlazar (no crear). Devolvemos el slug, sin plan de creacion.
return slug, None
# Casi-duplicado de una org existente -> reutilizar la existente.
fuzzy = _fuzzy_existing_org(slug, existing_orgs)
if fuzzy:
return fuzzy, None
rel = []
if person_slug:
rel.append(f"[[{person_slug}]] — {rol or 'contacto'}")
fm = _ordered_frontmatter({
"tipo": "organizacion",
"nombre": org_nombre.strip(),
"slug": slug,
"aliases": [],
"telefono": tels[0] if tels else None,
"email": emails[0] if emails else None,
"direccion": None,
"pais": None,
"relaciones": rel,
"contexto": "google-contacts",
"fuente": FUENTE,
"tags": ["organizacion", "osint", "contacto"],
}, ORG_CANON)
body_parts = []
contact = _contact_block(tels, emails)
if contact:
body_parts.append(contact)
if person_slug:
body_parts.append("## Relacionado")
body_parts.append("")
body_parts.append(f"- [[{person_slug}|{person_nombre}]] — {rol or 'contacto'}")
body_parts.append("")
body_parts.append("## Notas")
body_parts.append("")
plan = {
"action": "create_org",
"slug": slug,
"nombre": org_nombre.strip(),
"frontmatter": fm,
"body": "\n".join(body_parts),
}
return slug, plan
def _resolve_slug(base: str, used: set) -> str:
"""Resuelve colisiones de slug con sufijo -2, -3..."""
if base not in used:
used.add(base)
return base
k = 2
while f"{base}-{k}" in used:
k += 1
s = f"{base}-{k}"
used.add(s)
return s
# --------------------------------------------------------------------------
# Orquestacion
# --------------------------------------------------------------------------
def build_plan(contacts, classifications, existing_persons, existing_orgs):
"""Construye la lista de acciones (crear/enriquecer) a partir de la clasificacion."""
by_idx = {}
for c in classifications:
if isinstance(c, dict) and "i" in c:
by_idx[c["i"]] = c
person_plans, org_plans, enrich_plans = [], [], []
relations = [] # (tipo_origen, slug_origen, slug_org, rol)
used_person_slugs = {p["slug"] for p in existing_persons}
used_org_slugs = set()
skipped_service = 0
# indice de personas existentes mutable (para que dedup vea las recien creadas)
persons_index = list(existing_persons)
for idx, contact in contacts:
cls = by_idx.get(idx)
if not cls:
cls = {"tipo": "persona", "persona_nombre": contact["fn"],
"org_nombre": None, "rol": None, "sexo": None}
tipo = (cls.get("tipo") or "persona").lower()
tels = contact["tels"]
emails = contact["emails"]
rol = cls.get("rol")
sexo = cls.get("sexo")
persona_nombre = cls.get("persona_nombre")
org_nombre = cls.get("org_nombre") or contact["org"] or None
if tipo == "servicio":
skipped_service += 1
continue
if tipo == "organizacion":
# crear la org (telefono al de la org); persona asociada si la hay
person_slug = None
person_disp = None
if persona_nombre and len(_name_tokens(persona_nombre)) >= 1:
pmatch = match_existing_person(persona_nombre, persons_index)
if pmatch:
person_slug = pmatch["slug"]
person_disp = pmatch["nombre"]
enrich_plans.append({
"action": "enrich_person", "slug": pmatch["slug"],
"path": pmatch["path"], "nombre_existente": pmatch["nombre"],
"alias_add": persona_nombre, "tel": None, "email": None,
"tels": [], "emails": [],
"org_slug": None, "org_nombre": None, "rol": None,
})
else:
pslug = _resolve_slug(slugify_obsidian_name(persona_nombre) or "contacto", used_person_slugs)
person_slug = pslug
person_disp = persona_nombre.strip()
pfm = _ordered_frontmatter({
"tipo": "persona", "nombre": persona_nombre.strip(), "slug": pslug,
"aliases": [], "sexo": sexo if sexo in ("hombre", "mujer") else None,
"fecha_nacimiento": None, "dni": None, "telefono": None, "email": None,
"direccion": None, "pais": None,
"relaciones": [], # se completa abajo con el org slug
"contexto": "google-contacts", "fuente": FUENTE,
"tags": ["persona", "osint", "contacto"],
}, PERSON_CANON)
person_plans.append({
"action": "create_person", "slug": pslug,
"nombre": persona_nombre.strip(), "frontmatter": pfm,
"body": "## Notas\n", "tel": None, "email": None,
"org_slug": None, "org_nombre": org_nombre, "rol": rol,
"_pending_org_rel": True,
})
persons_index.append({"slug": pslug, "path": None,
"nombre": persona_nombre.strip(),
"tokens": _name_tokens(persona_nombre)})
oslug, oplan = plan_org(org_nombre or contact["fn"], tels, emails,
existing_orgs, used_org_slugs,
person_slug=person_slug, person_nombre=person_disp, rol=rol)
if oslug:
used_org_slugs.add(oslug)
if oplan:
org_plans.append(oplan)
if person_slug:
relations.append(("persona->org", person_slug, oslug, rol))
# completar relacion en el person plan recien creado
for pp in person_plans:
if pp.get("_pending_org_rel") and pp["slug"] == person_slug:
pp["frontmatter"]["relaciones"] = [f"[[{oslug}]] — {rol or 'contacto'}"]
pp["org_slug"] = oslug
pp["body"] = (
"## Relacionado\n\n"
f"- [[organizaciones/{oslug}|{org_nombre}]] — {rol or 'contacto'}\n\n"
"## Notas\n"
)
pp.pop("_pending_org_rel", None)
continue
# tipo == persona
name = persona_nombre or contact["fn"]
org_slug = None
# si la persona trae una org asociada, planificar la org y enlazar
if org_nombre and len(_name_tokens(org_nombre)) >= 1:
oslug, oplan = plan_org(org_nombre, [], [], existing_orgs, used_org_slugs)
if oslug:
used_org_slugs.add(oslug)
org_slug = oslug
if oplan:
# la org no lleva tel/email del contacto (son de la persona)
org_plans.append(oplan)
pplan = plan_person(name, sexo, tels, emails, org_slug, org_nombre, rol,
persons_index, used_person_slugs)
if pplan["action"] == "create_person":
person_plans.append(pplan)
persons_index.append({"slug": pplan["slug"], "path": None,
"nombre": pplan["nombre"],
"tokens": _name_tokens(pplan["nombre"])})
if org_slug:
# backref persona en la org recien planificada
for op in org_plans:
if op["slug"] == org_slug and not op["frontmatter"].get("relaciones"):
op["frontmatter"]["relaciones"] = [f"[[{pplan['slug']}]] — {pplan['rol'] or 'contacto'}"]
else:
enrich_plans.append(pplan)
if org_slug:
relations.append(("persona->org", pplan["slug"], org_slug, rol))
return {
"person_creates": person_plans,
"org_creates": org_plans,
"enriches": enrich_plans,
"relations": relations,
"skipped_service": skipped_service,
}
# --------------------------------------------------------------------------
# Aplicar (solo --apply)
# --------------------------------------------------------------------------
def apply_plan(plan):
"""Escribe las fichas en disco usando funciones del grupo obsidian."""
created_p = created_o = enriched = 0
for pp in plan["person_creates"]:
create_obsidian_note(OSINT, f"personas/{pp['slug']}",
body=pp["body"], frontmatter=pp["frontmatter"],
overwrite=True)
created_p += 1
for op in plan["org_creates"]:
create_obsidian_note(OSINT, f"organizaciones/{op['slug']}",
body=op["body"], frontmatter=op["frontmatter"],
overwrite=True)
created_o += 1
for ep in plan["enriches"]:
path = ep["path"]
if not path or not os.path.exists(path):
continue
note = read_obsidian_note(path)
fm = dict(note["frontmatter"])
# anadir alias del contacto
aliases = fm.get("aliases") or []
if not isinstance(aliases, list):
aliases = [aliases]
if ep["alias_add"] and ep["alias_add"] not in aliases and ep["alias_add"] != fm.get("nombre"):
aliases.append(ep["alias_add"])
# rellenar telefono/email si faltan
if ep.get("tel") and not fm.get("telefono"):
fm["telefono"] = ep["tel"]
if ep.get("email") and not fm.get("email"):
fm["email"] = ep["email"]
update_obsidian_note(path, set_frontmatter={"aliases": aliases,
"telefono": fm.get("telefono"),
"email": fm.get("email")})
enriched += 1
return created_p, created_o, enriched
# --------------------------------------------------------------------------
# Reporte dry-run
# --------------------------------------------------------------------------
def report(plan, stats, llm_calls):
n_create_p = len(plan["person_creates"])
n_enrich = len(plan["enriches"])
n_create_o = len(plan["org_creates"])
n_rel = len(plan["relations"])
print("=" * 64)
print("DRY-RUN — import_google_contacts.py")
print("=" * 64)
print(f"vCards totales en el .vcf .................. {stats['total']}")
print(f"descartados servicio/ruido ................ {stats['filtered']}")
print(f"contactos clasificados con LLM ............ {stats['classified']}")
print(f" de ellos sin telefono ni email .......... {stats['no_contact']}")
print("-" * 64)
print(f"PERSONAS a crear .......................... {n_create_p}")
print(f"PERSONAS a enriquecer (ya existen) ........ {n_enrich}")
print(f"ORGANIZACIONES a crear .................... {n_create_o}")
print(f"RELACIONES persona<->organizacion ......... {n_rel}")
print(f"contactos marcados como servicio (LLM) .... {plan['skipped_service']}")
print(f"colisiones de slug resueltas (sufijo) ..... {stats['slug_collisions']}")
print("-" * 64)
print("Llamadas a ask_llm:")
ok = sum(1 for c in llm_calls if c["ok"])
fail = sum(1 for c in llm_calls if not c["ok"])
print(f" exitosas={ok} fallidas={fail} total_intentos={len(llm_calls)}")
for c in llm_calls:
if not c["ok"]:
print(f" FALLO lote size={c['size']} intento={c['attempt']}: {c.get('error')}")
print("=" * 64)
print("MUESTRA de 15 fichas (nombre -> tipo/accion -> tel/email -> relacion):")
print("-" * 64)
sample = []
for pp in plan["person_creates"]:
rel = f" -> org {pp['org_slug']} ({pp['rol'] or 'contacto'})" if pp.get("org_slug") else ""
sample.append(f"[crear persona] {pp['nombre']} | tel={pp['tel'] or '-'} email={pp['email'] or '-'}{rel}")
for op in plan["org_creates"]:
rels = op["frontmatter"].get("relaciones") or []
rel = f" -> {rels[0]}" if rels else ""
tel = op["frontmatter"].get("telefono")
eml = op["frontmatter"].get("email")
sample.append(f"[crear org] {op['nombre']} | tel={tel or '-'} email={eml or '-'}{rel}")
for ep in plan["enriches"]:
sample.append(f"[enriquecer] {ep['nombre_existente']} (+alias '{ep['alias_add']}', +tel={ep.get('tel') or '-'})")
for line in sample[:15]:
print(" " + line)
if len(sample) < 1:
print(" (sin fichas planificadas)")
print("=" * 64)
# --------------------------------------------------------------------------
# main
# --------------------------------------------------------------------------
def main():
ap = argparse.ArgumentParser(description="Importa contactos Google al vault OSINT.")
ap.add_argument("--apply", action="store_true",
help="Escribe las fichas en disco. Por defecto: dry-run (no escribe).")
ap.add_argument("--vcf", default=VCF_PATH, help="Ruta al .vcf de contactos.")
ap.add_argument("--limit", type=int, default=0,
help="(debug) limita el numero de contactos clasificados.")
args = ap.parse_args()
if not os.path.exists(args.vcf):
print(f"ERROR: no existe el .vcf: {args.vcf}", file=sys.stderr)
return 1
with open(args.vcf, "r", encoding="utf-8", errors="replace") as f:
vcf_text = f.read()
cards = split_vcards(vcf_text)
total = len(cards)
contacts = []
filtered = 0
for raw in cards:
c = parse_vcard(raw)
if is_service(c["fn"]):
filtered += 1
continue
contacts.append(c)
if args.limit and args.limit > 0:
contacts = contacts[:args.limit]
# indexar contactos
indexed = list(enumerate(contacts))
# clasificar por lotes
llm_calls = []
classifications = []
for start in range(0, len(indexed), BATCH_SIZE):
batch = indexed[start:start + BATCH_SIZE]
classifications.extend(classify_batch(batch, llm_calls))
existing_persons = load_existing_persons()
existing_orgs = load_existing_orgs()
# contar colisiones: comparar slugs base antes de resolver
base_slugs = {}
for _, c in indexed:
s = slugify_obsidian_name(c["fn"])
if s:
base_slugs[s] = base_slugs.get(s, 0) + 1
slug_collisions = sum(v - 1 for v in base_slugs.values() if v > 1)
plan = build_plan(indexed, classifications, existing_persons, existing_orgs)
no_contact = sum(1 for _, c in indexed if not c["tels"] and not c["emails"])
stats = {
"total": total,
"filtered": filtered,
"classified": len(indexed),
"no_contact": no_contact,
"slug_collisions": slug_collisions,
}
report(plan, stats, llm_calls)
if args.apply:
cp, co, en = apply_plan(plan)
print(f"\nAPLICADO: personas creadas={cp} orgs creadas={co} enriquecidas={en}")
else:
print("\n(dry-run: no se escribio nada. Usa --apply para aplicar.)")
return 0
if __name__ == "__main__":
sys.exit(main())