Files
fn_registry/python/functions/infra/safe_extract_zip.py
T
egutierrez 5a324f6554 feat: funciones Python infra y tipos Python (core, datascience, infra)
Infra: cache_to_file, cache_to_sqlite, http_download_file, http_get_json,
http_post_json, read_file_with_encoding, safe_extract_zip, scan_directory,
setup_logger, normalize_zip_filenames.
Tipos: 30+ tipos core (agent_action, context, task, message, parse_result...),
6 tipos datascience (entity_candidate, extraction_result...), 2 tipos infra.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-05 17:11:43 +02:00

81 lines
2.6 KiB
Python

"""Safe ZIP extraction with Zip Slip protection and filename normalization."""
import os
import zipfile
from pathlib import Path
def normalize_zip_filenames(zipf: zipfile.ZipFile) -> None:
    """Repair UTF-8 filenames in ZIP archives that lack the UTF-8 flag.

    Entries whose general-purpose flag bit 0x800 is not set have their names
    decoded as cp437 by ``zipfile``, so UTF-8 names stored without the flag
    show up as mojibake. For each such entry the name is re-encoded as cp437
    and decoded as UTF-8; the result is adopted only when it contains CJK
    characters and the current name contains typical mojibake characters
    (Greek, mathematical operators, box drawing).

    Only ``ZipInfo.filename`` is rewritten. ``ZipInfo.orig_filename`` and
    ``zipf.metadata_encoding`` are deliberately left untouched: on
    Python 3.11+ ``ZipFile.open()`` decodes the name found in the local file
    header with ``metadata_encoding`` (cp437 when unset) and compares it with
    ``orig_filename``, so forcing ``metadata_encoding`` to UTF-8 after the
    archive has been opened makes every repaired entry fail with
    ``BadZipFile``.

    Args:
        zipf: ZipFile object opened in read mode.

    Returns:
        None. The ZipInfo entries of ``zipf`` are modified in place.
    """

    def _is_cjk(s: str) -> bool:
        # CJK Extension A, CJK Unified Ideographs, CJK punctuation and
        # half-/full-width forms.
        return any(
            "\u3400" <= c <= "\u4dbf"
            or "\u4e00" <= c <= "\u9fff"
            or "\u3000" <= c <= "\u303f"
            or "\uff00" <= c <= "\uffef"
            for c in s
        )

    def _is_mojibake(s: str) -> bool:
        return any(
            "\u0370" <= c <= "\u03ff"  # Greek
            or "\u2200" <= c <= "\u22ff"  # Math
            or "\u2500" <= c <= "\u257f"  # Box Drawing
            for c in s
        )

    for info in zipf.infolist():
        # Flag 0x800 means the filename is already stored as UTF-8.
        if info.flag_bits & 0x800:
            continue
        try:
            repaired_name = info.filename.encode("cp437").decode("utf-8")
        except (UnicodeEncodeError, UnicodeDecodeError):
            # The name is not UTF-8 bytes misread as cp437; keep it as is.
            continue
        if _is_cjk(repaired_name) and _is_mojibake(info.filename):
            info.filename = repaired_name
def safe_extract_zip(zip_path: str, dest_dir: str) -> None:
    """Extract a ZIP archive with Zip Slip (path traversal) protection.

    Filenames are normalized with ``normalize_zip_filenames`` first. Every
    member is then resolved against the destination directory and must end up
    inside it (or be the directory itself). The whole archive is validated
    before anything is written, so a malicious archive leaves no partially
    extracted files behind.

    Args:
        zip_path: Path to the ZIP file to extract.
        dest_dir: Destination directory for the extraction.

    Raises:
        ValueError: If a Zip Slip (path traversal) attempt is detected.
        zipfile.BadZipFile: If the file is not a valid ZIP archive.
        FileNotFoundError: If zip_path does not exist.
    """
    dest = Path(dest_dir).resolve()
    with zipfile.ZipFile(zip_path, "r") as zipf:
        normalize_zip_filenames(zipf)
        members = zipf.infolist()

        # First pass: validate every member before touching the filesystem.
        for member in members:
            member_path = (dest / member.filename).resolve()
            # Component-wise containment check. Unlike a string-prefix test it
            # accepts an entry that resolves to dest itself (e.g. "./") and
            # also works when dest is the filesystem root.
            try:
                member_path.relative_to(dest)
            except ValueError:
                raise ValueError(
                    f"Zip Slip attempt detected: {member.filename!r} would extract to {member_path}"
                ) from None

        # Second pass: the archive is clean, extract it.
        for member in members:
            zipf.extract(member, dest)