Files
fn_registry/python/functions/pipelines/recon_osint.py
T
egutierrez 935008ec3f feat(recon): grupo de reconocimiento de red + servicios + fingerprint web
Añade el capability group `recon` (dominio cybersecurity + pipelines, Python),
con la política de archivado OSINT y página madre docs/capabilities/recon.md.

Lookups y sondeo (wrappers de CLI):
- whois_lookup, rdap_lookup, dns_records, ping_host, traceroute_host, nmap_scan
- save_scan_to_osint (sink común) + recon_osint (pipeline one-shot scan+archivado)

Escaneo de puertos/servicios nativo (stdlib, sin nmap ni sudo):
- scan_tcp_ports: connect-scan TCP concurrente (open/closed/filtered)
- grab_service_banner: banner grab + identificación de servicio/versión real
- identify_port_service: puro, puerto -> servicio IANA esperado (~120 puertos)
- scan_port_services: pipeline one-shot (scan -> identify + banner por puerto abierto)

Fingerprint de tecnología web (estilo Wappalyzer), patrón pura/impura:
- fetch_http_fingerprint: GET stdlib, recoge headers/html/cookies (solo nombres)
- detect_web_tech: puro, matchea ~50 firmas regex -> tecnologías por categoría
- fingerprint_web_stack: pipeline one-shot url -> tecnologías

Todas devuelven dict {status} sin lanzar. Tests: 43 verdes, sin red externa.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-14 15:12:07 +02:00

229 lines
8.1 KiB
Python

