Files
fn_registry/python/functions/browser/extract_cmp_tcf.py
T
egutierrez 763e06c127 feat(browser): auto-commit con 178 cambios
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-06-20 18:22:23 +02:00

386 lines
17 KiB
Python

"""Detecta el CMP (Consent Management Platform) de un sitio web y lee su objeto IAB TCF.
Navega por CDP a un Chrome con remote debugging, identifica que CMP usa la pagina
(Didomi, OneTrust, Sourcepoint, Quantcast u otro TCF generico) y vuelca su TC Data
v2 (`__tcfapi('getTCData', 2, ...)`) para contar vendors (data brokers) y propositos
declarados. Pensado para escanear masivamente periodicos espanoles y cruzar los
vendor IDs contra la GVL (Global Vendor List).
Reutiliza la primitiva de transport CDP `cdp_eval_py_browser`: navega via
`location.href = url` y evalua el JS de deteccion/volcado con la misma pestana.
"""
import json
import os
import sys
import time
# Permite importar funciones del registry tanto si se ejecuta desde la raiz del
# repo (cwd) como si se invoca el modulo directamente.
_FN_ROOT = os.path.join(os.path.dirname(__file__), "..")
if _FN_ROOT not in sys.path:
sys.path.insert(0, _FN_ROOT)
from browser.cdp_eval import cdp_eval # noqa: E402
from browser.find_consent_controls_llm import find_consent_controls_llm # noqa: E402
# JS de deteccion del CMP + arranque del volcado TCF. El stub de __tcfapi encola
# el callback hasta que el CMP termina de inicializar; el resultado queda en
# window.__tcdump y se lee en la segunda pasada.
_JS_DETECT = r"""
(function(){
var out={url:location.href,title:document.title,
has_tcfapi:typeof window.__tcfapi==='function',
has_gpp:typeof window.__gpp==='function',
didomi:!!(window.Didomi||window.didomiConfig||document.getElementById('didomi-host')||document.querySelector('[id*=didomi]')),
onetrust:!!(window.OneTrust||window.Optanon||document.getElementById('onetrust-banner-sdk')),
sourcepoint:!!(window._sp_||window.__sp||document.querySelector('[id^=sp_message_container]')),
quantcast:!!(window.__cmp||document.querySelector('.qc-cmp2-container,.qc-cmp-cleanslate'))};
window.__tcdump=null;
if(out.has_tcfapi){try{window.__tcfapi('getTCData',2,function(d,ok){
var vc=(d&&d.vendor&&d.vendor.consents)||{};
var vl=(d&&d.vendor&&d.vendor.legitimateInterests)||{};
var ids={};
Object.keys(vc).forEach(function(k){ids[k]=1;});
Object.keys(vl).forEach(function(k){ids[k]=1;});
window.__tcdump={ok:ok,cmpId:d&&d.cmpId,cmpVersion:d&&d.cmpVersion,tcfPolicyVersion:d&&d.tcfPolicyVersion,
gdprApplies:d&&d.gdprApplies,tcString_len:((d&&d.tcString)||'').length,
n_vendor_consents:Object.keys(vc).length,
n_vendor_li:Object.keys(vl).length,
n_purposes:(d&&d.purpose&&d.purpose.consents)?Object.keys(d.purpose.consents).length:0,
tcf_vendor_ids:Object.keys(ids).map(function(x){return parseInt(x,10);}).filter(function(x){return x>0;})};});}catch(e){window.__tcdump={err:String(e)};}}
return JSON.stringify(out);
})()
"""
# JS de clic en el boton "aceptar todo" del banner de consentimiento. Devuelve
# que metodo funciono: 'sel:<selector>', 'text:<texto>' o 'no-button'. Usado solo
# cuando accept_first=True, para CMPs (Quantcast) que no exponen vendors pre-consent.
_JS_ACCEPT = r"""
(function(){
function clk(el){ if(el){el.click(); return true;} return false; }
// 1) selectores conocidos por CMP
var sels=['#didomi-notice-agree-button','#onetrust-accept-btn-handler',
'.qc-cmp2-summary-buttons button[mode=primary]',
'button[aria-label*=Aceptar]','button[aria-label*=Accept]'];
for(var i=0;i<sels.length;i++){var e=document.querySelector(sels[i]); if(e){e.click(); return 'sel:'+sels[i];}}
// 2) fallback por texto del boton
var btns=[].slice.call(document.querySelectorAll('button, a[role=button], [role=button]'));
var rx=/^(aceptar y continuar|aceptar todo|aceptar|consentir|estoy de acuerdo|de acuerdo|accept all|i agree|agree)$/i;
for(var j=0;j<btns.length;j++){var t=((btns[j].innerText||btns[j].textContent||'').trim()); if(rx.test(t)){btns[j].click(); return 'text:'+t;}}
return 'no-button';
})()
"""
# JS de lectura del volcado + recuento de vendors de Didomi + deteccion de muro
# "pago o consientes".
_JS_READ = r"""
(function(){var r={tcdump:window.__tcdump};
try{if(window.Didomi){var v=Didomi.getVendors?Didomi.getVendors():null;
r.didomi_total_vendors=v?v.length:null;
var req=Didomi.getRequiredVendorIds?Didomi.getRequiredVendorIds():null;
r.didomi_required=req?req.length:null;
r.didomi_required_ids=req?req:null;}
}catch(e){r.didomi_err=String(e);}
try{var t=(document.body.innerText||'').toLowerCase();
r.paywall_consent=/(acepta y suscr|suscr[ií]bete|pago o|aceptar y continuar gratis|pay or|consent or pay|navega sin publicidad|acceder pagando)/.test(t);
}catch(e){}
return JSON.stringify(r);})()
"""
def _parse_json_value(value) -> dict:
"""Convierte el string JSON devuelto por cdp_eval en dict; {} si falla."""
if isinstance(value, dict):
return value
if not isinstance(value, str):
return {}
try:
parsed = json.loads(value)
return parsed if isinstance(parsed, dict) else {}
except (ValueError, TypeError):
return {}
def _coerce_int(value):
"""Devuelve int(value) si es un entero/float valido, si no None."""
if isinstance(value, bool):
return None
if isinstance(value, int):
return value
if isinstance(value, float):
return int(value)
return None
def _ids_from_list(raw):
"""Normaliza una lista heterogenea de IDs a una lista de int positivos."""
ids = []
if isinstance(raw, list):
for vid in raw:
iv = _coerce_int(vid)
if iv is None and isinstance(vid, str) and vid.isdigit():
iv = int(vid)
if iv is not None and iv > 0:
ids.append(iv)
return ids
def _read_vendors(port: int, timeout_s: float) -> dict:
"""Re-ejecuta el volcado + lectura del TCF y consolida los vendor_ids.
Pone `__tcdump=null` y vuelve a pedir getTCData (`_JS_DETECT`), espera un
settle corto, lee el volcado (`_JS_READ`) y resuelve los vendor_ids de forma
generica (Didomi required ids o union de consents+legitimateInterests).
Returns:
dict con {"ok":bool, "error":str|None, "read":dict, "vendor_ids":[int],
"n_vendors":int, "n_vendors_total":int|None,
"n_vendors_required":int|None}. Reusado por el flujo normal y
por la re-lectura tras el clic del fallback LLM.
"""
det = cdp_eval(_JS_DETECT, port=port, timeout_s=timeout_s)
if not det.get("ok"):
return {"ok": False, "error": "detect eval failed: " + str(det.get("error", ""))}
time.sleep(2.0)
rd = cdp_eval(_JS_READ, port=port, timeout_s=timeout_s)
if not rd.get("ok"):
return {"ok": False, "error": "read eval failed: " + str(rd.get("error", ""))}
read = _parse_json_value(rd.get("value"))
tcdump = read.get("tcdump") or {}
if not isinstance(tcdump, dict):
tcdump = {}
n_vendor_li = _coerce_int(tcdump.get("n_vendor_li")) or 0
n_vendors_total = _coerce_int(read.get("didomi_total_vendors"))
n_vendors_required = _coerce_int(read.get("didomi_required"))
didomi_ids = _ids_from_list(read.get("didomi_required_ids"))
if didomi_ids:
vendor_ids = didomi_ids
else:
vendor_ids = _ids_from_list(tcdump.get("tcf_vendor_ids"))
if vendor_ids:
n_vendors = len(vendor_ids)
elif n_vendors_required:
n_vendors = n_vendors_required
elif n_vendors_total:
n_vendors = n_vendors_total
else:
n_vendors = n_vendor_li
return {
"ok": True,
"error": None,
"read": read,
"tcdump": tcdump,
"vendor_ids": vendor_ids,
"n_vendors": n_vendors,
"n_vendors_total": n_vendors_total,
"n_vendors_required": n_vendors_required,
}
def extract_cmp_tcf(
url: str,
*,
port: int = 9222,
wait_load_s: float = 7.0,
settle_s: float = 5.0,
timeout_s: float = 30.0,
accept_first: bool = False,
settle_accept_s: float = 4.0,
llm_fallback: bool = False,
) -> dict:
"""Detecta el CMP de `url` y lee su TC Data v2 via CDP.
Args:
url: URL del sitio a escanear (se navega la pestana activa del Chrome).
port: Puerto de remote debugging de Chrome. Default 9222.
wait_load_s: Segundos a esperar tras navegar para que la pagina cargue.
settle_s: Segundos extra a esperar para que el CMP inicialice antes de
arrancar el volcado del TCF.
timeout_s: Timeout (segundos) para cada evaluacion CDP.
accept_first: Si True, ANTES de leer el TCData definitivo intenta ACEPTAR
el banner de consentimiento (clic en "aceptar todo": selectores
conocidos de Didomi/OneTrust/Quantcast + fallback por texto del boton),
espera `settle_accept_s` y re-ejecuta el volcado del TCF. Necesario para
CMPs (Quantcast) que no exponen vendors pre-consent. Default False
(comportamiento identico al historico, no toca el banner).
settle_accept_s: Segundos a esperar tras aceptar el banner para que el CMP
re-emita el TCData poblado. Default 4.0. Solo aplica si accept_first=True.
llm_fallback: Si True (y accept_first=True), SOLO cuando el intento normal de
aceptar el banner deja `vendor_ids` vacio tras leer el TCData, recurre a
`find_consent_controls_llm` (haiku) para localizar el control "aceptar todo"
cuyos selectores hardcodeados no encajaban, lo clica via cdp_eval, espera
`settle_accept_s` y RE-EJECUTA el volcado del TCF. Default False (no llama
nunca al LLM, comportamiento identico). El LLM solo se invoca cuando de
verdad hace falta: si el flujo de selectores/texto ya recupero vendors, NO
gasta la llamada a ask_llm — incluso si el clic salio 'no-button' (caso
Didomi, que expone getRequiredVendorIds sin necesidad de consentir).
Returns:
dict plano consolidado. En el caso feliz:
{"status":"ok","url":...,"final_url":...,"title":...,
"cmp":"didomi"|"onetrust"|"sourcepoint"|"quantcast"|"otro_tcf"|"ninguno",
"cmp_id":int|None,"tcf_policy":int|None,"gdpr_applies":bool|None,
"n_vendors":int,"n_vendors_total":int|None,"n_vendors_required":int|None,
"n_purposes":int|None,"tcstring_len":int,"paywall_consent":bool,
"vendor_ids":[int]}
Cuando accept_first=True se anade ademas "accept_method": lo que devolvio el
JS de clic ('sel:<selector>', 'text:<texto>' o 'no-button').
Cuando ademas se dispara el fallback LLM (llm_fallback=True y el intento normal
fallo) se anaden "llm_used":True y "llm_reason":str (la explicacion del locator),
y accept_method pasa a 'llm:<selector>' (clic LLM exitoso) o 'llm:no-control'
(el LLM no encontro un control aceptable / el clic fallo).
En cualquier fallo (navegacion, eval, JSON parse):
{"status":"error","url":url,"error":"..."}
Nunca lanza.
"""
try:
# 1. Navegar la pestana activa via JS (reutiliza el transport CDP).
nav_expr = "location.href=" + json.dumps(url) + "; true"
nav = cdp_eval(nav_expr, port=port, timeout_s=timeout_s)
if not nav.get("ok"):
return {
"status": "error",
"url": url,
"error": "navigate failed: " + str(nav.get("error", "")),
}
# 2. Esperar carga + settle para que el CMP inicialice.
time.sleep(max(0.0, wait_load_s))
time.sleep(max(0.0, settle_s))
# 3. Deteccion del CMP + arranque del volcado del TCF.
det = cdp_eval(_JS_DETECT, port=port, timeout_s=timeout_s)
if not det.get("ok"):
return {
"status": "error",
"url": url,
"error": "detect eval failed: " + str(det.get("error", "")),
}
detect = _parse_json_value(det.get("value"))
# 3b. Si accept_first: aceptar el banner y re-arrancar el volcado del TCF.
# Algunos CMP (Quantcast) no exponen ningun vendor en getTCData hasta que
# el usuario interactua con el banner. Tras aceptar, re-ejecutamos _JS_DETECT
# (que pone __tcdump=null y vuelve a pedir getTCData), ahora ya poblado.
accept_method = None
if accept_first:
ac = cdp_eval(_JS_ACCEPT, port=port, timeout_s=timeout_s)
accept_method = ac.get("value") if ac.get("ok") else "eval-failed"
time.sleep(max(0.0, settle_accept_s))
# 4. Lectura del volcado + consolidacion de vendors (helper reutilizable).
rv = _read_vendors(port, timeout_s)
if not rv.get("ok"):
return {"status": "error", "url": url, "error": rv.get("error", "read failed")}
read = rv["read"]
tcdump = rv["tcdump"]
vendor_ids = rv["vendor_ids"]
n_vendors = rv["n_vendors"]
n_vendors_total = rv["n_vendors_total"]
n_vendors_required = rv["n_vendors_required"]
# 4b. Fallback LLM — SOLO si el flujo normal de selectores fallo de verdad.
# "Fallo de verdad" = no se recuperaron vendors (vendor_ids vacio). El criterio
# rector del encargo es no malgastar ask_llm en sitios que ya dieron vendors:
# por eso un clic 'no-button' que aun asi dejo vendor_ids poblado (caso Didomi,
# que expone getRequiredVendorIds sin consentir) NO dispara el LLM. El LLM solo
# entra cuando ni los selectores ni el texto lograron poblar vendor_ids.
llm_used = False
llm_reason = None
normal_failed = not vendor_ids
if accept_first and llm_fallback and normal_failed:
llm_used = True
locator = find_consent_controls_llm(port=port, max_candidates=80)
llm_reason = locator.get("reason")
accept_selector = locator.get("accept_selector")
if accept_selector:
# Clicar el control elegido por el LLM. accept_selector tiene
# comillas dobles ([data-fnllm="N"]); json.dumps lo escapa bien
# al incrustarlo como string-literal JS.
sel_lit = json.dumps(accept_selector)
click_expr = (
"(function(){var e=document.querySelector(" + sel_lit + ");"
"if(e){e.click();return true;}return false;})()"
)
cdp_eval(click_expr, port=port, timeout_s=timeout_s)
time.sleep(max(0.0, settle_accept_s))
rv2 = _read_vendors(port, timeout_s)
if rv2.get("ok"):
read = rv2["read"]
tcdump = rv2["tcdump"]
vendor_ids = rv2["vendor_ids"]
n_vendors = rv2["n_vendors"]
n_vendors_total = rv2["n_vendors_total"]
n_vendors_required = rv2["n_vendors_required"]
accept_method = "llm:" + accept_selector
else:
# El LLM no encontro un control aceptable (accept_idx null) o
# status error: marcar sin romper y seguir con lo que haya.
accept_method = "llm:no-control"
# 5. Consolidar el resto de campos a partir del tcdump/detect.
cmp_id = _coerce_int(tcdump.get("cmpId"))
tcf_policy = _coerce_int(tcdump.get("tcfPolicyVersion"))
gdpr_applies = tcdump.get("gdprApplies")
if not isinstance(gdpr_applies, bool):
gdpr_applies = None
n_purposes = _coerce_int(tcdump.get("n_purposes"))
tcstring_len = _coerce_int(tcdump.get("tcString_len")) or 0
# Derivar el CMP.
if cmp_id == 7 or detect.get("didomi"):
cmp = "didomi"
elif detect.get("onetrust"):
cmp = "onetrust"
elif detect.get("sourcepoint"):
cmp = "sourcepoint"
elif detect.get("quantcast"):
cmp = "quantcast"
elif detect.get("has_tcfapi"):
cmp = "otro_tcf"
else:
cmp = "ninguno"
result = {
"status": "ok",
"url": url,
"final_url": detect.get("url") or read.get("url") or url,
"title": detect.get("title", ""),
"cmp": cmp,
"cmp_id": cmp_id,
"tcf_policy": tcf_policy,
"gdpr_applies": gdpr_applies,
"n_vendors": n_vendors,
"n_vendors_total": n_vendors_total,
"n_vendors_required": n_vendors_required,
"n_purposes": n_purposes,
"tcstring_len": tcstring_len,
"paywall_consent": bool(read.get("paywall_consent")),
"vendor_ids": vendor_ids,
}
if accept_first:
result["accept_method"] = accept_method
if llm_used:
result["llm_used"] = True
result["llm_reason"] = llm_reason
return result
except Exception as e: # noqa: BLE001 — nunca relanzar, devolver status error
return {"status": "error", "url": url, "error": str(e)}
if __name__ == "__main__":
target = sys.argv[1] if len(sys.argv) > 1 else "https://www.lavanguardia.com"
p = int(sys.argv[2]) if len(sys.argv) > 2 else 9222
accept = len(sys.argv) > 3 and sys.argv[3] in ("1", "true", "accept", "--accept")
llm = len(sys.argv) > 4 and sys.argv[4] in ("1", "true", "llm", "--llm")
out = extract_cmp_tcf(target, port=p, accept_first=accept, llm_fallback=llm)
print(json.dumps(out, ensure_ascii=False, indent=2))