"""Tests for the diffusers backend: load_pipeline, set_scheduler, generate, unload."""

from __future__ import annotations

import os
import sys
import time

import pytest

# Adjust sys.path so that imports from python/functions/ml/ resolve: the
# directory two levels above this file goes to the front of the search path.
_ML_PATH = os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
sys.path.insert(0, os.path.abspath(_ML_PATH))

# torch and diffusers are optional dependencies — the whole module is skipped
# when either one cannot be imported.
torch = pytest.importorskip(
    "torch", reason="torch no instalado — skip tests diffusers"
)
pytest.importorskip(
    "diffusers", reason="diffusers no instalado — skip tests diffusers"
)

from ml.model_ref import ModelRef
from ml.generation_config import GenerationConfig
from ml.image_gen_result import ImageGenResult
from ml.diffusers_load_pipeline import (
    _clear_pipeline_cache,
    diffusers_load_pipeline,
)
from ml.diffusers_set_scheduler import diffusers_set_scheduler
from ml.diffusers_unload import diffusers_unload

# diffusers_generate imports image_gen_result without the package prefix.
# To avoid the double-import problem (ml.image_gen_result != image_gen_result),
# sys.modules["image_gen_result"] is forced to point at the module already
# loaded as ml.image_gen_result before diffusers_generate is imported.
import sys as _sys

import ml.image_gen_result as _igr_module
import ml.generation_config as _gcfg_module
import ml.genconfig_to_diffusers_kwargs as _gkwargs_module

# Register each ml.* module under its bare name as well (entries that already
# exist are left alone), so the un-prefixed imports performed by
# diffusers_generate resolve to these same module objects.
for _alias, _mod in [
    ("image_gen_result", _igr_module),
    ("generation_config", _gcfg_module),
    ("genconfig_to_diffusers_kwargs", _gkwargs_module),
]:
    _sys.modules.setdefault(_alias, _mod)

from ml.diffusers_generate import diffusers_generate


# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------

# Location of the local SD Turbo checkpoint. The default is the original
# developer path; set the SD_TURBO_PATH environment variable to run the suite
# on a machine that stores the model elsewhere (empty values are ignored).
SD_TURBO_PATH = (
    os.environ.get("SD_TURBO_PATH")
    or "/home/lucas/vaults/imagegen_models/diffusers/sd-turbo"
)


# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------


@pytest.fixture(scope="session")
def sd_turbo_model() -> ModelRef:
    """Return a ModelRef pointing at the local SD Turbo checkpoint.

    The requesting test is skipped when SD_TURBO_PATH is not an existing
    directory.
    """
    if not os.path.isdir(SD_TURBO_PATH):
        pytest.skip(f"SD Turbo no encontrado en {SD_TURBO_PATH}")
    return ModelRef(
        name="sd-turbo",
        model_type="sd15",
        quantization="fp16",
        path=SD_TURBO_PATH,
    )


@pytest.fixture(scope="session")
def loaded_pipe(sd_turbo_model: ModelRef):
    """Yield an SD Turbo pipeline loaded once for the whole test session.

    fp16 is attempted first. If that load raises, the pipeline cache is
    cleared and the load is retried with fp32.
    """
    try:
        pipe = diffusers_load_pipeline(sd_turbo_model, device="auto", dtype="fp16")
    except Exception:
        # Deliberately broad best-effort fallback (e.g. no fp16 variant on
        # disk): any failure of the fp16 load triggers the fp32 retry.
        _clear_pipeline_cache()
        pipe = diffusers_load_pipeline(sd_turbo_model, device="auto", dtype="fp32")
    yield pipe
    # Teardown: release resources at the end of the session.
    diffusers_unload(None)


@pytest.fixture(scope="session")
def sd_turbo_cfg(sd_turbo_model: ModelRef) -> GenerationConfig:
    """Return a minimal GenerationConfig for SD Turbo (1 step, 512x512)."""
    return GenerationConfig(
        prompt="a simple red circle on white background",
        negative_prompt=None,
        seed=42,
        steps=1,
        cfg_scale=0.0,
        sampler="euler",
        width=512,
        height=512,
        model=sd_turbo_model,
        loras=[],
    )
# ---------------------------------------------------------------------------
# Test: loading the pipeline returns a callable
# ---------------------------------------------------------------------------


def test_load_pipeline_returns_callable(sd_turbo_model: ModelRef) -> None:
    """Loading the pipeline returns a callable that has a scheduler attribute."""
    _clear_pipeline_cache()
    pipe = diffusers_load_pipeline(sd_turbo_model, device="auto", dtype="fp16")
    assert callable(pipe), "El pipeline debe ser callable"
    assert hasattr(pipe, "scheduler"), "El pipeline debe tener atributo scheduler"


