# Listing metadata: 628 lines, 18 KiB, Python
import marimo
|
||
|
||
# Version of marimo that wrote this notebook file.
__generated_with = "0.15.5"
# Notebook application object; every cell below registers on it via @app.cell.
app = marimo.App(width="columns")
|
||
|
||
|
||
@app.cell(column=0)
def _():
    # Import marimo once and hand it to the rest of the notebook as `mo`.
    import marimo as mo

    return (mo,)
|
||
|
||
|
||
@app.cell
def _():
    # Connection settings consumed by the scaffolding cell (written to .env).
    DB_HOST = "localhost"
    PUERTO_POSTGRES = 55455
    DB_NAME = "basededatos"
    DB_USER = "postgres"
    DB_PASSWORD = "mipassword"

    # Defined here but not part of the returned tuple.
    SERVICIO = "postgres_ext"

    return DB_HOST, DB_NAME, DB_PASSWORD, DB_USER, PUERTO_POSTGRES
|
||
|
||
|
||
@app.cell
def _(DB_HOST, DB_NAME, DB_PASSWORD, DB_USER, PUERTO_POSTGRES):
    # Scaffold the backend package: backend/db/{__init__,base,session}.py and
    # backend/.env. Existing .env files are never overwritten; only missing
    # keys are appended.
    import os
    from pathlib import Path
    from textwrap import dedent as _dedent

    BASE_DIR = Path.cwd() / "backend"
    DB_DIR = BASE_DIR / "db"
    DB_DIR.mkdir(parents=True, exist_ok=True)

    # __init__.py (empty package marker)
    (DB_DIR / "__init__.py").write_text("", encoding="utf-8")

    # base.py -- dedent lets the template stay indented with the cell body
    # while the generated file starts at column 0.
    base_py = _dedent(
        """\
        # backend/db/base.py
        from sqlalchemy.orm import declarative_base

        # Declarative Base común para todos los modelos ORM
        Base = declarative_base()
        """
    )
    (DB_DIR / "base.py").write_text(base_py, encoding="utf-8")

    # session.py -- doubled braces survive this f-string and become the
    # placeholders of the f-string inside the generated module.
    session_py = _dedent(
        f"""\
        # backend/db/session.py
        import os
        from sqlalchemy import create_engine
        from sqlalchemy.orm import sessionmaker
        from urllib.parse import quote_plus
        from dotenv import load_dotenv

        # Cargar variables de entorno (.env) desde backend/.env
        load_dotenv(dotenv_path=os.path.join(os.path.dirname(__file__), "..", ".env"))

        DB_USER = os.getenv("DB_USER")
        DB_PASSWORD = quote_plus(os.getenv("DB_PASSWORD", ""))
        DB_HOST = os.getenv("DB_HOST")
        DB_PORT = os.getenv("DB_PORT", "{PUERTO_POSTGRES}")
        DB_NAME = os.getenv("DB_NAME")

        DATABASE_URL = (
            f"postgresql+psycopg2://{{DB_USER}}:{{DB_PASSWORD}}@{{DB_HOST}}:{{DB_PORT}}/{{DB_NAME}}"
        )

        # Crear engine
        engine = create_engine(
            DATABASE_URL,
            pool_size=5,
            max_overflow=10,
            future=True,
            echo=False,  # pon True para ver las queries en consola
        )

        # Fábrica de sesiones
        SessionLocal = sessionmaker(
            bind=engine,
            autoflush=False,
            expire_on_commit=False,
            future=True,
        )
        """
    )
    (DB_DIR / "session.py").write_text(session_py, encoding="utf-8")

    # .env handling
    env_file = BASE_DIR / ".env"
    default_env = {
        "DB_USER": DB_USER,
        "DB_PASSWORD": DB_PASSWORD,
        "DB_HOST": DB_HOST,
        "DB_PORT": PUERTO_POSTGRES,
        "DB_NAME": DB_NAME,
    }

    if not env_file.exists():
        # Fresh file containing every key.
        env_content = "".join(f"{_k}={_v}\n" for _k, _v in default_env.items())
        env_file.write_text(env_content, encoding="utf-8")
        print(f"✅ Creado {env_file}")
    else:
        _env_text = env_file.read_text(encoding="utf-8")
        # Keys already present; commented-out lines do not count and
        # whitespace around the key is ignored.
        existing_keys = {
            _line.partition("=")[0].strip()
            for _line in _env_text.splitlines()
            if "=" in _line and not _line.lstrip().startswith("#")
        }
        missing = {
            _k: _v for _k, _v in default_env.items() if _k not in existing_keys
        }
        if missing:
            with env_file.open("a", encoding="utf-8") as _f:
                # Without this, a file lacking a final newline would get the
                # first new key glued onto its last line.
                if _env_text and not _env_text.endswith("\n"):
                    _f.write("\n")
                _f.writelines(f"{_k}={_v}\n" for _k, _v in missing.items())
            print(f"📝 Añadidas claves faltantes: {', '.join(missing.keys())}")
        else:
            print(f"ℹ️ {env_file} ya tiene todas las claves necesarias.")

    print(f"✅ Creado {DB_DIR/'base.py'}")
    print(f"✅ Creado {DB_DIR/'session.py'}")
    return Path, os
