"""Tests para safe_extract_zip y normalize_zip_filenames.""" import io import os import struct import tempfile import zipfile from safe_extract_zip import normalize_zip_filenames, safe_extract_zip def _make_zip_with_raw_filename(raw_filename_bytes: bytes, content: bytes) -> bytes: """Crea un ZIP minimal con bytes de filename raw y sin flag 0x800. Simula un ZIP creado en Windows donde el filename tiene bytes UTF-8 pero sin el flag de UTF-8 (0x800), causando que zipfile lo lea como CP437. """ crc = zipfile.crc32(content) & 0xFFFFFFFF fname_len = len(raw_filename_bytes) buf = io.BytesIO() # Local file header local_header = struct.pack( "<4sHHHHHIIIHH", b"PK\x03\x04", # signature 20, # version needed 0, # general purpose bit flag — sin 0x800 0, # compression: stored 0, # last mod time 0, # last mod date crc, len(content), # compressed size len(content), # uncompressed size fname_len, 0, # extra field length ) buf.write(local_header) buf.write(raw_filename_bytes) buf.write(content) # Central directory header cd_offset = buf.tell() cd_header = struct.pack( "<4sHHHHHHIIIHHHHHII", b"PK\x01\x02", 20, # version made by 20, # version needed 0, # flag — sin 0x800 0, # compression 0, # mod time 0, # mod date crc, len(content), # compressed size len(content), # uncompressed size fname_len, 0, # extra length 0, # comment length 0, # disk start 0, # internal attr 0, # external attr 0, # local header offset ) buf.write(cd_header) buf.write(raw_filename_bytes) # End of central directory eocd = struct.pack( "<4sHHHHIIH", b"PK\x05\x06", 0, 0, 1, 1, len(cd_header) + fname_len, cd_offset, 0, ) buf.write(eocd) return buf.getvalue() def _make_zip(members: dict[str, bytes]) -> str: """Crea un ZIP temporal con los miembros dados {filename: content}.""" tmp = tempfile.NamedTemporaryFile(suffix=".zip", delete=False) with zipfile.ZipFile(tmp, "w") as zipf: for name, content in members.items(): zipf.writestr(name, content) tmp.close() return tmp.name def _make_zip_with_traversal(traversal_name: str) -> str: """Crea un ZIP con un miembro cuyo nombre intenta path traversal.""" buf = io.BytesIO() with zipfile.ZipFile(buf, "w") as zipf: info = zipfile.ZipInfo(traversal_name) zipf.writestr(info, b"malicious content") tmp = tempfile.NamedTemporaryFile(suffix=".zip", delete=False) tmp.write(buf.getvalue()) tmp.close() return tmp.name def test_zip_normal(): """ZIP normal extrae correctamente dentro del destino.""" zip_path = _make_zip({"hello.txt": b"hello world", "subdir/file.py": b"# code"}) try: with tempfile.TemporaryDirectory() as dest: safe_extract_zip(zip_path, dest) assert os.path.isfile(os.path.join(dest, "hello.txt")) assert os.path.isfile(os.path.join(dest, "subdir", "file.py")) with open(os.path.join(dest, "hello.txt"), "rb") as f: assert f.read() == b"hello world" finally: os.unlink(zip_path) def test_zip_con_path_traversal(): """ZIP con path traversal lanza ValueError.""" zip_path = _make_zip_with_traversal("../../etc/passwd") try: with tempfile.TemporaryDirectory() as dest: raised = False try: safe_extract_zip(zip_path, dest) except ValueError as e: raised = True assert "Zip Slip" in str(e) assert raised, "Expected ValueError for path traversal" finally: os.unlink(zip_path) def test_zip_con_paths_absolutos(): """ZIP con paths absolutos lanza ValueError.""" zip_path = _make_zip_with_traversal("/etc/passwd") try: with tempfile.TemporaryDirectory() as dest: raised = False try: safe_extract_zip(zip_path, dest) except ValueError as e: raised = True assert "Zip Slip" in str(e) assert raised, "Expected ValueError for absolute path" finally: os.unlink(zip_path) def test_normalize_utf8_correctos_no_cambian(): """ZIP con nombres UTF-8 correctos (flag 0x800) no se modifican.""" buf = io.BytesIO() with zipfile.ZipFile(buf, "w") as zipf: info = zipfile.ZipInfo("archivo_normal.txt") info.flag_bits |= 0x800 # marcar como UTF-8 zipf.writestr(info, b"content") buf.seek(0) with zipfile.ZipFile(buf, "r") as zipf: original_name = zipf.infolist()[0].filename normalize_zip_filenames(zipf) assert zipf.infolist()[0].filename == original_name def test_normalize_cjk_mojibake_repara(): """ZIP con nombres CJK en mojibake (UTF-8 bytes leidos como CP437) se reparan. Simula un ZIP donde los bytes del filename son UTF-8 valido de un nombre CJK, pero el flag 0x800 no esta seteado, asi que zipfile los decodifica como CP437 produciendo mojibake. normalize_zip_filenames debe detectarlo y repararlo. """ cjk_name = "\u6587\u4ef6.txt" # 文件.txt # Construir ZIP con bytes UTF-8 crudos en el campo filename, sin flag 0x800. # Python no permite esto via ZipInfo (fuerza 0x800 para non-ASCII), por eso # construimos el ZIP manualmente con _make_zip_with_raw_filename. utf8_bytes = cjk_name.encode("utf-8") zip_bytes = _make_zip_with_raw_filename(utf8_bytes, b"cjk content") with zipfile.ZipFile(io.BytesIO(zip_bytes), "r") as zipf: member = zipf.infolist()[0] # Sin el flag, zipfile lee los bytes como CP437: debe ser mojibake assert not (member.flag_bits & 0x800), "Flag 0x800 no deberia estar seteado" assert member.filename != cjk_name, "El nombre aun no debe estar reparado" normalize_zip_filenames(zipf) repaired = zipf.infolist()[0].filename has_cjk = any( "\u4e00" <= c <= "\u9fff" or "\u3400" <= c <= "\u4dbf" for c in repaired ) assert has_cjk, f"Esperaba CJK en nombre reparado, got: {repaired!r}" if __name__ == "__main__": test_zip_normal() print("PASS: ZIP normal extrae correctamente dentro del destino") test_zip_con_path_traversal() print("PASS: ZIP con path traversal lanza ValueError") test_zip_con_paths_absolutos() print("PASS: ZIP con paths absolutos lanza ValueError") test_normalize_utf8_correctos_no_cambian() print("PASS: ZIP con nombres UTF-8 correctos no se modifican") test_normalize_cjk_mojibake_repara() print("PASS: ZIP con nombres CJK mojibake se reparan") print("\nAll tests passed.")