5a324f6554
Infra: cache_to_file, cache_to_sqlite, http_download_file, http_get_json, http_post_json, read_file_with_encoding, safe_extract_zip, scan_directory, setup_logger, normalize_zip_filenames. Tipos: 30+ tipos core (agent_action, context, task, message, parse_result...), 6 tipos datascience (entity_candidate, extraction_result...), 2 tipos infra. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
218 lines
7.1 KiB
Python
218 lines
7.1 KiB
Python
"""scan_directory — recorre un arbol de directorios y clasifica cada archivo."""
|
|
|
|
import fnmatch
|
|
import os
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
# Put this module's own directory and the shared "types/infra" directory
# (two levels above this module's directory) on sys.path so the imports below
# resolve whether the module is loaded from its own directory or via
# PYTHONPATH.
_HERE = Path(__file__).parent
_TYPES_INFRA = _HERE.parent.parent / "types" / "infra"
for _p in [str(_HERE), str(_TYPES_INFRA)]:
    # Each missing entry is pushed to the front, so the last one processed
    # ends up with the highest import priority.
    if _p not in sys.path:
        sys.path.insert(0, _p)
|
|
|
|
from classified_file import ClassifiedFile # noqa: E402
|
|
from directory_scan_result import DirectoryScanResult # noqa: E402
|
|
|
|
# Directory names that are ignored by default during a scan.
IGNORE_DIRS: set[str] = {
    # Caches and tool state
    "__pycache__",
    ".mypy_cache",
    ".pytest_cache",
    ".ruff_cache",
    ".tox",
    ".nox",
    # Version-control metadata
    ".git",
    ".svn",
    ".hg",
    # Virtual environments
    "venv",
    ".venv",
    "env",
    ".env",
    # Dependencies and build output
    "node_modules",
    "vendor",
    "dist",
    "build",
    "target",
    ".next",
    ".nuxt",
}
|
|
|
|
|
|
def _split_patterns(spec: str | None) -> list[str]:
    """Split a comma-separated glob spec into stripped, non-empty patterns."""
    if not spec:
        return []
    return [part.strip() for part in spec.split(",") if part.strip()]


def _is_excluded(rel_file: str, filename: str, patterns: list[str]) -> bool:
    """Return True when any exclude pattern matches the file.

    A pattern ending in "/" is a prefix of the file's POSIX-style path
    relative to the scan root; any other pattern is an fnmatch glob applied
    to the bare file name.
    """
    for pattern in patterns:
        if pattern.endswith("/"):
            if rel_file.startswith(pattern):
                return True
        elif fnmatch.fnmatch(filename, pattern):
            return True
    return False