|
||
|
||
|
||
@app.cell
def _(Path, os):
    # Copy the architecture-layer base modules from the snippets folder into
    # ./domains/arquitecture_layer and write a package __init__.py for them.
    RUTA_CARPETA_SNIPPETS = os.environ["SNIPPETS_ROUTE"]

    _raiz_snippets = Path(RUTA_CARPETA_SNIPPETS)
    _carpeta_destino = Path("./domains/arquitecture_layer")
    _carpeta_destino.mkdir(parents=True, exist_ok=True)

    # Each file keeps its name; only the containing folder changes.
    for _nombre in ("Repo.py", "Mapper.py", "Model.py"):
        _origen = _raiz_snippets / "utils/ArquitectureLayer" / _nombre
        _destino = _carpeta_destino / _nombre
        _contenido = _origen.read_text(encoding="utf-8")
        _destino.write_text(_contenido, encoding="utf-8")
        print(f"✅ {_origen} → {_destino}")

    # Package initializer re-exporting the three modules.
    _lineas_init = (
        "# domains/arquitecture_layer/__init__.py",
        "from .Repo import *",
        "from .Mapper import *",
        "from .Model import *",
        "",
        '__all__ = ["Repo", "Mapper", "Model"]',
    )
    _archivo_init = _carpeta_destino / "__init__.py"
    _archivo_init.write_text("\n".join(_lineas_init) + "\n", encoding="utf-8")
    print(f"✅ Creado {_archivo_init}")
    return (RUTA_CARPETA_SNIPPETS,)
|
||
|
||
|
||
@app.cell(column=1)
def _(os):
    # Load the source text of the three base modules plus the import line
    # that goes with each one; both are injected into the prompt templates.

    def obtener_texto_archivo(ruta_archivo):
        """Return the text of a file, or of every file in a directory.

        Args:
            ruta_archivo: Path to a file or to a directory.

        Returns:
            The file contents as ``str`` when the path is a file; a dict
            mapping file name to contents (direct children only, no
            recursion) when the path is a directory.

        Raises:
            FileNotFoundError: If the path is neither a file nor a directory.
        """
        if os.path.isdir(ruta_archivo):
            textos = {}
            for archivo in os.listdir(ruta_archivo):
                ruta = os.path.join(ruta_archivo, archivo)
                if os.path.isfile(ruta):
                    with open(ruta, "r", encoding="utf-8") as f:
                        textos[archivo] = f.read()
            return textos
        if os.path.isfile(ruta_archivo):
            with open(ruta_archivo, "r", encoding="utf-8") as f:
                return f.read()
        raise FileNotFoundError(f"No se encontró archivo o directorio: {ruta_archivo}")

    repo_code = obtener_texto_archivo("domains/arquitecture_layer/Repo.py")
    mapper_code = obtener_texto_archivo("domains/arquitecture_layer/Mapper.py")
    model_code = obtener_texto_archivo("domains/arquitecture_layer/Model.py")

    # Each *_ruta_code is a complete import statement for its own layer.
    # Previously the Repo and Model paths were swapped and two of the three
    # lacked the leading "from".
    # NOTE(review): assumes the templates insert these strings verbatim and
    # do not prepend "from" themselves -- confirm against the .md templates.
    repo_ruta_code = "from domains.arquitecture_layer.Repo import Repo_base"
    mapper_ruta_code = "from domains.arquitecture_layer.Mapper import Mapper_base"
    model_ruta_code = "from domains.arquitecture_layer.Model import Model_base"
    return (
        mapper_code,
        mapper_ruta_code,
        model_code,
        model_ruta_code,
        repo_code,
        repo_ruta_code,
    )
