"""Wrapper de `nmap` para escaneo de red por perfiles, con salida XML parseada. Funcion IMPURA: ejecuta el binario `nmap` como subprocess. Es la funcion estrella de reconocimiento del registry, pensada tanto para escaneos rapidos en primer plano como para escaneos largos en segundo plano (full TCP, vuln, UDP). Siempre pide salida XML (`-oX`) y la parsea con `xml.etree.ElementTree` para devolver puertos abiertos y hosts vivos de forma estructurada. NO lanza excepciones: devuelve un dict con `status` "ok" o "error". Solo escanear hosts autorizados/propios. """ import ipaddress import os import re import subprocess import tempfile import time import xml.etree.ElementTree as ET from datetime import datetime, timezone _LOCAL_SUFFIXES = (".local", ".lan", ".internal", ".home", ".corp") # Perfiles de escaneo: cada uno mapea a una lista de flags de nmap. # Sin sudo por defecto: nmap cae a connect-scan TCP (-sT implicito) sin root. # Los perfiles que requieren privilegios (os, udp-top, parte de aggressive) # se documentan en el .md (Gotchas). PROFILES = { "quick": ["-T4", "-F"], # top 100 puertos, rapido "top1000": ["-T4"], # default nmap (1000 puertos) "full-tcp": ["-p-", "-T4"], # los 65535 TCP — LARGO "service": ["-sV", "-sC", "-T4"], # version + scripts default "udp-top": ["-sU", "--top-ports", "100", "-T4"], # UDP top 100 — LARGO/sudo "vuln": ["-sV", "--script", "vuln", "-T4"], # scripts de vulnerabilidades "discovery": ["-sn"], # ping sweep / host discovery "aggressive": ["-A", "-T4"], # OS+version+script+traceroute "os": ["-O"], # OS detection — REQUIERE sudo } def _sanitize_target(target: str) -> str: """Convierte un target en un fragmento seguro para nombre de archivo.""" return re.sub(r"[^A-Za-z0-9._-]", "_", target.strip()) def _target_is_private(target: str): """True si el target es claramente privado/local (no requiere confirm), False si es claramente publico, None si no se puede decidir (hostname publico).""" t = (target or "").strip() try: net = ipaddress.ip_network(t, strict=False) # acepta IP o CIDR return net.is_private or net.is_loopback or net.is_link_local except ValueError: pass low = t.lower() if low == "localhost" or low.endswith(_LOCAL_SUFFIXES): return True return None # hostname publico/desconocido def _parse_xml(xml_path: str) -> tuple[list, list, str]: """Parsea el XML de nmap. Returns: (open_ports, hosts_up, host_status) donde open_ports es una lista de dicts con detalle de cada puerto open/open|filtered, hosts_up una lista de direcciones de hosts vivos, y host_status el estado del primer host. """ open_ports: list = [] hosts_up: list = [] host_status = "" tree = ET.parse(xml_path) root = tree.getroot() for host in root.findall("host"): status_el = host.find("status") state = status_el.get("state", "") if status_el is not None else "" # Direccion del host (prioriza IPv4, cae a la primera address disponible). addr = "" for addr_el in host.findall("address"): if addr_el.get("addrtype") == "ipv4": addr = addr_el.get("addr", "") break if not addr: first_addr = host.find("address") if first_addr is not None: addr = first_addr.get("addr", "") if state == "up": if addr: hosts_up.append(addr) if not host_status: host_status = state ports_el = host.find("ports") if ports_el is None: continue for port_el in ports_el.findall("port"): state_el = port_el.find("state") port_state = state_el.get("state", "") if state_el is not None else "" if port_state not in ("open", "open|filtered"): continue service_el = port_el.find("service") open_ports.append({ "port": int(port_el.get("portid", "0")), "proto": port_el.get("protocol", ""), "state": port_state, "service": service_el.get("name", "") if service_el is not None else "", "product": service_el.get("product", "") if service_el is not None else "", "version": service_el.get("version", "") if service_el is not None else "", }) return open_ports, hosts_up, host_status def nmap_scan( target: str, profile: str = "quick", ports: str | None = None, extra_args: list[str] | None = None, out_dir: str | None = None, timeout_s: int = 1800, confirm: bool = False, allowlist: list[str] | None = None, ) -> dict: """Ejecuta `nmap` contra un target segun un perfil y devuelve un dict. Construye el comando con los flags del perfil, fuerza salida XML con `-oX`, ejecuta nmap como subprocess y parsea el XML para extraer puertos abiertos y hosts vivos. Args: target: Host, IP o rango CIDR a escanear (ej. "scanme.nmap.org", "192.168.1.10", "192.168.1.0/24" para discovery). profile: Clave de PROFILES. quick (-T4 -F), top1000 (-T4), full-tcp (-p- -T4), service (-sV -sC -T4), udp-top (-sU --top-ports 100 -T4), vuln (-sV --script vuln -T4), discovery (-sn), aggressive (-A -T4), os (-O). Si no esta en PROFILES devuelve status error. ports: Especificacion de puertos para -p (ej. "22,80,443" o "1-1000"). Si se pasa, anade "-p " al comando. extra_args: Lista de flags adicionales de nmap a anadir tal cual. out_dir: Directorio donde guardar el XML. Si se pasa, se crea y el XML se guarda como nmap---.xml. Si no, se usa un archivo temporal. timeout_s: Segundos maximos de ejecucion. Default 1800 (30 min). Para scans largos (full-tcp, vuln, udp-top) subir este valor. confirm: Confirmacion explicita para escanear un target publico o desconocido. Por defecto False: si el target no es claramente privado/local y no esta en allowlist, el escaneo se rechaza con status error y needs_confirm=True (proteccion anti-escaneo no autorizado). Pasar True solo cuando el escaneo este autorizado. allowlist: Lista de targets autorizados. Un target pasa el guard sin confirm si coincide exactamente con una entrada o termina en ella (ej. allowlist=["scanme.nmap.org"] o ["example.com"]). None o lista vacia no autoriza nada. Returns: Dict con status "ok" o "error". Nunca lanza. ok: {"status":"ok","target","profile","command","open_ports":[...], "hosts_up":[...],"xml_path","raw","elapsed_s","started"} error: {"status":"error","error":str} """ started_dt = datetime.now(timezone.utc) started_iso = started_dt.isoformat() start_perf = time.monotonic() if not target or not target.strip(): return {"status": "error", "error": "nmap_scan: target vacio"} if profile not in PROFILES: valid = ", ".join(sorted(PROFILES.keys())) return { "status": "error", "error": f"nmap_scan: perfil '{profile}' invalido. Validos: {valid}", } # Guard de seguridad: el escaneo activo contra targets publicos/desconocidos # requiere confirmacion explicita o allowlist (anti-escaneo no autorizado). if not confirm: t = target.strip() allowed = bool(allowlist) and any(t == a or t.endswith(a) for a in allowlist) if _target_is_private(t) is not True and not allowed: return { "status": "error", "error": ( f"nmap_scan: target '{target}' no es privado/local; el escaneo activo " "requiere confirm=True o que el target este en allowlist " "(solo objetivos propios o con autorizacion explicita)" ), "needs_confirm": True, } # Resolver path del XML de salida. xml_path = "" try: if out_dir: os.makedirs(out_dir, exist_ok=True) ts = started_dt.strftime("%Y%m%d-%H%M%S") fname = f"nmap-{profile}-{_sanitize_target(target)}-{ts}.xml" xml_path = os.path.join(out_dir, fname) else: fd, xml_path = tempfile.mkstemp(prefix="nmap-", suffix=".xml") os.close(fd) except OSError as e: return {"status": "error", "error": f"nmap_scan: no se pudo preparar XML: {e}"} # Construir comando: nmap [-p ports] [extra_args] -oX cmd = ["nmap"] cmd.extend(PROFILES[profile]) if ports: cmd.extend(["-p", ports]) if extra_args: cmd.extend(extra_args) cmd.extend(["-oX", xml_path, target]) command_str = " ".join(cmd) try: proc = subprocess.run( cmd, capture_output=True, text=True, timeout=timeout_s, ) except FileNotFoundError: return {"status": "error", "error": "nmap_scan: binario `nmap` no encontrado en PATH"} except subprocess.TimeoutExpired: return { "status": "error", "error": ( f"nmap_scan: nmap excedio timeout_s={timeout_s}; usa un perfil mas " "ligero o sube timeout_s para scans largos (full-tcp/vuln/udp-top)" ), } except OSError as e: return {"status": "error", "error": f"nmap_scan: error ejecutando nmap: {e}"} if proc.returncode != 0: stderr = (proc.stderr or "").strip() return { "status": "error", "error": f"nmap_scan: nmap salio con codigo {proc.returncode}: {stderr}", } # Parsear el XML generado. try: open_ports, hosts_up, host_status = _parse_xml(xml_path) except (ET.ParseError, FileNotFoundError, OSError) as e: return { "status": "error", "error": f"nmap_scan: nmap ejecuto pero no se pudo parsear el XML: {e}", } elapsed_s = round(time.monotonic() - start_perf, 3) return { "status": "ok", "target": target, "profile": profile, "command": command_str, "open_ports": open_ports, "hosts_up": hosts_up, "host_status": host_status, "xml_path": xml_path, "raw": proc.stdout, "elapsed_s": elapsed_s, "started": started_iso, } if __name__ == "__main__": # Smoke: escaneo rapido contra el host oficial de pruebas de nmap. # Tolera fallo de red sin romper (exit 0 siempre). try: # scanme.nmap.org es el host oficial de pruebas de nmap: legal escanear. # Pasa por el guard via allowlist. result = nmap_scan( "scanme.nmap.org", profile="quick", timeout_s=120, allowlist=["scanme.nmap.org"], ) if result["status"] == "ok": print(f"[ok] {result['target']} ({result['profile']}) en {result['elapsed_s']}s") print(f"command: {result['command']}") print(f"open_ports ({len(result['open_ports'])}):") for p in result["open_ports"]: print(f" {p['port']}/{p['proto']} {p['state']} {p['service']} " f"{p['product']} {p['version']}".rstrip()) print(f"xml_path: {result['xml_path']}") else: print(f"[error tolerado] {result['error']}") except Exception as e: # noqa: BLE001 - smoke nunca debe romper print(f"[excepcion tolerada en smoke] {e}")