250 lines
8.7 KiB
Python
250 lines
8.7 KiB
Python
"""CLI interactiva para conversar con los agentes registrados."""
|
||
from __future__ import annotations
|
||
|
||
import asyncio
|
||
from pprint import pformat
|
||
from typing import Any, Dict, List
|
||
|
||
from dotenv import load_dotenv
|
||
|
||
from agentes import AGENT_REGISTRY, create_agent
|
||
from LokiLogger import LokiLogger
|
||
from mcp_wrapper import close_mcp_tools
|
||
|
||
# Module-level logger, created once and used by every function in this file.
logger = LokiLogger(service_name="agente_kanboard", add_labels={"env": "local", "agent": "CLI"})
|
||
|
||
|
||
def _make_json_safe(value: Any) -> Any:
    """Recursively convert ``value`` into plain containers and primitives.

    Conversion rules, applied in this order:

    * ``dict``: values are converted recursively; keys are kept as they are.
    * ``list``, ``tuple``, ``set``, ``frozenset``: returned as a ``list`` of
      converted items, so nested objects inside them are converted too
      instead of being flattened into a single string.
    * Instances exposing a callable ``to_dict``, ``model_dump`` or ``dict``
      (tried in that order): the method's result is converted recursively.
    * ``str``, ``int``, ``float``, ``bool`` and ``None``: returned unchanged.
    * Anything else: ``str(value)``.

    Args:
        value: Any object to be made printable/serializable.

    Returns:
        A structure made only of dicts, lists and primitive values (dict keys
        excepted, which are not converted).
    """
    if isinstance(value, dict):
        return {k: _make_json_safe(v) for k, v in value.items()}
    if isinstance(value, (list, tuple, set, frozenset)):
        return [_make_json_safe(item) for item in value]

    # Class objects expose the same method names as their instances, but
    # calling them unbound would raise TypeError, so only instances qualify.
    if not isinstance(value, type):
        # NOTE(review): ``model_dump`` is tried before ``dict``; presumably so
        # Pydantic v2 models avoid the deprecated ``dict()`` - confirm.
        for method_name in ("to_dict", "model_dump", "dict"):
            converter = getattr(value, method_name, None)
            if callable(converter):
                return _make_json_safe(converter())

    if isinstance(value, (str, int, float, bool)) or value is None:
        return value
    return str(value)
|
||
|
||
|
||
def _pretty_print(title: str, payload: Any) -> None:
    """Print a titled, pretty-formatted view of ``payload``.

    The payload is first passed through ``_make_json_safe``; the title is
    printed after a blank line and underlined with ``=`` characters of the
    same length, followed by the ``pformat`` rendering of the payload.

    Args:
        title: Heading shown above the payload.
        payload: Any object to display.
    """
    printable = _make_json_safe(payload)
    underline = "=" * len(title)
    print(f"\n{title}\n{underline}")
    # Keep dict insertion order and one element per line for readability.
    rendered = pformat(printable, width=100, compact=False, sort_dicts=False)
    print(rendered)
|
||
|
||
|
||
def _announce_session(context: Dict[str, Any]) -> None:
    """Print the start-of-session banner built from the agent ``context``.

    Shows a summary of selected context entries, the persistent memory file
    when ``db_file`` is set, whether any MCP servers are active, and the hint
    on how to leave the conversation.

    Args:
        context: Session context mapping; missing entries are shown as
            ``None`` (or an empty list/dict for the tool and server entries).
    """
    read = context.get
    db_file = read("db_file")
    active_servers = read("active_servers")

    print("🚀 Sesión iniciada")
    _pretty_print(
        "Contexto inicial",
        {
            "agente": read("agent_name"),
            "usuario": read("user_id"),
            "sesion": read("session_id"),
            "herramientas_locales": read("local_tools", []),
            "servidores_mcp_activos": read("active_servers", []),
            "herramientas_mcp": read("server_tool_map", {}),
            "base_datos": db_file,
            "reanudar_memoria": read("resume_previous_session"),
        },
    )

    if db_file:
        print(f"\n🗄️ Memoria persistente: {db_file}")

    mcp_notice = (
        "\n🔗 MCP habilitado. Servidores activos detectados."
        if active_servers
        else "\n⚠️ MCP no tiene servidores activos configurados."
    )
    print(mcp_notice)

    print("\nEscribe 'salir' para terminar la conversación.")