|
||
|
||
|
||
@app.cell
def _(mo):
    # Text inputs for the entity name and its description.
    input_entidad = mo.ui.text_area(
        "",
        label="Entidad a crear",
        rows=1,
        full_width=True,
    )
    input_descripcion = mo.ui.text_area(
        "",
        label="Descripcion de la Entidad",
        rows=3,
        full_width=True,
    )

    # Last expression of the cell: both inputs stacked vertically.
    mo.vstack([input_entidad, input_descripcion])
    return input_descripcion, input_entidad
|
||
|
||
|
||
@app.cell
def _(mo):
    # Single-line input for the database schema, pre-filled with "public".
    esquema = mo.ui.text(value="public", label="Esquema:")
    esquema
    return (esquema,)
|
||
|
||
|
||
@app.cell
def _(
    RUTA_CARPETA_SNIPPETS,
    esquema,
    input_descripcion,
    input_entidad,
    mo,
    os,
):
    # Render the "field details" prompt from its Jinja template using the
    # entity name, description and schema typed in the inputs.
    from jinja2 import Template

    _ruta_plantilla = os.path.join(
        RUTA_CARPETA_SNIPPETS, "markdown/plantilla_generacion_campos_detalles.md"
    )
    with open(_ruta_plantilla, "r", encoding="utf-8") as _f:
        plantilla_detalles = Template(_f.read())

    # The entity name is reused downstream as table name and file name, so
    # stray whitespace/newlines coming from the text area are removed here.
    entidad = input_entidad.value.strip().lower()
    descripcion = input_descripcion.value.lower()

    _contexto = {
        "ENTIDAD": entidad,  # entity name
        "DESCRIPCION": descripcion,
        "SCHEMA": esquema.value,
    }

    resultado_plantilla = plantilla_detalles.render(**_contexto)

    mo.output.append(mo.md("### Prompt para la base del modelo:"))
    mo.output.append(mo.md(resultado_plantilla))
    return Template, entidad, resultado_plantilla
|
||
|
||
|
||
@app.cell
def _(mo):
    # Button that triggers the JSON generation cell.
    run_button = mo.ui.run_button(
        label="Generar JSON",
        kind="neutral",
    )
    run_button
    return (run_button,)
|
||
|
||
|
||
@app.cell
def _(mo, os, resultado_plantilla, run_button):
    # Send the rendered prompt to OpenAI when the button is pressed and show
    # the parsed JSON (or the error) below a status line.
    from openai import OpenAI
    import json

    def call_openai_to_json(template_text: str) -> dict:
        """Send ``template_text`` to OpenAI and return the parsed JSON reply.

        Args:
            template_text: Prompt sent as the user message.

        Returns:
            The reply decoded with ``json.loads``.

        Raises:
            KeyError: If ``OPENAI_API_KEY`` is not set in the environment.
            ValueError: If the reply has no content or is not valid JSON
                (``json.JSONDecodeError`` is a ``ValueError``).
        """
        # The API key is read from the environment, never hard-coded.
        client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])
        resp = client.chat.completions.create(
            model="gpt-4o",  # or "gpt-5" if available
            response_format={"type": "json_object"},
            messages=[
                {"role": "system", "content": "Eres un asistente estricto que SIEMPRE devuelve JSON válido y nada más."},
                # Use the argument, not the enclosing cell variable, so the
                # function really sends what it is given.
                {"role": "user", "content": template_text},
            ],
            temperature=0.2,
        )
        contenido = resp.choices[0].message.content
        if not contenido:
            # Fail with a readable message instead of a TypeError in json.loads.
            raise ValueError("La respuesta del modelo llegó vacía.")
        return json.loads(contenido)

    status_message = "Pulsa el botón para generar."
    json_text = None  # default sentinel so dependent cells can still run

    if run_button.value:
        try:
            json_text = call_openai_to_json(resultado_plantilla)
            status_message = "JSON generado correctamente."
        except Exception as exc:
            # Surface the error in the UI without stopping the notebook graph.
            status_message = f"Error al generar: {exc!s}"

    # Cell output: status line plus the JSON viewer when there is a result.
    mo.vstack([
        mo.md(f"**Estado:** {status_message}"),
        mo.json(json_text) if json_text is not None else mo.md(""),
    ])
    return OpenAI, json, json_text
