Cambios a las 3 bases Model mapper repo para que funcionen a partir de las clases heredando todos los metodos comunes

This commit is contained in:
2025-05-12 01:24:44 +02:00
parent 712bd877b8
commit bf1814bb8e
18 changed files with 992 additions and 361 deletions
+45 -33
View File
@@ -8,6 +8,13 @@ from src.base import Base
from src.ApiKeys.openai_apikey import OpenAICredencial
from src.Security.Encriptar import Encriptar_fernet
from entrypoint import ENV_PATH
from src.ArquitectureLayer.Mapper import Mapper_base
from sqlalchemy import Column, String
from src.ArquitectureLayer.Model import Model_base
from src.ArquitectureLayer.Repo import Repo_base
# ----------------------
# Cargar clave maestra
@@ -21,7 +28,7 @@ if pssword is None:
# MODELO (SQLAlchemy)
# ----------------------
class OpenAICredencialModel(Base):
class OpenAICredencialModel(Base, Model_base):
__tablename__ = 'openai_credenciales'
id = Column(String, primary_key=True)
@@ -33,7 +40,30 @@ class OpenAICredencialModel(Base):
# MAPPER
# ----------------------
class OpenAICredencialMapper:
class OpenAICredencialMapper(Mapper_base[OpenAICredencial, OpenAICredencialModel]):
@staticmethod
def to_model(obj: OpenAICredencial) -> OpenAICredencialModel:
return OpenAICredencialModel(
id=obj.id,
titulo=obj.titulo,
api_key=base64.b64encode(
Encriptar_fernet.encriptar(obj.api_key, pssword)
).decode("utf-8"),
organizacion=obj.organizacion
)
@staticmethod
def from_model(model: OpenAICredencialModel) -> OpenAICredencial:
return OpenAICredencial(
id=model.id,
titulo=model.titulo,
api_key=Encriptar_fernet.desencriptar(
base64.b64decode(model.api_key), pssword
),
organizacion=model.organizacion
)
@staticmethod
def to_dict(obj: OpenAICredencial) -> dict:
return {
@@ -41,7 +71,7 @@ class OpenAICredencialMapper:
"titulo": obj.titulo,
"api_key": base64.b64encode(
Encriptar_fernet.encriptar(obj.api_key, pssword)
).decode('utf-8'),
).decode("utf-8"),
"organizacion": obj.organizacion
}
@@ -56,40 +86,22 @@ class OpenAICredencialMapper:
organizacion=data.get("organizacion")
)
@staticmethod
def from_model(model: OpenAICredencialModel) -> OpenAICredencial:
return OpenAICredencial(
id=model.id,
titulo=model.titulo,
api_key=Encriptar_fernet.desencriptar(
base64.b64decode(model.api_key), pssword
),
organizacion=model.organizacion
)
# ----------------------
# REPO
# ----------------------
class OpenAICredencialRepo:
class OpenAICredencialRepo(Repo_base[OpenAICredencialModel, OpenAICredencial]):
def __init__(self, conexion: ConexionBase):
self.session = conexion.get_session()
def add(self, credencial: OpenAICredencial) -> str:
data = OpenAICredencialMapper.to_dict(credencial)
model = OpenAICredencialModel(**data)
self.session.add(model)
self.session.commit()
return model.id
def get_all(self) -> list[OpenAICredencial]:
models = self.session.query(OpenAICredencialModel).all()
return [OpenAICredencialMapper.from_model(m) for m in models]
super().__init__(
session=conexion.get_session(),
modelo=OpenAICredencialModel,
mapper=OpenAICredencialMapper
)
def get_by_titulo(self, titulo: str) -> OpenAICredencial | None:
model = self.session.query(OpenAICredencialModel).filter_by(titulo=titulo).first()
return OpenAICredencialMapper.from_model(model) if model else None
def get_by_id(self, id_: str) -> OpenAICredencial | None:
model = self.session.get(OpenAICredencialModel, id_)
return OpenAICredencialMapper.from_model(model) if model else None
model = (
self.session.query(self.Modelo)
.filter_by(titulo=titulo, sys_deleted_at=None)
.first()
)
return self.Mapper.from_model(model) if model else None
+75
View File
@@ -0,0 +1,75 @@
# src\ArquitectureLayer\Mapper.py
from abc import ABC, abstractmethod
from typing import TypeVar, Generic, Type
import json
TDominio = TypeVar("TDominio")
TModelo = TypeVar("TModelo")
class Mapper_base(ABC, Generic[TDominio, TModelo]):
# ----------------------------
# Conversiones individuales
# ----------------------------
@staticmethod
@abstractmethod
def to_model(obj: TDominio) -> TModelo:
"""Convierte objeto de dominio a modelo ORM"""
pass
@staticmethod
@abstractmethod
def from_model(model: TModelo) -> TDominio:
"""Convierte modelo ORM a objeto de dominio"""
pass
@staticmethod
@abstractmethod
def to_dict(obj: TDominio) -> dict:
"""Convierte objeto de dominio a diccionario plano"""
pass
@staticmethod
@abstractmethod
def from_dict(d: dict) -> TDominio:
"""Convierte diccionario plano a objeto de dominio"""
pass
@classmethod
def to_json(cls, obj: TDominio) -> str:
"""Convierte objeto de dominio a JSON string"""
return json.dumps(cls.to_dict(obj), default=str)
@classmethod
def from_json(cls, json_str: str) -> TDominio:
"""Convierte JSON string a objeto de dominio"""
return cls.from_dict(json.loads(json_str))
# ----------------------------
# Conversiones en lote (bulk)
# ----------------------------
@classmethod
def to_model_list(cls, objs: list[TDominio]) -> list[TModelo]:
return [cls.to_model(o) for o in objs]
@classmethod
def from_model_list(cls, models: list[TModelo]) -> list[TDominio]:
return [cls.from_model(m) for m in models]
@classmethod
def to_dict_list(cls, objs: list[TDominio]) -> list[dict]:
return [cls.to_dict(o) for o in objs]
@classmethod
def from_dict_list(cls, dicts: list[dict]) -> list[TDominio]:
return [cls.from_dict(d) for d in dicts]
@classmethod
def to_json_list(cls, objs: list[TDominio]) -> str:
return json.dumps(cls.to_dict_list(objs), default=str)
@classmethod
def from_json_list(cls, json_str: str) -> list[TDominio]:
return cls.from_dict_list(json.loads(json_str))
+57
View File
@@ -0,0 +1,57 @@
# src\ArquitectureLayer\Model.py
from sqlalchemy import Column, DateTime, String, Integer, Text, func
from sqlalchemy.ext.declarative import declared_attr, as_declarative
from datetime import datetime
@as_declarative()
class Model_base:
__abstract__ = True
@declared_attr
def sys_created_at(cls):
return Column(DateTime(timezone=True), server_default=func.now(), nullable=False)
@declared_attr
def sys_created_by(cls):
return Column(String, nullable=True)
@declared_attr
def sys_updated_at(cls):
return Column(DateTime(timezone=True), onupdate=func.now(), nullable=True)
@declared_attr
def sys_updated_by(cls):
return Column(String, nullable=True)
@declared_attr
def sys_version(cls):
return Column(Integer, default=1, nullable=False)
@declared_attr
def sys_notes(cls):
return Column(Text, nullable=True)
@declared_attr
def sys_deleted_at(cls):
return Column(DateTime(timezone=True), nullable=True)
def __repr__(self):
id_val = getattr(self, "id", None)
return f"<{self.__class__.__name__} id={id_val}>"
def __str__(self):
cls = self.__class__.__name__
id_val = getattr(self, "id", None)
return f"{cls}(id={id_val})"
def __json__(self) -> dict:
"""Devuelve una representación JSON serializable (dict plano)."""
out = {}
for attr in self.__table__.columns:
val = getattr(self, attr.name)
if isinstance(val, datetime):
out[attr.name] = val.isoformat()
else:
out[attr.name] = val
return out
+148
View File
@@ -0,0 +1,148 @@
# src\ArquitectureLayer\Repo.py
from abc import ABC
from typing import Type, TypeVar, Generic, Optional
from sqlalchemy.orm import Session
from sqlalchemy import func
from datetime import datetime
from src.ArquitectureLayer.Mapper import Mapper_base # Asegúrate de importar tu ABC base
TModelo = TypeVar("TModelo")
TDominio = TypeVar("TDominio")
class Repo_base(ABC, Generic[TModelo, TDominio]):
def __init__(self, session: Session, modelo: type[TModelo], mapper: type[Mapper_base[TDominio, TModelo]]):
self.session = session
self.Modelo = modelo
self.Mapper = mapper
# ----------------------
# ADD
# ----------------------
def add(self, dominio: TDominio, created_by: Optional[str] = None, notes: Optional[str] = None) -> str:
data = self.Mapper.to_dict(dominio)
data.update({
"sys_created_by": created_by,
"sys_notes": notes,
"sys_version": 1
})
model = self.Modelo(**data)
self.session.add(model)
self.session.commit()
return model.id
def add_many(self, dominios: list[TDominio], created_by: Optional[str] = None, notes: Optional[str] = None) -> list[str]:
ids = []
for dominio in dominios:
data = self.Mapper.to_dict(dominio)
data.update({
"sys_created_by": created_by,
"sys_notes": notes,
"sys_version": 1
})
model = self.Modelo(**data)
self.session.add(model)
ids.append(model.id)
self.session.commit()
return ids
# ----------------------
# GET
# ----------------------
def get_by_id(self, id_: str) -> Optional[TDominio]:
model = self.session.query(self.Modelo).filter_by(id=id_, sys_deleted_at=None).first()
return self.Mapper.from_model(model) if model else None
def get_all(self) -> list[TDominio]:
models = self.session.query(self.Modelo).filter_by(sys_deleted_at=None).all()
return self.Mapper.from_model_list(models)
def get_paginated(self, offset: int = 0, limit: int = 10) -> list[TDominio]:
models = self.session.query(self.Modelo).filter_by(sys_deleted_at=None).offset(offset).limit(limit).all()
return self.Mapper.from_model_list(models)
def get_deleted(self) -> list[TDominio]:
models = self.session.query(self.Modelo).filter(self.Modelo.sys_deleted_at.isnot(None)).all()
return self.Mapper.from_model_list(models)
# ----------------------
# UPDATE
# ----------------------
def update(self, id_: str, new_data: dict, updated_by: Optional[str] = None, notes: Optional[str] = None) -> bool:
model = self.session.query(self.Modelo).filter_by(id=id_, sys_deleted_at=None).first()
if not model:
return False
for key, value in new_data.items():
if hasattr(model, key):
setattr(model, key, value)
model.sys_updated_by = updated_by
model.sys_notes = notes
model.sys_version = (model.sys_version or 1) + 1
self.session.commit()
return True
def bulk_update(self, updates: list[tuple[str, dict]], updated_by: Optional[str] = None, notes: Optional[str] = None) -> int:
count = 0
for id_, data in updates:
if self.update(id_, data, updated_by=updated_by, notes=notes):
count += 1
return count
# ----------------------
# DELETE
# ----------------------
def delete_by_id(self, id_: str) -> bool:
model = self.session.query(self.Modelo).filter_by(id=id_).first()
if model:
self.session.delete(model)
self.session.commit()
return True
return False
def delete_all(self) -> int:
deleted = self.session.query(self.Modelo).delete()
self.session.commit()
return deleted
# ----------------------
# SOFT DELETE
# ----------------------
def soft_delete(self, id_: str, deleted_by: Optional[str] = None, notes: Optional[str] = None) -> bool:
model = self.session.query(self.Modelo).filter_by(id=id_, sys_deleted_at=None).first()
if model:
model.sys_deleted_at = datetime.now()
model.sys_updated_by = deleted_by
model.sys_notes = notes
model.sys_version = (model.sys_version or 1) + 1
self.session.commit()
return True
return False
def soft_restore(self, id_: str, restored_by: Optional[str] = None, notes: Optional[str] = None) -> bool:
model = self.session.query(self.Modelo).filter_by(id=id_).first()
if model and model.sys_deleted_at is not None:
model.sys_deleted_at = None
model.sys_updated_by = restored_by
model.sys_notes = notes
model.sys_version = (model.sys_version or 1) + 1
self.session.commit()
return True
return False
# ----------------------
# OTROS
# ----------------------
def exists(self, id_: str) -> bool:
return self.session.query(self.Modelo).filter_by(id=id_, sys_deleted_at=None).first() is not None
def count(self) -> int:
return self.session.query(self.Modelo).filter_by(sys_deleted_at=None).count()
+49 -36
View File
@@ -2,6 +2,13 @@ import os
from dotenv import load_dotenv
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import relationship
from sqlalchemy import DateTime, Text, func
import base64
from src.ArquitectureLayer.Mapper import Mapper_base
from src.ArquitectureLayer.Model import Model_base
from src.ArquitectureLayer.Repo import Repo_base
from src.ConexionSql.Base_conexion import ConexionBase
from src.base import Base
@@ -21,7 +28,7 @@ if pssword is None:
# MODELO (SQLAlchemy)
# ----------------------
class PostgresCredencialModel(Base):
class PostgresCredencialModel(Base, Model_base):
__tablename__ = 'postgres_credenciales'
id = Column(String, primary_key=True)
@@ -36,9 +43,36 @@ class PostgresCredencialModel(Base):
# MAPPER
# ----------------------
import base64
class PostgresCredencialMapper(Mapper_base[PostgresCredencial, PostgresCredencialModel]):
@staticmethod
def to_model(obj: PostgresCredencial) -> PostgresCredencialModel:
return PostgresCredencialModel(
id=obj.id,
titulo=obj.titulo,
host=obj.host,
port=obj.port,
dbname=obj.dbname,
user=obj.user,
password=base64.b64encode(
Encriptar_fernet.encriptar(obj.password, pssword)
).decode('utf-8')
)
@staticmethod
def from_model(model: PostgresCredencialModel) -> PostgresCredencial:
return PostgresCredencial(
id=model.id,
titulo=model.titulo,
host=model.host,
port=model.port,
dbname=model.dbname,
user=model.user,
password=Encriptar_fernet.desencriptar(
base64.b64decode(model.password), pssword
)
)
class PostgresCredencialMapper:
@staticmethod
def to_dict(obj: PostgresCredencial) -> dict:
return {
@@ -67,43 +101,22 @@ class PostgresCredencialMapper:
)
)
@staticmethod
def from_model(model: PostgresCredencialModel) -> PostgresCredencial:
return PostgresCredencial(
id=model.id,
titulo=model.titulo,
host=model.host,
port=model.port,
dbname=model.dbname,
user=model.user,
password=Encriptar_fernet.desencriptar(
base64.b64decode(model.password), pssword
)
)
# ----------------------
# REPO
# ----------------------
class PostgresCredencialRepo:
class PostgresCredencialRepo(Repo_base[PostgresCredencialModel, PostgresCredencial]):
def __init__(self, conexion: ConexionBase):
self.session = conexion.get_session()
def add(self, credencial: PostgresCredencial) -> str:
data = PostgresCredencialMapper.to_dict(credencial)
model = PostgresCredencialModel(**data)
self.session.add(model)
self.session.commit()
return model.id
def get_all(self) -> list[PostgresCredencial]:
models = self.session.query(PostgresCredencialModel).all()
return [PostgresCredencialMapper.from_model(m) for m in models]
super().__init__(
session=conexion.get_session(),
modelo=PostgresCredencialModel,
mapper=PostgresCredencialMapper
)
def get_by_titulo(self, titulo: str) -> PostgresCredencial | None:
model = self.session.query(PostgresCredencialModel).filter_by(titulo=titulo).first()
return PostgresCredencialMapper.from_model(model) if model else None
def get_by_id(self, id_: str) -> PostgresCredencial | None:
model = self.session.get(PostgresCredencialModel, id_)
return PostgresCredencialMapper.from_model(model) if model else None
model = (
self.session.query(self.Modelo)
.filter_by(titulo=titulo, sys_deleted_at=None)
.first()
)
return self.Mapper.from_model(model) if model else None
+38 -25
View File
@@ -1,6 +1,12 @@
import os
from dotenv import load_dotenv
from sqlalchemy import Column, String
from sqlalchemy import Column, String, ForeignKey
from src.ArquitectureLayer.Mapper import Mapper_base
from src.ArquitectureLayer.Model import Model_base
from src.ArquitectureLayer.Repo import Repo_base
from src.ConexionSql.Base_conexion import ConexionBase
from src.base import Base
from src.Security.GenerarIDs import GeneradorIDUnico
@@ -17,18 +23,36 @@ load_dotenv(ENV_PATH)
# MODELO (SQLAlchemy)
# ----------------------
class OpenAIEmbedderModel(Base):
class OpenAIEmbedderModel(Base, Model_base):
__tablename__ = "openai_embedders"
id = Column(String, primary_key=True)
api_key_id = Column(String, nullable=False) # ID de la credencial asociada (clave foránea lógica)
api_key_id = Column(String, ForeignKey("openai_credenciales.id"), nullable=False)
model = Column(String, nullable=False)
# ----------------------
# MAPPER
# ----------------------
class OpenAIEmbedderMapper:
class OpenAIEmbedderMapper(Mapper_base[OpenAIEmbedder, OpenAIEmbedderModel]):
@staticmethod
def to_model(obj: OpenAIEmbedder) -> OpenAIEmbedderModel:
return OpenAIEmbedderModel(
id=obj.id,
api_key_id=obj.client.credencial.id,
model=obj.model
)
@staticmethod
def from_model(model: OpenAIEmbedderModel, credencial: OpenAICredencial) -> OpenAIEmbedder:
return OpenAIEmbedder(
id=model.id,
credencial=credencial,
model=model.model
)
@staticmethod
def to_dict(obj: OpenAIEmbedder) -> dict:
return {
@@ -45,39 +69,28 @@ class OpenAIEmbedderMapper:
model=data["model"]
)
@staticmethod
def from_model(model: OpenAIEmbedderModel, credencial: OpenAICredencial) -> OpenAIEmbedder:
return OpenAIEmbedder(
id=model.id,
credencial=credencial,
model=model.model
)
# ----------------------
# REPO
# ----------------------
class OpenAIEmbedderRepo:
class OpenAIEmbedderRepo(Repo_base[OpenAIEmbedderModel, OpenAIEmbedder]):
def __init__(self, conexion: ConexionBase):
self.session = conexion.get_session()
def add(self, embedder: OpenAIEmbedder) -> str:
data = OpenAIEmbedderMapper.to_dict(embedder)
model = OpenAIEmbedderModel(**data)
self.session.add(model)
self.session.commit()
return model.id
super().__init__(
session=conexion.get_session(),
modelo=OpenAIEmbedderModel,
mapper=OpenAIEmbedderMapper
)
def get_by_id(self, id_: str, credencial: OpenAICredencial) -> OpenAIEmbedder | None:
model = self.session.get(OpenAIEmbedderModel, id_)
return OpenAIEmbedderMapper.from_model(model, credencial) if model else None
model = self.session.get(self.Modelo, id_)
return self.Mapper.from_model(model, credencial) if model else None
def get_all(self, credencial_loader: callable) -> list[OpenAIEmbedder]:
"""
:param credencial_loader: función que recibe un api_key_id y devuelve una instancia de OpenAICredencial
"""
models = self.session.query(OpenAIEmbedderModel).all()
models = self.session.query(self.Modelo).all()
return [
OpenAIEmbedderMapper.from_model(m, credencial_loader(m.api_key_id))
self.Mapper.from_model(m, credencial_loader(m.api_key_id))
for m in models
]
]
+64 -30
View File
@@ -2,6 +2,12 @@ import os
from dotenv import load_dotenv
from sqlalchemy import Column, Integer, String, Float, Boolean
from src.ArquitectureLayer.Mapper import Mapper_base
from src.ArquitectureLayer.Model import Model_base
from src.ArquitectureLayer.Repo import Repo_base
from typing import Optional
from src.ConexionSql.Base_conexion import ConexionBase
from src.base import Base
from src.Llms.Modelos.Openai_model import ModeloOpenAI # Clase real de lógica
@@ -19,23 +25,54 @@ if pssword is None:
# MODELO (SQLAlchemy)
# ----------------------
class ModeloOpenAIConfigModel(Base):
class ModeloOpenAIConfigModel(Base, Model_base):
__tablename__ = 'modelo_openai_configs'
id = Column(String, primary_key=True)
model = Column(String, nullable=False)
temperature = Column(Float, default=0.7)
top_p = Column(Float, default=1.0)
temperature = Column(Float, default=0.7, nullable=False)
top_p = Column(Float, default=1.0, nullable=False)
top_k = Column(Integer, nullable=True)
frecuencia_penalizacion = Column(Float, default=0.0)
num_tokens_maximos = Column(Integer, default=512)
use_legacy = Column(Boolean, default=False)
frecuencia_penalizacion = Column(Float, default=0.0, nullable=False)
num_tokens_maximos = Column(Integer, default=512, nullable=False)
use_legacy = Column(Boolean, default=False, nullable=False)
# ----------------------
# MAPPER
# ----------------------
class ModeloOpenAIConfigMapper:
class ModeloOpenAIConfigMapper(Mapper_base[ModeloOpenAI, ModeloOpenAIConfigModel]):
@staticmethod
def to_model(obj: ModeloOpenAI) -> ModeloOpenAIConfigModel:
return ModeloOpenAIConfigModel(
id=obj.id,
model=obj.model,
temperature=obj.temperature,
top_p=obj.top_p,
top_k=obj.top_k,
frecuencia_penalizacion=obj.frecuencia_penalizacion,
num_tokens_maximos=obj.num_tokens_maximos,
use_legacy=obj.use_legacy
)
@staticmethod
def from_model(model: ModeloOpenAIConfigModel, cliente: Optional[object] = None) -> ModeloOpenAI:
return ModeloOpenAI(
id=model.id,
cliente=cliente,
model=model.model,
temperature=model.temperature,
top_p=model.top_p,
top_k=model.top_k,
frecuencia_penalizacion=model.frecuencia_penalizacion,
num_tokens_maximos=model.num_tokens_maximos,
use_legacy=model.use_legacy
)
@staticmethod
def to_dict(obj: ModeloOpenAI) -> dict:
return {
@@ -50,39 +87,36 @@ class ModeloOpenAIConfigMapper:
}
@staticmethod
def from_model(model: ModeloOpenAIConfigModel, cliente: object) -> ModeloOpenAI:
def from_dict(data: dict, cliente: Optional[object] = None) -> ModeloOpenAI:
return ModeloOpenAI(
id=model.id,
id=data["id"],
cliente=cliente,
model=model.model,
temperature=model.temperature,
top_p=model.top_p,
top_k=model.top_k,
frecuencia_penalizacion=model.frecuencia_penalizacion,
num_tokens_maximos=model.num_tokens_maximos,
use_legacy=model.use_legacy
model=data["model"],
temperature=data["temperature"],
top_p=data["top_p"],
top_k=data["top_k"],
frecuencia_penalizacion=data["frecuencia_penalizacion"],
num_tokens_maximos=data["num_tokens_maximos"],
use_legacy=data["use_legacy"]
)
# ----------------------
# REPO
# ----------------------
class ModeloOpenAIConfigRepo:
class ModeloOpenAIConfigRepo(Repo_base[ModeloOpenAIConfigModel, ModeloOpenAI]):
def __init__(self, conexion: ConexionBase, cliente: object):
self.session = conexion.get_session()
self.cliente = cliente # Necesario para crear ModeloOpenAI
def add(self, config: ModeloOpenAI) -> str:
data = ModeloOpenAIConfigMapper.to_dict(config)
model = ModeloOpenAIConfigModel(**data)
self.session.add(model)
self.session.commit()
return model.id
super().__init__(
session=conexion.get_session(),
modelo=ModeloOpenAIConfigModel,
mapper=ModeloOpenAIConfigMapper
)
self.cliente = cliente # Necesario para construir el dominio con lógica
def get_by_id(self, id_: str) -> ModeloOpenAI | None:
model = self.session.get(ModeloOpenAIConfigModel, id_)
return ModeloOpenAIConfigMapper.from_model(model, self.cliente) if model else None
model = self.session.get(self.Modelo, id_)
return self.Mapper.from_model(model, self.cliente) if model else None
def get_all(self) -> list[ModeloOpenAI]:
models = self.session.query(ModeloOpenAIConfigModel).all()
return [ModeloOpenAIConfigMapper.from_model(m, self.cliente) for m in models]
models = self.session.query(self.Modelo).all()
return [self.Mapper.from_model(m, self.cliente) for m in models]
+1 -1
View File
@@ -3,7 +3,7 @@ from src.Llms.Embedders.Base_Embedder import EmbedderABC # Asegúrate de que es
from typing import List, Optional
from src.ConexionSql.Base_conexion import ConexionBase
from sqlalchemy import MetaData # Asegúrate de importar esto
from src.TextManager.notas_biblioteca_mmr import generar_tabla_nota_para_biblioteca # Ajusta si es necesario
from src.TextManager.notas_mmr import generar_tabla_nota_para_biblioteca # Ajusta si es necesario
from sqlalchemy import inspect
+53 -42
View File
@@ -3,6 +3,10 @@ import base64
from dotenv import load_dotenv
from sqlalchemy import Column, String, Integer
from src.ArquitectureLayer.Mapper import Mapper_base
from src.ArquitectureLayer.Model import Model_base
from src.ArquitectureLayer.Repo import Repo_base
from src.ConexionSql.Base_conexion import ConexionBase
from src.base import Base
from src.Security.Encriptar import Encriptar_fernet
@@ -23,39 +27,32 @@ if pssword is None:
# MODELO (SQLAlchemy)
# ----------------------
class BibliotecaModel(Base):
class BibliotecaModel(Base, Model_base):
__tablename__ = "bibliotecas"
id = Column(String, primary_key=True, unique=True)
nombre = Column(String, nullable=False, unique=True)
descripcion = Column(String, default="")
descripcion = Column(String, nullable=False, default="")
vector_dim = Column(Integer, nullable=False)
embedder_info = Column(String, nullable=True) # Se puede guardar nombre de clase o config encriptada
embedder_info = Column(String, nullable=True) # Nombre de clase, ID de configuración, o info encriptada del embedder
# ----------------------
# MAPPER
# ----------------------
class BibliotecaMapper:
@staticmethod
def to_dict(obj: Biblioteca) -> dict:
embedder_info = type(obj.embedder).__name__ if obj.embedder else None
return {
"id": obj.id,
"nombre": obj.nombre,
"descripcion": obj.descripcion,
"vector_dim": obj.vector_dim,
"embedder_info": embedder_info # sin codificar ni encriptar
}
class BibliotecaMapper(Mapper_base[Biblioteca, BibliotecaModel]):
@staticmethod
def from_dict(data: dict) -> Biblioteca:
return Biblioteca(
id=data["id"],
nombre=data["nombre"],
descripcion=data["descripcion"],
vector_dim=data["vector_dim"],
embedder=None # Mantienes la lógica actual de no restaurarlo automáticamente
def to_model(obj: Biblioteca) -> BibliotecaModel:
return BibliotecaModel(
id=obj.id,
nombre=obj.nombre,
descripcion=obj.descripcion,
vector_dim=obj.vector_dim
# embedder no se serializa en el modelo, se maneja por separado
)
@staticmethod
@@ -65,37 +62,51 @@ class BibliotecaMapper:
nombre=model.nombre,
descripcion=model.descripcion,
vector_dim=model.vector_dim,
embedder=None # Se puede cargar manualmente si es necesario
embedder=None # se deja para inyección posterior si es necesario
)
@staticmethod
def to_dict(obj: Biblioteca) -> dict:
embedder_info = type(obj.embedder).__name__ if obj.embedder else None
return {
"id": obj.id,
"nombre": obj.nombre,
"descripcion": obj.descripcion,
"vector_dim": obj.vector_dim,
"embedder_info": embedder_info
}
@staticmethod
def from_dict(data: dict) -> Biblioteca:
return Biblioteca(
id=data["id"],
nombre=data["nombre"],
descripcion=data["descripcion"],
vector_dim=data["vector_dim"],
embedder=None # inyección manual si se desea
)
# ----------------------
# REPO
# ----------------------
class BibliotecaRepo:
class BibliotecaRepo(Repo_base[BibliotecaModel, Biblioteca]):
def __init__(self, conexion: ConexionBase):
self.session = conexion.get_session()
super().__init__(
session=conexion.get_session(),
modelo=BibliotecaModel,
mapper=BibliotecaMapper
)
def add(self, biblioteca: Biblioteca) -> str:
# Verificar si ya existe una biblioteca con el mismo nombre
existente = self.session.query(BibliotecaModel).filter_by(nombre=biblioteca.nombre).first()
def add(self, biblioteca: Biblioteca, created_by: str = None, notes: str = None) -> str:
# Lógica personalizada: prevenir duplicados por nombre
existente = self.session.query(self.Modelo).filter_by(nombre=biblioteca.nombre).first()
if existente:
raise ValueError(f"Ya existe una biblioteca con el nombre '{biblioteca.nombre}'")
data = BibliotecaMapper.to_dict(biblioteca)
model = BibliotecaModel(**data)
self.session.add(model)
self.session.commit()
return model.id
def get_all(self) -> list[Biblioteca]:
models = self.session.query(BibliotecaModel).all()
return [BibliotecaMapper.from_model(m) for m in models]
return super().add(biblioteca, created_by=created_by, notes=notes)
def get_by_nombre(self, nombre: str) -> Biblioteca | None:
model = self.session.query(BibliotecaModel).filter_by(nombre=nombre).first()
return BibliotecaMapper.from_model(model) if model else None
def get_by_id(self, id_: str) -> Biblioteca | None:
model = self.session.get(BibliotecaModel, id_)
return BibliotecaMapper.from_model(model) if model else None
model = self.session.query(self.Modelo).filter_by(nombre=nombre, sys_deleted_at=None).first()
return self.Mapper.from_model(model) if model else None
-112
View File
@@ -1,112 +0,0 @@
import os
from dotenv import load_dotenv
from sqlalchemy import Column, String, Table, MetaData
from pgvector.sqlalchemy import Vector
from sqlalchemy.orm import registry, Session
from src.TextManager.nota import Nota
from src.ConexionSql.Base_conexion import ConexionBase
# ----------------------
# Cargar .env
# ----------------------
from entrypoint import ENV_PATH
load_dotenv(ENV_PATH)
# ----------------------
# REGISTRO DINÁMICO PARA TABLAS
# ----------------------
mapper_registry = registry()
# ----------------------
# FUNCIONES AUXILIARES
# ----------------------
def generar_tabla_nota_para_biblioteca(biblioteca_nombre: str, vector_dim: int, metadata: MetaData):
nombre_tabla = biblioteca_nombre.lower().replace(" ", "_")
tabla = Table(
nombre_tabla,
metadata,
Column("id", String, primary_key=True),
Column("titulo", String, nullable=False),
Column("tags", String),
Column("conexiones", String),
Column("texto", String),
Column("resumen", String),
Column("vector", Vector(vector_dim), nullable=True),
Column("vector_resumen", Vector(vector_dim), nullable=True),
)
class NotaModel:
def __init__(self, nota: Nota):
self.id = nota.id
self.titulo = nota.titulo
self.tags = ",".join(nota.tags)
self.conexiones = ",".join(nota.conexiones)
self.texto = nota.texto
self.resumen = nota.resumen
self.vector = nota.vector
self.vector_resumen = getattr(nota, "vector_resumen", None)
# Evitar mapear dos veces la misma clase
if NotaModel not in mapper_registry._class_registry.values():
mapper_registry.map_imperatively(NotaModel, tabla)
return tabla, NotaModel
# ----------------------
# MAPPER
# ----------------------
class NotaMapper:
@staticmethod
def to_dict(nota: Nota) -> dict:
return {
"id": nota.id,
"titulo": nota.titulo,
"tags": ",".join(nota.tags),
"conexiones": ",".join(nota.conexiones),
"texto": nota.texto,
"resumen": nota.resumen,
"vector": nota.vector,
"vector_resumen": nota.vector_resumen
}
@staticmethod
def from_model(model) -> Nota:
return Nota(
id=model.id,
titulo=model.titulo,
tags=model.tags.split(",") if model.tags else [],
conexiones=model.conexiones.split(",") if model.conexiones else [],
texto=model.texto,
resumen=model.resumen,
vector=list(model.vector) if model.vector is not None else None,
vector_resumen=list(model.vector_resumen) if model.vector_resumen is not None else None
)
# ----------------------
# REPO
# ----------------------
class NotaRepo:
def __init__(self, session: Session, modelo_nota):
if modelo_nota is None:
raise ValueError("No se puede instanciar NotaRepo sin un modelo válido de nota.")
self.session = session
self.ModeloNota = modelo_nota
def add(self, nota: Nota) -> str:
model = self.ModeloNota(nota)
self.session.add(model)
self.session.commit()
return model.id
def get_by_id(self, id_: str) -> Nota | None:
model = self.session.get(self.ModeloNota, id_)
return NotaMapper.from_model(model) if model else None
def get_all(self) -> list[Nota]:
models = self.session.query(self.ModeloNota).all()
return [NotaMapper.from_model(m) for m in models]
+174
View File
@@ -0,0 +1,174 @@
import os
from dotenv import load_dotenv
from sqlalchemy import Column, String, Table, MetaData
from pgvector.sqlalchemy import Vector
from sqlalchemy.orm import registry, Session
from src.TextManager.nota import Nota
from src.ConexionSql.Base_conexion import ConexionBase
from typing import Tuple
import re
from src.ArquitectureLayer.Mapper import Mapper_base
from src.ArquitectureLayer.Model import Model_base
from src.ArquitectureLayer.Repo import Repo_base
# ----------------------
# Cargar .env
# ----------------------
from entrypoint import ENV_PATH
load_dotenv(ENV_PATH)
# ----------------------
# REGISTRO DINÁMICO PARA TABLAS
# ----------------------
mapper_registry = registry()
# ----------------------
# FUNCIONES AUXILIARES
# ----------------------
def generar_tabla_nota_para_biblioteca(biblioteca_nombre: str, vector_dim: int, metadata: MetaData) -> Tuple[Table, type]:
"""
Genera una tabla dinámica y modelo ORM para una biblioteca dada, con campos vectoriales.
"""
# Normalización robusta del nombre de la tabla
nombre_tabla = re.sub(r"[^a-zA-Z0-9_]", "_", biblioteca_nombre.strip().lower())
tabla = Table(
nombre_tabla,
metadata,
Column("id", String, primary_key=True),
Column("titulo", String, nullable=False),
Column("tags", String),
Column("conexiones", String),
Column("texto", String),
Column("resumen", String),
Column("vector", Vector(vector_dim), nullable=True),
Column("vector_resumen", Vector(vector_dim), nullable=True),
*Model_base.__table__.columns # Añade columnas del sistema si Model_base es declarativo
)
class NotaModel(Model_base):
"""
Modelo ORM generado dinámicamente para notas de una biblioteca.
"""
def __init__(self, nota):
self.id = nota.id
self.titulo = nota.titulo
self.tags = ",".join(nota.tags)
self.conexiones = ",".join(nota.conexiones)
self.texto = nota.texto
self.resumen = nota.resumen
self.vector = nota.vector
self.vector_resumen = getattr(nota, "vector_resumen", None)
# Registrar solo si aún no se ha hecho
if NotaModel.__name__ not in mapper_registry._class_registry:
mapper_registry.map_imperatively(NotaModel, tabla)
return tabla, NotaModel
# ----------------------
# MAPPER
# ----------------------
class NotaMapper(Mapper_base[Nota, object]): # Usa `object` si el modelo es dinámico
@staticmethod
def to_model(nota: Nota):
return {
"id": nota.id,
"titulo": nota.titulo,
"tags": ",".join(nota.tags),
"conexiones": ",".join(nota.conexiones),
"texto": nota.texto,
"resumen": nota.resumen,
"vector": nota.vector,
"vector_resumen": nota.vector_resumen
}
@staticmethod
def from_model(model) -> Nota:
return Nota(
id=model.id,
titulo=model.titulo,
tags=model.tags.split(",") if model.tags else [],
conexiones=model.conexiones.split(",") if model.conexiones else [],
texto=model.texto,
resumen=model.resumen,
vector=list(model.vector) if model.vector is not None else None,
vector_resumen=list(model.vector_resumen) if model.vector_resumen is not None else None
)
@staticmethod
def to_dict(nota: Nota) -> dict:
return {
"id": nota.id,
"titulo": nota.titulo,
"tags": ",".join(nota.tags),
"conexiones": ",".join(nota.conexiones),
"texto": nota.texto,
"resumen": nota.resumen,
"vector": nota.vector,
"vector_resumen": nota.vector_resumen
}
@staticmethod
def from_dict(data: dict) -> Nota:
return Nota(
id=data["id"],
titulo=data["titulo"],
tags=data["tags"].split(",") if data.get("tags") else [],
conexiones=data["conexiones"].split(",") if data.get("conexiones") else [],
texto=data["texto"],
resumen=data["resumen"],
vector=data.get("vector"),
vector_resumen=data.get("vector_resumen")
)
# ----------------------
# REPO
# ----------------------
class NotaRepo(Repo_base):
def __init__(self, session: Session, modelo_nota: type):
if modelo_nota is None:
raise ValueError("No se puede instanciar NotaRepo sin un modelo válido de nota.")
super().__init__(session=session, modelo=modelo_nota, mapper=NotaMapper)
# ------------------------
# Métodos personalizados
# ------------------------
def get_by_tag(self, tag: str) -> list[Nota]:
query = self.session.query(self.Modelo).filter(self.Modelo.tags.contains([tag]))
models = query.all()
return self.Mapper.from_model_list(models)
def search_by_text(self, texto: str) -> list[Nota]:
query = self.session.query(self.Modelo).filter(self.Modelo.texto.ilike(f'%{texto}%'))
models = query.all()
return self.Mapper.from_model_list(models)
def get_paginated(self, offset: int = 0, limit: int = 10) -> list[Nota]:
models = self.session.query(self.Modelo).offset(offset).limit(limit).all()
return self.Mapper.from_model_list(models)
def update(self, id_: str, nota_actualizada: Nota) -> bool:
model = self.session.get(self.Modelo, id_)
if not model:
return False
# Campos de dominio
model.titulo = nota_actualizada.titulo
model.texto = nota_actualizada.texto
model.tags = nota_actualizada.tags
model.conexiones = nota_actualizada.conexiones
model.resumen = nota_actualizada.resumen
# Actualización de campos de sistema
model.sys_version = (model.sys_version or 1) + 1
self.session.commit()
return True