|
||
|
||
|
||
def _render_response(response: Any) -> None:
    """Print the metrics, tool calls and text content of an agent response.

    A falsy ``response`` only prints a warning. Otherwise three sections are
    printed in order: run identifiers and token/timing metrics, the list of
    invoked tools (or a notice that none were invoked), and the response
    content (or a marker that it is empty).

    Args:
        response: Object returned by the agent run; its attributes are read
            directly (``run_id``, ``metrics``, ``tools``, ``content``, ...).
    """
    if not response:
        print("⚠️ No se obtuvo respuesta del agente.")
        return

    def metric_or_zero(field: str) -> Any:
        # Metrics may be absent on the response; report 0 for every field then.
        stats = response.metrics
        return getattr(stats, field) if stats else 0

    metric_fields = (
        "input_tokens",
        "output_tokens",
        "total_tokens",
        "duration",
        "time_to_first_token",
    )
    overview = {
        "run_id": response.run_id,
        "agent_id": response.agent_id,
        "session_id": response.session_id,
        "model": response.model,
        "model_provider": response.model_provider,
        "status": str(response.status),
        "metrics": {field: metric_or_zero(field) for field in metric_fields},
        "tools_invocadas": [call.tool_name for call in response.tools or []],
    }
    _pretty_print("📊 Métricas de la respuesta", overview)

    if response.tools:
        tool_details = [
            {
                "tool_call_id": call.tool_call_id,
                "tool_name": call.tool_name,
                "tool_args": _make_json_safe(call.tool_args),
                "result": _make_json_safe(call.result),
                "error": call.tool_call_error,
                # Per-tool metrics are optional on the tool call object.
                "metrics": call.metrics.to_dict() if getattr(call, "metrics", None) else None,
            }
            for call in response.tools
        ]
        _pretty_print("🧰 Herramientas invocadas", tool_details)
    else:
        print("\nℹ️ El modelo no invocó herramientas durante la respuesta.")

    if not response.content:
        print("\n💬 Respuesta del agente: (vacía)")
        return

    print("\n💬 Respuesta del agente")
    print("----------------------")
    print(response.content)
|
||
|
||
|
||
async def ejecutar_cli() -> None:
    """Run the interactive chat session from agent selection to shutdown.

    Steps, in order: load environment variables with ``load_dotenv``, let the
    user pick an agent from ``AGENT_REGISTRY``, build it with ``create_agent``,
    print the session context, then loop reading prompts and rendering each
    response until the user types ``salir``/``exit``/``quit``, interrupts the
    session, or standard input is closed (EOF).

    Once the agent has been created, the MCP tools found in the session
    context are always handed to ``close_mcp_tools`` before returning.
    Failures while creating the agent or while answering a prompt are logged
    and printed instead of being raised.

    NOTE(review): when started through ``asyncio.run()`` on Python 3.11+,
    Ctrl+C is delivered to the main task as a cancellation rather than as
    ``KeyboardInterrupt`` inside the coroutine, so the ``KeyboardInterrupt``
    handlers here may not run on those versions - confirm against the
    target interpreter.
    """
    load_dotenv()
    logger.info("🚀 Iniciando sesión interactiva del agente")

    agent_keys = list(AGENT_REGISTRY.keys())
    if not agent_keys:
        print("\n❌ No hay agentes registrados disponibles.")
        logger.error(
            "🚫 No hay agentes registrados para iniciar la sesión",
            add_fields={"agent_call": {"action": "list_agents"}, "agent_response": {"status": "empty_registry"}},
        )
        return

    selected_key = _select_agent_key(agent_keys)
    if selected_key is None:
        # Selection was aborted; the helper already reported why.
        return

    selected_wrapper = AGENT_REGISTRY[selected_key]
    print(f"\n✅ Agente seleccionado: {selected_wrapper.name}\n")
    logger.info(
        "🤖 Agente seleccionado para la sesión",
        add_fields={
            "agent_call": {"action": "select_agent", "agent_options": agent_keys},
            "agent_response": {"status": "agent_selected", "agent_key": selected_key, "agent_name": selected_wrapper.name},
        },
    )

    try:
        agent, context = await create_agent(logger, agent=selected_key)
    except Exception as error:  # pragma: no cover - controlled exit
        logger.exception(error)
        print(f"No se pudo crear el agente: {error}")
        return

    mcp_tools: List[Any] = context.get("mcp_tools", [])
    _announce_session(context)

    try:
        while True:
            # input() blocks, so it runs in a worker thread to keep the event
            # loop free while waiting for the user.
            user_input = (await asyncio.to_thread(input, "\nTú → ")).strip()
            if not user_input:
                continue

            if user_input.lower() in {"salir", "exit", "quit"}:
                logger.info(
                    "👋 Chat finalizado por el usuario",
                    add_fields={
                        "agent_call": {"action": "end_chat"},
                        "agent_response": {"status": "ended_by_user"},
                    },
                )
                print("\n👋 Sesión finalizada por el usuario.")
                break

            logger.info(
                "📨 Mensaje recibido del usuario",
                add_fields={
                    "agent_call": {
                        "action": "user_message",
                        "content": user_input,
                    }
                },
            )

            try:
                response = await agent.arun(user_input)
            except Exception as error:  # pragma: no cover - controlled exit
                # A failed turn must not end the session: report and re-prompt.
                logger.exception(error)
                print(f"Error al procesar la consulta: {error}")
                continue

            _render_response(response)

    except KeyboardInterrupt:  # pragma: no cover - controlled exit
        logger.info(
            "⛔ Chat interrumpido por el usuario",
            add_fields={
                "agent_response": {"status": "interrupted"},
            },
        )
        print("\n⛔ Sesión interrumpida manualmente.")
    except EOFError:  # pragma: no cover - controlled exit
        # Standard input was closed (Ctrl+D or exhausted pipe): end the chat
        # cleanly instead of crashing with a traceback.
        logger.info(
            "👋 Chat finalizado por fin de entrada (EOF)",
            add_fields={
                "agent_response": {"status": "eof"},
            },
        )
        print("\n👋 Entrada cerrada (EOF). Sesión finalizada.")
    finally:
        await close_mcp_tools(mcp_tools, logger=logger)
        logger.info("✅ Sesión del agente finalizada")


