chore: auto-commit (286 archivos)

- .claude/agents/fn-orquestador/SKILL.md
- .claude/commands/fn_claude.md
- .claude/rules/INDEX.md
- .claude/rules/cpp_apps.md
- .claude/rules/ids_naming.md
- CHANGELOG.md
- apps/dag_engine/README.md
- apps/dag_engine/api.go
- apps/dag_engine/dags_migrated/example.yaml
- apps/dag_engine/dags_migrated/example_lineage_tracking.yaml
- ...

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-16 16:33:22 +02:00
parent 0b9af8f1bb
commit a03675113a
281 changed files with 12596 additions and 19526 deletions
@@ -0,0 +1,69 @@
---
name: cdp_extract_recipe
kind: pipeline
lang: py
domain: pipelines
version: "1.0.0"
purity: impure
signature: "def cdp_extract_recipe(recipe_path: str, debug_port: int = 9222, tab_id: str | None = None, record_run: bool = True) -> dict"
description: "Ejecuta una recipe YAML contra Chrome remoto via CDP. Valida recipe, busca tab por url_pattern, ejecuta steps (wait_selector/js) y envia resultado al sink declarado."
tags: [navegator, cdp, recipe, scraping, pipeline]
uses_functions: [validate_recipe_yaml_py_core, data_factory_record_run_py_pipelines]
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: [json, re, sys, os, time, urllib.request, websocket]
params:
- name: recipe_path
desc: "Ruta al archivo .yaml de la recipe (absoluta o relativa al cwd)."
- name: debug_port
desc: "Puerto de depuracion remota de Chrome. Default 9222."
- name: tab_id
desc: "ID del tab a usar. Si None, busca tab cuyo URL matchee url_pattern de la recipe."
- name: record_run
desc: "Si True y output.sink=='data_factory.runs', registra la ejecucion en data_factory."
output: "dict {status: ok|error, rows_out: int, kb_out: float, duration_ms: int, error: str, sample_rows: list}"
tested: false
tests: []
test_file_path: ""
file_path: "python/functions/pipelines/cdp_extract_recipe.py"
---
## Ejemplo
```python
import sys
sys.path.insert(0, "python/functions")
from pipelines.cdp_extract_recipe import cdp_extract_recipe
result = cdp_extract_recipe(
recipe_path="recipes/product_list.yaml",
debug_port=9222,
)
print(result["status"], result["rows_out"], "rows")
# ok 42 rows
```
Recipe de ejemplo (`recipes/product_list.yaml`):
```yaml
name: product_list
url_pattern: "https://shop\\.example\\.com/products.*"
steps:
- wait_selector: ".product-card"
- js: "Array.from(document.querySelectorAll('.product-card')).map(e => ({name: e.querySelector('h2').innerText, price: e.querySelector('.price').innerText}))"
output:
sink: stdout
```
## Cuando usarla
Cuando tienes una recipe YAML validada y Chrome corriendo con remote debugging, y quieres extraer datos en un solo paso sin montar pipeline manualmente. Encadena con `cdp_open_url_and_wait` si necesitas abrir la URL primero.
## Gotchas
- Chrome debe estar corriendo con `--remote-debugging-port=<debug_port>`.
- `wait_selector` usa polling sync sobre el WebSocket (200ms interval, 10s timeout) — no apto para paginas con lazy load muy largo.
- El ultimo step `js` debe devolver el dato final (array o valor). Steps intermedios pueden preparar el DOM.
- `data_factory_record_run` falla silenciosamente si no hay DB configurada — el dato ya fue extraido y devuelto.
- `websocket-client` debe estar instalado en el venv.
@@ -0,0 +1,210 @@
"""Ejecuta una recipe YAML contra Chrome remoto via CDP."""
import json
import re
import sys
import os
import time
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
import urllib.request
import websocket
from core.validate_recipe_yaml import validate_recipe_yaml
def _ws_send_recv(ws, msg_id: int, method: str, params: dict, timeout: float = 10.0) -> dict:
    """Send one CDP command and wait for the reply that carries the same id.

    The reply is captured by temporarily wrapping ``ws.on_message``; the
    previous handler is still invoked for every message and is always
    restored, even when sending fails.

    NOTE(review): this only works if something else dispatches incoming
    frames to ``ws.on_message`` while this call blocks (presumably a
    ``websocket.WebSocketApp`` running in another thread) -- confirm against
    callers; nothing visible in this module calls the helper.

    Args:
        ws: Object exposing ``on_message`` and ``send``.
        msg_id: Id to put on the request and to look for in replies.
        method: CDP method name.
        params: CDP method parameters.
        timeout: Seconds to wait for the matching reply.

    Returns:
        The decoded reply message, or ``{}`` if none arrived in time.
    """
    import threading

    reply: dict = {}
    arrived = threading.Event()
    previous_handler = ws.on_message

    def _intercept(ws_app, message):
        try:
            decoded = json.loads(message)
        except (TypeError, ValueError):
            # Not a JSON frame: nothing to match, but still forward it below.
            decoded = None
        if isinstance(decoded, dict) and decoded.get("id") == msg_id:
            reply["result"] = decoded
            arrived.set()
        if previous_handler:
            previous_handler(ws_app, message)

    ws.on_message = _intercept
    try:
        ws.send(json.dumps({"id": msg_id, "method": method, "params": params}))
        arrived.wait(timeout=timeout)
    finally:
        # Never leave the temporary wrapper installed, even if send() raised.
        ws.on_message = previous_handler
    return reply.get("result", {})
def _poll_selector(ws, selector: str, timeout_s: float = 10.0) -> bool:
    """Poll the page until ``document.querySelector(selector)`` is non-null.

    A ``Runtime.evaluate`` request is sent roughly every 200 ms. For each
    request, frames are read with ``ws.recv()`` until the reply with the
    matching id shows up, so CDP events and stale replies are skipped
    instead of being mistaken for the answer.

    Args:
        ws: Synchronous WebSocket exposing ``send`` and ``recv``.
        selector: CSS selector to look for.
        timeout_s: Overall polling budget in seconds.

    Returns:
        True as soon as the selector matches, False if the budget runs out.
    """
    poll_interval_s = 0.2
    expression = f"!!document.querySelector({json.dumps(selector)})"
    deadline = time.time() + timeout_s
    msg_id = 1000

    while time.time() < deadline:
        ws.send(json.dumps({
            "id": msg_id,
            "method": "Runtime.evaluate",
            "params": {"expression": expression, "returnByValue": True},
        }))

        # Drain frames until this request's reply arrives (or time runs out).
        while True:
            try:
                raw = ws.recv()
            except websocket.WebSocketTimeoutException:
                # Socket-level read timeout: give up on this request, keep polling.
                break
            try:
                msg = json.loads(raw)
            except (TypeError, ValueError):
                msg = None  # empty or non-JSON frame, ignore it
            if isinstance(msg, dict) and msg.get("id") == msg_id:
                if msg.get("result", {}).get("result", {}).get("value", False):
                    return True
                break
            if time.time() >= deadline:
                break

        msg_id += 1
        remaining = deadline - time.time()
        if remaining > 0:
            time.sleep(min(poll_interval_s, remaining))
    return False
def cdp_extract_recipe(
recipe_path: str,
debug_port: int = 9222,
tab_id: str | None = None,
record_run: bool = True,
) -> dict:
"""Ejecuta una recipe YAML contra Chrome remoto via CDP.
Args:
recipe_path: Ruta al archivo .yaml de la recipe.
debug_port: Puerto de depuracion remota de Chrome. Default 9222.
tab_id: ID del tab a usar. Si None, busca tab cuyo URL matchee url_pattern.
record_run: Si True y output.sink=='data_factory.runs', llama data_factory_record_run.
Returns:
{status, rows_out, kb_out, duration_ms, error, sample_rows}
"""
start_ms = int(time.time() * 1000)
# Leer y validar recipe
try:
with open(recipe_path, "r", encoding="utf-8") as f:
yaml_text = f.read()
except OSError as e:
return {"status": "error", "rows_out": 0, "kb_out": 0.0,
"duration_ms": 0, "error": str(e), "sample_rows": []}
validation = validate_recipe_yaml(yaml_text)
if not validation["valid"]:
return {"status": "error", "rows_out": 0, "kb_out": 0.0,
"duration_ms": 0, "error": "recipe invalida: " + "; ".join(validation["errors"]),
"sample_rows": []}
recipe = validation["parsed"]
url_pattern = recipe["url_pattern"]
steps = recipe["steps"]
output_cfg = recipe.get("output", {})
sink = output_cfg.get("sink", "stdout")
# Obtener lista de tabs
try:
with urllib.request.urlopen(
f"http://127.0.0.1:{debug_port}/json/list", timeout=5
) as resp:
tabs = json.loads(resp.read().decode())
except Exception as e:
return {"status": "error", "rows_out": 0, "kb_out": 0.0,
"duration_ms": 0,
"error": f"no se pudo conectar a Chrome en port {debug_port}: {e}",
"sample_rows": []}
# Encontrar tab
ws_url = None
if tab_id:
for tab in tabs:
if tab.get("id") == tab_id:
ws_url = tab.get("webSocketDebuggerUrl")
break
else:
for tab in tabs:
tab_url = tab.get("url", "")
if re.search(url_pattern, tab_url):
ws_url = tab.get("webSocketDebuggerUrl")
break
if not ws_url:
return {"status": "error", "rows_out": 0, "kb_out": 0.0,
"duration_ms": 0,
"error": f"no tab matching pattern: {url_pattern}",
"sample_rows": []}
# Ejecutar steps
last_result = None
try:
ws = websocket.create_connection(ws_url, timeout=10)
try:
for i, step in enumerate(steps):
if "wait_selector" in step:
selector = step["wait_selector"]
found = _poll_selector(ws, selector, timeout_s=10.0)
if not found:
raise RuntimeError(f"step {i}: timeout esperando selector '{selector}'")
elif "js" in step:
ws.send(json.dumps({
"id": i + 1,
"method": "Runtime.evaluate",
"params": {
"expression": step["js"],
"returnByValue": True,
"awaitPromise": True,
}
}))
raw = ws.recv()
msg = json.loads(raw)
result_obj = msg.get("result", {}).get("result", {})
last_result = result_obj.get("value")
finally:
ws.close()
except Exception as e:
return {"status": "error", "rows_out": 0, "kb_out": 0.0,
"duration_ms": int(time.time() * 1000) - start_ms,
"error": str(e), "sample_rows": []}
# Calcular metricas
rows = last_result if isinstance(last_result, list) else (
[last_result] if last_result is not None else []
)
rows_out = len(rows)
kb_out = len(json.dumps(rows, ensure_ascii=False).encode()) / 1024
sample_rows = rows[:5]
duration_ms = int(time.time() * 1000) - start_ms
# Sink
if sink == "stdout":
print(json.dumps(rows, ensure_ascii=False, indent=2))
elif sink == "json_file":
out_path = output_cfg.get("path", "output.json")
with open(out_path, "w", encoding="utf-8") as f:
json.dump(rows, f, ensure_ascii=False, indent=2)
elif sink == "data_factory.runs" and record_run:
try:
from pipelines.data_factory_record_run import data_factory_record_run
data_factory_record_run(
node_id=recipe.get("name", "unknown"),
function_id="cdp_extract_recipe_py_pipelines",
args={"recipe_path": recipe_path, "debug_port": debug_port},
)
except Exception as e:
# No fatal — el dato ya fue extraido
pass
return {
"status": "ok",
"rows_out": rows_out,
"kb_out": round(kb_out, 2),
"duration_ms": duration_ms,
"error": "",
"sample_rows": sample_rows,
}
@@ -0,0 +1,79 @@
---
name: cdp_get_ax_tree
kind: pipeline
lang: py
domain: pipelines
version: "1.0.0"
purity: impure
signature: "def cdp_get_ax_tree(debug_port: int, tab_id: str, depth: int = -1) -> list[dict]"
description: "Conecta a Chrome via CDP WebSocket, habilita Accessibility y devuelve el AX tree completo del tab indicado. Usa websocket-client si está disponible, sino websockets async."
tags: [navegator, cdp, chrome, browser, accessibility, ax-tree]
uses_functions: [trim_ax_tree_py_core, chunk_ax_tree_py_core]
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: [json, threading, urllib.request, urllib.error, websocket]
params:
- name: debug_port
desc: "Puerto de debug remoto de Chrome (ej. 9222). Lanzar Chrome con --remote-debugging-port=9222."
- name: tab_id
desc: "ID del tab CDP obtenido via GET /json/list (campo 'id'). Usar cdp_list_tabs_go_browser para listarlo."
- name: depth
desc: "Profundidad del árbol a obtener. -1 = completo (default)."
output: "Lista de AXNode en formato CDP. Lista vacía si la página no tiene contenido accesible."
tested: false
tests: []
test_file_path: ""
file_path: "python/functions/pipelines/cdp_get_ax_tree.py"
---
## Ejemplo
```python
import urllib.request, json
from pipelines.cdp_get_ax_tree import cdp_get_ax_tree
from core.trim_ax_tree import trim_ax_tree
from core.chunk_ax_tree import chunk_ax_tree
# 1. Listar tabs para obtener tab_id
with urllib.request.urlopen("http://127.0.0.1:9222/json/list") as r:
tabs = json.loads(r.read())
tab_id = tabs[0]["id"]
# 2. Obtener AX tree
nodes = cdp_get_ax_tree(debug_port=9222, tab_id=tab_id)
# 3. Reducir y chunkear para LLM
trimmed = trim_ax_tree(nodes)
chunks = chunk_ax_tree(trimmed, max_chars=25000)
print(f"{len(nodes)} nodos → {len(trimmed)} trimmed → {len(chunks)} chunks")
```
## Cuando usarla
Cuando necesitas obtener el árbol de accesibilidad de una página Chrome ya abierta para procesarlo con un LLM o para automatización accesible (más estable que selectores CSS). Requiere Chrome lanzado con `--remote-debugging-port=PORT`.
## Gotchas
- Chrome debe estar corriendo con `--remote-debugging-port=<port>` y `--no-sandbox` en CI.
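- En WSL2 usar `--remote-debugging-address=0.0.0.0` y conectar al IP del host Windows, no a 127.0.0.1. Ojo: `cdp_get_ax_tree` consulta siempre `http://127.0.0.1:<debug_port>/json/list`, así que el puerto del host debe quedar accesible en el localhost de WSL2 (p. ej. con port-forwarding).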
- El tab no puede tener otro debugger adjunto (DevTools abierto) — cierra DevTools antes de llamar.
- `Accessibility.getFullAXTree` puede tardar 2-5s en páginas grandes.
- Timeout total de 15s — aumentar si la página es muy pesada.
- Tests automáticos requieren Chrome corriendo. Para probar manualmente:
```bash
# Lanzar Chrome en WSL2
chrome.exe --remote-debugging-port=9222 --headless=new https://example.com
# Verificar
curl http://127.0.0.1:9222/json/list | python3 -m json.tool
# Ejecutar
python3 -c "
import json, urllib.request
from pipelines.cdp_get_ax_tree import cdp_get_ax_tree
with urllib.request.urlopen('http://127.0.0.1:9222/json/list') as r:
tabs = json.loads(r.read())
nodes = cdp_get_ax_tree(9222, tabs[0]['id'])
print(f'{len(nodes)} nodos')
"
```
@@ -0,0 +1,211 @@
"""Obtiene el AX tree completo de un tab Chrome via CDP WebSocket."""
import json
import threading
import urllib.request
import urllib.error
def cdp_get_ax_tree(
    debug_port: int,
    tab_id: str,
    depth: int = -1,
) -> list[dict]:
    """Return the accessibility (AX) tree of a Chrome tab via CDP.

    The tab's ``webSocketDebuggerUrl`` is resolved with ``_get_ws_url`` and
    the CDP exchange is delegated to ``_cdp_get_ax_nodes``, which picks
    whichever WebSocket library can be imported (``websocket-client`` first,
    then ``websockets``).

    Args:
        debug_port: Chrome remote-debugging port (e.g. 9222).
        tab_id: Tab id as listed by ``/json/list`` (field ``id``).
        depth: Tree depth to request; -1 requests the full tree.

    Returns:
        List of AXNode dicts as sent by CDP.

    Raises:
        RuntimeError: Raised by the helpers when Chrome is unreachable, the
            tab is missing, no WebSocket library is installed, or CDP
            answers with an error.
        TimeoutError: Raised by the helpers when no reply arrives in time
            (the backends wait up to about 15 s).
    """
    return _cdp_get_ax_nodes(_get_ws_url(debug_port, tab_id), depth)
def _get_ws_url(debug_port: int, tab_id: str) -> str:
    """Look up the ``webSocketDebuggerUrl`` of ``tab_id`` via HTTP ``/json/list``.

    Args:
        debug_port: Chrome remote-debugging port on 127.0.0.1.
        tab_id: Value of the ``id`` field of the wanted tab.

    Returns:
        The tab's WebSocket debugger URL.

    Raises:
        RuntimeError: If the endpoint cannot be reached or read, its body is
            not a JSON list, the tab is not listed, or the tab has no
            ``webSocketDebuggerUrl``.
    """
    import http.client

    url = f"http://127.0.0.1:{debug_port}/json/list"
    try:
        with urllib.request.urlopen(url, timeout=10) as resp:
            body = resp.read().decode()
    except (OSError, http.client.HTTPException) as e:
        # URLError is an OSError; read timeouts and protocol errors are not
        # URLError, so they are caught here as well to keep the documented contract.
        raise RuntimeError(
            f"No se pudo conectar a Chrome en puerto {debug_port}: {e}"
        ) from e

    try:
        tabs = json.loads(body)
    except ValueError as e:
        raise RuntimeError(
            f"Respuesta invalida de Chrome en puerto {debug_port}: {e}"
        ) from e
    if not isinstance(tabs, list):
        raise RuntimeError(
            f"Respuesta invalida de Chrome en puerto {debug_port}: se esperaba una lista"
        )

    for tab in tabs:
        if tab.get("id") != tab_id:
            continue
        ws_url = tab.get("webSocketDebuggerUrl")
        if not ws_url:
            raise RuntimeError(
                f"Tab {tab_id} no tiene webSocketDebuggerUrl "
                "(puede estar adjunto a otro debugger)"
            )
        return ws_url

    raise RuntimeError(
        f"Tab {tab_id} no encontrado. Tabs disponibles: "
        f"{[t.get('id') for t in tabs]}"
    )
def _cdp_get_ax_nodes(ws_url: str, depth: int) -> list[dict]:
    """Fetch the AX nodes using whichever WebSocket library is importable.

    ``websocket-client`` (module ``websocket``) is preferred; ``websockets``
    is the fallback. Only the import probes are guarded, so an ImportError
    raised while a backend is running is not mistaken for "not installed".

    Raises:
        RuntimeError: If neither library can be imported.
    """
    try:
        import websocket  # noqa: F401  (availability probe for websocket-client)
    except ImportError:
        pass
    else:
        return _cdp_via_websocket_client(ws_url, depth)

    try:
        import websockets  # noqa: F401  (availability probe for the async fallback)
    except ImportError:
        pass
    else:
        return _cdp_via_websockets(ws_url, depth)

    raise RuntimeError(
        "Ninguna librería WebSocket disponible. "
        "Instala websocket-client: pip install websocket-client"
    )
def _cdp_via_websocket_client(ws_url: str, depth: int) -> list[dict]:
    """Fetch the AX tree with ``websocket-client`` (``WebSocketApp`` in a thread).

    On open, ``Accessibility.enable`` (id 1) and ``Accessibility.getFullAXTree``
    (id 2) are sent back to back; the connection is closed as soon as the
    id-2 reply or any error reply arrives.

    Args:
        ws_url: WebSocket debugger URL of the tab.
        depth: Tree depth; -1 omits the ``depth`` parameter from the request.

    Returns:
        The ``nodes`` list from the getFullAXTree reply (``[]`` if absent).

    Raises:
        RuntimeError: On a CDP error reply or a WebSocket error.
        TimeoutError: If no getFullAXTree reply arrives within 15 s.
    """
    import websocket

    replies: dict = {}
    failures: list = []

    def on_message(ws, message):
        try:
            msg = json.loads(message)
        except (TypeError, ValueError) as e:
            failures.append(e)
            ws.close()
            return
        if not isinstance(msg, dict):
            return
        msg_id = msg.get("id")
        if msg_id in (1, 2):
            replies[msg_id] = msg
        if msg_id == 2 or "error" in msg:
            ws.close()

    def on_error(ws, error):
        failures.append(RuntimeError(f"WebSocket error: {error}"))

    def on_open(ws):
        ws.send(json.dumps({"id": 1, "method": "Accessibility.enable"}))
        params: dict = {}
        if depth != -1:
            params["depth"] = depth
        ws.send(json.dumps({
            "id": 2,
            "method": "Accessibility.getFullAXTree",
            "params": params,
        }))

    ws_app = websocket.WebSocketApp(
        ws_url,
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
    )
    worker = threading.Thread(
        target=lambda: ws_app.run_forever(ping_timeout=10),
        daemon=True,
    )
    worker.start()
    worker.join(timeout=15)
    if worker.is_alive():
        # Timed out: stop run_forever so the socket and thread are not left behind.
        ws_app.close()
        worker.join(timeout=1)

    reply = replies.get(2)
    if reply is None:
        enable_reply = replies.get(1)
        if enable_reply is not None and "error" in enable_reply:
            # Accessibility.enable itself failed; report that instead of a timeout.
            raise RuntimeError(f"CDP error: {enable_reply['error']}")
        if failures:
            raise failures[0]
        raise TimeoutError(
            "No se recibió respuesta de Accessibility.getFullAXTree en 15s"
        )

    # A complete id-2 reply wins over transport errors reported afterwards
    # (for example while the socket is being closed).
    if "error" in reply:
        raise RuntimeError(f"CDP error: {reply['error']}")
    return reply.get("result", {}).get("nodes", [])
def _cdp_via_websockets(ws_url: str, depth: int) -> list[dict]:
    """Fetch the AX tree with the async ``websockets`` library, run in a worker thread.

    The coroutine runs under ``asyncio.run`` in its own thread, so the event
    loop is created and closed there and the caller may itself be inside a
    running loop.

    Args:
        ws_url: WebSocket debugger URL of the tab.
        depth: Tree depth; -1 omits the ``depth`` parameter from the request.

    Returns:
        The ``nodes`` list from the getFullAXTree reply (``[]`` if absent).

    Raises:
        RuntimeError: On a CDP error reply.
        TimeoutError: If the reply does not arrive within 10 s of connecting,
            or the worker thread produces nothing within 15 s.
    """
    import asyncio

    async def _exchange(ws):
        await ws.send(json.dumps({"id": 1, "method": "Accessibility.enable"}))
        params: dict = {}
        if depth != -1:
            params["depth"] = depth
        await ws.send(json.dumps({
            "id": 2,
            "method": "Accessibility.getFullAXTree",
            "params": params,
        }))
        # Read until the id-2 reply; the id-1 ack and any events are skipped.
        while True:
            msg = json.loads(await ws.recv())
            if not isinstance(msg, dict):
                continue
            if msg.get("id") in (1, 2) and "error" in msg:
                raise RuntimeError(f"CDP error: {msg['error']}")
            if msg.get("id") == 2:
                return msg.get("result", {}).get("nodes", [])

    async def _run():
        import websockets
        async with websockets.connect(ws_url, open_timeout=10) as ws:
            try:
                # wait_for instead of asyncio.timeout so Python < 3.11 works too.
                return await asyncio.wait_for(_exchange(ws), timeout=10)
            except asyncio.TimeoutError as e:
                # Before 3.11 asyncio.TimeoutError is not the builtin TimeoutError.
                raise TimeoutError(
                    "No se recibió respuesta de Accessibility.getFullAXTree en 10s"
                ) from e

    result_holder: list = []
    error_holder: list = []

    def _thread_run():
        try:
            result_holder.append(asyncio.run(_run()))
        except Exception as e:
            error_holder.append(e)

    worker = threading.Thread(target=_thread_run, daemon=True)
    worker.start()
    worker.join(timeout=15)
    if error_holder:
        raise error_holder[0]
    if not result_holder:
        raise TimeoutError("No se recibió respuesta en 15s")
    return result_holder[0]
@@ -0,0 +1,51 @@
---
name: cdp_open_url_and_wait
kind: function
lang: py
domain: pipelines
version: "1.0.0"
purity: impure
signature: "def cdp_open_url_and_wait(debug_port: int, url: str, timeout_s: int = 30) -> str"
description: "Crea tab nuevo en Chrome remoto via CDP, navega a URL y espera Page.loadEventFired. Devuelve tab_id."
tags: [navegator, cdp, chrome, browser]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: [json, threading, urllib.request, urllib.parse, websocket]
params:
- name: debug_port
desc: "Puerto de depuracion remota de Chrome (tipicamente 9222)."
- name: url
desc: "URL completa a la que navegar en el tab nuevo."
- name: timeout_s
desc: "Segundos maximos esperando Page.loadEventFired. Default 30."
output: "tab_id (str) del tab recien creado en Chrome."
tested: false
tests: []
test_file_path: ""
file_path: "python/functions/pipelines/cdp_open_url_and_wait.py"
---
## Ejemplo
```python
import sys
sys.path.insert(0, "python/functions")
from pipelines.cdp_open_url_and_wait import cdp_open_url_and_wait
tab_id = cdp_open_url_and_wait(9222, "https://example.com", timeout_s=15)
print(tab_id) # "B1C2D3E4-..."
```
## Cuando usarla
Cuando necesites abrir una URL nueva en Chrome remoto y asegurarte de que la pagina cargo antes de interactuar con ella via CDP. Paso previo a cualquier extraccion de AX tree o ejecucion de JS.
## Gotchas
- Chrome debe estar corriendo con `--remote-debugging-port=<debug_port>` y `--remote-allow-origins=*`.
- PUT a `/json/new?<url>` crea el tab; si Chrome no acepta PUT responde 404 (version antigua).
- `Page.loadEventFired` puede no dispararse en SPAs con routing sin recarga — usar `timeout_s` conservador o esperar selector via `cdp_extract_recipe`.
- `websocket-client` debe estar instalado en el venv.
@@ -0,0 +1,79 @@
"""Abre tab nuevo en Chrome remoto, navega a URL, espera Page.loadEventFired."""
import json
import threading
import urllib.request
import urllib.parse
import websocket
def cdp_open_url_and_wait(
    debug_port: int,
    url: str,
    timeout_s: int = 30,
) -> str:
    """Open ``url`` in a new tab of a remote Chrome and wait until it has loaded.

    The tab is created with ``PUT /json/new?<url>``, so navigation starts
    before the WebSocket is connected. To avoid missing a load event that
    fired before ``Page.enable`` was sent, the page is also asked once
    whether ``document.readyState`` is already ``'complete'``.

    Args:
        debug_port: Chrome remote-debugging port (e.g. 9222).
        url: URL to navigate to.
        timeout_s: Seconds to wait for the page to finish loading.

    Returns:
        The id of the newly created tab.

    Raises:
        RuntimeError: If the tab cannot be created, it has no
            ``webSocketDebuggerUrl``, the WebSocket reports an error, or the
            load is not observed within ``timeout_s``.
    """
    encoded = urllib.parse.quote(url, safe=":/?#[]@!$&'()*+,;=%")
    new_tab_url = f"http://127.0.0.1:{debug_port}/json/new?{encoded}"
    req = urllib.request.Request(new_tab_url, method="PUT")
    # Broad catch on purpose: every failure here maps to the documented RuntimeError.
    try:
        with urllib.request.urlopen(req, timeout=10) as resp:
            tab_info = json.loads(resp.read().decode())
    except Exception as e:
        raise RuntimeError(f"cdp_open_url_and_wait: no se pudo crear tab en port {debug_port}: {e}") from e

    tab_id = tab_info.get("id", "")
    ws_url = tab_info.get("webSocketDebuggerUrl", "")
    if not ws_url:
        raise RuntimeError(f"cdp_open_url_and_wait: tab sin webSocketDebuggerUrl: {tab_info}")

    load_event = threading.Event()
    errors = []
    ready_probe_id = 2

    def on_message(ws_app, message):
        try:
            msg = json.loads(message)
        except (TypeError, ValueError):
            return  # non-JSON frame, nothing to inspect
        if not isinstance(msg, dict):
            return
        if msg.get("method") == "Page.loadEventFired":
            load_event.set()
        elif msg.get("id") == ready_probe_id:
            # The page had already finished loading before events were enabled.
            if msg.get("result", {}).get("result", {}).get("value") is True:
                load_event.set()

    def on_error(ws_app, error):
        errors.append(str(error))
        load_event.set()

    def on_open(ws_app):
        ws_app.send(json.dumps({"id": 1, "method": "Page.enable", "params": {}}))
        # Ignore the initial about:blank document so it is not taken for the target page.
        ws_app.send(json.dumps({
            "id": ready_probe_id,
            "method": "Runtime.evaluate",
            "params": {
                "expression": "document.readyState === 'complete' && location.href !== 'about:blank'",
                "returnByValue": True,
            },
        }))

    ws = websocket.WebSocketApp(
        ws_url,
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
    )
    worker = threading.Thread(target=ws.run_forever, daemon=True)
    worker.start()
    try:
        fired = load_event.wait(timeout=timeout_s)
    finally:
        # Always release the socket, whatever interrupted the wait.
        ws.close()
        worker.join(timeout=1)

    if errors:
        raise RuntimeError(f"cdp_open_url_and_wait: WS error: {errors[0]}")
    if not fired:
        raise RuntimeError(f"cdp_open_url_and_wait: timeout ({timeout_s}s) esperando Page.loadEventFired para {url}")
    return tab_id
@@ -0,0 +1,62 @@
---
name: data_factory_record_run
kind: function
lang: py
domain: pipelines
version: "1.0.0"
purity: impure
signature: "def data_factory_record_run(node_id, function_id, args=None, db_path=None, trigger='manual') -> dict"
description: "Wrappea `fn run <function_id>` capturando rows/kb/duration y persiste el resultado en data_factory.db.runs. Requiere que el node_id exista previamente en nodes."
tags: [data-pipeline, factory, record-run, pipelines, subprocess, registry]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: []
params:
- name: node_id
desc: "ID del nodo en data_factory.db.nodes que es propietario de esta ejecucion. FK enforced — debe existir antes de llamar."
- name: function_id
desc: "ID de funcion del registry a ejecutar (se pasa a `fn run`). Ejemplo: 'bq_query_py_infra'."
- name: args
desc: "Lista de args CLI adicionales que se reenvian a `fn run` despues del function_id. Default None = sin args extra."
- name: db_path
desc: "Ruta absoluta a data_factory.db. Default: ${FN_REGISTRY_ROOT}/apps/data_factory/data_factory.db."
- name: trigger
desc: "Origen de la ejecucion: 'manual'|'cron'|'dag'|'api'. Default 'manual'."
output: "dict con claves: run_id (str), status ('success'|'failed'), rows_out (int), kb_out (int), duration_ms (int), stdout (str), stderr (str)."
tested: false
tests: []
test_file_path: ""
file_path: "python/functions/pipelines/data_factory_record_run.py"
---
## Ejemplo
```python
import sys, os
sys.path.insert(0, os.environ["FN_REGISTRY_ROOT"] + "/python/functions/pipelines")
from data_factory_record_run import data_factory_record_run
result = data_factory_record_run(
node_id="bq_users_extractor",
function_id="bq_query_py_infra",
args=["--project", "my-gcp", "--sql", "SELECT * FROM users LIMIT 1000"],
)
print(f"run {result['run_id']}: {result['rows_out']} rows in {result['duration_ms']}ms")
# run a3f1c8e2d7b04e91: 1000 rows in 4230ms
```
## Cuando usarla
Cuando un nodo del data_factory deba ejecutar una funcion del registry y dejar trazabilidad completa (duration, rows, error) en `data_factory.db`. Usa este wrapper en lugar de llamar `fn run` directamente desde el DAG engine o desde scripts de ingesta.
## Gotchas
- `FN_REGISTRY_ROOT` debe estar en el entorno — sin ella la funcion lanza `RuntimeError` inmediato.
- El `node_id` debe existir en `nodes` antes del INSERT (FK con `ON DELETE CASCADE`). Si no existe, la funcion lanza `RuntimeError` con un mensaje claro en vez de fallar en silencio.
- `rows_out` se parsea buscando patron `^(rows|extracted|written|count)[:= ]+(\d+)` en stdout. Si la funcion destino no imprime nada con ese patron, `rows_out=0` — esto es correcto, no un bug.
- El binario `fn` se busca en `${FN_REGISTRY_ROOT}/fn`. Si no esta compilado, compilar con `CGO_ENABLED=1 go build -tags fts5 -o fn ./cmd/fn/` desde la raiz del registry.
- `db_path` apunta a la BD de la app data_factory, NO a `registry.db`.
- Solo stdlib Python — sin pandas, polars ni dependencias externas.
@@ -0,0 +1,152 @@
"""data_factory_record_run — wraps `fn run <function_id>` and persists metrics in data_factory.db."""
import os
import re
import sqlite3
import subprocess
import uuid
from datetime import datetime, timezone
from pathlib import Path
def _now_iso8601() -> str:
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SS.mmmZ`` (millisecond precision)."""
    utc_now = datetime.now(timezone.utc)
    stamp_with_micros = utc_now.strftime("%Y-%m-%dT%H:%M:%S.%f")
    # %f yields six digits; drop the last three to keep milliseconds only.
    stamp_with_millis = stamp_with_micros[:-3]
    return stamp_with_millis + "Z"
def _elapsed_ms(start: float) -> int:
    """Return the whole milliseconds elapsed since ``start``, a ``time.monotonic()`` reading."""
    import time

    elapsed_s = time.monotonic() - start
    return int(elapsed_s * 1000)
# Compiled once: keyword at the start of a line, then ':', '=', spaces or tabs,
# then the number. The separator deliberately excludes newlines so that the
# keyword and the number must sit on the same line.
_ROWS_OUT_RE = re.compile(
    r'^(?:rows|extracted|written|count)[:= \t]+(\d+)',
    re.IGNORECASE | re.MULTILINE,
)


def _parse_rows_out(stdout: str) -> int:
    """Return N from the first line shaped like ``rows|extracted|written|count[:= ]+N``.

    Matching is case-insensitive and anchored at the start of a line.

    Args:
        stdout: Captured standard output to scan.

    Returns:
        The parsed row count, or 0 when no line matches.
    """
    found = _ROWS_OUT_RE.search(stdout)
    return int(found.group(1)) if found else 0
def _kb_out(stdout: str) -> int:
    """Return the UTF-8 size of ``stdout`` in KiB, rounded with the built-in ``round``."""
    byte_count = len(stdout.encode("utf-8"))
    return round(byte_count / 1024)
def data_factory_record_run(
node_id: str,
function_id: str,
args: list | None = None,
db_path: str | None = None,
trigger: str = "manual",
) -> dict:
"""Wrap `fn run <function_id>` and record execution metrics in data_factory.db.
Args:
node_id: ID of the node in data_factory.db.nodes that owns this run.
function_id: Registry function ID to execute (passed to `fn run`).
args: Extra CLI args forwarded to `fn run` after function_id.
db_path: Absolute path to data_factory.db. Defaults to
${FN_REGISTRY_ROOT}/apps/data_factory/data_factory.db.
trigger: Origin of the run — 'manual'|'cron'|'dag'|'api'.
Returns:
dict with keys: run_id, status, rows_out, kb_out, duration_ms, stdout, stderr.
"""
import time
# --- resolve FN_REGISTRY_ROOT ---
registry_root = os.environ.get("FN_REGISTRY_ROOT", "").strip()
if not registry_root:
raise RuntimeError(
"FN_REGISTRY_ROOT env var is not set. "
"Export it before calling data_factory_record_run."
)
registry_root = Path(registry_root)
# --- resolve db_path ---
if db_path is None:
db_path = registry_root / "apps" / "data_factory" / "data_factory.db"
db_path = Path(db_path)
if not db_path.exists():
raise FileNotFoundError(f"data_factory.db not found at {db_path}")
# --- resolve fn binary ---
fn_bin = registry_root / "fn"
if not fn_bin.exists():
raise FileNotFoundError(
f"fn binary not found at {fn_bin}. "
"Run `CGO_ENABLED=1 go build -tags fts5 -o fn ./cmd/fn/` in FN_REGISTRY_ROOT."
)
# --- generate run_id ---
run_id = uuid.uuid4().hex[:16]
# --- INSERT running record ---
started_at = _now_iso8601()
try:
conn = sqlite3.connect(str(db_path))
conn.execute("PRAGMA foreign_keys = ON")
try:
conn.execute(
"INSERT INTO runs(id, node_id, started_at, status, trigger) VALUES (?,?,?,?,?)",
(run_id, node_id, started_at, "running", trigger),
)
conn.commit()
except sqlite3.IntegrityError as e:
conn.close()
raise RuntimeError(
f"FK violation — node_id '{node_id}' does not exist in nodes table. "
f"Insert the node first. SQLite error: {e}"
)
except sqlite3.Error as e:
raise RuntimeError(f"Failed to open/write data_factory.db at {db_path}: {e}")
# --- run fn ---
cmd = [str(fn_bin), "run", function_id] + (args or [])
t0 = time.monotonic()
try:
result = subprocess.run(cmd, capture_output=True, text=True, cwd=str(registry_root))
except Exception as e:
duration_ms = _elapsed_ms(t0)
finished_at = _now_iso8601()
conn.execute(
"UPDATE runs SET finished_at=?, status=?, duration_ms=?, error=? WHERE id=?",
(finished_at, "failed", duration_ms, str(e)[:2000], run_id),
)
conn.commit()
conn.close()
return {
"run_id": run_id,
"status": "failed",
"rows_out": 0,
"kb_out": 0,
"duration_ms": duration_ms,
"stdout": "",
"stderr": str(e),
}
duration_ms = _elapsed_ms(t0)
finished_at = _now_iso8601()
stdout = result.stdout or ""
stderr = result.stderr or ""
status = "success" if result.returncode == 0 else "failed"
rows_out = _parse_rows_out(stdout)
kb = _kb_out(stdout)
error_text = stderr[:2000] if status == "failed" else ""
conn.execute(
"""UPDATE runs
SET finished_at=?, status=?, rows_out=?, kb_out=?,
duration_ms=?, error=?
WHERE id=?""",
(finished_at, status, rows_out, kb, duration_ms, error_text, run_id),
)
conn.commit()
conn.close()
return {
"run_id": run_id,
"status": status,
"rows_out": rows_out,
"kb_out": kb,
"duration_ms": duration_ms,
"stdout": stdout,
"stderr": stderr,
}