|
||
|
||
|
||
@app.cell
def _(json, json_text, mo):
    # Turn the generated JSON into four editable text areas
    # (fields, relations, indexes, unique constraints).
    from glom import glom, Coalesce

    # json_text may be raw JSON text or an already-parsed object.
    data = json.loads(json_text) if isinstance(json_text, str) else json_text

    def _seccion_como_texto(clave):
        """Return the entries stored under ``clave`` as one line per entry."""
        valor = glom(data, Coalesce(clave, default=[]))
        if valor is None:
            elementos = []
        elif isinstance(valor, list):
            elementos = valor
        else:
            # A scalar where a list was expected: wrap it.
            elementos = [str(valor)]
        return "\n".join(str(elemento) for elemento in elementos)

    campos_final = mo.ui.text_area(
        _seccion_como_texto("CAMPOS"), label="Campos", rows=15, full_width=True
    )
    relaciones_final = mo.ui.text_area(
        _seccion_como_texto("RELACIONES"), label="Relaciones", rows=3, full_width=True
    )
    indices_final = mo.ui.text_area(
        _seccion_como_texto("INDEX_LIST"), label="Indices", rows=3, full_width=True
    )
    unicos_final = mo.ui.text_area(
        _seccion_como_texto("UNIQUE_LIST"), label="Unicos", rows=3, full_width=True
    )

    # Fields on the left at double width; the three short lists stacked on
    # the right. `gap` is expressed in rem.
    _columna_derecha = mo.vstack([relaciones_final, indices_final, unicos_final])
    mo.hstack([campos_final, _columna_derecha], widths=[2, 1], gap=1)
    return campos_final, indices_final, relaciones_final, unicos_final
|
||
|
||
|
||
@app.cell
def _(
    RUTA_CARPETA_SNIPPETS,
    Template,
    campos_final,
    entidad,
    esquema,
    indices_final,
    mapper_code,
    mapper_ruta_code,
    mo,
    model_code,
    model_ruta_code,
    os,
    relaciones_final,
    repo_code,
    repo_ruta_code,
    unicos_final,
):
    # Render the Model/Mapper/Repo generation prompt from its template.

    def _leer_snippet(ruta_relativa):
        """Read a UTF-8 text file located under the snippets folder."""
        ruta = os.path.join(RUTA_CARPETA_SNIPPETS, ruta_relativa)
        with open(ruta, "r", encoding="utf-8") as archivo:
            return archivo.read()

    plantilla = Template(_leer_snippet("markdown/plantilla_generacion_mmr.md"))
    ejemplo = _leer_snippet("markdown/ejemplos_base_mmr.md")

    resultado = plantilla.render(
        # Entity and target table (the table is named after the entity).
        ENTIDAD=entidad,
        TABLA=entidad,
        SCHEMA=esquema.value,
        # Import lines for the base classes.
        RUTA_REPO_BASE=repo_ruta_code,
        RUTA_MAPPER_BASE=mapper_ruta_code,
        RUTA_MODEL_BASE=model_ruta_code,
        # Source text of the base classes.
        REPO_BASE=repo_code,
        MAPPER_BASE=mapper_code,
        MODEL_BASE=model_code,
        EJEMPLO=ejemplo,
        # Current contents of the editable text areas.
        CAMPOS=campos_final.value,
        RELACIONES=relaciones_final.value,
        INDEX_LIST=indices_final.value,
        UNIQUE_LIST=unicos_final.value,
    )

    mo.output.append(mo.md("### Prompt para el modelo:"))
    mo.output.append(mo.md(resultado))
    return (resultado,)
