Files
kanboard/ejecucion_telegram.py
T
2025-11-06 10:03:06 +01:00

417 lines
15 KiB
Python

"""Bot de Telegram para conversar con el agente Kanboard."""
from __future__ import annotations
import asyncio
import html
import os
import json
from contextlib import suppress
from inspect import isawaitable
from typing import Any, Dict, List, Optional, Set
import requests
from agno.agent.agent import RunOutput
from agno.run.agent import ToolCallStartedEvent
from dotenv import load_dotenv
from agentes import DEFAULT_AGENT_NAME, create_agent
from LokiLogger import LokiLogger
from mcp_wrapper import close_mcp_tools
# Root URL of the Telegram Bot API; the bot token and the method name are
# appended to it when requests are built.
TELEGRAM_API_BASE = "https://api.telegram.org"
# Character budget per outgoing message, used when long replies are split.
MAX_MESSAGE_LENGTH = 3800 # headroom for markdown/formatting
def _parse_int(value: Optional[str], *, env_name: str) -> Optional[int]:
if value is None or value.strip() == "":
return None
try:
return int(value)
except ValueError as error: # pragma: no cover - salida controlada
raise ValueError(f"{env_name} debe ser un entero válido, se recibió: {value!r}") from error
class TelegramKanBot:
    """Long-polling Telegram bot that relays one chat's messages to an agent.

    Only messages whose chat id and sender id both match the configured
    values are processed; anything else is logged and dropped.

    NOTE(review): ``base_url`` embeds the bot token, and HTTP client
    exceptions commonly include the request URL in their message -- confirm
    that the log sink is allowed to hold that secret.
    """

    def __init__(
        self,
        *,
        token: str,
        allowed_user_id: int,
        allowed_chat_id: int,
        agent_key: str,
        logger: LokiLogger,
    ) -> None:
        """Store the configuration and open the shared HTTP session.

        Args:
            token: Bot token used to build the API base URL.
            allowed_user_id: Only messages from this sender are processed.
            allowed_chat_id: Only messages from this chat are processed.
            agent_key: Name passed to ``create_agent`` to select the agent.
            logger: Structured logger used for every event.
        """
        self.token = token
        self.allowed_user_id = allowed_user_id
        self.allowed_chat_id = allowed_chat_id
        self.agent_key = agent_key
        self.logger = logger
        # ``api_timeout`` is the long-poll window sent to getUpdates; the
        # HTTP timeout is larger so the client outlasts the server wait.
        self.api_timeout = 30
        self.http_timeout = 35
        self.idle_sleep = 1  # seconds between polling rounds
        self.error_backoff = 5  # seconds to wait after a failed getUpdates
        self.offset: Optional[int] = None
        self.base_url = f"{TELEGRAM_API_BASE}/bot{token}"
        self.session = requests.Session()
        self.agent: Any = None
        self.context: Dict[str, Any] = {}
        self.mcp_tool_names: Set[str] = set()

    async def setup_agent(self) -> None:
        """Create the agent and cache the names of its MCP tools.

        ``create_agent`` is expected to return ``(agent, context)``; the
        context's ``server_tool_map`` values are flattened into
        ``mcp_tool_names`` so tool notifications can be filtered later.
        """
        agent, context = await create_agent(self.logger, agent=self.agent_key)
        self.agent = agent
        self.context = context
        server_tool_map = context.get("server_tool_map") or {}
        self.mcp_tool_names = {
            tool_name
            for tools in server_tool_map.values()
            for tool_name in (tools or [])
        }
        self.logger.info(
            "🤖 Agente Kan para Telegram listo",
            add_fields={
                "agent_call": {"action": "telegram_agent_ready", "agent": self.agent_key},
                "agent_response": {"status": "ready", "allowed_chat": self.allowed_chat_id},
            },
        )

    async def close(self) -> None:
        """Release the MCP tools (if any) and always close the HTTP session.

        An error raised by the MCP cleanup still propagates, but only after
        the session has been closed.
        """
        try:
            mcp_tools = self.context.get("mcp_tools")
            if mcp_tools:
                await close_mcp_tools(mcp_tools, logger=self.logger)
        finally:
            self.session.close()

    async def run(self) -> None:
        """Poll Telegram forever, handling every update in arrival order.

        The agent is created on first use. Any unexpected error is logged and
        re-raised; cancellation propagates untouched because
        ``asyncio.CancelledError`` is not an ``Exception`` subclass.
        """
        if self.agent is None:
            await self.setup_agent()
        self.logger.info(
            "🚀 Escuchando mensajes de Telegram",
            add_fields={
                "agent_call": {"action": "telegram_poll"},
                "agent_response": {
                    "status": "listening",
                    "chat_id": self.allowed_chat_id,
                    "user_id": self.allowed_user_id,
                },
            },
        )
        try:
            while True:
                updates = await self._fetch_updates()
                for update in updates:
                    # Advance the offset before handling so an update that
                    # makes the handler fail is not delivered again.
                    self.offset = update["update_id"] + 1
                    await self._handle_update(update)
                await asyncio.sleep(self.idle_sleep)
        except Exception as error:
            self.logger.exception(error)
            raise

    @staticmethod
    def _decode_response(response: Any) -> Dict[str, Any]:
        """Return the JSON object carried by a Bot API HTTP response.

        The body is read before the HTTP status is checked: Telegram answers
        failed calls with an error status *and* a JSON body whose
        ``description`` explains the failure, which would be lost if the
        status check raised first.

        Raises:
            Exception: Whatever ``raise_for_status`` raises when the body is
                not JSON and the status is an error.
            ValueError: If the body is not JSON, or is JSON but not an object.
        """
        try:
            data = response.json()
        except ValueError:
            # No JSON body: prefer the HTTP error, else the parse error.
            response.raise_for_status()
            raise
        if not isinstance(data, dict):
            raise ValueError(f"Respuesta inesperada de Telegram: {type(data).__name__}")
        return data

    async def _fetch_updates(self) -> List[Dict[str, Any]]:
        """Long-poll ``getUpdates`` and return the pending updates.

        Returns:
            The list of update objects, or ``[]`` when the call failed, the
            API reported an error, or there is nothing new. Failures are
            logged and followed by a pause of ``error_backoff`` seconds.
        """
        params: Dict[str, Any] = {
            "timeout": self.api_timeout,
            "allowed_updates": json.dumps(["message", "edited_message"]),
        }
        if self.offset is not None:
            params["offset"] = self.offset
        try:
            response = await asyncio.to_thread(
                self.session.get,
                f"{self.base_url}/getUpdates",
                params=params,
                timeout=self.http_timeout,
            )
            data = self._decode_response(response)
        except Exception as error:  # pragma: no cover - defensive handling
            self.logger.exception(
                error,
                add_fields={
                    "agent_call": {"action": "telegram_get_updates"},
                    "agent_response": {"status": "error"},
                },
            )
            await asyncio.sleep(self.error_backoff)
            return []
        if not data.get("ok"):
            self.logger.error(
                "La API de Telegram devolvió un estado no-ok",
                add_fields={
                    "agent_call": {"action": "telegram_get_updates"},
                    "agent_response": {"status": data.get("ok"), "description": data.get("description")},
                },
            )
            # API-level errors arrive with an HTTP error status, so they get
            # the same pause as transport failures before the next poll.
            await asyncio.sleep(self.error_backoff)
            return []
        # ``or []`` also covers an explicit null ``result``.
        result = data.get("result") or []
        if result:
            self.logger.debug(
                "📩 Nuevos updates recibidos",
                add_fields={
                    "agent_call": {"action": "telegram_get_updates"},
                    "agent_response": {"status": "updates", "count": len(result)},
                },
            )
        return result

    async def _handle_update(self, update: Dict[str, Any]) -> None:
        """Filter one update and route its text to the right reply.

        Updates without a (possibly edited) message are ignored. Messages
        from any other chat or sender are logged and dropped. Non-text
        messages and greetings get a canned reply; everything else is
        forwarded to the agent.
        """
        message = update.get("message") or update.get("edited_message")
        if not message:
            return
        chat_id = message.get("chat", {}).get("id")
        user_id = message.get("from", {}).get("id")
        if chat_id != self.allowed_chat_id or user_id != self.allowed_user_id:
            self.logger.warn(
                "Mensaje ignorado por no cumplir filtros",
                add_fields={
                    "agent_call": {"action": "telegram_filter_message"},
                    "agent_response": {"status": "ignored", "chat_id": chat_id, "user_id": user_id},
                },
            )
            return
        text = message.get("text") or ""
        if not text:
            await self._send_message(chat_id, "Solo puedo procesar mensajes de texto por ahora.")
            return
        if text.strip().lower() in {"/start", "hola", "hi"}:
            await self._send_message(chat_id, "¡Hola! Soy KAN, listo para ayudarte con Kanboard.")
            return
        await self._process_message(chat_id, text)

    async def _process_message(self, chat_id: int, text: str) -> None:
        """Run the agent on ``text`` and send its answer back to the chat.

        A typing indicator runs for the whole duration of the call and tool
        executions are announced as they start. Any failure is logged and
        reported to the user with a generic message.
        """
        self.logger.info(
            "📨 Mensaje recibido desde Telegram",
            add_fields={
                "agent_call": {"action": "telegram_user_message", "content": text},
                "agent_response": {"status": "processing"},
            },
        )
        typing_task = asyncio.create_task(self._typing_indicator(chat_id))
        notified_tool_calls: Set[str] = set()
        response: Optional[RunOutput] = None
        try:
            run_result = self.agent.arun(
                text,
                stream=True,
                stream_events=True,
                yield_run_response=True,
            )
            # ``arun`` may hand back a finished result, an awaitable, or an
            # async stream of events ending with the result.
            if isinstance(run_result, RunOutput):
                response = run_result
            elif isawaitable(run_result):
                awaited_result = await run_result
                if isinstance(awaited_result, RunOutput):
                    response = awaited_result
                else:
                    raise RuntimeError("El agente devolvió un resultado inesperado.")
            else:
                async for event in run_result:
                    if isinstance(event, RunOutput):
                        response = event
                    elif isinstance(event, ToolCallStartedEvent):
                        await self._maybe_notify_tool_execution(chat_id, event, notified_tool_calls)
            if response is None:
                raise RuntimeError("El agente no devolvió respuesta.")
            # The content is not guaranteed to be a string, so coerce it
            # before stripping instead of failing on ``.strip()``.
            raw_content = response.content
            content = str(raw_content).strip() if raw_content else ""
            if not content:
                content = "(respuesta vacía)"
            await self._send_message(chat_id, content)
            if response.metrics:
                self.logger.info(
                    "📊 Métricas de ejecución",
                    add_fields={
                        "agent_call": {"action": "agent_run_metrics"},
                        "agent_response": {
                            "status": "completed",
                            "metrics": {
                                "input_tokens": response.metrics.input_tokens,
                                "output_tokens": response.metrics.output_tokens,
                                "total_tokens": response.metrics.total_tokens,
                                "duration_seconds": response.metrics.duration,
                            },
                        },
                    },
                )
        except Exception as error:  # pragma: no cover - defensive handling
            self.logger.exception(
                error,
                add_fields={
                    "agent_call": {"action": "agent_run", "content": text},
                    "agent_response": {"status": "error"},
                },
            )
            await self._send_message(chat_id, "Hubo un error procesando tu mensaje. Revisa los logs para más detalles.")
        finally:
            typing_task.cancel()
            with suppress(asyncio.CancelledError):
                await typing_task

    async def _call_api(
        self,
        method: str,
        payload: Dict[str, Any],
        *,
        action: str,
        rejection_message: str,
    ) -> None:
        """POST ``payload`` to a Bot API method, logging any failure.

        Never raises: a rejection by Telegram is logged with its
        ``description``; any other error is logged with the payload.

        Args:
            method: Bot API method name appended to the base URL.
            payload: JSON body of the request.
            action: Value logged under ``agent_call.action``.
            rejection_message: Log message used when Telegram answers with
                ``ok`` false.
        """
        try:
            response = await asyncio.to_thread(
                self.session.post,
                f"{self.base_url}/{method}",
                json=payload,
                timeout=self.http_timeout,
            )
            data = self._decode_response(response)
            if not data.get("ok"):
                self.logger.error(
                    rejection_message,
                    add_fields={
                        "agent_call": {"action": action},
                        "agent_response": {"status": data.get("ok"), "description": data.get("description")},
                    },
                )
        except Exception as error:  # pragma: no cover - defensive handling
            self.logger.exception(
                error,
                add_fields={
                    "agent_call": {"action": action, "payload": payload},
                    "agent_response": {"status": "error"},
                },
            )

    async def _send_message(self, chat_id: int, text: str, *, parse_mode: Optional[str] = None) -> None:
        """Send ``text`` to the chat, split into as many messages as needed.

        Each chunk is sent independently; a failed chunk is logged and the
        remaining ones are still attempted.
        """
        for chunk in self._chunk_text(text):
            payload: Dict[str, Any] = {"chat_id": chat_id, "text": chunk}
            if parse_mode:
                payload["parse_mode"] = parse_mode
            await self._call_api(
                "sendMessage",
                payload,
                action="telegram_send_message",
                rejection_message="Telegram rechazó el mensaje",
            )

    async def _maybe_notify_tool_execution(
        self,
        chat_id: int,
        event: ToolCallStartedEvent,
        notified_tool_calls: Set[str],
    ) -> None:
        """Tell the user that an MCP tool started, at most once per call.

        Args:
            chat_id: Chat that receives the notice.
            event: Tool-start event; its ``tool`` carries the name and call id.
            notified_tool_calls: Identifiers already announced during this
                run; updated in place.
        """
        if not self.mcp_tool_names:
            return
        tool = getattr(event, "tool", None)
        if not tool or not tool.tool_name:
            return
        # Without a call id the fallback identifier is unique per notice, so
        # such calls are never treated as duplicates.
        tool_identifier = tool.tool_call_id or f"{tool.tool_name}:{len(notified_tool_calls)}"
        if tool_identifier in notified_tool_calls:
            return
        if tool.tool_name not in self.mcp_tool_names:
            return
        notified_tool_calls.add(tool_identifier)
        escaped_name = html.escape(tool.tool_name)
        await self._send_message(chat_id, f"<i>Ejecutado {escaped_name}</i>", parse_mode="HTML")

    async def _typing_indicator(self, chat_id: int) -> None:
        """Refresh the "typing" chat action every 4 seconds until cancelled."""
        while True:
            await self._send_chat_action(chat_id, "typing")
            await asyncio.sleep(4)

    async def _send_chat_action(self, chat_id: int, action: str) -> None:
        """Send a chat action (for example ``typing``); failures are only logged."""
        await self._call_api(
            "sendChatAction",
            {"chat_id": chat_id, "action": action},
            action="telegram_send_chat_action",
            rejection_message="Telegram rechazó la acción del chat",
        )

    def _chunk_text(self, text: str, *, max_length: Optional[int] = None) -> List[str]:
        """Split ``text`` into pieces no longer than the message budget.

        Text that already fits is returned untouched. Otherwise it is split
        on blank lines: paragraphs are stripped, empty ones are dropped, and
        as many as fit are packed into each chunk. A single paragraph longer
        than the budget is cut into fixed-size slices.

        Args:
            text: Message to split.
            max_length: Maximum characters per chunk; defaults to
                ``MAX_MESSAGE_LENGTH``.

        Returns:
            The chunks in order. May be empty when an over-long ``text``
            contains nothing but whitespace.

        Raises:
            ValueError: If ``max_length`` is smaller than 1.
        """
        limit = MAX_MESSAGE_LENGTH if max_length is None else max_length
        if limit < 1:
            raise ValueError("max_length debe ser un entero positivo")
        if len(text) <= limit:
            return [text]
        separator = "\n\n"
        chunks: List[str] = []
        current: List[str] = []
        current_len = 0
        for raw_paragraph in text.split(separator):
            paragraph = raw_paragraph.strip()
            if not paragraph:
                continue
            # The separator only costs characters between two paragraphs.
            needed = len(paragraph) + (len(separator) if current else 0)
            if current and current_len + needed > limit:
                chunks.append(separator.join(current))
                current = []
                current_len = 0
                needed = len(paragraph)
            if len(paragraph) > limit:
                # ``current`` is empty here: an over-long paragraph always
                # forces the flush above first.
                chunks.extend(
                    paragraph[start:start + limit]
                    for start in range(0, len(paragraph), limit)
                )
                continue
            current.append(paragraph)
            current_len += needed
        if current:
            chunks.append(separator.join(current))
        return chunks
