134 lines
4.5 KiB
Python
134 lines
4.5 KiB
Python
"""Parseo ligero de vCard y de iCalendar para el ingest DAV.
|
|
|
|
Lógica propia de la app (glue específico del dominio): el registry baja las
|
|
colecciones en bruto con dav_get_collection y aquí se extraen los campos
|
|
mínimos que las tablas maestras contacts y events necesitan. Mismo enfoque de
|
|
regex con unfold que projects/osint/tools/sync_dav_to_osint.py.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import re
|
|
|
|
|
|
def _unfold(text: str) -> str:
|
|
"""Deshace el folding de líneas (continuación con espacio o tab)."""
|
|
return re.sub(r"\r?\n[ \t]", "", text)
|
|
|
|
|
|
def _values(text: str, prop: str) -> list:
|
|
"""Devuelve todos los valores de una propiedad (TEL, UID, DTSTART, ...).
|
|
|
|
Acepta PROP;PARAMS:valor y PROP:valor (con prefijo itemN. opcional) y
|
|
decodifica los escapes simples (\\n, \\,, \\;, \\\\).
|
|
"""
|
|
vals = []
|
|
for line in text.splitlines():
|
|
m = re.match(rf"^(?:item\d+\.)?{prop}(?:;[^:]*)?:(.*)$", line, re.IGNORECASE)
|
|
if m:
|
|
v = m.group(1).strip()
|
|
v = (
|
|
v.replace("\\n", "\n")
|
|
.replace("\\,", ",")
|
|
.replace("\\;", ";")
|
|
.replace("\\\\", "\\")
|
|
).strip()
|
|
if v:
|
|
vals.append(v)
|
|
return vals
|
|
|
|
|
|
def _prop_line(text: str, prop: str) -> str:
|
|
"""Devuelve la primera línea completa de una propiedad (con sus params)."""
|
|
for line in text.splitlines():
|
|
if re.match(rf"^(?:item\d+\.)?{prop}(?:;|:)", line, re.IGNORECASE):
|
|
return line
|
|
return ""
|
|
|
|
|
|
def parse_vcard(vcard_text: str) -> dict:
|
|
"""Extrae los campos mínimos de un vCard para la tabla contacts.
|
|
|
|
Devuelve {uid, fn, tels, emails}. tels y emails son listas deduplicadas
|
|
preservando el orden.
|
|
"""
|
|
txt = _unfold(vcard_text)
|
|
return {
|
|
"uid": (_values(txt, "UID") or [""])[0],
|
|
"fn": (_values(txt, "FN") or [""])[0],
|
|
"tels": _dedup(_values(txt, "TEL")),
|
|
"emails": _dedup(_values(txt, "EMAIL")),
|
|
}
|
|
|
|
|
|
def parse_ical_events(ical_text: str) -> list:
|
|
"""Extrae los VEVENT de un VCALENDAR para la tabla events.
|
|
|
|
Devuelve una lista de dicts {uid, dtstart, dtend, all_day, summary,
|
|
location, rrule, raw}. dtstart/dtend se normalizan a ISO básico
|
|
(YYYY-MM-DD o YYYY-MM-DDTHH:MM:SS, conservando la Z de UTC si la trae).
|
|
all_day es True cuando DTSTART es VALUE=DATE (sin componente de hora).
|
|
"""
|
|
events = []
|
|
txt = _unfold(ical_text)
|
|
for block in re.findall(
|
|
r"BEGIN:VEVENT(.*?)END:VEVENT", txt, re.DOTALL | re.IGNORECASE
|
|
):
|
|
dtstart_line = _prop_line(block, "DTSTART")
|
|
dtstart_raw = (_values(block, "DTSTART") or [""])[0]
|
|
all_day = bool(
|
|
re.search(r";VALUE=DATE(?:;|:)", dtstart_line, re.IGNORECASE)
|
|
) or bool(re.fullmatch(r"\d{8}", dtstart_raw))
|
|
events.append(
|
|
{
|
|
"uid": (_values(block, "UID") or [""])[0],
|
|
"dtstart": _norm_dt(dtstart_raw),
|
|
"dtend": _norm_dt((_values(block, "DTEND") or [""])[0]),
|
|
"all_day": all_day,
|
|
"summary": (_values(block, "SUMMARY") or [""])[0],
|
|
"location": (_values(block, "LOCATION") or [None])[0],
|
|
"rrule": (_values(block, "RRULE") or [None])[0],
|
|
"raw": "BEGIN:VEVENT" + block + "END:VEVENT",
|
|
}
|
|
)
|
|
return events
|
|
|
|
|
|
def _norm_dt(value: str) -> str:
|
|
"""Normaliza un DATE/DATE-TIME de iCalendar a ISO legible.
|
|
|
|
20260115 -> 2026-01-15; 20260115T093000Z -> 2026-01-15T09:30:00Z. Valores
|
|
ya ISO o vacíos se devuelven tal cual.
|
|
"""
|
|
if not value:
|
|
return ""
|
|
m = re.fullmatch(r"(\d{4})(\d{2})(\d{2})", value)
|
|
if m:
|
|
return f"{m.group(1)}-{m.group(2)}-{m.group(3)}"
|
|
m = re.fullmatch(r"(\d{4})(\d{2})(\d{2})T(\d{2})(\d{2})(\d{2})(Z?)", value)
|
|
if m:
|
|
return (
|
|
f"{m.group(1)}-{m.group(2)}-{m.group(3)}"
|
|
f"T{m.group(4)}:{m.group(5)}:{m.group(6)}{m.group(7)}"
|
|
)
|
|
return value
|
|
|
|
|
|
def _dedup(items: list) -> list:
|
|
"""Deduplica una lista de strings preservando el orden (case-insensitive)."""
|
|
seen, out = set(), []
|
|
for it in items:
|
|
key = str(it).strip().lower()
|
|
if key and key not in seen:
|
|
seen.add(key)
|
|
out.append(str(it).strip())
|
|
return out
|
|
|
|
|
|
def norm_phone(p) -> str:
|
|
"""Normaliza un teléfono a sus últimos 9 dígitos (número nacional ES)."""
|
|
if not p:
|
|
return ""
|
|
d = re.sub(r"\D", "", str(p))
|
|
return d[-9:] if len(d) >= 9 else d
|