Files
fn_registry/python/functions/infra/safe_extract_zip_test.py
T
egutierrez 5a324f6554 feat: funciones Python infra y tipos Python (core, datascience, infra)
Infra: cache_to_file, cache_to_sqlite, http_download_file, http_get_json,
http_post_json, read_file_with_encoding, safe_extract_zip, scan_directory,
setup_logger, normalize_zip_filenames.
Tipos: 30+ tipos core (agent_action, context, task, message, parse_result...),
6 tipos datascience (entity_candidate, extraction_result...), 2 tipos infra.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-05 17:11:43 +02:00

207 lines
7.0 KiB
Python

"""Tests para safe_extract_zip y normalize_zip_filenames."""
import io
import os
import struct
import tempfile
import zipfile
from safe_extract_zip import normalize_zip_filenames, safe_extract_zip
def _make_zip_with_raw_filename(raw_filename_bytes: bytes, content: bytes) -> bytes:
    """Build a minimal one-member ZIP whose filename bytes are stored verbatim.

    The general purpose bit flag is written as 0 in both the local and the
    central directory header, so bit 11 (0x800, "filename is UTF-8") is unset.
    This mimics archives that carry UTF-8 filename bytes without declaring
    them; ``zipfile`` then decodes the name as CP437 and yields mojibake.

    Args:
        raw_filename_bytes: Bytes written as-is into the filename field.
        content: Member payload; stored uncompressed (method 0).

    Returns:
        The bytes of the complete ZIP archive.
    """
    # binascii.crc32 is the documented CRC-32 entry point; ``zipfile.crc32`` is
    # only an undocumented module-level alias. Imported locally so this helper
    # does not depend on a new file-level import.
    import binascii

    crc = binascii.crc32(content) & 0xFFFFFFFF
    size = len(content)
    fname_len = len(raw_filename_bytes)
    # MS-DOS date for 1980-01-01: (year - 1980) << 9 | month << 5 | day.
    # A raw 0 would encode month 0 / day 0, which is not a valid date.
    dos_date = (0 << 9) | (1 << 5) | 1

    buf = io.BytesIO()

    # Local file header (30 bytes) followed by the filename and the payload.
    local_header = struct.pack(
        "<4sHHHHHIIIHH",
        b"PK\x03\x04",  # signature
        20,  # version needed to extract
        0,  # general purpose bit flag: 0x800 deliberately NOT set
        0,  # compression method: stored
        0,  # last mod time (00:00:00)
        dos_date,  # last mod date
        crc,
        size,  # compressed size (== uncompressed, data is stored)
        size,  # uncompressed size
        fname_len,
        0,  # extra field length
    )
    buf.write(local_header)
    buf.write(raw_filename_bytes)
    buf.write(content)

    # Central directory header (46 bytes) followed by the filename.
    cd_offset = buf.tell()
    cd_header = struct.pack(
        "<4sHHHHHHIIIHHHHHII",
        b"PK\x01\x02",  # signature
        20,  # version made by
        20,  # version needed to extract
        0,  # general purpose bit flag: 0x800 deliberately NOT set
        0,  # compression method: stored
        0,  # last mod time
        dos_date,  # last mod date
        crc,
        size,  # compressed size
        size,  # uncompressed size
        fname_len,
        0,  # extra field length
        0,  # file comment length
        0,  # disk number start
        0,  # internal file attributes
        0,  # external file attributes
        0,  # offset of the local header (it is the first thing in the file)
    )
    buf.write(cd_header)
    buf.write(raw_filename_bytes)

    # End of central directory record (22 bytes, no archive comment).
    eocd = struct.pack(
        "<4sHHHHIIH",
        b"PK\x05\x06",  # signature
        0,  # number of this disk
        0,  # disk where the central directory starts
        1,  # central directory entries on this disk
        1,  # total central directory entries
        len(cd_header) + fname_len,  # central directory size in bytes
        cd_offset,  # central directory offset
        0,  # archive comment length
    )
    buf.write(eocd)
    return buf.getvalue()
