1c8a86594f
Dos funciones nuevas del grupo de capacidad `dav`: - expand_rrule_py_infra (pure): expande una RRULE iCalendar a las fechas de cada ocurrencia dentro de un rango [from, to]. Solo stdlib (datetime, re). Soporta FREQ DAILY/WEEKLY/MONTHLY/YEARLY, INTERVAL, COUNT, UNTIL, BYDAY. 9 tests. - dav_make_calendar_py_infra (impure): crea una coleccion de calendario nueva via MKCALENDAR + PROPPATCH de nombre/color. Idempotente si ya existe. 11 tests. Consumidas por la app osint_web (eventos recurrentes + creacion de agendas). Pagina del grupo dav actualizada con ambas. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
203 lines
7.2 KiB
Python
203 lines
7.2 KiB
Python
"""Expandir una RRULE iCalendar a las fechas de inicio de cada ocurrencia.
|
|
|
|
Implementacion pura y determinista, solo stdlib (datetime, re). NO usa
|
|
python-dateutil (no disponible en los venvs del ecosistema). Soporta un
|
|
subconjunto pragmatico de RFC 5545 suficiente para una agenda personal:
|
|
FREQ (DAILY/WEEKLY/MONTHLY/YEARLY), INTERVAL, COUNT, UNTIL y BYDAY (solo
|
|
para FREQ=WEEKLY). Cualquier otro componente se ignora silenciosamente.
|
|
"""
|
|
|
|
import re
|
|
from datetime import date, datetime, timedelta
|
|
|
|
# Tope de seguridad para no colgar cuando faltan COUNT y UNTIL.
|
|
_MAX_OCCURRENCES = 1000
|
|
|
|
# Mapeo de codigos BYDAY iCal -> weekday() de Python (lunes=0 .. domingo=6).
|
|
_BYDAY_TO_WEEKDAY = {
|
|
"MO": 0,
|
|
"TU": 1,
|
|
"WE": 2,
|
|
"TH": 3,
|
|
"FR": 4,
|
|
"SA": 5,
|
|
"SU": 6,
|
|
}
|
|
|
|
|
|
def _parse_ical_date(value: str) -> date:
|
|
"""Extrae la parte YYYYMMDD de un valor iCal y la devuelve como date.
|
|
|
|
Acepta YYYYMMDD, YYYYMMDDTHHMMSS y YYYYMMDDTHHMMSSZ.
|
|
"""
|
|
digits = value.strip()[:8]
|
|
return date(int(digits[:4]), int(digits[4:6]), int(digits[6:8]))
|
|
|
|
|
|
def _parse_time_suffix(dtstart_ical: str) -> str:
|
|
"""Devuelve la parte de hora 'THHMMSS' del dtstart, o '' si es all-day."""
|
|
m = re.search(r"T(\d{6})", dtstart_ical.strip())
|
|
return f"T{m.group(1)}" if m else ""
|
|
|
|
|
|
def _parse_rrule(rrule: str) -> dict:
|
|
"""Parsea el cuerpo de una RRULE a un dict de componentes en mayusculas."""
|
|
parts: dict[str, str] = {}
|
|
for token in rrule.strip().split(";"):
|
|
if not token or "=" not in token:
|
|
continue
|
|
key, _, val = token.partition("=")
|
|
parts[key.strip().upper()] = val.strip()
|
|
return parts
|
|
|
|
|
|
def _add_months(d: date, months: int) -> date:
|
|
"""Suma `months` meses a una fecha, recortando el dia al ultimo valido."""
|
|
total = (d.year * 12 + (d.month - 1)) + months
|
|
year, month = divmod(total, 12)
|
|
month += 1
|
|
# Recorte de dia: si el dia no existe en el mes destino, usar el ultimo.
|
|
if month == 12:
|
|
next_first = date(year + 1, 1, 1)
|
|
else:
|
|
next_first = date(year, month + 1, 1)
|
|
last_day = (next_first - timedelta(days=1)).day
|
|
return date(year, month, min(d.day, last_day))
|
|
|
|
|
|
def _format_occurrence(d: date, time_suffix: str, all_day: bool) -> str:
|
|
"""Formatea una fecha de ocurrencia como string DTSTART iCal."""
|
|
ymd = f"{d.year:04d}{d.month:02d}{d.day:02d}"
|
|
if all_day or not time_suffix:
|
|
return ymd
|
|
return f"{ymd}{time_suffix}"
|
|
|
|
|
|
def _rule_int(rule: dict, key: str):
    """Return rule[key] as an int when it is a plain run of ASCII digits, else None."""
    raw = rule.get(key, "")
    # str.isdigit() alone also accepts characters such as '²' that int() rejects.
    return int(raw) if raw.isascii() and raw.isdigit() else None