"""Pipeline recon_osint.
One-shot que ejecuta UN escaneo de red atomico y SIEMPRE lo archiva en OSINT.
Materializa la politica "todo escaneo que lancemos se guarda en osint":
convierte el patron de dos llamadas (scan atomico + sink a OSINT) en una sola
invocacion. Compone funciones del registry del dominio cybersecurity; no
reescribe ninguna logica de escaneo ni de persistencia.
Funciones del registry compuestas (importadas, no reimplementadas):
whois_lookup, rdap_lookup, dns_records, ping_host, traceroute_host,
nmap_scan, save_scan_to_osint
"""
from cybersecurity import (
whois_lookup,
rdap_lookup,
dns_records,
ping_host,
traceroute_host,
nmap_scan,
save_scan_to_osint,
)
VALID_SCAN_TYPES = ("whois", "rdap", "dns", "ping", "traceroute", "nmap")
def recon_osint(
target: str,
scan_type: str = "whois",
save: bool = True,
profile: str = "quick",
record_types: list[str] | None = None,
count: int = 4,
max_hops: int = 30,
timeout_s: int | None = None,
confirm: bool = False,
) -> dict:
"""Ejecuta un escaneo de red atomico y lo archiva en OSINT en un solo paso.
Despacha al escaneo atomico correspondiente segun ``scan_type``, captura su
resultado y, si el escaneo tuvo exito y ``save`` es True, lo guarda en el
vault OSINT via ``save_scan_to_osint``. Nunca lanza excepciones: cualquier
fallo se refleja en la clave ``status`` del dict devuelto.
Args:
target: dominio, host o IP objetivo del escaneo.
scan_type: tipo de escaneo, uno de whois, rdap, dns, ping, traceroute,
nmap.
save: si True (default) archiva el resultado en OSINT; si False solo
ejecuta el escaneo.
profile: perfil de nmap (solo aplica a scan_type='nmap'). Ej. quick,
full-tcp, vuln.
record_types: tipos de registro DNS a consultar (solo scan_type='dns').
None usa los defaults de dns_records.
count: numero de paquetes ICMP (solo scan_type='ping').
max_hops: numero maximo de saltos (solo scan_type='traceroute').
timeout_s: timeout en segundos. Si se pasa, se propaga al escaneo
atomico; si es None, cada atomico usa su default.
confirm: confirmacion explicita para el escaneo activo de nmap (solo
aplica a scan_type='nmap'). Por defecto False: nmap rechaza targets
publicos/desconocidos no autorizados. Pasar True solo cuando el
escaneo este autorizado. Se ignora para los demas scan_type.
Returns:
dict con:
- status: 'ok' | 'error'.
- target: el objetivo escaneado.
- scan_type: el tipo de escaneo ejecutado.
- scan: dict crudo devuelto por la funcion atomica.
- osint: dict devuelto por save_scan_to_osint, o None si save=False.
En caso de scan_type invalido devuelve
{"status": "error", "stage": "validate", ...} con la lista de tipos
validos. Si el escaneo falla devuelve
{"status": "error", "stage": "scan", "scan": <dict>} sin intentar guardar.
"""
if scan_type not in VALID_SCAN_TYPES:
return {
"status": "error",
"stage": "validate",
"target": target,
"scan_type": scan_type,
"error": f"invalid scan_type '{scan_type}'",
"valid": list(VALID_SCAN_TYPES),
}
# 1. Despacho al escaneo atomico correspondiente.
if scan_type == "whois":
kwargs = {} if timeout_s is None else {"timeout_s": timeout_s}
scan = whois_lookup(target, **kwargs)
elif scan_type == "rdap":
kwargs = {} if timeout_s is None else {"timeout_s": timeout_s}
scan = rdap_lookup(target, **kwargs)
elif scan_type == "dns":
kwargs = {"record_types": record_types}
if timeout_s is not None:
kwargs["timeout_s"] = timeout_s
scan = dns_records(target, **kwargs)
elif scan_type == "ping":
kwargs = {"count": count}
if timeout_s is not None:
kwargs["timeout_s"] = timeout_s
scan = ping_host(target, **kwargs)
elif scan_type == "traceroute":
kwargs = {"max_hops": max_hops}
if timeout_s is not None:
kwargs["timeout_s"] = timeout_s
scan = traceroute_host(target, **kwargs)
else: # nmap
kwargs = {"profile": profile, "confirm": confirm}
if timeout_s is not None:
kwargs["timeout_s"] = timeout_s
scan = nmap_scan(target, **kwargs)
# 2. Si el escaneo fallo, no intentamos guardar.
if scan.get("status") == "error":
return {"status": "error", "stage": "scan", "scan": scan}
# 3. Construye el summary segun el tipo de escaneo.
if scan_type == "whois":
summary = {
"registrar": scan.get("registrar"),
"expiry_date": scan.get("expiry_date"),
}
elif scan_type == "rdap":
summary = {
"handle": scan.get("handle"),
"ldhName": scan.get("ldhName"),
}
elif scan_type == "dns":
summary = {"records": scan.get("records")}
elif scan_type == "ping":
summary = {
"loss_pct": scan.get("loss_pct"),
"rtt_avg_ms": scan.get("rtt_avg_ms"),
}
elif scan_type == "traceroute":
summary = {"hop_count": len(scan.get("hops") or [])}
else: # nmap
summary = {
"open_ports": scan.get("open_ports"),
"hosts_up": scan.get("hosts_up"),
"profile": profile,
}
# 4. Archiva en OSINT si procede.
osint = None
if save:
osint = save_scan_to_osint(
target,
scan_type,
scan.get("raw", ""),
summary=summary,
tool=scan_type,
)
return {
"status": "ok",
"target": target,
"scan_type": scan_type,
"scan": scan,
"osint": osint,
}
def _parse_cli(argv: list[str]) -> dict:
"""Parsea los args de CLI: <target> [scan_type] [--confirm] [--allowlist a,b,c].
Mantiene compatibilidad: sin args usa el smoke por defecto; sin --confirm,
confirm=False. Devuelve un dict de kwargs para recon_osint mas la allowlist.
"""
positional: list[str] = []
confirm = False
allowlist: list[str] | None = None
i = 0
while i < len(argv):
arg = argv[i]
if arg == "--confirm":
confirm = True
elif arg == "--allowlist":
if i + 1 < len(argv):
allowlist = [a.strip() for a in argv[i + 1].split(",") if a.strip()]
i += 1
elif arg.startswith("--allowlist="):
raw = arg.split("=", 1)[1]
allowlist = [a.strip() for a in raw.split(",") if a.strip()]
else:
positional.append(arg)
i += 1
return {"positional": positional, "confirm": confirm, "allowlist": allowlist}
if __name__ == "__main__":
import sys
parsed = _parse_cli(sys.argv[1:])
positional = parsed["positional"]
target = positional[0] if len(positional) >= 1 else "example.com"
scan_type = positional[1] if len(positional) >= 2 else "whois"
# --confirm activa confirm directamente. --allowlist activa confirm cuando
# el target esta autorizado en la lista (mismo criterio que nmap_scan).
confirm = parsed["confirm"]
allowlist = parsed["allowlist"]
if not confirm and allowlist:
t = target.strip()
if any(t == a or t.endswith(a) for a in allowlist):
confirm = True
try:
result = recon_osint(
target,
scan_type,
confirm=confirm,
)
print("status:", result.get("status"))
# Para nmap rechazado por el guard, el dict trae stage=scan + scan.needs_confirm.
scan = result.get("scan") or {}
if scan.get("needs_confirm"):
print("needs_confirm:", True)
print("error:", scan.get("error"))
osint = result.get("osint") or {}
print("note_path:", osint.get("note_path"))
print("registered:", osint.get("registered"))
except Exception as exc: # smoke tolerante a red / servicios caidos
print("smoke exception (tolerada):", repr(exc))