|
||
|
||
|
||
@app.cell
def _(mo):
    # Button that triggers the Python code generation cell.
    run_button_code = mo.ui.run_button(
        label="Generar código Python",
        kind="neutral",
    )
    run_button_code
    return (run_button_code,)
|
||
|
||
|
||
@app.cell
def _(OpenAI, mo, os, resultado, run_button_code):
    # Ask OpenAI for the Python code once the button is pressed and show it
    # in a fenced block below a status line.

    def call_openai_to_code(user_prompt: str) -> str:
        """Send ``user_prompt`` to OpenAI and return the reply text as-is."""
        mensajes = [
            {
                "role": "system",
                "content": "Eres un asistente estricto que SIEMPRE devuelve codigo python de acuerdo a sus reglas",
            },
            {"role": "user", "content": user_prompt},
        ]
        cliente = OpenAI(api_key=os.environ["OPENAI_API_KEY"])
        respuesta = cliente.chat.completions.create(
            model="gpt-4o",  # or "gpt-5" if you have access
            messages=mensajes,
            temperature=0.2,
        )
        return respuesta.choices[0].message.content

    respuesta_codigo = None  # default value until the button is pressed
    _status_message = "Pulsa el botón para generar."

    if run_button_code.value:
        try:
            respuesta_codigo = call_openai_to_code(resultado)
        except Exception as exc:
            _status_message = f"Error al generar: {exc!s}"
        else:
            _status_message = "Código generado correctamente."

    if respuesta_codigo:
        _vista_codigo = mo.md(f"```python\n{respuesta_codigo}\n```")
    else:
        _vista_codigo = mo.md("")

    mo.vstack([mo.md(f"**Estado:** {_status_message}"), _vista_codigo])
    return (respuesta_codigo,)
|
||
|
||
|
||
@app.cell
def _(entidad, mo, os, respuesta_codigo):
    # Button that writes the generated code to domains/<entidad>.py.

    def guardar():
        """Write the generated code to ``domains/<entidad>.py``.

        Markdown code fences around the model reply are removed first.

        Returns:
            The path written, or ``None`` when nothing was saved because
            there is no generated code or no usable entity name.
        """
        if not respuesta_codigo:
            # Button pressed before any code was generated.
            print("⚠️ No hay código generado para guardar.")
            return None

        # basename keeps the file inside `carpeta` even if the entity name
        # contains path separators.
        nombre = os.path.basename(entidad.strip()) if entidad else ""
        if not nombre:
            print("⚠️ Indica un nombre de entidad válido antes de guardar.")
            return None

        codigo_limpio = (
            respuesta_codigo.strip()
            .removeprefix("```python")
            .removeprefix("```")  # fence without a language tag
            .removesuffix("```")
            .strip()
        )

        # Relative folder (./domains inside the project).
        carpeta = "domains"
        os.makedirs(carpeta, exist_ok=True)

        ruta = os.path.join(carpeta, f"{nombre}.py")
        with open(ruta, "w", encoding="utf-8") as f:
            f.write(codigo_limpio)

        print(f"✅ Archivo guardado en {ruta}")
        return ruta

    boton_guardar = mo.ui.button(
        label="Guardar archivo",
        on_click=lambda _: guardar(),
    )

    boton_guardar
    return
|
||
|
||
|
||
@app.cell
def _(mo, run_button_db):
    # Create the tables of every imported model when the button is pressed.
    from backend.db.session import engine
    from backend.db.base import Base

    # Import your model modules here so they are available before
    # Base.metadata.create_all runs. Examples left disabled:
    # from domains.Clientes import ClientesModel
    # from domains.Vehiculos import VehiculosModel
    # from domains.ejemplo import ejemploModel
    from domains.nota import notaModel

    def create_all_tables() -> dict:
        """Run ``create_all`` on ``Base.metadata`` and return a summary dict."""
        Base.metadata.create_all(bind=engine)
        # Lists every table known to the metadata, not only the ones that
        # this call actually created.
        nombres_tablas = list(Base.metadata.tables)
        return {
            "message": "✔ Tablas creadas",
            "tables": nombres_tablas,
            "count": len(nombres_tablas),
        }

    result_payload = None  # stays None until the button is pressed
    _status_message = "Importa tus modelos y pulsa el botón para crear las tablas."

    if run_button_db.value:
        try:
            result_payload = create_all_tables()
        except Exception as exc:
            _status_message = f"Error al crear tablas: {exc!s}"
        else:
            _status_message = "Operación completada correctamente."

    if result_payload is not None:
        _detalle = mo.json(result_payload)
    else:
        _detalle = mo.md("")

    mo.vstack([mo.md(f"**Estado:** {_status_message}"), _detalle])
    return (engine,)
