"""Bot de Telegram para conversar con el agente Kanboard.""" from __future__ import annotations import asyncio import html import os import json from contextlib import suppress from typing import Any, Dict, List, Optional, Set import requests from agno.agent.agent import RunOutput from agno.run.agent import ToolCallStartedEvent from dotenv import load_dotenv from agentes import DEFAULT_AGENT_NAME, create_agent from LokiLogger import LokiLogger from mcp_wrapper import close_mcp_tools TELEGRAM_API_BASE = "https://api.telegram.org" MAX_MESSAGE_LENGTH = 3800 # margen para markdown/formatting def _parse_int(value: Optional[str], *, env_name: str) -> Optional[int]: if value is None or value.strip() == "": return None try: return int(value) except ValueError as error: # pragma: no cover - salida controlada raise ValueError(f"{env_name} debe ser un entero v谩lido, se recibi贸: {value!r}") from error class TelegramKanBot: def __init__( self, *, token: str, allowed_user_id: int, allowed_chat_id: int, agent_key: str, logger: LokiLogger, ) -> None: self.token = token self.allowed_user_id = allowed_user_id self.allowed_chat_id = allowed_chat_id self.agent_key = agent_key self.logger = logger self.api_timeout = 30 self.http_timeout = 35 self.idle_sleep = 1 self.offset: Optional[int] = None self.base_url = f"{TELEGRAM_API_BASE}/bot{token}" self.session = requests.Session() self.agent = None self.context: Dict[str, Any] = {} self.mcp_tool_names: Set[str] = set() async def setup_agent(self) -> None: agent, context = await create_agent(self.logger, agent=self.agent_key) self.agent = agent self.context = context server_tool_map = context.get("server_tool_map") or {} self.mcp_tool_names = { tool_name for tools in server_tool_map.values() for tool_name in (tools or []) } self.logger.info( "馃 Agente Kan para Telegram listo", add_fields={ "agent_call": {"action": "telegram_agent_ready", "agent": self.agent_key}, "agent_response": {"status": "ready", "allowed_chat": self.allowed_chat_id}, }, ) async def close(self) -> None: if self.context.get("mcp_tools"): await close_mcp_tools(self.context["mcp_tools"], logger=self.logger) self.session.close() async def run(self) -> None: if self.agent is None: await self.setup_agent() self.logger.info( "馃殌 Escuchando mensajes de Telegram", add_fields={ "agent_call": {"action": "telegram_poll"}, "agent_response": { "status": "listening", "chat_id": self.allowed_chat_id, "user_id": self.allowed_user_id, }, }, ) try: while True: updates = await self._fetch_updates() for update in updates: self.offset = update["update_id"] + 1 await self._handle_update(update) await asyncio.sleep(self.idle_sleep) except asyncio.CancelledError: # pragma: no cover - interrupci贸n controlada raise except Exception as error: self.logger.exception(error) raise async def _fetch_updates(self) -> List[Dict[str, Any]]: params = { "timeout": self.api_timeout, "allowed_updates": json.dumps(["message", "edited_message"]), } if self.offset is not None: params["offset"] = self.offset try: response = await asyncio.to_thread( self.session.get, f"{self.base_url}/getUpdates", params=params, timeout=self.http_timeout, ) response.raise_for_status() data = response.json() except Exception as error: # pragma: no cover - manejo defensivo self.logger.exception( error, add_fields={ "agent_call": {"action": "telegram_get_updates"}, "agent_response": {"status": "error"}, }, ) await asyncio.sleep(5) return [] if not data.get("ok"): self.logger.error( "La API de Telegram devolvi贸 un estado no-ok", add_fields={ "agent_call": {"action": "telegram_get_updates"}, "agent_response": {"status": data.get("ok"), "description": data.get("description")}, }, ) return [] result = data.get("result", []) if result: self.logger.debug( "馃摡 Nuevos updates recibidos", add_fields={ "agent_call": {"action": "telegram_get_updates"}, "agent_response": {"status": "updates", "count": len(result)}, }, ) return result async def _handle_update(self, update: Dict[str, Any]) -> None: message = update.get("message") or update.get("edited_message") if not message: return chat = message.get("chat", {}) chat_id = chat.get("id") user = message.get("from", {}) user_id = user.get("id") if chat_id != self.allowed_chat_id or user_id != self.allowed_user_id: self.logger.warn( "Mensaje ignorado por no cumplir filtros", add_fields={ "agent_call": {"action": "telegram_filter_message"}, "agent_response": {"status": "ignored", "chat_id": chat_id, "user_id": user_id}, }, ) return text = message.get("text") or "" if not text: await self._send_message(chat_id, "Solo puedo procesar mensajes de texto por ahora.") return if text.strip().lower() in {"/start", "hola", "hi"}: await self._send_message(chat_id, "隆Hola! Soy KAN, listo para ayudarte con Kanboard.") return await self._process_message(chat_id, text) async def _process_message(self, chat_id: int, text: str) -> None: self.logger.info( "馃摠 Mensaje recibido desde Telegram", add_fields={ "agent_call": {"action": "telegram_user_message", "content": text}, "agent_response": {"status": "processing"}, }, ) typing_task = asyncio.create_task(self._typing_indicator(chat_id)) notified_tool_calls: Set[str] = set() response: Optional[RunOutput] = None try: run_result = await self.agent.arun( text, stream=True, stream_events=True, yield_run_response=True, ) if isinstance(run_result, RunOutput): response = run_result else: async for event in run_result: if isinstance(event, RunOutput): response = event elif isinstance(event, ToolCallStartedEvent): await self._maybe_notify_tool_execution(chat_id, event, notified_tool_calls) if response is None: raise RuntimeError("El agente no devolvi贸 respuesta.") content = (response.content or "(respuesta vac铆a)").strip() if not content: content = "(respuesta vac铆a)" await self._send_message(chat_id, content) if response.metrics: self.logger.info( "馃搳 M茅tricas de ejecuci贸n", add_fields={ "agent_call": {"action": "agent_run_metrics"}, "agent_response": { "status": "completed", "metrics": { "input_tokens": response.metrics.input_tokens, "output_tokens": response.metrics.output_tokens, "total_tokens": response.metrics.total_tokens, "duration_seconds": response.metrics.duration, }, }, }, ) except Exception as error: # pragma: no cover - manejo defensivo self.logger.exception( error, add_fields={ "agent_call": {"action": "agent_run", "content": text}, "agent_response": {"status": "error"}, }, ) await self._send_message(chat_id, "Hubo un error procesando tu mensaje. Revisa los logs para m谩s detalles.") return finally: typing_task.cancel() with suppress(asyncio.CancelledError): await typing_task async def _send_message(self, chat_id: int, text: str, *, parse_mode: Optional[str] = None) -> None: for chunk in self._chunk_text(text): payload = {"chat_id": chat_id, "text": chunk} if parse_mode: payload["parse_mode"] = parse_mode try: response = await asyncio.to_thread( self.session.post, f"{self.base_url}/sendMessage", json=payload, timeout=self.http_timeout, ) response.raise_for_status() data = response.json() if not data.get("ok"): self.logger.error( "Telegram rechaz贸 el mensaje", add_fields={ "agent_call": {"action": "telegram_send_message"}, "agent_response": {"status": data.get("ok"), "description": data.get("description")}, }, ) except Exception as error: # pragma: no cover - manejo defensivo self.logger.exception( error, add_fields={ "agent_call": {"action": "telegram_send_message", "payload": payload}, "agent_response": {"status": "error"}, }, ) async def _maybe_notify_tool_execution( self, chat_id: int, event: ToolCallStartedEvent, notified_tool_calls: Set[str], ) -> None: if not self.mcp_tool_names: return tool = getattr(event, "tool", None) if not tool or not tool.tool_name: return tool_identifier = tool.tool_call_id or f"{tool.tool_name}:{len(notified_tool_calls)}" if tool_identifier in notified_tool_calls: return if tool.tool_name not in self.mcp_tool_names: return notified_tool_calls.add(tool_identifier) escaped_name = html.escape(tool.tool_name) await self._send_message(chat_id, f"Ejecutado {escaped_name}", parse_mode="HTML") async def _typing_indicator(self, chat_id: int) -> None: try: while True: await self._send_chat_action(chat_id, "typing") await asyncio.sleep(4) except asyncio.CancelledError: # pragma: no cover - interrupci贸n esperada raise async def _send_chat_action(self, chat_id: int, action: str) -> None: payload = {"chat_id": chat_id, "action": action} try: response = await asyncio.to_thread( self.session.post, f"{self.base_url}/sendChatAction", json=payload, timeout=self.http_timeout, ) response.raise_for_status() data = response.json() if not data.get("ok"): self.logger.error( "Telegram rechaz贸 la acci贸n del chat", add_fields={ "agent_call": {"action": "telegram_send_chat_action"}, "agent_response": {"status": data.get("ok"), "description": data.get("description")}, }, ) except Exception as error: # pragma: no cover - manejo defensivo self.logger.exception( error, add_fields={ "agent_call": {"action": "telegram_send_chat_action", "payload": payload}, "agent_response": {"status": "error"}, }, ) def _chunk_text(self, text: str) -> List[str]: if len(text) <= MAX_MESSAGE_LENGTH: return [text] chunks: List[str] = [] current = [] current_len = 0 for paragraph in text.split("\n\n"): paragraph = paragraph.strip() if not paragraph: continue paragraph_len = len(paragraph) + 2 # margen para doble salto if current_len + paragraph_len > MAX_MESSAGE_LENGTH and current: chunks.append("\n\n".join(current)) current = [paragraph] current_len = len(paragraph) elif paragraph_len > MAX_MESSAGE_LENGTH: chunks.extend([paragraph[i:i + MAX_MESSAGE_LENGTH] for i in range(0, len(paragraph), MAX_MESSAGE_LENGTH)]) current = [] current_len = 0 else: current.append(paragraph) current_len += paragraph_len if current: chunks.append("\n\n".join(current)) return chunks async def run_bot() -> None: load_dotenv() token = os.getenv("TELEGRAM_BOT_TOKEN") allowed_user_id = _parse_int(os.getenv("TELEGRAM_ALLOWED_USER_ID"), env_name="TELEGRAM_ALLOWED_USER_ID") allowed_chat_id = _parse_int(os.getenv("TELEGRAM_ALLOWED_CHAT_ID"), env_name="TELEGRAM_ALLOWED_CHAT_ID") agent_key = (os.getenv("TELEGRAM_AGENT_KEY") or DEFAULT_AGENT_NAME).lower() if not token: raise RuntimeError("TELEGRAM_BOT_TOKEN debe definirse en el entorno") if allowed_user_id is None or allowed_chat_id is None: raise RuntimeError("TELEGRAM_ALLOWED_USER_ID y TELEGRAM_ALLOWED_CHAT_ID son obligatorios") logger = LokiLogger( service_name="agente_kanboard", add_labels={"env": "local", "agent": "telegram"}, ) bot = TelegramKanBot( token=token, allowed_user_id=allowed_user_id, allowed_chat_id=allowed_chat_id, agent_key=agent_key, logger=logger, ) try: await bot.run() finally: await bot.close() def main() -> None: asyncio.run(run_bot()) if __name__ == "__main__": # pragma: no cover - ejecuci贸n directa main()