763e06c127
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
101 lines
3.7 KiB
Python
101 lines
3.7 KiB
Python
"""Busca mensajes en un buzon IMAP por criterio y devuelve sus UIDs.
|
|
|
|
Funcion IMPURA: hace I/O de red sobre una conexion `imaplib` viva (la produce
|
|
`imap_connect`). Opcionalmente cambia de buzon con `select(mailbox)` y luego
|
|
ejecuta `conn.uid("SEARCH", None, criteria)`.
|
|
|
|
Usa SIEMPRE UIDs (Unique IDentifiers), no numeros de secuencia: los UID son
|
|
estables dentro de un buzon mientras no cambie el UIDVALIDITY, mientras que los
|
|
numeros de secuencia se renumeran cuando se borran mensajes. Asi un UID
|
|
guardado sigue apuntando al mismo mensaje en una sesion posterior.
|
|
|
|
NUNCA lanza: devuelve un dict con `status` ("ok"/"error").
|
|
"""
|
|
|
|
|
|
def imap_search(conn, criteria: str = "UNSEEN", mailbox: str = "") -> dict:
|
|
"""Busca mensajes y devuelve la lista de UIDs que casan el criterio.
|
|
|
|
Si `mailbox` no esta vacio, hace `conn.select(mailbox)` antes de buscar.
|
|
Luego ejecuta `conn.uid("SEARCH", None, criteria)` y parsea la respuesta a
|
|
una lista de enteros (UIDs).
|
|
|
|
Args:
|
|
conn: objeto `imaplib.IMAP4[_SSL]` vivo y autenticado (de `imap_connect`).
|
|
criteria: expresion de busqueda IMAP cruda (RFC 3501 SEARCH). Ejemplos:
|
|
``"UNSEEN"`` (no leidos), ``"ALL"`` (todos),
|
|
``"FROM foo@bar.com"``, ``"SUBJECT factura"``,
|
|
``"SINCE 01-Jan-2026"``, ``"UNSEEN SINCE 01-Jun-2026"``,
|
|
``'HEADER Message-ID "<id@host>"'``.
|
|
mailbox: si no esta vacio, se selecciona ese buzon antes de buscar
|
|
(ej. ``"[Gmail]/Sent Mail"``). Vacio usa el buzon ya seleccionado.
|
|
|
|
Returns:
|
|
Dict de estado. En exito::
|
|
|
|
{"status": "ok", "uids": [123, 456, ...], "count": <int>}
|
|
|
|
En fallo (conn invalido, criterio mal formado, buzon inexistente)::
|
|
|
|
{"status": "error", "error": <str>}
|
|
"""
|
|
if conn is None:
|
|
return {"status": "error", "error": "imap_search: conn es None"}
|
|
if not criteria or not str(criteria).strip():
|
|
return {"status": "error", "error": "imap_search: criteria vacio"}
|
|
|
|
criteria = str(criteria).strip()
|
|
try:
|
|
if mailbox:
|
|
typ, data = conn.select(mailbox)
|
|
if typ != "OK":
|
|
reason = _first_str(data)
|
|
return {
|
|
"status": "error",
|
|
"error": f"imap_search: SELECT {mailbox!r} fallo: {reason}",
|
|
}
|
|
|
|
typ, data = conn.uid("SEARCH", None, criteria)
|
|
if typ != "OK":
|
|
reason = _first_str(data)
|
|
return {
|
|
"status": "error",
|
|
"error": f"imap_search: SEARCH {criteria!r} devolvio {typ}: {reason}",
|
|
}
|
|
|
|
uids = _parse_uids(data)
|
|
return {"status": "ok", "uids": uids, "count": len(uids)}
|
|
except Exception as exc: # noqa: BLE001 — contrato: nunca lanzar.
|
|
return {"status": "error", "error": f"imap_search: {exc}"}
|
|
|
|
|
|
def _first_str(data) -> str:
|
|
"""Devuelve el primer elemento de una respuesta imaplib como str legible."""
|
|
if not data:
|
|
return ""
|
|
item = data[0]
|
|
if isinstance(item, bytes):
|
|
return item.decode("utf-8", errors="replace")
|
|
return str(item)
|
|
|
|
|
|
def _parse_uids(data) -> list:
|
|
"""Parsea la respuesta de SEARCH (lista con un bytes de UIDs separados por espacio)."""
|
|
uids: list[int] = []
|
|
if not data:
|
|
return uids
|
|
for chunk in data:
|
|
if chunk is None:
|
|
continue
|
|
if isinstance(chunk, bytes):
|
|
text = chunk.decode("ascii", errors="replace")
|
|
else:
|
|
text = str(chunk)
|
|
for token in text.split():
|
|
try:
|
|
uids.append(int(token))
|
|
except ValueError:
|
|
# Token no numerico (raro): lo ignoramos.
|
|
continue
|
|
return uids
|