Files
osint_db/server/davparse.py
T

134 lines
4.5 KiB
Python

"""Parseo ligero de vCard y de iCalendar para el ingest DAV.
Lógica propia de la app (glue específico del dominio): el registry baja las
colecciones en bruto con dav_get_collection y aquí se extraen los campos
mínimos que las tablas maestras contacts y events necesitan. Mismo enfoque de
regex con unfold que projects/osint/tools/sync_dav_to_osint.py.
"""
from __future__ import annotations
import re
def _unfold(text: str) -> str:
"""Deshace el folding de líneas (continuación con espacio o tab)."""
return re.sub(r"\r?\n[ \t]", "", text)
def _values(text: str, prop: str) -> list:
"""Devuelve todos los valores de una propiedad (TEL, UID, DTSTART, ...).
Acepta PROP;PARAMS:valor y PROP:valor (con prefijo itemN. opcional) y
decodifica los escapes simples (\\n, \\,, \\;, \\\\).
"""
vals = []
for line in text.splitlines():
m = re.match(rf"^(?:item\d+\.)?{prop}(?:;[^:]*)?:(.*)$", line, re.IGNORECASE)
if m:
v = m.group(1).strip()
v = (
v.replace("\\n", "\n")
.replace("\\,", ",")
.replace("\\;", ";")
.replace("\\\\", "\\")
).strip()
if v:
vals.append(v)
return vals
def _prop_line(text: str, prop: str) -> str:
"""Devuelve la primera línea completa de una propiedad (con sus params)."""
for line in text.splitlines():
if re.match(rf"^(?:item\d+\.)?{prop}(?:;|:)", line, re.IGNORECASE):
return line
return ""
def parse_vcard(vcard_text: str) -> dict:
"""Extrae los campos mínimos de un vCard para la tabla contacts.
Devuelve {uid, fn, tels, emails}. tels y emails son listas deduplicadas
preservando el orden.
"""
txt = _unfold(vcard_text)
return {
"uid": (_values(txt, "UID") or [""])[0],
"fn": (_values(txt, "FN") or [""])[0],
"tels": _dedup(_values(txt, "TEL")),
"emails": _dedup(_values(txt, "EMAIL")),
}
def parse_ical_events(ical_text: str) -> list:
"""Extrae los VEVENT de un VCALENDAR para la tabla events.
Devuelve una lista de dicts {uid, dtstart, dtend, all_day, summary,
location, rrule, raw}. dtstart/dtend se normalizan a ISO básico
(YYYY-MM-DD o YYYY-MM-DDTHH:MM:SS, conservando la Z de UTC si la trae).
all_day es True cuando DTSTART es VALUE=DATE (sin componente de hora).
"""
events = []
txt = _unfold(ical_text)
for block in re.findall(
r"BEGIN:VEVENT(.*?)END:VEVENT", txt, re.DOTALL | re.IGNORECASE
):
dtstart_line = _prop_line(block, "DTSTART")
dtstart_raw = (_values(block, "DTSTART") or [""])[0]
all_day = bool(
re.search(r";VALUE=DATE(?:;|:)", dtstart_line, re.IGNORECASE)
) or bool(re.fullmatch(r"\d{8}", dtstart_raw))
events.append(
{
"uid": (_values(block, "UID") or [""])[0],
"dtstart": _norm_dt(dtstart_raw),
"dtend": _norm_dt((_values(block, "DTEND") or [""])[0]),
"all_day": all_day,
"summary": (_values(block, "SUMMARY") or [""])[0],
"location": (_values(block, "LOCATION") or [None])[0],
"rrule": (_values(block, "RRULE") or [None])[0],
"raw": "BEGIN:VEVENT" + block + "END:VEVENT",
}
)
return events
def _norm_dt(value: str) -> str:
"""Normaliza un DATE/DATE-TIME de iCalendar a ISO legible.
20260115 -> 2026-01-15; 20260115T093000Z -> 2026-01-15T09:30:00Z. Valores
ya ISO o vacíos se devuelven tal cual.
"""
if not value:
return ""
m = re.fullmatch(r"(\d{4})(\d{2})(\d{2})", value)
if m:
return f"{m.group(1)}-{m.group(2)}-{m.group(3)}"
m = re.fullmatch(r"(\d{4})(\d{2})(\d{2})T(\d{2})(\d{2})(\d{2})(Z?)", value)
if m:
return (
f"{m.group(1)}-{m.group(2)}-{m.group(3)}"
f"T{m.group(4)}:{m.group(5)}:{m.group(6)}{m.group(7)}"
)
return value
def _dedup(items: list) -> list:
"""Deduplica una lista de strings preservando el orden (case-insensitive)."""
seen, out = set(), []
for it in items:
key = str(it).strip().lower()
if key and key not in seen:
seen.add(key)
out.append(str(it).strip())
return out
def norm_phone(p) -> str:
"""Normaliza un teléfono a sus últimos 9 dígitos (número nacional ES)."""
if not p:
return ""
d = re.sub(r"\D", "", str(p))
return d[-9:] if len(d) >= 9 else d