Files
fn_registry/python/functions/infra/scan_directory.py
T
egutierrez 5a324f6554 feat: funciones Python infra y tipos Python (core, datascience, infra)
Infra: cache_to_file, cache_to_sqlite, http_download_file, http_get_json,
http_post_json, read_file_with_encoding, safe_extract_zip, scan_directory,
setup_logger, normalize_zip_filenames.
Tipos: 30+ tipos core (agent_action, context, task, message, parse_result...),
6 tipos datascience (entity_candidate, extraction_result...), 2 tipos infra.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-05 17:11:43 +02:00

218 lines
7.1 KiB
Python

"""scan_directory — recorre un arbol de directorios y clasifica cada archivo."""
import fnmatch
import os
import sys
from pathlib import Path
# Importar tipos cuando el modulo se carga desde su directorio o via PYTHONPATH
_HERE = Path(__file__).parent
_TYPES_INFRA = Path(__file__).parent.parent.parent / "types" / "infra"
for _p in [str(_HERE), str(_TYPES_INFRA)]:
if _p not in sys.path:
sys.path.insert(0, _p)
from classified_file import ClassifiedFile # noqa: E402
from directory_scan_result import DirectoryScanResult # noqa: E402
# Directorios ignorados por defecto
IGNORE_DIRS: set[str] = {
"__pycache__",
"node_modules",
".git",
".svn",
".hg",
"venv",
".venv",
"env",
".env",
".tox",
".nox",
".mypy_cache",
".pytest_cache",
".ruff_cache",
"dist",
"build",
".next",
".nuxt",
"target",
"vendor",
}
def scan_directory(
root: str,
supported_extensions: set[str] | None = None,
ignore_dirs: set[str] | None = None,
include: str | None = None,
exclude: str | None = None,
strict: bool = False,
) -> DirectoryScanResult:
"""Recorre un arbol de directorios y clasifica cada archivo como procesable o no soportado.
Util para validacion pre-importacion de directorios: identifica que archivos
podran procesarse y cuales seran ignorados antes de iniciar cualquier pipeline.
Args:
root: Path al directorio raiz a escanear.
supported_extensions: Conjunto de extensiones procesables (ej: {".pdf", ".md"}).
Si es None, todos los archivos no filtrados se marcan como "processable".
ignore_dirs: Nombres o paths relativos de directorios adicionales a ignorar.
Se suman a IGNORE_DIRS. Los paths relativos usan forward slashes.
include: Patrones glob separados por coma (ej: "*.pdf,*.md"). Si se provee,
solo se incluyen archivos que coincidan con al menos un patron.
exclude: Patrones glob separados por coma. Patrones con "/" final son prefijos
de path (ej: "drafts/"); sin "/" son globs de nombre (ej: "*.tmp").
strict: Si True, lanza ValueError si hay archivos no soportados al final.
Returns:
DirectoryScanResult con listas de archivos procesables, no soportados,
paths saltados y warnings.
Raises:
FileNotFoundError: Si root no existe.
NotADirectoryError: Si root no es un directorio.
ValueError: Si strict=True y hay archivos no soportados.
"""
root_path = Path(root).resolve()
if not root_path.exists():
raise FileNotFoundError(f"Directorio no encontrado: {root}")
if not root_path.is_dir():
raise NotADirectoryError(f"No es un directorio: {root}")
# Construir conjuntos de filtro
extra_ignore = ignore_dirs or set()
all_ignore = IGNORE_DIRS | extra_ignore
include_patterns: list[str] = (
[p.strip() for p in include.split(",") if p.strip()] if include else []
)
exclude_patterns: list[str] = (
[p.strip() for p in exclude.split(",") if p.strip()] if exclude else []
)
processable: list[ClassifiedFile] = []
unsupported: list[ClassifiedFile] = []
skipped: list[str] = []
warnings: list[str] = []
for dirpath, dirnames, filenames in os.walk(str(root_path), topdown=True):
dir_path = Path(dirpath)
rel_dir = dir_path.relative_to(root_path)
# Podar directorios (modificar in-place para que os.walk no los visite)
pruned: list[str] = []
kept: list[str] = []
for d in dirnames:
dir_abs = dir_path / d
rel_d = rel_dir / d
rel_d_str = rel_d.as_posix()
# Skip dot dirs
if d.startswith("."):
skipped.append(f"{dir_abs} (dot directory)")
pruned.append(d)
continue
# Skip symlinks
if dir_abs.is_symlink():
skipped.append(f"{dir_abs} (symlink)")
pruned.append(d)
continue
# Skip IGNORE_DIRS (por nombre o por path relativo)
if d in all_ignore or rel_d_str in all_ignore:
skipped.append(f"{dir_abs} (ignored directory)")
pruned.append(d)
continue
kept.append(d)
dirnames[:] = kept
# Procesar archivos
for filename in sorted(filenames):
file_abs = dir_path / filename
rel_file = (rel_dir / filename).as_posix()
# Skip dot files
if filename.startswith("."):
skipped.append(f"{file_abs} (dot file)")
continue
# Skip symlinks
if file_abs.is_symlink():
skipped.append(f"{file_abs} (symlink)")
continue
# Skip archivos vacios
try:
if file_abs.stat().st_size == 0:
skipped.append(f"{file_abs} (empty file)")
continue
except OSError as exc:
warnings.append(f"No se pudo leer {file_abs}: {exc}")
continue
# Aplicar filtro include (si hay patrones, debe coincidir con al menos uno)
if include_patterns:
if not any(fnmatch.fnmatch(filename, p) for p in include_patterns):
skipped.append(f"{file_abs} (no coincide con include)")
continue
# Aplicar filtro exclude
excluded = False
for pat in exclude_patterns:
if pat.endswith("/"):
# Es un prefijo de path relativo
prefix = pat # ej: "drafts/"
if rel_file.startswith(prefix):
excluded = True
break
else:
# Es un glob de nombre de archivo
if fnmatch.fnmatch(filename, pat):
excluded = True
break
if excluded:
skipped.append(f"{file_abs} (excluido por exclude)")
continue
# Clasificar por extension
ext = Path(filename).suffix.lower()
if supported_extensions is None or ext in supported_extensions:
classification = "processable"
else:
classification = "unsupported"
cf = ClassifiedFile(
path=str(file_abs),
rel_path=rel_file,
classification=classification,
)
if classification == "processable":
processable.append(cf)
else:
unsupported.append(cf)
# Ordenar por rel_path
processable.sort(key=lambda f: f.rel_path)
unsupported.sort(key=lambda f: f.rel_path)
result = DirectoryScanResult(
root=str(root_path),
processable=processable,
unsupported=unsupported,
skipped=skipped,
warnings=warnings,
)
if strict and unsupported:
unsupported_paths = [f.rel_path for f in unsupported]
raise ValueError(
f"strict=True: {len(unsupported)} archivos no soportados: {unsupported_paths}"
)
return result