feat: Implement MCP server generation and process management scripts
This commit is contained in:
@@ -0,0 +1,64 @@
|
|||||||
|
"""
|
||||||
|
Script para generar un servidor MCP a partir de código fuente recibido (por ejemplo, desde un LLM).
|
||||||
|
Crea un archivo Python en la carpeta de servidores MCP con el código proporcionado y un nombre único.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import sys
|
||||||
|
import os
|
||||||
|
from datetime import datetime
|
||||||
|
from pathlib import Path
|
||||||
|
from fastmcp import FastMCP
|
||||||
|
|
||||||
|
SERVERS_DIR = Path(__file__).parent
|
||||||
|
mcp = FastMCP()
|
||||||
|
|
||||||
|
def generate_server(code: str, name: str = None) -> str:
|
||||||
|
if not name:
|
||||||
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||||
|
name = f"server_llm_{timestamp}.py"
|
||||||
|
else:
|
||||||
|
if not name.endswith('.py'):
|
||||||
|
name += '.py'
|
||||||
|
server_path = SERVERS_DIR / name
|
||||||
|
with open(server_path, 'w', encoding='utf-8') as f:
|
||||||
|
f.write(code)
|
||||||
|
return str(server_path)
|
||||||
|
|
||||||
|
@mcp.tool(description="Genera un archivo de servidor MCP a partir de código fuente y un nombre opcional.")
|
||||||
|
def mcp_generate_server(code: str, name: str = None) -> str:
|
||||||
|
"""
|
||||||
|
Esta herramienta guarda el código fuente en un archivo Python con nombre opcional.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
code: Código fuente del servidor MCP como string.
|
||||||
|
name: (opcional) Nombre del archivo. Si no se especifica, se generará uno con timestamp.
|
||||||
|
|
||||||
|
Requiere que el código incluya explícitamente: path="/"
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Ruta absoluta del archivo creado.
|
||||||
|
|
||||||
|
Ejemplo de uso mínimo:
|
||||||
|
|
||||||
|
mcp_generate_server(
|
||||||
|
code=\"\"\"from fastmcp import FastMCP
|
||||||
|
mcp = FastMCP()
|
||||||
|
|
||||||
|
@mcp.tool(description="Saluda al mundo.")
|
||||||
|
def hello():
|
||||||
|
return "Hola mundo"
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
mcp.run(transport="streamable-http", host="127.0.0.1", port=4211, path="/")
|
||||||
|
\"\"\"
|
||||||
|
)
|
||||||
|
"""
|
||||||
|
# Validación: asegurar que el código incluya path="/"
|
||||||
|
if 'path="/"' not in code.replace(" ", "").replace("'", '"'):
|
||||||
|
raise ValueError('El código del servidor debe contener path="/".')
|
||||||
|
|
||||||
|
return generate_server(code, name)
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
mcp.run(transport="streamable-http", host="127.0.0.1", port=4210, path="/")
|
||||||
|
# mcp.run(transport="stdio")
|
||||||
@@ -1,3 +0,0 @@
|
|||||||
20136
|
|
||||||
21880
|
|
||||||
12708
|
|
||||||
@@ -5,17 +5,13 @@ import re
|
|||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
ROOT = Path(__file__).resolve().parent
|
ROOT = Path(__file__).resolve().parent
|
||||||
PYTHON_PATH = (ROOT.parent / ".venv" / "Scripts" / "python.exe").resolve()
|
PYTHON_PATH = (ROOT.parent / ROOT.parent / ".venv" / "Scripts" / "python.exe").resolve()
|
||||||
PID_FILE = ROOT / "mcps.pid"
|
PID_FILE = ROOT / "mcps.pid"
|
||||||
|
|
||||||
# Rutas relativas de scripts (no es necesario configurar puertos)
|
# Carpeta de MCPs
|
||||||
scripts = [
|
MCPS_DIR = (ROOT / "../domains/Llms/MCPs/McpServers").resolve()
|
||||||
"../domains/Llms/MCPs/McpServers/server_files.py",
|
|
||||||
"../domains/Llms/MCPs/McpServers/server_math.py",
|
|
||||||
"../domains/Llms/MCPs/McpServers/server_utils.py"
|
|
||||||
]
|
|
||||||
|
|
||||||
print("🚀 Iniciando scripts MCP con entorno virtual...")
|
print("🚀 Iniciando todos los MCPs en el directorio:", MCPS_DIR)
|
||||||
|
|
||||||
# Verificar intérprete Python
|
# Verificar intérprete Python
|
||||||
if not PYTHON_PATH.exists():
|
if not PYTHON_PATH.exists():
|
||||||
@@ -29,13 +25,10 @@ if PID_FILE.exists():
|
|||||||
# Expresión regular para capturar la URL de uvicorn
|
# Expresión regular para capturar la URL de uvicorn
|
||||||
URL_REGEX = re.compile(r"Uvicorn running on (?P<url>http://[^\s]+)")
|
URL_REGEX = re.compile(r"Uvicorn running on (?P<url>http://[^\s]+)")
|
||||||
|
|
||||||
# Lanzar cada script y detectar su URL
|
# Ejecutar todos los archivos .py en MCPS_DIR
|
||||||
for relative_path in scripts:
|
for script_path in sorted(MCPS_DIR.glob("*.py")):
|
||||||
script_path = (ROOT / relative_path).resolve()
|
if script_path.name.startswith("__"):
|
||||||
|
continue # Evita archivos como __init__.py
|
||||||
if not script_path.exists():
|
|
||||||
print(f"⚠ Script no encontrado: {script_path}")
|
|
||||||
continue
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
proc = subprocess.Popen(
|
proc = subprocess.Popen(
|
||||||
@@ -57,13 +50,13 @@ for relative_path in scripts:
|
|||||||
with open(PID_FILE, "a", encoding="utf-8") as f:
|
with open(PID_FILE, "a", encoding="utf-8") as f:
|
||||||
f.write(str(proc.pid) + "\n")
|
f.write(str(proc.pid) + "\n")
|
||||||
|
|
||||||
print(f"→ {relative_path} lanzado con PID {proc.pid}")
|
print(f"→ {script_path.relative_to(ROOT)} lanzado con PID {proc.pid}")
|
||||||
if url:
|
if url:
|
||||||
print(f" 🛰️ Servidor accesible en: {url}")
|
print(f" 🛰️ Servidor accesible en: {url}")
|
||||||
else:
|
else:
|
||||||
print(" ⚠ No se detectó URL del servidor.")
|
print(" ⚠ No se detectó URL del servidor.")
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"❌ Error al lanzar {relative_path}:\n{e}")
|
print(f"❌ Error al lanzar {script_path.name}:\n{e}")
|
||||||
|
|
||||||
print("✅ Todos los scripts han sido procesados. PIDs guardados en mcps.pid.")
|
print("✅ Todos los scripts han sido procesados. PIDs guardados en mcps.pid.")
|
||||||
Reference in New Issue
Block a user