feat: funciones Python infra y tipos Python (core, datascience, infra)
Infra: cache_to_file, cache_to_sqlite, http_download_file, http_get_json, http_post_json, read_file_with_encoding, safe_extract_zip, scan_directory, setup_logger, normalize_zip_filenames. Tipos: 30+ tipos core (agent_action, context, task, message, parse_result...), 6 tipos datascience (entity_candidate, extraction_result...), 2 tipos infra. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,206 @@
|
||||
"""Tests para safe_extract_zip y normalize_zip_filenames."""
|
||||
|
||||
import io
|
||||
import os
|
||||
import struct
|
||||
import tempfile
|
||||
import zipfile
|
||||
|
||||
from safe_extract_zip import normalize_zip_filenames, safe_extract_zip
|
||||
|
||||
|
||||
def _make_zip_with_raw_filename(raw_filename_bytes: bytes, content: bytes) -> bytes:
|
||||
"""Crea un ZIP minimal con bytes de filename raw y sin flag 0x800.
|
||||
|
||||
Simula un ZIP creado en Windows donde el filename tiene bytes UTF-8
|
||||
pero sin el flag de UTF-8 (0x800), causando que zipfile lo lea como CP437.
|
||||
"""
|
||||
crc = zipfile.crc32(content) & 0xFFFFFFFF
|
||||
fname_len = len(raw_filename_bytes)
|
||||
buf = io.BytesIO()
|
||||
|
||||
# Local file header
|
||||
local_header = struct.pack(
|
||||
"<4sHHHHHIIIHH",
|
||||
b"PK\x03\x04", # signature
|
||||
20, # version needed
|
||||
0, # general purpose bit flag — sin 0x800
|
||||
0, # compression: stored
|
||||
0, # last mod time
|
||||
0, # last mod date
|
||||
crc,
|
||||
len(content), # compressed size
|
||||
len(content), # uncompressed size
|
||||
fname_len,
|
||||
0, # extra field length
|
||||
)
|
||||
buf.write(local_header)
|
||||
buf.write(raw_filename_bytes)
|
||||
buf.write(content)
|
||||
|
||||
# Central directory header
|
||||
cd_offset = buf.tell()
|
||||
cd_header = struct.pack(
|
||||
"<4sHHHHHHIIIHHHHHII",
|
||||
b"PK\x01\x02",
|
||||
20, # version made by
|
||||
20, # version needed
|
||||
0, # flag — sin 0x800
|
||||
0, # compression
|
||||
0, # mod time
|
||||
0, # mod date
|
||||
crc,
|
||||
len(content), # compressed size
|
||||
len(content), # uncompressed size
|
||||
fname_len,
|
||||
0, # extra length
|
||||
0, # comment length
|
||||
0, # disk start
|
||||
0, # internal attr
|
||||
0, # external attr
|
||||
0, # local header offset
|
||||
)
|
||||
buf.write(cd_header)
|
||||
buf.write(raw_filename_bytes)
|
||||
|
||||
# End of central directory
|
||||
eocd = struct.pack(
|
||||
"<4sHHHHIIH",
|
||||
b"PK\x05\x06",
|
||||
0, 0, 1, 1,
|
||||
len(cd_header) + fname_len,
|
||||
cd_offset,
|
||||
0,
|
||||
)
|
||||
buf.write(eocd)
|
||||
return buf.getvalue()
|
||||
|
||||
|
||||
def _make_zip(members: dict[str, bytes]) -> str:
|
||||
"""Crea un ZIP temporal con los miembros dados {filename: content}."""
|
||||
tmp = tempfile.NamedTemporaryFile(suffix=".zip", delete=False)
|
||||
with zipfile.ZipFile(tmp, "w") as zipf:
|
||||
for name, content in members.items():
|
||||
zipf.writestr(name, content)
|
||||
tmp.close()
|
||||
return tmp.name
|
||||
|
||||
|
||||
def _make_zip_with_traversal(traversal_name: str) -> str:
|
||||
"""Crea un ZIP con un miembro cuyo nombre intenta path traversal."""
|
||||
buf = io.BytesIO()
|
||||
with zipfile.ZipFile(buf, "w") as zipf:
|
||||
info = zipfile.ZipInfo(traversal_name)
|
||||
zipf.writestr(info, b"malicious content")
|
||||
tmp = tempfile.NamedTemporaryFile(suffix=".zip", delete=False)
|
||||
tmp.write(buf.getvalue())
|
||||
tmp.close()
|
||||
return tmp.name
|
||||
|
||||
|
||||
def test_zip_normal():
    """A regular ZIP is extracted correctly inside the destination directory."""
    archive_path = _make_zip({"hello.txt": b"hello world", "subdir/file.py": b"# code"})
    try:
        with tempfile.TemporaryDirectory() as dest:
            safe_extract_zip(archive_path, dest)

            # Both the top-level and the nested member must exist as files.
            hello_path = os.path.join(dest, "hello.txt")
            nested_path = os.path.join(dest, "subdir", "file.py")
            assert os.path.isfile(hello_path)
            assert os.path.isfile(nested_path)

            with open(hello_path, "rb") as extracted:
                payload = extracted.read()
            assert payload == b"hello world"
    finally:
        # The helper creates the archive with delete=False; remove it here.
        os.unlink(archive_path)
