Files
egutierrez a03675113a chore: auto-commit (286 archivos)
- .claude/agents/fn-orquestador/SKILL.md
- .claude/commands/fn_claude.md
- .claude/rules/INDEX.md
- .claude/rules/cpp_apps.md
- .claude/rules/ids_naming.md
- CHANGELOG.md
- apps/dag_engine/README.md
- apps/dag_engine/api.go
- apps/dag_engine/dags_migrated/example.yaml
- apps/dag_engine/dags_migrated/example_lineage_tracking.yaml
- ...

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 16:33:22 +02:00

212 lines
6.3 KiB
Python

"""Obtiene el AX tree completo de un tab Chrome via CDP WebSocket."""
import json
import threading
import urllib.request
import urllib.error
def cdp_get_ax_tree(
debug_port: int,
tab_id: str,
depth: int = -1,
) -> list[dict]:
"""Conecta al Chrome remoto via WebSocket (CDP) y devuelve el AX tree completo.
Pasos:
1. HTTP GET /json/list para obtener webSocketDebuggerUrl del tab.
2. WebSocket connect (usa websocket-client si disponible, sino implementa
minimal RFC6455 con socket stdlib).
3. Envía Accessibility.enable y espera ack.
4. Envía Accessibility.getFullAXTree con depth=-1.
5. Lee response y devuelve la lista de AXNode.
Args:
debug_port: Puerto de debug remoto de Chrome (ej. 9222).
tab_id: ID del tab obtenido via /json/list (campo "id").
depth: Profundidad del árbol. -1 = completo.
Returns:
Lista de AXNode en formato CDP.
Raises:
RuntimeError: Si no se encuentra el tab, falla la conexión WS,
o la respuesta CDP contiene error.
TimeoutError: Si el servidor no responde en 10 segundos.
"""
# 1. Obtener webSocketDebuggerUrl del tab
ws_url = _get_ws_url(debug_port, tab_id)
# 2. Conectar y obtener nodos
return _cdp_get_ax_nodes(ws_url, depth)
def _get_ws_url(debug_port: int, tab_id: str) -> str:
"""Obtiene el webSocketDebuggerUrl del tab via HTTP /json/list."""
url = f"http://127.0.0.1:{debug_port}/json/list"
try:
with urllib.request.urlopen(url, timeout=10) as resp:
tabs = json.loads(resp.read().decode())
except urllib.error.URLError as e:
raise RuntimeError(
f"No se pudo conectar a Chrome en puerto {debug_port}: {e}"
) from e
for tab in tabs:
if tab.get("id") == tab_id:
ws_url = tab.get("webSocketDebuggerUrl")
if not ws_url:
raise RuntimeError(
f"Tab {tab_id} no tiene webSocketDebuggerUrl "
"(puede estar adjunto a otro debugger)"
)
return ws_url
raise RuntimeError(
f"Tab {tab_id} no encontrado. Tabs disponibles: "
f"{[t.get('id') for t in tabs]}"
)
def _cdp_get_ax_nodes(ws_url: str, depth: int) -> list[dict]:
"""Conecta via WebSocket y ejecuta la secuencia CDP para obtener AX tree."""
try:
import websocket # websocket-client
return _cdp_via_websocket_client(ws_url, depth)
except ImportError:
pass
# Fallback: websockets (async) via threading
try:
import websockets # noqa: F401
return _cdp_via_websockets(ws_url, depth)
except ImportError:
pass
raise RuntimeError(
"Ninguna librería WebSocket disponible. "
"Instala websocket-client: pip install websocket-client"
)
def _cdp_via_websocket_client(ws_url: str, depth: int) -> list[dict]:
"""Implementación usando websocket-client (síncrono)."""
import websocket
results: dict = {}
error_container: list = []
def on_message(ws, message):
try:
msg = json.loads(message)
msg_id = msg.get("id")
if msg_id in (1, 2):
results[msg_id] = msg
if msg_id == 2 or "error" in msg:
ws.close()
except Exception as e:
error_container.append(e)
ws.close()
def on_error(ws, error):
error_container.append(RuntimeError(f"WebSocket error: {error}"))
def on_open(ws):
# Paso 3: habilitar Accessibility
ws.send(json.dumps({"id": 1, "method": "Accessibility.enable"}))
# Paso 4: obtener AX tree completo
params: dict = {}
if depth != -1:
params["depth"] = depth
ws.send(json.dumps({
"id": 2,
"method": "Accessibility.getFullAXTree",
"params": params,
}))
ws_app = websocket.WebSocketApp(
ws_url,
on_open=on_open,
on_message=on_message,
on_error=on_error,
)
t = threading.Thread(
target=lambda: ws_app.run_forever(ping_timeout=10),
daemon=True,
)
t.start()
t.join(timeout=15)
if error_container:
raise error_container[0]
if 2 not in results:
raise TimeoutError(
"No se recibió respuesta de Accessibility.getFullAXTree en 15s"
)
resp = results[2]
if "error" in resp:
raise RuntimeError(f"CDP error: {resp['error']}")
result_data = resp.get("result", {})
nodes = result_data.get("nodes", [])
return nodes
def _cdp_via_websockets(ws_url: str, depth: int) -> list[dict]:
"""Fallback usando websockets (async), ejecutado en thread con asyncio."""
import asyncio
async def _run():
import websockets
async with websockets.connect(ws_url, open_timeout=10) as ws:
# Habilitar Accessibility
await ws.send(json.dumps({"id": 1, "method": "Accessibility.enable"}))
await ws.recv() # ack
# Obtener AX tree
params: dict = {}
if depth != -1:
params["depth"] = depth
await ws.send(json.dumps({
"id": 2,
"method": "Accessibility.getFullAXTree",
"params": params,
}))
# Leer hasta recibir respuesta con id=2
import asyncio as _asyncio
async with _asyncio.timeout(10):
while True:
raw = await ws.recv()
msg = json.loads(raw)
if msg.get("id") == 2:
if "error" in msg:
raise RuntimeError(f"CDP error: {msg['error']}")
return msg.get("result", {}).get("nodes", [])
result_holder: list = []
error_holder: list = []
def _thread_run():
try:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
nodes = loop.run_until_complete(_run())
result_holder.append(nodes)
except Exception as e:
error_holder.append(e)
t = threading.Thread(target=_thread_run, daemon=True)
t.start()
t.join(timeout=15)
if error_holder:
raise error_holder[0]
if not result_holder:
raise TimeoutError("No se recibió respuesta en 15s")
return result_holder[0]