Actualizacion para mcp

This commit is contained in:
2025-05-16 02:12:33 +02:00
parent 95c1762ca7
commit 9db2f70009
13 changed files with 597 additions and 247 deletions
+9
View File
@@ -0,0 +1,9 @@
import asyncio
from fastmcp.client import Client
async def main():
async with Client("http://127.0.0.1:4300") as client:
is_alive = await client.ping()
print("Ping exitoso:", is_alive)
asyncio.run(main())
+29
View File
@@ -0,0 +1,29 @@
# client.py
import asyncio
from src.Llms.MCPs.Mcp_client import MCPClient
from src.Llms.MCPs.Http_mcp_server import HttpMCPServer
async def main():
client = MCPClient()
client.register_server(HttpMCPServer(
name="tools",
path="IGNORED_IN_CLIENT", # no importa aquí
host="127.0.0.1",
port=4300,
path_http="/tools"
))
await client.connect_all()
result = await client.call_tool({
"server": "tools",
"tool": "get_hostname",
"input": {}
})
print("RESULT:", result)
await client.disconnect_all()
if __name__ == "__main__":
asyncio.run(main())
+100 -42
View File
@@ -7,53 +7,111 @@ from src.ConexionApis.OpenAi_conexion import OpenAICliente
from src.Llms.Modelos.Openai_model import ModeloOpenAI from src.Llms.Modelos.Openai_model import ModeloOpenAI
from src.Llms.Agente import AgenteAI from src.Llms.Agente import AgenteAI
from src.Llms.Memory.postgres_MemoryConv import MemoryConvPostgres from src.Llms.Memory.postgres_MemoryConv import MemoryConvPostgres
from fastmcp.client.transports import StreamableHttpTransport
from fastmcp.client import Client
from src.Llms.MCPs.McpClient import MCPClient # ya tienes esta clase
from src.Llms.MCPs.McpClient_Registry import ClientRegistry # o ajusta según tu estructura
import asyncio
async def main():
# Usar Credencial openai
conexion_admin = PostgresConexion(db_credencial)
repo = OpenAICredencialRepo(conexion_admin)
credencial_openai = repo.get_by_id("OPAK20250513-61b29978b7604031014")
cliente = OpenAICliente(credencial_openai)
conexion_admin = PostgresConexion(db_credencial) # crea el modelo (openai)
repo = OpenAICredencialRepo(conexion_admin) modelo = ModeloOpenAI(
credencial_openai = repo.get_by_id("OPAK20250513-61b29978b7604031014") cliente=cliente,
model="gpt-4o",
cliente = OpenAICliente(credencial_openai) temperature=1,
top_p=1.0
modelo = ModeloOpenAI(
cliente=cliente,
model="gpt-4o",
temperature=1,
top_p=1.0
)
memoria = MemoryConvPostgres(
credencial=db_credencial,
nombre_tabla="memoria_conversacion_pruebas",
k=10
)
agente2 = AgenteAI(
modelo=modelo,
nombre="Experto en Astronomía",
descripcion="Un experto en astronomía que responde preguntas sobre el universo.",
system_prompt="Actúa como un experto en astronomía y astrofísica con experiencia académica y práctica en observación astronómica, física estelar, cosmología, mecánica orbital y análisis de datos astronómicos. Cuando respondas, utiliza lenguaje técnico pero accesible para alguien con conocimientos intermedios en física y matemáticas. Siempre que sea posible, incluye explicaciones detalladas, ejemplos numéricos y referencias a teorías o descubrimientos relevantes (por ejemplo, relatividad general, evolución estelar, espectroscopía, etc.). No simplifiques en exceso. Si la pregunta tiene múltiples dimensiones (como observacional y teórica), aborda todas. ¿Estás listo para empezar?",
rol="astronomo",
max_iterations=5,
memoria=memoria,
objetivos=["Responder preguntas sobre astronomía y astrofísica", "Proporcionar explicaciones detalladas y ejemplos numéricos"],
)
async def probar_interaccion_stream():
print("Respuesta en streaming:\n")
# Paso 1: espera la corutina para obtener el generador
respuesta_gen = await agente2.interactuar_en_bucle(
"¿Hacia qué va orbitando cada astro del espacio? responde jerárquicamente",
stream=True
) )
# Paso 2: itera sobre el generador # Le otorga memoria
async for token in respuesta_gen:
print(token, end="", flush=True) memoria = MemoryConvPostgres(
credencial=db_credencial,
nombre_tabla="memoria_conversacion_pruebas",
k=10
)
# Cargamos las herramientas
herramientas = MCPClient.from_http(
name="tools",
url="http://127.0.0.1:4300/tools/"
)
math = MCPClient.from_http(
name="math",
url="http://127.0.0.1:4200/math/"
)
# Las añadimos al registro de herramientas
registry = ClientRegistry()
registry.add("tools", herramientas)
registry.add("math", math)
# --- INICIALIZACIÓN DEL AGENTE ---
agente2 = AgenteAI(
modelo=modelo,
nombre="Asistente Inteligente",
descripcion="Un asistente conversacional versátil, capaz de resolver problemas, acceder a herramientas y proporcionar respuestas útiles.",
system_prompt=(
"Eres un asistente inteligente que ayuda al usuario a resolver tareas, responder preguntas y usar herramientas disponibles si es necesario. "
"Debes razonar paso a paso, y si se detecta que una herramienta MCP es útil, actúa generando el bloque MCP apropiado sin dar más explicaciones. "
"Siempre estructura tus respuestas con claridad, y termina con <END> cuando creas haber completado la tarea."
),
rol="asistente",
objetivos=[
"Resolver tareas del usuario",
"Usar herramientas MCP si es útil",
"Responder de forma clara y útil"
],
# max_iterations=3,
# memoria=memoria,
mcp=registry # ← ✅ Integración del cliente MCP
)
# --- FUNCIÓN DE EJECUCIÓN ---
async def probar_interaccion_stream():
# # 🔌 Conectar a los servidores MCP registrados
# await mcp_client.connect_all()
print("Respuesta en streaming:\n")
respuesta_gen = await agente2.interactuar_en_bucle(
"¿Cuál es mi nombre de usuario en este sistema?",
stream=True
)
async for token in respuesta_gen:
print(token, end="", flush=True)
await probar_interaccion_stream()
# Ejecutar
if __name__ == "__main__":
asyncio.run(main())
asyncio.run(probar_interaccion_stream())
+19 -64
View File
@@ -1,74 +1,29 @@
import asyncio
from llms.MCPs.MCPStdioServer import MCPStdioServer
import os
async def main():
prueba = MCPStdioServer(
name="prueba_server_mcp",
command="C:/Users/lucas/Desktop/mcps/.venv/Scripts/python.exe",
args=["C:/Users/lucas/Desktop/mcps/server_mcp_python/server_mcp.py"],
)
await prueba.start()
print("Herramientas:", prueba.get_tool_names())
await prueba.stop() # <- esto previene el error
if __name__ == "__main__":
# Asegura compatibilidad para subprocess en Windows
if os.name == "nt":
asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
asyncio.run(main()) async def test_registry(registry: ClientRegistry):
tools = await registry.listar_tools_por_cliente()
prompts = await registry.listar_prompts_por_cliente()
resources = await registry.listar_resources_por_cliente()
print("\n🔧 Herramientas:", tools)
print("\n📋 Prompts:", prompts)
print("\n📂 Resources:", resources)
asyncio.run(test_registry(registry))
async def test_wrapper():
from src.ApiKeys.openai_apikey import OpenAICredencial # 2. Llamar a una herramienta de prueba
from src.ApiKeys.openai_apikey_mmr import OpenAICredencialRepo # Ajusta si está en otro módulo result = await herramientas.call_tool("generate_uuid")
from src.ConexionSql.Postgres_conexion import PostgresConexion print("\n🆔 UUID generado:", result[0].text) # Accedemos al contenido directamente
# 1. Crear instancia de conexión (asegúrate de configurar bien tu conexión en Base_conexion)
from entrypoint.init_db import db_credencial
conexion_admin = PostgresConexion(db_credencial)
# 3. Guardar la credencial en la base de datos
repo = OpenAICredencialRepo(conexion_admin)
credencial_openai = repo.get_by_id(1)
print(f"✅ Credencial: {credencial_openai.titulo}")
from src.ConexionApis.OpenAi_conexion import OpenAICliente
cliente = OpenAICliente(credencial_openai)
from llms.Modelos.Openai_model import ModeloOpenAI
modelo = ModeloOpenAI(
cliente=cliente,
model="gpt-4o",
temperature=1,
top_p=1.0
)
from llms.Agente import AgenteAI
agente_con_herramientas = AgenteAI( # asyncio.run(test_wrapper())
modelo=modelo,
nombre="Agente con herramientas",
descripcion="Un agente que puede usar herramientas",
system_prompt="Eres un asistente que puede usar herramientas para responder preguntas.",
rol="asistente",
objetivos=["Asistir al usuario en tareas complejas", "usar herramientas para obtener información adicional"]
# tools=
)
respuesta = agente_con_herramientas.interactuar(
prompt="Hola como estas?",
)
print(respuesta)
+128 -49
View File
@@ -1,6 +1,6 @@
from src.Llms.Modelos.Base_model import ModeloABC from src.Llms.Modelos.Base_model import ModeloABC
from src.Llms.Memory.Base_MemoryConv import MemoryConvABC from src.Llms.Memory.Base_MemoryConv import MemoryConvABC
from src.Llms.MCPs.McpClient_Registry import ClientRegistry
from datetime import datetime from datetime import datetime
from typing import Optional, List, Union, AsyncGenerator from typing import Optional, List, Union, AsyncGenerator
@@ -17,12 +17,11 @@ class AgenteAI:
max_iterations: int = 1, max_iterations: int = 1,
memoria: Optional[MemoryConvABC] = None, memoria: Optional[MemoryConvABC] = None,
version: str = "1.0.0", version: str = "1.0.0",
tools: Optional[List] = None, mcp: ClientRegistry = None,
output_schema: Optional[dict] = None, output_schema: Optional[dict] = None,
): ):
self.modelo = modelo self.modelo = modelo
self.memoria = memoria self.memoria = memoria
self.tools = tools or []
self.output_schema = output_schema self.output_schema = output_schema
self.nombre = nombre self.nombre = nombre
@@ -36,6 +35,9 @@ class AgenteAI:
self.created_at = datetime.now() self.created_at = datetime.now()
self.updated_at = self.created_at self.updated_at = self.created_at
self.numero_interacciones = 0 self.numero_interacciones = 0
self.mcp = mcp # <-- Aquí guardamos el registry
def actualizar_configuracion(self, **kwargs): def actualizar_configuracion(self, **kwargs):
for clave, valor in kwargs.items(): for clave, valor in kwargs.items():
@@ -43,56 +45,105 @@ class AgenteAI:
setattr(self, clave, valor) setattr(self, clave, valor)
self.updated_at = datetime.now() self.updated_at = datetime.now()
@property @property
def full_system_prompt(self) -> str: async def full_system_prompt(self) -> str:
partes = [ tools_str = await self._obtener_herramientas_disponibles_str()
f"Tu nombre es: {self.nombre}", return f"""
f"Tu descripción: {self.descripcion}", Eres un agente conversacional con acceso a herramientas MCP (Model Context Protocol).
f"Tu Rol: {self.rol}",
f"Tus Objetivos: {', '.join(self.objetivos)}",
""
]
herramientas = self._obtener_descripcion_tools() Tu comportamiento sigue este flujo:
if herramientas:
partes.append("Estas son tus herramientas disponibles:")
partes.extend(herramientas)
partes.append(
"Cuando consideres necesario, utiliza las herramientas disponibles "
"para responder de manera más precisa o realizar tareas específicas. "
"Indica claramente qué herramienta estás utilizando y por qué."
)
partes.append(self.system_prompt) 1. **Piensa** para razonar tu decisión.
2. **Decide** si:
- puedes responder tú mismo,
- necesitas más información del usuario,
- o necesitas una herramienta MCP.
3. **Actúa**:
- Cuando uses MCP, termina **solo** con un bloque de código MCP y **nada más**.
- Ten en cuenta EXACTAMENTE los parámetros especificados.
- **No expliques, no hables después del bloque. Termina tu turno.**
if self.output_schema: ---
partes.append("SIEMPRE formatea la respuesta final siguiendo estrictamente el siguiente esquema JSON:")
partes.append(f"```json\n{self.output_schema}\n```")
return "\n".join(partes) # Formato MCP
def _obtener_descripcion_tools(self) -> List[str]: ```mcp
descripciones = [] {{
if not hasattr(self, "mcp_servers"): "tool": "<nombre_de_la_herramienta>",
return descripciones "input": {{
"clave": "valor"
}}
}}
Reglas clave:
for server in self.mcp_servers: Razonas antes de actuar.
if hasattr(server, "tools") and server.tools:
for tool in server.tools: Nunca hables después de un bloque MCP.
if isinstance(tool, str):
descripciones.append(f"- {tool}: [sin descripción]") No combines respuestas y herramientas.
elif isinstance(tool, dict):
nombre = tool.get("name", "¿?") Piensa. Decide. Actúa.
descripcion = tool.get("description", "[sin descripción]")
descripciones.append(f"- {nombre}: {descripcion}") Herramientas disponibles para usar con MCP:
elif hasattr(tool, "name"): {tools_str}
descripcion = getattr(tool, "description", "[sin descripción]")
descripciones.append(f"- {tool.name}: {descripcion}") """.strip()
return descripciones
# Conseguir las herramientas disponibles
async def _obtener_herramientas_disponibles_str(self) -> str:
if not self.mcp:
return "No se han definido herramientas disponibles."
herramientas = []
tools_por_cliente = await self.mcp.listar_tools_por_cliente()
for name, tools in tools_por_cliente.items():
if not tools:
continue
herramientas.append(f"\n🔌 Cliente: {name}")
for tool in tools:
props = tool.inputSchema.get("properties", {})
parametros = "\n ".join(f"- {k} ({v.get('type', '?')})" for k, v in props.items())
herramientas.append(f"""Nombre: {tool.name}
Descripción: {tool.description}
Parámetros:
{parametros}
""")
return "\n".join(herramientas) or "No hay herramientas disponibles actualmente."
# Formatear prompt para agentes
def _formatear_prompt(self, mensajes: List[dict]) -> str: def _formatear_prompt(self, mensajes: List[dict]) -> str:
return "\n".join([f"{msg['role']}: {msg['content']}" for msg in mensajes]) return "\n".join([f"{msg['role']}: {msg['content']}" for msg in mensajes])
###----------- Funcion para interactuar
async def interactuar(self, prompt: str, stream: bool = False) -> Union[str, AsyncGenerator[str, None]]: async def interactuar(self, prompt: str, stream: bool = False) -> Union[str, AsyncGenerator[str, None]]:
historial = self.memoria.cargar_historial_chat() if self.memoria else [] historial = self.memoria.cargar_historial_chat() if self.memoria else []
contexto = historial + [{"role": "user", "content": prompt}] contexto = historial + [{"role": "user", "content": prompt}]
@@ -100,12 +151,11 @@ class AgenteAI:
respuesta = await self.modelo.responder( respuesta = await self.modelo.responder(
prompt=prompt_final, prompt=prompt_final,
system_prompt=self.full_system_prompt, system_prompt=await self.full_system_prompt, # ✅ correcto
stream=stream stream=stream
) )
if stream: if stream:
# stream es un generador asincrónico
async def wrapper(): async def wrapper():
buffer_respuesta = "" buffer_respuesta = ""
async for token in respuesta: async for token in respuesta:
@@ -125,31 +175,45 @@ class AgenteAI:
self.updated_at = datetime.now() self.updated_at = datetime.now()
return respuesta return respuesta
###----------- Funcion para interactuar en bucle
async def interactuar_en_bucle(self, prompt: str, stream: bool = False) -> Union[List[str], AsyncGenerator[str, None]]: async def interactuar_en_bucle(self, prompt: str, stream: bool = False) -> Union[List[str], AsyncGenerator[str, None]]:
print("🚀 [interactuar_en_bucle] Iniciando interacción")
historial = self.memoria.cargar_historial_chat() if self.memoria else [] historial = self.memoria.cargar_historial_chat() if self.memoria else []
print(f"📜 [interactuar_en_bucle] Historial cargado: {historial}")
respuestas = [] if not stream else None respuestas = [] if not stream else None
respuesta_anterior = None respuesta_anterior = None
iteration = 0 iteration = 0
prompt_original = prompt.strip() prompt_original = prompt.strip()
print(f"✏️ [interactuar_en_bucle] Prompt original: {prompt_original}")
async def generador(): async def generador():
nonlocal iteration, respuesta_anterior nonlocal iteration, respuesta_anterior
prompt_actual = prompt_original prompt_actual = prompt_original
while self.max_iterations == 0 or iteration < self.max_iterations: while self.max_iterations == 0 or iteration < self.max_iterations:
print(f"\n🔁 [generador] Iteración: {iteration}")
if iteration == 0: if iteration == 0:
prompt_actual += ( prompt_actual += (
"\n\nIMPORTANTE:\n" "\n\nIMPORTANTE:\n"
"Si al revisar tu última respuesta y mi pregunta inicial consideras que has terminado, " "Si al revisar tu última respuesta y mi pregunta inicial consideras que has terminado, "
"di alguna de estas frases: <FIN>" "di alguna de estas frases: <END>"
) )
else: else:
prompt_actual = ( prompt_actual = (
f"Esta es la pregunta original:\n{prompt_original}\n\n" f"Esta es la pregunta original:\n{prompt_original}\n\n"
f"Esto fue lo último que dijiste:\n{respuesta_anterior}\n" f"Esto fue lo último que dijiste:\n{respuesta_anterior}\n"
"\n\nIMPORTANTE:\n" "\n\nIMPORTANTE:\n"
"Si al revisar tu última respuesta y mi pregunta inicial consideras que has terminado, " "Si al revisar tu última respuesta y mi pregunta inicial consideras que has terminado, "
"di alguna de estas frases: <FIN>" "di alguna de estas frases: <END>"
"En caso contrario, responde a la pregunta original " "En caso contrario, responde a la pregunta original "
"y añade información relevante que no hayas mencionado antes.\n\n" "y añade información relevante que no hayas mencionado antes.\n\n"
) )
@@ -157,30 +221,40 @@ class AgenteAI:
contexto = historial + [{"role": "user", "content": prompt_actual}] contexto = historial + [{"role": "user", "content": prompt_actual}]
prompt_final = self._formatear_prompt(contexto) prompt_final = self._formatear_prompt(contexto)
print(f"📨 [generador] Prompt final enviado al modelo:\n{prompt_final}")
print("🤖 [generador] Esperando respuesta del modelo...")
respuesta = await self.modelo.responder( respuesta = await self.modelo.responder(
prompt=prompt_final, prompt=prompt_final,
system_prompt=self.full_system_prompt, system_prompt=await self.full_system_prompt,
stream=stream stream=stream
) )
print("✅ [generador] Respuesta recibida")
if stream: if stream:
buffer_respuesta = "" buffer_respuesta = ""
async for token in respuesta: async for token in respuesta:
buffer_respuesta += token buffer_respuesta += token
# print(f"🔹 [stream] Token: {token}")
yield token yield token
respuesta_anterior = buffer_respuesta respuesta_anterior = buffer_respuesta
# print(f"📦 [stream] Respuesta completa:\n{respuesta_anterior}")
else: else:
respuestas.append(respuesta) respuestas.append(respuesta)
respuesta_anterior = respuesta respuesta_anterior = respuesta
# print(f"📦 [generador] Respuesta completa:\n{respuesta_anterior}")
if self.memoria: if self.memoria:
print("💾 [memoria] Guardando turno en la memoria...")
self.memoria.guardar_turno("user", prompt_actual) self.memoria.guardar_turno("user", prompt_actual)
self.memoria.guardar_turno("assistant", respuesta_anterior) self.memoria.guardar_turno("assistant", respuesta_anterior)
self.numero_interacciones += 1 self.numero_interacciones += 1
self.updated_at = datetime.now() self.updated_at = datetime.now()
print(f"📊 [generador] Interacción #{self.numero_interacciones} registrada")
if "<fin>" in respuesta_anterior.lower(): if "<end>" in respuesta_anterior.lower():
print("🛑 [generador] Detectado <end>. Terminando bucle.")
break break
iteration += 1 iteration += 1
@@ -188,6 +262,11 @@ class AgenteAI:
return generador() if stream else await generador_to_list(generador) return generador() if stream else await generador_to_list(generador)
# Helper para consumir generador asincrónico si no es stream # Helper para consumir generador asincrónico si no es stream
async def generador_to_list(gen: AsyncGenerator[str, None]) -> List[str]: async def generador_to_list(gen: AsyncGenerator[str, None]) -> List[str]:
buffer = "" buffer = ""
+96
View File
@@ -0,0 +1,96 @@
from pathlib import Path
from typing import Any, Optional, Union
from pydantic import AnyUrl
from fastmcp.client import Client
from fastmcp.client.transports import (
StreamableHttpTransport,
PythonStdioTransport,
ClientTransport,
)
from mcp.types import *
from fastmcp.exceptions import ClientError
class MCPClient:
def __init__(self, name: str, client: Client):
self.name = name
self.client = client
def __repr__(self) -> str:
return f"<ClientWrapper(name={self.name})>"
@classmethod
def from_http(cls, name: str, url: str | AnyUrl) -> "MCPClient":
transport = StreamableHttpTransport(url=str(url))
client = Client(transport=transport)
return cls(name=name, client=client)
@classmethod
def from_stdio(
cls,
name: str,
script_path: Union[str, Path],
args: Optional[list[str]] = None,
cwd: Optional[Union[str, Path]] = None,
env: Optional[dict[str, str]] = None,
) -> "MCPClient":
transport = PythonStdioTransport(
script_path=script_path, args=args, cwd=cwd, env=env
)
client = Client(transport=transport)
return cls(name=name, client=client)
def is_connected(self) -> bool:
return self.client.is_connected()
async def __aenter__(self):
await self.client.__aenter__()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.client.__aexit__(exc_type, exc_val, exc_tb)
# Delegación MCP
async def call_tool(
self, name: str, arguments: dict[str, Any] | None = None
) -> list[TextContent | ImageContent | EmbeddedResource]:
return await self.client.call_tool(name, arguments)
async def get_prompt(
self, name: str, arguments: dict[str, str] | None = None
) -> GetPromptResult:
return await self.client.get_prompt(name, arguments)
async def list_tools(self) -> list[Tool]:
return await self.client.list_tools()
async def list_prompts(self) -> list[Prompt]:
return await self.client.list_prompts()
async def list_resources(self) -> list[Resource]:
return await self.client.list_resources()
async def list_resource_templates(self) -> list[ResourceTemplate]:
return await self.client.list_resource_templates()
async def read_resource(
self, uri: AnyUrl | str
) -> list[TextResourceContents | BlobResourceContents]:
return await self.client.read_resource(uri)
async def complete(
self,
ref: ResourceReference | PromptReference,
argument: dict[str, str],
) -> Completion:
return await self.client.complete(ref, argument)
async def ping(self) -> bool:
return await self.client.ping()
async def set_logging_level(self, level: LoggingLevel) -> None:
return await self.client.set_logging_level(level)
async def send_roots_list_changed(self) -> None:
return await self.client.send_roots_list_changed()
+56
View File
@@ -0,0 +1,56 @@
from src.Llms.MCPs.McpClient import MCPClient
from typing import Any
class ClientRegistry:
def __init__(self):
self._clients: dict[str, MCPClient] = {}
def add(self, name: str, wrapper: MCPClient) -> None:
self._clients[name] = wrapper
def get(self, name: str) -> MCPClient:
if name not in self._clients:
raise KeyError(f"Cliente '{name}' no encontrado en el registro.")
return self._clients[name]
def all(self) -> dict[str, MCPClient]:
return self._clients
def list_names(self) -> list[str]:
return list(self._clients.keys())
def __contains__(self, name: str) -> bool:
return name in self._clients
async def listar_tools_por_cliente(self) -> dict[str, list[Any]]:
resultado = {}
for name, wrapper in self._clients.items():
try:
async with wrapper:
resultado[name] = await wrapper.list_tools()
except Exception as e:
print(f"[TOOLS] ❌ Error en '{name}': {e}")
resultado[name] = []
return resultado
async def listar_prompts_por_cliente(self) -> dict[str, list[Any]]:
resultado = {}
for name, wrapper in self._clients.items():
try:
async with wrapper:
resultado[name] = await wrapper.list_prompts()
except Exception as e:
print(f"[PROMPTS] ❌ Error en '{name}': {e}")
resultado[name] = []
return resultado
async def listar_resources_por_cliente(self) -> dict[str, list[Any]]:
resultado = {}
for name, wrapper in self._clients.items():
try:
async with wrapper:
resultado[name] = await wrapper.list_resources()
except Exception as e:
print(f"[RESOURCES] ❌ Error en '{name}': {e}")
resultado[name] = []
return resultado
@@ -1,15 +0,0 @@
from fastmcp import Client
import asyncio
async def main():
async with Client("http://127.0.0.1:8080/mcp") as client:
tools = await client.list_tools()
for tool in tools:
print(f"🔧 {tool.name} - {tool.description or 'sin descripción'}")
client.call_tool_mcp()
if __name__ == "__main__":
asyncio.run(main())
@@ -1,17 +0,0 @@
# cliente_prueba_http.py
import asyncio
from fastmcp import Client
async def main():
async with Client("http://127.0.0.1:8080/mcp") as client:
tools = await client.list_tools()
for tool in tools:
print(f"🔧 {tool.name} - {tool.description or 'sin descripción'}")
# ✅ llamar a la herramienta correctamente
result = await client.call_tool_mcp(name="esperar", arguments={"segundos": 5})
print(f"\nResultado de 'saludar': {result.content[0].text}")
if __name__ == "__main__":
asyncio.run(main())
+92
View File
@@ -0,0 +1,92 @@
from fastmcp import FastMCP
mcp = FastMCP()
@mcp.tool(description="Suma dos números enteros.")
def add(a: int, b: int) -> int:
return a + b
@mcp.tool(description="Resta dos números enteros.")
def subtract(a: int, b: int) -> int:
return a - b
@mcp.tool(description="Multiplica dos números enteros.")
def multiply(a: int, b: int) -> int:
return a * b
@mcp.tool(description="Divide dos números y devuelve el resultado flotante.")
def divide(a: float, b: float) -> float:
if b == 0:
raise ValueError("No se puede dividir entre cero.")
return a / b
@mcp.tool(description="Calcula el módulo de dos números enteros.")
def modulo(a: int, b: int) -> int:
return a % b
@mcp.tool(description="Concatena dos cadenas de texto.")
def concat(a: str, b: str) -> str:
return a + b
@mcp.tool(description="Devuelve la longitud de una cadena.")
def string_length(s: str) -> int:
return len(s)
@mcp.tool(description="Convierte una cadena a mayúsculas.")
def to_upper(s: str) -> str:
return s.upper()
@mcp.tool(description="Convierte una cadena a minúsculas.")
def to_lower(s: str) -> str:
return s.lower()
@mcp.tool(description="Devuelve la suma de todos los elementos en una lista de enteros.")
def sum_list(numbers: list[int]) -> int:
return sum(numbers)
@mcp.tool(description="Devuelve el valor máximo en una lista de enteros.")
def max_in_list(numbers: list[int]) -> int:
return max(numbers)
@mcp.tool(description="Verifica si un número es par.")
def is_even(n: int) -> bool:
return n % 2 == 0
@mcp.tool(description="Verifica si una cadena es un palíndromo.")
def is_palindrome(s: str) -> bool:
return s == s[::-1]
@mcp.tool(description="Calcula el factorial de un número entero positivo.")
def factorial(n: int) -> int:
if n < 0:
raise ValueError("El factorial no está definido para negativos.")
if n == 0:
return 1
result = 1
for i in range(1, n + 1):
result *= i
return result
@mcp.tool(description="Devuelve los primeros n números de Fibonacci.")
def fibonacci(n: int) -> list[int]:
if n <= 0:
return []
seq = [0, 1]
while len(seq) < n:
seq.append(seq[-1] + seq[-2])
return seq[:n]
@mcp.tool(description="Devuelve si un número es primo.")
def is_prime(n: int) -> bool:
if n <= 1:
return False
for i in range(2, int(n**0.5) + 1):
if n % i == 0:
return False
return True
if __name__ == "__main__":
# mcp.run(transport="streamable-http", host="127.0.0.1", port=4200, path="/math")
mcp.run(transport="stdio")
-30
View File
@@ -1,30 +0,0 @@
# archivo: sse_server.py
from fastmcp.server import FastMCP
import asyncio
from fastmcp import Client
# Crear la instancia del servidor
server = FastMCP(
name="ServidorSSE",
instructions="Este servidor expone herramientas de prueba.",
)
# Herramienta 1: saludar
@server.tool(name="saludar", description="Saluda a una persona por su nombre.")
def saludar(nombre: str) -> str:
return f"¡Hola, {nombre}!"
# Herramienta 2: espera asíncrona
@server.tool(name="esperar", description="Espera N segundos y responde.")
async def esperar(segundos: int) -> str:
await asyncio.sleep(segundos)
return f"Esperé {segundos} segundos como me pediste."
# Punto de entrada para ejecutarlo por SSE
if __name__ == "__main__":
server.run(
transport="streamable-http", # <-- cambio aquí
host="0.0.0.0",
port=8080,
)
@@ -1,30 +0,0 @@
# archivo: sse_server.py
from fastmcp.server import FastMCP
import asyncio
from fastmcp import Client
# Crear la instancia del servidor
server = FastMCP(
name="ServidorSSE",
instructions="Este servidor expone herramientas de prueba.",
)
# Herramienta 1: saludar
@server.tool(name="saludar", description="Saluda a una persona por su nombre.")
def saludar(nombre: str) -> str:
return f"¡Hola, {nombre}!"
# Herramienta 2: espera asíncrona
@server.tool(name="esperar", description="Espera N segundos y responde.")
async def esperar(segundos: int) -> str:
await asyncio.sleep(segundos)
return f"Esperé {segundos} segundos como me pediste."
# Punto de entrada para ejecutarlo por SSE
if __name__ == "__main__":
server.run(
transport="streamable-http", # <-- cambio aquí
host="0.0.0.0",
port=8080,
)
+68
View File
@@ -0,0 +1,68 @@
from fastmcp import FastMCP
import uuid
import datetime
import socket
import platform
import os
mcp = FastMCP()
@mcp.tool(description="Genera un UUID versión 4.")
def generate_uuid() -> str:
return str(uuid.uuid4())
@mcp.tool(description="Devuelve la fecha y hora actuales en formato ISO 8601.")
def current_datetime() -> str:
return datetime.datetime.now().isoformat()
@mcp.tool(description="Devuelve solo la fecha actual.")
def current_date() -> str:
return datetime.date.today().isoformat()
@mcp.tool(description="Devuelve el nombre del host actual.")
def get_hostname() -> str:
return socket.gethostname()
@mcp.tool(description="Devuelve el sistema operativo actual.")
def get_os() -> str:
return platform.system()
@mcp.tool(description="Devuelve el nombre del usuario actual del sistema.")
def get_current_user() -> str:
return os.getlogin()
@mcp.tool(description="Invierte un valor booleano.")
def invert_boolean(flag: bool) -> bool:
return not flag
# @mcp.tool(description="Devuelve los archivos y carpetas del directorio actual.")
# def list_current_directory() -> list[str]:
# return os.listdir()
# @mcp.tool(description="Crea un archivo con un nombre dado.")
# def create_file(filename: str) -> str:
# with open(filename, "w") as f:
# f.write("")
# return f"Archivo '{filename}' creado."
# @mcp.tool(description="Lee el contenido de un archivo de texto dado.")
# def read_file(filename: str) -> str:
# with open(filename, "r") as f:
# return f.read()
# @mcp.tool(description="Escribe contenido a un archivo, sobrescribiéndolo.")
# def write_file(filename: str, content: str) -> str:
# with open(filename, "w") as f:
# f.write(content)
# return f"Contenido escrito en '{filename}'."
@mcp.tool(description="Devuelve el número de CPUs disponibles en el sistema.")
def get_cpu_count() -> int:
return os.cpu_count()
@mcp.tool(description="Devuelve el timestamp actual (UNIX).")
def current_timestamp() -> float:
return datetime.datetime.now().timestamp()
if __name__ == "__main__":
mcp.run(transport="streamable-http", host="127.0.0.1", port=4300, path="/tools")