417 lines
15 KiB
Python
417 lines
15 KiB
Python
"""Bot de Telegram para conversar con el agente Kanboard."""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import html
|
|
import os
|
|
import json
|
|
from contextlib import suppress
|
|
from inspect import isawaitable
|
|
from typing import Any, Dict, List, Optional, Set
|
|
|
|
import requests
|
|
from agno.agent.agent import RunOutput
|
|
from agno.run.agent import ToolCallStartedEvent
|
|
from dotenv import load_dotenv
|
|
|
|
from agentes import DEFAULT_AGENT_NAME, create_agent
|
|
from LokiLogger import LokiLogger
|
|
from mcp_wrapper import close_mcp_tools
|
|
|
|
# Root URL of the Telegram Bot API; the bot token is appended per instance.
TELEGRAM_API_BASE: str = "https://api.telegram.org"

# Maximum characters per outgoing message chunk (margin for markdown/formatting).
MAX_MESSAGE_LENGTH: int = 3800
|
|
|
|
|
|
def _parse_int(value: Optional[str], *, env_name: str) -> Optional[int]:
|
|
if value is None or value.strip() == "":
|
|
return None
|
|
try:
|
|
return int(value)
|
|
except ValueError as error: # pragma: no cover - salida controlada
|
|
raise ValueError(f"{env_name} debe ser un entero válido, se recibió: {value!r}") from error
|
|
|
|
|
|
class TelegramKanBot:
    """Long-polling Telegram bot that relays text messages to an agent.

    Only messages whose chat id and sender id both match the configured
    values are processed; anything else is logged and ignored.
    """

    def __init__(
        self,
        *,
        token: str,
        allowed_user_id: int,
        allowed_chat_id: int,
        agent_key: str,
        logger: LokiLogger,
    ) -> None:
        """Store the configuration and open the HTTP session.

        The agent itself is not created here; see ``setup_agent``.

        Args:
            token: Telegram bot token, embedded in every API URL.
            allowed_user_id: Only messages from this sender id are handled.
            allowed_chat_id: Only messages from this chat id are handled.
            agent_key: Identifier passed to ``create_agent``.
            logger: Logger used for all structured log output.
        """
        self.token = token
        self.allowed_user_id = allowed_user_id
        self.allowed_chat_id = allowed_chat_id
        self.agent_key = agent_key
        self.logger = logger
        # Sent to Telegram as the getUpdates "timeout" parameter.
        self.api_timeout = 30
        # Client-side timeout for every HTTP call; kept above api_timeout,
        # presumably so the client does not give up before the long poll ends.
        self.http_timeout = 35
        # Pause between polling iterations, in seconds.
        self.idle_sleep = 1
        # Next update id to request; None until the first update is seen.
        self.offset: Optional[int] = None
        self.base_url = f"{TELEGRAM_API_BASE}/bot{token}"
        self.session = requests.Session()

        self.agent = None
        self.context: Dict[str, Any] = {}
        self.mcp_tool_names: Set[str] = set()

    async def setup_agent(self) -> None:
        """Create the agent and cache its context and the known tool names."""
        agent, context = await create_agent(self.logger, agent=self.agent_key)
        self.agent = agent
        self.context = context
        server_tool_map = context.get("server_tool_map") or {}
        # Flatten {server: [tool, ...]} into one set; a server may map to None.
        self.mcp_tool_names = {
            tool_name for tools in server_tool_map.values() for tool_name in (tools or [])
        }
        self.logger.info(
            "🤖 Agente Kan para Telegram listo",
            add_fields={
                "agent_call": {"action": "telegram_agent_ready", "agent": self.agent_key},
                "agent_response": {"status": "ready", "allowed_chat": self.allowed_chat_id},
            },
        )

    async def close(self) -> None:
        """Release the MCP tools (if any) and always close the HTTP session."""
        try:
            if self.context.get("mcp_tools"):
                await close_mcp_tools(self.context["mcp_tools"], logger=self.logger)
        finally:
            # Close the session even when the MCP shutdown raises, so the
            # underlying connections are not leaked.
            self.session.close()

    async def run(self) -> None:
        """Poll Telegram forever, dispatching each update in order.

        Creates the agent first if it has not been set up yet.

        Raises:
            asyncio.CancelledError: Re-raised untouched on cancellation.
            Exception: Any other error is logged and then re-raised.
        """
        if self.agent is None:
            await self.setup_agent()

        self.logger.info(
            "🚀 Escuchando mensajes de Telegram",
            add_fields={
                "agent_call": {"action": "telegram_poll"},
                "agent_response": {
                    "status": "listening",
                    "chat_id": self.allowed_chat_id,
                    "user_id": self.allowed_user_id,
                },
            },
        )

        try:
            while True:
                updates = await self._fetch_updates()
                for update in updates:
                    # Advance the offset before handling so the update is
                    # not requested again on the next poll.
                    self.offset = update["update_id"] + 1
                    await self._handle_update(update)
                await asyncio.sleep(self.idle_sleep)
        except asyncio.CancelledError:  # pragma: no cover - controlled interruption
            raise
        except Exception as error:
            self.logger.exception(error)
            raise

    async def _fetch_updates(self) -> List[Dict[str, Any]]:
        """Call getUpdates and return the list of updates.

        Returns:
            The ``result`` list from the API response, or an empty list when
            the request fails or the API answers with a non-ok status.
        """
        params = {
            "timeout": self.api_timeout,
            "allowed_updates": json.dumps(["message", "edited_message"]),
        }
        if self.offset is not None:
            params["offset"] = self.offset
        try:
            # requests is blocking, so run it in a worker thread.
            response = await asyncio.to_thread(
                self.session.get,
                f"{self.base_url}/getUpdates",
                params=params,
                timeout=self.http_timeout,
            )
            response.raise_for_status()
            data = response.json()
        except Exception as error:  # pragma: no cover - defensive handling
            # NOTE(review): requests exceptions may embed the request URL,
            # which contains the bot token - confirm the logger does not
            # ship it verbatim.
            self.logger.exception(
                error,
                add_fields={
                    "agent_call": {"action": "telegram_get_updates"},
                    "agent_response": {"status": "error"},
                },
            )
            # Back off before the caller polls again.
            await asyncio.sleep(5)
            return []

        if not data.get("ok"):
            self.logger.error(
                "La API de Telegram devolvió un estado no-ok",
                add_fields={
                    "agent_call": {"action": "telegram_get_updates"},
                    "agent_response": {"status": data.get("ok"), "description": data.get("description")},
                },
            )
            return []

        result = data.get("result", [])
        if result:
            self.logger.debug(
                "📩 Nuevos updates recibidos",
                add_fields={
                    "agent_call": {"action": "telegram_get_updates"},
                    "agent_response": {"status": "updates", "count": len(result)},
                },
            )
        return result

    async def _handle_update(self, update: Dict[str, Any]) -> None:
        """Filter one update and route it to a canned reply or the agent."""
        message = update.get("message") or update.get("edited_message")
        if not message:
            return

        chat = message.get("chat", {})
        chat_id = chat.get("id")
        user = message.get("from", {})
        user_id = user.get("id")

        # Both the chat and the sender must match the configured ids.
        if chat_id != self.allowed_chat_id or user_id != self.allowed_user_id:
            self.logger.warn(
                "Mensaje ignorado por no cumplir filtros",
                add_fields={
                    "agent_call": {"action": "telegram_filter_message"},
                    "agent_response": {"status": "ignored", "chat_id": chat_id, "user_id": user_id},
                },
            )
            return

        text = message.get("text") or ""
        if not text:
            await self._send_message(chat_id, "Solo puedo procesar mensajes de texto por ahora.")
            return

        # Greetings get a fixed answer without invoking the agent.
        if text.strip().lower() in {"/start", "hola", "hi"}:
            await self._send_message(chat_id, "¡Hola! Soy KAN, listo para ayudarte con Kanboard.")
            return

        await self._process_message(chat_id, text)

    async def _process_message(self, chat_id: int, text: str) -> None:
        """Run the agent on ``text`` and send its answer to ``chat_id``.

        A typing indicator runs while the agent works. Any failure is logged
        and reported to the user with a generic error message.
        """
        self.logger.info(
            "📨 Mensaje recibido desde Telegram",
            add_fields={
                "agent_call": {"action": "telegram_user_message", "content": text},
                "agent_response": {"status": "processing"},
            },
        )
        typing_task = asyncio.create_task(self._typing_indicator(chat_id))
        notified_tool_calls: Set[str] = set()
        response: Optional[RunOutput] = None
        try:
            run_result = self.agent.arun(
                text,
                stream=True,
                stream_events=True,
                yield_run_response=True,
            )

            # arun may hand back a finished result, an awaitable, or an
            # async stream of events; all three shapes are handled.
            if isinstance(run_result, RunOutput):
                response = run_result
            elif isawaitable(run_result):
                awaited_result = await run_result
                if isinstance(awaited_result, RunOutput):
                    response = awaited_result
                else:
                    raise RuntimeError("El agente devolvió un resultado inesperado.")
            else:
                async for event in run_result:
                    if isinstance(event, RunOutput):
                        response = event
                    elif isinstance(event, ToolCallStartedEvent):
                        await self._maybe_notify_tool_execution(chat_id, event, notified_tool_calls)
            if response is None:
                raise RuntimeError("El agente no devolvió respuesta.")

            # The content is not guaranteed to be a string, so coerce it
            # before stripping instead of failing on a missing .strip().
            raw_content = response.content or ""
            if not isinstance(raw_content, str):
                raw_content = str(raw_content)
            content = raw_content.strip() or "(respuesta vacía)"

            await self._send_message(chat_id, content)

            if response.metrics:
                self.logger.info(
                    "📊 Métricas de ejecución",
                    add_fields={
                        "agent_call": {"action": "agent_run_metrics"},
                        "agent_response": {
                            "status": "completed",
                            "metrics": {
                                "input_tokens": response.metrics.input_tokens,
                                "output_tokens": response.metrics.output_tokens,
                                "total_tokens": response.metrics.total_tokens,
                                "duration_seconds": response.metrics.duration,
                            },
                        },
                    },
                )
        except Exception as error:  # pragma: no cover - defensive handling
            self.logger.exception(
                error,
                add_fields={
                    "agent_call": {"action": "agent_run", "content": text},
                    "agent_response": {"status": "error"},
                },
            )
            await self._send_message(chat_id, "Hubo un error procesando tu mensaje. Revisa los logs para más detalles.")
            return
        finally:
            # Stop the typing indicator and wait for it to unwind.
            typing_task.cancel()
            with suppress(asyncio.CancelledError):
                await typing_task

    async def _send_message(self, chat_id: int, text: str, *, parse_mode: Optional[str] = None) -> None:
        """Send ``text`` to ``chat_id``, split into chunks when too long.

        Failures are logged per chunk and never raised to the caller.
        """
        for chunk in self._chunk_text(text):
            payload = {"chat_id": chat_id, "text": chunk}
            if parse_mode:
                payload["parse_mode"] = parse_mode
            try:
                response = await asyncio.to_thread(
                    self.session.post,
                    f"{self.base_url}/sendMessage",
                    json=payload,
                    timeout=self.http_timeout,
                )
                response.raise_for_status()
                data = response.json()
                if not data.get("ok"):
                    self.logger.error(
                        "Telegram rechazó el mensaje",
                        add_fields={
                            "agent_call": {"action": "telegram_send_message"},
                            "agent_response": {"status": data.get("ok"), "description": data.get("description")},
                        },
                    )
            except Exception as error:  # pragma: no cover - defensive handling
                self.logger.exception(
                    error,
                    add_fields={
                        "agent_call": {"action": "telegram_send_message", "payload": payload},
                        "agent_response": {"status": "error"},
                    },
                )

    async def _maybe_notify_tool_execution(
        self,
        chat_id: int,
        event: ToolCallStartedEvent,
        notified_tool_calls: Set[str],
    ) -> None:
        """Tell the user a known MCP tool was invoked, once per tool call.

        Args:
            chat_id: Chat that receives the notice.
            event: Tool-call event emitted by the agent stream.
            notified_tool_calls: Identifiers already announced; updated in place.
        """
        if not self.mcp_tool_names:
            return
        tool = getattr(event, "tool", None)
        if not tool or not tool.tool_name:
            return
        # Fall back to a synthetic id when the event carries no call id.
        tool_identifier = tool.tool_call_id or f"{tool.tool_name}:{len(notified_tool_calls)}"
        if tool_identifier in notified_tool_calls:
            return
        if tool.tool_name not in self.mcp_tool_names:
            return

        notified_tool_calls.add(tool_identifier)

        # The message is sent with parse_mode="HTML", so escape the name.
        escaped_name = html.escape(tool.tool_name)
        await self._send_message(chat_id, f"<i>Ejecutado {escaped_name}</i>", parse_mode="HTML")

    async def _typing_indicator(self, chat_id: int) -> None:
        """Send the "typing" chat action every 4 seconds until cancelled."""
        while True:
            await self._send_chat_action(chat_id, "typing")
            await asyncio.sleep(4)

    async def _send_chat_action(self, chat_id: int, action: str) -> None:
        """Send a chat action to ``chat_id``; failures are logged, not raised."""
        payload = {"chat_id": chat_id, "action": action}
        try:
            response = await asyncio.to_thread(
                self.session.post,
                f"{self.base_url}/sendChatAction",
                json=payload,
                timeout=self.http_timeout,
            )
            response.raise_for_status()
            data = response.json()
            if not data.get("ok"):
                self.logger.error(
                    "Telegram rechazó la acción del chat",
                    add_fields={
                        "agent_call": {"action": "telegram_send_chat_action"},
                        "agent_response": {"status": data.get("ok"), "description": data.get("description")},
                    },
                )
        except Exception as error:  # pragma: no cover - defensive handling
            self.logger.exception(
                error,
                add_fields={
                    "agent_call": {"action": "telegram_send_chat_action", "payload": payload},
                    "agent_response": {"status": "error"},
                },
            )

    def _chunk_text(self, text: str, max_length: Optional[int] = None) -> List[str]:
        """Split ``text`` into pieces no longer than the limit.

        Text that already fits is returned unchanged as a single chunk.
        Longer text is split on blank lines; paragraphs are stripped, empty
        ones dropped, and as many as fit are re-joined per chunk. A paragraph
        longer than the limit is cut into fixed-size slices.

        Args:
            text: Message to split.
            max_length: Chunk size limit; defaults to ``MAX_MESSAGE_LENGTH``.

        Returns:
            The chunks in order. Every chunk is at most ``max_length`` long.

        Raises:
            ValueError: If ``max_length`` is not positive.
        """
        limit = MAX_MESSAGE_LENGTH if max_length is None else max_length
        if limit <= 0:
            raise ValueError("max_length must be positive")
        if len(text) <= limit:
            return [text]

        separator = "\n\n"
        chunks: List[str] = []
        current: List[str] = []
        current_len = 0  # exact length of separator.join(current)

        for paragraph in text.split(separator):
            paragraph = paragraph.strip()
            if not paragraph:
                continue

            if len(paragraph) > limit:
                # Flush the pending paragraphs first so order is preserved,
                # then hard-split; the paragraph must never be buffered
                # whole or it would produce an oversized chunk.
                if current:
                    chunks.append(separator.join(current))
                    current, current_len = [], 0
                chunks.extend(
                    paragraph[start:start + limit] for start in range(0, len(paragraph), limit)
                )
                continue

            # The separator is only paid for when something is buffered.
            added_len = len(paragraph) + (len(separator) if current else 0)
            if current_len + added_len > limit:
                chunks.append(separator.join(current))
                current, current_len = [paragraph], len(paragraph)
            else:
                current.append(paragraph)
                current_len += added_len

        if current:
            chunks.append(separator.join(current))
        return chunks
