Refactor project structure and implement new features
- Removed unused security module and updated import paths. - Enhanced OpenAI client with streaming capabilities for chat completions. - Added new backend API endpoints for health check (ping). - Established a new FastAPI application with CORS configuration. - Created a new Appshell component for the frontend with navigation links. - Integrated SVG icons and improved styling for the Appshell component. - Implemented memory management for conversation history using PostgreSQL. - Developed abstract classes for AI agents and models, with OpenAI integration. - Added encryption utilities for secure data handling.
This commit is contained in:
@@ -6,7 +6,7 @@ from sqlalchemy import Column, Integer, String
|
||||
from src.ConexionSql.Base_conexion import ConexionBase
|
||||
from src.base import Base
|
||||
from src.ApiKeys.openai_apikey import OpenAICredencial
|
||||
from security.Encriptar import Encriptar_fernet
|
||||
from src.Security.Encriptar import Encriptar_fernet
|
||||
from entrypoint import ENV_PATH
|
||||
|
||||
# ----------------------
|
||||
|
||||
@@ -12,8 +12,19 @@ class OpenAICliente:
|
||||
self.client.organization = self.credencial.organizacion
|
||||
|
||||
# --- Chat Completions ---
|
||||
def chat_completion(self, model: str, messages: list, **kwargs):
|
||||
return self.client.chat.completions.create(model=model, messages=messages, **kwargs)
|
||||
def chat_completion(self, model: str, messages: list, stream: bool = False, **kwargs):
|
||||
response = self.client.chat.completions.create(
|
||||
model=model,
|
||||
messages=messages,
|
||||
stream=stream, # Parámetro explícito
|
||||
**kwargs
|
||||
)
|
||||
return self._handle_stream(response, stream) if stream else response
|
||||
|
||||
def _handle_stream(self, stream, _):
|
||||
for chunk in stream:
|
||||
if chunk.choices and chunk.choices[0].delta.content:
|
||||
yield chunk.choices[0].delta.content
|
||||
|
||||
# --- Text Completions (legacy) ---
|
||||
def completion(self, model: str, prompt: str, **kwargs):
|
||||
|
||||
@@ -6,7 +6,7 @@ from sqlalchemy.orm import relationship
|
||||
from src.ConexionSql.Base_conexion import ConexionBase
|
||||
from src.base import Base
|
||||
from src.Credenciales.postgres_credencial import PostgresCredencial
|
||||
from security.Encriptar import Encriptar_fernet
|
||||
from src.Security.Encriptar import Encriptar_fernet
|
||||
|
||||
# ----------------------
|
||||
# Cargar clave maestra
|
||||
|
||||
@@ -0,0 +1,190 @@
|
||||
from src.Llms.Modelos.Base_model import ModeloABC
|
||||
from src.Llms.Memory.Base_MemoryConv import MemoryConvABC
|
||||
|
||||
from datetime import datetime
|
||||
from typing import Optional, List, Union, AsyncGenerator
|
||||
|
||||
|
||||
class AgenteAI:
|
||||
def __init__(
|
||||
self,
|
||||
modelo: ModeloABC,
|
||||
nombre: str,
|
||||
descripcion: str,
|
||||
system_prompt: str,
|
||||
rol: str,
|
||||
objetivos: List[str],
|
||||
max_iterations: int = 1,
|
||||
memoria: Optional[MemoryConvABC] = None,
|
||||
version: str = "1.0.0",
|
||||
tools: Optional[List] = None,
|
||||
output_schema: Optional[dict] = None,
|
||||
):
|
||||
self.modelo = modelo
|
||||
self.memoria = memoria
|
||||
self.tools = tools or []
|
||||
self.output_schema = output_schema
|
||||
|
||||
self.nombre = nombre
|
||||
self.descripcion = descripcion
|
||||
self.system_prompt = system_prompt
|
||||
self.max_iterations = max_iterations
|
||||
self.rol = rol
|
||||
self.objetivos = objetivos
|
||||
self.version = version
|
||||
|
||||
self.created_at = datetime.now()
|
||||
self.updated_at = self.created_at
|
||||
self.numero_interacciones = 0
|
||||
|
||||
def actualizar_configuracion(self, **kwargs):
|
||||
for clave, valor in kwargs.items():
|
||||
if hasattr(self, clave):
|
||||
setattr(self, clave, valor)
|
||||
self.updated_at = datetime.now()
|
||||
|
||||
@property
|
||||
def full_system_prompt(self) -> str:
|
||||
partes = [
|
||||
f"Tu nombre es: {self.nombre}",
|
||||
f"Tu descripción: {self.descripcion}",
|
||||
f"Tu Rol: {self.rol}",
|
||||
f"Tus Objetivos: {', '.join(self.objetivos)}",
|
||||
""
|
||||
]
|
||||
|
||||
herramientas = self._obtener_descripcion_tools()
|
||||
if herramientas:
|
||||
partes.append("Estas son tus herramientas disponibles:\n")
|
||||
partes.extend(herramientas)
|
||||
partes.append("Úsalas cuando creas oportuno.\n")
|
||||
|
||||
partes.append(self.system_prompt)
|
||||
|
||||
if self.output_schema:
|
||||
partes.append("SIEMPRE formatea la respuesta final siguiendo estrictamente el siguiente esquema JSON:")
|
||||
partes.append(f"```json\n{self.output_schema}\n```")
|
||||
|
||||
return "\n".join(partes)
|
||||
|
||||
def _obtener_descripcion_tools(self) -> List[str]:
|
||||
descripciones = []
|
||||
if not hasattr(self, "mcp_servers"):
|
||||
return descripciones
|
||||
|
||||
for server in self.mcp_servers:
|
||||
if hasattr(server, "tools") and server.tools:
|
||||
for tool in server.tools:
|
||||
if isinstance(tool, str):
|
||||
descripciones.append(f"- {tool}: [sin descripción]")
|
||||
elif isinstance(tool, dict):
|
||||
nombre = tool.get("name", "¿?")
|
||||
descripcion = tool.get("description", "[sin descripción]")
|
||||
descripciones.append(f"- {nombre}: {descripcion}")
|
||||
elif hasattr(tool, "name"):
|
||||
descripcion = getattr(tool, "description", "[sin descripción]")
|
||||
descripciones.append(f"- {tool.name}: {descripcion}")
|
||||
return descripciones
|
||||
|
||||
def _formatear_prompt(self, mensajes: List[dict]) -> str:
|
||||
return "\n".join([f"{msg['role']}: {msg['content']}" for msg in mensajes])
|
||||
|
||||
async def interactuar(self, prompt: str, stream: bool = False) -> Union[str, AsyncGenerator[str, None]]:
|
||||
historial = self.memoria.cargar_historial_chat() if self.memoria else []
|
||||
contexto = historial + [{"role": "user", "content": prompt}]
|
||||
prompt_final = self._formatear_prompt(contexto)
|
||||
|
||||
respuesta = await self.modelo.responder(
|
||||
prompt=prompt_final,
|
||||
system_prompt=self.full_system_prompt,
|
||||
stream=stream
|
||||
)
|
||||
|
||||
if stream:
|
||||
# stream es un generador asincrónico
|
||||
async def wrapper():
|
||||
buffer_respuesta = ""
|
||||
async for token in respuesta:
|
||||
buffer_respuesta += token
|
||||
yield token
|
||||
if self.memoria:
|
||||
self.memoria.guardar_turno("user", prompt)
|
||||
self.memoria.guardar_turno("assistant", buffer_respuesta)
|
||||
self.numero_interacciones += 1
|
||||
self.updated_at = datetime.now()
|
||||
return wrapper()
|
||||
else:
|
||||
if self.memoria:
|
||||
self.memoria.guardar_turno("user", prompt)
|
||||
self.memoria.guardar_turno("assistant", respuesta)
|
||||
self.numero_interacciones += 1
|
||||
self.updated_at = datetime.now()
|
||||
return respuesta
|
||||
|
||||
async def interactuar_en_bucle(self, prompt: str, stream: bool = False) -> Union[List[str], AsyncGenerator[str, None]]:
|
||||
historial = self.memoria.cargar_historial_chat() if self.memoria else []
|
||||
respuestas = [] if not stream else None
|
||||
respuesta_anterior = None
|
||||
iteration = 0
|
||||
prompt_original = prompt.strip()
|
||||
|
||||
async def generador():
|
||||
nonlocal iteration, respuesta_anterior
|
||||
prompt_actual = prompt_original
|
||||
while self.max_iterations == 0 or iteration < self.max_iterations:
|
||||
if iteration == 0:
|
||||
prompt_actual += (
|
||||
"\n\nIMPORTANTE:\n"
|
||||
"Si al revisar tu última respuesta y mi pregunta inicial consideras que has terminado, "
|
||||
"di alguna de estas frases: <FIN>"
|
||||
)
|
||||
else:
|
||||
prompt_actual = (
|
||||
f"Esta es la pregunta original:\n{prompt_original}\n\n"
|
||||
f"Esto fue lo último que dijiste:\n{respuesta_anterior}\n"
|
||||
|
||||
"\n\nIMPORTANTE:\n"
|
||||
"Si al revisar tu última respuesta y mi pregunta inicial consideras que has terminado, "
|
||||
"di alguna de estas frases: <FIN>"
|
||||
)
|
||||
|
||||
contexto = historial + [{"role": "user", "content": prompt_actual}]
|
||||
prompt_final = self._formatear_prompt(contexto)
|
||||
|
||||
respuesta = await self.modelo.responder(
|
||||
prompt=prompt_final,
|
||||
system_prompt=self.full_system_prompt,
|
||||
stream=stream
|
||||
)
|
||||
|
||||
if stream:
|
||||
buffer_respuesta = ""
|
||||
async for token in respuesta:
|
||||
buffer_respuesta += token
|
||||
yield token
|
||||
respuesta_anterior = buffer_respuesta
|
||||
else:
|
||||
respuestas.append(respuesta)
|
||||
respuesta_anterior = respuesta
|
||||
|
||||
if self.memoria:
|
||||
self.memoria.guardar_turno("user", prompt_actual)
|
||||
self.memoria.guardar_turno("assistant", respuesta_anterior)
|
||||
|
||||
self.numero_interacciones += 1
|
||||
self.updated_at = datetime.now()
|
||||
|
||||
if "<fin>" in respuesta_anterior.lower():
|
||||
break
|
||||
|
||||
iteration += 1
|
||||
prompt_actual = ""
|
||||
|
||||
return generador() if stream else await generador_to_list(generador)
|
||||
|
||||
# Helper para consumir generador asincrónico si no es stream
|
||||
async def generador_to_list(gen: AsyncGenerator[str, None]) -> List[str]:
|
||||
buffer = ""
|
||||
async for chunk in gen:
|
||||
buffer += chunk
|
||||
return [buffer]
|
||||
@@ -0,0 +1,65 @@
|
||||
import asyncio
|
||||
import os
|
||||
from typing import Optional, List, Dict
|
||||
from contextlib import AsyncExitStack
|
||||
from mcp import ClientSession, StdioServerParameters
|
||||
from mcp.client.stdio import stdio_client
|
||||
from mcp.types import Tool
|
||||
|
||||
class MCPStdioServer:
|
||||
def __init__(
|
||||
self,
|
||||
name: str,
|
||||
command: str,
|
||||
args: List[str],
|
||||
env: Optional[Dict[str, str]] = None
|
||||
):
|
||||
self.name = name
|
||||
self.command = command
|
||||
self.args = args
|
||||
self.env = env or os.environ.copy()
|
||||
self.exit_stack = AsyncExitStack()
|
||||
self.session: Optional[ClientSession] = None
|
||||
self.tools: List[Tool] = []
|
||||
|
||||
async def start(self):
|
||||
# Configurar el bucle de eventos Proactor en Windows si es necesario
|
||||
if os.name == "nt":
|
||||
asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
|
||||
|
||||
server_params = StdioServerParameters(
|
||||
command=self.command,
|
||||
args=self.args,
|
||||
env=self.env
|
||||
)
|
||||
|
||||
# Iniciar el transporte y establecer la sesión
|
||||
stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))
|
||||
read, write = stdio_transport
|
||||
|
||||
self.session = await self.exit_stack.enter_async_context(ClientSession(read, write))
|
||||
await self.session.initialize()
|
||||
response = await self.session.list_tools()
|
||||
self.tools = response.tools
|
||||
|
||||
if self.tools:
|
||||
print(f"[{self.name}] Servidor iniciado con herramientas:")
|
||||
for tool in self.tools:
|
||||
nombre = getattr(tool, "name", "[sin nombre]")
|
||||
descripcion = getattr(tool, "description", "[sin descripción]")
|
||||
print(f" - {nombre} - {descripcion}")
|
||||
else:
|
||||
print(f"[{self.name}] Servidor iniciado, pero no se detectaron herramientas.")
|
||||
|
||||
async def call_tool(self, tool_name: str, arguments: Dict):
|
||||
if not self.session:
|
||||
raise RuntimeError("La sesión no está inicializada.")
|
||||
result = await self.session.call_tool(tool_name, arguments)
|
||||
return result.content
|
||||
|
||||
def get_tool_names(self) -> List[str]:
|
||||
return [tool.name for tool in self.tools]
|
||||
|
||||
async def stop(self):
|
||||
await self.exit_stack.aclose()
|
||||
print(f"[{self.name}] Servidor detenido.")
|
||||
@@ -0,0 +1,36 @@
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import Literal
|
||||
|
||||
class MemoryConvABC(ABC):
|
||||
"""
|
||||
Interfaz abstracta para memorias de conversación multi-turno.
|
||||
"""
|
||||
|
||||
def __init__(self, k: int = 10):
|
||||
"""
|
||||
:param k: Número máximo de turnos (pares user/assistant) a retornar.
|
||||
"""
|
||||
self.k = k
|
||||
|
||||
@abstractmethod
|
||||
def guardar_turno(self, rol: Literal["user", "assistant"], contenido: str) -> None:
|
||||
"""
|
||||
Guarda un mensaje de un turno de conversación.
|
||||
Se guarda secuencialmente y se asume que el orden es respetado.
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def cargar_historial_chat(self) -> list[dict]:
|
||||
"""
|
||||
Devuelve los últimos `k*2` mensajes como lista de dicts tipo chat:
|
||||
[{"role": "user", "content": "..."}, {"role": "assistant", "content": "..."}]
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def limpiar(self) -> None:
|
||||
"""
|
||||
Limpia completamente la memoria.
|
||||
"""
|
||||
pass
|
||||
@@ -0,0 +1,52 @@
|
||||
from sqlalchemy import Table, Column, Integer, String, MetaData, insert, select, delete
|
||||
from typing import Literal
|
||||
|
||||
from src.Credenciales.postgres_credencial import PostgresCredencial
|
||||
from src.ConexionSql.Postgres_conexion import PostgresConexion # Usamos la clase específica
|
||||
from src.Llms.Memory.Base_MemoryConv import MemoryConvABC
|
||||
|
||||
|
||||
class MemoryConvPostgres(MemoryConvABC):
|
||||
def __init__(self, nombre_tabla: str, k: int, credencial: PostgresCredencial):
|
||||
"""
|
||||
:param nombre_tabla: Nombre de la tabla en PostgreSQL.
|
||||
:param k: Número de pares user/assistant a recuperar.
|
||||
:param credencial: Credencial para conectar a PostgreSQL.
|
||||
"""
|
||||
super().__init__(k=k)
|
||||
self.conexion = PostgresConexion(credencial) # Se instancia directamente con la credencial
|
||||
self.nombre_tabla = nombre_tabla
|
||||
|
||||
self.metadata = MetaData()
|
||||
self.tabla = Table(
|
||||
self.nombre_tabla,
|
||||
self.metadata,
|
||||
Column("id", Integer, primary_key=True, autoincrement=True),
|
||||
Column("rol", String, nullable=False),
|
||||
Column("contenido", String, nullable=False),
|
||||
)
|
||||
|
||||
# Crea la tabla si no existe
|
||||
self.metadata.create_all(self.conexion.engine)
|
||||
|
||||
def guardar_turno(self, rol: Literal["user", "assistant"], contenido: str) -> None:
|
||||
stmt = insert(self.tabla).values(rol=rol, contenido=contenido)
|
||||
with self.conexion.get_session() as session:
|
||||
session.execute(stmt)
|
||||
session.commit()
|
||||
|
||||
def cargar_historial_chat(self) -> list[dict]:
|
||||
stmt = (
|
||||
select(self.tabla.c.rol, self.tabla.c.contenido)
|
||||
.order_by(self.tabla.c.id.desc())
|
||||
.limit(self.k * 2)
|
||||
)
|
||||
with self.conexion.get_session() as session:
|
||||
result = session.execute(stmt).fetchall()
|
||||
return [{"role": r.rol, "content": r.contenido} for r in reversed(result)]
|
||||
|
||||
def limpiar(self) -> None:
|
||||
stmt = delete(self.tabla)
|
||||
with self.conexion.get_session() as session:
|
||||
session.execute(stmt)
|
||||
session.commit()
|
||||
@@ -0,0 +1,30 @@
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
class ModeloABC(ABC):
|
||||
"""
|
||||
Clase base para definir la interfaz de un modelo conversacional.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
model: str,
|
||||
temperature: float = 0.7,
|
||||
top_p: float = 1.0,
|
||||
top_k: int = None,
|
||||
frecuencia_penalizacion: float = 0.0,
|
||||
num_tokens_maximos: int = 512
|
||||
):
|
||||
self.model = model
|
||||
self.temperature = temperature
|
||||
self.top_p = top_p
|
||||
self.top_k = top_k
|
||||
self.frecuencia_penalizacion = frecuencia_penalizacion
|
||||
self.num_tokens_maximos = num_tokens_maximos
|
||||
|
||||
@abstractmethod
|
||||
async def responder(self, prompt: str, system_prompt: str = "", stream: bool = False, **kwargs) -> str:
|
||||
"""
|
||||
Devuelve una respuesta a partir de un prompt y configuración del modelo.
|
||||
Este método debe implementarse de forma asíncrona en las subclases.
|
||||
"""
|
||||
pass
|
||||
@@ -0,0 +1,82 @@
|
||||
from src.Llms.Modelos.Base_model import ModeloABC
|
||||
from src.ConexionApis.OpenAi_conexion import OpenAICliente
|
||||
import asyncio
|
||||
from typing import AsyncGenerator, Union
|
||||
|
||||
class ModeloOpenAI(ModeloABC):
|
||||
def __init__(
|
||||
self,
|
||||
cliente: OpenAICliente,
|
||||
model: str = "gpt-4o",
|
||||
temperature: float = 0.7,
|
||||
top_p: float = 1.0,
|
||||
top_k: int = None,
|
||||
frecuencia_penalizacion: float = 0.0,
|
||||
num_tokens_maximos: int = 512,
|
||||
use_legacy: bool = False
|
||||
):
|
||||
super().__init__(
|
||||
model=model,
|
||||
temperature=temperature,
|
||||
top_p=top_p,
|
||||
top_k=top_k,
|
||||
frecuencia_penalizacion=frecuencia_penalizacion,
|
||||
num_tokens_maximos=num_tokens_maximos
|
||||
)
|
||||
self.cliente = cliente
|
||||
self.use_legacy = use_legacy
|
||||
|
||||
async def responder(
|
||||
self,
|
||||
prompt: str,
|
||||
system_prompt: str = "",
|
||||
stream: bool = False,
|
||||
**kwargs
|
||||
) -> Union[str, AsyncGenerator[str, None]]:
|
||||
if self.use_legacy:
|
||||
if stream:
|
||||
raise NotImplementedError("El modo legacy no soporta streaming.")
|
||||
|
||||
loop = asyncio.get_event_loop()
|
||||
respuesta = await loop.run_in_executor(
|
||||
None,
|
||||
lambda: self.cliente.completion(
|
||||
model=self.model,
|
||||
prompt=prompt,
|
||||
temperature=self.temperature,
|
||||
top_p=self.top_p,
|
||||
max_tokens=self.num_tokens_maximos,
|
||||
frequency_penalty=self.frecuencia_penalizacion,
|
||||
**kwargs
|
||||
)
|
||||
)
|
||||
return respuesta.choices[0].text.strip()
|
||||
|
||||
# Construcción de mensajes estilo Chat
|
||||
messages = []
|
||||
if system_prompt:
|
||||
messages.append({"role": "system", "content": system_prompt})
|
||||
messages.append({"role": "user", "content": prompt})
|
||||
|
||||
def sync_call():
|
||||
return self.cliente.chat_completion(
|
||||
model=self.model,
|
||||
messages=messages,
|
||||
temperature=self.temperature,
|
||||
top_p=self.top_p,
|
||||
max_tokens=self.num_tokens_maximos,
|
||||
frequency_penalty=self.frecuencia_penalizacion,
|
||||
stream=stream,
|
||||
**kwargs
|
||||
)
|
||||
|
||||
loop = asyncio.get_event_loop()
|
||||
resultado = await loop.run_in_executor(None, sync_call)
|
||||
|
||||
if stream:
|
||||
async def generador():
|
||||
for token in resultado: # ya es un generador del cliente
|
||||
yield token
|
||||
return generador()
|
||||
else:
|
||||
return resultado.choices[0].message.content
|
||||
@@ -0,0 +1,86 @@
|
||||
import os
|
||||
from dotenv import load_dotenv
|
||||
from sqlalchemy import Column, Integer, String, Float, Boolean
|
||||
|
||||
from src.ConexionSql.Base_conexion import ConexionBase
|
||||
from src.base import Base
|
||||
from src.Llms.Modelos.Openai_model import ModeloOpenAI # Clase real de lógica
|
||||
|
||||
# ----------------------
|
||||
# Cargar clave maestra
|
||||
# ----------------------
|
||||
from entrypoint import ENV_PATH
|
||||
load_dotenv(ENV_PATH)
|
||||
pssword = os.getenv('MASTER_PASSWORD')
|
||||
if pssword is None:
|
||||
raise ValueError("MASTER_PASSWORD no está definida en el archivo .env")
|
||||
|
||||
# ----------------------
|
||||
# MODELO (SQLAlchemy)
|
||||
# ----------------------
|
||||
|
||||
class ModeloOpenAIConfigModel(Base):
|
||||
__tablename__ = 'modelo_openai_configs'
|
||||
|
||||
id = Column(Integer, primary_key=True)
|
||||
model = Column(String, nullable=False)
|
||||
temperature = Column(Float, default=0.7)
|
||||
top_p = Column(Float, default=1.0)
|
||||
top_k = Column(Integer, nullable=True)
|
||||
frecuencia_penalizacion = Column(Float, default=0.0)
|
||||
num_tokens_maximos = Column(Integer, default=512)
|
||||
use_legacy = Column(Boolean, default=False)
|
||||
|
||||
# ----------------------
|
||||
# MAPPER
|
||||
# ----------------------
|
||||
|
||||
class ModeloOpenAIConfigMapper:
|
||||
@staticmethod
|
||||
def to_dict(obj: ModeloOpenAI) -> dict:
|
||||
return {
|
||||
"model": obj.model,
|
||||
"temperature": obj.temperature,
|
||||
"top_p": obj.top_p,
|
||||
"top_k": obj.top_k,
|
||||
"frecuencia_penalizacion": obj.frecuencia_penalizacion,
|
||||
"num_tokens_maximos": obj.num_tokens_maximos,
|
||||
"use_legacy": obj.use_legacy
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def from_model(model: ModeloOpenAIConfigModel, cliente: object) -> ModeloOpenAI:
|
||||
return ModeloOpenAI(
|
||||
cliente=cliente,
|
||||
model=model.model,
|
||||
temperature=model.temperature,
|
||||
top_p=model.top_p,
|
||||
top_k=model.top_k,
|
||||
frecuencia_penalizacion=model.frecuencia_penalizacion,
|
||||
num_tokens_maximos=model.num_tokens_maximos,
|
||||
use_legacy=model.use_legacy
|
||||
)
|
||||
|
||||
# ----------------------
|
||||
# REPO
|
||||
# ----------------------
|
||||
|
||||
class ModeloOpenAIConfigRepo:
|
||||
def __init__(self, conexion: ConexionBase, cliente: object):
|
||||
self.session = conexion.get_session()
|
||||
self.cliente = cliente # Necesario para crear ModeloOpenAI
|
||||
|
||||
def add(self, config: ModeloOpenAI) -> int:
|
||||
data = ModeloOpenAIConfigMapper.to_dict(config)
|
||||
model = ModeloOpenAIConfigModel(**data)
|
||||
self.session.add(model)
|
||||
self.session.commit()
|
||||
return model.id
|
||||
|
||||
def get_by_id(self, id_: int) -> ModeloOpenAI | None:
|
||||
model = self.session.get(ModeloOpenAIConfigModel, id_)
|
||||
return ModeloOpenAIConfigMapper.from_model(model, self.cliente) if model else None
|
||||
|
||||
def get_all(self) -> list[ModeloOpenAI]:
|
||||
models = self.session.query(ModeloOpenAIConfigModel).all()
|
||||
return [ModeloOpenAIConfigMapper.from_model(m, self.cliente) for m in models]
|
||||
@@ -0,0 +1,44 @@
|
||||
import base64
|
||||
import os
|
||||
from cryptography.fernet import Fernet
|
||||
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
|
||||
from cryptography.hazmat.primitives import hashes
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
|
||||
class Encriptar_fernet:
|
||||
@staticmethod
|
||||
def _derivar_clave(password: str, salt: bytes) -> bytes:
|
||||
"""
|
||||
Deriva una clave segura a partir de la contraseña y el salt usando PBKDF2HMAC.
|
||||
"""
|
||||
kdf = PBKDF2HMAC(
|
||||
algorithm=hashes.SHA256(),
|
||||
length=32,
|
||||
salt=salt,
|
||||
iterations=100_000,
|
||||
backend=default_backend()
|
||||
)
|
||||
return base64.urlsafe_b64encode(kdf.derive(password.encode()))
|
||||
|
||||
@classmethod
|
||||
def encriptar(cls, texto: str, password: str) -> bytes:
|
||||
"""
|
||||
Encripta un texto con una clave derivada de la contraseña + salt aleatorio.
|
||||
El salt es embebido al inicio del token cifrado.
|
||||
"""
|
||||
salt = os.urandom(16) # 128 bits de salt aleatorio
|
||||
key = cls._derivar_clave(password, salt)
|
||||
fernet = Fernet(key)
|
||||
token = fernet.encrypt(texto.encode('utf-8'))
|
||||
return salt + token # Embebemos el salt al principio
|
||||
|
||||
@classmethod
|
||||
def desencriptar(cls, token_con_salt: bytes, password: str) -> str:
|
||||
"""
|
||||
Extrae el salt del token, deriva la clave, y desencripta el texto.
|
||||
"""
|
||||
salt = token_con_salt[:16]
|
||||
token = token_con_salt[16:]
|
||||
key = cls._derivar_clave(password, salt)
|
||||
fernet = Fernet(key)
|
||||
return fernet.decrypt(token).decode('utf-8')
|
||||
Reference in New Issue
Block a user