feat(infra): auto-commit con 88 cambios

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-11 00:16:46 +02:00
parent 6bc97df5c0
commit eb8dbf66a1
126 changed files with 10933 additions and 287 deletions
@@ -22,6 +22,21 @@ from .extract_mac_addresses import extract_mac_addresses
from .extract_phone_numbers import extract_phone_numbers
from .extract_iocs import extract_iocs
# OSINT passive atomic functions (grupo osint-passive).
from .extract_exif_metadata import extract_exif_metadata
from .extract_pdf_metadata import extract_pdf_metadata
from .guess_email_formats import guess_email_formats
from .enumerate_username_sites import enumerate_username_sites
from .build_search_dorks import build_search_dorks
from .whois_lookup import whois_lookup
from .dns_records import dns_records
from .enum_subdomains_crtsh import enum_subdomains_crtsh
# OSINT passive enrichment orchestrators (grupo osint-enrich).
from .scan_ficha_attachments_metadata import scan_ficha_attachments_metadata
from .enrich_person_passive import enrich_person_passive
from .enrich_org_passive import enrich_org_passive
__all__ = [
"hash_sha256",
"hash_md5",
@@ -44,4 +59,15 @@ __all__ = [
"extract_mac_addresses",
"extract_phone_numbers",
"extract_iocs",
"extract_exif_metadata",
"extract_pdf_metadata",
"guess_email_formats",
"enumerate_username_sites",
"build_search_dorks",
"whois_lookup",
"dns_records",
"enum_subdomains_crtsh",
"scan_ficha_attachments_metadata",
"enrich_person_passive",
"enrich_org_passive",
]
@@ -0,0 +1,64 @@
---
name: build_search_dorks
kind: function
lang: py
domain: cybersecurity
version: "1.0.0"
purity: pure
signature: "def build_search_dorks(target: str, tipo: str = 'persona', extra_domains: list | None = None) -> list"
description: "Genera consultas (dorks) de motor de busqueda para investigar un target segun su tipo (persona|email|dominio|usuario): frase exacta, site:linkedin, filetype:pdf, intext con cv/curriculum, site:<dominio> filetype:xlsx, dorks de leaks/pastebin para email, redes sociales para usuario, etc. extra_domains acota via site:. OSINT pasivo puro, sin red."
tags: [osint-passive, dork, search, recon, google-dork, cybersecurity, python]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: []
params:
- name: target
desc: "Cadena objetivo: nombre de persona, email, dominio o username segun el tipo."
- name: tipo
desc: "Uno de 'persona', 'email', 'dominio', 'usuario'. Cualquier otro valor devuelve solo la frase exacta. Default 'persona'."
- name: extra_domains
desc: "Lista opcional de dominios para añadir dorks 'site:<dominio> \"<target>\"' independientemente del tipo."
output: "Lista de strings de dork listos para pegar en un buscador, en orden de generacion."
tested: true
tests:
- "test_persona_genera_dorks_esperados"
- "test_email_genera_dorks_esperados"
- "test_dominio_genera_site_dorks"
- "test_usuario_genera_redes_sociales"
- "test_extra_domains_acota_con_site"
- "test_tipo_desconocido_solo_frase_exacta"
test_file_path: "python/functions/cybersecurity/build_search_dorks_test.py"
file_path: "python/functions/cybersecurity/build_search_dorks.py"
---
## Ejemplo
```python
build_search_dorks("Juan Perez", tipo="persona", extra_domains=["acme.com"])
# ['"Juan Perez"',
# '"Juan Perez" filetype:pdf',
# 'site:linkedin.com/in "Juan Perez"',
# 'site:twitter.com "Juan Perez"',
# 'intext:"Juan Perez" (curriculum OR cv OR resume)',
# '"Juan Perez" (email OR correo OR contacto)',
# '"Juan Perez" filetype:doc OR "Juan Perez" filetype:docx',
# 'site:acme.com "Juan Perez"']
build_search_dorks("empresa.com", tipo="dominio")
# ['"empresa.com"', 'site:empresa.com', 'site:empresa.com filetype:pdf',
# 'site:empresa.com filetype:xlsx', 'site:empresa.com (login OR admin OR dashboard)', ...]
```
## Cuando usarla
Usala cuando ya tengas un target identificado (persona, email, dominio o alias) y quieras una bateria de consultas de buscador listas para pegar manualmente y mapear documentos, perfiles y posibles filtraciones. Encaja despues de `guess_email_formats` (dorks de email) o `enumerate_username_sites` (dorks de usuario/persona) en una investigacion autorizada.
## Gotchas
- Funcion pura: solo genera strings de consulta, NO ejecuta busquedas ni toca la red. Los dorks se pegan a mano en el buscador.
- La sintaxis usa operadores de Google (site:, filetype:, intext:, inurl:); otros buscadores soportan un subconjunto distinto y algunos dorks no funcionaran igual.
- Para tipos no reconocidos devuelve unicamente la frase exacta entre comillas (mas los `extra_domains` si se pasan), no falla.
- Uso solo para investigacion OSINT autorizada; los dorks de leaks/breaches/pastebin pueden devolver datos sensibles cuyo tratamiento esta sujeto a ley.
@@ -0,0 +1,81 @@
"""Genera consultas (dorks) de motor de busqueda para investigar un target.
Funcion pura de OSINT pasivo: a partir de un target y su tipo (persona,
email, dominio o usuario) produce una lista de cadenas de busqueda listas
para pegar en un buscador. No toca la red.
"""
def build_search_dorks(
target: str,
tipo: str = "persona",
extra_domains: list | None = None,
) -> list:
"""Genera dorks de motor de busqueda adaptados al tipo de target.
Segun el tipo seleccionado produce las consultas mas utiles para
investigar:
- persona: nombre exacto, en linkedin, con cv/curriculum, en ficheros.
- email: el email entre comillas, en pastebin/breaches, en filetypes.
- dominio: site:dominio, ficheros expuestos, subdominios, paneles.
- usuario: el alias en redes sociales y foros.
`extra_domains` añade dorks `site:<dominio> "<target>"` para acotar la
busqueda a dominios concretos, independientemente del tipo.
Args:
target: cadena objetivo (nombre, email, dominio o username).
tipo: uno de "persona", "email", "dominio", "usuario". Cualquier
otro valor cae al conjunto generico (solo la frase exacta).
extra_domains: lista opcional de dominios para acotar via site:.
Returns:
lista de strings de dork en orden de generacion (sin deduplicar
salvo el dork de frase exacta, que es la base comun).
"""
q = f'"{target}"' # frase exacta, base de casi todo dork
dorks = [q]
t = tipo.strip().lower()
if t == "persona":
dorks += [
f"{q} filetype:pdf",
f'site:linkedin.com/in {q}',
f'site:twitter.com {q}',
f'intext:{q} (curriculum OR cv OR resume)',
f'{q} (email OR correo OR contacto)',
f'{q} filetype:doc OR {q} filetype:docx',
]
elif t == "email":
dorks += [
f'{q} site:pastebin.com',
f'{q} (leak OR breach OR dump OR password)',
f'{q} filetype:txt',
f'{q} filetype:csv',
f'{q} site:github.com',
]
elif t == "dominio":
dorks += [
f'site:{target}',
f'site:{target} filetype:pdf',
f'site:{target} filetype:xlsx',
f'site:{target} (login OR admin OR dashboard)',
f'site:{target} intext:(password OR contraseña)',
f'site:*.{target}',
f'-www site:{target}',
]
elif t == "usuario":
dorks += [
f'{q} site:github.com',
f'{q} site:reddit.com',
f'{q} (profile OR perfil OR user OR usuario)',
f'inurl:{target}',
]
# Cualquier otro tipo: solo la frase exacta (ya añadida).
if extra_domains:
for dom in extra_domains:
dorks.append(f'site:{dom} {q}')
return dorks
@@ -0,0 +1,56 @@
"""Tests para build_search_dorks."""
from build_search_dorks import build_search_dorks
def test_persona_genera_dorks_esperados():
"""El tipo persona produce frase exacta, linkedin, cv y filetypes."""
out = build_search_dorks("Juan Perez", tipo="persona")
assert '"Juan Perez"' in out
assert '"Juan Perez" filetype:pdf' in out
assert any("linkedin.com" in d for d in out)
assert any("curriculum OR cv OR resume" in d for d in out)
def test_email_genera_dorks_esperados():
"""El tipo email busca en pastebin, breaches y filetypes."""
out = build_search_dorks("bob@corp.com", tipo="email")
assert '"bob@corp.com"' in out
assert any("pastebin.com" in d for d in out)
assert any("leak OR breach OR dump OR password" in d for d in out)
assert '"bob@corp.com" filetype:csv' in out
def test_dominio_genera_site_dorks():
"""El tipo dominio produce site: y ficheros expuestos."""
out = build_search_dorks("empresa.com", tipo="dominio")
assert "site:empresa.com" in out
assert "site:empresa.com filetype:xlsx" in out
assert any("login OR admin OR dashboard" in d for d in out)
assert "site:*.empresa.com" in out
def test_usuario_genera_redes_sociales():
"""El tipo usuario busca en github, reddit y por inurl."""
out = build_search_dorks("jdoe", tipo="usuario")
assert '"jdoe"' in out
assert any("github.com" in d for d in out)
assert any("reddit.com" in d for d in out)
assert "inurl:jdoe" in out
def test_extra_domains_acota_con_site():
"""extra_domains añade dorks site:<dominio> con la frase exacta."""
out = build_search_dorks(
"Ana Ruiz", tipo="persona", extra_domains=["uni.edu", "gov.es"]
)
assert 'site:uni.edu "Ana Ruiz"' in out
assert 'site:gov.es "Ana Ruiz"' in out
def test_tipo_desconocido_solo_frase_exacta():
"""Un tipo no reconocido devuelve solo la frase exacta (mas extras)."""
out = build_search_dorks("algo", tipo="otracosa")
assert out[0] == '"algo"'
# Sin extras y tipo desconocido, solo la frase base.
assert out == ['"algo"']
@@ -0,0 +1,58 @@
---
name: dns_records
kind: function
lang: py
domain: cybersecurity
version: "1.0.0"
purity: impure
signature: "def dns_records(dominio: str, types: list | None = None) -> dict"
description: "Recoleccion OSINT pasiva de registros DNS de un dominio ejecutando `dig +short <tipo> <dominio>` por subprocess para cada tipo (default A, AAAA, MX, TXT, NS, CNAME). Parsea la salida (una linea por valor) y devuelve un dict {tipo: [valores]}. Pasivo: solo consulta DNS publico."
tags: [osint-passive, dns, recon, cybersecurity, dig]
params:
- name: dominio
desc: "Dominio a resolver, ej. organic-machine.com. Vacio lanza RuntimeError."
- name: types
desc: "Lista de tipos de registro DNS (ej. ['A','MX']). None usa los 6 defaults: A, AAAA, MX, TXT, NS, CNAME."
output: "dict {tipo: [valores]} con una clave por tipo consultado; cada valor es la lista de lineas devueltas por `dig +short` para ese tipo (lista vacia si no hay registro o el dominio no existe)."
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: []
tested: true
tests: ["test_parsea_salida_de_dig", "test_dominio_inexistente_listas_vacias", "test_usa_tipos_default", "test_timeout_devuelve_lista_vacia", "test_dominio_vacio_lanza_error"]
test_file_path: "python/functions/cybersecurity/dns_records_test.py"
file_path: "python/functions/cybersecurity/dns_records.py"
---
## Ejemplo
```python
import sys, os
sys.path.insert(0, os.path.join("python", "functions"))
from cybersecurity import dns_records
# Resolver todos los tipos por defecto
records = dns_records("organic-machine.com")
print(records["A"]) # ['135.125.201.30']
print(records["MX"]) # ['10 mail.organic-machine.com.', ...]
# Solo los tipos que interesan
solo_a_mx = dns_records("organic-machine.com", types=["A", "MX"])
```
## Cuando usarla
Usala al iniciar el reconocimiento pasivo de un dominio para mapear su
infraestructura DNS publica (IPs, servidores de correo, nameservers, TXT con
SPF/DKIM/verificaciones). Es el primer paso barato antes de enumerar
subdominios o consultar RDAP.
## Gotchas
- Requiere el binario `dig` en PATH (paquete `dnsutils`/`bind-utils`). Si falta, lanza `RuntimeError`.
- Cada consulta tiene timeout de 10s; si una expira, esa clave queda como lista vacia y el resto continua.
- La salida es la de `dig +short` cruda: los MX incluyen prioridad ("10 mail..."), los nombres pueden venir con punto final (FQDN), y los TXT entre comillas. No se normaliza para mantener fidelidad.
- Un dominio inexistente o sin un registro concreto devuelve lista vacia (no error): distingue "sin datos" mirando las listas vacias.
- Resuelve contra el resolver configurado en el sistema; resultados pueden variar segun el DNS recursivo usado.
@@ -0,0 +1,63 @@
"""Recoleccion OSINT pasiva de registros DNS via el binario `dig`.
Funcion IMPURA: ejecuta `dig +short <tipo> <dominio>` como subprocess para
cada tipo de registro y parsea la salida (una linea por valor). Es OSINT
pasivo: consulta DNS publico, no envia trafico intrusivo al objetivo.
"""
import subprocess
DEFAULT_TYPES = ["A", "AAAA", "MX", "TXT", "NS", "CNAME"]
def dns_records(dominio: str, types: list | None = None) -> dict:
"""Resuelve registros DNS de un dominio ejecutando `dig +short`.
Para cada tipo en ``types`` ejecuta ``dig +short <tipo> <dominio>`` y
parsea la salida: cada linea no vacia es un valor del registro. Un
dominio inexistente (o un registro ausente) produce una lista vacia.
Args:
dominio: Dominio a resolver (ej. ``"organic-machine.com"``).
types: Lista de tipos de registro DNS (ej. ``["A", "MX"]``). Si es
None usa los defaults: A, AAAA, MX, TXT, NS, CNAME.
Returns:
Dict ``{tipo: [valores]}`` con una clave por tipo consultado. Cada
valor es la lista de lineas devueltas por dig para ese tipo.
Raises:
RuntimeError: Si el binario `dig` no esta instalado o el dominio
esta vacio.
"""
if not dominio or not dominio.strip():
raise RuntimeError("dns_records: dominio vacio")
query_types = types if types is not None else list(DEFAULT_TYPES)
result: dict = {}
for record_type in query_types:
try:
proc = subprocess.run(
["dig", "+short", record_type, dominio],
capture_output=True,
text=True,
timeout=10.0,
)
except FileNotFoundError as e:
raise RuntimeError(
"dns_records: binario `dig` no encontrado en PATH"
) from e
except subprocess.TimeoutExpired:
# Timeout en una consulta concreta: dejamos lista vacia y seguimos.
result[record_type] = []
continue
values = [
line.strip()
for line in proc.stdout.splitlines()
if line.strip()
]
result[record_type] = values
return result
@@ -0,0 +1,90 @@
"""Tests para dns_records."""
import os
import subprocess
import sys
sys.path.insert(0, os.path.dirname(__file__))
from dns_records import dns_records
class _FakeProc:
def __init__(self, stdout: str):
self.stdout = stdout
self.returncode = 0
def test_parsea_salida_de_dig(monkeypatch):
"""Cada linea no vacia de dig se convierte en un valor del registro."""
fixtures = {
"A": "135.125.201.30\n",
"MX": "10 mail.organic-machine.com.\n20 mail2.organic-machine.com.\n",
"TXT": '"v=spf1 -all"\n',
}
def fake_run(cmd, **kwargs):
record_type = cmd[2]
return _FakeProc(fixtures.get(record_type, ""))
monkeypatch.setattr(subprocess, "run", fake_run)
result = dns_records("organic-machine.com", types=["A", "MX", "TXT"])
assert result["A"] == ["135.125.201.30"]
assert result["MX"] == [
"10 mail.organic-machine.com.",
"20 mail2.organic-machine.com.",
]
assert result["TXT"] == ['"v=spf1 -all"']
def test_dominio_inexistente_listas_vacias(monkeypatch):
"""Salida vacia de dig (dominio inexistente) produce listas vacias."""
def fake_run(cmd, **kwargs):
return _FakeProc("")
monkeypatch.setattr(subprocess, "run", fake_run)
result = dns_records("nope-no-existe-xyz.invalid", types=["A", "AAAA"])
assert result == {"A": [], "AAAA": []}
def test_usa_tipos_default(monkeypatch):
"""Sin types consulta los 6 tipos por defecto."""
consultados = []
def fake_run(cmd, **kwargs):
consultados.append(cmd[2])
return _FakeProc("")
monkeypatch.setattr(subprocess, "run", fake_run)
result = dns_records("organic-machine.com")
assert set(result.keys()) == {"A", "AAAA", "MX", "TXT", "NS", "CNAME"}
assert consultados == ["A", "AAAA", "MX", "TXT", "NS", "CNAME"]
def test_timeout_devuelve_lista_vacia(monkeypatch):
"""Un timeout en una consulta deja lista vacia y no aborta el resto."""
def fake_run(cmd, **kwargs):
raise subprocess.TimeoutExpired(cmd, 10.0)
monkeypatch.setattr(subprocess, "run", fake_run)
result = dns_records("organic-machine.com", types=["A"])
assert result == {"A": []}
def test_dominio_vacio_lanza_error():
"""Dominio vacio lanza RuntimeError."""
try:
dns_records("")
assert False, "deberia haber lanzado RuntimeError"
except RuntimeError:
pass
@@ -0,0 +1,52 @@
---
name: enrich_org_passive
kind: function
lang: py
domain: cybersecurity
version: "1.0.0"
purity: impure
signature: "def enrich_org_passive(dominio: str) -> dict"
description: "Orquestador OSINT pasivo: perfil de una organizacion por su dominio usando solo fuentes publicas. Compone whois_lookup (registro WHOIS), dns_records (registros DNS) y enum_subdomains_crtsh (subdominios via Certificate Transparency / crt.sh). Devuelve whois + dns + subdomains."
tags: [osint-enrich, osint-passive, cybersecurity, org, whois, dns, subdomains]
uses_functions: [whois_lookup_py_cybersecurity, dns_records_py_cybersecurity, enum_subdomains_crtsh_py_cybersecurity]
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: []
params:
- name: dominio
desc: "dominio de la organizacion, p.ej. organic-machine.com. No puede estar vacio (ValueError). Se hace strip de espacios"
output: "dict con whois (salida de whois_lookup(dominio)), dns (salida de dns_records(dominio)) y subdomains (salida de enum_subdomains_crtsh(dominio))"
tested: true
tests: ["test_golden_compone_whois_dns_subdomains", "test_dominio_vacio_lanza", "test_dominio_con_espacios_se_normaliza"]
test_file_path: "python/functions/cybersecurity/enrich_org_passive_test.py"
file_path: "python/functions/cybersecurity/enrich_org_passive.py"
---
## Ejemplo
```python
import sys, os
sys.path.insert(0, os.path.join("python", "functions"))
from cybersecurity import enrich_org_passive
res = enrich_org_passive("organic-machine.com")
print(res["whois"]["registrar"]) # registrar segun WHOIS
print(res["dns"].get("A")) # registros A del dominio
print(res["subdomains"][:5]) # primeros subdominios vistos en crt.sh
```
## Cuando usarla
- Cuando arrancas la ficha OSINT de una organizacion y quieres su perfil pasivo (quien registro el dominio, su DNS y su superficie de subdominios) sin enviar trafico ofensivo a la infraestructura.
- Como reconocimiento previo, completamente pasivo, antes de cualquier evaluacion activa (que requeriria otra autorizacion explicita).
- Para mapear de un golpe la huella publica de un dominio: WHOIS + DNS + Certificate Transparency en una sola llamada.
## Gotchas
- **Uso solo para investigacion autorizada.** El reconocimiento de infraestructura ajena debe contar con permiso del propietario o ampararse en una base legitima.
- Funcion IMPURA: hace consultas de red (WHOIS, DNS y HTTP a crt.sh). Es pasiva respecto al objetivo (no le envia trafico ofensivo), pero **deja huella** en los servicios consultados (logs de crt.sh, resolvers DNS, servidores WHOIS).
- La salida depende de la disponibilidad y rate-limiting de las fuentes: WHOIS puede venir truncado/redactado (GDPR), crt.sh puede limitar o tardar, y algunos registros DNS pueden no existir. Maneja claves ausentes en el dict resultante.
- crt.sh solo ve subdominios que hayan emitido certificados TLS publicos: la lista NO es exhaustiva (no incluye subdominios sin cert o con certs privados).
@@ -0,0 +1,47 @@
"""Orquestador OSINT pasivo: perfil de una organizacion por su dominio.
Compone funciones atomicas del registro (`whois_lookup`, `dns_records`,
`enum_subdomains_crtsh`) para construir un perfil pasivo de una organizacion a
partir de su dominio, usando solo fuentes publicas (WHOIS, DNS, Certificate
Transparency via crt.sh) sin contactar directamente con la infraestructura del
objetivo mas alla de las consultas DNS estandar.
Funcion IMPURA: hace consultas de red (WHOIS, DNS, HTTP a crt.sh).
"""
from cybersecurity import dns_records, enum_subdomains_crtsh, whois_lookup
def enrich_org_passive(dominio: str) -> dict:
"""Construye un perfil OSINT pasivo de una organizacion por su dominio.
Agrega tres fuentes publicas: el registro WHOIS del dominio, sus registros
DNS y los subdominios observados en Certificate Transparency (crt.sh).
Args:
dominio: dominio de la organizacion, p.ej. `organic-machine.com`.
Returns:
dict con las claves:
- whois: salida de whois_lookup(dominio).
- dns: salida de dns_records(dominio).
- subdomains: salida de enum_subdomains_crtsh(dominio).
Raises:
ValueError: si el dominio esta vacio.
"""
if not (dominio or "").strip():
raise ValueError("dominio no puede estar vacio")
dominio = dominio.strip()
# Resiliente a fallo parcial: si una fuente externa falla (p.ej. crt.sh lento o
# rate-limitado), se registra el error y se devuelven las demas igualmente.
result = {"whois": {}, "dns": {}, "subdomains": [], "errors": {}}
for key, fn in (("whois", whois_lookup), ("dns", dns_records),
("subdomains", enum_subdomains_crtsh)):
try:
result[key] = fn(dominio)
except Exception as exc: # noqa: BLE001 — captura amplia a proposito por fuente
result["errors"][key] = f"{type(exc).__name__}: {exc}"
return result
@@ -0,0 +1,87 @@
"""Tests para enrich_org_passive.
Las funciones compuestas (whois_lookup, dns_records, enum_subdomains_crtsh) se
monkeypatchean para no tocar la red. Solo se verifica la orquestacion: que
ensambla las tres fuentes bajo las claves correctas y normaliza el dominio.
"""
import importlib
import os
import sys
import pytest
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", ".."))
from cybersecurity import enrich_org_passive
# El paquete re-exporta la funcion bajo el mismo nombre que el submodulo; para
# parchear los globals del modulo lo tomamos via importlib.
mod = importlib.import_module("cybersecurity.enrich_org_passive")
def test_golden_compone_whois_dns_subdomains(monkeypatch):
monkeypatch.setattr(mod, "whois_lookup", lambda d: {"domain": d, "registrar": "OVH"})
monkeypatch.setattr(mod, "dns_records", lambda d: {"A": ["1.2.3.4"], "MX": ["mx.x"]})
monkeypatch.setattr(mod, "enum_subdomains_crtsh", lambda d: [f"www.{d}", f"api.{d}"])
res = enrich_org_passive("organic-machine.com")
assert res["whois"] == {"domain": "organic-machine.com", "registrar": "OVH"}
assert res["dns"] == {"A": ["1.2.3.4"], "MX": ["mx.x"]}
assert res["subdomains"] == ["www.organic-machine.com", "api.organic-machine.com"]
assert set(res.keys()) == {"whois", "dns", "subdomains", "errors"}
assert res["errors"] == {} # sin fallos cuando todas las fuentes responden
def test_fuente_que_falla_se_captura_en_errors(monkeypatch):
"""Si una fuente externa peta (p.ej. crt.sh), se registra en errors y el resto se devuelve."""
monkeypatch.setattr(mod, "whois_lookup", lambda d: {"registrar": "OVH"})
monkeypatch.setattr(mod, "dns_records", lambda d: {"A": ["1.2.3.4"]})
def boom(d):
raise RuntimeError("crt.sh 404")
monkeypatch.setattr(mod, "enum_subdomains_crtsh", boom)
res = enrich_org_passive("organic-machine.com")
assert res["whois"] == {"registrar": "OVH"}
assert res["dns"] == {"A": ["1.2.3.4"]}
assert res["subdomains"] == []
assert "subdomains" in res["errors"] and "crt.sh 404" in res["errors"]["subdomains"]
def test_dominio_vacio_lanza(monkeypatch):
monkeypatch.setattr(mod, "whois_lookup", lambda d: {})
monkeypatch.setattr(mod, "dns_records", lambda d: {})
monkeypatch.setattr(mod, "enum_subdomains_crtsh", lambda d: [])
with pytest.raises(ValueError):
enrich_org_passive(" ")
def test_dominio_con_espacios_se_normaliza(monkeypatch):
seen = {}
def whois(d):
seen["whois"] = d
return {}
def dns(d):
seen["dns"] = d
return {}
def subs(d):
seen["subs"] = d
return []
monkeypatch.setattr(mod, "whois_lookup", whois)
monkeypatch.setattr(mod, "dns_records", dns)
monkeypatch.setattr(mod, "enum_subdomains_crtsh", subs)
enrich_org_passive(" organic-machine.com ")
# Las tres funciones reciben el dominio ya sin espacios.
assert seen == {
"whois": "organic-machine.com",
"dns": "organic-machine.com",
"subs": "organic-machine.com",
}
@@ -0,0 +1,64 @@
---
name: enrich_person_passive
kind: function
lang: py
domain: cybersecurity
version: "1.0.0"
purity: impure
signature: "def enrich_person_passive(nombre: str, apellidos: str, dominios: list | None = None, usernames: list | None = None) -> dict"
description: "Orquestador OSINT pasivo: genera candidatos de enriquecimiento de una persona SIN tocar al objetivo. Compone guess_email_formats (emails candidatos por cada dominio dado, o gmail/outlook por defecto), enumerate_username_sites (comprobacion de usernames en servicios publicos) y build_search_dorks (dorks tipo persona, que NO se ejecutan, solo se generan)."
tags: [osint-enrich, osint-passive, cybersecurity, person, email, username, dorks]
uses_functions: [guess_email_formats_py_cybersecurity, enumerate_username_sites_py_cybersecurity, build_search_dorks_py_cybersecurity]
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: []
params:
- name: nombre
desc: "nombre de pila de la persona"
- name: apellidos
desc: "apellido(s) de la persona. nombre y apellidos no pueden estar ambos vacios (ValueError)"
- name: dominios
desc: "lista de dominios de correo donde generar formatos de email candidatos. None o vacia => usa los dominios comunes gmail.com/outlook.com"
- name: usernames
desc: "lista de usernames a comprobar en sitios publicos via enumerate_username_sites. None o vacia => no se comprueba ningun username"
output: "dict con email_candidates (lista de emails candidatos NO verificados, deduplicada y ordenada), username_hits (lista de {username, hits} con el resultado de enumerate_username_sites por username) y dorks (lista de dorks de busqueda tipo persona generados pero NO ejecutados)"
tested: true
tests: ["test_golden_compone_emails_usernames_y_dorks", "test_sin_dominios_usa_comunes", "test_sin_usernames_no_comprueba", "test_nombre_y_apellidos_vacios_lanza", "test_emails_deduplicados"]
test_file_path: "python/functions/cybersecurity/enrich_person_passive_test.py"
file_path: "python/functions/cybersecurity/enrich_person_passive.py"
---
## Ejemplo
```python
import sys, os
sys.path.insert(0, os.path.join("python", "functions"))
from cybersecurity import enrich_person_passive
res = enrich_person_passive(
"Juan", "Perez",
dominios=["organic-machine.com"],
usernames=["jperez", "juanp"],
)
print(res["email_candidates"]) # ['juan.perez@organic-machine.com', 'jperez@organic-machine.com', ...]
for u in res["username_hits"]:
print(u["username"], len(u["hits"])) # sitios publicos donde aparece cada username
print(res["dorks"]) # dorks listos para pegar en un buscador (no se ejecutan aqui)
```
## Cuando usarla
- Cuando arrancas la ficha OSINT de una persona y quieres una primera tanda de candidatos (emails probables, presencia de usernames, dorks) sin enviar nada al objetivo ni a su correo.
- Antes de verificar manualmente: genera el espacio de candidatos para que despues confirmes cuales son reales.
- Como paso pasivo previo a cualquier accion activa (que requeriria otra autorizacion y otras funciones).
## Gotchas
- **Uso solo para investigacion autorizada.** Generar candidatos sobre una persona sin base legitima puede vulnerar privacidad/leyes de proteccion de datos.
- Los `email_candidates` son **candidatos NO verificados**: son permutaciones plausibles del nombre, NO emails confirmados. No asumas que existen ni los uses para envio.
- Funcion IMPURA: `enumerate_username_sites` consulta servicios publicos por red, lo que **deja una huella minima** (requests a esos sitios). `build_search_dorks` y `guess_email_formats` son locales.
- Los dorks se **generan pero NO se ejecutan** aqui: ejecutarlos en un buscador es un paso aparte y deja su propia huella.
- Si no aportas `dominios`, se usan gmail.com/outlook.com como heuristica; ajusta la lista a los dominios reales del entorno de la persona para candidatos utiles.
@@ -0,0 +1,79 @@
"""Orquestador OSINT pasivo: genera candidatos de enriquecimiento de una persona.
Compone funciones atomicas del registro (`guess_email_formats`,
`enumerate_username_sites`, `build_search_dorks`) para producir candidatos OSINT
de una persona SIN contactar ni atacar al objetivo. Los dorks se generan pero
NO se ejecutan.
Funcion IMPURA: `enumerate_username_sites` consulta servicios publicos (red).
"""
from cybersecurity import (
build_search_dorks,
enumerate_username_sites,
guess_email_formats,
)
# Dominios de correo comunes usados cuando el caller no aporta dominios propios.
_COMMON_EMAIL_DOMAINS = ("gmail.com", "outlook.com")
def enrich_person_passive(
nombre: str,
apellidos: str,
dominios: list | None = None,
usernames: list | None = None,
) -> dict:
"""Genera candidatos OSINT pasivos para una persona.
Para cada dominio aportado (o los dominios comunes gmail/outlook si no se da
ninguno) genera los formatos de email candidatos. Para cada username
aportado comprueba en que sitios publicos existe. Ademas genera dorks de
busqueda de tipo persona (que NO se ejecutan, solo se devuelven).
Args:
nombre: nombre de pila de la persona.
apellidos: apellido(s) de la persona.
dominios: lista de dominios de correo donde buscar formatos de email.
Si es None o vacia, se usan los dominios comunes gmail/outlook.
usernames: lista de usernames a comprobar en sitios publicos. Si es None
o vacia, no se realiza ninguna comprobacion de username.
Returns:
dict con las claves:
- email_candidates: lista de emails candidatos (no verificados).
- username_hits: lista de resultados de enumerate_username_sites por
cada username comprobado.
- dorks: lista de dorks de busqueda generados (no ejecutados).
Raises:
ValueError: si nombre y apellidos estan ambos vacios.
"""
if not (nombre or "").strip() and not (apellidos or "").strip():
raise ValueError("nombre y apellidos no pueden estar ambos vacios")
target_domains = [d for d in (dominios or []) if d] or list(_COMMON_EMAIL_DOMAINS)
email_candidates: list = []
for dominio in target_domains:
candidates = guess_email_formats(nombre, apellidos, dominio)
if candidates:
email_candidates.extend(candidates)
# Deduplicar preservando orden.
email_candidates = list(dict.fromkeys(email_candidates))
username_hits: list = []
for username in usernames or []:
if not username:
continue
hits = enumerate_username_sites(username)
username_hits.append({"username": username, "hits": hits})
target = f"{nombre} {apellidos}".strip()
dorks = build_search_dorks(target, "persona")
return {
"email_candidates": email_candidates,
"username_hits": username_hits,
"dorks": dorks,
}
@@ -0,0 +1,97 @@
"""Tests para enrich_person_passive.
Las funciones compuestas (guess_email_formats, enumerate_username_sites,
build_search_dorks) se monkeypatchean. Solo se verifica la orquestacion: que
itera por dominios/usernames, deduplica emails, usa dominios comunes por
defecto y genera (sin ejecutar) los dorks.
"""
import importlib
import os
import sys
import pytest
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", ".."))
from cybersecurity import enrich_person_passive
# El paquete re-exporta la funcion bajo el mismo nombre que el submodulo; para
# parchear los globals del modulo lo tomamos via importlib.
mod = importlib.import_module("cybersecurity.enrich_person_passive")
def _patch(monkeypatch, *, emails_fn=None, usernames_fn=None, dorks_fn=None):
monkeypatch.setattr(
mod,
"guess_email_formats",
emails_fn or (lambda n, a, d: [f"{n.lower()}.{a.lower()}@{d}", f"{n[0].lower()}{a.lower()}@{d}"]),
)
monkeypatch.setattr(
mod,
"enumerate_username_sites",
usernames_fn or (lambda u: [{"site": "github", "url": f"https://github.com/{u}", "exists": True}]),
)
monkeypatch.setattr(
mod,
"build_search_dorks",
dorks_fn or (lambda target, tipo: [f'"{target}" {tipo}', f'intext:"{target}"']),
)
def test_golden_compone_emails_usernames_y_dorks(monkeypatch):
_patch(monkeypatch)
res = enrich_person_passive(
"Juan", "Perez",
dominios=["organic-machine.com"],
usernames=["jperez", "juanp"],
)
assert res["email_candidates"] == [
"juan.perez@organic-machine.com",
"jperez@organic-machine.com",
]
assert [u["username"] for u in res["username_hits"]] == ["jperez", "juanp"]
assert res["username_hits"][0]["hits"][0]["site"] == "github"
assert res["dorks"] == ['"Juan Perez" persona', 'intext:"Juan Perez"']
def test_sin_dominios_usa_comunes(monkeypatch):
seen_domains = []
def emails(n, a, d):
seen_domains.append(d)
return [f"{n}@{d}"]
_patch(monkeypatch, emails_fn=emails)
res = enrich_person_passive("Ana", "Lopez")
assert seen_domains == ["gmail.com", "outlook.com"]
assert res["email_candidates"] == ["Ana@gmail.com", "Ana@outlook.com"]
def test_sin_usernames_no_comprueba(monkeypatch):
called = []
_patch(monkeypatch, usernames_fn=lambda u: called.append(u) or [])
res = enrich_person_passive("Ana", "Lopez", dominios=["x.com"])
assert res["username_hits"] == []
assert called == [] # enumerate_username_sites no se invoca
def test_nombre_y_apellidos_vacios_lanza(monkeypatch):
_patch(monkeypatch)
with pytest.raises(ValueError):
enrich_person_passive("", " ")
def test_emails_deduplicados(monkeypatch):
# Dos dominios distintos que devuelven el mismo email -> debe deduplicar.
_patch(monkeypatch, emails_fn=lambda n, a, d: ["dup@same.com", "dup@same.com"])
res = enrich_person_passive("Juan", "Perez", dominios=["a.com", "b.com"])
assert res["email_candidates"] == ["dup@same.com"]
@@ -0,0 +1,66 @@
---
name: enum_subdomains_crtsh
kind: function
lang: py
domain: cybersecurity
version: "1.0.0"
purity: impure
signature: "def enum_subdomains_crtsh(dominio: str, timeout_s: float = 20.0) -> list"
description: "Enumeracion OSINT pasiva de subdominios desde Certificate Transparency (crt.sh). Consulta https://crt.sh/?q=%25.<dominio>&output=json con http_get_json, extrae name_value de cada certificado, separa por saltos de linea, deduplica, filtra wildcards (*.) y devuelve la lista ordenada de subdominios unicos. Pasivo: no toca al objetivo, consulta logs CT publicos."
tags: [osint-passive, subdomains, crtsh, certificate-transparency, recon, cybersecurity]
params:
- name: dominio
desc: "Dominio base a enumerar, ej. organic-machine.com. Vacio lanza RuntimeError."
- name: timeout_s
desc: "Segundos maximo de espera de la peticion HTTP a crt.sh (default 20.0)."
output: "Lista ordenada de subdominios unicos (en minusculas, sin wildcards) que aparecen en certificados emitidos para el dominio. Lista vacia si crt.sh no devuelve resultados."
uses_functions: ["http_get_json_py_infra"]
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: []
tested: true
tests: ["test_dedup_y_orden", "test_filtra_wildcards", "test_respuesta_vacia", "test_respuesta_no_lista_lanza_error", "test_dominio_vacio_lanza_error"]
test_file_path: "python/functions/cybersecurity/enum_subdomains_crtsh_test.py"
file_path: "python/functions/cybersecurity/enum_subdomains_crtsh.py"
---
## Ejemplo
```python
import sys, os
sys.path.insert(0, os.path.join("python", "functions"))
from cybersecurity import enum_subdomains_crtsh
subs = enum_subdomains_crtsh("organic-machine.com")
for s in subs:
print(s)
# api.organic-machine.com
# mail.organic-machine.com
# organic-machine.com
# www.organic-machine.com
```
## Cuando usarla
Usala para descubrir la superficie de subdominios de un objetivo sin enviarle
trafico: los logs de Certificate Transparency listan todos los nombres para
los que se han emitido certificados TLS. Complementa a `dns_records`
(resolucion) y `whois_lookup` (registro) en el reconocimiento pasivo inicial.
## Gotchas
- Es OSINT **pasivo**: consulta los logs CT publicos via crt.sh, NO toca al
dominio objetivo ni resuelve los subdominios encontrados (algunos pueden
estar muertos o no resolver).
- crt.sh suele ir lento o rate-limitear bajo carga; para dominios grandes la
respuesta puede tardar varios segundos o agotar el `timeout_s`. Subir el
timeout o reintentar si falla.
- Solo encuentra subdominios que han tenido certificado TLS emitido y logueado
en CT; subdominios internos sin certificado publico no apareceran.
- Los wildcards (`*.dominio`) se filtran porque no son hosts concretos.
- crt.sh devuelve un array JSON (no un objeto); por eso si la respuesta no es
una lista se lanza `RuntimeError`.
- Puede incluir subdominios de niveles arbitrarios y dominios relacionados que
compartieron certificado SAN; revisa la salida antes de usarla como verdad.
@@ -0,0 +1,68 @@
"""Enumeracion OSINT pasiva de subdominios via Certificate Transparency (crt.sh).
Funcion IMPURA: consulta los logs publicos de Certificate Transparency a
traves de crt.sh y extrae los subdominios que han aparecido en certificados
emitidos para el dominio. Es OSINT pasivo: no toca al dominio objetivo, solo
consulta registros CT publicos.
"""
import os
import sys
sys.path.insert(
0, os.path.join(os.path.dirname(__file__), "..", "..", "functions")
)
from infra.http_get_json import http_get_json # noqa: E402
def enum_subdomains_crtsh(dominio: str, timeout_s: float = 20.0) -> list:
"""Enumera subdominios de un dominio desde Certificate Transparency.
Consulta ``https://crt.sh/?q=%25.<dominio>&output=json`` (``%25`` = ``%``,
el wildcard de crt.sh) usando ``http_get_json`` del registry. crt.sh
devuelve un array JSON de certificados; de cada uno se toma el campo
``name_value`` (que puede contener varios nombres separados por saltos de
linea, uno por SAN). Se separan, deduplican, se filtran los wildcards
(``*.``) y se devuelve la lista ordenada de subdominios unicos.
Args:
dominio: Dominio base a enumerar (ej. ``"organic-machine.com"``).
timeout_s: Segundos maximo de espera de la peticion HTTP (default 20).
Returns:
Lista ordenada de subdominios unicos (sin wildcards). Lista vacia si
crt.sh no devuelve resultados.
Raises:
RuntimeError: Si el dominio esta vacio o la peticion HTTP falla.
"""
if not dominio or not dominio.strip():
raise RuntimeError("enum_subdomains_crtsh: dominio vacio")
dom = dominio.strip().lower()
# %25 es el wildcard '%' de crt.sh: busca cualquier nombre que termine en .<dominio>
url = f"https://crt.sh/?q=%25.{dom}&output=json"
data = http_get_json(url, timeout=timeout_s)
if not isinstance(data, list):
raise RuntimeError(
f"enum_subdomains_crtsh: respuesta crt.sh inesperada "
f"(tipo {type(data).__name__})"
)
found: set = set()
for cert in data:
if not isinstance(cert, dict):
continue
name_value = cert.get("name_value", "")
for raw_name in name_value.split("\n"):
name = raw_name.strip().lower()
if not name:
continue
if name.startswith("*."):
continue
found.add(name)
return sorted(found)
@@ -0,0 +1,81 @@
"""Tests para enum_subdomains_crtsh."""
import os
import sys
sys.path.insert(0, os.path.dirname(__file__))
import enum_subdomains_crtsh as esc
from enum_subdomains_crtsh import enum_subdomains_crtsh
def _crtsh_sample() -> list:
# crt.sh devuelve un array; name_value puede traer varios SAN separados
# por '\n', con duplicados y wildcards.
return [
{"name_value": "www.organic-machine.com\norganic-machine.com"},
{"name_value": "api.organic-machine.com"},
{"name_value": "www.organic-machine.com"}, # duplicado
{"name_value": "*.organic-machine.com"}, # wildcard, se filtra
{"name_value": "MAIL.Organic-Machine.com"}, # case distinto
]
def test_dedup_y_orden(monkeypatch):
"""Subdominios deduplicados, en minusculas y ordenados."""
monkeypatch.setattr(
esc, "http_get_json", lambda url, timeout=20.0: _crtsh_sample()
)
result = enum_subdomains_crtsh("organic-machine.com")
assert result == [
"api.organic-machine.com",
"mail.organic-machine.com",
"organic-machine.com",
"www.organic-machine.com",
]
def test_filtra_wildcards(monkeypatch):
"""Las entradas '*.dominio' se descartan."""
monkeypatch.setattr(
esc,
"http_get_json",
lambda url, timeout=20.0: [{"name_value": "*.organic-machine.com"}],
)
result = enum_subdomains_crtsh("organic-machine.com")
assert result == []
def test_respuesta_vacia(monkeypatch):
"""crt.sh sin resultados devuelve lista vacia."""
monkeypatch.setattr(esc, "http_get_json", lambda url, timeout=20.0: [])
result = enum_subdomains_crtsh("organic-machine.com")
assert result == []
def test_respuesta_no_lista_lanza_error(monkeypatch):
"""Una respuesta que no es lista lanza RuntimeError."""
monkeypatch.setattr(
esc, "http_get_json", lambda url, timeout=20.0: {"unexpected": "obj"}
)
try:
enum_subdomains_crtsh("organic-machine.com")
assert False, "deberia haber lanzado RuntimeError"
except RuntimeError:
pass
def test_dominio_vacio_lanza_error():
"""Dominio vacio lanza RuntimeError."""
try:
enum_subdomains_crtsh("")
assert False, "deberia haber lanzado RuntimeError"
except RuntimeError:
pass
@@ -0,0 +1,60 @@
---
name: enumerate_username_sites
kind: function
lang: py
domain: cybersecurity
version: "1.0.0"
purity: impure
signature: "def enumerate_username_sites(username: str, timeout_s: float = 8.0, sites: list | None = None) -> list"
description: "Comprueba si un username existe en ~12 sitios publicos (github, twitter/x, instagram, tiktok, reddit, gitlab, keybase, medium, telegram, youtube, pinterest, about.me) consultando la URL de perfil estilo sherlock ligero. Detecta por codigo HTTP (200 existe, 404 no existe). Cada sitio se consulta aislado en try/except con User-Agent de navegador y allow_redirects: un timeout no aborta el resto. OSINT pasivo."
tags: [osint-passive, username, enumeration, recon, identity, sherlock, cybersecurity, python]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: [requests]
params:
- name: username
desc: "Nombre de usuario a buscar, sin arroba ni URL (ej. 'jdoe')."
- name: timeout_s
desc: "Timeout en segundos por peticion HTTP. Default 8.0."
- name: sites
desc: "Lista opcional de dicts {'site','url'} donde 'url' lleva el placeholder '{u}'. Si es None usa DEFAULT_SITES (~12 sitios)."
output: "Lista de dicts {'site','url','exists','status'} en el orden de los sitios. 'exists' es True/False/None y 'status' el codigo HTTP (int) o None si la peticion fallo."
tested: true
tests:
- "test_status_200_es_existe"
- "test_status_404_es_no_existe"
- "test_status_otro_es_indeterminado"
- "test_excepcion_por_sitio_no_aborta_el_resto"
- "test_estructura_y_url_formateada"
- "test_lista_por_defecto_tiene_doce_sitios"
test_file_path: "python/functions/cybersecurity/enumerate_username_sites_test.py"
file_path: "python/functions/cybersecurity/enumerate_username_sites.py"
---
## Ejemplo
```python
enumerate_username_sites("torvalds", timeout_s=8.0)
# [{"site": "github", "url": "https://github.com/torvalds", "exists": True, "status": 200},
# {"site": "twitter", "url": "https://x.com/torvalds", "exists": None, "status": 403},
# {"site": "instagram","url": "https://www.instagram.com/torvalds/","exists": False, "status": 404},
# ...]
# Lista propia de sitios:
enumerate_username_sites("jdoe", sites=[{"site": "github", "url": "https://github.com/{u}"}])
```
## Cuando usarla
Usala cuando tengas un username (o alias) y quieras un barrido rapido de presencia en redes/plataformas publicas para mapear la huella digital de un objetivo en una investigacion autorizada. Util tras `guess_email_formats` para pivotar de identidad a perfiles, o como entrada para construir dorks con `build_search_dorks`.
## Gotchas
- **Deja huella**: aunque es recoleccion "pasiva" desde el punto de vista del objetivo, lanza una peticion HTTP real a cada sitio. Esas peticiones quedan en logs del sitio y pueden asociarse a tu IP. Usa proxy/VPN si la investigacion lo requiere.
- **Falsos positivos/negativos por anti-bot**: muchos sitios (instagram, tiktok, x) devuelven 200 con paginas de login/captcha o bloquean por User-Agent, dando exists=True erroneo o status indeterminado. El 200/404 no es garantia; verifica manualmente los hits relevantes.
- **Respeta rate limits**: lanzar muchas comprobaciones seguidas puede activar bloqueos o baneos temporales por IP. Espacia las consultas en barridos grandes.
- **Estados intermedios**: cualquier codigo distinto de 200/404 (301, 403, 429, 5xx) deja `exists=None`; un fallo de red por sitio deja `status=None` y `exists=None` sin abortar el resto.
- **Solo para investigacion autorizada.** No uses esta funcion para acoso, doxing ni vigilancia sin base legal.
@@ -0,0 +1,91 @@
"""Comprueba la existencia de un username en sitios publicos (sherlock ligero).
OSINT pasivo: para un username dado, consulta la URL de perfil de una lista
de sitios conocidos y deduce si la cuenta existe por el codigo de estado
HTTP (200 = existe, 404 = no existe). Cada sitio se consulta de forma
aislada: un fallo (timeout, error de red) no aborta el resto.
"""
import requests
# Cada entrada describe un sitio: como construir la URL de perfil a partir
# del username. La deteccion es por codigo de estado (200 existe / 404 no).
DEFAULT_SITES = [
{"site": "github", "url": "https://github.com/{u}"},
{"site": "twitter", "url": "https://x.com/{u}"},
{"site": "instagram", "url": "https://www.instagram.com/{u}/"},
{"site": "tiktok", "url": "https://www.tiktok.com/@{u}"},
{"site": "reddit", "url": "https://www.reddit.com/user/{u}"},
{"site": "gitlab", "url": "https://gitlab.com/{u}"},
{"site": "keybase", "url": "https://keybase.io/{u}"},
{"site": "medium", "url": "https://medium.com/@{u}"},
{"site": "telegram", "url": "https://t.me/{u}"},
{"site": "youtube", "url": "https://www.youtube.com/@{u}"},
{"site": "pinterest", "url": "https://www.pinterest.com/{u}/"},
{"site": "about_me", "url": "https://about.me/{u}"},
]
_BROWSER_UA = (
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
)
def enumerate_username_sites(
username: str,
timeout_s: float = 8.0,
sites: list | None = None,
) -> list:
"""Comprueba si un username existe en una lista de sitios publicos.
Para cada sitio construye la URL de perfil con el username y hace un
GET con User-Agent de navegador siguiendo redirecciones. Interpreta el
codigo de estado final: 200 -> existe, 404 -> no existe, cualquier otro
-> indeterminado (exists=None). Los errores de red por sitio (timeout,
conexion) se capturan y se reportan con status=None y exists=None sin
interrumpir la enumeracion del resto.
Args:
username: nombre de usuario a buscar (sin arroba ni URL).
timeout_s: timeout en segundos por peticion. Default 8.0.
sites: lista opcional de dicts {"site", "url"} donde "url" contiene
el placeholder "{u}". Si es None se usa DEFAULT_SITES.
Returns:
lista de dicts {"site", "url", "exists", "status"} en el mismo orden
que la lista de sitios. "exists" es True/False/None y "status" es el
codigo HTTP (int) o None si la peticion fallo.
"""
targets = sites if sites is not None else DEFAULT_SITES
headers = {"User-Agent": _BROWSER_UA}
results = []
for entry in targets:
site = entry.get("site", "")
url = entry["url"].format(u=username)
status = None
exists = None
try:
resp = requests.get(
url,
headers=headers,
timeout=timeout_s,
allow_redirects=True,
)
status = resp.status_code
if status == 200:
exists = True
elif status == 404:
exists = False
else:
exists = None
except requests.RequestException:
# Timeout, error de conexion, demasiados redirects, etc.
status = None
exists = None
results.append(
{"site": site, "url": url, "exists": exists, "status": status}
)
return results
@@ -0,0 +1,109 @@
"""Tests para enumerate_username_sites (red mockeada con monkeypatch)."""
import requests
import enumerate_username_sites as mod
from enumerate_username_sites import enumerate_username_sites
class _FakeResp:
"""Respuesta HTTP minima con solo status_code."""
def __init__(self, status_code):
self.status_code = status_code
def _make_get(status_by_url):
"""Construye un fake de requests.get que devuelve status por URL."""
def _fake_get(url, **kwargs):
return _FakeResp(status_by_url[url])
return _fake_get
SITES = [
{"site": "alpha", "url": "https://alpha.test/{u}"},
{"site": "beta", "url": "https://beta.test/{u}"},
{"site": "gamma", "url": "https://gamma.test/{u}"},
]
def test_status_200_es_existe(monkeypatch):
"""Un 200 marca exists=True."""
status_map = {
"https://alpha.test/bob": 200,
"https://beta.test/bob": 200,
"https://gamma.test/bob": 200,
}
monkeypatch.setattr(requests, "get", _make_get(status_map))
out = enumerate_username_sites("bob", sites=SITES)
assert all(r["exists"] is True for r in out)
assert all(r["status"] == 200 for r in out)
def test_status_404_es_no_existe(monkeypatch):
"""Un 404 marca exists=False."""
status_map = {
"https://alpha.test/ghost": 404,
"https://beta.test/ghost": 404,
"https://gamma.test/ghost": 404,
}
monkeypatch.setattr(requests, "get", _make_get(status_map))
out = enumerate_username_sites("ghost", sites=SITES)
assert all(r["exists"] is False for r in out)
assert all(r["status"] == 404 for r in out)
def test_status_otro_es_indeterminado(monkeypatch):
"""Un codigo distinto de 200/404 deja exists=None pero conserva status."""
status_map = {
"https://alpha.test/x": 301,
"https://beta.test/x": 403,
"https://gamma.test/x": 500,
}
monkeypatch.setattr(requests, "get", _make_get(status_map))
out = enumerate_username_sites("x", sites=SITES)
assert [r["exists"] for r in out] == [None, None, None]
assert [r["status"] for r in out] == [301, 403, 500]
def test_excepcion_por_sitio_no_aborta_el_resto(monkeypatch):
"""Un fallo de red en un sitio no interrumpe la enumeracion."""
def _fake_get(url, **kwargs):
if "beta" in url:
raise requests.Timeout("simulado")
return _FakeResp(200)
monkeypatch.setattr(requests, "get", _fake_get)
out = enumerate_username_sites("u", sites=SITES)
assert len(out) == 3
by_site = {r["site"]: r for r in out}
assert by_site["alpha"]["exists"] is True
assert by_site["alpha"]["status"] == 200
# El sitio que fallo queda con status/exists None.
assert by_site["beta"]["exists"] is None
assert by_site["beta"]["status"] is None
assert by_site["gamma"]["exists"] is True
def test_estructura_y_url_formateada(monkeypatch):
"""Cada resultado lleva las claves esperadas y la URL con el username."""
status_map = {
"https://alpha.test/neo": 200,
"https://beta.test/neo": 404,
"https://gamma.test/neo": 200,
}
monkeypatch.setattr(requests, "get", _make_get(status_map))
out = enumerate_username_sites("neo", sites=SITES)
for r in out:
assert set(r.keys()) == {"site", "url", "exists", "status"}
assert "neo" in r["url"]
def test_lista_por_defecto_tiene_doce_sitios():
"""La lista DEFAULT_SITES cubre ~12 sitios publicos."""
assert len(mod.DEFAULT_SITES) == 12
sites = {s["site"] for s in mod.DEFAULT_SITES}
assert {"github", "instagram", "reddit", "telegram"} <= sites
@@ -0,0 +1,66 @@
---
name: extract_exif_metadata
kind: function
lang: py
domain: cybersecurity
version: "1.0.0"
purity: impure
signature: "def extract_exif_metadata(image_path: str) -> dict"
description: "Lee los metadatos EXIF de una imagen con Pillow y los devuelve normalizados (fecha, camara, software, GPS en grados decimales) mas el volcado completo de tags en `raw`. OSINT pasiva sobre documentos propios: revela cuando, con que dispositivo y donde se tomo una foto."
tags: [osint-passive, exif, metadata, image, gps, forensics, pillow, extract, cybersecurity, python]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: [PIL]
params:
- name: image_path
desc: "ruta al archivo de imagen en disco (JPEG, PNG, TIFF, ...)"
output: "dict con datetime, camera_make, camera_model, software, gps_lat, gps_lon (grados decimales o None) y raw (dict con todos los tags EXIF legibles por nombre). Si la imagen no tiene EXIF, los campos van a None y raw={}."
tested: true
tests:
- "imagen sin EXIF (PNG) devuelve campos None y raw vacio"
- "imagen con EXIF devuelve camara, software y fecha"
- "GPSInfo DMS se convierte a grados decimales con signo por hemisferio"
test_file_path: "python/functions/cybersecurity/extract_exif_metadata_test.py"
file_path: "python/functions/cybersecurity/extract_exif_metadata.py"
---
## Ejemplo
```python
import sys, os
sys.path.insert(0, os.path.join("python", "functions"))
from cybersecurity.extract_exif_metadata import extract_exif_metadata
meta = extract_exif_metadata(
"/home/enmanuel/Obsidian/osint/attachments/personas/objetivo_01.jpg"
)
print(meta["datetime"]) # '2024:08:12 19:43:07' (cuando se tomo)
print(meta["camera_model"]) # 'iPhone 13 Pro' (con que dispositivo)
print(meta["gps_lat"], meta["gps_lon"]) # 40.4168 -3.7038 (donde) o None, None
```
## Cuando usarla
Cuando recolectes inteligencia pasiva sobre una imagen propia o de un objetivo
y necesites saber cuando, con que dispositivo y desde donde se capturo, antes de
publicarla o compartirla. Usala tambien para auditar tus propios documentos y
detectar fugas de metadatos (geolocalizacion, modelo de telefono, software de
edicion) antes de subirlos a un sitio publico.
## Gotchas
- Funcion impura: abre el archivo del disco. Lanza si la ruta no existe o no es
una imagen valida (Pillow `UnidentifiedImageError` / `FileNotFoundError`).
- JPEG y HEIC de moviles suelen traer GPS embebido; PNG normalmente no lleva
EXIF y devolvera todos los campos en None con `raw={}`.
- El GPS revela la ubicacion fisica donde se tomo la foto — dato sensible. No
lo loguees ni lo compartas sin consentimiento.
- `DateTimeOriginal` vive en el sub-IFD EXIF, no en el IFD raiz; algunas
imagenes solo tienen `DateTime` (fecha de modificacion del archivo), que se
usa como fallback.
- Las coordenadas se convierten de DMS (grados/minutos/segundos) a grados
decimales y se les aplica el signo segun `GPSLatitudeRef`/`GPSLongitudeRef`
(S y W => negativo).
@@ -0,0 +1,99 @@
"""Extrae metadatos EXIF de una imagen (OSINT pasiva sobre documentos propios)."""
from PIL import ExifTags, Image
# Mapa inverso nombre -> id no hace falta: usamos ExifTags.TAGS (id -> nombre).
_GPS_TAGS = ExifTags.GPSTAGS # id -> nombre para el sub-IFD de GPS.
def _to_degrees(value) -> float | None:
"""Convierte una coordenada GPS en formato DMS (grados, minutos, segundos) a grados decimales.
Pillow devuelve cada componente como un IFDRational o una tupla (num, den).
"""
try:
d, m, s = value
return float(d) + float(m) / 60.0 + float(s) / 3600.0
except (TypeError, ValueError, ZeroDivisionError):
return None
def _extract_gps(gps_info: dict) -> tuple[float | None, float | None]:
"""Devuelve (lat, lon) en grados decimales desde el sub-IFD GPSInfo, o (None, None)."""
if not gps_info:
return None, None
named = {_GPS_TAGS.get(k, k): v for k, v in gps_info.items()}
lat = _to_degrees(named.get("GPSLatitude")) if "GPSLatitude" in named else None
lon = _to_degrees(named.get("GPSLongitude")) if "GPSLongitude" in named else None
if lat is not None and str(named.get("GPSLatitudeRef", "N")).upper() == "S":
lat = -lat
if lon is not None and str(named.get("GPSLongitudeRef", "E")).upper() == "W":
lon = -lon
return lat, lon
def extract_exif_metadata(image_path: str) -> dict:
"""Lee los metadatos EXIF de una imagen y los devuelve normalizados.
Abre la imagen con Pillow y extrae los tags EXIF. Normaliza los campos
mas relevantes para OSINT (fecha, camara, software, GPS) y adjunta el
diccionario completo de tags legibles por nombre en `raw`.
Args:
image_path: ruta al archivo de imagen (JPEG, PNG, TIFF, ...).
Returns:
dict con las claves: datetime, camera_make, camera_model, software,
gps_lat, gps_lon (grados decimales o None) y raw (dict tag->valor).
Si la imagen no tiene EXIF, los campos van a None y raw queda {}.
"""
result = {
"datetime": None,
"camera_make": None,
"camera_model": None,
"software": None,
"gps_lat": None,
"gps_lon": None,
"raw": {},
}
with Image.open(image_path) as img:
exif = img.getexif()
if not exif:
return result
# Tags de nivel raiz por nombre.
raw = {ExifTags.TAGS.get(tag_id, tag_id): value for tag_id, value in exif.items()}
# Sub-IFD EXIF (DateTimeOriginal vive aqui, no en el IFD raiz).
try:
exif_ifd = exif.get_ifd(ExifTags.IFD.Exif)
except (AttributeError, KeyError, ValueError):
exif_ifd = {}
for tag_id, value in exif_ifd.items():
raw[ExifTags.TAGS.get(tag_id, tag_id)] = value
# GPS IFD.
try:
gps_ifd = exif.get_ifd(ExifTags.IFD.GPSInfo)
except (AttributeError, KeyError, ValueError):
gps_ifd = {}
if gps_ifd:
raw["GPSInfo"] = {_GPS_TAGS.get(k, k): v for k, v in gps_ifd.items()}
result["raw"] = raw
result["datetime"] = raw.get("DateTimeOriginal") or raw.get("DateTime")
result["camera_make"] = raw.get("Make")
result["camera_model"] = raw.get("Model")
result["software"] = raw.get("Software")
lat, lon = _extract_gps(gps_ifd)
result["gps_lat"] = lat
result["gps_lon"] = lon
return result
@@ -0,0 +1,110 @@
"""Tests para extract_exif_metadata."""
import os
import sys
from PIL import Image
from PIL.ExifTags import Base, GPS, IFD
from PIL.TiffImagePlugin import IFDRational
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from cybersecurity.extract_exif_metadata import extract_exif_metadata
def _make_png_without_exif(path: str) -> None:
"""Crea un PNG pequeño sin EXIF."""
Image.new("RGB", (4, 4), (10, 20, 30)).save(path, format="PNG")
def _make_jpeg_with_exif(path: str) -> None:
"""Crea un JPEG pequeño con EXIF de camara/software/fecha."""
img = Image.new("RGB", (4, 4), (200, 100, 50))
exif = img.getexif()
exif[Base.Make.value] = "TestCam"
exif[Base.Model.value] = "Model X"
exif[Base.Software.value] = "PyTestRig 1.0"
exif[Base.DateTime.value] = "2024:08:12 19:43:07"
# DateTimeOriginal vive en el sub-IFD EXIF.
exif_ifd = exif.get_ifd(IFD.Exif)
exif_ifd[Base.DateTimeOriginal.value] = "2024:08:12 19:43:07"
img.save(path, format="JPEG", exif=exif)
def _make_jpeg_with_gps(path: str) -> None:
"""Crea un JPEG con GPSInfo en DMS, hemisferio sur y oeste."""
img = Image.new("RGB", (4, 4), (0, 128, 64))
exif = img.getexif()
gps_ifd = exif.get_ifd(IFD.GPSInfo)
# 40 deg, 25 min, 0.48 seg => 40.41680 grados.
gps_ifd[GPS.GPSLatitude.value] = (
IFDRational(40, 1),
IFDRational(25, 1),
IFDRational(48, 100),
)
gps_ifd[GPS.GPSLatitudeRef.value] = "S" # hemisferio sur => negativo
# 3 deg, 42 min, 13.68 seg => 3.7038 grados.
gps_ifd[GPS.GPSLongitude.value] = (
IFDRational(3, 1),
IFDRational(42, 1),
IFDRational(1368, 100),
)
gps_ifd[GPS.GPSLongitudeRef.value] = "W" # oeste => negativo
img.save(path, format="JPEG", exif=exif)
def test_imagen_sin_exif_png_devuelve_none(tmp_path):
"""imagen sin EXIF (PNG) devuelve campos None y raw vacio."""
p = str(tmp_path / "plain.png")
_make_png_without_exif(p)
meta = extract_exif_metadata(p)
assert meta["datetime"] is None
assert meta["camera_make"] is None
assert meta["camera_model"] is None
assert meta["software"] is None
assert meta["gps_lat"] is None
assert meta["gps_lon"] is None
assert meta["raw"] == {}
def test_imagen_con_exif_devuelve_camara_software_fecha(tmp_path):
"""imagen con EXIF devuelve camara, software y fecha."""
p = str(tmp_path / "withexif.jpg")
_make_jpeg_with_exif(p)
meta = extract_exif_metadata(p)
assert meta["camera_make"] == "TestCam"
assert meta["camera_model"] == "Model X"
assert meta["software"] == "PyTestRig 1.0"
assert meta["datetime"] == "2024:08:12 19:43:07"
assert meta["raw"] # no vacio
def test_gps_dms_a_grados_decimales_con_signo(tmp_path):
"""GPSInfo DMS se convierte a grados decimales con signo por hemisferio."""
p = str(tmp_path / "withgps.jpg")
_make_jpeg_with_gps(p)
meta = extract_exif_metadata(p)
assert meta["gps_lat"] is not None
assert meta["gps_lon"] is not None
# Sur y Oeste => ambos negativos.
assert meta["gps_lat"] < 0
assert meta["gps_lon"] < 0
assert abs(meta["gps_lat"] - (-40.41680)) < 1e-4
assert abs(meta["gps_lon"] - (-3.7038)) < 1e-4
if __name__ == "__main__":
import tempfile
from pathlib import Path
with tempfile.TemporaryDirectory() as d:
test_imagen_sin_exif_png_devuelve_none(Path(d))
test_imagen_con_exif_devuelve_camara_software_fecha(Path(d))
test_gps_dms_a_grados_decimales_con_signo(Path(d))
print("Todos los tests pasaron.")
@@ -0,0 +1,66 @@
---
name: extract_pdf_metadata
kind: function
lang: py
domain: cybersecurity
version: "1.0.0"
purity: impure
signature: "def extract_pdf_metadata(pdf_path: str) -> dict"
description: "Lee los metadatos del Document Info de un PDF con pypdf (titulo, autor, creador, productor, fechas, numero de paginas) mas el volcado completo en `raw`. OSINT pasiva sobre documentos propios: revela quien y con que software genero el documento. Tolerante a PDFs cifrados (no falla, rellena `error`)."
tags: [osint-passive, pdf, metadata, document, forensics, pypdf, extract, cybersecurity, python]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: [pypdf]
params:
- name: pdf_path
desc: "ruta al archivo PDF en disco"
output: "dict con title, author, creator, producer, creation_date, mod_date (ISO 8601 si parseables, sino valor crudo), num_pages, raw (todo el doc info) y error (None si todo fue bien, mensaje en caso contrario)."
tested: true
tests:
- "PDF con metadatos devuelve titulo, autor y num_pages"
- "PDF sin doc info devuelve campos None sin petar"
- "fechas parseables se devuelven en ISO 8601"
test_file_path: "python/functions/cybersecurity/extract_pdf_metadata_test.py"
file_path: "python/functions/cybersecurity/extract_pdf_metadata.py"
---
## Ejemplo
```python
import sys, os
sys.path.insert(0, os.path.join("python", "functions"))
from cybersecurity.extract_pdf_metadata import extract_pdf_metadata
meta = extract_pdf_metadata(
"/home/enmanuel/Obsidian/osint/attachments/personas/cv_objetivo.pdf"
)
print(meta["author"]) # 'Enmanuel G.' (quien lo creo)
print(meta["producer"]) # 'Microsoft Word 2021' (con que software)
print(meta["creation_date"]) # '2024-03-11T10:22:00+01:00'
print(meta["num_pages"]) # 3
```
## Cuando usarla
Cuando recolectes inteligencia pasiva sobre un PDF propio o de un objetivo y
necesites saber quien lo creo, con que herramienta y cuando. Usala tambien para
auditar tus propios documentos antes de publicarlos: el campo `author` y
`producer` suelen filtrar el nombre real del usuario, la version del software y
la organizacion, datos que no quieres exponer.
## Gotchas
- Funcion impura: abre el archivo del disco. Captura la excepcion en lugar de
propagarla — si el PDF esta corrupto, cifrado o no es un PDF, devuelve el dict
con lo que pudo leer y un mensaje en `error` (no lanza).
- PDFs cifrados: intenta abrir con password vacio (caso comun de "restriccion
de copia"). Si requiere password real, `error` empieza por `encrypted:` y los
campos pueden quedar None.
- Muchas fechas de PDF vienen en formato `D:YYYYMMDDHHmmSS+ZZ`; se convierten a
ISO 8601 cuando pypdf las parsea, sino se devuelven crudas.
- `raw` serializa los valores a string para evitar tipos no JSON-friendly
(IndirectObject, ByteStringObject). Los campos normalizados conservan el
contenido textual.
@@ -0,0 +1,86 @@
"""Extrae metadatos de un PDF con pypdf (OSINT pasiva sobre documentos propios)."""
from pypdf import PdfReader
def _iso_or_raw(reader: PdfReader, getter_name: str, raw_value):
"""Devuelve una fecha en ISO 8601 si pypdf la sabe parsear, sino el valor crudo.
pypdf expone `creation_date`/`modification_date` (datetime) ademas del
string crudo `/CreationDate`/`/ModDate` en formato `D:YYYYMMDDHHmmSS`.
"""
try:
dt = getattr(reader.metadata, getter_name, None)
except Exception:
dt = None
if dt is not None:
try:
return dt.isoformat()
except Exception:
pass
return str(raw_value) if raw_value is not None else None
def extract_pdf_metadata(pdf_path: str) -> dict:
"""Lee los metadatos del Document Info de un PDF.
Abre el PDF con pypdf y extrae los campos estandar del diccionario de
informacion (titulo, autor, creador, productor, fechas) mas el numero de
paginas. Las fechas se devuelven en ISO 8601 cuando son parseables, en su
valor crudo en caso contrario. No falla si el PDF esta cifrado: captura la
excepcion, devuelve lo que pueda y rellena el campo `error`.
Args:
pdf_path: ruta al archivo PDF en disco.
Returns:
dict con las claves: title, author, creator, producer, creation_date,
mod_date, num_pages, raw (dict con todo el doc info) y error (None si
todo fue bien, mensaje de error en caso contrario).
"""
result = {
"title": None,
"author": None,
"creator": None,
"producer": None,
"creation_date": None,
"mod_date": None,
"num_pages": None,
"raw": {},
"error": None,
}
try:
reader = PdfReader(pdf_path)
# PDF cifrado: intentar abrir con password vacio (caso comun).
if getattr(reader, "is_encrypted", False):
try:
reader.decrypt("")
except Exception as exc:
result["error"] = f"encrypted: {exc}"
try:
result["num_pages"] = len(reader.pages)
except Exception as exc:
result["error"] = result["error"] or f"pages: {exc}"
meta = reader.metadata
if meta is not None:
result["title"] = str(meta.title) if meta.title is not None else None
result["author"] = str(meta.author) if meta.author is not None else None
result["creator"] = str(meta.creator) if meta.creator is not None else None
result["producer"] = (
str(meta.producer) if meta.producer is not None else None
)
result["creation_date"] = _iso_or_raw(
reader, "creation_date", meta.get("/CreationDate")
)
result["mod_date"] = _iso_or_raw(
reader, "modification_date", meta.get("/ModDate")
)
result["raw"] = {str(k): str(v) for k, v in meta.items()}
except Exception as exc:
result["error"] = result["error"] or str(exc)
return result
@@ -0,0 +1,92 @@
"""Tests para extract_pdf_metadata."""
import os
import sys
from pypdf import PdfWriter
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from cybersecurity.extract_pdf_metadata import extract_pdf_metadata
def _make_pdf_with_metadata(path: str) -> None:
"""Crea un PDF de 2 paginas con doc info (titulo, autor, fechas)."""
writer = PdfWriter()
writer.add_blank_page(width=200, height=200)
writer.add_blank_page(width=200, height=200)
writer.add_metadata(
{
"/Title": "Documento OSINT",
"/Author": "Enmanuel G.",
"/Creator": "PyTestRig",
"/Producer": "pypdf",
"/CreationDate": "D:20240311102200+01'00'",
"/ModDate": "D:20240312113000+01'00'",
}
)
with open(path, "wb") as fh:
writer.write(fh)
def _make_pdf_without_metadata(path: str) -> None:
"""Crea un PDF de 1 pagina sin doc info."""
writer = PdfWriter()
writer.add_blank_page(width=100, height=100)
with open(path, "wb") as fh:
writer.write(fh)
def test_pdf_con_metadatos_devuelve_titulo_autor_paginas(tmp_path):
"""PDF con metadatos devuelve titulo, autor y num_pages."""
p = str(tmp_path / "withmeta.pdf")
_make_pdf_with_metadata(p)
meta = extract_pdf_metadata(p)
assert meta["error"] is None
assert meta["title"] == "Documento OSINT"
assert meta["author"] == "Enmanuel G."
assert meta["creator"] == "PyTestRig"
assert meta["producer"] == "pypdf"
assert meta["num_pages"] == 2
assert meta["raw"] # no vacio
def test_pdf_sin_doc_info_devuelve_none_sin_petar(tmp_path):
"""PDF sin doc info devuelve campos None sin petar."""
p = str(tmp_path / "nometa.pdf")
_make_pdf_without_metadata(p)
meta = extract_pdf_metadata(p)
assert meta["error"] is None
assert meta["num_pages"] == 1
assert meta["title"] is None
assert meta["author"] is None
def test_fechas_parseables_en_iso_8601(tmp_path):
"""fechas parseables se devuelven en ISO 8601."""
p = str(tmp_path / "dates.pdf")
_make_pdf_with_metadata(p)
meta = extract_pdf_metadata(p)
# pypdf parsea D:YYYYMMDDHHmmSS a datetime; isoformat() lleva 'T'.
assert meta["creation_date"] is not None
assert "2024-03-11" in meta["creation_date"]
assert "T" in meta["creation_date"]
assert meta["mod_date"] is not None
assert "2024-03-12" in meta["mod_date"]
if __name__ == "__main__":
import tempfile
from pathlib import Path
with tempfile.TemporaryDirectory() as d:
test_pdf_con_metadatos_devuelve_titulo_autor_paginas(Path(d))
test_pdf_sin_doc_info_devuelve_none_sin_petar(Path(d))
test_fechas_parseables_en_iso_8601(Path(d))
print("Todos los tests pasaron.")
@@ -0,0 +1,63 @@
---
name: guess_email_formats
kind: function
lang: py
domain: cybersecurity
version: "1.0.0"
purity: pure
signature: "def guess_email_formats(nombre: str, apellidos: str, dominio: str) -> list"
description: "Genera candidatos de email comunes (nombre.apellido, n.apellido, apellido.nombre, inicial+apellido, variantes con dos apellidos, etc.) a partir de nombre, apellidos y dominio. Normaliza acentos y ñ a ASCII en minusculas y deduplica preservando el orden. OSINT pasivo puro, sin red."
tags: [osint-passive, email, enumeration, recon, identity, cybersecurity, python]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: [unicodedata]
params:
- name: nombre
desc: "Nombre de pila. Si tiene varios tokens se usa el primero como nombre principal."
- name: apellidos
desc: "Uno o dos apellidos separados por espacio. Con dos apellidos se generan variantes que los unen."
- name: dominio
desc: "Dominio de correo sin arroba (ej. 'empresa.com'). Si viene con '@' delante se limpia."
output: "Lista de strings '<local>@<dominio>' con los candidatos de email en orden de generacion, sin duplicados."
tested: true
tests:
- "test_nombre_simple_un_apellido"
- "test_acentos_y_enie_normalizados"
- "test_dos_apellidos_genera_variantes_unidas"
- "test_dedup_preserva_orden"
- "test_dominio_con_arroba_se_limpia"
test_file_path: "python/functions/cybersecurity/guess_email_formats_test.py"
file_path: "python/functions/cybersecurity/guess_email_formats.py"
---
## Ejemplo
```python
guess_email_formats("José", "García López", "empresa.com")
# ['jose@empresa.com',
# 'jose.garcia@empresa.com',
# 'josegarcia@empresa.com',
# 'j.garcia@empresa.com',
# 'jgarcia@empresa.com',
# 'jose_garcia@empresa.com',
# 'garcia.jose@empresa.com',
# 'joseg@empresa.com',
# 'jose.garcialopez@empresa.com',
# 'josegarcialopez@empresa.com',
# 'jose.lopez@empresa.com',
# 'jgarcialopez@empresa.com']
```
## Cuando usarla
Usala al arrancar una investigacion de identidad cuando conoces nombre + apellidos + dominio de una organizacion y quieres una lista de direcciones probables para verificar despues (MX, catch-all, validacion SMTP, breach lookup). Es el primer paso antes de cualquier comprobacion activa.
## Gotchas
- Funcion pura: NO valida que el email exista ni hace ninguna comprobacion de red. Solo genera candidatos sintacticos.
- La lista de patrones es heuristica (los formatos mas comunes), no exhaustiva: organizaciones con esquemas propios (ej. id numerico) no quedaran cubiertas.
- La normalizacion translitera acentos y ñ a ASCII y elimina cualquier caracter no alfanumerico del local part; nombres con guiones o apostrofes pierden esos separadores.
- Solo usa el primer token del nombre como nombre principal; nombres compuestos (ej. "Maria Jose") no generan variantes con el segundo nombre.
@@ -0,0 +1,84 @@
"""Genera candidatos de email a partir de nombre, apellidos y dominio.
Funcion pura de OSINT pasivo: produce los patrones de direccion de correo
mas habituales en organizaciones, sin tocar la red. Util como punto de
partida para verificacion posterior (MX, catch-all, validacion SMTP, etc.).
"""
import unicodedata
def _ascii_lower(text: str) -> str:
"""Normaliza un texto a ASCII en minusculas.
Translitera acentos y caracteres latinos (a, e, n, ...) a su forma
ASCII, pasa a minusculas y elimina cualquier caracter que no sea
alfanumerico. No introduce separadores (a diferencia de un slug).
Args:
text: texto de entrada (puede contener acentos, ñ, mayusculas).
Returns:
cadena ASCII en minusculas formada solo por [a-z0-9].
"""
# NFKD descompone los caracteres acentuados en base + diacritico.
decomposed = unicodedata.normalize("NFKD", text)
stripped = "".join(c for c in decomposed if not unicodedata.combining(c))
lowered = stripped.lower()
return "".join(c for c in lowered if c.isalnum())
def guess_email_formats(nombre: str, apellidos: str, dominio: str) -> list:
"""Genera candidatos de email comunes a partir de identidad y dominio.
Combina el nombre y los apellidos en los patrones de direccion mas
frecuentes (nombre.apellido, inicial+apellido, apellido.nombre, etc.),
normalizando acentos/ñ a ASCII y minusculas. Si hay dos apellidos
tambien genera variantes que los unen. Deduplica preservando el orden
de aparicion.
Args:
nombre: nombre de pila (puede incluir varios tokens separados por
espacio; se usa el primero como nombre principal).
apellidos: uno o dos apellidos separados por espacio.
dominio: dominio de correo sin arroba (ej. "empresa.com").
Returns:
lista de strings "<local>@<dominio>" con los candidatos, en el
orden en que se generan los patrones y sin duplicados.
"""
n = _ascii_lower(nombre.split()[0]) if nombre.split() else ""
apellido_tokens = [_ascii_lower(a) for a in apellidos.split() if _ascii_lower(a)]
a1 = apellido_tokens[0] if apellido_tokens else ""
a2 = apellido_tokens[1] if len(apellido_tokens) > 1 else ""
dom = dominio.strip().lstrip("@").lower()
ni = n[0] if n else ""
a1i = a1[0] if a1 else ""
apellido_full = "".join(apellido_tokens) # apellido1apellido2 unidos
locals_raw = [
n, # nombre
f"{n}.{a1}" if n and a1 else "", # nombre.apellido
f"{n}{a1}" if n and a1 else "", # nombreapellido
f"{ni}.{a1}" if ni and a1 else "", # n.apellido
f"{ni}{a1}" if ni and a1 else "", # napellido
f"{n}_{a1}" if n and a1 else "", # nombre_apellido
f"{a1}.{n}" if a1 and n else "", # apellido.nombre
f"{n}{a1i}" if n and a1i else "", # nombre+inicial_apellido
f"{n}.{apellido_full}" if n and a2 else "", # nombre.apellido1apellido2
f"{n}{apellido_full}" if n and a2 else "", # nombreapellido1apellido2
f"{n}.{a2}" if n and a2 else "", # nombre.apellido2
f"{ni}{apellido_full}" if ni and a2 else "", # n+apellidos unidos
]
seen = set()
out = []
for local in locals_raw:
if not local:
continue
email = f"{local}@{dom}"
if email not in seen:
seen.add(email)
out.append(email)
return out
@@ -0,0 +1,53 @@
"""Tests para guess_email_formats."""
from guess_email_formats import guess_email_formats
def test_nombre_simple_un_apellido():
"""Nombre simple con un apellido produce los patrones base."""
result = guess_email_formats("Juan", "Perez", "empresa.com")
assert "juan.perez@empresa.com" in result
assert "juanperez@empresa.com" in result
assert "j.perez@empresa.com" in result
assert "jperez@empresa.com" in result
assert "juan_perez@empresa.com" in result
assert "perez.juan@empresa.com" in result
assert "juan@empresa.com" in result
# Sin segundo apellido no hay variantes unidas.
assert all("@empresa.com" in e for e in result)
def test_acentos_y_enie_normalizados():
"""Acentos y ñ se transliteran a ASCII en minusculas."""
result = guess_email_formats("José", "Muñoz", "acme.org")
assert "jose.munoz@acme.org" in result
assert "j.munoz@acme.org" in result
# No debe aparecer ningun caracter no ASCII.
for email in result:
assert email == email.encode("ascii", "ignore").decode("ascii")
def test_dos_apellidos_genera_variantes_unidas():
"""Dos apellidos producen variantes que los unen."""
result = guess_email_formats("Maria", "Garcia Lopez", "uni.edu")
assert "maria.garcialopez@uni.edu" in result
assert "mariagarcialopez@uni.edu" in result
assert "maria.lopez@uni.edu" in result
assert "mgarcialopez@uni.edu" in result
# El patron de primer apellido sigue presente.
assert "maria.garcia@uni.edu" in result
def test_dedup_preserva_orden():
"""Los candidatos no se repiten y mantienen el orden de generacion."""
result = guess_email_formats("Ana", "Ruiz", "x.com")
assert len(result) == len(set(result))
# nombre.apellido aparece antes que apellido.nombre.
assert result.index("ana.ruiz@x.com") < result.index("ruiz.ana@x.com")
def test_dominio_con_arroba_se_limpia():
"""Un dominio prefijado con @ se normaliza sin duplicar la arroba."""
result = guess_email_formats("Luis", "Soto", "@corp.io")
assert "luis.soto@corp.io" in result
assert all(e.count("@") == 1 for e in result)
@@ -0,0 +1,57 @@
---
name: scan_ficha_attachments_metadata
kind: function
lang: py
domain: cybersecurity
version: "1.0.0"
purity: impure
signature: "def scan_ficha_attachments_metadata(attachments_dir: str) -> dict"
description: "Orquestador OSINT pasivo: recorre un directorio de attachments de una ficha (imagenes y PDFs), extrae sus metadatos componiendo extract_exif_metadata (imagenes .jpg/.jpeg/.png/.heic) y extract_pdf_metadata (.pdf), y agrega los puntos GPS y las fechas encontradas. Devuelve files + gps_points + dates + summary."
tags: [osint-enrich, osint-passive, cybersecurity, metadata, exif, pdf]
uses_functions: [extract_exif_metadata_py_cybersecurity, extract_pdf_metadata_py_cybersecurity]
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: [os]
params:
- name: attachments_dir
desc: "ruta a un directorio de attachments, p.ej. /home/enmanuel/Obsidian/osint/attachments/personas/<slug>/. Se recorre recursivamente. Si no es un directorio existente lanza NotADirectoryError"
output: "dict con files (lista de {path, type, metadata} por archivo procesado: type es 'image' o 'pdf'), gps_points (lista de {file, lat, lon} agregando las coordenadas EXIF), dates (lista de fechas str de EXIF y PDF) y summary ({n_files, n_images, n_pdfs, n_gps_points, n_dates, errors}). Los archivos cuya extraccion falla quedan con metadata={'error': ...} y suman a summary.errors"
tested: true
tests: ["test_golden_agrega_gps_y_fechas_de_imagen_y_pdf", "test_directorio_inexistente_lanza", "test_ignora_extensiones_no_soportadas", "test_error_en_archivo_se_captura_en_summary"]
test_file_path: "python/functions/cybersecurity/scan_ficha_attachments_metadata_test.py"
file_path: "python/functions/cybersecurity/scan_ficha_attachments_metadata.py"
---
## Ejemplo
```python
import sys, os
sys.path.insert(0, os.path.join("python", "functions"))
from cybersecurity import scan_ficha_attachments_metadata
# Escanea los attachments de una persona en el vault OSINT.
res = scan_ficha_attachments_metadata(
"/home/enmanuel/Obsidian/osint/attachments/personas/juan-perez/"
)
print(res["summary"]) # {'n_files': 7, 'n_images': 5, 'n_pdfs': 2, 'n_gps_points': 2, 'n_dates': 4, 'errors': 0}
for p in res["gps_points"]:
print(p["file"], p["lat"], p["lon"]) # coordenadas extraidas de las fotos
print(res["dates"]) # fechas EXIF + fechas de creacion/modificacion de los PDFs
```
## Cuando usarla
- Cuando montas la ficha OSINT de una persona u organizacion y quieres saber de un vistazo que huella de metadatos dejan los documentos/fotos que ya tienes guardados (donde y cuando se hicieron).
- Antes de publicar/compartir attachments propios: para auditar que GPS y fechas se filtrarian.
- Como paso de enriquecimiento dentro de un pipeline de investigacion, justo despues de descargar los attachments al directorio de la ficha.
## Gotchas
- **Uso solo para investigacion autorizada.** Extraer metadatos de archivos ajenos sin permiso puede ser ilegal segun jurisdiccion. Limitate a tus propios documentos o a material que estes autorizado a analizar.
- Funcion IMPURA: hace I/O sobre el sistema de archivos (recorrido recursivo con `os.walk`). Si `attachments_dir` no existe lanza `NotADirectoryError`.
- La extraccion por archivo se aisla con try/except: un archivo corrupto no aborta el escaneo, queda con `metadata={"error": ...}` y suma a `summary.errors`.
- Solo procesa imagenes .jpg/.jpeg/.png/.heic y PDFs .pdf; el resto se ignora silenciosamente. El soporte real de HEIC/EXIF depende de lo que `extract_exif_metadata` (Pillow) pueda abrir en el sistema.
- Las fechas se devuelven tal cual las reportan EXIF/PDF (formatos heterogeneos, sin normalizar a ISO).
@@ -0,0 +1,107 @@
"""Orquestador OSINT pasivo: escanea metadatos de los attachments de una ficha.
Recorre un directorio de attachments (imagenes y PDFs) y extrae sus metadatos
componiendo las funciones atomicas del registro (`extract_exif_metadata`,
`extract_pdf_metadata`). Agrega los puntos GPS y las fechas encontradas para
dar una vista rapida de la huella de metadatos de una persona/organizacion.
Funcion IMPURA: hace I/O sobre el sistema de archivos.
"""
import os
from cybersecurity import extract_exif_metadata, extract_pdf_metadata
_IMAGE_EXTS = (".jpg", ".jpeg", ".png", ".heic")
_PDF_EXTS = (".pdf",)
def _iter_files(attachments_dir: str):
"""Recorre recursivamente el directorio devolviendo rutas de archivo ordenadas."""
for root, _dirs, files in os.walk(attachments_dir):
for name in sorted(files):
yield os.path.join(root, name)
def scan_ficha_attachments_metadata(attachments_dir: str) -> dict:
"""Escanea un directorio de attachments y agrega los metadatos extraidos.
Aplica `extract_exif_metadata` a las imagenes (.jpg/.jpeg/.png/.heic) y
`extract_pdf_metadata` a los PDFs (.pdf). Agrega los puntos GPS de las
imagenes y todas las fechas detectadas (EXIF + PDF).
Args:
attachments_dir: ruta a un directorio de attachments, p.ej.
`/home/enmanuel/Obsidian/osint/attachments/personas/<slug>/`.
Se recorre recursivamente.
Returns:
dict con las claves:
- files: lista de {path, type, metadata} por archivo procesado.
- gps_points: lista de {file, lat, lon} con las coordenadas EXIF.
- dates: lista de fechas (str) encontradas en EXIF y PDF.
- summary: {n_files, n_images, n_pdfs, n_gps_points, n_dates,
errors} con conteos agregados.
Raises:
NotADirectoryError: si `attachments_dir` no es un directorio existente.
"""
if not os.path.isdir(attachments_dir):
raise NotADirectoryError(f"no es un directorio: {attachments_dir}")
files: list[dict] = []
gps_points: list[dict] = []
dates: list[str] = []
n_images = 0
n_pdfs = 0
errors = 0
for path in _iter_files(attachments_dir):
ext = os.path.splitext(path)[1].lower()
if ext in _IMAGE_EXTS:
ftype = "image"
elif ext in _PDF_EXTS:
ftype = "pdf"
else:
continue
try:
if ftype == "image":
metadata = extract_exif_metadata(path)
n_images += 1
lat = metadata.get("gps_lat")
lon = metadata.get("gps_lon")
if lat is not None and lon is not None:
gps_points.append({"file": path, "lat": lat, "lon": lon})
dt = metadata.get("datetime")
if dt:
dates.append(str(dt))
else:
metadata = extract_pdf_metadata(path)
n_pdfs += 1
for key in ("creation_date", "modification_date", "creationDate", "modDate"):
val = metadata.get(key)
if val:
dates.append(str(val))
except Exception as exc: # noqa: BLE001 - I/O sobre archivos heterogeneos
errors += 1
metadata = {"error": f"{type(exc).__name__}: {exc}"}
files.append({"path": path, "type": ftype, "metadata": metadata})
summary = {
"n_files": len(files),
"n_images": n_images,
"n_pdfs": n_pdfs,
"n_gps_points": len(gps_points),
"n_dates": len(dates),
"errors": errors,
}
return {
"files": files,
"gps_points": gps_points,
"dates": dates,
"summary": summary,
}
@@ -0,0 +1,113 @@
"""Tests para scan_ficha_attachments_metadata.
Las funciones compuestas (extract_exif_metadata / extract_pdf_metadata) se
monkeypatchean para no depender ni de archivos reales ni de Pillow. Solo se
verifica la orquestacion y la agregacion (GPS, fechas, summary, manejo de
errores y filtrado de extensiones).
"""
import importlib
import os
import sys
import pytest
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", ".."))
from cybersecurity import scan_ficha_attachments_metadata
# El paquete re-exporta la funcion bajo el mismo nombre que el submodulo, asi
# que `cybersecurity.scan_ficha_attachments_metadata` resuelve a la funcion.
# Para parchear los globals del modulo orquestador lo tomamos via importlib.
mod = importlib.import_module("cybersecurity.scan_ficha_attachments_metadata")
def _make_tree(tmp_path, names):
"""Crea archivos vacios en tmp_path; devuelve el directorio."""
d = tmp_path / "ficha"
d.mkdir()
for name in names:
(d / name).write_bytes(b"")
return str(d)
def test_golden_agrega_gps_y_fechas_de_imagen_y_pdf(tmp_path, monkeypatch):
attachments = _make_tree(tmp_path, ["foto1.jpg", "foto2.png", "doc.pdf"])
exif_by_name = {
"foto1.jpg": {"datetime": "2024:01:02 10:11:12", "gps_lat": 40.4, "gps_lon": -3.7},
"foto2.png": {"datetime": None, "gps_lat": None, "gps_lon": None},
}
def fake_exif(path):
return exif_by_name[os.path.basename(path)]
def fake_pdf(path):
return {"creation_date": "D:20230101000000", "modification_date": "D:20230202000000"}
monkeypatch.setattr(mod, "extract_exif_metadata", fake_exif)
monkeypatch.setattr(mod, "extract_pdf_metadata", fake_pdf)
res = scan_ficha_attachments_metadata(attachments)
# 3 archivos procesados (2 imagenes + 1 pdf).
assert res["summary"] == {
"n_files": 3,
"n_images": 2,
"n_pdfs": 1,
"n_gps_points": 1,
"n_dates": 3, # 1 de la imagen + 2 del pdf
"errors": 0,
}
# GPS solo de foto1.
assert res["gps_points"] == [
{"file": os.path.join(attachments, "foto1.jpg"), "lat": 40.4, "lon": -3.7}
]
# Fechas agregadas de EXIF + PDF.
assert "2024:01:02 10:11:12" in res["dates"]
assert "D:20230101000000" in res["dates"]
assert "D:20230202000000" in res["dates"]
# files lleva path/type/metadata por cada uno.
types = {os.path.basename(f["path"]): f["type"] for f in res["files"]}
assert types == {"foto1.jpg": "image", "foto2.png": "image", "doc.pdf": "pdf"}
def test_directorio_inexistente_lanza(tmp_path):
with pytest.raises(NotADirectoryError):
scan_ficha_attachments_metadata(str(tmp_path / "no-existe"))
def test_ignora_extensiones_no_soportadas(tmp_path, monkeypatch):
attachments = _make_tree(tmp_path, ["nota.txt", "video.mp4", "foto.jpg"])
monkeypatch.setattr(mod, "extract_exif_metadata", lambda p: {"gps_lat": None, "gps_lon": None, "datetime": None})
monkeypatch.setattr(mod, "extract_pdf_metadata", lambda p: {})
res = scan_ficha_attachments_metadata(attachments)
# Solo la imagen cuenta; txt y mp4 se ignoran.
assert res["summary"]["n_files"] == 1
assert res["summary"]["n_images"] == 1
assert res["summary"]["n_pdfs"] == 0
assert [os.path.basename(f["path"]) for f in res["files"]] == ["foto.jpg"]
def test_error_en_archivo_se_captura_en_summary(tmp_path, monkeypatch):
attachments = _make_tree(tmp_path, ["roto.jpg", "ok.pdf"])
def boom(path):
raise OSError("imagen corrupta")
monkeypatch.setattr(mod, "extract_exif_metadata", boom)
monkeypatch.setattr(mod, "extract_pdf_metadata", lambda p: {"creation_date": "D:20200101"})
res = scan_ficha_attachments_metadata(attachments)
assert res["summary"]["errors"] == 1
assert res["summary"]["n_files"] == 2 # ambos archivos quedan registrados
roto = next(f for f in res["files"] if os.path.basename(f["path"]) == "roto.jpg")
assert "error" in roto["metadata"]
assert "imagen corrupta" in roto["metadata"]["error"]
@@ -0,0 +1,68 @@
---
name: whois_lookup
kind: function
lang: py
domain: cybersecurity
version: "1.0.0"
purity: impure
signature: "def whois_lookup(dominio: str, timeout_s: float = 15.0) -> dict"
description: "Recoleccion OSINT pasiva de datos de registro de dominio via RDAP (reemplazo moderno de WHOIS sobre HTTP/JSON). Consulta https://rdap.org/domain/<dominio> con http_get_json y normaliza registrar, fechas de creacion/expiracion/ultimo cambio, nameservers, estados y entidades. Devuelve {found: False} si el dominio no existe (404)."
tags: [osint-passive, whois, rdap, recon, cybersecurity]
params:
- name: dominio
desc: "Dominio a consultar, ej. organic-machine.com. Vacio lanza RuntimeError."
- name: timeout_s
desc: "Segundos maximo de espera de la peticion HTTP a rdap.org (default 15.0)."
output: "dict normalizado con found (bool), registrar, creation_date, expiration_date, last_changed, nameservers (lista), status (lista), entities (lista de {handle, roles}) y raw (RDAP completo). Si el dominio no existe (HTTP 404) devuelve {found: False}."
uses_functions: ["http_get_json_py_infra"]
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: []
tested: true
tests: ["test_normaliza_respuesta_rdap", "test_dominio_no_encontrado_404", "test_otro_error_http_se_propaga", "test_sin_registrar_ni_fechas", "test_dominio_vacio_lanza_error"]
test_file_path: "python/functions/cybersecurity/whois_lookup_test.py"
file_path: "python/functions/cybersecurity/whois_lookup.py"
---
## Ejemplo
```python
import sys, os
sys.path.insert(0, os.path.join("python", "functions"))
from cybersecurity import whois_lookup
info = whois_lookup("organic-machine.com")
if info["found"]:
print(info["registrar"]) # 'Example Registrar Inc.'
print(info["creation_date"]) # '2020-01-15T10:00:00Z'
print(info["expiration_date"]) # '2027-01-15T10:00:00Z'
print(info["nameservers"]) # ['ns1.example.net', 'ns2.example.net']
print(info["status"]) # ['client transfer prohibited']
else:
print("dominio no registrado")
```
## Cuando usarla
Usala para obtener metadatos de registro de un dominio sin depender del CLI
`whois` (no instalado): edad del dominio, fecha de expiracion (dominios a
punto de caducar), registrar y nameservers autoritativos. Util en perfilado
pasivo, deteccion de dominios recien creados (typosquatting/phishing) y
validacion de propiedad.
## Gotchas
- RDAP no esta uniformemente desplegado en todos los TLD: algunos devuelven
campos vacios o ni siquiera responden. Por eso los campos opcionales pueden
quedar `None` y `nameservers`/`status`/`entities` listas vacias.
- rdap.org actua como bootstrap y redirige al servidor RDAP autoritativo del
TLD; depende de su disponibilidad.
- El registrante (`entities` con rol distinto de `registrar`) suele estar
redactado por privacy/GDPR: casi siempre solo veras `handle` y `roles`, sin
datos personales.
- Un dominio no registrado devuelve `{"found": False}` (HTTP 404); cualquier
otro error HTTP (rate limit 429, 5xx) se propaga como `RuntimeError`.
- Las fechas se devuelven tal cual las da RDAP (ISO 8601 UTC), sin parsear a
objetos `datetime`.
@@ -0,0 +1,114 @@
"""Recoleccion OSINT pasiva de datos de registro de dominio via RDAP.
Funcion IMPURA: consulta el servicio RDAP publico (reemplazo moderno de
WHOIS, sobre HTTP/JSON) y normaliza la respuesta. Es OSINT pasivo: no toca
al dominio objetivo, solo el directorio RDAP publico.
"""
import os
import sys
sys.path.insert(
0, os.path.join(os.path.dirname(__file__), "..", "..", "functions")
)
from infra.http_get_json import http_get_json # noqa: E402
def _events_by_action(raw: dict) -> dict:
"""Indexa la lista RDAP ``events`` por ``eventAction`` -> ``eventDate``."""
out: dict = {}
for event in raw.get("events", []) or []:
action = event.get("eventAction")
date = event.get("eventDate")
if action and date:
out[action] = date
return out
def _extract_registrar(raw: dict) -> str | None:
"""Busca la entidad con rol ``registrar`` y devuelve su nombre vCard."""
for entity in raw.get("entities", []) or []:
roles = entity.get("roles", []) or []
if "registrar" not in roles:
continue
vcard = entity.get("vcardArray")
if isinstance(vcard, list) and len(vcard) == 2:
for field in vcard[1]:
if isinstance(field, list) and field and field[0] == "fn":
return field[3]
return entity.get("handle")
return None
def _extract_nameservers(raw: dict) -> list:
"""Extrae los ldhName de los nameservers RDAP, ordenados."""
servers = []
for ns in raw.get("nameservers", []) or []:
name = ns.get("ldhName")
if name:
servers.append(name.lower())
return sorted(set(servers))
def whois_lookup(dominio: str, timeout_s: float = 15.0) -> dict:
"""Consulta RDAP de un dominio y normaliza la informacion de registro.
Usa ``http_get_json`` del registry contra ``https://rdap.org/domain/<dominio>``
(rdap.org redirige al servidor RDAP autoritativo del TLD). Normaliza
registrar, fechas (creacion / expiracion / ultimo cambio), nameservers,
estados y entidades, e incluye la respuesta cruda en ``raw``.
Args:
dominio: Dominio a consultar (ej. ``"organic-machine.com"``).
timeout_s: Segundos maximo de espera de la peticion HTTP (default 15).
Returns:
Dict normalizado con claves: ``found`` (bool), ``registrar``,
``creation_date``, ``expiration_date``, ``last_changed``,
``nameservers`` (lista), ``status`` (lista), ``entities`` (lista de
roles/handles) y ``raw`` (respuesta RDAP completa). Si el dominio no
existe (HTTP 404) devuelve ``{"found": False}``.
Raises:
RuntimeError: Si el dominio esta vacio o la peticion falla por una
razon distinta de 404.
"""
if not dominio or not dominio.strip():
raise RuntimeError("whois_lookup: dominio vacio")
url = f"https://rdap.org/domain/{dominio.strip()}"
try:
raw = http_get_json(url, timeout=timeout_s)
except RuntimeError as e:
# http_get_json envuelve los HTTPError como "HTTP <code>".
if "HTTP 404" in str(e):
return {"found": False}
raise
if not isinstance(raw, dict):
raise RuntimeError(
f"whois_lookup: respuesta RDAP inesperada (tipo {type(raw).__name__})"
)
events = _events_by_action(raw)
entities = [
{
"handle": ent.get("handle"),
"roles": ent.get("roles", []) or [],
}
for ent in raw.get("entities", []) or []
]
return {
"found": True,
"registrar": _extract_registrar(raw),
"creation_date": events.get("registration"),
"expiration_date": events.get("expiration"),
"last_changed": events.get("last changed"),
"nameservers": _extract_nameservers(raw),
"status": raw.get("status", []) or [],
"entities": entities,
"raw": raw,
}
@@ -0,0 +1,109 @@
"""Tests para whois_lookup."""
import os
import sys
sys.path.insert(0, os.path.dirname(__file__))
import whois_lookup as wl
from whois_lookup import whois_lookup
def _rdap_sample() -> dict:
return {
"ldhName": "organic-machine.com",
"status": ["client transfer prohibited"],
"events": [
{"eventAction": "registration", "eventDate": "2020-01-15T10:00:00Z"},
{"eventAction": "expiration", "eventDate": "2027-01-15T10:00:00Z"},
{"eventAction": "last changed", "eventDate": "2026-01-10T08:30:00Z"},
],
"nameservers": [
{"ldhName": "ns1.example.net"},
{"ldhName": "NS2.EXAMPLE.NET"},
],
"entities": [
{
"handle": "REG-123",
"roles": ["registrar"],
"vcardArray": [
"vcard",
[
["version", {}, "text", "4.0"],
["fn", {}, "text", "Example Registrar Inc."],
],
],
},
{"handle": "REGISTRANT-9", "roles": ["registrant"]},
],
}
def test_normaliza_respuesta_rdap(monkeypatch):
"""Extrae registrar, fechas, nameservers, status y entities."""
monkeypatch.setattr(wl, "http_get_json", lambda url, timeout=15.0: _rdap_sample())
result = whois_lookup("organic-machine.com")
assert result["found"] is True
assert result["registrar"] == "Example Registrar Inc."
assert result["creation_date"] == "2020-01-15T10:00:00Z"
assert result["expiration_date"] == "2027-01-15T10:00:00Z"
assert result["last_changed"] == "2026-01-10T08:30:00Z"
assert result["nameservers"] == ["ns1.example.net", "ns2.example.net"]
assert result["status"] == ["client transfer prohibited"]
assert {"handle": "REGISTRANT-9", "roles": ["registrant"]} in result["entities"]
assert result["raw"]["ldhName"] == "organic-machine.com"
def test_dominio_no_encontrado_404(monkeypatch):
"""Un HTTP 404 de http_get_json devuelve {'found': False}."""
def fake(url, timeout=15.0):
raise RuntimeError("http_get_json: HTTP 404 at 'rdap.org' — not found")
monkeypatch.setattr(wl, "http_get_json", fake)
result = whois_lookup("nope-no-existe-xyz.invalid")
assert result == {"found": False}
def test_otro_error_http_se_propaga(monkeypatch):
"""Un error HTTP distinto de 404 se propaga como RuntimeError."""
def fake(url, timeout=15.0):
raise RuntimeError("http_get_json: HTTP 500 at 'rdap.org' — boom")
monkeypatch.setattr(wl, "http_get_json", fake)
try:
whois_lookup("organic-machine.com")
assert False, "deberia haberse propagado el error 500"
except RuntimeError as e:
assert "HTTP 500" in str(e)
def test_sin_registrar_ni_fechas(monkeypatch):
"""RDAP minimo: campos opcionales quedan None / listas vacias."""
monkeypatch.setattr(
wl, "http_get_json", lambda url, timeout=15.0: {"ldhName": "x.com"}
)
result = whois_lookup("x.com")
assert result["found"] is True
assert result["registrar"] is None
assert result["creation_date"] is None
assert result["nameservers"] == []
assert result["status"] == []
assert result["entities"] == []
def test_dominio_vacio_lanza_error():
"""Dominio vacio lanza RuntimeError."""
try:
whois_lookup("")
assert False, "deberia haber lanzado RuntimeError"
except RuntimeError:
pass