|
|
|
|
|
|
async def run_bot() -> None:
    """Read the bot settings from the environment and run the bot.

    Loads the ``.env`` file, validates the required variables, builds the
    logger and the bot, and polls until the bot stops. The bot is always
    closed on the way out.

    Raises:
        ValueError: If an id variable is set but is not a valid integer.
        RuntimeError: If the token or either id variable is missing.
    """
    load_dotenv()

    env = os.environ
    bot_token = env.get("TELEGRAM_BOT_TOKEN")
    # Ids are parsed before the token check, so a malformed id is reported first.
    user_id = _parse_int(env.get("TELEGRAM_ALLOWED_USER_ID"), env_name="TELEGRAM_ALLOWED_USER_ID")
    chat_id = _parse_int(env.get("TELEGRAM_ALLOWED_CHAT_ID"), env_name="TELEGRAM_ALLOWED_CHAT_ID")
    selected_agent = (env.get("TELEGRAM_AGENT_KEY") or DEFAULT_AGENT_NAME).lower()

    if not bot_token:
        raise RuntimeError("TELEGRAM_BOT_TOKEN debe definirse en el entorno")
    if user_id is None or chat_id is None:
        raise RuntimeError("TELEGRAM_ALLOWED_USER_ID y TELEGRAM_ALLOWED_CHAT_ID son obligatorios")

    loki = LokiLogger(
        service_name="agente_kanboard",
        add_labels={"env": "local", "agent": "telegram"},
    )
    telegram_bot = TelegramKanBot(
        token=bot_token,
        allowed_user_id=user_id,
        allowed_chat_id=chat_id,
        agent_key=selected_agent,
        logger=loki,
    )

    try:
        await telegram_bot.run()
    finally:
        await telegram_bot.close()
|
|
|
|
|
|
def main() -> None:
    """Synchronous entry point: run the bot coroutine on a new event loop."""
    bot_coroutine = run_bot()
    asyncio.run(bot_coroutine)


if __name__ == "__main__":  # pragma: no cover - direct execution
    main()
|