9fd0ca9cac
Infra: cache_to_file, cache_to_sqlite, http_download_file, http_get_json, http_post_json, read_file_with_encoding, safe_extract_zip, scan_directory, setup_logger, normalize_zip_filenames. Tipos: 30+ tipos core (agent_action, context, task, message, parse_result...), 6 tipos datascience (entity_candidate, extraction_result...), 2 tipos infra. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
207 lines
7.0 KiB
Python
207 lines
7.0 KiB
Python
"""Tests para safe_extract_zip y normalize_zip_filenames."""
|
|
|
|
import io
|
|
import os
|
|
import struct
|
|
import tempfile
|
|
import zipfile
|
|
|
|
from safe_extract_zip import normalize_zip_filenames, safe_extract_zip
|
|
|
|
|
|
def _make_zip_with_raw_filename(raw_filename_bytes: bytes, content: bytes) -> bytes:
    """Build a minimal one-member ZIP whose filename bytes are stored verbatim.

    The general-purpose flag field is written as 0 in both the local and the
    central header, so the UTF-8 bit (0x800) is never set. This mimics an
    archive whose filename holds UTF-8 bytes without that flag, which makes
    ``zipfile`` decode the name as CP437.

    Args:
        raw_filename_bytes: Bytes copied unchanged into both filename fields.
        content: Member payload, stored uncompressed (method 0).

    Returns:
        The complete archive as a bytes object.
    """
    # Use the documented CRC-32 API; ``zipfile.crc32`` is only an undocumented
    # module-level alias inside the zipfile implementation.
    import binascii

    crc = binascii.crc32(content) & 0xFFFFFFFF
    fname_len = len(raw_filename_bytes)
    size = len(content)
    buf = io.BytesIO()

    # Local file header (30 bytes), followed by the filename and the payload.
    local_header = struct.pack(
        "<4sHHHHHIIIHH",
        b"PK\x03\x04",  # signature
        20,  # version needed to extract
        0,  # general purpose bit flag: 0x800 deliberately not set
        0,  # compression method: stored
        0,  # last mod time
        0,  # last mod date
        crc,
        size,  # compressed size (equals the raw size because it is stored)
        size,  # uncompressed size
        fname_len,
        0,  # extra field length
    )
    buf.write(local_header)
    buf.write(raw_filename_bytes)
    buf.write(content)

    # Central directory header (46 bytes), followed by the filename again.
    cd_offset = buf.tell()
    cd_header = struct.pack(
        "<4sHHHHHHIIIHHHHHII",
        b"PK\x01\x02",  # signature
        20,  # version made by
        20,  # version needed to extract
        0,  # general purpose bit flag: 0x800 deliberately not set
        0,  # compression method: stored
        0,  # last mod time
        0,  # last mod date
        crc,
        size,  # compressed size
        size,  # uncompressed size
        fname_len,
        0,  # extra field length
        0,  # file comment length
        0,  # disk number start
        0,  # internal file attributes
        0,  # external file attributes
        0,  # local header offset: the only member starts at byte 0
    )
    buf.write(cd_header)
    buf.write(raw_filename_bytes)

    # End of central directory record (22 bytes, no archive comment).
    eocd = struct.pack(
        "<4sHHHHIIH",
        b"PK\x05\x06",  # signature
        0,  # number of this disk
        0,  # disk where the central directory starts
        1,  # central directory entries on this disk
        1,  # total central directory entries
        len(cd_header) + fname_len,  # central directory size in bytes
        cd_offset,  # central directory offset from the start of the archive
        0,  # archive comment length
    )
    buf.write(eocd)
    return buf.getvalue()
|
|
|
|
|
|
def _make_zip(members: dict[str, bytes]) -> str:
    """Write *members* into a new temporary ZIP file and return its path.

    Args:
        members: Mapping of archive member name to that member's content.

    Returns:
        Path of the created ``.zip`` file. The file is created with
        ``delete=False``, so the caller is responsible for removing it.

    Raises:
        Whatever ``zipfile`` raises while writing; in that case the partially
        written temporary file is removed before the error propagates.
    """
    tmp = tempfile.NamedTemporaryFile(suffix=".zip", delete=False)
    try:
        # Context managers exit in reverse order: the archive is finalized
        # first, then the underlying file handle is closed, even on error.
        with tmp, zipfile.ZipFile(tmp, "w") as zipf:
            for name, content in members.items():
                zipf.writestr(name, content)
    except BaseException:
        # Do not leave a half-written archive behind for a path nobody gets.
        os.unlink(tmp.name)
        raise
    return tmp.name
|
|
|
|
|
|
def _make_zip_with_traversal(traversal_name: str) -> str:
    """Create a temporary ZIP holding one member named *traversal_name*.

    The member is added through an explicit ``ZipInfo`` so the given name is
    used as the archive entry name.

    Args:
        traversal_name: Member name to store, e.g. ``"../../etc/passwd"``.

    Returns:
        Path of the created ``.zip`` file. The file is created with
        ``delete=False``, so the caller is responsible for removing it.
    """
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w") as zipf:
        zipf.writestr(zipfile.ZipInfo(traversal_name), b"malicious content")

    # The context manager guarantees the handle is closed even if the write
    # fails; delete=False keeps the file on disk after it is closed.
    with tempfile.NamedTemporaryFile(suffix=".zip", delete=False) as tmp:
        tmp.write(buf.getvalue())
    return tmp.name