async def run_bot() -> None:
    """Build the bot from environment variables and run it until it stops.

    Reads ``TELEGRAM_BOT_TOKEN``, ``TELEGRAM_ALLOWED_USER_ID``,
    ``TELEGRAM_ALLOWED_CHAT_ID`` and the optional ``TELEGRAM_AGENT_KEY``
    after loading the ``.env`` file. The bot is always closed on exit.

    Raises:
        ValueError: If one of the id variables is not a valid integer.
        RuntimeError: If the token or either id is missing.
    """
    load_dotenv()

    bot_token = os.getenv("TELEGRAM_BOT_TOKEN")
    # Parsed in the same order as they are reported: user first, then chat.
    user_id, chat_id = (
        _parse_int(os.getenv(name), env_name=name)
        for name in ("TELEGRAM_ALLOWED_USER_ID", "TELEGRAM_ALLOWED_CHAT_ID")
    )
    selected_agent = (os.getenv("TELEGRAM_AGENT_KEY") or DEFAULT_AGENT_NAME).lower()

    if not bot_token:
        raise RuntimeError("TELEGRAM_BOT_TOKEN debe definirse en el entorno")
    if user_id is None or chat_id is None:
        raise RuntimeError("TELEGRAM_ALLOWED_USER_ID y TELEGRAM_ALLOWED_CHAT_ID son obligatorios")

    loki_logger = LokiLogger(
        service_name="agente_kanboard",
        add_labels={"env": "local", "agent": "telegram"},
    )
    bot = TelegramKanBot(
        token=bot_token,
        allowed_user_id=user_id,
        allowed_chat_id=chat_id,
        agent_key=selected_agent,
        logger=loki_logger,
    )
    try:
        await bot.run()
    finally:
        # Runs on errors and cancellation as well as on a normal return.
        await bot.close()
def main() -> None:
    """Synchronous entry point: run the bot coroutine on a new event loop."""
    bot_coroutine = run_bot()
    asyncio.run(bot_coroutine)


if __name__ == "__main__":  # pragma: no cover - direct execution
    main()