def _select_agent_key(agent_keys: List[Any]) -> Any:
    """List the registered agents and prompt until one of them is chosen.

    The answer may be the 1-based position shown in the listing, a registry
    key, or an agent name; keys and names are matched case-insensitively
    because the typed text is lowercased before matching.

    Args:
        agent_keys: Keys of ``AGENT_REGISTRY`` in display order (non-empty).

    Returns:
        The chosen registry key, or ``None`` when the prompt was aborted with
        Ctrl+C or standard input was closed (EOF).
    """
    print("\n🤖 Agentes disponibles:")
    print("=======================")

    key_lookup: Dict[str, Any] = {}
    name_lookup: Dict[str, Any] = {}
    for idx, key in enumerate(agent_keys, start=1):
        wrapper = AGENT_REGISTRY[key]
        # First key wins if two keys differ only by letter case.
        key_lookup.setdefault(str(key).lower(), key)
        name_lookup[wrapper.name.lower()] = key
        print(f"[{idx}] {wrapper.name} → {wrapper.description}")

    while True:
        try:
            choice = input("\nSelecciona un agente (número o nombre): ").strip().lower()
        except KeyboardInterrupt:
            print("\n\n⛔ Selección interrumpida por el usuario.")
            logger.info(
                "⛔ Selección de agente interrumpida por Ctrl+C",
                add_fields={
                    "agent_call": {"action": "select_agent", "agent_options": agent_keys},
                    "agent_response": {"status": "interrupted"},
                },
            )
            return None
        except EOFError:
            print("\n\n⛔ Entrada cerrada (EOF). Selección cancelada.")
            logger.info(
                "⛔ Selección de agente cancelada por fin de entrada (EOF)",
                add_fields={
                    "agent_call": {"action": "select_agent", "agent_options": agent_keys},
                    "agent_response": {"status": "eof"},
                },
            )
            return None

        if not choice:
            print("❌ Entrada vacía. Intenta nuevamente.")
            continue

        # isdecimal() rather than isdigit(): isdigit() also accepts characters
        # such as "²" that int() rejects with ValueError.
        if choice.isdecimal():
            index = int(choice) - 1
            if 0 <= index < len(agent_keys):
                return agent_keys[index]
            print("❌ Número inválido. Intenta de nuevo.")
            continue

        # An exact key match keeps priority over the case-insensitive one.
        if choice in agent_keys:
            return choice
        if choice in key_lookup:
            return key_lookup[choice]
        if choice in name_lookup:
            return name_lookup[choice]

        print("❌ Agente no encontrado. Intenta nuevamente.")
|
||
|
||
|
||
def main() -> None:
    """Synchronous entry point: run the ``ejecutar_cli`` coroutine to completion."""
    cli_session = ejecutar_cli()
    asyncio.run(cli_session)
|
||
|
||
|
||
# Start the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":  # pragma: no cover - direct execution
    main()
|