763e06c127
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
221 lines
7.8 KiB
Python
221 lines
7.8 KiB
Python
"""Descarga y parsea un mensaje IMAP por UID a un dict estructurado.
|
|
|
|
Funcion IMPURA: hace I/O de red sobre una conexion `imaplib` viva (la produce
|
|
`imap_connect`). Ejecuta `conn.uid("FETCH", uid, "(BODY.PEEK[])")` (que NO marca
|
|
el mensaje como leido) o `"(RFC822)"` (que SI lo marca) segun `mark_seen`,
|
|
parsea los bytes con `email.message_from_bytes` y extrae las cabeceras y el
|
|
cuerpo a un dict.
|
|
|
|
Las cabeceras codificadas (RFC 2047, ej. `=?UTF-8?B?...?=`) se decodifican a
|
|
Unicode con `email.header.decode_header`. Los cuerpos de texto se decodifican
|
|
respetando el charset declarado en cada parte (con fallback a utf-8/latin-1).
|
|
Los adjuntos se listan con metadatos (nombre, tipo, tamano) SIN incluir el
|
|
binario completo en el resultado.
|
|
|
|
NUNCA lanza: devuelve un dict con `status` ("ok"/"error").
|
|
"""
|
|
|
|
import email
|
|
from email.header import decode_header
|
|
from email.utils import parseaddr, getaddresses
|
|
|
|
|
|
def imap_fetch_message(conn, uid: int, mark_seen: bool = False) -> dict:
    """Fetch the message with UID ``uid`` and return it parsed.

    Impure: issues a ``UID FETCH`` over the given IMAP connection.

    Args:
        conn: live, authenticated ``imaplib.IMAP4[_SSL]`` object (as produced
            by ``imap_connect``).
        uid: message UID (as returned by ``imap_search``), not a sequence
            number. Any value ``int()`` accepts is allowed, but the result
            must be strictly positive.
        mark_seen: False (default) fetches with ``BODY.PEEK[]``, which does
            not flag the message as read; True fetches with ``RFC822``,
            which sets ``\\Seen``.

    Returns:
        A status dict; this function never raises. On success::

            {
                "status": "ok",
                "message": {
                    "uid": <int>,
                    "from": <str>, "to": <str>, "cc": <str>,
                    "subject": <str>, "date": <str>, "message_id": <str>,
                    "body_text": <str>,   # concatenated text/plain parts
                    "body_html": <str>,   # concatenated text/html parts
                    "attachments": [
                        {"filename": <str>, "content_type": <str>, "size_bytes": <int>},
                        ...
                    ],
                },
            }

        On failure (invalid conn or uid, FETCH not OK, no content, or any
        exception raised while fetching/parsing)::

            {"status": "error", "error": <str>}
    """
    if conn is None:
        return {"status": "error", "error": "imap_fetch_message: conn es None"}

    try:
        uid_int = int(uid)
    except (ValueError, TypeError, OverflowError):
        # OverflowError covers int(float("inf")); without it the
        # "never raises" contract would be broken.
        uid_int = None
    # IMAP UIDs are non-zero positive numbers, so reject anything else
    # locally instead of spending a round trip on a guaranteed failure.
    if uid_int is None or uid_int <= 0:
        return {"status": "error", "error": f"imap_fetch_message: uid invalido: {uid!r}"}

    fetch_spec = "(RFC822)" if mark_seen else "(BODY.PEEK[])"
    try:
        typ, data = conn.uid("FETCH", str(uid_int), fetch_spec)
        if typ != "OK":
            return {
                "status": "error",
                "error": f"imap_fetch_message: FETCH uid {uid_int} devolvio {typ}",
            }

        raw = _extract_rfc822(data)
        if raw is None:
            return {
                "status": "error",
                "error": f"imap_fetch_message: UID {uid_int} sin contenido (inexistente?)",
            }

        msg = email.message_from_bytes(raw)
        parsed = _parse_message(msg, uid_int)
        return {"status": "ok", "message": parsed}
    except Exception as exc:  # noqa: BLE001 - contract: never raise.
        return {"status": "error", "error": f"imap_fetch_message: {exc}"}
|
|
|
|
|
|
def _extract_rfc822(data):
    """Pull the raw message bytes out of a FETCH response.

    imaplib returns something like ``[(b'1 (BODY[] {N}', b'<bytes>'), b')']``.
    The result is the second element of the first tuple entry whose second
    element is ``bytes``/``bytearray`` (always returned as ``bytes``), or
    ``None`` when ``data`` is empty or holds no such entry.
    """
    if not data:
        return None
    bodies = (
        bytes(entry[1])
        for entry in data
        if isinstance(entry, tuple)
        and len(entry) >= 2
        and isinstance(entry[1], (bytes, bytearray))
    )
    return next(bodies, None)