def _iter_rule_dates(dtstart: date, freq: str, interval: int, byday: str, first_step: int = 0):
    """Yield the candidate dates of a recurrence in ascending order.

    No COUNT, UNTIL or range limit is applied here; the consumer decides when
    to stop. The generator ends on its own only when the next date cannot be
    represented by ``datetime.date``.

    Args:
        dtstart: date of the first occurrence; nothing earlier is yielded.
        freq: 'DAILY', 'WEEKLY', 'MONTHLY' or 'YEARLY'.
        interval: number of periods between occurrences (>= 1).
        byday: raw BYDAY value such as 'MO,WE'; only used for WEEKLY. Unknown
            codes are dropped; with no valid code the weekday of dtstart is used.
        first_step: index of the first period to produce. Periods before it are
            skipped without being computed.
    """
    step = first_step
    try:
        if freq == "WEEKLY":
            weekdays = sorted(
                {
                    _BYDAY_TO_WEEKDAY[code.strip().upper()]
                    for code in byday.split(",")
                    if code.strip().upper() in _BYDAY_TO_WEEKDAY
                }
            )
            if not weekdays:
                weekdays = [dtstart.weekday()]
            # Monday of the week containing dtstart; every INTERVAL-th week is
            # counted from this anchor.
            week_anchor = dtstart - timedelta(days=dtstart.weekday())
            while True:
                week_start = week_anchor + timedelta(weeks=step * interval)
                for weekday in weekdays:
                    occ = week_start + timedelta(days=weekday)
                    # Days of the first week that fall before dtstart are not
                    # part of the series.
                    if occ >= dtstart:
                        yield occ
                step += 1
        else:
            while True:
                if freq == "DAILY":
                    yield dtstart + timedelta(days=step * interval)
                elif freq == "MONTHLY":
                    yield _add_months(dtstart, step * interval)
                else:  # YEARLY
                    yield _add_months(dtstart, step * interval * 12)
                step += 1
    except (OverflowError, ValueError):
        # The series ran past the range datetime.date supports (year 9999) or
        # the step no longer fits in a timedelta: there is nothing more to yield.
        return


