"""Ejecuta una recipe YAML contra Chrome remoto via CDP.""" import json import re import sys import os import time sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) import urllib.request import websocket from core.validate_recipe_yaml import validate_recipe_yaml def _ws_send_recv(ws, msg_id: int, method: str, params: dict, timeout: float = 10.0) -> dict: """Envia un mensaje CDP y espera respuesta con el mismo id.""" import threading result_holder = {} event = threading.Event() original_on_message = ws.on_message def on_message_wrapper(ws_app, message): try: msg = json.loads(message) if msg.get("id") == msg_id: result_holder["result"] = msg event.set() except Exception: pass if original_on_message: original_on_message(ws_app, message) ws.on_message = on_message_wrapper ws.send(json.dumps({"id": msg_id, "method": method, "params": params})) event.wait(timeout=timeout) ws.on_message = original_on_message return result_holder.get("result", {}) def _poll_selector(ws, selector: str, timeout_s: float = 10.0) -> bool: """Polling cada 200ms hasta que document.querySelector(selector) no sea null. Drena eventos CDP (paginas con Page.enable emiten loads, frames, etc.) y matchea por `id` para evitar leer respuestas ajenas o eventos del server. """ deadline = time.time() + timeout_s msg_id = 1000 ws.settimeout(0.5) while time.time() < deadline: ws.send(json.dumps({ "id": msg_id, "method": "Runtime.evaluate", "params": { "expression": f"!!document.querySelector({json.dumps(selector)})", "returnByValue": True, } })) # Leer hasta 30 frames buscando uno con nuestro id; ignorar eventos. got_response = False for _ in range(30): try: raw = ws.recv() except Exception: break if not raw: break try: msg = json.loads(raw) except Exception: continue if msg.get("id") == msg_id: got_response = True val = msg.get("result", {}).get("result", {}).get("value", False) if val: return True break msg_id += 1 if not got_response: time.sleep(0.2) return False def cdp_extract_recipe( recipe_path: str, debug_port: int = 9222, tab_id: str | None = None, record_run: bool = True, ) -> dict: """Ejecuta una recipe YAML contra Chrome remoto via CDP. Args: recipe_path: Ruta al archivo .yaml de la recipe. debug_port: Puerto de depuracion remota de Chrome. Default 9222. tab_id: ID del tab a usar. Si None, busca tab cuyo URL matchee url_pattern. record_run: Si True y output.sink=='data_factory.runs', llama data_factory_record_run. Returns: {status, rows_out, kb_out, duration_ms, error, sample_rows} """ start_ms = int(time.time() * 1000) # Leer y validar recipe try: with open(recipe_path, "r", encoding="utf-8") as f: yaml_text = f.read() except OSError as e: return {"status": "error", "rows_out": 0, "kb_out": 0.0, "duration_ms": 0, "error": str(e), "sample_rows": []} validation = validate_recipe_yaml(yaml_text) if not validation["valid"]: return {"status": "error", "rows_out": 0, "kb_out": 0.0, "duration_ms": 0, "error": "recipe invalida: " + "; ".join(validation["errors"]), "sample_rows": []} recipe = validation["parsed"] url_pattern = recipe["url_pattern"] steps = recipe["steps"] output_cfg = recipe.get("output", {}) sink = output_cfg.get("sink", "stdout") # Obtener lista de tabs try: with urllib.request.urlopen( f"http://127.0.0.1:{debug_port}/json/list", timeout=5 ) as resp: tabs = json.loads(resp.read().decode()) except Exception as e: return {"status": "error", "rows_out": 0, "kb_out": 0.0, "duration_ms": 0, "error": f"no se pudo conectar a Chrome en port {debug_port}: {e}", "sample_rows": []} # Encontrar tab ws_url = None if tab_id: for tab in tabs: if tab.get("id") == tab_id: ws_url = tab.get("webSocketDebuggerUrl") break else: for tab in tabs: tab_url = tab.get("url", "") if re.search(url_pattern, tab_url): ws_url = tab.get("webSocketDebuggerUrl") break if not ws_url: return {"status": "error", "rows_out": 0, "kb_out": 0.0, "duration_ms": 0, "error": f"no tab matching pattern: {url_pattern}", "sample_rows": []} # Ejecutar steps last_result = None try: ws = websocket.create_connection(ws_url, timeout=10) try: for i, step in enumerate(steps): if "wait_selector" in step: selector = step["wait_selector"] found = _poll_selector(ws, selector, timeout_s=10.0) if not found: raise RuntimeError(f"step {i}: timeout esperando selector '{selector}'") elif "js" in step: ws.send(json.dumps({ "id": i + 1, "method": "Runtime.evaluate", "params": { "expression": step["js"], "returnByValue": True, "awaitPromise": True, } })) raw = ws.recv() msg = json.loads(raw) result_obj = msg.get("result", {}).get("result", {}) last_result = result_obj.get("value") finally: ws.close() except Exception as e: return {"status": "error", "rows_out": 0, "kb_out": 0.0, "duration_ms": int(time.time() * 1000) - start_ms, "error": str(e), "sample_rows": []} # Calcular metricas rows = last_result if isinstance(last_result, list) else ( [last_result] if last_result is not None else [] ) rows_out = len(rows) kb_out = len(json.dumps(rows, ensure_ascii=False).encode()) / 1024 sample_rows = rows[:5] duration_ms = int(time.time() * 1000) - start_ms # Sink if sink == "stdout": print(json.dumps(rows, ensure_ascii=False, indent=2)) elif sink == "json_file": out_path = output_cfg.get("path", "output.json") with open(out_path, "w", encoding="utf-8") as f: json.dump(rows, f, ensure_ascii=False, indent=2) elif sink == "duckdb": duckdb_path = output_cfg.get("duckdb_path", "") table_name = output_cfg.get("table", "") if not duckdb_path or not table_name: # not fatal: rows already returned via sample_rows pass else: import duckdb import uuid import datetime # resolve duckdb_path relative to FN_REGISTRY_ROOT if not absolute if not os.path.isabs(duckdb_path): duckdb_path = os.path.join(os.environ.get("FN_REGISTRY_ROOT", ""), duckdb_path) os.makedirs(os.path.dirname(duckdb_path), exist_ok=True) conn = duckdb.connect(duckdb_path) try: if rows: # Detect columns from first row keys (assumes list of dicts). if not isinstance(rows[0], dict): # Fallback: wrap scalar rows as {"value": v}. rows = [{"value": r} for r in rows] cols = list(rows[0].keys()) # Build CREATE TABLE IF NOT EXISTS with VARCHAR for safety # plus extracted_at TIMESTAMP and run_id VARCHAR for lineage. col_defs = ", ".join(f'"{c}" VARCHAR' for c in cols) ddl = ( f'CREATE TABLE IF NOT EXISTS "{table_name}" (' f' run_id VARCHAR, extracted_at TIMESTAMP, {col_defs}' f')' ) conn.execute(ddl) run_id_str = uuid.uuid4().hex[:16] now_iso = datetime.datetime.utcnow().isoformat() + "Z" placeholders = ", ".join(["?"] * (len(cols) + 2)) insert_sql = ( f'INSERT INTO "{table_name}" ' f'(run_id, extracted_at, {", ".join(chr(34) + c + chr(34) for c in cols)}) ' f'VALUES ({placeholders})' ) for r in rows: vals = [run_id_str, now_iso] + [str(r.get(c, "")) for c in cols] conn.execute(insert_sql, vals) # Also record into data_factory.runs with storage info registry_root = os.environ.get("FN_REGISTRY_ROOT", "") if registry_root and record_run: import sqlite3 df_db = os.path.join(registry_root, "apps", "data_factory", "data_factory.db") if os.path.exists(df_db): try: df_conn = sqlite3.connect(df_db) df_conn.execute("PRAGMA foreign_keys = ON") trigger = "dag" if os.environ.get("DAGU_ENV") else "manual" db_id = output_cfg.get("database_id", recipe.get("name", "unknown") + "_db") df_run_id = uuid.uuid4().hex[:16] df_conn.execute( "INSERT INTO runs(id, node_id, started_at, finished_at, status," " rows_in, rows_out, kb_in, kb_out, duration_ms, trigger, error, notes," " storage_db_id, storage_table)" " VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)", ( df_run_id, recipe.get("name", "unknown"), now_iso, now_iso, "success", 0, rows_out, 0, int(round(kb_out)), duration_ms, trigger, "", json.dumps({"sample": sample_rows[:2]}, ensure_ascii=False)[:1000], db_id, table_name, ), ) df_conn.commit() df_conn.close() except Exception: pass finally: conn.close() elif sink == "data_factory.runs" and record_run: # Escribe DIRECTO a data_factory.db evitando spawn `fn run` (loop infinito # si data_factory_record_run re-ejecuta esta misma funcion). Confia en que # el node ya existe en `nodes` con id == recipe.name. try: import sqlite3 import datetime import uuid registry_root = os.environ.get("FN_REGISTRY_ROOT", "").strip() if not registry_root: # No fatal — el dato ya fue extraido / impreso por otro sink raise RuntimeError("FN_REGISTRY_ROOT not set; cannot locate data_factory.db") db_path = os.path.join(registry_root, "apps", "data_factory", "data_factory.db") trigger = "dag" if os.environ.get("DAGU_ENV") else "manual" run_id = uuid.uuid4().hex[:16] now = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ") node_id = recipe.get("name", "unknown") conn = sqlite3.connect(db_path) conn.execute("PRAGMA foreign_keys = ON") conn.execute( "INSERT INTO runs(id, node_id, started_at, finished_at, status," " rows_in, rows_out, kb_in, kb_out, duration_ms, trigger, error, notes)" " VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?)", ( run_id, node_id, now, now, "success", 0, rows_out, 0, int(round(kb_out)), duration_ms, trigger, "", json.dumps({"sample": sample_rows[:2]}, ensure_ascii=False)[:1000], ), ) conn.commit() conn.close() except Exception: # No fatal — el dato ya fue extraido (sample_rows en retorno) pass return { "status": "ok", "rows_out": rows_out, "kb_out": round(kb_out, 2), "duration_ms": duration_ms, "error": "", "sample_rows": sample_rows, }