def scan_directory(
    root: str,
    supported_extensions: set[str] | None = None,
    ignore_dirs: set[str] | None = None,
    include: str | None = None,
    exclude: str | None = None,
    strict: bool = False,
) -> DirectoryScanResult:
    """Walk a directory tree and classify each file as processable or unsupported.

    Intended as a pre-import validation step: it reports which files could be
    processed and which would be ignored before any pipeline is started.

    Directories are skipped (and not descended into) when their name starts
    with ".", when they are symlinks, or when their name or root-relative
    path is in the ignore set. Files are skipped when their name starts with
    ".", when they are symlinks, when they are empty, or when the
    include/exclude filters reject them. Directories and files are visited in
    sorted order so the result is deterministic across filesystems.

    Args:
        root: Path of the root directory to scan.
        supported_extensions: Extensions considered processable, including
            the leading dot (e.g. {".pdf", ".md"}). Compared
            case-insensitively. If None, every file that passes the filters
            is classified as "processable".
        ignore_dirs: Additional directory names or root-relative paths to
            ignore, added to IGNORE_DIRS. Relative paths use forward slashes.
        include: Comma-separated glob patterns (e.g. "*.pdf,*.md") matched
            against the file name. When given, only files matching at least
            one pattern are kept.
        exclude: Comma-separated patterns. A pattern ending in "/" is a
            root-relative path prefix (e.g. "drafts/"); any other pattern is
            a glob on the file name (e.g. "*.tmp").
        strict: If True, raise ValueError after the scan when any file was
            classified as unsupported.

    Returns:
        DirectoryScanResult holding the resolved root, the processable and
        unsupported files (each sorted by relative path), the skipped paths
        with the reason for skipping, and warnings for paths that could not
        be read.

    Raises:
        FileNotFoundError: If root does not exist.
        NotADirectoryError: If root is not a directory.
        ValueError: If strict is True and unsupported files were found.
    """
    root_path = Path(root).resolve()

    if not root_path.exists():
        raise FileNotFoundError(f"Directorio no encontrado: {root}")
    if not root_path.is_dir():
        raise NotADirectoryError(f"No es un directorio: {root}")

    # Build the filter sets once, outside the walk.
    all_ignore = IGNORE_DIRS | set(ignore_dirs or ())
    # File suffixes are lowercased before the lookup, so the configured
    # extensions must be lowercased too or mixed-case entries never match.
    extensions = (
        None
        if supported_extensions is None
        else {ext.lower() for ext in supported_extensions}
    )
    include_patterns = _split_patterns(include)
    exclude_patterns = _split_patterns(exclude)

    processable: list[ClassifiedFile] = []
    unsupported: list[ClassifiedFile] = []
    skipped: list[str] = []
    warnings: list[str] = []

    def _record_walk_error(exc: OSError) -> None:
        # os.walk drops listing errors unless given a handler; surface them
        # in the result instead of silently omitting the directory.
        warnings.append(f"No se pudo leer {exc.filename}: {exc}")

    for dirpath, dirnames, filenames in os.walk(
        str(root_path), topdown=True, onerror=_record_walk_error
    ):
        dir_path = Path(dirpath)
        rel_dir = dir_path.relative_to(root_path)

        # Prune directories. Assigning back into dirnames in place is what
        # stops os.walk (topdown=True) from descending into the pruned ones.
        kept: list[str] = []
        for d in sorted(dirnames):
            dir_abs = dir_path / d
            if d.startswith("."):
                skipped.append(f"{dir_abs} (dot directory)")
            elif dir_abs.is_symlink():
                skipped.append(f"{dir_abs} (symlink)")
            elif d in all_ignore or (rel_dir / d).as_posix() in all_ignore:
                # Ignored either by bare name or by root-relative path.
                skipped.append(f"{dir_abs} (ignored directory)")
            else:
                kept.append(d)
        dirnames[:] = kept

        for filename in sorted(filenames):
            file_abs = dir_path / filename
            rel_file = (rel_dir / filename).as_posix()

            if filename.startswith("."):
                skipped.append(f"{file_abs} (dot file)")
                continue

            if file_abs.is_symlink():
                skipped.append(f"{file_abs} (symlink)")
                continue

            # Empty files are skipped; a failing stat() becomes a warning.
            try:
                size = file_abs.stat().st_size
            except OSError as exc:
                warnings.append(f"No se pudo leer {file_abs}: {exc}")
                continue
            if size == 0:
                skipped.append(f"{file_abs} (empty file)")
                continue

            # With include patterns present, the name must match at least one.
            if include_patterns and not any(
                fnmatch.fnmatch(filename, p) for p in include_patterns
            ):
                skipped.append(f"{file_abs} (no coincide con include)")
                continue

            if _is_excluded(rel_file, filename, exclude_patterns):
                skipped.append(f"{file_abs} (excluido por exclude)")
                continue

            # Classify by (lowercased) extension.
            ext = Path(filename).suffix.lower()
            if extensions is None or ext in extensions:
                classification = "processable"
            else:
                classification = "unsupported"

            cf = ClassifiedFile(
                path=str(file_abs),
                rel_path=rel_file,
                classification=classification,
            )
            if classification == "processable":
                processable.append(cf)
            else:
                unsupported.append(cf)

    processable.sort(key=lambda f: f.rel_path)
    unsupported.sort(key=lambda f: f.rel_path)

    result = DirectoryScanResult(
        root=str(root_path),
        processable=processable,
        unsupported=unsupported,
        skipped=skipped,
        warnings=warnings,
    )

    if strict and unsupported:
        unsupported_paths = [f.rel_path for f in unsupported]
        raise ValueError(
            f"strict=True: {len(unsupported)} archivos no soportados: {unsupported_paths}"
        )

    return result
|