|
|
|
|
|
|
def _parse_message(msg, uid_int: int) -> dict:
    """Convert an ``email.message.Message`` into the contract's message dict.

    Args:
        msg: parsed message object.
        uid_int: UID to echo back under the ``"uid"`` key.

    Returns:
        Dict with keys ``uid``, ``from``, ``to``, ``cc``, ``subject``,
        ``date``, ``message_id``, ``body_text``, ``body_html`` and
        ``attachments``. Body parts are joined with newlines, skipping
        empty ones.
    """
    body_text_parts: list[str] = []
    body_html_parts: list[str] = []
    attachments: list[dict] = []

    # walk() yields the message itself first, so a non-multipart message is
    # handled by the same loop; multipart containers carry no content of
    # their own and are skipped.
    for part in msg.walk():
        if not part.is_multipart():
            _consume_part(part, body_text_parts, body_html_parts, attachments)

    return {
        "uid": uid_int,
        "from": _decode_header(msg.get("From", "")),
        "to": _decode_addr_list(msg.get_all("To", [])),
        "cc": _decode_addr_list(msg.get_all("Cc", [])),
        "subject": _decode_header(msg.get("Subject", "")),
        "date": _decode_header(msg.get("Date", "")),
        # str(): headers containing raw 8-bit bytes can come back as
        # email.header.Header objects, which have no .strip().
        "message_id": str(msg.get("Message-ID", "") or "").strip(),
        "body_text": "\n".join(p for p in body_text_parts if p),
        "body_html": "\n".join(p for p in body_html_parts if p),
        "attachments": attachments,
    }
|
|
|
|
|
|
def _consume_part(part, body_text_parts, body_html_parts, attachments) -> None:
    """Classify one leaf part as attachment, text/plain body or text/html body.

    A part is an attachment when its Content-Disposition *value* is
    ``attachment``, or when it has a filename and is neither ``text/plain``
    nor ``text/html``. Attachments are recorded as metadata only (filename,
    content type, decoded size). Remaining ``text/plain`` / ``text/html``
    parts are decoded and appended to the matching body list. Anything else
    is ignored.

    Args:
        part: leaf (non-multipart) message part.
        body_text_parts: list that receives decoded text/plain bodies.
        body_html_parts: list that receives decoded text/html bodies.
        attachments: list that receives attachment metadata dicts.
    """
    content_type = part.get_content_type()
    # Lowercased disposition without parameters ('inline', 'attachment') or
    # None. A substring test on the raw header would also match parameter
    # values such as filename="attachment.txt".
    disposition = part.get_content_disposition()
    filename = part.get_filename()
    if filename:
        filename = _decode_header(filename)

    is_body_type = content_type in ("text/plain", "text/html")
    is_attachment = disposition == "attachment" or bool(filename and not is_body_type)

    if is_attachment:
        payload = part.get_payload(decode=True) or b""
        attachments.append(
            {
                "filename": filename or "",
                "content_type": content_type,
                "size_bytes": len(payload),
            }
        )
        return

    if content_type == "text/plain":
        body_text_parts.append(_decode_body(part))
    elif content_type == "text/html":
        body_html_parts.append(_decode_body(part))
    # Other inline types without a filename are ignored.
|
|
|
|
|
|
def _decode_body(part) -> str:
    """Return the text of a part, decoded with its declared charset if possible.

    The transfer-decoded payload is tried against the part's declared
    charset (when present), then ``utf-8``, then ``latin-1``. The first
    codec that decodes without error wins. A part with no payload yields
    an empty string.
    """
    raw = part.get_payload(decode=True)
    if raw is None:
        return ""

    declared = part.get_content_charset()
    codecs_to_try = ([declared] if declared else []) + ["utf-8", "latin-1"]

    for codec in codecs_to_try:
        try:
            text = raw.decode(codec)
        except (LookupError, UnicodeDecodeError):
            # Unknown codec name or undecodable bytes: try the next one.
            continue
        return text

    # Last resort: cannot fail.
    return raw.decode("utf-8", errors="replace")
|
|
|
|
|
|
def _decode_header(value: str) -> str:
    """Decode an RFC 2047 header (``=?charset?enc?...?=``) to Unicode.

    Args:
        value: header value. ``None`` yields ``""``; ``bytes`` are first
            decoded as latin-1.

    Returns:
        The decoded header with surrounding whitespace stripped. If
        decoding fails unexpectedly, ``str(value)`` is returned as a
        best-effort fallback.
    """
    if value is None:
        return ""
    if isinstance(value, bytes):
        value = value.decode("latin-1", errors="replace")

    parts = []
    try:
        for chunk, enc in decode_header(value):
            if not isinstance(chunk, bytes):
                parts.append(chunk)
                continue
            if enc:
                try:
                    parts.append(chunk.decode(enc, errors="replace"))
                except (LookupError, UnicodeDecodeError):
                    # Unknown or unusable declared charset.
                    parts.append(chunk.decode("utf-8", errors="replace"))
            else:
                # No declared charset: this is unencoded text sitting next
                # to an encoded word. Try UTF-8 (a superset of ASCII) and
                # fall back to latin-1, which maps every byte, so that
                # Latin-1 characters are not turned into U+FFFD.
                try:
                    parts.append(chunk.decode("utf-8"))
                except UnicodeDecodeError:
                    parts.append(chunk.decode("latin-1"))
    except Exception:  # noqa: BLE001 - malformed header: best effort.
        return str(value)
    return "".join(parts).strip()
|
|
|
|
|
|
def _decode_addr_list(values) -> str:
    """Flatten a list of address header values into one display string.

    All values are parsed together (To/Cc may be repeated). Each display
    name is RFC 2047-decoded while the addr-spec is kept as parsed.
    Entries are rendered as ``Name <addr>``, or just the address or the
    name when only one of them is present, and joined with ``", "``.
    An empty or missing input yields ``""``.
    """
    if not values:
        return ""

    rendered = []
    for display, address in getaddresses(values):
        display = _decode_header(display) if display else ""
        if display and address:
            rendered.append(f"{display} <{address}>")
            continue
        # At most one of the two is set here; drop the entry if neither is.
        lone = address or display
        if lone:
            rendered.append(lone)
    return ", ".join(rendered)
|