"""Captura e identificacion heuristica del banner de un servicio TCP. Funcion IMPURA: abre un socket TCP a host:port, opcionalmente envia un probe (por ejemplo `HEAD / HTTP/1.0` para puertos HTTP), lee el banner inicial que emite el servicio y lo identifica heuristicamente (ssh, ftp, smtp, http, mysql, redis, telnet, pop3, imap, ...). Solo usa la stdlib (`socket`, `re`, `struct`). Complementa a un escaneo de puertos: mientras un port scan solo dice si el puerto esta abierto, esta funcion dice QUE servicio (y a menudo que producto y version) habla detras del puerto, sin depender de `nmap -sV`. NO lanza excepciones: devuelve SIEMPRE un dict con `status` "ok" o "error" y un campo `raw` con el banner crudo en forma segura (repr). Solo conectar a hosts propios o con autorizacion explicita. """ import re import socket import struct # Probes activos por puerto bien conocido. Si el puerto esta aqui y # send_probe=True, se envia el probe tras conectar para provocar respuesta de # servicios que no emiten banner pasivo (HTTP es el caso tipico). El resto de # servicios (SSH/FTP/SMTP/POP3/IMAP) suelen emitir banner solo con conectar, asi # que para ellos no se envia nada. _HTTP_PORTS = (80, 8080, 8000, 8888, 8081, 8008) _HTTP_PROBE = b"HEAD / HTTP/1.0\r\n\r\n" # Mapa de probes por puerto. Permite anadir probes especificos por puerto. _PROBES: dict[int, bytes] = {p: _HTTP_PROBE for p in _HTTP_PORTS} def _decode_best_effort(data: bytes) -> str: """Decodifica bytes a str probando utf-8 y cayendo a latin-1 (nunca falla).""" if not data: return "" try: return data.decode("utf-8") except UnicodeDecodeError: # latin-1 mapea todos los bytes 0-255: nunca lanza, puede dar mojibake. return data.decode("latin-1", errors="replace") def _parse_http(text: str) -> tuple[str, str]: """Extrae (product, version) de una respuesta HTTP best-effort. Lee la cabecera `Server:` si esta presente (ej. "Server: nginx/1.18.0"). """ m = re.search(r"^Server:\s*(.+)$", text, re.IGNORECASE | re.MULTILINE) if not m: return "", "" server = m.group(1).strip() # "nginx/1.18.0 (Ubuntu)" -> product "nginx", version "1.18.0". vm = re.match(r"([^/\s]+)/([^\s;]+)", server) if vm: return vm.group(1), vm.group(2) return server, "" def _parse_ssh(text: str) -> tuple[str, str]: """Extrae (product, version) de un banner SSH (ej. SSH-2.0-OpenSSH_8.9p1).""" m = re.search(r"SSH-[\d.]+-([A-Za-z0-9_.+-]+)", text) if not m: return "", "" impl = m.group(1) # "OpenSSH_8.9p1" -> product "OpenSSH", version "8.9p1". vm = re.match(r"([A-Za-z]+)[_/-]([\d][\w.+-]*)", impl) if vm: return vm.group(1), vm.group(2) return impl, "" def _parse_ftp(text: str) -> tuple[str, str]: """Extrae (product, version) de un banner FTP (ej. 220 vsFTPd 3.0.3).""" for product, rx in ( ("vsFTPd", r"vsFTPd\s+([\d][\w.]*)"), ("ProFTPD", r"ProFTPD\s+([\d][\w.]*)"), ("Pure-FTPd", r"Pure-FTPd"), ("FileZilla", r"FileZilla\s+Server\s*(?:version\s*)?([\d][\w.]*)?"), ): m = re.search(rx, text, re.IGNORECASE) if m: try: ver = m.group(1) or "" except IndexError: ver = "" return product, ver or "" return "", "" def _parse_smtp(text: str) -> tuple[str, str]: """Extrae (product, version) de un banner SMTP (ej. 220 mail ESMTP Postfix).""" for product, rx in ( ("Postfix", r"Postfix"), ("Exim", r"Exim\s+([\d][\w.]*)"), ("Sendmail", r"Sendmail\s+([\d][\w.+/]*)"), ("Microsoft ESMTP", r"Microsoft\s+ESMTP"), ): m = re.search(rx, text, re.IGNORECASE) if m: try: ver = m.group(1) or "" except IndexError: ver = "" return product, ver or "" return "", "" def _parse_mysql(data: bytes) -> tuple[str, str]: """Extrae la version del server MySQL/MariaDB del handshake binario. El primer paquete del protocolo MySQL es: [3 bytes length][1 byte seq][1 byte protocol version][server version NUL-terminated]... """ if len(data) < 6: return "", "" try: # struct: longitud (3 bytes little-endian) + seq (1 byte). proto_ver = data[4] if proto_ver != 10: # protocolo handshake v10 (el comun) return "", "" # La version del server empieza en el byte 5 y termina en NUL. end = data.index(b"\x00", 5) version = data[5:end].decode("latin-1", errors="replace") product = "MariaDB" if "mariadb" in version.lower() else "MySQL" # Limpia version a algo tipo "8.0.32" / "10.6.12-MariaDB". vm = re.match(r"([\d][\w.+-]*)", version) return product, (vm.group(1) if vm else version) except (ValueError, IndexError, struct.error): return "", "" def _identify(text: str, raw_bytes: bytes) -> tuple[str, str, str]: """Identifica (service, product, version) a partir del banner. Heuristica por substring/regex sobre el texto decodificado y, para MySQL, sobre los bytes crudos del handshake binario. """ # SSH: banner empieza por "SSH-". if text.startswith("SSH-") or "SSH-2.0" in text or "SSH-1." in text: product, version = _parse_ssh(text) return "ssh", product or "SSH", version # HTTP: linea de estado "HTTP/x.y NNN". if re.match(r"HTTP/\d", text) or "\nHTTP/" in text: product, version = _parse_http(text) return "http", product, version # MySQL/MariaDB: handshake binario (protocolo 10). Detectar por bytes. if len(raw_bytes) >= 6 and raw_bytes[4] == 10: product, version = _parse_mysql(raw_bytes) if product: return "mysql", product, version # Redis: responde a comandos con "-ERR"/"+OK"/"+PONG"; INFO empieza "# Server". if text.startswith(("-ERR", "+PONG", "+OK", "# Server")) or "redis_version" in text: vm = re.search(r"redis_version:([\d][\w.]*)", text) return "redis", "Redis", (vm.group(1) if vm else "") # FTP: respuesta de bienvenida "220 ..." con marcas FTP conocidas. if text.startswith("220") and re.search(r"ftp|vsftpd|proftpd|pure-ftpd|filezilla", text, re.IGNORECASE): product, version = _parse_ftp(text) return "ftp", product, version # SMTP: "220 ..." con "SMTP"/"ESMTP". if text.startswith("220") and re.search(r"e?smtp", text, re.IGNORECASE): product, version = _parse_smtp(text) return "smtp", product, version # POP3: respuesta de bienvenida "+OK ...". if text.startswith("+OK"): return "pop3", "", "" # IMAP: respuesta de bienvenida "* OK ...". if text.startswith("* OK") or "IMAP" in text.upper()[:40]: return "imap", "", "" # Generico "220 " sin marca clara -> probablemente FTP/SMTP sin identificar. if text.startswith("220"): return "ftp-or-smtp", "", "" # Telnet: a menudo negocia con bytes IAC (0xFF) al conectar. if raw_bytes.startswith(b"\xff"): return "telnet", "", "" return "unknown", "", "" def grab_service_banner( host: str, port: int, timeout_s: float = 3.0, send_probe: bool = True, ) -> dict: """Conecta por TCP a host:port, lee el banner del servicio y lo identifica. Abre un socket TCP, opcionalmente envia un probe (HTTP para puertos web), lee hasta ~4096 bytes con timeout, decodifica best-effort e identifica el servicio por heuristica (ssh, ftp, smtp, http, mysql, redis, pop3, imap, telnet, ...). Extrae producto y version cuando es posible. Args: host: Hostname o IP del objetivo (ej. "scanme.nmap.org", "127.0.0.1"). Vacio devuelve status error. port: Puerto TCP (ej. 22, 80, 3306). Fuera de 1..65535 devuelve error. timeout_s: Timeout de conexion y de lectura en segundos. Default 3.0. send_probe: Si True y el puerto esta en el mapa interno de probes (los puertos HTTP tipicos: 80/8080/8000/8888/...), envia el probe HTTP `HEAD / HTTP/1.0` para provocar respuesta. Para el resto de puertos no envia nada e intenta leer el banner pasivo (SSH/FTP/SMTP/POP3/IMAP emiten banner al conectar). Si False, nunca envia probe. Returns: Dict de estado. Nunca lanza. ok: {"status":"ok", "host", "port":int, "service":str, "product":str, "version":str, "banner":str (banner limpio), "raw":str (repr seguro del banner crudo)} error: {"status":"error", "error":str, "host", "port":int} """ if not host or not host.strip(): return {"status": "error", "error": "grab_service_banner: host vacio", "host": host, "port": port} try: port = int(port) except (TypeError, ValueError): return { "status": "error", "error": f"grab_service_banner: port invalido: {port!r}", "host": host, "port": port, } if not (1 <= port <= 65535): return { "status": "error", "error": f"grab_service_banner: port fuera de rango 1..65535: {port}", "host": host, "port": port, } host = host.strip() sock = None try: sock = socket.create_connection((host, port), timeout=timeout_s) sock.settimeout(timeout_s) # Probe activo solo si procede (puerto HTTP) y send_probe=True. if send_probe and port in _PROBES: try: sock.sendall(_PROBES[port]) except OSError: pass # algunos servicios cierran ante un probe inesperado chunks: list[bytes] = [] total = 0 try: while total < 4096: data = sock.recv(4096 - total) if not data: break chunks.append(data) total += len(data) # La mayoria de banners caben en un recv; si llega un salto de # linea de fin de banner, paramos para no bloquear en el timeout. if b"\n" in data and port not in _PROBES: break except socket.timeout: pass # timeout de lectura: usamos lo recibido hasta ahora raw_bytes = b"".join(chunks) except socket.timeout: return { "status": "error", "error": f"grab_service_banner: timeout conectando a {host}:{port} ({timeout_s}s)", "host": host, "port": port, } except ConnectionRefusedError: return { "status": "error", "error": f"grab_service_banner: connection refused {host}:{port}", "host": host, "port": port, } except socket.gaierror as e: return { "status": "error", "error": f"grab_service_banner: no se pudo resolver host '{host}': {e}", "host": host, "port": port, } except OSError as e: return { "status": "error", "error": f"grab_service_banner: error de socket {host}:{port}: {e}", "host": host, "port": port, } finally: if sock is not None: try: sock.close() except OSError: pass text = _decode_best_effort(raw_bytes) service, product, version = _identify(text, raw_bytes) banner = text.strip() return { "status": "ok", "host": host, "port": port, "service": service, "product": product, "version": version, "banner": banner, "raw": repr(raw_bytes), } if __name__ == "__main__": # Smoke: intenta capturar el banner SSH del host oficial de pruebas de nmap. # Tolera cualquier fallo de red sin romper (exit 0 siempre). try: result = grab_service_banner("scanme.nmap.org", 22, timeout_s=5) print(result["status"]) if result["status"] == "ok": print(f"service={result['service']} product={result['product']} version={result['version']}") print(f"banner: {result['banner']}") else: print("error tolerado:", result.get("error")) except Exception as exc: # noqa: BLE001 - smoke nunca debe romper print("smoke fallo (tolerado):", exc)