Files
fn_registry/python/functions/cybersecurity/grab_service_banner.py
T
egutierrez 935008ec3f feat(recon): grupo de reconocimiento de red + servicios + fingerprint web
Añade el capability group `recon` (dominio cybersecurity + pipelines, Python),
con la política de archivado OSINT y página madre docs/capabilities/recon.md.

Lookups y sondeo (wrappers de CLI):
- whois_lookup, rdap_lookup, dns_records, ping_host, traceroute_host, nmap_scan
- save_scan_to_osint (sink común) + recon_osint (pipeline one-shot scan+archivado)

Escaneo de puertos/servicios nativo (stdlib, sin nmap ni sudo):
- scan_tcp_ports: connect-scan TCP concurrente (open/closed/filtered)
- grab_service_banner: banner grab + identificación de servicio/versión real
- identify_port_service: puro, puerto -> servicio IANA esperado (~120 puertos)
- scan_port_services: pipeline one-shot (scan -> identify + banner por puerto abierto)

Fingerprint de tecnología web (estilo Wappalyzer), patrón pura/impura:
- fetch_http_fingerprint: GET stdlib, recoge headers/html/cookies (solo nombres)
- detect_web_tech: puro, matchea ~50 firmas regex -> tecnologías por categoría
- fingerprint_web_stack: pipeline one-shot url -> tecnologías

Todas devuelven dict {status} sin lanzar. Tests: 43 verdes, sin red externa.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-14 15:12:07 +02:00

335 lines
12 KiB
Python

