"""scan_directory — recorre un arbol de directorios y clasifica cada archivo.""" import fnmatch import os import sys from pathlib import Path # Importar tipos cuando el modulo se carga desde su directorio o via PYTHONPATH _HERE = Path(__file__).parent _TYPES_INFRA = Path(__file__).parent.parent.parent / "types" / "infra" for _p in [str(_HERE), str(_TYPES_INFRA)]: if _p not in sys.path: sys.path.insert(0, _p) from classified_file import ClassifiedFile # noqa: E402 from directory_scan_result import DirectoryScanResult # noqa: E402 # Directorios ignorados por defecto IGNORE_DIRS: set[str] = { "__pycache__", "node_modules", ".git", ".svn", ".hg", "venv", ".venv", "env", ".env", ".tox", ".nox", ".mypy_cache", ".pytest_cache", ".ruff_cache", "dist", "build", ".next", ".nuxt", "target", "vendor", } def scan_directory( root: str, supported_extensions: set[str] | None = None, ignore_dirs: set[str] | None = None, include: str | None = None, exclude: str | None = None, strict: bool = False, ) -> DirectoryScanResult: """Recorre un arbol de directorios y clasifica cada archivo como procesable o no soportado. Util para validacion pre-importacion de directorios: identifica que archivos podran procesarse y cuales seran ignorados antes de iniciar cualquier pipeline. Args: root: Path al directorio raiz a escanear. supported_extensions: Conjunto de extensiones procesables (ej: {".pdf", ".md"}). Si es None, todos los archivos no filtrados se marcan como "processable". ignore_dirs: Nombres o paths relativos de directorios adicionales a ignorar. Se suman a IGNORE_DIRS. Los paths relativos usan forward slashes. include: Patrones glob separados por coma (ej: "*.pdf,*.md"). Si se provee, solo se incluyen archivos que coincidan con al menos un patron. exclude: Patrones glob separados por coma. Patrones con "/" final son prefijos de path (ej: "drafts/"); sin "/" son globs de nombre (ej: "*.tmp"). strict: Si True, lanza ValueError si hay archivos no soportados al final. Returns: DirectoryScanResult con listas de archivos procesables, no soportados, paths saltados y warnings. Raises: FileNotFoundError: Si root no existe. NotADirectoryError: Si root no es un directorio. ValueError: Si strict=True y hay archivos no soportados. """ root_path = Path(root).resolve() if not root_path.exists(): raise FileNotFoundError(f"Directorio no encontrado: {root}") if not root_path.is_dir(): raise NotADirectoryError(f"No es un directorio: {root}") # Construir conjuntos de filtro extra_ignore = ignore_dirs or set() all_ignore = IGNORE_DIRS | extra_ignore include_patterns: list[str] = ( [p.strip() for p in include.split(",") if p.strip()] if include else [] ) exclude_patterns: list[str] = ( [p.strip() for p in exclude.split(",") if p.strip()] if exclude else [] ) processable: list[ClassifiedFile] = [] unsupported: list[ClassifiedFile] = [] skipped: list[str] = [] warnings: list[str] = [] for dirpath, dirnames, filenames in os.walk(str(root_path), topdown=True): dir_path = Path(dirpath) rel_dir = dir_path.relative_to(root_path) # Podar directorios (modificar in-place para que os.walk no los visite) pruned: list[str] = [] kept: list[str] = [] for d in dirnames: dir_abs = dir_path / d rel_d = rel_dir / d rel_d_str = rel_d.as_posix() # Skip dot dirs if d.startswith("."): skipped.append(f"{dir_abs} (dot directory)") pruned.append(d) continue # Skip symlinks if dir_abs.is_symlink(): skipped.append(f"{dir_abs} (symlink)") pruned.append(d) continue # Skip IGNORE_DIRS (por nombre o por path relativo) if d in all_ignore or rel_d_str in all_ignore: skipped.append(f"{dir_abs} (ignored directory)") pruned.append(d) continue kept.append(d) dirnames[:] = kept # Procesar archivos for filename in sorted(filenames): file_abs = dir_path / filename rel_file = (rel_dir / filename).as_posix() # Skip dot files if filename.startswith("."): skipped.append(f"{file_abs} (dot file)") continue # Skip symlinks if file_abs.is_symlink(): skipped.append(f"{file_abs} (symlink)") continue # Skip archivos vacios try: if file_abs.stat().st_size == 0: skipped.append(f"{file_abs} (empty file)") continue except OSError as exc: warnings.append(f"No se pudo leer {file_abs}: {exc}") continue # Aplicar filtro include (si hay patrones, debe coincidir con al menos uno) if include_patterns: if not any(fnmatch.fnmatch(filename, p) for p in include_patterns): skipped.append(f"{file_abs} (no coincide con include)") continue # Aplicar filtro exclude excluded = False for pat in exclude_patterns: if pat.endswith("/"): # Es un prefijo de path relativo prefix = pat # ej: "drafts/" if rel_file.startswith(prefix): excluded = True break else: # Es un glob de nombre de archivo if fnmatch.fnmatch(filename, pat): excluded = True break if excluded: skipped.append(f"{file_abs} (excluido por exclude)") continue # Clasificar por extension ext = Path(filename).suffix.lower() if supported_extensions is None or ext in supported_extensions: classification = "processable" else: classification = "unsupported" cf = ClassifiedFile( path=str(file_abs), rel_path=rel_file, classification=classification, ) if classification == "processable": processable.append(cf) else: unsupported.append(cf) # Ordenar por rel_path processable.sort(key=lambda f: f.rel_path) unsupported.sort(key=lambda f: f.rel_path) result = DirectoryScanResult( root=str(root_path), processable=processable, unsupported=unsupported, skipped=skipped, warnings=warnings, ) if strict and unsupported: unsupported_paths = [f.rel_path for f in unsupported] raise ValueError( f"strict=True: {len(unsupported)} archivos no soportados: {unsupported_paths}" ) return result