Files
fn_registry/python/functions/cybersecurity/nmap_scan.py
T
egutierrez 935008ec3f feat(recon): grupo de reconocimiento de red + servicios + fingerprint web
Añade el capability group `recon` (dominio cybersecurity + pipelines, Python),
con la política de archivado OSINT y página madre docs/capabilities/recon.md.

Lookups y sondeo (wrappers de CLI):
- whois_lookup, rdap_lookup, dns_records, ping_host, traceroute_host, nmap_scan
- save_scan_to_osint (sink común) + recon_osint (pipeline one-shot scan+archivado)

Escaneo de puertos/servicios nativo (stdlib, sin nmap ni sudo):
- scan_tcp_ports: connect-scan TCP concurrente (open/closed/filtered)
- grab_service_banner: banner grab + identificación de servicio/versión real
- identify_port_service: puro, puerto -> servicio IANA esperado (~120 puertos)
- scan_port_services: pipeline one-shot (scan -> identify + banner por puerto abierto)

Fingerprint de tecnología web (estilo Wappalyzer), patrón pura/impura:
- fetch_http_fingerprint: GET stdlib, recoge headers/html/cookies (solo nombres)
- detect_web_tech: puro, matchea ~50 firmas regex -> tecnologías por categoría
- fingerprint_web_stack: pipeline one-shot url -> tecnologías

Todas devuelven dict {status} sin lanzar. Tests: 43 verdes, sin red externa.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-14 15:12:07 +02:00

296 lines
12 KiB
Python

