# Repo.py from abc import ABC from typing import Type, TypeVar, Generic, Optional from sqlalchemy.orm import Session from sqlalchemy import func from datetime import datetime from datetime import datetime, timezone from .Mapper import Mapper_base # Asegúrate de importar tu ABC base TModelo = TypeVar("TModelo") TDominio = TypeVar("TDominio") class Repo_base(ABC, Generic[TModelo, TDominio]): def __init__(self, session: Session, modelo: type[TModelo], mapper: type[Mapper_base[TDominio, TModelo]]): self.session = session self.Modelo = modelo self.Mapper = mapper # ---------------------- # ADD # ---------------------- def add(self, dominio: TDominio, created_by: Optional[str] = None, notes: Optional[str] = None) -> str: data = self.Mapper.to_dict(dominio) data.update({ "sys_created_by": created_by, "sys_notes": notes, "sys_version": 1 }) model = self.Modelo(**data) self.session.add(model) self.session.commit() return model.id def add_many(self, dominios: list[TDominio], created_by: Optional[str] = None, notes: Optional[str] = None) -> list[str]: ids = [] for dominio in dominios: data = self.Mapper.to_dict(dominio) data.update({ "sys_created_by": created_by, "sys_notes": notes, "sys_version": 1 }) model = self.Modelo(**data) self.session.add(model) ids.append(model.id) self.session.commit() return ids # ---------------------- # GET # ---------------------- def get_by_id(self, id_: str) -> Optional[TDominio]: model = self.session.query(self.Modelo).filter_by(id=id_, sys_deleted_at=None).first() return self.Mapper.from_model(model) if model else None def get_all(self) -> list[TDominio]: models = self.session.query(self.Modelo).filter_by(sys_deleted_at=None).all() return self.Mapper.from_model_list(models) def get_paginated(self, offset: int = 0, limit: int = 10) -> list[TDominio]: models = self.session.query(self.Modelo).filter_by(sys_deleted_at=None).offset(offset).limit(limit).all() return self.Mapper.from_model_list(models) def get_deleted(self) -> list[TDominio]: models = self.session.query(self.Modelo).filter(self.Modelo.sys_deleted_at.isnot(None)).all() return self.Mapper.from_model_list(models) # ---------------------- # UPDATE # ---------------------- def update(self, id_: str, new_data: dict, updated_by: Optional[str] = None, notes: Optional[str] = None) -> bool: model = self.session.query(self.Modelo).filter_by(id=id_, sys_deleted_at=None).first() if not model: return False for key, value in new_data.items(): if hasattr(model, key): setattr(model, key, value) model.sys_updated_by = updated_by model.sys_notes = notes model.sys_version = (model.sys_version or 1) + 1 self.session.commit() return True def bulk_update(self, updates: list[tuple[str, dict]], updated_by: Optional[str] = None, notes: Optional[str] = None) -> int: count = 0 for id_, data in updates: if self.update(id_, data, updated_by=updated_by, notes=notes): count += 1 return count # ---------------------- # DELETE # ---------------------- def delete_by_id(self, id_: str) -> bool: model = self.session.query(self.Modelo).filter_by(id=id_).first() if model: self.session.delete(model) self.session.commit() return True return False def delete_all(self) -> int: deleted = self.session.query(self.Modelo).delete() self.session.commit() return deleted # ---------------------- # SOFT DELETE # ---------------------- def soft_delete(self, id_: str, deleted_by: Optional[str] = None, notes: Optional[str] = None) -> bool: model = self.session.query(self.Modelo).filter_by(id=id_, sys_deleted_at=None).first() if model: model.sys_deleted_at = datetime.now(timezone.utc) # TZ-aware model.sys_updated_by = deleted_by model.sys_notes = notes model.sys_version = (model.sys_version or 1) + 1 self.session.commit() return True return False def soft_restore(self, id_: str, restored_by: Optional[str] = None, notes: Optional[str] = None) -> bool: model = self.session.query(self.Modelo).filter_by(id=id_).first() if model and model.sys_deleted_at is not None: model.sys_deleted_at = None model.sys_updated_by = restored_by model.sys_notes = notes model.sys_version = (model.sys_version or 1) + 1 self.session.commit() return True return False # ---------------------- # OTROS # ---------------------- def exists(self, id_: str) -> bool: return self.session.query(self.Modelo).filter_by(id=id_, sys_deleted_at=None).first() is not None def count(self) -> int: return self.session.query(self.Modelo).filter_by(sys_deleted_at=None).count()