|
||
|
||
|
||
@app.cell
def _(mo):
    # Button that triggers table creation, shown under a short caption.
    run_button_db = mo.ui.run_button(
        label="Crear tablas en la base de datos",
        kind="neutral",
    )

    _titulo = mo.md("Finalmente registrar las tablas en la bbdd")
    mo.vstack([_titulo, run_button_db])
    return (run_button_db,)
|
||
|
||
|
||
@app.cell
def _(mo):
    # Button (styled as dangerous) that triggers the empty-table cleanup cell.
    run_button_drop = mo.ui.run_button(
        label="Eliminar todas las tablas vacías",
        kind="danger",
    )
    run_button_drop
    return (run_button_drop,)
|
||
|
||
|
||
@app.cell
def _(engine, mo, run_button_drop):
    # Drop every table that contains no rows when the button is pressed.
    from sqlalchemy import MetaData, exists, select

    def drop_all_empty_tables_improved():
        """Drop all reflected tables that have no rows.

        Returns:
            dict with two lists of table names: ``"dropped"`` and ``"skipped"``
            (tables that still contain rows).

        Raises:
            sqlalchemy.exc.SQLAlchemyError: If reflection, a query or a DROP
                fails. The transaction is rolled back when the block exits
                with an error.
        """
        engine.dispose()
        metadata = MetaData()
        metadata.reflect(bind=engine, resolve_fks=False)

        dropped, skipped = [], []
        # engine.begin() commits on success and rolls back on any exception.
        with engine.begin() as connection:
            # sorted_tables lists referenced tables first; drop in reverse so
            # tables holding foreign keys go before the tables they point to.
            for table in reversed(metadata.sorted_tables):
                # True when the table has at least one row, whatever the
                # column values are (including NULLs).
                has_data = connection.execute(
                    select(exists().select_from(table))
                ).scalar()

                if has_data:
                    skipped.append(table.name)
                    continue

                # Drop on the SAME connection: it stays inside this
                # transaction and does not wait on the lock taken by the
                # SELECT above.
                table.drop(connection, checkfirst=True)
                dropped.append(table.name)

        return {"dropped": dropped, "skipped": skipped}

    _status_message = "Pulsa el botón para eliminar tablas vacías."
    _result_payload = None

    if run_button_drop.value:
        try:
            _result_payload = drop_all_empty_tables_improved()
            _status_message = "Operación completada correctamente."
        except Exception as exc:
            _status_message = f"Error al eliminar tablas: {exc!s}"

    mo.vstack([
        mo.md(f"**Estado:** {_status_message}"),
        mo.json(_result_payload) if _result_payload is not None else mo.md(""),
    ])
    return
|
||
|
||
|
||
@app.cell
def _():
    # Disabled scratch cell, kept for reference: it lists the variables used
    # by a Jinja template without rendering it.
    #
    # from jinja2 import Environment, meta
    #
    # # Create a Jinja environment
    # env = Environment()
    #
    # with open(os.path.join(RUTA_CARPETA_SNIPPETS, "markdown/plantilla_mmr.md"), "r", encoding="utf-8") as _f:
    #     source = _f.read()
    #
    # # Parse the template without rendering it
    # parsed_content = env.parse(source)
    #
    # # Collect the undeclared variables
    # variables = meta.find_undeclared_variables(parsed_content)
    #
    # print("Variables encontradas:", variables)
    return
|
||
|
||
|
||
if __name__ == "__main__":
    # Run the notebook app when this file is executed directly.
    app.run()
|