def _make_zip(members: dict[str, bytes]) -> str:
    """Create a temporary ZIP archive containing the given members.

    Args:
        members: Mapping of archive member name to its content.

    Returns:
        Path of the ZIP file on disk. The file is created with
        ``delete=False``, so the caller is responsible for removing it.

    Raises:
        Exception: Whatever ``zipfile`` raises while writing a member is
            propagated after the partially written temp file is removed.
    """
    tmp = tempfile.NamedTemporaryFile(suffix=".zip", delete=False)
    try:
        # ``with tmp`` guarantees the handle is closed even if a write fails;
        # the ZipFile is closed first so the central directory gets flushed.
        with tmp, zipfile.ZipFile(tmp, "w") as zipf:
            for name, content in members.items():
                zipf.writestr(name, content)
    except BaseException:
        # Do not leave a half-written archive behind: nobody would ever get
        # its path back to clean it up.
        os.unlink(tmp.name)
        raise
    return tmp.name
def _make_zip_with_traversal(traversal_name: str) -> str:
    """Create a temporary ZIP with one member stored under ``traversal_name``.

    The name is written through an explicit ``ZipInfo`` so it reaches the
    archive as given (e.g. ``../../etc/passwd`` or ``/etc/passwd``).

    Args:
        traversal_name: Member name to store in the archive.

    Returns:
        Path of the ZIP file on disk. The file is created with
        ``delete=False``, so the caller is responsible for removing it.
    """
    info = zipfile.ZipInfo(traversal_name)
    tmp = tempfile.NamedTemporaryFile(suffix=".zip", delete=False)
    try:
        # ``with tmp`` guarantees the handle is closed even if writing fails;
        # the ZipFile is closed first so the central directory gets flushed.
        with tmp, zipfile.ZipFile(tmp, "w") as zipf:
            zipf.writestr(info, b"malicious content")
    except BaseException:
        # Remove the partial file: its path is never returned on failure.
        os.unlink(tmp.name)
        raise
    return tmp.name
def test_zip_normal():
    """A regular ZIP is extracted inside the destination directory."""
    archive = _make_zip({"hello.txt": b"hello world", "subdir/file.py": b"# code"})
    try:
        with tempfile.TemporaryDirectory() as target:
            safe_extract_zip(archive, target)

            top_level = os.path.join(target, "hello.txt")
            nested = os.path.join(target, "subdir", "file.py")
            assert os.path.isfile(top_level)
            assert os.path.isfile(nested)

            # The payload must survive the round trip unchanged.
            with open(top_level, "rb") as handle:
                payload = handle.read()
            assert payload == b"hello world"
    finally:
        # _make_zip leaves the archive on disk; remove it whatever happened.
        os.unlink(archive)
def test_zip_con_path_traversal():
    """A ZIP member using ``..`` path traversal makes extraction raise ValueError."""
    archive = _make_zip_with_traversal("../../etc/passwd")
    try:
        with tempfile.TemporaryDirectory() as target:
            # Capture the exception instead of tracking a boolean flag.
            caught = None
            try:
                safe_extract_zip(archive, target)
            except ValueError as exc:
                caught = exc
            assert caught is not None, "Expected ValueError for path traversal"
            assert "Zip Slip" in str(caught)
    finally:
        # The helper leaves the archive on disk; remove it whatever happened.
        os.unlink(archive)
def test_zip_con_paths_absolutos():
    """A ZIP member stored under an absolute path makes extraction raise ValueError."""
    archive = _make_zip_with_traversal("/etc/passwd")
    try:
        with tempfile.TemporaryDirectory() as target:
            # Capture the exception instead of tracking a boolean flag.
            caught = None
            try:
                safe_extract_zip(archive, target)
            except ValueError as exc:
                caught = exc
            assert caught is not None, "Expected ValueError for absolute path"
            assert "Zip Slip" in str(caught)
    finally:
        # The helper leaves the archive on disk; remove it whatever happened.
        os.unlink(archive)
