merge: hardening seguridad osint_db (TrustedHost + escape iCal)
This commit is contained in:
@@ -154,6 +154,16 @@ class CalendarBody(BaseModel):
|
||||
def create_app(cfg: Config) -> FastAPI:
|
||||
"""Construye la app FastAPI con la configuración dada (inyectable en tests)."""
|
||||
app = FastAPI(title="osint_db", docs_url=None, redoc_url=None)
|
||||
# Anti DNS-rebinding: solo acepta requests cuyo Host sea localhost. Sin esto, una
|
||||
# web maliciosa podría rebindear su dominio a 127.0.0.1 y, desde el navegador del
|
||||
# usuario, alcanzar este service (que no tiene auth por ser de uso local) y abusar
|
||||
# de /api/query. "testserver" permite el TestClient de los tests.
|
||||
from starlette.middleware.trustedhost import TrustedHostMiddleware
|
||||
|
||||
app.add_middleware(
|
||||
TrustedHostMiddleware,
|
||||
allowed_hosts=["127.0.0.1", "localhost", "testserver"],
|
||||
)
|
||||
|
||||
def run_readonly(sql: str, params: list, max_rows: int) -> dict:
|
||||
"""Ejecuta un SELECT con la conexión read_only del registry, acotado."""
|
||||
|
||||
+30
-6
@@ -396,6 +396,30 @@ def _safe_resource(uid: str) -> str:
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _ical_escape(value) -> str:
|
||||
"""Escapa un valor de texto para una propiedad iCalendar (RFC 5545).
|
||||
|
||||
Evita inyección de propiedades/componentes: un summary/location con saltos de
|
||||
línea o `;`/`,` no puede cerrar el VEVENT ni abrir otro. El `\\r` se elimina
|
||||
(el folding lo aporta el `\\r\\n` de la serialización).
|
||||
"""
|
||||
return (
|
||||
str(value)
|
||||
.replace("\\", "\\\\")
|
||||
.replace("\r", "")
|
||||
.replace("\n", "\\n")
|
||||
.replace(",", "\\,")
|
||||
.replace(";", "\\;")
|
||||
)
|
||||
|
||||
|
||||
def _ical_sanitize(value) -> str:
|
||||
"""Quita saltos de línea de un valor estructurado (UID, RRULE) para evitar
|
||||
que se inyecten propiedades nuevas. No escapa `;`/`,` porque son separadores
|
||||
legítimos en RRULE."""
|
||||
return str(value).replace("\r", "").replace("\n", "")
|
||||
|
||||
|
||||
def _build_vcalendar(uid: str, fields: dict) -> str:
|
||||
"""Compone un VCALENDAR mínimo con un VEVENT desde los campos del evento."""
|
||||
dtstart = (fields.get("dtstart") or "").replace("-", "").replace(":", "")
|
||||
@@ -405,17 +429,17 @@ def _build_vcalendar(uid: str, fields: dict) -> str:
|
||||
"VERSION:2.0",
|
||||
"PRODID:-//osint_db//events//EN",
|
||||
"BEGIN:VEVENT",
|
||||
f"UID:{uid}",
|
||||
f"SUMMARY:{fields.get('summary') or ''}",
|
||||
f"UID:{_ical_sanitize(uid)}",
|
||||
f"SUMMARY:{_ical_escape(fields.get('summary') or '')}",
|
||||
]
|
||||
if dtstart:
|
||||
lines.append(f"DTSTART:{dtstart}")
|
||||
lines.append(f"DTSTART:{_ical_sanitize(dtstart)}")
|
||||
if dtend:
|
||||
lines.append(f"DTEND:{dtend}")
|
||||
lines.append(f"DTEND:{_ical_sanitize(dtend)}")
|
||||
if fields.get("location"):
|
||||
lines.append(f"LOCATION:{fields['location']}")
|
||||
lines.append(f"LOCATION:{_ical_escape(fields['location'])}")
|
||||
if fields.get("rrule"):
|
||||
lines.append(f"RRULE:{fields['rrule']}")
|
||||
lines.append(f"RRULE:{_ical_sanitize(fields['rrule'])}")
|
||||
lines += ["END:VEVENT", "END:VCALENDAR"]
|
||||
return "\r\n".join(lines) + "\r\n"
|
||||
|
||||
|
||||
Reference in New Issue
Block a user