"""Captura e identificacion heuristica del banner de un servicio TCP.
Funcion IMPURA: abre un socket TCP a host:port, opcionalmente envia un probe
(por ejemplo `HEAD / HTTP/1.0` para puertos HTTP), lee el banner inicial que
emite el servicio y lo identifica heuristicamente (ssh, ftp, smtp, http, mysql,
redis, telnet, pop3, imap, ...). Solo usa la stdlib (`socket`, `re`, `struct`).
Complementa a un escaneo de puertos: mientras un port scan solo dice si el
puerto esta abierto, esta funcion dice QUE servicio (y a menudo que producto y
version) habla detras del puerto, sin depender de `nmap -sV`.
NO lanza excepciones: devuelve SIEMPRE un dict con `status` "ok" o "error" y un
campo `raw` con el banner crudo en forma segura (repr). Solo conectar a hosts
propios o con autorizacion explicita.
"""
import re
import socket
import struct
# Probes activos por puerto bien conocido. Si el puerto esta aqui y
# send_probe=True, se envia el probe tras conectar para provocar respuesta de
# servicios que no emiten banner pasivo (HTTP es el caso tipico). El resto de
# servicios (SSH/FTP/SMTP/POP3/IMAP) suelen emitir banner solo con conectar, asi
# que para ellos no se envia nada.
_HTTP_PORTS = (80, 8080, 8000, 8888, 8081, 8008)
_HTTP_PROBE = b"HEAD / HTTP/1.0\r\n\r\n"
# Mapa de probes por puerto. Permite anadir probes especificos por puerto.
_PROBES: dict[int, bytes] = {p: _HTTP_PROBE for p in _HTTP_PORTS}
def _decode_best_effort(data: bytes) -> str:
"""Decodifica bytes a str probando utf-8 y cayendo a latin-1 (nunca falla)."""
if not data:
return ""
try:
return data.decode("utf-8")
except UnicodeDecodeError:
# latin-1 mapea todos los bytes 0-255: nunca lanza, puede dar mojibake.
return data.decode("latin-1", errors="replace")
def _parse_http(text: str) -> tuple[str, str]:
"""Extrae (product, version) de una respuesta HTTP best-effort.
Lee la cabecera `Server:` si esta presente (ej. "Server: nginx/1.18.0").
"""
m = re.search(r"^Server:\s*(.+)$", text, re.IGNORECASE | re.MULTILINE)
if not m:
return "", ""
server = m.group(1).strip()
# "nginx/1.18.0 (Ubuntu)" -> product "nginx", version "1.18.0".
vm = re.match(r"([^/\s]+)/([^\s;]+)", server)
if vm:
return vm.group(1), vm.group(2)
return server, ""
def _parse_ssh(text: str) -> tuple[str, str]:
"""Extrae (product, version) de un banner SSH (ej. SSH-2.0-OpenSSH_8.9p1)."""
m = re.search(r"SSH-[\d.]+-([A-Za-z0-9_.+-]+)", text)
if not m:
return "", ""
impl = m.group(1)
# "OpenSSH_8.9p1" -> product "OpenSSH", version "8.9p1".
vm = re.match(r"([A-Za-z]+)[_/-]([\d][\w.+-]*)", impl)
if vm:
return vm.group(1), vm.group(2)
return impl, ""
def _parse_ftp(text: str) -> tuple[str, str]:
"""Extrae (product, version) de un banner FTP (ej. 220 vsFTPd 3.0.3)."""
for product, rx in (
("vsFTPd", r"vsFTPd\s+([\d][\w.]*)"),
("ProFTPD", r"ProFTPD\s+([\d][\w.]*)"),
("Pure-FTPd", r"Pure-FTPd"),
("FileZilla", r"FileZilla\s+Server\s*(?:version\s*)?([\d][\w.]*)?"),
):
m = re.search(rx, text, re.IGNORECASE)
if m:
try:
ver = m.group(1) or ""
except IndexError:
ver = ""
return product, ver or ""
return "", ""
def _parse_smtp(text: str) -> tuple[str, str]:
"""Extrae (product, version) de un banner SMTP (ej. 220 mail ESMTP Postfix)."""
for product, rx in (
("Postfix", r"Postfix"),
("Exim", r"Exim\s+([\d][\w.]*)"),
("Sendmail", r"Sendmail\s+([\d][\w.+/]*)"),
("Microsoft ESMTP", r"Microsoft\s+ESMTP"),
):
m = re.search(rx, text, re.IGNORECASE)
if m:
try:
ver = m.group(1) or ""
except IndexError:
ver = ""
return product, ver or ""
return "", ""
def _parse_mysql(data: bytes) -> tuple[str, str]:
"""Extrae la version del server MySQL/MariaDB del handshake binario.
El primer paquete del protocolo MySQL es:
[3 bytes length][1 byte seq][1 byte protocol version][server version NUL-terminated]...
"""
if len(data) < 6:
return "", ""
try:
# struct: longitud (3 bytes little-endian) + seq (1 byte).
proto_ver = data[4]
if proto_ver != 10: # protocolo handshake v10 (el comun)
return "", ""
# La version del server empieza en el byte 5 y termina en NUL.
end = data.index(b"\x00", 5)
version = data[5:end].decode("latin-1", errors="replace")
product = "MariaDB" if "mariadb" in version.lower() else "MySQL"
# Limpia version a algo tipo "8.0.32" / "10.6.12-MariaDB".
vm = re.match(r"([\d][\w.+-]*)", version)
return product, (vm.group(1) if vm else version)
except (ValueError, IndexError, struct.error):
return "", ""
def _identify(text: str, raw_bytes: bytes) -> tuple[str, str, str]:
"""Identifica (service, product, version) a partir del banner.
Heuristica por substring/regex sobre el texto decodificado y, para MySQL,
sobre los bytes crudos del handshake binario.
"""
# SSH: banner empieza por "SSH-".
if text.startswith("SSH-") or "SSH-2.0" in text or "SSH-1." in text:
product, version = _parse_ssh(text)
return "ssh", product or "SSH", version
# HTTP: linea de estado "HTTP/x.y NNN".
if re.match(r"HTTP/\d", text) or "\nHTTP/" in text:
product, version = _parse_http(text)
return "http", product, version
# MySQL/MariaDB: handshake binario (protocolo 10). Detectar por bytes.
if len(raw_bytes) >= 6 and raw_bytes[4] == 10:
product, version = _parse_mysql(raw_bytes)
if product:
return "mysql", product, version
# Redis: responde a comandos con "-ERR"/"+OK"/"+PONG"; INFO empieza "# Server".
if text.startswith(("-ERR", "+PONG", "+OK", "# Server")) or "redis_version" in text:
vm = re.search(r"redis_version:([\d][\w.]*)", text)
return "redis", "Redis", (vm.group(1) if vm else "")
# FTP: respuesta de bienvenida "220 ..." con marcas FTP conocidas.
if text.startswith("220") and re.search(r"ftp|vsftpd|proftpd|pure-ftpd|filezilla", text, re.IGNORECASE):
product, version = _parse_ftp(text)
return "ftp", product, version
# SMTP: "220 ..." con "SMTP"/"ESMTP".
if text.startswith("220") and re.search(r"e?smtp", text, re.IGNORECASE):
product, version = _parse_smtp(text)
return "smtp", product, version
# POP3: respuesta de bienvenida "+OK ...".
if text.startswith("+OK"):
return "pop3", "", ""
# IMAP: respuesta de bienvenida "* OK ...".
if text.startswith("* OK") or "IMAP" in text.upper()[:40]:
return "imap", "", ""
# Generico "220 " sin marca clara -> probablemente FTP/SMTP sin identificar.
if text.startswith("220"):
return "ftp-or-smtp", "", ""
# Telnet: a menudo negocia con bytes IAC (0xFF) al conectar.
if raw_bytes.startswith(b"\xff"):
return "telnet", "", ""
return "unknown", "", ""
def grab_service_banner(
host: str,
port: int,
timeout_s: float = 3.0,
send_probe: bool = True,
) -> dict:
"""Conecta por TCP a host:port, lee el banner del servicio y lo identifica.
Abre un socket TCP, opcionalmente envia un probe (HTTP para puertos web),
lee hasta ~4096 bytes con timeout, decodifica best-effort e identifica el
servicio por heuristica (ssh, ftp, smtp, http, mysql, redis, pop3, imap,
telnet, ...). Extrae producto y version cuando es posible.
Args:
host: Hostname o IP del objetivo (ej. "scanme.nmap.org", "127.0.0.1").
Vacio devuelve status error.
port: Puerto TCP (ej. 22, 80, 3306). Fuera de 1..65535 devuelve error.
timeout_s: Timeout de conexion y de lectura en segundos. Default 3.0.
send_probe: Si True y el puerto esta en el mapa interno de probes (los
puertos HTTP tipicos: 80/8080/8000/8888/...), envia el probe HTTP
`HEAD / HTTP/1.0` para provocar respuesta. Para el resto de puertos
no envia nada e intenta leer el banner pasivo (SSH/FTP/SMTP/POP3/IMAP
emiten banner al conectar). Si False, nunca envia probe.
Returns:
Dict de estado. Nunca lanza.
ok: {"status":"ok", "host", "port":int, "service":str, "product":str,
"version":str, "banner":str (banner limpio), "raw":str (repr seguro
del banner crudo)}
error: {"status":"error", "error":str, "host", "port":int}
"""
if not host or not host.strip():
return {"status": "error", "error": "grab_service_banner: host vacio", "host": host, "port": port}
try:
port = int(port)
except (TypeError, ValueError):
return {
"status": "error",
"error": f"grab_service_banner: port invalido: {port!r}",
"host": host,
"port": port,
}
if not (1 <= port <= 65535):
return {
"status": "error",
"error": f"grab_service_banner: port fuera de rango 1..65535: {port}",
"host": host,
"port": port,
}
host = host.strip()
sock = None
try:
sock = socket.create_connection((host, port), timeout=timeout_s)
sock.settimeout(timeout_s)
# Probe activo solo si procede (puerto HTTP) y send_probe=True.
if send_probe and port in _PROBES:
try:
sock.sendall(_PROBES[port])
except OSError:
pass # algunos servicios cierran ante un probe inesperado
chunks: list[bytes] = []
total = 0
try:
while total < 4096:
data = sock.recv(4096 - total)
if not data:
break
chunks.append(data)
total += len(data)
# La mayoria de banners caben en un recv; si llega un salto de
# linea de fin de banner, paramos para no bloquear en el timeout.
if b"\n" in data and port not in _PROBES:
break
except socket.timeout:
pass # timeout de lectura: usamos lo recibido hasta ahora
raw_bytes = b"".join(chunks)
except socket.timeout:
return {
"status": "error",
"error": f"grab_service_banner: timeout conectando a {host}:{port} ({timeout_s}s)",
"host": host,
"port": port,
}
except ConnectionRefusedError:
return {
"status": "error",
"error": f"grab_service_banner: connection refused {host}:{port}",
"host": host,
"port": port,
}
except socket.gaierror as e:
return {
"status": "error",
"error": f"grab_service_banner: no se pudo resolver host '{host}': {e}",
"host": host,
"port": port,
}
except OSError as e:
return {
"status": "error",
"error": f"grab_service_banner: error de socket {host}:{port}: {e}",
"host": host,
"port": port,
}
finally:
if sock is not None:
try:
sock.close()
except OSError:
pass
text = _decode_best_effort(raw_bytes)
service, product, version = _identify(text, raw_bytes)
banner = text.strip()
return {
"status": "ok",
"host": host,
"port": port,
"service": service,
"product": product,
"version": version,
"banner": banner,
"raw": repr(raw_bytes),
}
if __name__ == "__main__":
# Smoke: intenta capturar el banner SSH del host oficial de pruebas de nmap.
# Tolera cualquier fallo de red sin romper (exit 0 siempre).
try:
result = grab_service_banner("scanme.nmap.org", 22, timeout_s=5)
print(result["status"])
if result["status"] == "ok":
print(f"service={result['service']} product={result['product']} version={result['version']}")
print(f"banner: {result['banner']}")
else:
print("error tolerado:", result.get("error"))
except Exception as exc: # noqa: BLE001 - smoke nunca debe romper
print("smoke fallo (tolerado):", exc)