"""Wrapper de `nmap` para escaneo de red por perfiles, con salida XML parseada.
Funcion IMPURA: ejecuta el binario `nmap` como subprocess. Es la funcion
estrella de reconocimiento del registry, pensada tanto para escaneos rapidos
en primer plano como para escaneos largos en segundo plano (full TCP, vuln,
UDP). Siempre pide salida XML (`-oX`) y la parsea con `xml.etree.ElementTree`
para devolver puertos abiertos y hosts vivos de forma estructurada.
NO lanza excepciones: devuelve un dict con `status` "ok" o "error". Solo escanear
hosts autorizados/propios.
"""
import ipaddress
import os
import re
import subprocess
import tempfile
import time
import xml.etree.ElementTree as ET
from datetime import datetime, timezone
_LOCAL_SUFFIXES = (".local", ".lan", ".internal", ".home", ".corp")
# Perfiles de escaneo: cada uno mapea a una lista de flags de nmap.
# Sin sudo por defecto: nmap cae a connect-scan TCP (-sT implicito) sin root.
# Los perfiles que requieren privilegios (os, udp-top, parte de aggressive)
# se documentan en el .md (Gotchas).
PROFILES = {
"quick": ["-T4", "-F"], # top 100 puertos, rapido
"top1000": ["-T4"], # default nmap (1000 puertos)
"full-tcp": ["-p-", "-T4"], # los 65535 TCP — LARGO
"service": ["-sV", "-sC", "-T4"], # version + scripts default
"udp-top": ["-sU", "--top-ports", "100", "-T4"], # UDP top 100 — LARGO/sudo
"vuln": ["-sV", "--script", "vuln", "-T4"], # scripts de vulnerabilidades
"discovery": ["-sn"], # ping sweep / host discovery
"aggressive": ["-A", "-T4"], # OS+version+script+traceroute
"os": ["-O"], # OS detection — REQUIERE sudo
}
def _sanitize_target(target: str) -> str:
"""Convierte un target en un fragmento seguro para nombre de archivo."""
return re.sub(r"[^A-Za-z0-9._-]", "_", target.strip())
def _target_is_private(target: str):
"""True si el target es claramente privado/local (no requiere confirm),
False si es claramente publico, None si no se puede decidir (hostname publico)."""
t = (target or "").strip()
try:
net = ipaddress.ip_network(t, strict=False) # acepta IP o CIDR
return net.is_private or net.is_loopback or net.is_link_local
except ValueError:
pass
low = t.lower()
if low == "localhost" or low.endswith(_LOCAL_SUFFIXES):
return True
return None # hostname publico/desconocido
def _parse_xml(xml_path: str) -> tuple[list, list, str]:
"""Parsea el XML de nmap.
Returns:
(open_ports, hosts_up, host_status) donde open_ports es una lista de
dicts con detalle de cada puerto open/open|filtered, hosts_up una lista
de direcciones de hosts vivos, y host_status el estado del primer host.
"""
open_ports: list = []
hosts_up: list = []
host_status = ""
tree = ET.parse(xml_path)
root = tree.getroot()
for host in root.findall("host"):
status_el = host.find("status")
state = status_el.get("state", "") if status_el is not None else ""
# Direccion del host (prioriza IPv4, cae a la primera address disponible).
addr = ""
for addr_el in host.findall("address"):
if addr_el.get("addrtype") == "ipv4":
addr = addr_el.get("addr", "")
break
if not addr:
first_addr = host.find("address")
if first_addr is not None:
addr = first_addr.get("addr", "")
if state == "up":
if addr:
hosts_up.append(addr)
if not host_status:
host_status = state
ports_el = host.find("ports")
if ports_el is None:
continue
for port_el in ports_el.findall("port"):
state_el = port_el.find("state")
port_state = state_el.get("state", "") if state_el is not None else ""
if port_state not in ("open", "open|filtered"):
continue
service_el = port_el.find("service")
open_ports.append({
"port": int(port_el.get("portid", "0")),
"proto": port_el.get("protocol", ""),
"state": port_state,
"service": service_el.get("name", "") if service_el is not None else "",
"product": service_el.get("product", "") if service_el is not None else "",
"version": service_el.get("version", "") if service_el is not None else "",
})
return open_ports, hosts_up, host_status
def nmap_scan(
target: str,
profile: str = "quick",
ports: str | None = None,
extra_args: list[str] | None = None,
out_dir: str | None = None,
timeout_s: int = 1800,
confirm: bool = False,
allowlist: list[str] | None = None,
) -> dict:
"""Ejecuta `nmap` contra un target segun un perfil y devuelve un dict.
Construye el comando con los flags del perfil, fuerza salida XML con `-oX`,
ejecuta nmap como subprocess y parsea el XML para extraer puertos abiertos
y hosts vivos.
Args:
target: Host, IP o rango CIDR a escanear (ej. "scanme.nmap.org",
"192.168.1.10", "192.168.1.0/24" para discovery).
profile: Clave de PROFILES. quick (-T4 -F), top1000 (-T4), full-tcp
(-p- -T4), service (-sV -sC -T4), udp-top (-sU --top-ports 100 -T4),
vuln (-sV --script vuln -T4), discovery (-sn), aggressive (-A -T4),
os (-O). Si no esta en PROFILES devuelve status error.
ports: Especificacion de puertos para -p (ej. "22,80,443" o "1-1000").
Si se pasa, anade "-p <ports>" al comando.
extra_args: Lista de flags adicionales de nmap a anadir tal cual.
out_dir: Directorio donde guardar el XML. Si se pasa, se crea y el XML
se guarda como nmap-<profile>-<target>-<timestamp>.xml. Si no, se
usa un archivo temporal.
timeout_s: Segundos maximos de ejecucion. Default 1800 (30 min). Para
scans largos (full-tcp, vuln, udp-top) subir este valor.
confirm: Confirmacion explicita para escanear un target publico o
desconocido. Por defecto False: si el target no es claramente
privado/local y no esta en allowlist, el escaneo se rechaza con
status error y needs_confirm=True (proteccion anti-escaneo no
autorizado). Pasar True solo cuando el escaneo este autorizado.
allowlist: Lista de targets autorizados. Un target pasa el guard sin
confirm si coincide exactamente con una entrada o termina en ella
(ej. allowlist=["scanme.nmap.org"] o ["example.com"]). None o lista
vacia no autoriza nada.
Returns:
Dict con status "ok" o "error". Nunca lanza.
ok: {"status":"ok","target","profile","command","open_ports":[...],
"hosts_up":[...],"xml_path","raw","elapsed_s","started"}
error: {"status":"error","error":str}
"""
started_dt = datetime.now(timezone.utc)
started_iso = started_dt.isoformat()
start_perf = time.monotonic()
if not target or not target.strip():
return {"status": "error", "error": "nmap_scan: target vacio"}
if profile not in PROFILES:
valid = ", ".join(sorted(PROFILES.keys()))
return {
"status": "error",
"error": f"nmap_scan: perfil '{profile}' invalido. Validos: {valid}",
}
# Guard de seguridad: el escaneo activo contra targets publicos/desconocidos
# requiere confirmacion explicita o allowlist (anti-escaneo no autorizado).
if not confirm:
t = target.strip()
allowed = bool(allowlist) and any(t == a or t.endswith(a) for a in allowlist)
if _target_is_private(t) is not True and not allowed:
return {
"status": "error",
"error": (
f"nmap_scan: target '{target}' no es privado/local; el escaneo activo "
"requiere confirm=True o que el target este en allowlist "
"(solo objetivos propios o con autorizacion explicita)"
),
"needs_confirm": True,
}
# Resolver path del XML de salida.
xml_path = ""
try:
if out_dir:
os.makedirs(out_dir, exist_ok=True)
ts = started_dt.strftime("%Y%m%d-%H%M%S")
fname = f"nmap-{profile}-{_sanitize_target(target)}-{ts}.xml"
xml_path = os.path.join(out_dir, fname)
else:
fd, xml_path = tempfile.mkstemp(prefix="nmap-", suffix=".xml")
os.close(fd)
except OSError as e:
return {"status": "error", "error": f"nmap_scan: no se pudo preparar XML: {e}"}
# Construir comando: nmap <profile_flags> [-p ports] [extra_args] -oX <xml> <target>
cmd = ["nmap"]
cmd.extend(PROFILES[profile])
if ports:
cmd.extend(["-p", ports])
if extra_args:
cmd.extend(extra_args)
cmd.extend(["-oX", xml_path, target])
command_str = " ".join(cmd)
try:
proc = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=timeout_s,
)
except FileNotFoundError:
return {"status": "error", "error": "nmap_scan: binario `nmap` no encontrado en PATH"}
except subprocess.TimeoutExpired:
return {
"status": "error",
"error": (
f"nmap_scan: nmap excedio timeout_s={timeout_s}; usa un perfil mas "
"ligero o sube timeout_s para scans largos (full-tcp/vuln/udp-top)"
),
}
except OSError as e:
return {"status": "error", "error": f"nmap_scan: error ejecutando nmap: {e}"}
if proc.returncode != 0:
stderr = (proc.stderr or "").strip()
return {
"status": "error",
"error": f"nmap_scan: nmap salio con codigo {proc.returncode}: {stderr}",
}
# Parsear el XML generado.
try:
open_ports, hosts_up, host_status = _parse_xml(xml_path)
except (ET.ParseError, FileNotFoundError, OSError) as e:
return {
"status": "error",
"error": f"nmap_scan: nmap ejecuto pero no se pudo parsear el XML: {e}",
}
elapsed_s = round(time.monotonic() - start_perf, 3)
return {
"status": "ok",
"target": target,
"profile": profile,
"command": command_str,
"open_ports": open_ports,
"hosts_up": hosts_up,
"host_status": host_status,
"xml_path": xml_path,
"raw": proc.stdout,
"elapsed_s": elapsed_s,
"started": started_iso,
}
if __name__ == "__main__":
# Smoke: escaneo rapido contra el host oficial de pruebas de nmap.
# Tolera fallo de red sin romper (exit 0 siempre).
try:
# scanme.nmap.org es el host oficial de pruebas de nmap: legal escanear.
# Pasa por el guard via allowlist.
result = nmap_scan(
"scanme.nmap.org",
profile="quick",
timeout_s=120,
allowlist=["scanme.nmap.org"],
)
if result["status"] == "ok":
print(f"[ok] {result['target']} ({result['profile']}) en {result['elapsed_s']}s")
print(f"command: {result['command']}")
print(f"open_ports ({len(result['open_ports'])}):")
for p in result["open_ports"]:
print(f" {p['port']}/{p['proto']} {p['state']} {p['service']} "
f"{p['product']} {p['version']}".rstrip())
print(f"xml_path: {result['xml_path']}")
else:
print(f"[error tolerado] {result['error']}")
except Exception as e: # noqa: BLE001 - smoke nunca debe romper
print(f"[excepcion tolerada en smoke] {e}")