def _generate_dates(
    dtstart: date,
    rule: dict,
    range_start: "date | None" = None,
    range_end: "date | None" = None,
) -> list[date]:
    """Generate the occurrence dates described by a parsed RRULE.

    Honours FREQ, INTERVAL, COUNT, UNTIL and (for WEEKLY) BYDAY. INTERVAL or
    COUNT values that are not plain digits are ignored; INTERVAL is at least 1.

    Args:
        dtstart: date of the first occurrence.
        rule: components as returned by the RRULE parser (upper-case keys).
        range_start: optional inclusive lower bound. Earlier occurrences are
            not returned but still count towards COUNT.
        range_end: optional inclusive upper bound. Generation stops at the
            first occurrence after it.

    Returns:
        Ascending list of dates, at most _MAX_OCCURRENCES long. With an unknown
        or missing FREQ the list is just ``[dtstart]`` (or empty when dtstart is
        after UNTIL), regardless of the range arguments.

    Raises:
        ValueError: if the rule carries an UNTIL value that is not a valid date.
    """
    freq = rule.get("FREQ", "").upper()
    interval = max(1, _rule_int(rule, "INTERVAL") or 1)
    count = _rule_int(rule, "COUNT")
    until = _parse_ical_date(rule["UNTIL"]) if rule.get("UNTIL") else None

    if freq not in ("DAILY", "WEEKLY", "MONTHLY", "YEARLY"):
        # Unknown or missing FREQ: the event does not repeat.
        if until is not None and dtstart > until:
            return []
        return [dtstart]

    # Without COUNT, occurrences before range_start are irrelevant, so jump
    # straight to the last period that starts on or before range_start. This
    # keeps the _MAX_OCCURRENCES cap from being spent on dates outside the
    # requested window (e.g. a daily event created several years ago).
    first_step = 0
    if count is None and range_start is not None and range_start > dtstart:
        if freq == "DAILY":
            first_step = (range_start - dtstart).days // interval
        elif freq == "WEEKLY":
            week_anchor = dtstart - timedelta(days=dtstart.weekday())
            first_step = (range_start - week_anchor).days // (7 * interval)
        else:
            months_apart = (
                (range_start.year - dtstart.year) * 12
                + range_start.month
                - dtstart.month
            )
            period = interval if freq == "MONTHLY" else interval * 12
            first_step = months_apart // period

    results: list[date] = []
    emitted = 0  # occurrences counted towards COUNT, including out-of-range ones
    for occ in _iter_rule_dates(dtstart, freq, interval, rule.get("BYDAY", ""), first_step):
        if until is not None and occ > until:
            break
        if range_end is not None and occ > range_end:
            break
        if count is not None and emitted >= count:
            break
        emitted += 1
        if range_start is not None and occ < range_start:
            continue
        results.append(occ)
        if len(results) >= _MAX_OCCURRENCES:
            # Safety cap for rules that nothing else bounds.
            break
    return results


def expand_rrule(
    dtstart_ical: str,
    rrule: str,
    range_start: str,
    range_end: str,
    all_day: bool = False,
) -> list[str]:
    """Expand an iCal RRULE into the DTSTART of every occurrence inside a range.

    Args:
        dtstart_ical: start of the master event in raw iCal form: YYYYMMDD
            (all-day), YYYYMMDDTHHMMSS or YYYYMMDDTHHMMSSZ. It is the first
            occurrence of the series and is included when it falls in range.
        rrule: body of the RRULE without the 'RRULE:' prefix, e.g.
            'FREQ=WEEKLY;INTERVAL=1;COUNT=10' or
            'FREQ=WEEKLY;UNTIL=20261231;BYDAY=MO,WE'. BYDAY is only honoured
            together with FREQ=WEEKLY.
        range_start: inclusive lower bound of the range, as YYYYMMDD.
        range_end: inclusive upper bound of the range, as YYYYMMDD.
        all_day: when True occurrences are returned as YYYYMMDD; when False
            they keep the time part of the original dtstart (same wall-clock
            time on every occurrence) as YYYYMMDDTHHMMSS, without a 'Z' suffix.

    Returns:
        Ascending list of iCal DTSTART strings, one per occurrence whose date
        part lies in [range_start, range_end], holding at most
        _MAX_OCCURRENCES entries. Empty when the rule produces nothing in range.

    Raises:
        ValueError: if dtstart_ical, range_start, range_end or the rule's UNTIL
            is not a valid iCal date.
    """
    rule = _parse_rrule(rrule)
    dtstart = _parse_ical_date(dtstart_ical)
    time_suffix = _parse_time_suffix(dtstart_ical)
    lo = _parse_ical_date(range_start)
    hi = _parse_ical_date(range_end)

    # Dates arrive in ascending order and the formatted strings are fixed-width,
    # so the result is already sorted. The range filter is kept because a rule
    # without a usable FREQ returns dtstart regardless of the range.
    return [
        _format_occurrence(occ, time_suffix, all_day)
        for occ in _generate_dates(dtstart, rule, lo, hi)
        if lo <= occ <= hi
    ]
|