|
|
|
|
|
|
def test_zip_normal():
    """A well-formed ZIP is extracted inside the destination directory."""
    archive = _make_zip({"hello.txt": b"hello world", "subdir/file.py": b"# code"})
    try:
        with tempfile.TemporaryDirectory() as dest:
            safe_extract_zip(archive, dest)

            hello_path = os.path.join(dest, "hello.txt")
            nested_path = os.path.join(dest, "subdir", "file.py")

            # Both the top-level and the nested member must exist as files.
            assert os.path.isfile(hello_path)
            assert os.path.isfile(nested_path)

            # The extracted payload must match what was archived.
            with open(hello_path, "rb") as fh:
                extracted = fh.read()
            assert extracted == b"hello world"
    finally:
        # The helper creates the archive with delete=False; clean it up here.
        os.unlink(archive)
|
|
|
|
|
|
def test_zip_con_path_traversal():
    """A member name with ``..`` segments makes extraction raise ValueError."""
    archive = _make_zip_with_traversal("../../etc/passwd")
    try:
        with tempfile.TemporaryDirectory() as dest:
            caught = None
            try:
                safe_extract_zip(archive, dest)
            except ValueError as exc:
                caught = exc

            # Extraction must be refused, and the error must name the attack.
            assert caught is not None, "Expected ValueError for path traversal"
            assert "Zip Slip" in str(caught)
    finally:
        # The helper creates the archive with delete=False; clean it up here.
        os.unlink(archive)
|
|
|
|
|
|
def test_zip_con_paths_absolutos():
    """A member stored under an absolute path makes extraction raise ValueError."""
    archive = _make_zip_with_traversal("/etc/passwd")
    try:
        with tempfile.TemporaryDirectory() as dest:
            caught = None
            try:
                safe_extract_zip(archive, dest)
            except ValueError as exc:
                caught = exc

            # Extraction must be refused, and the error must name the attack.
            assert caught is not None, "Expected ValueError for absolute path"
            assert "Zip Slip" in str(caught)
    finally:
        # The helper creates the archive with delete=False; clean it up here.
        os.unlink(archive)
|
|
|
|
|
|
def test_normalize_utf8_correctos_no_cambian():
    """Names in a ZIP entry marked with the UTF-8 flag (0x800) stay untouched."""
    stream = io.BytesIO()
    with zipfile.ZipFile(stream, "w") as writer:
        entry = zipfile.ZipInfo("archivo_normal.txt")
        entry.flag_bits |= 0x800  # mark the entry name as UTF-8
        writer.writestr(entry, b"content")

    # Rewind so the same buffer can be reopened for reading.
    stream.seek(0)
    with zipfile.ZipFile(stream, "r") as reader:
        name_before = reader.infolist()[0].filename
        normalize_zip_filenames(reader)
        name_after = reader.infolist()[0].filename
        assert name_after == name_before
|
|
|
|
|
|
def test_normalize_cjk_mojibake_repara():
    """CJK names turned into mojibake (UTF-8 bytes read as CP437) get repaired.

    The archive's filename field holds the valid UTF-8 encoding of a CJK name
    while flag 0x800 is left unset, so ``zipfile`` decodes the bytes as CP437
    and yields mojibake. ``normalize_zip_filenames`` is expected to detect and
    repair that.
    """
    cjk_name = "\u6587\u4ef6.txt"  # 文件.txt

    # Put raw UTF-8 bytes in the filename field without flag 0x800. ZipInfo
    # cannot produce this (it forces 0x800 for non-ASCII names), hence the
    # hand-built archive from _make_zip_with_raw_filename.
    raw_name = cjk_name.encode("utf-8")
    archive = io.BytesIO(_make_zip_with_raw_filename(raw_name, b"cjk content"))

    with zipfile.ZipFile(archive, "r") as zipf:
        first = zipf.infolist()[0]
        # Precondition: without the flag the name was read as CP437 mojibake.
        assert not (first.flag_bits & 0x800), "Flag 0x800 no deberia estar seteado"
        assert first.filename != cjk_name, "El nombre aun no debe estar reparado"

        normalize_zip_filenames(zipf)

        repaired = zipf.infolist()[0].filename
        # CJK Unified Ideographs and CJK Unified Ideographs Extension A.
        cjk_ranges = (("\u4e00", "\u9fff"), ("\u3400", "\u4dbf"))
        has_cjk = any(
            low <= char <= high for char in repaired for low, high in cjk_ranges
        )
        assert has_cjk, f"Esperaba CJK en nombre reparado, got: {repaired!r}"
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: run each test in order and report it as it passes.
    # Any failing assertion aborts the run before its PASS line is printed.
    _cases = (
        (test_zip_normal, "PASS: ZIP normal extrae correctamente dentro del destino"),
        (test_zip_con_path_traversal, "PASS: ZIP con path traversal lanza ValueError"),
        (test_zip_con_paths_absolutos, "PASS: ZIP con paths absolutos lanza ValueError"),
        (
            test_normalize_utf8_correctos_no_cambian,
            "PASS: ZIP con nombres UTF-8 correctos no se modifican",
        ),
        (
            test_normalize_cjk_mojibake_repara,
            "PASS: ZIP con nombres CJK mojibake se reparan",
        ),
    )
    for _run_case, _pass_message in _cases:
        _run_case()
        print(_pass_message)

    print("\nAll tests passed.")
|