Files
kanboard/ejecucion_cli.py

250 lines
8.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""CLI interactiva para conversar con los agentes registrados."""
from __future__ import annotations
import asyncio
from pprint import pformat
from typing import Any, Dict, List
from dotenv import load_dotenv
from agentes import AGENT_REGISTRY, create_agent
from LokiLogger import LokiLogger
from mcp_wrapper import close_mcp_tools
# Logger shared across the whole session: every function in this module logs
# through this single instance, tagged with the service name and labels below.
logger = LokiLogger(
    service_name="agente_kanboard",
    add_labels={"env": "local", "agent": "CLI"},
)
def _make_json_safe(value: Any) -> Any:
    """Recursively convert *value* into plain data that is safe to print.

    Conversion rules, applied in order:

    * ``dict``: values are converted recursively; keys that are not already
      ``str``/``int``/``float``/``bool``/``None`` are replaced by ``str(key)``.
    * ``list``, ``tuple``, ``set``, ``frozenset``: converted to a ``list`` of
      converted items (sets keep their iteration order, which is arbitrary).
    * Instances exposing a callable ``to_dict``, ``model_dump`` or ``dict``
      (checked in that order): the method's result is converted recursively.
    * ``str``, ``int``, ``float``, ``bool`` and ``None`` are returned as-is.
    * Anything else is replaced by ``str(value)``.

    Args:
        value: Arbitrary object to sanitise.

    Returns:
        A structure made only of dicts, lists and primitive values.
    """
    primitive_types = (str, int, float, bool)

    if isinstance(value, dict):
        return {
            (
                key
                if key is None or isinstance(key, primitive_types)
                else str(key)
            ): _make_json_safe(item)
            for key, item in value.items()
        }

    # Tuples and sets are containers too: recurse into them instead of
    # flattening the whole container into a single string.
    if isinstance(value, (list, tuple, set, frozenset)):
        return [_make_json_safe(item) for item in value]

    # Classes are skipped: calling an unbound ``to_dict`` on the class itself
    # would raise TypeError for the missing ``self``.
    if not isinstance(value, type):
        for method_name in ("to_dict", "model_dump", "dict"):
            converter = getattr(value, method_name, None)
            # The attribute may exist without being a method (e.g. a plain
            # data attribute named ``dict``); only call it when callable.
            if callable(converter):
                return _make_json_safe(converter())

    if value is None or isinstance(value, primitive_types):
        return value
    return str(value)
def _pretty_print(title: str, payload: Any) -> None:
    """Print *payload* under an underlined *title*.

    The payload is first sanitised with ``_make_json_safe`` and then rendered
    with ``pformat`` (100 columns, non-compact, original dict order).

    Args:
        title: Heading printed after a blank line and underlined with ``=``.
        payload: Arbitrary data to display.
    """
    sanitized = _make_json_safe(payload)
    # Heading and underline in a single call; output is identical to two prints.
    print(f"\n{title}", "=" * len(title), sep="\n")
    rendered = pformat(sanitized, width=100, compact=False, sort_dicts=False)
    print(rendered)
def _announce_session(context: Dict[str, Any]) -> None:
    """Print the banner shown when a chat session starts.

    Reads the session metadata from *context*, prints it as a summary table,
    then reports the persistent-memory file (if any), whether MCP servers are
    active, and how to leave the conversation.

    Args:
        context: Mapping describing the session; missing keys are tolerated.
    """
    # (label shown to the user, key read from the context, fallback value)
    summary_spec = (
        ("agente", "agent_name", None),
        ("usuario", "user_id", None),
        ("sesion", "session_id", None),
        ("herramientas_locales", "local_tools", []),
        ("servidores_mcp_activos", "active_servers", []),
        ("herramientas_mcp", "server_tool_map", {}),
        ("base_datos", "db_file", None),
        ("reanudar_memoria", "resume_previous_session", None),
    )
    overview = {
        label: context.get(source_key, fallback)
        for label, source_key, fallback in summary_spec
    }

    print("🚀 Sesión iniciada")
    _pretty_print("Contexto inicial", overview)

    db_file = context.get("db_file")
    if db_file:
        print(f"\n🗄️ Memoria persistente: {db_file}")

    mcp_notice = (
        "\n🔗 MCP habilitado. Servidores activos detectados."
        if context.get("active_servers")
        else "\n⚠️ MCP no tiene servidores activos configurados."
    )
    print(mcp_notice)

    print("\nEscribe 'salir' para terminar la conversación.")
def _render_response(response: Any) -> None:
    """Print an agent response: run metadata, tool calls and final content.

    Args:
        response: Object returned by the agent run. The code reads its
            ``run_id``, ``agent_id``, ``session_id``, ``model``,
            ``model_provider``, ``status``, ``metrics``, ``tools`` and
            ``content`` attributes. A falsy value is reported as "no
            response" and nothing else is printed.
    """
    if not response:
        print("⚠️ No se obtuvo respuesta del agente.")
        return

    # Read these once instead of re-evaluating them for every field.
    metrics = response.metrics
    tool_calls = response.tools or []
    metric_names = (
        "input_tokens",
        "output_tokens",
        "total_tokens",
        "duration",
        "time_to_first_token",
    )

    agent_response_fields = {
        "run_id": response.run_id,
        "agent_id": response.agent_id,
        "session_id": response.session_id,
        "model": response.model,
        "model_provider": response.model_provider,
        "status": str(response.status),
        # Every metric falls back to 0 when the run carries no metrics object.
        "metrics": {
            name: (getattr(metrics, name) if metrics else 0)
            for name in metric_names
        },
        "tools_invocadas": [call.tool_name for call in tool_calls],
    }
    _pretty_print("📊 Métricas de la respuesta", agent_response_fields)

    if tool_calls:
        herramientas = []
        for call in tool_calls:
            # ``metrics`` may be absent on a tool call, hence the getattr.
            call_metrics = getattr(call, "metrics", None)
            herramientas.append(
                {
                    "tool_call_id": call.tool_call_id,
                    "tool_name": call.tool_name,
                    "tool_args": _make_json_safe(call.tool_args),
                    "result": _make_json_safe(call.result),
                    "error": call.tool_call_error,
                    "metrics": call_metrics.to_dict() if call_metrics else None,
                }
            )
        _pretty_print("🧰 Herramientas invocadas", herramientas)
    else:
        # The original literal began with an orphan variation selector (its
        # base glyph was missing); an information icon is used in its place.
        print("\nℹ️ El modelo no invocó herramientas durante la respuesta.")

    if response.content:
        print("\n💬 Respuesta del agente")
        print("----------------------")
        print(response.content)
    else:
        print("\n💬 Respuesta del agente: (vacía)")
def _prompt_agent_key(agent_keys: List[Any], name_to_key: Dict[str, Any]) -> Any:
    """Ask the user to pick an agent until a valid choice is entered.

    The answer may be the 1-based position in *agent_keys*, a registry key, or
    an agent display name; keys and names are matched case-insensitively.

    Args:
        agent_keys: Registry keys in the order they were listed to the user.
        name_to_key: Lower-cased agent display name -> registry key.

    Returns:
        The selected registry key, or ``None`` when the prompt was aborted
        with Ctrl+C or the input stream ended.
    """
    # The typed choice is lower-cased, so keys must be compared lower-cased
    # too; otherwise a registry key containing capitals could never match.
    key_lookup = {str(key).lower(): key for key in agent_keys}

    while True:
        try:
            choice = input("\nSelecciona un agente (número o nombre): ").strip().lower()
        except KeyboardInterrupt:
            print("\n\n⛔ Selección interrumpida por el usuario.")
            logger.info(
                "⛔ Selección de agente interrumpida por Ctrl+C",
                add_fields={
                    "agent_call": {"action": "select_agent", "agent_options": agent_keys},
                    "agent_response": {"status": "interrupted"},
                },
            )
            return None
        except EOFError:
            # stdin was closed (Ctrl+D or piped input ran out): nothing more
            # can be read, so stop instead of crashing with a traceback.
            print("\n\n⛔ Selección cancelada: fin de la entrada.")
            logger.info(
                "⛔ Selección de agente cancelada por fin de la entrada (EOF)",
                add_fields={
                    "agent_call": {"action": "select_agent", "agent_options": agent_keys},
                    "agent_response": {"status": "eof"},
                },
            )
            return None

        if not choice:
            print("❌ Entrada vacía. Intenta nuevamente.")
            continue

        # isdecimal() matches exactly what int() can parse; isdigit() also
        # accepts characters such as "²" that make int() raise ValueError.
        if choice.isdecimal():
            index = int(choice) - 1
            if 0 <= index < len(agent_keys):
                return agent_keys[index]
            print("❌ Número inválido. Intenta de nuevo.")
            continue

        if choice in key_lookup:
            return key_lookup[choice]
        if choice in name_to_key:
            return name_to_key[choice]
        print("❌ Agente no encontrado. Intenta nuevamente.")


async def ejecutar_cli() -> None:
    """Run one interactive chat session with a registered agent.

    Steps: load the ``.env`` file, list the agents in ``AGENT_REGISTRY``, let
    the user choose one, build it with ``create_agent`` and then loop reading
    user messages and rendering the agent's answers until the user types
    ``salir``/``exit``/``quit``, the input stream ends, or the session is
    interrupted. The MCP tools returned in the agent context are always closed
    on the way out.
    """
    load_dotenv()
    logger.info("🚀 Iniciando sesión interactiva del agente")

    agent_keys = list(AGENT_REGISTRY.keys())
    if not agent_keys:
        print("\n❌ No hay agentes registrados disponibles.")
        logger.error(
            "🚫 No hay agentes registrados para iniciar la sesión",
            add_fields={"agent_call": {"action": "list_agents"}, "agent_response": {"status": "empty_registry"}},
        )
        return

    print("\n🤖 Agentes disponibles:")
    print("=======================")
    name_to_key: Dict[str, Any] = {}
    for idx, key in enumerate(agent_keys, start=1):
        wrapper = AGENT_REGISTRY[key]
        name_to_key[wrapper.name.lower()] = key
        # Separate name and description so they are not printed fused together.
        description = f" - {wrapper.description}" if wrapper.description else ""
        print(f"[{idx}] {wrapper.name}{description}")

    selected_key = _prompt_agent_key(agent_keys, name_to_key)
    if selected_key is None:
        return

    selected_wrapper = AGENT_REGISTRY[selected_key]
    print(f"\n✅ Agente seleccionado: {selected_wrapper.name}\n")
    logger.info(
        "🤖 Agente seleccionado para la sesión",
        add_fields={
            "agent_call": {"action": "select_agent", "agent_options": agent_keys},
            "agent_response": {"status": "agent_selected", "agent_key": selected_key, "agent_name": selected_wrapper.name},
        },
    )

    try:
        agent, context = await create_agent(logger, agent=selected_key)
    except Exception as error:  # pragma: no cover - controlled exit
        logger.exception(error)
        print(f"No se pudo crear el agente: {error}")
        return

    # ``or []`` also covers a context that stores an explicit None.
    mcp_tools: List[Any] = context.get("mcp_tools") or []

    try:
        # Inside the try block so the MCP tools are closed even if printing
        # the session banner fails.
        _announce_session(context)

        while True:
            try:
                # input() blocks, so it runs in a worker thread to keep the
                # event loop free while waiting for the user.
                user_input = (await asyncio.to_thread(input, "\nTú → ")).strip()
            except EOFError:
                logger.info(
                    "👋 Chat finalizado por fin de la entrada (EOF)",
                    add_fields={
                        "agent_call": {"action": "end_chat"},
                        "agent_response": {"status": "ended_by_eof"},
                    },
                )
                print("\n👋 Sesión finalizada: fin de la entrada.")
                break

            if not user_input:
                continue

            if user_input.lower() in {"salir", "exit", "quit"}:
                logger.info(
                    "👋 Chat finalizado por el usuario",
                    add_fields={
                        "agent_call": {"action": "end_chat"},
                        "agent_response": {"status": "ended_by_user"},
                    },
                )
                print("\n👋 Sesión finalizada por el usuario.")
                break

            logger.info(
                "📨 Mensaje recibido del usuario",
                add_fields={
                    "agent_call": {
                        "action": "user_message",
                        "content": user_input,
                    }
                },
            )

            try:
                response = await agent.arun(user_input)
            except Exception as error:  # pragma: no cover - controlled exit
                # A failed query must not end the session: report and go on.
                logger.exception(error)
                print(f"Error al procesar la consulta: {error}")
                continue

            _render_response(response)
    except (KeyboardInterrupt, asyncio.CancelledError) as interruption:  # pragma: no cover - controlled exit
        # When driven by asyncio.run() on Python 3.11+, Ctrl+C cancels the
        # main task, so the interruption arrives here as CancelledError
        # rather than KeyboardInterrupt.
        logger.info(
            "⛔ Chat interrumpido por el usuario",
            add_fields={
                "agent_response": {"status": "interrupted"},
            },
        )
        if isinstance(interruption, asyncio.CancelledError):
            # Cancellation must keep propagating so the event loop runner can
            # finish shutting down; cleanup still runs in ``finally``.
            raise
        print("\n⛔ Sesión interrumpida manualmente.")
    finally:
        await close_mcp_tools(mcp_tools, logger=logger)
        logger.info("✅ Sesión del agente finalizada")
def main() -> None:
    """Synchronous entry point: run the interactive CLI on a new event loop.

    Raises:
        SystemExit: With status 130 when the session is aborted with Ctrl+C.
    """
    try:
        asyncio.run(ejecutar_cli())
    except KeyboardInterrupt:
        # asyncio.run() re-raises KeyboardInterrupt once the main task has
        # been cancelled and cleaned up; report it instead of dumping a
        # traceback, and exit with the conventional "killed by SIGINT" status.
        print("\n⛔ Sesión interrumpida manualmente.")
        raise SystemExit(130) from None


if __name__ == "__main__":  # pragma: no cover - direct execution
    main()