# ---------------------------------------------------------------------------
# Test: second load is served from the cache (< 100ms)
# ---------------------------------------------------------------------------


def test_load_pipeline_caches(sd_turbo_model: ModelRef) -> None:
    """A second load with identical arguments completes in under 100ms."""
    # First load (may take several seconds).
    _clear_pipeline_cache()
    _ = diffusers_load_pipeline(sd_turbo_model, device="auto", dtype="fp16")

    # Second load is expected to be a cache hit; only this call is timed.
    t0 = time.perf_counter()
    pipe2 = diffusers_load_pipeline(sd_turbo_model, device="auto", dtype="fp16")
    elapsed_ms = (time.perf_counter() - t0) * 1000

    assert elapsed_ms < 100, (
        f"Segunda carga tardo {elapsed_ms:.1f}ms (esperado < 100ms — debe ser cache hit)"
    )
    assert pipe2 is not None


# ---------------------------------------------------------------------------
# Test: set_scheduler changes the scheduler class
# ---------------------------------------------------------------------------


def test_set_scheduler_changes_scheduler_class(loaded_pipe) -> None:
    """Selecting "euler" switches the scheduler to EulerDiscreteScheduler."""
    pipe = diffusers_set_scheduler(loaded_pipe, "euler")
    scheduler_name = type(pipe.scheduler).__name__
    assert scheduler_name == "EulerDiscreteScheduler", (
        f"Esperado EulerDiscreteScheduler, obtenido {scheduler_name}"
    )


def test_set_scheduler_euler_a(loaded_pipe) -> None:
    """Selecting "euler_a" switches to EulerAncestralDiscreteScheduler."""
    try:
        pipe = diffusers_set_scheduler(loaded_pipe, "euler_a")
        scheduler_name = type(pipe.scheduler).__name__
        assert scheduler_name == "EulerAncestralDiscreteScheduler", (
            f"Esperado EulerAncestralDiscreteScheduler, obtenido {scheduler_name}"
        )
    finally:
        # loaded_pipe is session-scoped and shared: put "euler" back even
        # when the assertion above fails, so later tests are not affected.
        diffusers_set_scheduler(loaded_pipe, "euler")


def test_set_scheduler_invalid_raises_value_error(loaded_pipe) -> None:
    """An unknown sampler name raises ValueError."""
    with pytest.raises(ValueError, match="no soportado"):
        diffusers_set_scheduler(loaded_pipe, "nonexistent_sampler_xyz")


# ---------------------------------------------------------------------------
# Test: generating an image returns an ImageGenResult
# ---------------------------------------------------------------------------


def test_generate_returns_image_gen_result(
    loaded_pipe, sd_turbo_cfg: GenerationConfig
) -> None:
    """Generation returns an ImageGenResult with image, timing and metadata."""
    result = diffusers_generate(loaded_pipe, sd_turbo_cfg)

    assert isinstance(result, ImageGenResult), (
        f"Esperado ImageGenResult, obtenido {type(result)}"
    )
    assert result.image is not None, "result.image no debe ser None"
    assert result.duration_ms > 0, (
        f"duration_ms debe ser positivo, obtenido {result.duration_ms}"
    )
    assert "backend" in result.meta, "meta debe tener key 'backend'"
    assert result.meta["backend"] == "diffusers", (
        f"meta['backend'] debe ser 'diffusers', obtenido {result.meta['backend']}"
    )
    assert "model" in result.meta, "meta debe tener key 'model'"

    # Check that the image has the requested dimensions.
    w, h = result.image.size
    assert w == sd_turbo_cfg.width and h == sd_turbo_cfg.height, (
        f"Imagen esperada {sd_turbo_cfg.width}x{sd_turbo_cfg.height}, "
        f"obtenida {w}x{h}"
    )


# ---------------------------------------------------------------------------
# Test: unload clears the CUDA cache when available
# ---------------------------------------------------------------------------


def test_unload_clears_cuda() -> None:
    """diffusers_unload(None) completes without raising, with or without CUDA."""
    cuda_available = torch.cuda.is_available()

    # Must not raise regardless of whether CUDA is present.
    diffusers_unload(None)

    if cuda_available:
        # Reserved memory cannot be assumed to be 0 afterwards (other tensors
        # may still be alive), so this only checks that the query succeeds
        # and returns a non-negative value.
        reserved = torch.cuda.memory_reserved()
        assert reserved >= 0, "memory_reserved debe ser >= 0"