feat: funciones Python infra y tipos Python (core, datascience, infra)

Infra: cache_to_file, cache_to_sqlite, http_download_file, http_get_json,
http_post_json, read_file_with_encoding, safe_extract_zip, scan_directory,
setup_logger, normalize_zip_filenames.
Tipos: 30+ tipos core (agent_action, context, task, message, parse_result...),
6 tipos datascience (entity_candidate, extraction_result...), 2 tipos infra.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-04-05 17:11:43 +02:00
parent 837563c3ba
commit 5a324f6554
110 changed files with 5714 additions and 0 deletions
+6
View File
@@ -0,0 +1,6 @@
from .setup_logger import setup_logger, get_logger
# Names re-exported as the public API of this package.
__all__ = [
"setup_logger",
"get_logger",
]
+60
View File
@@ -0,0 +1,60 @@
---
name: cache_to_file
kind: function
lang: py
domain: infra
version: "1.0.0"
purity: impure
signature: "def cache_to_file(cache_dir: str, namespace: str = 'default') -> FileCache"
description: "Cache key-value donde cada entry es un archivo JSON en disco. Keys se hashean con SHA-256 para generar nombres de archivo seguros. Metadata (created_at, expires_at, original_key) en sidecar .meta. Mejor que SQLite para valores grandes (PDFs procesados, embeddings)."
tags: [cache, file, persistence, ttl, key-value, sha256]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: ["os", "json", "hashlib", "time", "threading"]
tested: true
tests:
- "Set y get basico"
- "TTL expirado → None"
- "Archivo .meta con metadata correcta"
- "Clear elimina el directorio del namespace"
- "Key con caracteres especiales → hash seguro"
test_file_path: "python/functions/infra/cache_to_file_test.py"
file_path: "python/functions/infra/cache_to_file.py"
---
## Ejemplo
```python
from infra.cache_to_file import cache_to_file
store = cache_to_file("/tmp/my_cache", namespace="embeddings")
# Almacenar un embedding grande
store.set("doc:123", embedding_vector, ttl=86400)
# Recuperar
vec = store.get("doc:123")
# Factory pattern
result = store.get_or_set(
"pdf:page_42",
factory=lambda: extract_pdf_text("doc.pdf", page=42),
ttl=0, # sin expiracion
)
```
## Estructura en disco
```
cache_dir/
namespace/
{sha256_key}.json # valor serializado como JSON
{sha256_key}.meta # {"created_at": ..., "expires_at": ..., "original_key": ...}
```
## Notas
Cada entry genera exactamente dos archivos: `.json` para el valor y `.meta` para la metadata. La key original se guarda en `.meta["original_key"]` para facilitar debugging. Thread-safe mediante `threading.Lock`. La eviction es lazy: se verifica expires_at al hacer `get`.
+135
View File
@@ -0,0 +1,135 @@
"""Cache key-value donde cada entry es un archivo JSON en disco."""
import hashlib
import json
import os
import threading
import time
class FileCache:
"""Cache key-value respaldado en archivos JSON, con metadata sidecar .meta."""
def __init__(self, cache_dir: str, namespace: str = "default") -> None:
self._base = os.path.join(cache_dir, namespace)
self._hits = 0
self._misses = 0
self._lock = threading.Lock()
os.makedirs(self._base, exist_ok=True)
def _hash_key(self, key: str) -> str:
return hashlib.sha256(key.encode("utf-8")).hexdigest()
def _value_path(self, hashed: str) -> str:
return os.path.join(self._base, f"{hashed}.json")
def _meta_path(self, hashed: str) -> str:
return os.path.join(self._base, f"{hashed}.meta")
def _is_expired(self, meta: dict) -> bool:
expires_at = meta.get("expires_at")
if expires_at is None:
return False
return time.time() >= expires_at
def _load_meta(self, hashed: str) -> dict | None:
path = self._meta_path(hashed)
if not os.path.exists(path):
return None
with open(path, "r", encoding="utf-8") as f:
return json.load(f)
def get(self, key: str) -> object:
"""Retorna el valor o None si no existe o esta expirado."""
hashed = self._hash_key(key)
with self._lock:
meta = self._load_meta(hashed)
if meta is None:
self._misses += 1
return None
if self._is_expired(meta):
self._delete_files(hashed)
self._misses += 1
return None
value_path = self._value_path(hashed)
if not os.path.exists(value_path):
self._misses += 1
return None
with open(value_path, "r", encoding="utf-8") as f:
self._hits += 1
return json.load(f)
def set(self, key: str, value: object, ttl: float = 0) -> None:
"""Almacena un valor. ttl en segundos; 0 = sin expiracion."""
hashed = self._hash_key(key)
now = time.time()
expires_at = (now + ttl) if ttl > 0 else None
meta = {"created_at": now, "expires_at": expires_at, "original_key": key}
with self._lock:
with open(self._value_path(hashed), "w", encoding="utf-8") as f:
json.dump(value, f)
with open(self._meta_path(hashed), "w", encoding="utf-8") as f:
json.dump(meta, f)
def _delete_files(self, hashed: str) -> bool:
vp = self._value_path(hashed)
mp = self._meta_path(hashed)
deleted = False
if os.path.exists(vp):
os.remove(vp)
deleted = True
if os.path.exists(mp):
os.remove(mp)
deleted = True
return deleted
def delete(self, key: str) -> bool:
"""Elimina una entrada. Retorna True si existia."""
hashed = self._hash_key(key)
with self._lock:
return self._delete_files(hashed)
def clear(self) -> int:
"""Elimina todas las entradas del namespace. Retorna pares eliminados."""
with self._lock:
count = 0
if not os.path.isdir(self._base):
return 0
for fname in os.listdir(self._base):
if fname.endswith(".json"):
count += 1
fpath = os.path.join(self._base, fname)
os.remove(fpath)
return count
def stats(self) -> dict:
"""Retorna estadisticas del store: hits, misses y size actual."""
with self._lock:
if not os.path.isdir(self._base):
size = 0
else:
size = sum(
1 for f in os.listdir(self._base) if f.endswith(".json")
)
return {"hits": self._hits, "misses": self._misses, "size": size}
def get_or_set(self, key: str, factory: callable, ttl: float = 0) -> object:
"""Retorna el valor cacheado o llama factory() y lo almacena."""
value = self.get(key)
if value is None:
value = factory()
self.set(key, value, ttl)
return value
def cache_to_file(cache_dir: str, namespace: str = "default") -> FileCache:
    """Build a FileCache that persists entries as JSON files on disk.

    Args:
        cache_dir: Root directory where the cache files are stored.
        namespace: Logical sub-directory inside ``cache_dir``.

    Returns:
        A FileCache exposing get/set/delete/clear/stats/get_or_set.
    """
    file_store = FileCache(cache_dir, namespace)
    return file_store
@@ -0,0 +1,54 @@
"""Tests para cache_to_file."""
import json
import os
import time
import pytest
from .cache_to_file import cache_to_file
@pytest.fixture
def store(tmp_path):
    """Provide a file cache rooted at pytest's per-test temporary directory."""
    cache_root = str(tmp_path)
    return cache_to_file(cache_root)
def test_set_y_get_basico(store):
    """A stored value is returned unchanged by a later get."""
    payload = {"x": 42}
    store.set("hello", payload)
    fetched = store.get("hello")
    assert fetched == {"x": 42}
def test_ttl_expirado_retorna_none(store):
    """An entry read after its TTL has elapsed yields None."""
    short_ttl = 0.05
    store.set("temp", "val", ttl=short_ttl)
    # Wait past the TTL so the entry is expired when read.
    time.sleep(0.1)
    fetched = store.get("temp")
    assert fetched is None
def test_archivo_meta_con_metadata_correcta(tmp_path):
    """Setting a key writes exactly one .meta sidecar with the expected fields."""
    cache = cache_to_file(str(tmp_path), "ns")
    cache.set("mykey", "myval", ttl=60)

    namespace_dir = os.path.join(str(tmp_path), "ns")
    sidecars = []
    for entry in os.listdir(namespace_dir):
        if entry.endswith(".meta"):
            sidecars.append(entry)
    assert len(sidecars) == 1

    sidecar_path = os.path.join(namespace_dir, sidecars[0])
    with open(sidecar_path) as handle:
        meta = json.load(handle)
    assert meta["original_key"] == "mykey"
    assert meta["expires_at"] is not None
    assert meta["created_at"] > 0
def test_clear_elimina_directorio_del_namespace(tmp_path):
    """clear() reports the number of entries removed and leaves none readable."""
    cache = cache_to_file(str(tmp_path), "mynamespace")
    cache.set("a", 1)
    cache.set("b", 2)

    removed_count = cache.clear()

    assert removed_count == 2
    assert cache.get("a") is None
    assert cache.get("b") is None
def test_key_con_caracteres_especiales_hash_seguro(store):
    """Keys with URL and shell-special characters round-trip correctly."""
    awkward_key = "https://example.com/path?q=1&r=2 <special>#hash"
    store.set(awkward_key, "safe")
    fetched = store.get(awkward_key)
    assert fetched == "safe"
+57
View File
@@ -0,0 +1,57 @@
---
name: cache_to_sqlite
kind: function
lang: py
domain: infra
version: "1.0.0"
purity: impure
signature: "def cache_to_sqlite(db_path: str, namespace: str = 'default') -> CacheStore"
description: "Cache key-value persistido en SQLite con TTL y lazy eviction. Cada namespace es un espacio logico dentro de la misma BD. Keys son strings, values se serializan con JSON. TTL en segundos, 0 = sin expiracion. Thread-safe mediante mutex."
tags: [cache, sqlite, persistence, ttl, memoize, key-value]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: ["sqlite3", "json", "time", "threading"]
tested: true
tests:
- "Set y get basico"
- "TTL expirado → None"
- "TTL 0 → nunca expira"
- "get_or_set con factory que solo se llama en miss"
- "Namespaces independientes"
- "Clear elimina solo el namespace"
- "Stats contadores correctos"
- "Concurrencia (threading basico)"
test_file_path: "python/functions/infra/cache_to_sqlite_test.py"
file_path: "python/functions/infra/cache_to_sqlite.py"
---
## Ejemplo
```python
from infra.cache_to_sqlite import cache_to_sqlite
store = cache_to_sqlite("my_cache.db", namespace="llm")
# Almacenar con TTL de 1 hora
store.set("prompt:explain_x", "explanation...", ttl=3600)
# Recuperar (None si miss o expirado)
val = store.get("prompt:explain_x")
# Factory pattern: solo computa si no esta en cache
result = store.get_or_set(
"prompt:explain_y",
factory=lambda: call_llm("explain y"),
ttl=3600,
)
# Estadisticas
print(store.stats()) # {"hits": 2, "misses": 1, "size": 5}
```
## Notas
La eviction de entradas expiradas es lazy: se ejecuta en cada llamada a `get` o `stats`, no en background. El schema SQLite usa `(namespace, key)` como PRIMARY KEY para garantizar upserts atomicos. Usa WAL mode para mejor concurrencia de lecturas. Cada thread mantiene su propia conexion SQLite (thread-local); todas las operaciones publicas (lecturas y escrituras) se sincronizan via `threading.Lock`.
+142
View File
@@ -0,0 +1,142 @@
"""Cache key-value persistido en SQLite con TTL y lazy eviction."""
import json
import sqlite3
import threading
import time
class CacheStore:
    """Key-value cache backed by SQLite, with TTL and namespace support.

    Values are serialized as JSON text. Rows are keyed by
    ``(namespace, key)``, so several stores can share one database file.
    Expired rows of the current namespace are deleted lazily on ``get``,
    ``get_or_set`` and ``stats``. Each thread gets its own connection, and
    every public operation holds a ``threading.Lock``.
    """

    _schema = """
    CREATE TABLE IF NOT EXISTS cache (
        namespace TEXT NOT NULL,
        key TEXT NOT NULL,
        value TEXT NOT NULL,
        created_at REAL NOT NULL,
        expires_at REAL,
        PRIMARY KEY (namespace, key)
    );
    """

    # Sentinel that distinguishes "no row" from a cached JSON ``null``.
    _MISSING = object()

    def __init__(self, db_path: str, namespace: str = "default") -> None:
        """Open (or create) the database and make sure the table exists.

        Args:
            db_path: Path to the SQLite file.
            namespace: Logical namespace used for every row of this store.
        """
        self._db_path = db_path
        self._namespace = namespace
        self._hits = 0
        self._misses = 0
        self._lock = threading.Lock()
        self._local = threading.local()
        self._init_db()

    def _conn(self) -> sqlite3.Connection:
        """Return this thread's connection, creating it on first use."""
        if not hasattr(self._local, "conn"):
            conn = sqlite3.connect(self._db_path, check_same_thread=False)
            conn.execute("PRAGMA journal_mode=WAL")
            self._local.conn = conn
        return self._local.conn

    def _init_db(self) -> None:
        """Create the ``cache`` table if it does not exist yet."""
        conn = self._conn()
        with conn:
            conn.execute(self._schema)

    def _evict_expired(self, conn: sqlite3.Connection) -> None:
        """Delete expired rows of the current namespace (lazy eviction).

        The caller is responsible for committing.
        """
        now = time.time()
        conn.execute(
            "DELETE FROM cache WHERE namespace = ? AND expires_at IS NOT NULL AND expires_at <= ?",
            (self._namespace, now),
        )

    def _lookup(self, key: str) -> object:
        """Return the decoded value for ``key`` or ``_MISSING``.

        Evicts expired rows first and updates the hit/miss counters.
        """
        with self._lock:
            conn = self._conn()
            # ``with conn`` commits on success and rolls back on error, so
            # the connection is never left inside an open transaction.
            with conn:
                self._evict_expired(conn)
            row = conn.execute(
                "SELECT value FROM cache WHERE namespace = ? AND key = ?",
                (self._namespace, key),
            ).fetchone()
            if row is None:
                self._misses += 1
                return self._MISSING
            self._hits += 1
            return json.loads(row[0])

    def get(self, key: str) -> object:
        """Return the value for ``key``, or ``None`` if absent or expired."""
        value = self._lookup(key)
        return None if value is self._MISSING else value

    def set(self, key: str, value: object, ttl: float = 0) -> None:
        """Insert or replace the value stored under ``key``.

        Args:
            key: Cache key.
            value: Any JSON-serializable object.
            ttl: Lifetime in seconds; values ``<= 0`` mean no expiration.

        Raises:
            TypeError: If ``value`` is not JSON-serializable.
        """
        now = time.time()
        expires_at = (now + ttl) if ttl > 0 else None
        payload = json.dumps(value)
        with self._lock:
            conn = self._conn()
            with conn:
                conn.execute(
                    """
                    INSERT INTO cache (namespace, key, value, created_at, expires_at)
                    VALUES (?, ?, ?, ?, ?)
                    ON CONFLICT(namespace, key) DO UPDATE SET
                        value = excluded.value,
                        created_at = excluded.created_at,
                        expires_at = excluded.expires_at
                    """,
                    (self._namespace, key, payload, now, expires_at),
                )

    def delete(self, key: str) -> bool:
        """Delete the row for ``key``. Return True if it existed."""
        with self._lock:
            conn = self._conn()
            with conn:
                cursor = conn.execute(
                    "DELETE FROM cache WHERE namespace = ? AND key = ?",
                    (self._namespace, key),
                )
            return cursor.rowcount > 0

    def clear(self) -> int:
        """Delete every row of this namespace. Return the number of rows removed."""
        with self._lock:
            conn = self._conn()
            with conn:
                cursor = conn.execute(
                    "DELETE FROM cache WHERE namespace = ?",
                    (self._namespace,),
                )
            return cursor.rowcount

    def stats(self) -> dict:
        """Return ``hits``, ``misses`` and ``size`` (live rows in the namespace).

        Expired rows are evicted before counting.
        """
        with self._lock:
            conn = self._conn()
            with conn:
                self._evict_expired(conn)
            row = conn.execute(
                "SELECT COUNT(*) FROM cache WHERE namespace = ?",
                (self._namespace,),
            ).fetchone()
            size = row[0] if row else 0
            return {"hits": self._hits, "misses": self._misses, "size": size}

    def get_or_set(self, key: str, factory: callable, ttl: float = 0) -> object:
        """Return the cached value, or call ``factory()`` and store its result.

        A cached ``None`` counts as a hit, so a factory that returns ``None``
        is not re-invoked on every call. The factory runs outside the lock;
        concurrent callers missing the same key may each invoke it.
        """
        value = self._lookup(key)
        if value is self._MISSING:
            value = factory()
            self.set(key, value, ttl)
        return value
def cache_to_sqlite(db_path: str, namespace: str = "default") -> CacheStore:
    """Build a CacheStore backed by a SQLite database.

    Args:
        db_path: Path to the SQLite file (created if it does not exist).
        namespace: Logical namespace inside the database.

    Returns:
        A CacheStore exposing get/set/delete/clear/stats/get_or_set.
    """
    sqlite_store = CacheStore(db_path, namespace)
    return sqlite_store
@@ -0,0 +1,114 @@
"""Tests para cache_to_sqlite."""
import os
import tempfile
import threading
import time
import pytest
from .cache_to_sqlite import cache_to_sqlite
@pytest.fixture
def store(tmp_path):
    """Provide a store on a per-test database, using the default namespace."""
    db_file = tmp_path / "test.db"
    return cache_to_sqlite(str(db_file))
@pytest.fixture
def store2(tmp_path):
    """Provide a store for a second namespace in the same database file."""
    db_file = tmp_path / "test.db"
    return cache_to_sqlite(str(db_file), namespace="other")
@pytest.fixture
def store_and_other(tmp_path):
    """Provide two stores sharing one database file under different namespaces."""
    db_file = str(tmp_path / "test.db")
    first = cache_to_sqlite(db_file, namespace="ns1")
    second = cache_to_sqlite(db_file, namespace="ns2")
    return (first, second)
def test_set_y_get_basico(store):
    """A stored value is returned unchanged by a later get."""
    payload = {"x": 1}
    store.set("foo", payload)
    fetched = store.get("foo")
    assert fetched == {"x": 1}
def test_ttl_expirado_retorna_none(store):
    """An entry read after its TTL has elapsed yields None."""
    short_ttl = 0.05
    store.set("expiring", "hello", ttl=short_ttl)
    # Wait past the TTL so the row is expired when read.
    time.sleep(0.1)
    fetched = store.get("expiring")
    assert fetched is None
def test_ttl_cero_nunca_expira(store):
    """With ttl=0 the entry is still readable after some time has passed."""
    store.set("forever", 42, ttl=0)
    time.sleep(0.05)
    fetched = store.get("forever")
    assert fetched == 42
def test_get_or_set_factory_solo_se_llama_en_miss(store):
    """The factory runs on the first (miss) call only; the second call is served from cache."""
    invocations = []

    def factory():
        invocations.append(1)
        return "computed"

    outcomes = [store.get_or_set("key", factory, ttl=10) for _ in range(2)]

    assert outcomes[0] == "computed"
    assert outcomes[1] == "computed"
    assert len(invocations) == 1
def test_namespaces_independientes(store_and_other):
    """The same key holds independent values in different namespaces."""
    first, second = store_and_other

    first.set("k", "from_ns1")
    # The key written in ns1 must not be visible from ns2.
    assert second.get("k") is None

    second.set("k", "from_ns2")
    assert first.get("k") == "from_ns1"
    assert second.get("k") == "from_ns2"
def test_clear_elimina_solo_el_namespace(store_and_other):
    """clear() removes the rows of its own namespace and leaves the other intact."""
    first, second = store_and_other
    first.set("a", 1)
    second.set("b", 2)

    removed_count = first.clear()

    assert removed_count == 1
    assert first.get("a") is None
    assert second.get("b") == 2
def test_stats_contadores_correctos(store):
    """stats() reports the hits, misses and live-entry count accumulated so far."""
    store.set("x", 10)
    for _ in range(2):
        store.get("x")  # two hits
    store.get("z")  # one miss

    snapshot = store.stats()

    assert snapshot["hits"] == 2
    assert snapshot["misses"] == 1
    assert snapshot["size"] == 1
def test_concurrencia(tmp_path):
    """Twenty threads each set and read back their own key without errors."""
    db_file = str(tmp_path / "concurrent.db")
    shared = cache_to_sqlite(db_file, "parallel")
    errors = []

    def worker(i):
        key = f"key_{i}"
        try:
            shared.set(key, i)
            fetched = shared.get(key)
            assert fetched == i
        except Exception as exc:
            # Collect failures, assertion errors included, for the main thread.
            errors.append(exc)

    threads = []
    for index in range(20):
        threads.append(threading.Thread(target=worker, args=(index,)))
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

    assert errors == [], f"Errors in threads: {errors}"
+36
View File
@@ -0,0 +1,36 @@
---
name: get_logger
kind: function
lang: py
domain: infra
version: "1.0.0"
purity: impure
signature: "def get_logger(name: str = 'app') -> logging.Logger"
description: "Devuelve un logger existente si ya tiene handlers, o lo crea con setup_logger. Util en modulos internos que no controlan la inicializacion del logger."
tags: [logging, logger, infra, utility]
uses_functions: [setup_logger_py_infra]
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: [logging]
tested: true
tests:
- "get_logger retorna logger configurado"
test_file_path: "python/functions/infra/setup_logger_test.py"
file_path: "python/functions/infra/setup_logger.py"
---
## Ejemplo
```python
from setup_logger import get_logger
# En cualquier modulo, sin preocuparse de si el logger ya fue inicializado
log = get_logger("mi_app")
log.info("Mensaje desde un modulo interno")
```
## Notas
Companion de `setup_logger`. Si el logger tiene handlers (ya fue configurado), lo devuelve tal cual. Si no, llama a `setup_logger` con valores por defecto (log_dir="logs"). Comparten el mismo archivo de implementacion.
@@ -0,0 +1,40 @@
---
name: http_download_file
kind: function
lang: py
domain: infra
version: "1.0.0"
purity: impure
signature: "http_download_file(url: str, dest_path: str, headers: dict[str, str] | None = None, timeout: float = 120.0, chunk_size: int = 8192) -> dict"
description: "Descarga un archivo por HTTP en streaming (sin cargar todo en memoria). Crea directorios intermedios si no existen. Retorna dict con path, size_bytes y content_type."
tags: [http, download, file, streaming, network, stdlib, infra]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: ["os", "urllib.error", "urllib.request"]
tested: true
tests:
- "mock de descarga con contenido binario"
- "directorio destino creado automaticamente"
- "retorno con size correcto"
- "timeout configurado en el request"
test_file_path: "python/functions/infra/http_download_file_test.py"
file_path: "python/functions/infra/http_download_file.py"
---
## Ejemplo
```python
result = http_download_file(
"https://example.com/report.pdf",
dest_path="/tmp/reports/report.pdf",
timeout=60.0,
)
print(f"Downloaded {result['size_bytes']} bytes to {result['path']}")
```
## Notas
Solo usa stdlib (urllib, os). La descarga se hace en chunks de `chunk_size` bytes para evitar consumo de memoria con archivos grandes. El timeout de 120s por defecto es mayor que http_get_json porque los archivos pueden ser pesados. Los directorios intermedios se crean con os.makedirs(exist_ok=True).
@@ -0,0 +1,60 @@
"""Descarga de archivos en streaming — HTTP client sin dependencias externas."""
import os
import urllib.error
import urllib.request
def http_download_file(
url: str,
dest_path: str,
headers: dict[str, str] | None = None,
timeout: float = 120.0,
chunk_size: int = 8192,
) -> dict:
"""Descarga un archivo por HTTP en streaming (sin cargar todo en memoria).
Crea los directorios intermedios si no existen. Si el archivo destino
ya existe lo sobreescribe. La descarga se hace en chunks para evitar
consumo de memoria excesivo con archivos grandes.
Args:
url: URL del archivo a descargar.
dest_path: Ruta local destino donde guardar el archivo.
headers: Headers HTTP adicionales.
timeout: Segundos maximo de espera para la conexion (default 120).
chunk_size: Tamano de cada chunk en bytes (default 8192).
Returns:
dict con campos ``path`` (str), ``size_bytes`` (int) y
``content_type`` (str).
Raises:
RuntimeError: Si el status HTTP es >= 400.
"""
req = urllib.request.Request(url, headers=headers or {}, method="GET")
os.makedirs(os.path.dirname(os.path.abspath(dest_path)), exist_ok=True)
try:
with urllib.request.urlopen(req, timeout=timeout) as resp:
content_type: str = resp.headers.get("Content-Type", "")
size_bytes = 0
with open(dest_path, "wb") as f:
while True:
chunk = resp.read(chunk_size)
if not chunk:
break
f.write(chunk)
size_bytes += len(chunk)
except urllib.error.HTTPError as e:
short_url = url[:100] if len(url) > 100 else url
raise RuntimeError(
f"http_download_file: HTTP {e.code} at {short_url!r}"
) from e
return {
"path": dest_path,
"size_bytes": size_bytes,
"content_type": content_type,
}
@@ -0,0 +1,84 @@
"""Tests para http_download_file."""
import sys
import tempfile
import os
import unittest
from unittest.mock import MagicMock, patch
# Make the ``infra`` package importable wherever the repository is checked
# out: this test lives in <functions>/infra/, so the import root is the parent
# of this file's directory (previously a hard-coded developer home path).
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from infra.http_download_file import http_download_file
def _make_response(content: bytes, content_type: str = "application/octet-stream"):
resp = MagicMock()
# Simula lectura en chunks
chunks = [content[i:i+8192] for i in range(0, len(content), 8192)] + [b""]
resp.read.side_effect = chunks
resp.headers = {"Content-Type": content_type}
resp.__enter__ = lambda s: s
resp.__exit__ = MagicMock(return_value=False)
return resp
class TestHttpDownloadFile(unittest.TestCase):
    """Exercise http_download_file against a patched ``urllib.request.urlopen``."""

    def test_mock_descarga_con_contenido_binario(self):
        """A binary payload is written verbatim and its size and path reported."""
        payload = b"\x00\x01\x02\x03" * 100
        fake_resp = _make_response(payload, "application/octet-stream")
        with tempfile.TemporaryDirectory() as workdir:
            target = os.path.join(workdir, "file.bin")
            with patch("urllib.request.urlopen", return_value=fake_resp):
                outcome = http_download_file("http://example.com/file.bin", target)
            self.assertEqual(outcome["size_bytes"], len(payload))
            self.assertEqual(outcome["path"], target)
            with open(target, "rb") as fh:
                written = fh.read()
            self.assertEqual(written, payload)

    def test_directorio_destino_creado_automaticamente(self):
        """Missing parent directories of the destination are created."""
        fake_resp = _make_response(b"hello binary")
        with tempfile.TemporaryDirectory() as workdir:
            target = os.path.join(workdir, "nested", "deep", "file.bin")
            parent = os.path.dirname(target)
            self.assertFalse(os.path.exists(parent))
            with patch("urllib.request.urlopen", return_value=fake_resp):
                http_download_file("http://example.com/file.bin", target)
            self.assertTrue(os.path.exists(target))

    def test_retorno_con_size_correcto(self):
        """The result reports the byte count and the Content-Type header."""
        fake_resp = _make_response(b"x" * 5000, "text/plain")
        with tempfile.TemporaryDirectory() as workdir:
            target = os.path.join(workdir, "out.txt")
            with patch("urllib.request.urlopen", return_value=fake_resp):
                outcome = http_download_file("http://example.com/data.txt", target)
            self.assertEqual(outcome["size_bytes"], 5000)
            self.assertEqual(outcome["content_type"], "text/plain")

    def test_timeout_configurado_en_el_request(self):
        """The timeout argument is forwarded to ``urlopen``."""
        fake_resp = _make_response(b"data")
        seen_timeouts = []

        def fake_urlopen(req, timeout=None):
            seen_timeouts.append(timeout)
            return fake_resp

        with tempfile.TemporaryDirectory() as workdir:
            target = os.path.join(workdir, "file.bin")
            with patch("urllib.request.urlopen", side_effect=fake_urlopen):
                http_download_file("http://example.com/file.bin", target, timeout=60.0)
        self.assertEqual(seen_timeouts[0], 60.0)
# Run the suite with unittest when this module is executed directly.
if __name__ == "__main__":
    unittest.main()
+41
View File
@@ -0,0 +1,41 @@
---
name: http_get_json
kind: function
lang: py
domain: infra
version: "1.0.0"
purity: impure
signature: "http_get_json(url: str, headers: dict[str, str] | None = None, params: dict[str, str] | None = None, timeout: float = 30.0) -> dict"
description: "GET request que espera JSON. Agrega Accept: application/json automaticamente. Lanza RuntimeError si status >= 400 con status code, url truncada y primeros 200 chars del body."
tags: [http, json, get, client, network, stdlib, infra]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: ["json", "urllib.error", "urllib.parse", "urllib.request"]
tested: true
tests:
- "mock de respuesta 200 con JSON"
- "mock de respuesta 404 → error con status code"
- "mock de respuesta con JSON invalido → error descriptivo"
- "params serializados como query string"
- "headers custom enviados"
test_file_path: "python/functions/infra/http_get_json_test.py"
file_path: "python/functions/infra/http_get_json.py"
---
## Ejemplo
```python
data = http_get_json(
"https://api.example.com/users",
params={"page": "1", "limit": "50"},
headers={"X-Api-Key": "secret"},
)
print(data["total"])
```
## Notas
Solo usa stdlib (urllib). Sin dependencias externas. El error incluye los primeros 200 chars del body para facilitar debugging en produccion. Params se serializa con urlencode antes de concatenar a la URL.
+58
View File
@@ -0,0 +1,58 @@
"""GET request JSON — HTTP client sin dependencias externas."""
import json
import urllib.error
import urllib.parse
import urllib.request
def http_get_json(
url: str,
headers: dict[str, str] | None = None,
params: dict[str, str] | None = None,
timeout: float = 30.0,
) -> dict:
"""Realiza un GET request y parsea la respuesta como JSON.
Agrega automaticamente el header ``Accept: application/json``.
Si el status es >= 400 lanza RuntimeError con status code, url y
los primeros 200 caracteres del body para facilitar el debugging.
Args:
url: URL del endpoint.
headers: Headers HTTP adicionales. Se fusionan con Accept por defecto.
params: Query string params. Se serializa con urllib.parse.urlencode.
timeout: Segundos maximo de espera (default 30).
Returns:
Respuesta parseada como dict o list.
Raises:
RuntimeError: Si status >= 400 o si el body no es JSON valido.
"""
if params:
url = f"{url}?{urllib.parse.urlencode(params)}"
all_headers: dict[str, str] = {"Accept": "application/json"}
if headers:
all_headers.update(headers)
req = urllib.request.Request(url, headers=all_headers, method="GET")
try:
with urllib.request.urlopen(req, timeout=timeout) as resp:
raw = resp.read()
except urllib.error.HTTPError as e:
body_preview = e.read(200).decode("utf-8", errors="replace")
short_url = url[:100] if len(url) > 100 else url
raise RuntimeError(
f"http_get_json: HTTP {e.code} at {short_url!r}{body_preview}"
) from e
try:
return json.loads(raw)
except json.JSONDecodeError as e:
preview = raw[:200].decode("utf-8", errors="replace")
raise RuntimeError(
f"http_get_json: response is not valid JSON — {preview}"
) from e
@@ -0,0 +1,87 @@
"""Tests para http_get_json."""
import json
import sys
import unittest
import urllib.error
import urllib.request
from io import BytesIO
from unittest.mock import MagicMock, patch
import os

# Make the ``infra`` package importable wherever the repository is checked
# out: this test lives in <functions>/infra/, so the import root is the parent
# of this file's directory (previously a hard-coded developer home path).
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from infra.http_get_json import http_get_json
def _make_response(data: bytes, status: int = 200, content_type: str = "application/json"):
"""Crea un mock de HTTPResponse."""
resp = MagicMock()
resp.read.return_value = data
resp.status = status
resp.__enter__ = lambda s: s
resp.__exit__ = MagicMock(return_value=False)
return resp
class TestHttpGetJson(unittest.TestCase):
    """Exercise http_get_json against a patched ``urllib.request.urlopen``."""

    def test_mock_respuesta_200_con_json(self):
        """A 200 response with a JSON body is returned decoded."""
        expected = {"ok": True, "value": 42}
        fake_resp = _make_response(json.dumps(expected).encode())
        with patch("urllib.request.urlopen", return_value=fake_resp):
            parsed = http_get_json("http://example.com/api")
        self.assertEqual(parsed, expected)

    def test_mock_respuesta_404_error_con_status_code(self):
        """An HTTPError becomes a RuntimeError that mentions the status code."""
        not_found = urllib.error.HTTPError(
            url="http://example.com/missing",
            code=404,
            msg="Not Found",
            hdrs=None,  # type: ignore[arg-type]
            fp=BytesIO(b"not found"),
        )
        with patch("urllib.request.urlopen", side_effect=not_found):
            with self.assertRaises(RuntimeError) as ctx:
                http_get_json("http://example.com/missing")
        message = str(ctx.exception)
        self.assertIn("404", message)

    def test_mock_respuesta_json_invalido_error_descriptivo(self):
        """A non-JSON body raises a RuntimeError with a descriptive message."""
        fake_resp = _make_response(b"not-json!!!")
        with patch("urllib.request.urlopen", return_value=fake_resp):
            with self.assertRaises(RuntimeError) as ctx:
                http_get_json("http://example.com/api")
        message = str(ctx.exception)
        self.assertIn("not valid JSON", message)

    def test_params_serializados_como_query_string(self):
        """``params`` end up URL-encoded in the requested URL."""
        seen_urls = []

        def fake_urlopen(req, timeout=None):
            seen_urls.append(req.full_url)
            return _make_response(b"{}")

        query = {"page": "1", "limit": "10"}
        with patch("urllib.request.urlopen", side_effect=fake_urlopen):
            http_get_json("http://example.com/api", params=query)

        requested = seen_urls[0]
        self.assertIn("page=1", requested)
        self.assertIn("limit=10", requested)

    def test_headers_custom_enviados(self):
        """Custom headers are sent together with the default Accept header."""
        seen_headers = []

        def fake_urlopen(req, timeout=None):
            seen_headers.append(dict(req.headers))
            return _make_response(b'{"x": 1}')

        with patch("urllib.request.urlopen", side_effect=fake_urlopen):
            http_get_json("http://example.com/api", headers={"X-Api-Key": "secret"})

        # urllib capitalizes the first character of each header name, so
        # compare on lower-cased names.
        normalized = {}
        for name, value in seen_headers[0].items():
            normalized[name.lower()] = value
        self.assertIn("x-api-key", normalized)
        self.assertEqual(normalized["x-api-key"], "secret")
        self.assertIn("accept", normalized)
# Run the suite with unittest when this module is executed directly.
if __name__ == "__main__":
    unittest.main()
+40
View File
@@ -0,0 +1,40 @@
---
name: http_post_json
kind: function
lang: py
domain: infra
version: "1.0.0"
purity: impure
signature: "http_post_json(url: str, body: dict, headers: dict[str, str] | None = None, timeout: float = 30.0) -> dict"
description: "POST request con body JSON. Agrega Content-Type: application/json y Accept: application/json. Lanza RuntimeError si status >= 400 con status code, url truncada y primeros 200 chars del body."
tags: [http, json, post, client, network, stdlib, infra]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: ["json", "urllib.error", "urllib.request"]
tested: true
tests:
- "mock de POST con body serializado correctamente"
- "mock de respuesta 201"
- "mock de respuesta 500 → error"
- "body con unicode"
test_file_path: "python/functions/infra/http_post_json_test.py"
file_path: "python/functions/infra/http_post_json.py"
---
## Ejemplo
```python
response = http_post_json(
"https://api.example.com/users",
body={"name": "Alice", "role": "admin"},
headers={"X-Api-Key": "secret"},
)
print(response["id"])
```
## Notas
Solo usa stdlib (urllib). El body se serializa con json.dumps(ensure_ascii=False) y se codifica a UTF-8. Headers custom se fusionan con Content-Type y Accept por defecto (los custom tienen precedencia).
+58
View File
@@ -0,0 +1,58 @@
"""POST request JSON — HTTP client sin dependencias externas."""
import json
import urllib.error
import urllib.request
def http_post_json(
url: str,
body: dict,
headers: dict[str, str] | None = None,
timeout: float = 30.0,
) -> dict:
"""Realiza un POST request con body JSON y parsea la respuesta como JSON.
Agrega automaticamente ``Content-Type: application/json`` y
``Accept: application/json``. Si el status es >= 400 lanza RuntimeError
con status code, url y los primeros 200 caracteres del body.
Args:
url: URL del endpoint.
body: Datos a serializar como JSON en el cuerpo del request.
headers: Headers HTTP adicionales. Se fusionan con los defaults.
timeout: Segundos maximo de espera (default 30).
Returns:
Respuesta parseada como dict o list.
Raises:
RuntimeError: Si status >= 400 o si el body de respuesta no es JSON valido.
"""
all_headers: dict[str, str] = {
"Content-Type": "application/json",
"Accept": "application/json",
}
if headers:
all_headers.update(headers)
data = json.dumps(body, ensure_ascii=False).encode("utf-8")
req = urllib.request.Request(url, data=data, headers=all_headers, method="POST")
try:
with urllib.request.urlopen(req, timeout=timeout) as resp:
raw = resp.read()
except urllib.error.HTTPError as e:
body_preview = e.read(200).decode("utf-8", errors="replace")
short_url = url[:100] if len(url) > 100 else url
raise RuntimeError(
f"http_post_json: HTTP {e.code} at {short_url!r}{body_preview}"
) from e
try:
return json.loads(raw)
except json.JSONDecodeError as e:
preview = raw[:200].decode("utf-8", errors="replace")
raise RuntimeError(
f"http_post_json: response is not valid JSON — {preview}"
) from e
@@ -0,0 +1,76 @@
"""Tests para http_post_json."""
import json
import sys
import unittest
import urllib.error
from io import BytesIO
from unittest.mock import MagicMock, patch
import os

# Make the ``infra`` package importable wherever the repository is checked
# out: this test lives in <functions>/infra/, so the import root is the parent
# of this file's directory (previously a hard-coded developer home path).
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from infra.http_post_json import http_post_json
def _make_response(data: bytes, status: int = 200):
resp = MagicMock()
resp.read.return_value = data
resp.status = status
resp.__enter__ = lambda s: s
resp.__exit__ = MagicMock(return_value=False)
return resp
class TestHttpPostJson(unittest.TestCase):
    """Exercise http_post_json against a patched ``urllib.request.urlopen``."""

    def test_mock_post_body_serializado_correctamente(self):
        """The request body is the JSON serialization of the given dict."""
        sent_bodies = []

        def fake_urlopen(req, timeout=None):
            sent_bodies.append(req.data)
            return _make_response(b'{"created": true}')

        payload = {"name": "test", "value": 99}
        with patch("urllib.request.urlopen", side_effect=fake_urlopen):
            http_post_json("http://example.com/api", payload)

        decoded = json.loads(sent_bodies[0])
        self.assertEqual(decoded["name"], "test")
        self.assertEqual(decoded["value"], 99)

    def test_mock_respuesta_201(self):
        """A 201 response with a JSON body is returned decoded."""
        fake_resp = _make_response(b'{"id": 1}', status=201)
        with patch("urllib.request.urlopen", return_value=fake_resp):
            parsed = http_post_json("http://example.com/api", {"x": 1})
        self.assertEqual(parsed, {"id": 1})

    def test_mock_respuesta_500_error(self):
        """An HTTPError becomes a RuntimeError that mentions the status code."""
        server_error = urllib.error.HTTPError(
            url="http://example.com/api",
            code=500,
            msg="Internal Server Error",
            hdrs=None,  # type: ignore[arg-type]
            fp=BytesIO(b"server error details"),
        )
        with patch("urllib.request.urlopen", side_effect=server_error):
            with self.assertRaises(RuntimeError) as ctx:
                http_post_json("http://example.com/api", {"x": 1})
        message = str(ctx.exception)
        self.assertIn("500", message)

    def test_body_con_unicode(self):
        """Non-ASCII text survives the UTF-8 round trip of the request body."""
        sent_bodies = []

        def fake_urlopen(req, timeout=None):
            sent_bodies.append(req.data)
            return _make_response(b'{"ok": true}')

        payload = {"mensaje": "Hola mundo \u00e9\u00e0\u00fc \U0001f600"}
        with patch("urllib.request.urlopen", side_effect=fake_urlopen):
            http_post_json("http://example.com/api", payload)

        round_tripped = json.loads(sent_bodies[0].decode("utf-8"))
        self.assertEqual(round_tripped["mensaje"], payload["mensaje"])
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()
@@ -0,0 +1,49 @@
---
name: normalize_zip_filenames
kind: function
lang: py
domain: infra
version: "1.0.0"
purity: impure
signature: "def normalize_zip_filenames(zipf: zipfile.ZipFile) -> None"
description: "Repara nombres de archivos UTF-8 en ZIPs que no tienen el flag UTF-8 seteado (0x800). Comun en archivos creados en Windows con nombres CJK (chino, japones, coreano). Detecta mojibake comparando rangos Unicode y recodifica CP437 -> UTF-8."
tags: [zip, encoding, utf-8, cjk, mojibake, normalize, infra]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: [zipfile]
tested: true
tests:
- "ZIP con nombres UTF-8 correctos no se modifican"
- "ZIP con nombres CJK mojibake se reparan"
test_file_path: "python/functions/infra/safe_extract_zip_test.py"
file_path: "python/functions/infra/safe_extract_zip.py"
---
## Ejemplo
```python
import zipfile
from safe_extract_zip import normalize_zip_filenames
with zipfile.ZipFile("archivo_windows.zip", "r") as zipf:
normalize_zip_filenames(zipf)
for info in zipf.infolist():
print(info.filename) # nombres CJK correctos
```
## Notas
Funcion impura: modifica los `ZipInfo` del objeto ZipFile in-place.
El flag `0x800` en `flag_bits` indica que el filename ya esta codificado en UTF-8 segun la especificacion PKZip. Si esta seteado, el nombre no se toca.
Deteccion de CJK: rangos `\u3400-\u4dbf`, `\u4e00-\u9fff`, `\u3000-\u303f`, `\uff00-\uffef`.
Deteccion de mojibake: rangos Greek (`\u0370-\u03ff`), Math (`\u2200-\u22ff`), Box Drawing (`\u2500-\u257f`). Estos caracteres aparecen cuando bytes UTF-8 se interpretan como CP437.
Si se reparo algun nombre, se setea `zipf.metadata_encoding = "utf-8"`.
El codigo fuente de ambas funciones vive en `safe_extract_zip.py`.
@@ -0,0 +1,45 @@
---
name: read_file_with_encoding
kind: function
lang: py
domain: infra
version: "1.0.0"
purity: impure
signature: "def read_file_with_encoding(path: str, encodings: list[str] | None = None) -> str"
description: "Lee un archivo de texto intentando multiples encodings en orden hasta encontrar uno que funcione. Util para archivos de origen desconocido (Windows, Latin-1, con BOM, etc.)."
tags: [file, encoding, io, text, utf8, latin1, cp1252, decode]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: []
tested: true
tests:
- "archivo utf-8 valido"
- "archivo utf-8 con BOM eliminado con utf-8-sig"
- "archivo latin-1"
- "archivo binario falla con ValueError"
- "encodings personalizados"
- "archivo no existe lanza FileNotFoundError"
test_file_path: "python/functions/infra/read_file_with_encoding_test.py"
file_path: "python/functions/infra/read_file_with_encoding.py"
---
## Ejemplo
```python
# Leer archivo de origen desconocido
content = read_file_with_encoding("/tmp/datos.csv")
# Leer archivo Windows con BOM explicitamente
content = read_file_with_encoding("/tmp/report.txt", encodings=["utf-8-sig", "cp1252"])
```
## Notas
Los encodings por defecto son `["utf-8", "utf-8-sig", "latin-1", "cp1252"]`. El orden importa: `utf-8` se intenta primero porque es el mas comun. Si el archivo tiene BOM y se quiere que sea eliminado automaticamente, pasar `encodings=["utf-8-sig"]` o anteponerlo a `utf-8` en la lista personalizada.
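`latin-1` nunca lanza `UnicodeDecodeError` porque mapea todos los bytes 0x00-0xFF, por lo que actua como fallback universal. En consecuencia, con la lista por defecto `cp1252` nunca llega a intentarse (va despues de `latin-1`) y el `ValueError` no puede dispararse; solo se lanza con listas personalizadas que no incluyen `latin-1`, por ejemplo `encodings=["utf-8", "utf-8-sig"]` sobre un archivo binario.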
Raises `FileNotFoundError` u `OSError` nativas si el archivo no existe o hay error de I/O — estos no se envuelven en `ValueError`.
@@ -0,0 +1,45 @@
"""Lee un archivo de texto intentando multiples encodings en orden."""
from __future__ import annotations
def read_file_with_encoding(
path: str,
encodings: list[str] | None = None,
) -> str:
"""Lee un archivo de texto intentando multiples encodings en orden.
Intenta abrir el archivo con cada encoding de la lista hasta que
uno tenga exito. Util para archivos de origen desconocido (Windows,
Latin-1, archivos con BOM, etc.).
Args:
path: Ruta al archivo a leer.
encodings: Lista de encodings a intentar en orden. Por defecto
["utf-8", "utf-8-sig", "latin-1", "cp1252"].
Returns:
Contenido del archivo como string.
Raises:
ValueError: Si ningun encoding logra decodificar el archivo.
FileNotFoundError: Si el archivo no existe.
OSError: Si hay un error de I/O al abrir el archivo.
"""
if encodings is None:
encodings = ["utf-8", "utf-8-sig", "latin-1", "cp1252"]
last_error: UnicodeDecodeError | None = None
for encoding in encodings:
try:
with open(path, encoding=encoding) as fh:
return fh.read()
except UnicodeDecodeError as exc:
last_error = exc
continue
raise ValueError(
f"Unable to decode file '{path}' with encodings {encodings}. "
f"Last error: {last_error}"
)
@@ -0,0 +1,81 @@
"""Tests para read_file_with_encoding."""
import os
import sys
import tempfile
from pathlib import Path
import pytest
_HERE = Path(__file__).parent
if str(_HERE) not in sys.path:
sys.path.insert(0, str(_HERE))
from read_file_with_encoding import read_file_with_encoding # noqa: E402
def _write_bytes(content: bytes) -> str:
"""Escribe bytes a un archivo temporal y retorna su path."""
fd, path = tempfile.mkstemp()
try:
os.write(fd, content)
finally:
os.close(fd)
return path
def test_archivo_utf8():
    """A plain UTF-8 file is decoded with the default encoding list."""
    expected = "Hola mundo con acentos: áéíóú"
    tmp_path = _write_bytes(expected.encode("utf-8"))
    try:
        assert read_file_with_encoding(tmp_path) == expected
    finally:
        os.unlink(tmp_path)
def test_archivo_utf8_con_bom():
    """With ``utf-8-sig`` requested explicitly, the BOM is stripped."""
    expected = "Contenido con BOM"
    tmp_path = _write_bytes(expected.encode("utf-8-sig"))
    try:
        # Only utf-8-sig is offered, so the BOM must not appear in the output.
        decoded = read_file_with_encoding(tmp_path, encodings=["utf-8-sig"])
        assert decoded == expected
    finally:
        os.unlink(tmp_path)
def test_archivo_latin1():
    """A Latin-1 file is decoded through the default fallback chain."""
    expected = "Texto en Latin-1: café"
    tmp_path = _write_bytes(expected.encode("latin-1"))
    try:
        assert read_file_with_encoding(tmp_path) == expected
    finally:
        os.unlink(tmp_path)
def test_archivo_binario_falla():
    """Binary data raises ValueError when only strict encodings are allowed."""
    # Bytes that are not valid UTF-8 (with or without BOM handling).
    payload = bytes([0x80, 0x81, 0x82, 0x83, 0xFF, 0xFE, 0x00, 0x01])
    tmp_path = _write_bytes(payload)
    # Restrict to strict encodings so nothing can act as a catch-all.
    strict_only = ["utf-8", "utf-8-sig"]
    try:
        with pytest.raises(ValueError, match="Unable to decode file"):
            read_file_with_encoding(tmp_path, encodings=strict_only)
    finally:
        os.unlink(tmp_path)
def test_encodings_personalizados():
    """A caller-supplied encoding list is honoured."""
    expected = "Windows text: Ñoño"
    tmp_path = _write_bytes(expected.encode("cp1252"))
    try:
        decoded = read_file_with_encoding(tmp_path, encodings=["cp1252"])
        assert decoded == expected
    finally:
        os.unlink(tmp_path)
def test_archivo_no_existe():
    """A missing file propagates the native FileNotFoundError."""
    missing = "/tmp/archivo_que_no_existe_12345.txt"
    with pytest.raises(FileNotFoundError):
        read_file_with_encoding(missing)
@@ -0,0 +1,46 @@
---
name: safe_extract_zip
kind: function
lang: py
domain: infra
version: "1.0.0"
purity: impure
signature: "def safe_extract_zip(zip_path: str, dest_dir: str) -> None"
description: "Extrae un archivo ZIP con proteccion contra Zip Slip (path traversal attack). Valida que cada archivo extraido quede dentro del directorio destino antes de extraerlo. Normaliza nombres de archivo UTF-8 antes de extraer."
tags: [zip, extract, security, zip-slip, path-traversal, infra, io]
uses_functions: [normalize_zip_filenames_py_infra]
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: [os, zipfile, pathlib]
tested: true
tests:
- "ZIP normal extrae correctamente dentro del destino"
- "ZIP con path traversal lanza ValueError"
- "ZIP con paths absolutos lanza ValueError"
test_file_path: "python/functions/infra/safe_extract_zip_test.py"
file_path: "python/functions/infra/safe_extract_zip.py"
---
## Ejemplo
```python
import zipfile
from safe_extract_zip import safe_extract_zip
# Extraccion segura
try:
safe_extract_zip("archive.zip", "/tmp/output")
except ValueError as e:
print(f"Zip Slip bloqueado: {e}")
except zipfile.BadZipFile:
print("Archivo ZIP invalido")
```
## Notas
Funcion impura: escribe archivos en disco.
La proteccion contra Zip Slip consiste en resolver el path absoluto de cada miembro antes de extraerlo y verificar que empiece con `str(dest_dir) + os.sep`. Esto bloquea tanto `../../etc/passwd` como `/etc/passwd`.
La normalizacion de nombres UTF-8 se delega a `normalize_zip_filenames` y se ejecuta antes de la validacion de paths.
@@ -0,0 +1,80 @@
"""Safe ZIP extraction with Zip Slip protection and filename normalization."""
import os
import zipfile
from pathlib import Path
def normalize_zip_filenames(zipf: zipfile.ZipFile) -> None:
    """Repair UTF-8 filenames in ZIPs written without the UTF-8 flag (0x800).

    ``zipfile`` decodes unflagged names as CP437, so UTF-8 encoded CJK names
    come out as mojibake. Each unflagged name is re-encoded to CP437 and
    decoded as UTF-8; the result replaces ``ZipInfo.filename`` only when it
    contains CJK characters and the current name looks like mojibake.

    ``ZipInfo.orig_filename`` and ``zipf.metadata_encoding`` are deliberately
    left untouched: ``ZipFile.open`` compares the local-header name (decoded
    with ``metadata_encoding`` or CP437) against ``orig_filename``, so changing
    either one makes repaired members unreadable.

    Args:
        zipf: ZipFile opened for reading.

    Returns:
        None. The ``ZipInfo`` objects of ``zipf`` are modified in place.
    """

    def _is_cjk(s: str) -> bool:
        # CJK Ext. A, CJK Unified, CJK punctuation, half/full-width forms.
        return any(
            "\u3400" <= c <= "\u4dbf"
            or "\u4e00" <= c <= "\u9fff"
            or "\u3000" <= c <= "\u303f"
            or "\uff00" <= c <= "\uffef"
            for c in s
        )

    def _is_mojibake(s: str) -> bool:
        # Characters CP437 produces for the UTF-8 lead bytes of CJK text.
        return any(
            c == "\u00b5"  # micro sign = CP437 0xE6, lead byte of U+6000-U+6FFF
            or "\u0370" <= c <= "\u03ff"  # Greek
            or "\u2200" <= c <= "\u22ff"  # Math
            or "\u2500" <= c <= "\u257f"  # Box Drawing
            for c in s
        )

    for info in zipf.infolist():
        # Flag 0x800 means the filename is already UTF-8.
        if info.flag_bits & 0x800:
            continue
        try:
            repaired_name = info.filename.encode("cp437").decode("utf-8")
        except (UnicodeEncodeError, UnicodeDecodeError):
            # Not UTF-8 bytes misread as CP437; keep the name as is.
            continue
        if _is_cjk(repaired_name) and _is_mojibake(info.filename):
            info.filename = repaired_name
def safe_extract_zip(zip_path: str, dest_dir: str) -> None:
    """Extract a ZIP archive with Zip Slip (path traversal) protection.

    Filenames are normalized first, then every member is validated against
    the resolved destination directory. Nothing is written until all members
    have passed, so a malicious archive never leaves a partial extraction.

    Args:
        zip_path: Path of the ZIP archive to extract.
        dest_dir: Destination directory for the extraction.

    Raises:
        ValueError: If any member would resolve outside ``dest_dir``.
        zipfile.BadZipFile: If the file is not a valid ZIP.
        FileNotFoundError: If ``zip_path`` does not exist.
    """
    dest = Path(dest_dir).resolve()

    with zipfile.ZipFile(zip_path, "r") as zipf:
        normalize_zip_filenames(zipf)
        members = zipf.infolist()

        # Pass 1: validate. Path-component comparison (rather than a string
        # prefix) also works when dest is the filesystem root and accepts a
        # member that resolves to dest itself (e.g. a "./" directory entry).
        for member in members:
            member_path = (dest / member.filename).resolve()
            if member_path != dest and dest not in member_path.parents:
                raise ValueError(
                    f"Zip Slip attempt detected: {member.filename!r} would extract to {member_path}"
                )

        # Pass 2: extract, now that every member is known to be safe.
        for member in members:
            zipf.extract(member, dest)
@@ -0,0 +1,206 @@
"""Tests para safe_extract_zip y normalize_zip_filenames."""
import io
import os
import struct
import tempfile
import zipfile
from safe_extract_zip import normalize_zip_filenames, safe_extract_zip
def _make_zip_with_raw_filename(raw_filename_bytes: bytes, content: bytes) -> bytes:
"""Crea un ZIP minimal con bytes de filename raw y sin flag 0x800.
Simula un ZIP creado en Windows donde el filename tiene bytes UTF-8
pero sin el flag de UTF-8 (0x800), causando que zipfile lo lea como CP437.
"""
crc = zipfile.crc32(content) & 0xFFFFFFFF
fname_len = len(raw_filename_bytes)
buf = io.BytesIO()
# Local file header
local_header = struct.pack(
"<4sHHHHHIIIHH",
b"PK\x03\x04", # signature
20, # version needed
0, # general purpose bit flag — sin 0x800
0, # compression: stored
0, # last mod time
0, # last mod date
crc,
len(content), # compressed size
len(content), # uncompressed size
fname_len,
0, # extra field length
)
buf.write(local_header)
buf.write(raw_filename_bytes)
buf.write(content)
# Central directory header
cd_offset = buf.tell()
cd_header = struct.pack(
"<4sHHHHHHIIIHHHHHII",
b"PK\x01\x02",
20, # version made by
20, # version needed
0, # flag — sin 0x800
0, # compression
0, # mod time
0, # mod date
crc,
len(content), # compressed size
len(content), # uncompressed size
fname_len,
0, # extra length
0, # comment length
0, # disk start
0, # internal attr
0, # external attr
0, # local header offset
)
buf.write(cd_header)
buf.write(raw_filename_bytes)
# End of central directory
eocd = struct.pack(
"<4sHHHHIIH",
b"PK\x05\x06",
0, 0, 1, 1,
len(cd_header) + fname_len,
cd_offset,
0,
)
buf.write(eocd)
return buf.getvalue()
def _make_zip(members: dict[str, bytes]) -> str:
"""Crea un ZIP temporal con los miembros dados {filename: content}."""
tmp = tempfile.NamedTemporaryFile(suffix=".zip", delete=False)
with zipfile.ZipFile(tmp, "w") as zipf:
for name, content in members.items():
zipf.writestr(name, content)
tmp.close()
return tmp.name
def _make_zip_with_traversal(traversal_name: str) -> str:
"""Crea un ZIP con un miembro cuyo nombre intenta path traversal."""
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zipf:
info = zipfile.ZipInfo(traversal_name)
zipf.writestr(info, b"malicious content")
tmp = tempfile.NamedTemporaryFile(suffix=".zip", delete=False)
tmp.write(buf.getvalue())
tmp.close()
return tmp.name
def test_zip_normal():
    """A well-formed ZIP is extracted inside the destination directory."""
    archive = _make_zip({"hello.txt": b"hello world", "subdir/file.py": b"# code"})
    try:
        with tempfile.TemporaryDirectory() as dest:
            safe_extract_zip(archive, dest)

            hello = os.path.join(dest, "hello.txt")
            nested = os.path.join(dest, "subdir", "file.py")
            assert os.path.isfile(hello)
            assert os.path.isfile(nested)
            with open(hello, "rb") as f:
                assert f.read() == b"hello world"
    finally:
        os.unlink(archive)
def test_zip_con_path_traversal():
    """A member using ``..`` segments makes extraction raise ValueError."""
    archive = _make_zip_with_traversal("../../etc/passwd")
    try:
        with tempfile.TemporaryDirectory() as dest:
            error = None
            try:
                safe_extract_zip(archive, dest)
            except ValueError as e:
                error = e
            assert error is not None, "Expected ValueError for path traversal"
            assert "Zip Slip" in str(error)
    finally:
        os.unlink(archive)
def test_zip_con_paths_absolutos():
    """A member with an absolute path makes extraction raise ValueError."""
    archive = _make_zip_with_traversal("/etc/passwd")
    try:
        with tempfile.TemporaryDirectory() as dest:
            error = None
            try:
                safe_extract_zip(archive, dest)
            except ValueError as e:
                error = e
            assert error is not None, "Expected ValueError for absolute path"
            assert "Zip Slip" in str(error)
    finally:
        os.unlink(archive)
def test_normalize_utf8_correctos_no_cambian():
    """Names already flagged as UTF-8 (0x800) are left unchanged."""
    stream = io.BytesIO()
    with zipfile.ZipFile(stream, "w") as writer:
        flagged = zipfile.ZipInfo("archivo_normal.txt")
        flagged.flag_bits |= 0x800  # mark the name as UTF-8
        writer.writestr(flagged, b"content")
    stream.seek(0)

    with zipfile.ZipFile(stream, "r") as reader:
        name_before = reader.infolist()[0].filename
        normalize_zip_filenames(reader)
        name_after = reader.infolist()[0].filename
        assert name_after == name_before
def test_normalize_cjk_mojibake_repara():
    """CJK names stored as UTF-8 bytes but read as CP437 get repaired.

    The archive carries valid UTF-8 bytes of a CJK name without the 0x800
    flag, so ``zipfile`` decodes them as CP437 and yields mojibake;
    ``normalize_zip_filenames`` must detect and fix that.
    """
    cjk_name = "\u6587\u4ef6.txt"  # 文件.txt
    # ZipInfo forces 0x800 for non-ASCII names, so the archive is assembled
    # by hand with raw UTF-8 bytes in the filename field and the flag cleared.
    archive_bytes = _make_zip_with_raw_filename(cjk_name.encode("utf-8"), b"cjk content")

    with zipfile.ZipFile(io.BytesIO(archive_bytes), "r") as zipf:
        entry = zipf.infolist()[0]
        # Without the flag the bytes were decoded as CP437, i.e. mojibake.
        assert not (entry.flag_bits & 0x800), "Flag 0x800 no deberia estar seteado"
        assert entry.filename != cjk_name, "El nombre aun no debe estar reparado"

        normalize_zip_filenames(zipf)

        repaired = zipf.infolist()[0].filename
        has_cjk = any(
            "\u3400" <= c <= "\u4dbf" or "\u4e00" <= c <= "\u9fff" for c in repaired
        )
        assert has_cjk, f"Esperaba CJK en nombre reparado, got: {repaired!r}"
# Script mode: run every test in order and report each one as it passes.
if __name__ == "__main__":
    for _case, _message in (
        (test_zip_normal, "PASS: ZIP normal extrae correctamente dentro del destino"),
        (test_zip_con_path_traversal, "PASS: ZIP con path traversal lanza ValueError"),
        (test_zip_con_paths_absolutos, "PASS: ZIP con paths absolutos lanza ValueError"),
        (test_normalize_utf8_correctos_no_cambian, "PASS: ZIP con nombres UTF-8 correctos no se modifican"),
        (test_normalize_cjk_mojibake_repara, "PASS: ZIP con nombres CJK mojibake se reparan"),
    ):
        _case()
        print(_message)
    print("\nAll tests passed.")
+64
View File
@@ -0,0 +1,64 @@
---
name: scan_directory
kind: function
lang: py
domain: infra
version: "1.0.0"
purity: impure
signature: "def scan_directory(root: str, supported_extensions: set[str] | None = None, ignore_dirs: set[str] | None = None, include: str | None = None, exclude: str | None = None, strict: bool = False) -> DirectoryScanResult"
description: "Recorre un arbol de directorios y clasifica cada archivo como procesable o no soportado. Util para validacion pre-importacion de directorios. Ignora dot files, symlinks, archivos vacios y directorios de build/venv/cache predefinidos. Soporta filtros include/exclude con globs."
tags: [directory, scan, filesystem, classification, infra, walk, files]
uses_functions: []
uses_types: [classified_file_py_infra, directory_scan_result_py_infra]
returns: [directory_scan_result_py_infra]
returns_optional: false
error_type: "error_go_core"
imports: [os, pathlib, fnmatch, sys, dataclasses]
tested: true
tests:
- "directorio con mezcla de archivos"
- "directorio con dot files"
- "directorio con subdirs ignorados"
- "filtros include/exclude"
- "modo strict"
test_file_path: "python/functions/infra/scan_directory_test.py"
file_path: "python/functions/infra/scan_directory.py"
---
## Ejemplo
```python
from scan_directory import scan_directory
# Escanear directorio de documentos, solo PDF y Markdown
result = scan_directory(
"/data/proyecto",
supported_extensions={".pdf", ".md"},
ignore_dirs={"archive"},
exclude="*.tmp,drafts/",
strict=False,
)
print(f"Procesables: {len(result.processable)}")
print(f"No soportados: {len(result.unsupported)}")
for f in result.processable:
print(f" {f.rel_path}")
```
## Notas
Funcion impura: realiza I/O de sistema de archivos con `os.walk`.
**Directorios ignorados por defecto (`IGNORE_DIRS`):**
`__pycache__`, `node_modules`, `.git`, `.svn`, `.hg`, `venv`, `.venv`, `env`, `.env`, `.tox`, `.nox`, `.mypy_cache`, `.pytest_cache`, `.ruff_cache`, `dist`, `build`, `.next`, `.nuxt`, `target`, `vendor`.
**Logica de include/exclude:**
- `include`: patrones glob separados por coma (ej: `"*.pdf,*.md"`). Si se provee, solo se incluyen archivos que coincidan con al menos un patron.
- `exclude`: patrones glob separados por coma. Si el patron termina con `/` es un prefijo de path relativo (ej: `"drafts/"`); sin `/` es un glob de nombre (ej: `"*.tmp"`).
**Modo strict:** si `strict=True` y hay archivos no soportados, lanza `ValueError` con la lista de archivos no soportados. Util para pipelines que requieren directorio 100% homogeneo.
**Orden de resultados:** `processable` y `unsupported` se ordenan por `rel_path` ascendente para salida determinista.
Los paths relativos en `ClassifiedFile.rel_path` siempre usan forward slashes (`/`) independientemente del OS.
+217
View File
@@ -0,0 +1,217 @@
"""scan_directory — recorre un arbol de directorios y clasifica cada archivo."""
import fnmatch
import os
import sys
from pathlib import Path
# Importar tipos cuando el modulo se carga desde su directorio o via PYTHONPATH
_HERE = Path(__file__).parent
_TYPES_INFRA = Path(__file__).parent.parent.parent / "types" / "infra"
for _p in [str(_HERE), str(_TYPES_INFRA)]:
if _p not in sys.path:
sys.path.insert(0, _p)
from classified_file import ClassifiedFile # noqa: E402
from directory_scan_result import DirectoryScanResult # noqa: E402
# Directory names pruned by default during a scan (VCS metadata, virtual
# environments, tool caches and build output). scan_directory unions this set
# with the caller's ``ignore_dirs``.
IGNORE_DIRS: set[str] = {
    "__pycache__",
    "node_modules",
    ".git",
    ".svn",
    ".hg",
    "venv",
    ".venv",
    "env",
    ".env",
    ".tox",
    ".nox",
    ".mypy_cache",
    ".pytest_cache",
    ".ruff_cache",
    "dist",
    "build",
    ".next",
    ".nuxt",
    "target",
    "vendor",
}
def _is_excluded(rel_file: str, filename: str, patterns: list[str]) -> bool:
    """Return True when any exclude pattern matches the file.

    A pattern ending in "/" is a prefix of the POSIX-style relative path; any
    other pattern is an ``fnmatch`` glob applied to the bare file name.
    """
    for pattern in patterns:
        if pattern.endswith("/"):
            if rel_file.startswith(pattern):
                return True
        elif fnmatch.fnmatch(filename, pattern):
            return True
    return False


def scan_directory(
    root: str,
    supported_extensions: set[str] | None = None,
    ignore_dirs: set[str] | None = None,
    include: str | None = None,
    exclude: str | None = None,
    strict: bool = False,
) -> DirectoryScanResult:
    """Walk a directory tree and classify each file as processable or unsupported.

    Intended for pre-import validation: it reports which files a pipeline
    could process and which would be ignored, before any processing starts.
    Dot files/directories, symlinks, empty files and ignored directories are
    skipped and recorded in ``skipped``.

    Args:
        root: Path of the root directory to scan.
        supported_extensions: Processable extensions (e.g. {".pdf", ".md"}),
            compared case-insensitively. If None, every file that survives
            the filters is classified as "processable".
        ignore_dirs: Extra directory names or root-relative paths (forward
            slashes) to ignore, in addition to IGNORE_DIRS.
        include: Comma-separated glob patterns (e.g. "*.pdf,*.md") matched
            against the file name. When given, only matching files are kept.
        exclude: Comma-separated patterns. A pattern ending in "/" is a
            relative-path prefix (e.g. "drafts/"); otherwise it is a file-name
            glob (e.g. "*.tmp").
        strict: If True, raise ValueError when any unsupported file is found.

    Returns:
        DirectoryScanResult with the processable and unsupported files (both
        sorted by ``rel_path``), the skipped paths and any warnings.

    Raises:
        FileNotFoundError: If ``root`` does not exist.
        NotADirectoryError: If ``root`` is not a directory.
        ValueError: If ``strict`` is True and unsupported files were found.
    """
    root_path = Path(root).resolve()

    if not root_path.exists():
        raise FileNotFoundError(f"Directorio no encontrado: {root}")
    if not root_path.is_dir():
        raise NotADirectoryError(f"No es un directorio: {root}")

    # Build the filter sets. ``set(...)`` lets callers pass any iterable.
    all_ignore = IGNORE_DIRS | set(ignore_dirs or ())
    # File suffixes are lowercased below, so lowercase the allow-list too;
    # otherwise an entry such as ".PDF" could never match.
    allowed_exts = (
        None
        if supported_extensions is None
        else {ext.lower() for ext in supported_extensions}
    )
    include_patterns: list[str] = (
        [p.strip() for p in include.split(",") if p.strip()] if include else []
    )
    exclude_patterns: list[str] = (
        [p.strip() for p in exclude.split(",") if p.strip()] if exclude else []
    )

    processable: list[ClassifiedFile] = []
    unsupported: list[ClassifiedFile] = []
    skipped: list[str] = []
    warnings: list[str] = []

    for dirpath, dirnames, filenames in os.walk(str(root_path), topdown=True):
        dir_path = Path(dirpath)
        rel_dir = dir_path.relative_to(root_path)

        # Prune directories; ``dirnames`` is rewritten in place below so that
        # os.walk does not descend into the ones dropped here.
        kept: list[str] = []
        for d in dirnames:
            dir_abs = dir_path / d
            rel_d_str = (rel_dir / d).as_posix()

            if d.startswith("."):
                skipped.append(f"{dir_abs} (dot directory)")
                continue
            if dir_abs.is_symlink():
                skipped.append(f"{dir_abs} (symlink)")
                continue
            # Ignored either by bare name or by root-relative path.
            if d in all_ignore or rel_d_str in all_ignore:
                skipped.append(f"{dir_abs} (ignored directory)")
                continue
            kept.append(d)
        dirnames[:] = kept

        for filename in sorted(filenames):
            file_abs = dir_path / filename
            rel_file = (rel_dir / filename).as_posix()

            if filename.startswith("."):
                skipped.append(f"{file_abs} (dot file)")
                continue
            if file_abs.is_symlink():
                skipped.append(f"{file_abs} (symlink)")
                continue

            # Empty files are skipped; unreadable ones become warnings.
            try:
                if file_abs.stat().st_size == 0:
                    skipped.append(f"{file_abs} (empty file)")
                    continue
            except OSError as exc:
                warnings.append(f"No se pudo leer {file_abs}: {exc}")
                continue

            # include: with patterns present, the name must match at least one.
            if include_patterns and not any(
                fnmatch.fnmatch(filename, p) for p in include_patterns
            ):
                skipped.append(f"{file_abs} (no coincide con include)")
                continue

            if _is_excluded(rel_file, filename, exclude_patterns):
                skipped.append(f"{file_abs} (excluido por exclude)")
                continue

            # Classify by (lowercased) extension.
            ext = Path(filename).suffix.lower()
            if allowed_exts is None or ext in allowed_exts:
                classification = "processable"
            else:
                classification = "unsupported"

            cf = ClassifiedFile(
                path=str(file_abs),
                rel_path=rel_file,
                classification=classification,
            )
            if classification == "processable":
                processable.append(cf)
            else:
                unsupported.append(cf)

    # Sort by relative path for deterministic output.
    processable.sort(key=lambda f: f.rel_path)
    unsupported.sort(key=lambda f: f.rel_path)

    result = DirectoryScanResult(
        root=str(root_path),
        processable=processable,
        unsupported=unsupported,
        skipped=skipped,
        warnings=warnings,
    )

    if strict and unsupported:
        unsupported_paths = [f.rel_path for f in unsupported]
        raise ValueError(
            f"strict=True: {len(unsupported)} archivos no soportados: {unsupported_paths}"
        )

    return result
@@ -0,0 +1,181 @@
"""Tests para scan_directory."""
import os
import sys
import tempfile
from pathlib import Path
# Asegurar que los modulos del mismo directorio y tipos se puedan importar
_HERE = Path(__file__).parent
_TYPES_INFRA = Path(__file__).parent.parent.parent / "types" / "infra"
for _p in [str(_HERE), str(_TYPES_INFRA)]:
if _p not in sys.path:
sys.path.insert(0, _p)
from scan_directory import scan_directory # noqa: E402
def _make_tree(base: Path, structure: dict) -> None:
"""Crea un arbol de archivos/dirs a partir de un dict {rel_path: content}."""
for rel, content in structure.items():
path = base / rel
path.parent.mkdir(parents=True, exist_ok=True)
if content is None:
path.mkdir(parents=True, exist_ok=True)
else:
path.write_text(content, encoding="utf-8")
# ---------------------------------------------------------------------------
# Test: directorio con mezcla de archivos
# ---------------------------------------------------------------------------
def test_directorio_con_mezcla_de_archivos():
    """Files are split into processable/unsupported by extension."""
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        tree = {
            "report.pdf": "pdf content",
            "notes.md": "# Notes",
            "image.png": "png content",
            "data.csv": "a,b,c",
        }
        _make_tree(root, tree)

        result = scan_directory(str(root), supported_extensions={".pdf", ".md"})

        rel_paths = [entry.rel_path for entry in result.processable]
        unsup_paths = [entry.rel_path for entry in result.unsupported]

        assert "notes.md" in rel_paths, f"notes.md no en processable: {rel_paths}"
        assert "report.pdf" in rel_paths, f"report.pdf no en processable: {rel_paths}"
        assert "image.png" in unsup_paths, f"image.png no en unsupported: {unsup_paths}"
        assert "data.csv" in unsup_paths, f"data.csv no en unsupported: {unsup_paths}"

        # Each list must carry only its own classification label.
        assert all(entry.classification == "processable" for entry in result.processable)
        assert all(entry.classification == "unsupported" for entry in result.unsupported)
# ---------------------------------------------------------------------------
# Test: directorio con dot files
# ---------------------------------------------------------------------------
def test_directorio_con_dot_files():
    """Dot files are left out of the results and recorded as skipped."""
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        _make_tree(root, {
            "visible.txt": "content",
            ".hidden": "hidden content",
            ".env": "SECRET=x",
        })

        result = scan_directory(str(root))

        all_paths = [f.rel_path for f in result.processable + result.unsupported]
        assert ".hidden" not in all_paths, f".hidden no deberia aparecer: {all_paths}"
        assert ".env" not in all_paths, f".env no deberia aparecer: {all_paths}"
        assert "visible.txt" in all_paths, f"visible.txt deberia aparecer: {all_paths}"

        # Both dot files must be reported as skipped; checking with ``or``
        # would let the test pass when only one of them is recorded.
        skipped_paths = " ".join(result.skipped)
        assert ".hidden" in skipped_paths and ".env" in skipped_paths
# ---------------------------------------------------------------------------
# Test: directorio con subdirs ignorados
# ---------------------------------------------------------------------------
def test_directorio_con_subdirs_ignorados():
    """Files inside ignored or dot directories never show up in the results."""
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        tree = {
            "main.py": "print('hello')",
            "__pycache__/module.pyc": "bytecode",
            "node_modules/lib/index.js": "// js",
            ".git/config": "[core]",
            "src/utils.py": "def f(): pass",
        }
        _make_tree(root, tree)

        result = scan_directory(str(root))
        all_rels = [entry.rel_path for entry in result.processable + result.unsupported]

        # Nothing from the pruned directories may leak into the results.
        assert not any("__pycache__" in r for r in all_rels), \
            f"__pycache__ no deberia estar en resultados: {all_rels}"
        assert not any("node_modules" in r for r in all_rels), \
            f"node_modules no deberia estar en resultados: {all_rels}"
        assert not any(".git" in r for r in all_rels), \
            f".git no deberia estar en resultados: {all_rels}"

        # Files outside the pruned directories must still be reported.
        assert "main.py" in all_rels, f"main.py deberia estar: {all_rels}"
        assert "src/utils.py" in all_rels, f"src/utils.py deberia estar: {all_rels}"
# ---------------------------------------------------------------------------
# Test: filtros include/exclude
# ---------------------------------------------------------------------------
def test_filtros_include_exclude():
    """``include`` keeps only matching names; ``exclude`` drops globs and path prefixes."""
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        tree = {
            "report.pdf": "content",
            "notes.md": "notes",
            "image.png": "image",
            "drafts/draft.md": "draft",
            "temp.tmp": "tmp",
        }
        _make_tree(root, tree)

        # include: only .pdf and .md files survive.
        included = scan_directory(str(root), include="*.pdf,*.md")
        all_rels = [entry.rel_path for entry in included.processable + included.unsupported]
        assert "image.png" not in all_rels, f"image.png no deberia incluirse: {all_rels}"
        assert "temp.tmp" not in all_rels, f"temp.tmp no deberia incluirse: {all_rels}"
        assert "report.pdf" in all_rels
        assert "notes.md" in all_rels

        # exclude: the drafts/ path prefix and the *.tmp name glob are dropped.
        excluded = scan_directory(str(root), exclude="drafts/,*.tmp")
        all_rels2 = [entry.rel_path for entry in excluded.processable + excluded.unsupported]
        assert "drafts/draft.md" not in all_rels2, \
            f"drafts/draft.md no deberia incluirse: {all_rels2}"
        assert "temp.tmp" not in all_rels2, f"temp.tmp no deberia incluirse: {all_rels2}"
        assert "report.pdf" in all_rels2
# ---------------------------------------------------------------------------
# Test: modo strict
# ---------------------------------------------------------------------------
def test_modo_strict():
    """``strict=True`` raises ValueError when unsupported files are present."""
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        _make_tree(root, {"doc.pdf": "content", "image.png": "image"})
        only_pdf = {".pdf"}

        # Lenient mode reports the unsupported file without raising.
        lenient = scan_directory(str(root), supported_extensions=only_pdf, strict=False)
        assert len(lenient.unsupported) == 1

        # Strict mode must raise.
        error = None
        try:
            scan_directory(str(root), supported_extensions=only_pdf, strict=True)
        except ValueError as exc:
            error = exc
        assert error is not None, "strict=True deberia lanzar ValueError cuando hay unsupported"
# Script mode: run every test in order and report each one as it passes.
if __name__ == "__main__":
    for _case, _message in (
        (test_directorio_con_mezcla_de_archivos, "PASS: directorio con mezcla de archivos"),
        (test_directorio_con_dot_files, "PASS: directorio con dot files"),
        (test_directorio_con_subdirs_ignorados, "PASS: directorio con subdirs ignorados"),
        (test_filtros_include_exclude, "PASS: filtros include/exclude"),
        (test_modo_strict, "PASS: modo strict"),
    ):
        _case()
        print(_message)
    print("\nAll tests passed.")
+51
View File
@@ -0,0 +1,51 @@
---
name: setup_logger
kind: function
lang: py
domain: infra
version: "1.0.0"
purity: impure
signature: "def setup_logger(name: str = 'app', log_dir: str = 'logs', level: int = logging.DEBUG) -> logging.Logger"
description: "Configura un logger con dual output: archivo con rotacion por tamano (DEBUG+, 10MB, 5 backups) y consola (INFO+). Crea log_dir si no existe. Idempotente: no duplica handlers si el logger ya esta configurado."
tags: [logging, logger, rotation, file, console, infra, debug]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: [logging, logging.handlers, os, sys, datetime]
tested: true
tests:
- "logger se crea con 2 handlers"
- "segundo call no duplica handlers"
- "archivo se crea en log_dir"
- "get_logger retorna logger configurado"
- "logger level es debug"
test_file_path: "python/functions/infra/setup_logger_test.py"
file_path: "python/functions/infra/setup_logger.py"
---
## Ejemplo
```python
import logging
from setup_logger import setup_logger, get_logger
# Configurar al inicio de la aplicacion
logger = setup_logger(name="mi_app", log_dir="logs", level=logging.DEBUG)
logger.info("Aplicacion iniciada")
logger.debug("Detalle de debug")
# En modulos internos: obtener logger ya configurado
log = get_logger("mi_app")
log.warning("Algo inesperado ocurrio")
```
## Notas
Funcion impura: crea el directorio `log_dir` en disco y modifica el estado global del sistema de logging de Python.
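El archivo de log se llama `YYYY-MM-DD.log`, segun la fecha en que se configura el logger. La rotacion es por tamano (10 MB), no por tiempo: el nombre se calcula una sola vez al crear el handler y no cambia a medianoche, de modo que un proceso de larga duracion sigue escribiendo en el archivo del dia en que arranco.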
En Windows se reconfigura `sys.stdout` a UTF-8 para evitar mojibake con caracteres no-ASCII.
La funcion companion `get_logger` es util en modulos que no controlan la inicializacion: devuelve el logger si ya fue configurado, o lo crea con defaults.
+85
View File
@@ -0,0 +1,85 @@
"""Configuracion de logger con rotacion de archivo y salida a consola."""
import logging
import logging.handlers
import os
import sys
from datetime import datetime
def setup_logger(
    name: str = "app",
    log_dir: str = "logs",
    level: int = logging.DEBUG,
) -> logging.Logger:
    """Configure a logger that writes to a size-rotated file and to the console.

    The file handler records DEBUG and above with a detailed format and
    rotates by size (10 MB per file, 5 backups); rotation is not time-based.
    The console handler writes INFO and above to ``sys.stdout`` with a shorter
    format. Propagation to ancestor loggers is disabled.

    The call is idempotent with respect to handlers: when the logger already
    has handlers, only its level and propagation flag are (re)applied and
    nothing is created on disk.

    Args:
        name: Name of the logger in Python's logging registry.
        log_dir: Directory for the log file; created if missing. The file is
            named ``YYYY-MM-DD.log`` after the date on which the handlers are
            created.
        level: Minimum level accepted by the logger itself (default DEBUG).

    Returns:
        The logger registered under ``name``. It carries one file handler and
        one console handler when this call performed the setup.

    Raises:
        OSError: Propagated from creating ``log_dir`` or opening the log file.
    """
    logger = logging.getLogger(name)

    # Decide up front whether this call has to build handlers. Only then is
    # the directory needed; creating it unconditionally would leave a stray,
    # unused directory when a repeated call passes a different ``log_dir``.
    already_configured = bool(logger.handlers)
    if not already_configured:
        os.makedirs(log_dir, exist_ok=True)

    logger.setLevel(level)
    logger.propagate = False

    # Idempotent: a logger that already has handlers keeps them untouched.
    if already_configured:
        return logger

    fmt_detailed = logging.Formatter(
        "[%(asctime)s] %(levelname)s [%(name)s.%(funcName)s:%(lineno)d] %(message)s"
    )
    fmt_simple = logging.Formatter(
        "[%(asctime)s] %(levelname)s: %(message)s"
    )

    # File handler with size-based rotation. The file name is computed once,
    # here, so it does not change while the process keeps running.
    log_filename = os.path.join(log_dir, f"{datetime.now():%Y-%m-%d}.log")
    file_handler = logging.handlers.RotatingFileHandler(
        log_filename,
        maxBytes=10 * 1024 * 1024,  # 10 MB
        backupCount=5,
        encoding="utf-8",
    )
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(fmt_detailed)

    # Console handler. On Windows, switch stdout to UTF-8 first so non-ASCII
    # text is not garbled; a stdout object without reconfigure() is left as is.
    if sys.platform == "win32":
        try:
            sys.stdout.reconfigure(encoding="utf-8", errors="replace")  # type: ignore[attr-defined]
        except AttributeError:
            pass
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setLevel(logging.INFO)
    console_handler.setFormatter(fmt_simple)

    logger.addHandler(file_handler)
    logger.addHandler(console_handler)
    return logger
def get_logger(name: str = "app") -> logging.Logger:
    """Return the logger registered under ``name``, configuring it on first use.

    A logger that already has at least one handler is returned unchanged.
    Otherwise the call is delegated to ``setup_logger(name)``, which applies
    its default directory and level.

    Args:
        name: Name of the logger to fetch.

    Returns:
        The existing logger, or the one produced by ``setup_logger``.
    """
    existing = logging.getLogger(name)
    if not existing.handlers:
        return setup_logger(name)
    return existing
@@ -0,0 +1,49 @@
"""Tests para setup_logger."""
import logging
import os
import tempfile
from setup_logger import get_logger, setup_logger
def test_logger_tiene_dos_handlers():
    """A freshly configured logger carries exactly two handlers."""
    with tempfile.TemporaryDirectory() as log_dir:
        logger = setup_logger(name="test_two_handlers", log_dir=log_dir)
        try:
            assert len(logger.handlers) == 2
        finally:
            # Close the handlers instead of merely dropping them: the log file
            # must be released before the temporary directory is removed, and
            # the cleanup has to run even when the assertion fails so other
            # tests are not contaminated.
            for handler in list(logger.handlers):
                handler.close()
                logger.removeHandler(handler)
def test_segundo_call_no_duplica_handlers():
    """A second setup call returns the same logger without adding handlers."""
    with tempfile.TemporaryDirectory() as log_dir:
        logger1 = setup_logger(name="test_idempotent", log_dir=log_dir)
        try:
            handler_count_after_first = len(logger1.handlers)
            logger2 = setup_logger(name="test_idempotent", log_dir=log_dir)
            assert logger1 is logger2
            assert len(logger2.handlers) == handler_count_after_first
        finally:
            # Close and detach every handler so the log file is released
            # before the temporary directory is deleted, even on failure.
            for handler in list(logger1.handlers):
                handler.close()
                logger1.removeHandler(handler)
def test_archivo_se_crea_en_log_dir():
    """Setting up the logger leaves exactly one ``.log`` file in ``log_dir``."""
    with tempfile.TemporaryDirectory() as log_dir:
        logger = setup_logger(name="test_file_created", log_dir=log_dir)
        try:
            log_files = [f for f in os.listdir(log_dir) if f.endswith(".log")]
            assert len(log_files) == 1
        finally:
            # Close and detach every handler so the log file is released
            # before the temporary directory is deleted, even on failure.
            for handler in list(logger.handlers):
                handler.close()
                logger.removeHandler(handler)
def test_get_logger_retorna_logger_configurado():
    """``get_logger`` hands back a logger that was already configured."""
    with tempfile.TemporaryDirectory() as log_dir:
        # Configure first so that get_logger finds existing handlers.
        configured = setup_logger(name="test_get_logger", log_dir=log_dir)
        try:
            logger = get_logger(name="test_get_logger")
            assert len(logger.handlers) == 2
        finally:
            # Close and detach every handler so the log file is released
            # before the temporary directory is deleted, even on failure.
            for handler in list(configured.handlers):
                handler.close()
                configured.removeHandler(handler)
def test_logger_level_es_debug():
    """The logger's own level matches the ``level`` argument (DEBUG here)."""
    with tempfile.TemporaryDirectory() as log_dir:
        logger = setup_logger(name="test_level_debug", log_dir=log_dir, level=logging.DEBUG)
        try:
            assert logger.level == logging.DEBUG
        finally:
            # Close and detach every handler so the log file is released
            # before the temporary directory is deleted, even on failure.
            for handler in list(logger.handlers):
                handler.close()
                logger.removeHandler(handler)