|
||||
|
||||
|
||||
def test_zip_con_path_traversal():
    """A ZIP whose member name uses ``..`` traversal raises ValueError."""
    archive_path = _make_zip_with_traversal("../../etc/passwd")
    try:
        with tempfile.TemporaryDirectory() as dest:
            caught = None
            try:
                safe_extract_zip(archive_path, dest)
            except ValueError as exc:
                # Keep a reference: the ``as`` name is cleared after the clause.
                caught = exc
            assert caught is not None, "Expected ValueError for path traversal"
            assert "Zip Slip" in str(caught)
    finally:
        # The helper creates the archive with delete=False; remove it here.
        os.unlink(archive_path)
|
||||
|
||||
|
||||
def test_zip_con_paths_absolutos():
    """A ZIP whose member name is an absolute path raises ValueError."""
    archive_path = _make_zip_with_traversal("/etc/passwd")
    try:
        with tempfile.TemporaryDirectory() as dest:
            caught = None
            try:
                safe_extract_zip(archive_path, dest)
            except ValueError as exc:
                # Keep a reference: the ``as`` name is cleared after the clause.
                caught = exc
            assert caught is not None, "Expected ValueError for absolute path"
            assert "Zip Slip" in str(caught)
    finally:
        # The helper creates the archive with delete=False; remove it here.
        os.unlink(archive_path)
|
||||
|
||||
|
||||
def test_normalize_utf8_correctos_no_cambian():
    """Names that are already correct are left untouched by normalization.

    Two members are written: a plain ASCII name and a non-ASCII name. For the
    non-ASCII name ``zipfile`` itself stores UTF-8 bytes and sets the UTF-8
    flag (0x800), which is the case this test is meant to cover. Setting the
    flag by hand on an ASCII-only ``ZipInfo`` is not relied upon because
    ``zipfile`` recomputes the general purpose flags when it writes a member.
    """
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w") as zipf:
        zipf.writestr(zipfile.ZipInfo("archivo_normal.txt"), b"content")
        zipf.writestr(zipfile.ZipInfo("archivo_\u00f1.txt"), b"content")
    buf.seek(0)

    with zipfile.ZipFile(buf, "r") as zipf:
        members = zipf.infolist()
        # Precondition: the non-ASCII member really carries the UTF-8 flag,
        # otherwise the test would not exercise the flagged path at all.
        assert members[1].flag_bits & 0x800, "Expected UTF-8 flag 0x800 on non-ASCII member"
        original_names = [member.filename for member in members]

        normalize_zip_filenames(zipf)

        assert [member.filename for member in zipf.infolist()] == original_names
|
||||
|
||||
|
||||
def test_normalize_cjk_mojibake_repara():
    """CJK names stored as mojibake (UTF-8 bytes read as CP437) get repaired.

    Simulates a ZIP whose filename bytes are valid UTF-8 for a CJK name but
    whose 0x800 flag is not set, so ``zipfile`` decodes them as CP437 and
    produces mojibake. ``normalize_zip_filenames`` must detect and repair it.
    """
    cjk_name = "\u6587\u4ef6.txt"  # 文件.txt

    def _is_cjk(ch):
        # Same two ranges the check has always used: U+4E00-U+9FFF and
        # U+3400-U+4DBF.
        return "\u4e00" <= ch <= "\u9fff" or "\u3400" <= ch <= "\u4dbf"

    # Build the ZIP with raw UTF-8 bytes in the filename field and without the
    # 0x800 flag. ZipInfo cannot produce this (it forces 0x800 for non-ASCII
    # names), so the archive is assembled by hand.
    zip_bytes = _make_zip_with_raw_filename(cjk_name.encode("utf-8"), b"cjk content")

    with zipfile.ZipFile(io.BytesIO(zip_bytes), "r") as zipf:
        first = zipf.infolist()[0]
        # Without the flag, zipfile reads the bytes as CP437: must be mojibake.
        assert not (first.flag_bits & 0x800), "Flag 0x800 no deberia estar seteado"
        assert first.filename != cjk_name, "El nombre aun no debe estar reparado"

        normalize_zip_filenames(zipf)

        repaired = zipf.infolist()[0].filename
        has_cjk = any(map(_is_cjk, repaired))
        assert has_cjk, f"Esperaba CJK en nombre reparado, got: {repaired!r}"
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Run every test in file order and report each one as soon as it passes;
    # the first failing assertion aborts the run.
    _cases = (
        (test_zip_normal, "PASS: ZIP normal extrae correctamente dentro del destino"),
        (test_zip_con_path_traversal, "PASS: ZIP con path traversal lanza ValueError"),
        (test_zip_con_paths_absolutos, "PASS: ZIP con paths absolutos lanza ValueError"),
        (
            test_normalize_utf8_correctos_no_cambian,
            "PASS: ZIP con nombres UTF-8 correctos no se modifican",
        ),
        (
            test_normalize_cjk_mojibake_repara,
            "PASS: ZIP con nombres CJK mojibake se reparan",
        ),
    )
    for _run_case, _pass_message in _cases:
        _run_case()
        print(_pass_message)

    print("\nAll tests passed.")
|
||||
Reference in New Issue
Block a user