def test_normalize_utf8_correctos_no_cambian():
    """Names that are already correct (UTF-8 with flag 0x800) are not modified.

    Setting ``flag_bits |= 0x800`` by hand on a ``ZipInfo`` does not work:
    CPython's ``zipfile`` resets ``flag_bits`` when it opens a member for
    writing, and only sets bit 11 itself when the name is not pure ASCII.
    The archive therefore contains a non-ASCII member (which really gets the
    flag) next to a plain ASCII one, and the flag is verified before calling
    ``normalize_zip_filenames``.
    """
    ascii_name = "archivo_normal.txt"
    utf8_name = "archivo_\u00f1and\u00fa.txt"
    expected_names = [ascii_name, utf8_name]

    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w") as zipf:
        for name in expected_names:
            zipf.writestr(zipfile.ZipInfo(name), b"content")
    buf.seek(0)

    with zipfile.ZipFile(buf, "r") as zipf:
        # Precondition: the non-ASCII member really carries the UTF-8 flag,
        # otherwise this test would not exercise the flagged code path.
        assert zipf.getinfo(utf8_name).flag_bits & 0x800, (
            "Expected flag 0x800 on the non-ASCII member"
        )
        original_names = [member.filename for member in zipf.infolist()]
        assert original_names == expected_names

        normalize_zip_filenames(zipf)

        assert [member.filename for member in zipf.infolist()] == original_names
def test_normalize_cjk_mojibake_repara():
    """CJK names turned into mojibake (UTF-8 bytes read as CP437) get repaired.

    The archive stores the UTF-8 bytes of a CJK filename without setting flag
    0x800, so ``zipfile`` decodes them as CP437. ``normalize_zip_filenames``
    is expected to detect and repair that.

    NOTE(review): the final check only requires that the repaired name contains
    CJK code points; it does not compare against ``cjk_name`` -- confirm
    whether an exact match is intended.
    """
    cjk_name = "\u6587\u4ef6.txt"
    # zipfile forces flag 0x800 for non-ASCII names written through ZipInfo,
    # so the archive is assembled by hand with the raw UTF-8 bytes instead.
    zip_bytes = _make_zip_with_raw_filename(cjk_name.encode("utf-8"), b"cjk content")

    with zipfile.ZipFile(io.BytesIO(zip_bytes), "r") as zipf:
        member = zipf.infolist()[0]
        # Without the flag the bytes were read as CP437: the name is mojibake.
        assert (member.flag_bits & 0x800) == 0, "Flag 0x800 no deberia estar seteado"
        assert member.filename != cjk_name, "El nombre aun no debe estar reparado"

        normalize_zip_filenames(zipf)

        repaired = zipf.infolist()[0].filename
        # Look for a code point in U+3400-U+4DBF or U+4E00-U+9FFF.
        has_cjk = False
        for ch in repaired:
            if "\u3400" <= ch <= "\u4dbf" or "\u4e00" <= ch <= "\u9fff":
                has_cjk = True
                break
        assert has_cjk, f"Esperaba CJK en nombre reparado, got: {repaired!r}"
if __name__ == "__main__":
    # Script mode: run every test in order and report each one as it passes.
    # Any failing assertion aborts the run with its traceback.
    _cases = (
        (test_zip_normal, "PASS: ZIP normal extrae correctamente dentro del destino"),
        (test_zip_con_path_traversal, "PASS: ZIP con path traversal lanza ValueError"),
        (test_zip_con_paths_absolutos, "PASS: ZIP con paths absolutos lanza ValueError"),
        (
            test_normalize_utf8_correctos_no_cambian,
            "PASS: ZIP con nombres UTF-8 correctos no se modifican",
        ),
        (
            test_normalize_cjk_mojibake_repara,
            "PASS: ZIP con nombres CJK mojibake se reparan",
        ),
    )
    for _run, _message in _cases:
        _run()
        print(_message)
    print("\nAll tests passed.")