"""Detecta el CMP (Consent Management Platform) de un sitio web y lee su objeto IAB TCF. Navega por CDP a un Chrome con remote debugging, identifica que CMP usa la pagina (Didomi, OneTrust, Sourcepoint, Quantcast u otro TCF generico) y vuelca su TC Data v2 (`__tcfapi('getTCData', 2, ...)`) para contar vendors (data brokers) y propositos declarados. Pensado para escanear masivamente periodicos espanoles y cruzar los vendor IDs contra la GVL (Global Vendor List). Reutiliza la primitiva de transport CDP `cdp_eval_py_browser`: navega via `location.href = url` y evalua el JS de deteccion/volcado con la misma pestana. """ import json import os import sys import time # Permite importar funciones del registry tanto si se ejecuta desde la raiz del # repo (cwd) como si se invoca el modulo directamente. _FN_ROOT = os.path.join(os.path.dirname(__file__), "..") if _FN_ROOT not in sys.path: sys.path.insert(0, _FN_ROOT) from browser.cdp_eval import cdp_eval # noqa: E402 from browser.find_consent_controls_llm import find_consent_controls_llm # noqa: E402 # JS de deteccion del CMP + arranque del volcado TCF. El stub de __tcfapi encola # el callback hasta que el CMP termina de inicializar; el resultado queda en # window.__tcdump y se lee en la segunda pasada. _JS_DETECT = r""" (function(){ var out={url:location.href,title:document.title, has_tcfapi:typeof window.__tcfapi==='function', has_gpp:typeof window.__gpp==='function', didomi:!!(window.Didomi||window.didomiConfig||document.getElementById('didomi-host')||document.querySelector('[id*=didomi]')), onetrust:!!(window.OneTrust||window.Optanon||document.getElementById('onetrust-banner-sdk')), sourcepoint:!!(window._sp_||window.__sp||document.querySelector('[id^=sp_message_container]')), quantcast:!!(window.__cmp||document.querySelector('.qc-cmp2-container,.qc-cmp-cleanslate'))}; window.__tcdump=null; if(out.has_tcfapi){try{window.__tcfapi('getTCData',2,function(d,ok){ var vc=(d&&d.vendor&&d.vendor.consents)||{}; var vl=(d&&d.vendor&&d.vendor.legitimateInterests)||{}; var ids={}; Object.keys(vc).forEach(function(k){ids[k]=1;}); Object.keys(vl).forEach(function(k){ids[k]=1;}); window.__tcdump={ok:ok,cmpId:d&&d.cmpId,cmpVersion:d&&d.cmpVersion,tcfPolicyVersion:d&&d.tcfPolicyVersion, gdprApplies:d&&d.gdprApplies,tcString_len:((d&&d.tcString)||'').length, n_vendor_consents:Object.keys(vc).length, n_vendor_li:Object.keys(vl).length, n_purposes:(d&&d.purpose&&d.purpose.consents)?Object.keys(d.purpose.consents).length:0, tcf_vendor_ids:Object.keys(ids).map(function(x){return parseInt(x,10);}).filter(function(x){return x>0;})};});}catch(e){window.__tcdump={err:String(e)};}} return JSON.stringify(out); })() """ # JS de clic en el boton "aceptar todo" del banner de consentimiento. Devuelve # que metodo funciono: 'sel:', 'text:' o 'no-button'. Usado solo # cuando accept_first=True, para CMPs (Quantcast) que no exponen vendors pre-consent. _JS_ACCEPT = r""" (function(){ function clk(el){ if(el){el.click(); return true;} return false; } // 1) selectores conocidos por CMP var sels=['#didomi-notice-agree-button','#onetrust-accept-btn-handler', '.qc-cmp2-summary-buttons button[mode=primary]', 'button[aria-label*=Aceptar]','button[aria-label*=Accept]']; for(var i=0;i dict: """Convierte el string JSON devuelto por cdp_eval en dict; {} si falla.""" if isinstance(value, dict): return value if not isinstance(value, str): return {} try: parsed = json.loads(value) return parsed if isinstance(parsed, dict) else {} except (ValueError, TypeError): return {} def _coerce_int(value): """Devuelve int(value) si es un entero/float valido, si no None.""" if isinstance(value, bool): return None if isinstance(value, int): return value if isinstance(value, float): return int(value) return None def _ids_from_list(raw): """Normaliza una lista heterogenea de IDs a una lista de int positivos.""" ids = [] if isinstance(raw, list): for vid in raw: iv = _coerce_int(vid) if iv is None and isinstance(vid, str) and vid.isdigit(): iv = int(vid) if iv is not None and iv > 0: ids.append(iv) return ids def _read_vendors(port: int, timeout_s: float) -> dict: """Re-ejecuta el volcado + lectura del TCF y consolida los vendor_ids. Pone `__tcdump=null` y vuelve a pedir getTCData (`_JS_DETECT`), espera un settle corto, lee el volcado (`_JS_READ`) y resuelve los vendor_ids de forma generica (Didomi required ids o union de consents+legitimateInterests). Returns: dict con {"ok":bool, "error":str|None, "read":dict, "vendor_ids":[int], "n_vendors":int, "n_vendors_total":int|None, "n_vendors_required":int|None}. Reusado por el flujo normal y por la re-lectura tras el clic del fallback LLM. """ det = cdp_eval(_JS_DETECT, port=port, timeout_s=timeout_s) if not det.get("ok"): return {"ok": False, "error": "detect eval failed: " + str(det.get("error", ""))} time.sleep(2.0) rd = cdp_eval(_JS_READ, port=port, timeout_s=timeout_s) if not rd.get("ok"): return {"ok": False, "error": "read eval failed: " + str(rd.get("error", ""))} read = _parse_json_value(rd.get("value")) tcdump = read.get("tcdump") or {} if not isinstance(tcdump, dict): tcdump = {} n_vendor_li = _coerce_int(tcdump.get("n_vendor_li")) or 0 n_vendors_total = _coerce_int(read.get("didomi_total_vendors")) n_vendors_required = _coerce_int(read.get("didomi_required")) didomi_ids = _ids_from_list(read.get("didomi_required_ids")) if didomi_ids: vendor_ids = didomi_ids else: vendor_ids = _ids_from_list(tcdump.get("tcf_vendor_ids")) if vendor_ids: n_vendors = len(vendor_ids) elif n_vendors_required: n_vendors = n_vendors_required elif n_vendors_total: n_vendors = n_vendors_total else: n_vendors = n_vendor_li return { "ok": True, "error": None, "read": read, "tcdump": tcdump, "vendor_ids": vendor_ids, "n_vendors": n_vendors, "n_vendors_total": n_vendors_total, "n_vendors_required": n_vendors_required, } def extract_cmp_tcf( url: str, *, port: int = 9222, wait_load_s: float = 7.0, settle_s: float = 5.0, timeout_s: float = 30.0, accept_first: bool = False, settle_accept_s: float = 4.0, llm_fallback: bool = False, ) -> dict: """Detecta el CMP de `url` y lee su TC Data v2 via CDP. Args: url: URL del sitio a escanear (se navega la pestana activa del Chrome). port: Puerto de remote debugging de Chrome. Default 9222. wait_load_s: Segundos a esperar tras navegar para que la pagina cargue. settle_s: Segundos extra a esperar para que el CMP inicialice antes de arrancar el volcado del TCF. timeout_s: Timeout (segundos) para cada evaluacion CDP. accept_first: Si True, ANTES de leer el TCData definitivo intenta ACEPTAR el banner de consentimiento (clic en "aceptar todo": selectores conocidos de Didomi/OneTrust/Quantcast + fallback por texto del boton), espera `settle_accept_s` y re-ejecuta el volcado del TCF. Necesario para CMPs (Quantcast) que no exponen vendors pre-consent. Default False (comportamiento identico al historico, no toca el banner). settle_accept_s: Segundos a esperar tras aceptar el banner para que el CMP re-emita el TCData poblado. Default 4.0. Solo aplica si accept_first=True. llm_fallback: Si True (y accept_first=True), SOLO cuando el intento normal de aceptar el banner deja `vendor_ids` vacio tras leer el TCData, recurre a `find_consent_controls_llm` (haiku) para localizar el control "aceptar todo" cuyos selectores hardcodeados no encajaban, lo clica via cdp_eval, espera `settle_accept_s` y RE-EJECUTA el volcado del TCF. Default False (no llama nunca al LLM, comportamiento identico). El LLM solo se invoca cuando de verdad hace falta: si el flujo de selectores/texto ya recupero vendors, NO gasta la llamada a ask_llm — incluso si el clic salio 'no-button' (caso Didomi, que expone getRequiredVendorIds sin necesidad de consentir). Returns: dict plano consolidado. En el caso feliz: {"status":"ok","url":...,"final_url":...,"title":..., "cmp":"didomi"|"onetrust"|"sourcepoint"|"quantcast"|"otro_tcf"|"ninguno", "cmp_id":int|None,"tcf_policy":int|None,"gdpr_applies":bool|None, "n_vendors":int,"n_vendors_total":int|None,"n_vendors_required":int|None, "n_purposes":int|None,"tcstring_len":int,"paywall_consent":bool, "vendor_ids":[int]} Cuando accept_first=True se anade ademas "accept_method": lo que devolvio el JS de clic ('sel:', 'text:' o 'no-button'). Cuando ademas se dispara el fallback LLM (llm_fallback=True y el intento normal fallo) se anaden "llm_used":True y "llm_reason":str (la explicacion del locator), y accept_method pasa a 'llm:' (clic LLM exitoso) o 'llm:no-control' (el LLM no encontro un control aceptable / el clic fallo). En cualquier fallo (navegacion, eval, JSON parse): {"status":"error","url":url,"error":"..."} Nunca lanza. """ try: # 1. Navegar la pestana activa via JS (reutiliza el transport CDP). nav_expr = "location.href=" + json.dumps(url) + "; true" nav = cdp_eval(nav_expr, port=port, timeout_s=timeout_s) if not nav.get("ok"): return { "status": "error", "url": url, "error": "navigate failed: " + str(nav.get("error", "")), } # 2. Esperar carga + settle para que el CMP inicialice. time.sleep(max(0.0, wait_load_s)) time.sleep(max(0.0, settle_s)) # 3. Deteccion del CMP + arranque del volcado del TCF. det = cdp_eval(_JS_DETECT, port=port, timeout_s=timeout_s) if not det.get("ok"): return { "status": "error", "url": url, "error": "detect eval failed: " + str(det.get("error", "")), } detect = _parse_json_value(det.get("value")) # 3b. Si accept_first: aceptar el banner y re-arrancar el volcado del TCF. # Algunos CMP (Quantcast) no exponen ningun vendor en getTCData hasta que # el usuario interactua con el banner. Tras aceptar, re-ejecutamos _JS_DETECT # (que pone __tcdump=null y vuelve a pedir getTCData), ahora ya poblado. accept_method = None if accept_first: ac = cdp_eval(_JS_ACCEPT, port=port, timeout_s=timeout_s) accept_method = ac.get("value") if ac.get("ok") else "eval-failed" time.sleep(max(0.0, settle_accept_s)) # 4. Lectura del volcado + consolidacion de vendors (helper reutilizable). rv = _read_vendors(port, timeout_s) if not rv.get("ok"): return {"status": "error", "url": url, "error": rv.get("error", "read failed")} read = rv["read"] tcdump = rv["tcdump"] vendor_ids = rv["vendor_ids"] n_vendors = rv["n_vendors"] n_vendors_total = rv["n_vendors_total"] n_vendors_required = rv["n_vendors_required"] # 4b. Fallback LLM — SOLO si el flujo normal de selectores fallo de verdad. # "Fallo de verdad" = no se recuperaron vendors (vendor_ids vacio). El criterio # rector del encargo es no malgastar ask_llm en sitios que ya dieron vendors: # por eso un clic 'no-button' que aun asi dejo vendor_ids poblado (caso Didomi, # que expone getRequiredVendorIds sin consentir) NO dispara el LLM. El LLM solo # entra cuando ni los selectores ni el texto lograron poblar vendor_ids. llm_used = False llm_reason = None normal_failed = not vendor_ids if accept_first and llm_fallback and normal_failed: llm_used = True locator = find_consent_controls_llm(port=port, max_candidates=80) llm_reason = locator.get("reason") accept_selector = locator.get("accept_selector") if accept_selector: # Clicar el control elegido por el LLM. accept_selector tiene # comillas dobles ([data-fnllm="N"]); json.dumps lo escapa bien # al incrustarlo como string-literal JS. sel_lit = json.dumps(accept_selector) click_expr = ( "(function(){var e=document.querySelector(" + sel_lit + ");" "if(e){e.click();return true;}return false;})()" ) cdp_eval(click_expr, port=port, timeout_s=timeout_s) time.sleep(max(0.0, settle_accept_s)) rv2 = _read_vendors(port, timeout_s) if rv2.get("ok"): read = rv2["read"] tcdump = rv2["tcdump"] vendor_ids = rv2["vendor_ids"] n_vendors = rv2["n_vendors"] n_vendors_total = rv2["n_vendors_total"] n_vendors_required = rv2["n_vendors_required"] accept_method = "llm:" + accept_selector else: # El LLM no encontro un control aceptable (accept_idx null) o # status error: marcar sin romper y seguir con lo que haya. accept_method = "llm:no-control" # 5. Consolidar el resto de campos a partir del tcdump/detect. cmp_id = _coerce_int(tcdump.get("cmpId")) tcf_policy = _coerce_int(tcdump.get("tcfPolicyVersion")) gdpr_applies = tcdump.get("gdprApplies") if not isinstance(gdpr_applies, bool): gdpr_applies = None n_purposes = _coerce_int(tcdump.get("n_purposes")) tcstring_len = _coerce_int(tcdump.get("tcString_len")) or 0 # Derivar el CMP. if cmp_id == 7 or detect.get("didomi"): cmp = "didomi" elif detect.get("onetrust"): cmp = "onetrust" elif detect.get("sourcepoint"): cmp = "sourcepoint" elif detect.get("quantcast"): cmp = "quantcast" elif detect.get("has_tcfapi"): cmp = "otro_tcf" else: cmp = "ninguno" result = { "status": "ok", "url": url, "final_url": detect.get("url") or read.get("url") or url, "title": detect.get("title", ""), "cmp": cmp, "cmp_id": cmp_id, "tcf_policy": tcf_policy, "gdpr_applies": gdpr_applies, "n_vendors": n_vendors, "n_vendors_total": n_vendors_total, "n_vendors_required": n_vendors_required, "n_purposes": n_purposes, "tcstring_len": tcstring_len, "paywall_consent": bool(read.get("paywall_consent")), "vendor_ids": vendor_ids, } if accept_first: result["accept_method"] = accept_method if llm_used: result["llm_used"] = True result["llm_reason"] = llm_reason return result except Exception as e: # noqa: BLE001 — nunca relanzar, devolver status error return {"status": "error", "url": url, "error": str(e)} if __name__ == "__main__": target = sys.argv[1] if len(sys.argv) > 1 else "https://www.lavanguardia.com" p = int(sys.argv[2]) if len(sys.argv) > 2 else 9222 accept = len(sys.argv) > 3 and sys.argv[3] in ("1", "true", "accept", "--accept") llm = len(sys.argv) > 4 and sys.argv[4] in ("1", "true", "llm", "--llm") out = extract_cmp_tcf(target, port=p, accept_first=accept, llm_fallback=llm) print(json.dumps(out, ensure_ascii=False, indent=2))