9fd0ca9cac
Infra: cache_to_file, cache_to_sqlite, http_download_file, http_get_json, http_post_json, read_file_with_encoding, safe_extract_zip, scan_directory, setup_logger, normalize_zip_filenames. Tipos: 30+ tipos core (agent_action, context, task, message, parse_result...), 6 tipos datascience (entity_candidate, extraction_result...), 2 tipos infra. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
81 lines
2.6 KiB
Python
81 lines
2.6 KiB
Python
"""Safe ZIP extraction with Zip Slip protection and filename normalization."""
|
|
|
|
import os
|
|
import zipfile
|
|
from pathlib import Path
|
|
|
|
|
|
def normalize_zip_filenames(zipf: zipfile.ZipFile) -> None:
    """Repair UTF-8 filenames of ZIP entries that lack the UTF-8 flag.

    ``zipfile`` decodes the name of every entry whose general-purpose flag
    bit 0x800 is not set as cp437. If the archiver actually wrote UTF-8
    bytes, the decoded name is mojibake. For each such entry this function
    reverses the cp437 decoding and keeps the result only when the repaired
    name contains CJK characters and the current name contains characters
    typical of cp437 mojibake (Greek, math operators, box drawing).

    Args:
        zipf: ZipFile object opened for reading.

    Returns:
        None. ``ZipInfo.filename`` of the repaired entries is rewritten in
        place; ``ZipInfo.orig_filename`` is left untouched.
    """
    cjk_ranges = (
        ("\u3400", "\u4dbf"),
        ("\u4e00", "\u9fff"),
        ("\u3000", "\u303f"),
        ("\uff00", "\uffef"),
    )
    # NOTE(review): lead byte 0xE6 decodes to U+00B5 in cp437, which is not in
    # any of these ranges, so a few short CJK names may go unrepaired.
    mojibake_ranges = (
        ("\u0370", "\u03ff"),  # Greek
        ("\u2200", "\u22ff"),  # Math
        ("\u2500", "\u257f"),  # Box Drawing
    )

    def _has_char_in(text: str, ranges) -> bool:
        return any(low <= ch <= high for ch in text for low, high in ranges)

    def _is_cjk(s: str) -> bool:
        return _has_char_in(s, cjk_ranges)

    def _is_mojibake(s: str) -> bool:
        return _has_char_in(s, mojibake_ranges)

    for info in zipf.infolist():
        # Flag 0x800 means the filename is already stored as UTF-8.
        if info.flag_bits & 0x800:
            continue

        try:
            repaired_name = info.filename.encode("cp437").decode("utf-8")
        except (UnicodeEncodeError, UnicodeDecodeError):
            # Not a cp437 view of valid UTF-8 bytes: leave the name alone.
            continue

        if not (_is_cjk(repaired_name) and _is_mojibake(info.filename)):
            continue

        info.filename = repaired_name

        # Make lookups by the repaired name (getinfo/read/extract) work too.
        # The old key is kept so existing lookups keep working, and an
        # unrelated entry that already owns the repaired name is not replaced.
        name_index = getattr(zipf, "NameToInfo", None)
        if name_index is not None:
            name_index.setdefault(repaired_name, info)

    # Deliberately do NOT set ``zipf.metadata_encoding`` here. Since Python
    # 3.11, ZipFile.open() decodes the local-header name with that encoding
    # and compares it against ``ZipInfo.orig_filename`` (decoded as cp437 when
    # the archive was opened); changing it afterwards makes reading repaired
    # entries fail with BadZipFile.
|
|
|
|
|
|
def safe_extract_zip(zip_path: str, dest_dir: str) -> None:
    """Extract a ZIP archive with Zip Slip (path traversal) protection.

    Each member's resolved target path is checked to lie inside the
    destination directory before that member is extracted. Filenames are
    normalized with ``normalize_zip_filenames`` before the check. Members are
    validated and extracted one at a time, so members preceding an offending
    entry are already on disk when the error is raised.

    Args:
        zip_path: Path to the ZIP archive to extract.
        dest_dir: Destination directory for the extraction.

    Raises:
        ValueError: If a Zip Slip (path traversal) attempt is detected.
        zipfile.BadZipFile: If the file is not a valid ZIP archive.
        FileNotFoundError: If zip_path does not exist.
    """
    dest = Path(dest_dir).resolve()

    with zipfile.ZipFile(zip_path, "r") as zipf:
        normalize_zip_filenames(zipf)

        for member in zipf.infolist():
            member_path = (dest / member.filename).resolve()

            # Compare path components rather than string prefixes: a prefix
            # test on ``str(dest) + os.sep`` rejects every member when dest is
            # the filesystem root, because the prefix becomes a doubled
            # separator.
            inside_dest = dest in member_path.parents
            # A directory entry such as "./" resolves to dest itself; that is
            # harmless, whereas a regular file may never replace dest.
            is_dest_itself = member_path == dest and member.is_dir()

            if not (inside_dest or is_dest_itself):
                raise ValueError(
                    f"Zip Slip attempt detected: {member.filename!r} would extract to {member_path}"
                )

            zipf.extract(member, dest)
|