feat(browser): auto-commit con 60 cambios

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-07 11:42:31 +02:00
parent 37aacfcfa9
commit 8742cb25be
71 changed files with 5660 additions and 192 deletions
+92
View File
@@ -0,0 +1,92 @@
---
name: cdp_click_xy
kind: function
lang: py
domain: browser
version: "1.0.0"
purity: impure
signature: "def cdp_click_xy(x: int, y: int, *, port: int = 9222, target_url_substr: str = '', move_first: bool = True, timeout_s: float = 10.0) -> dict"
description: "Hace un click izquierdo de raton REAL en coordenadas (x, y) del viewport de una pestana de un Chrome con remote debugging, via CDP Input.dispatchMouseEvent (mouseMoved opcional + mousePressed + mouseReleased). Primitiva de input CDP reutilizable: necesaria cuando element.click() de JavaScript NO dispara los handlers de React de SPAs (WhatsApp Web): abrir un chat de la lista o un resultado de busqueda requiere un click de raton sintetico real. El caller resuelve las coordenadas con cdp_eval (getBoundingClientRect -> centro) y las pasa aqui."
tags: [cdp, browser, automation, python, navegator]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: ["json", "time", "urllib.request", "websocket"]
params_schema:
params:
- name: x
desc: "Coordenada X en CSS px del viewport donde hacer click. Normalmente el centro del elemento (getBoundingClientRect via cdp_eval)."
- name: y
desc: "Coordenada Y en CSS px del viewport donde hacer click. Normalmente el centro del elemento (getBoundingClientRect via cdp_eval)."
- name: port
desc: "Puerto de remote debugging de Chrome. Default 9222."
- name: target_url_substr
desc: "Substring que debe contener la URL del target (pestana). Si vacio, usa el primer target de tipo 'page'."
- name: move_first
desc: "Si True, emite un mouseMoved a (x, y) antes del click para que la SPA registre el hover. Default True."
- name: timeout_s
desc: "Timeout en segundos para la conexion WebSocket. Default 10.0."
output: "dict {ok: bool, error: str, x: int, y: int}. ok=True si los eventos de raton (mouseMoved opcional, mousePressed, mouseReleased) se emitieron sin error; x/y son eco de los argumentos. Nunca lanza: errores de red/conexion/transport se devuelven en 'error' con ok=False."
tested: true
tests: ["test_golden_click_emite_movido_pressed_released_left", "test_edge_move_first_false_omite_mousemoved", "test_error_create_connection_lanza_ok_false"]
test_file_path: "python/functions/browser/cdp_click_xy_test.py"
file_path: "python/functions/browser/cdp_click_xy.py"
---
## Ejemplo
```python
import sys, os, json
sys.path.insert(0, os.path.join("python", "functions"))
from browser.cdp_eval import cdp_eval
from browser.cdp_click_xy import cdp_click_xy
# Requiere un Chrome lanzado con --remote-debugging-port=9222
# y una pestana de WhatsApp Web abierta.
# Localizar el row de un chat por nombre exacto y abrirlo con click REAL.
r = cdp_eval(r'''(() => {
const row = [...document.querySelectorAll('#side [role="row"]')]
.find(x => /^NOTAS WASAP\b/.test(x.innerText.replace(/\s+/g,' ').trim()));
if(!row) return null;
const b = row.getBoundingClientRect();
return JSON.stringify({x: Math.round(b.x+b.width/2), y: Math.round(b.y+b.height/2)});
})()''', target_url_substr="whatsapp")
c = json.loads(r["value"])
res = cdp_click_xy(c["x"], c["y"], target_url_substr="whatsapp") # abre el chat
print(res["ok"], res["error"])
```
O directo por CLI: `python3 python/functions/browser/cdp_click_xy.py 100 200 "whatsapp"`.
## Cuando usarla
Cuando necesites **clickar un elemento** y `element.click()` de JavaScript NO dispara
sus handlers (SPAs React como WhatsApp Web): **abrir un chat de la lista**, abrir un
**resultado de busqueda**, pulsar un boton que React renderiza con listeners propios.
Resuelve primero las coordenadas del elemento con `cdp_eval_py_browser`
(`getBoundingClientRect` -> centro) y pasa `x, y` aqui. Es la primitiva de input de
raton sobre la que se construyen funciones `whatsapp_*_py_browser` y cualquier script
que opere una pestana existente via CDP. Para teclas usa `cdp_press_key_py_browser`;
para escribir texto, `cdp_type_chars_py_browser`.
## Gotchas
- **Coordenadas en CSS px del viewport**: `getBoundingClientRect()` ya las devuelve en
ese sistema, por eso encaja directo. No uses `pageX/pageY` ni coords absolutas de
documento.
- **El elemento debe estar VISIBLE en el viewport**: si esta fuera de pantalla por
scroll, las coords del rect son invalidas (negativas o fuera de rango) y el click cae
en el lugar equivocado. Haz scroll al elemento primero (via `cdp_eval` con
`scrollIntoView`) y vuelve a leer el rect.
- Es un **click izquierdo simple** (clickCount=1, button=left). No hace doble click,
click derecho ni drag.
- `move_first=True` (default) emite un `mouseMoved` previo para que la SPA registre el
hover; algunas UIs solo muestran/activan controles tras hover. Ponlo en False si no
lo necesitas.
- Requiere un Chrome lanzado con `--remote-debugging-port=9222` (o el puerto que pases).
Sin remote debugging, `GET /json` falla y devuelve `ok=False`.
- Nunca lanza: errores de red, conexion WS o transport se reportan en el campo `error`
con `ok=False`.
+166
View File
@@ -0,0 +1,166 @@
"""Hace un click de raton real en coordenadas (x, y) de una pestana de Chrome via CDP.
Primitiva de input CDP: localiza un target (pestana) por substring de su URL, abre el
WebSocket de depuracion y emite los eventos `Input.dispatchMouseEvent` que componen un
click izquierdo sintetico real (mousePressed + mouseReleased), opcionalmente precedido
de un mouseMoved para que la SPA registre el hover.
Necesario porque `element.click()` de JavaScript NO dispara los handlers de React de
muchas SPAs (WhatsApp Web entre ellas): abrir un chat de la lista o un resultado de
busqueda requiere un click de raton sintetico real. El caller resuelve las coordenadas
del elemento con `cdp_eval_py_browser` (getBoundingClientRect -> centro) y las pasa aqui.
"""
import json
import time
import urllib.request
import websocket
def cdp_click_xy(
x: int,
y: int,
*,
port: int = 9222,
target_url_substr: str = "",
move_first: bool = True,
timeout_s: float = 10.0,
) -> dict:
"""Hace un click izquierdo real en (x, y) del viewport de una pestana de Chrome.
Localiza el target `page` por substring de URL, abre el WebSocket CDP y emite un
click izquierdo simple via `Input.dispatchMouseEvent`: si `move_first`, primero un
`mouseMoved` a (x, y) (para que la SPA registre hover), luego `mousePressed` y
`mouseReleased` con `button=left`, `buttons=1`, `clickCount=1`. Las coordenadas son
CSS px del viewport; el caller las resuelve normalmente con `cdp_eval_py_browser`
(getBoundingClientRect -> centro).
Args:
x: Coordenada X en CSS px del viewport donde hacer click.
y: Coordenada Y en CSS px del viewport donde hacer click.
port: Puerto de remote debugging de Chrome. Default 9222.
target_url_substr: Substring que debe contener la URL del target (pestana).
Si "", usa el primer target de tipo "page".
move_first: Si True, emite un mouseMoved a (x, y) antes del click para que la
SPA registre el hover. Default True.
timeout_s: Timeout (segundos) para la conexion WebSocket. Default 10.0.
Returns:
dict con claves:
ok: bool — True si los eventos de raton se emitieron sin error.
error: str — mensaje de error (vacio si ok).
x: int — coordenada X usada (eco del argumento).
y: int — coordenada Y usada (eco del argumento).
Nunca lanza: errores de red/conexion/transport se devuelven en "error" con
ok=False.
"""
# 1. Listar targets via HTTP.
try:
with urllib.request.urlopen(
f"http://127.0.0.1:{port}/json", timeout=5
) as resp:
targets = json.loads(resp.read().decode())
except Exception as e: # noqa: BLE001 — red/HTTP/JSON, no relanzar
return {"ok": False, "error": str(e), "x": x, "y": y}
# 2. Elegir el primer target type=="page" cuya url contenga el substring.
chosen = None
for t in targets:
if t.get("type") != "page":
continue
url = t.get("url", "")
if target_url_substr == "" or target_url_substr in url:
chosen = t
break
if chosen is None:
return {
"ok": False,
"error": f"no target matching {target_url_substr}",
"x": x,
"y": y,
}
ws_url = chosen.get("webSocketDebuggerUrl", "")
# 3. Abrir WS.
try:
ws = websocket.create_connection(ws_url, timeout=timeout_s)
except Exception as e: # noqa: BLE001 — conexion WS
return {"ok": False, "error": str(e), "x": x, "y": y}
try:
msg_id = 1
# 3b. Hover previo opcional: ayuda a que la SPA registre el mouseover.
if move_first:
ws.send(json.dumps({
"id": msg_id,
"method": "Input.dispatchMouseEvent",
"params": {"type": "mouseMoved", "x": x, "y": y},
}))
msg_id += 1
time.sleep(0.03)
# 4. Click izquierdo real: mousePressed + mouseReleased.
press_id = msg_id
ws.send(json.dumps({
"id": press_id,
"method": "Input.dispatchMouseEvent",
"params": {
"type": "mousePressed",
"x": x,
"y": y,
"button": "left",
"buttons": 1,
"clickCount": 1,
},
}))
time.sleep(0.04)
release_id = msg_id + 1
ws.send(json.dumps({
"id": release_id,
"method": "Input.dispatchMouseEvent",
"params": {
"type": "mouseReleased",
"x": x,
"y": y,
"button": "left",
"buttons": 1,
"clickCount": 1,
},
}))
# Drenar respuestas hasta ver el id del release (o agotar el stream).
# Ignoramos eventos del server y frames no-JSON.
while True:
raw = ws.recv()
if not raw:
break
try:
parsed = json.loads(raw)
except Exception: # noqa: BLE001 — frame no-JSON, ignorar
continue
if parsed.get("id") == release_id:
break
except Exception as e: # noqa: BLE001 — fallo de transport durante send/recv
return {"ok": False, "error": str(e), "x": x, "y": y}
finally:
try:
ws.close()
except Exception: # noqa: BLE001 — cierre best-effort
pass
return {"ok": True, "error": "", "x": x, "y": y}
if __name__ == "__main__":
import sys
arg_x = int(sys.argv[1]) if len(sys.argv) > 1 else 0
arg_y = int(sys.argv[2]) if len(sys.argv) > 2 else 0
substr = sys.argv[3] if len(sys.argv) > 3 else ""
out = cdp_click_xy(arg_x, arg_y, port=9222, target_url_substr=substr)
print(json.dumps(out, ensure_ascii=False, indent=2))
@@ -0,0 +1,143 @@
"""Tests para cdp_click_xy — mockean urlopen + create_connection.
Mockean la capa de red de CDP: urllib.request.urlopen (lista de targets) y
websocket.create_connection (un fake que captura los mensajes enviados y devuelve
las respuestas CDP con el id correspondiente).
"""
import json
import os
import sys
from contextlib import contextmanager
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from browser import cdp_click_xy as mod # noqa: E402
from browser.cdp_click_xy import cdp_click_xy # noqa: E402
class _FakeResp:
"""Context manager que imita la respuesta de urllib.request.urlopen."""
def __init__(self, payload: list):
self._payload = payload
def __enter__(self):
return self
def __exit__(self, *exc):
return False
def read(self):
return json.dumps(self._payload).encode()
class _FakeWS:
"""WebSocket falso: captura los mensajes enviados y responde por id."""
def __init__(self):
self.sent = []
self._inbox = []
self.closed = False
def send(self, raw: str):
msg = json.loads(raw)
self.sent.append(msg)
# Encola una respuesta CDP vacia con el mismo id (como Chrome devuelve).
self._inbox.append(json.dumps({"id": msg["id"], "result": {}}))
def recv(self):
if self._inbox:
return self._inbox.pop(0)
return ""
def close(self):
self.closed = True
@contextmanager
def _patch(targets, ws_obj=None, urlopen_exc=None, create_conn_exc=None):
"""Parchea urlopen y create_connection del modulo. Restaura al salir."""
orig_urlopen = mod.urllib.request.urlopen
orig_create = mod.websocket.create_connection
def fake_urlopen(url, timeout=5):
if urlopen_exc is not None:
raise urlopen_exc
return _FakeResp(targets)
def fake_create(ws_url, timeout=10):
if create_conn_exc is not None:
raise create_conn_exc
return ws_obj
mod.urllib.request.urlopen = fake_urlopen
mod.websocket.create_connection = fake_create
try:
yield
finally:
mod.urllib.request.urlopen = orig_urlopen
mod.websocket.create_connection = orig_create
_TARGETS = [
{"type": "page", "url": "https://web.whatsapp.com/", "webSocketDebuggerUrl": "ws://x/1"},
]
def test_golden_click_emite_movido_pressed_released_left():
"""Click en (100, 200) emite mouseMoved + mousePressed + mouseReleased correctos."""
ws = _FakeWS()
with _patch(_TARGETS, ws_obj=ws):
res = cdp_click_xy(100, 200, target_url_substr="whatsapp")
assert res == {"ok": True, "error": "", "x": 100, "y": 200}
assert len(ws.sent) == 3
moved, pressed, released = ws.sent
assert moved["method"] == "Input.dispatchMouseEvent"
assert moved["params"]["type"] == "mouseMoved"
assert moved["params"]["x"] == 100
assert moved["params"]["y"] == 200
assert pressed["params"]["type"] == "mousePressed"
assert pressed["params"]["x"] == 100
assert pressed["params"]["y"] == 200
assert pressed["params"]["button"] == "left"
assert pressed["params"]["buttons"] == 1
assert pressed["params"]["clickCount"] == 1
assert released["params"]["type"] == "mouseReleased"
assert released["params"]["x"] == 100
assert released["params"]["y"] == 200
assert released["params"]["button"] == "left"
assert released["params"]["buttons"] == 1
assert released["params"]["clickCount"] == 1
assert ws.closed is True
def test_edge_move_first_false_omite_mousemoved():
"""Con move_first=False no se emite el mouseMoved previo, solo press + release."""
ws = _FakeWS()
with _patch(_TARGETS, ws_obj=ws):
res = cdp_click_xy(50, 60, target_url_substr="whatsapp", move_first=False)
assert res["ok"] is True
assert len(ws.sent) == 2
types = [m["params"]["type"] for m in ws.sent]
assert types == ["mousePressed", "mouseReleased"]
assert all(m["params"]["type"] != "mouseMoved" for m in ws.sent)
def test_error_create_connection_lanza_ok_false():
"""Si create_connection lanza, se captura y devuelve ok=False sin relanzar."""
with _patch(_TARGETS, create_conn_exc=ConnectionRefusedError("ws down")):
res = cdp_click_xy(10, 20, target_url_substr="whatsapp")
assert res["ok"] is False
assert "ws down" in res["error"]
assert res["x"] == 10
assert res["y"] == 20
+65
View File
@@ -0,0 +1,65 @@
---
name: cdp_eval
kind: function
lang: py
domain: browser
version: "1.0.0"
purity: impure
signature: "def cdp_eval(expression: str, *, port: int = 9222, target_url_substr: str = '', await_promise: bool = False, timeout_s: float = 10.0) -> dict"
description: "Evalua una expresion JavaScript en una pestana de un Chrome con remote debugging, eligiendo el target por substring de su URL. Primitiva de transport CDP reutilizable para operar el navegador diario por codigo sin abrir ventana nueva."
tags: [cdp, browser, automation, python, navegator]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: ["json", "urllib.request", "websocket"]
params_schema:
params:
- name: expression
desc: "Expresion JavaScript a evaluar en el contexto de la pagina (ej. 'document.title', 'document.querySelector(\".x\").click()')."
- name: port
desc: "Puerto de remote debugging de Chrome. Default 9222."
- name: target_url_substr
desc: "Substring que debe contener la URL del target (pestana). Si vacio, usa el primer target de tipo 'page'."
- name: await_promise
desc: "Si True, espera a que la expresion resuelva una Promise antes de devolver el valor (awaitPromise de CDP). Default False."
- name: timeout_s
desc: "Timeout en segundos para la conexion WebSocket. Default 10.0."
output: "dict {ok: bool, value: <valor Python serializable o None>, error: str, target_url: str}. ok=True si la evaluacion produjo valor sin excepcion. Nunca lanza: errores de red/conexion/excepcion JS se devuelven en 'error'."
tested: true
tests: ["test_golden_selecciona_target_por_substr_y_devuelve_value", "test_edge_substr_sin_match_devuelve_ok_false", "test_error_urlopen_lanza_devuelve_ok_false"]
test_file_path: "python/functions/browser/cdp_eval_test.py"
file_path: "python/functions/browser/cdp_eval.py"
---
## Ejemplo
```python
import sys, os
sys.path.insert(0, os.path.join("python", "functions"))
from browser.cdp_eval import cdp_eval
# Requiere un Chrome lanzado con --remote-debugging-port=9222
# y una pestana de WhatsApp Web abierta.
res = cdp_eval("document.title", port=9222, target_url_substr="whatsapp")
print(res["value"]) # -> "WhatsApp" (o None si no hay target)
print(res["ok"], res["target_url"])
```
O directo por CLI: `python3 python/functions/browser/cdp_eval.py "document.title" "whatsapp"`.
## Cuando usarla
Cuando quieras leer datos o ejecutar JS (focus, querySelector, `element.click()`, scroll)
sobre una pestana **ya abierta** del navegador diario sin abrir ventana nueva ni darle
foco al sistema. Es la primitiva de transport sobre la que se construyen las funciones
`whatsapp_*_py_browser` y cualquier script que opere una pestana existente via CDP.
## Gotchas
- Requiere un Chrome lanzado con `--remote-debugging-port=9222` (o el puerto que pases). Sin remote debugging, `GET /json` falla y devuelve `ok=False`.
- `target_url_substr` hace match de **substring** sobre la URL del target (no regex). El primer `page` que contenga el substring gana.
- `returnByValue` solo serializa valores JSON (str, num, bool, list, dict, None). Los nodos del DOM NO son serializables: devuelve la expresion ya reducida a un valor (ej. `el.textContent`, no `el`).
- Para **escribir** en inputs o `contenteditable` NO uses `document.execCommand` ni `el.value = ...`: editores como React/Lexical (WhatsApp Web) ignoran esos cambios programaticos. Usa `cdp_type_chars_py_browser` (teclea caracter a caracter via CDP `Input.dispatchKeyEvent`).
- Nunca lanza: errores de red, conexion WS o excepciones de la propia evaluacion JS se reportan en el campo `error` con `ok=False`.
+139
View File
@@ -0,0 +1,139 @@
"""Evalua una expresion JavaScript en una pestana de Chrome via Chrome DevTools Protocol.
Primitiva de transport CDP: localiza un target (pestana) por substring de su URL,
abre el WebSocket de depuracion, ejecuta `Runtime.evaluate` y devuelve el valor
serializado. Base reutilizable para automatizar el navegador diario sin abrir
ventana nueva ni darle foco.
"""
import json
import urllib.request
import websocket
def cdp_eval(
expression: str,
*,
port: int = 9222,
target_url_substr: str = "",
await_promise: bool = False,
timeout_s: float = 10.0,
) -> dict:
"""Evalua una expresion JS en una pestana de Chrome elegida por substring de URL.
Args:
expression: Expresion JavaScript a evaluar en el contexto de la pagina.
port: Puerto de remote debugging de Chrome. Default 9222.
target_url_substr: Substring que debe contener la URL del target. Si "",
usa el primer target de tipo "page".
await_promise: Si True, espera a que se resuelva una Promise antes de
devolver el valor (`awaitPromise` de CDP).
timeout_s: Timeout (segundos) para la conexion WebSocket.
Returns:
dict con claves:
ok: bool — True si la evaluacion produjo un valor sin excepcion.
value: valor Python serializable devuelto por la expresion, o None.
error: str — mensaje de error (vacio si ok).
target_url: str — URL del target usado (vacio si ninguno).
"""
# 1. Listar targets via HTTP.
try:
with urllib.request.urlopen(
f"http://127.0.0.1:{port}/json", timeout=5
) as resp:
targets = json.loads(resp.read().decode())
except Exception as e: # noqa: BLE001 — red/HTTP/JSON, no relanzar
return {"ok": False, "value": None, "error": str(e), "target_url": ""}
# 2. Elegir el primer target type=="page" cuya url contenga el substring.
chosen = None
for t in targets:
if t.get("type") != "page":
continue
url = t.get("url", "")
if target_url_substr == "" or target_url_substr in url:
chosen = t
break
if chosen is None:
return {
"ok": False,
"value": None,
"error": f"no target matching {target_url_substr}",
"target_url": "",
}
target_url = chosen.get("url", "")
ws_url = chosen.get("webSocketDebuggerUrl", "")
# 3-4. Abrir WS, enviar Runtime.evaluate, drenar eventos hasta id==1.
try:
ws = websocket.create_connection(ws_url, timeout=timeout_s)
except Exception as e: # noqa: BLE001 — conexion WS
return {"ok": False, "value": None, "error": str(e), "target_url": ""}
try:
ws.send(json.dumps({
"id": 1,
"method": "Runtime.evaluate",
"params": {
"expression": expression,
"returnByValue": True,
"awaitPromise": await_promise,
},
}))
msg = None
# Drenar eventos intermedios hasta encontrar la respuesta con id==1.
while True:
raw = ws.recv()
if not raw:
break
try:
parsed = json.loads(raw)
except Exception: # noqa: BLE001 — frame no-JSON, ignorar
continue
if parsed.get("id") == 1:
msg = parsed
break
except Exception as e: # noqa: BLE001 — fallo de transport durante recv/send
return {"ok": False, "value": None, "error": str(e), "target_url": target_url}
finally:
try:
ws.close()
except Exception: # noqa: BLE001 — cierre best-effort
pass
if msg is None:
return {
"ok": False,
"value": None,
"error": "no response for evaluate (id=1)",
"target_url": target_url,
}
result = msg.get("result", {})
# 5. Si hubo excepcion en la evaluacion, devolverla como error.
exc = result.get("exceptionDetails")
if exc:
text = exc.get("text", "evaluation exception")
exception = exc.get("exception", {})
detail = exception.get("description") or exception.get("value")
error = f"{text}: {detail}" if detail else text
return {"ok": False, "value": None, "error": error, "target_url": target_url}
# 6. Extraer el valor serializado por returnByValue.
value = result.get("result", {}).get("value")
return {"ok": True, "value": value, "error": "", "target_url": target_url}
if __name__ == "__main__":
import sys
expr = sys.argv[1] if len(sys.argv) > 1 else "document.title"
substr = sys.argv[2] if len(sys.argv) > 2 else ""
out = cdp_eval(expr, port=9222, target_url_substr=substr)
print(json.dumps(out, ensure_ascii=False, indent=2))
+146
View File
@@ -0,0 +1,146 @@
"""Tests para cdp_eval.
Como cdp_eval requiere un Chrome vivo con remote debugging, se mockean las dos
fronteras de I/O:
- urllib.request.urlopen -> devuelve un /json con 2 targets (uno whatsapp).
- websocket.create_connection -> un fake que responde al id==1 con un value.
"""
import io
import json
import os
import sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
import urllib.request
import websocket
from browser.cdp_eval import cdp_eval
# --- Fakes -----------------------------------------------------------------
def _targets_json():
"""Dos targets de tipo page: uno de Google, otro de WhatsApp Web."""
return [
{
"type": "page",
"url": "https://www.google.com/",
"webSocketDebuggerUrl": "ws://127.0.0.1:9222/devtools/page/GOOGLE",
},
{
"type": "page",
"url": "https://web.whatsapp.com/",
"webSocketDebuggerUrl": "ws://127.0.0.1:9222/devtools/page/WA",
},
]
class _FakeHTTPResponse:
"""Context manager que imita la respuesta de urlopen con .read()."""
def __init__(self, payload):
self._buf = io.BytesIO(json.dumps(payload).encode())
def read(self):
return self._buf.read()
def __enter__(self):
return self
def __exit__(self, *exc):
return False
class _FakeWS:
"""WebSocket fake: guarda el ws_url usado y responde al evaluate con value.
Antes de la respuesta con id==1, emite un evento intermedio (sin id) para
verificar que cdp_eval drena eventos hasta encontrar su respuesta.
"""
last_url = None
def __init__(self, url, value):
_FakeWS.last_url = url
self._value = value
self._queue = []
def send(self, raw):
msg = json.loads(raw)
if msg.get("id") == 1:
# Primero un evento de CDP sin id (debe drenarse), luego la respuesta.
self._queue.append(json.dumps({
"method": "Runtime.consoleAPICalled",
"params": {"type": "log"},
}))
self._queue.append(json.dumps({
"id": 1,
"result": {"result": {"type": "string", "value": self._value}},
}))
def recv(self):
if self._queue:
return self._queue.pop(0)
return ""
def close(self):
pass
# --- Tests -----------------------------------------------------------------
def test_golden_selecciona_target_por_substr_y_devuelve_value(monkeypatch):
monkeypatch.setattr(
urllib.request, "urlopen",
lambda url, timeout=5: _FakeHTTPResponse(_targets_json()),
)
monkeypatch.setattr(
websocket, "create_connection",
lambda url, timeout=10.0: _FakeWS(url, "WhatsApp"),
)
res = cdp_eval("document.title", port=9222, target_url_substr="whatsapp")
assert res["ok"] is True
assert res["value"] == "WhatsApp"
assert res["error"] == ""
assert res["target_url"] == "https://web.whatsapp.com/"
# Confirma que eligio el target whatsapp, no el de google.
assert _FakeWS.last_url.endswith("/WA")
def test_edge_substr_sin_match_devuelve_ok_false(monkeypatch):
monkeypatch.setattr(
urllib.request, "urlopen",
lambda url, timeout=5: _FakeHTTPResponse(_targets_json()),
)
# create_connection no deberia llamarse; si lo hace, revienta el test.
monkeypatch.setattr(
websocket, "create_connection",
lambda *a, **k: (_ for _ in ()).throw(AssertionError("no debe conectar")),
)
res = cdp_eval("document.title", port=9222, target_url_substr="nope-no-existe")
assert res["ok"] is False
assert res["value"] is None
assert "no target matching" in res["error"]
assert "nope-no-existe" in res["error"]
assert res["target_url"] == ""
def test_error_urlopen_lanza_devuelve_ok_false(monkeypatch):
def _boom(url, timeout=5):
raise OSError("connection refused")
monkeypatch.setattr(urllib.request, "urlopen", _boom)
res = cdp_eval("document.title", port=9222, target_url_substr="whatsapp")
assert res["ok"] is False
assert res["value"] is None
assert "connection refused" in res["error"]
assert res["target_url"] == ""
+80
View File
@@ -0,0 +1,80 @@
---
name: cdp_press_key
kind: function
lang: py
domain: browser
version: "1.0.0"
purity: impure
signature: "def cdp_press_key(key: str, *, port: int = 9222, target_url_substr: str = '', modifiers: int = 0, timeout_s: float = 10.0) -> dict"
description: "Pulsa una tecla nombrada (Enter, Escape, Backspace, Tab, ArrowDown, Delete, ...) sobre el elemento enfocado de una pestana de un Chrome con remote debugging, via CDP Input.dispatchKeyEvent (rawKeyDown + keyUp). Primitiva de input CDP reutilizable: enviar mensajes (Enter en el composer de WhatsApp), cerrar overlays (Escape), navegar resultados (ArrowDown), borrar (Backspace) o combos con modificadores (Ctrl+A)."
tags: [cdp, browser, automation, python, navegator]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: ["json", "urllib.request", "websocket"]
params_schema:
params:
- name: key
desc: "Nombre canonico de la tecla a pulsar. Soportadas: Enter, Escape, Backspace, Tab, Delete, ArrowDown, ArrowUp, ArrowLeft, ArrowRight, Home, End. Una tecla no soportada devuelve ok=False sin tocar la red."
- name: port
desc: "Puerto de remote debugging de Chrome. Default 9222."
- name: target_url_substr
desc: "Substring que debe contener la URL del target (pestana). Si vacio, usa el primer target de tipo 'page'."
- name: modifiers
desc: "Bitmask de modificadores CDP combinables con OR: Alt=1, Ctrl=2, Meta/Cmd=4, Shift=8. Ej. Ctrl+Shift = 2|8 = 10. Default 0."
- name: timeout_s
desc: "Timeout en segundos para la conexion WebSocket. Default 10.0."
output: "dict {ok: bool, error: str}. ok=True si los eventos rawKeyDown+keyUp se emitieron sin error. Nunca lanza: errores de red/conexion/transport y teclas no soportadas se devuelven en 'error' con ok=False."
tested: true
tests: ["test_golden_enter_emite_rawkeydown_y_keyup_vk13", "test_edge_tecla_no_soportada_ok_false_sin_abrir_ws", "test_error_create_connection_lanza_ok_false"]
test_file_path: "python/functions/browser/cdp_press_key_test.py"
file_path: "python/functions/browser/cdp_press_key.py"
---
## Ejemplo
```python
import sys, os
sys.path.insert(0, os.path.join("python", "functions"))
from browser.cdp_press_key import cdp_press_key
# Requiere un Chrome lanzado con --remote-debugging-port=9222
# y una pestana de WhatsApp Web abierta con el composer enfocado y texto escrito.
res = cdp_press_key("Enter", target_url_substr="whatsapp") # envia el mensaje
print(res["ok"], res["error"])
# Combo con modificadores (Ctrl+A para seleccionar todo): Ctrl=2.
cdp_press_key("Home", target_url_substr="whatsapp", modifiers=2)
```
O directo por CLI: `python3 python/functions/browser/cdp_press_key.py "Enter" "whatsapp"`.
## Cuando usarla
Cuando necesites enviar una pulsacion de tecla sobre la pestana **ya abierta** del
navegador diario sin abrir ventana nueva ni darle foco al sistema: **Enter** para
enviar (composer de WhatsApp), **Escape** para cerrar overlays/dialogos,
**ArrowDown/ArrowUp** para navegar resultados de busqueda, **Backspace/Delete** para
borrar, o combos con `modifiers` (Ctrl/Shift/Alt/Meta). Tipicamente **despues** de
escribir con `cdp_type_chars_py_browser` para confirmar la accion. Es la primitiva de
input sobre la que se construyen funciones `whatsapp_*_py_browser` y cualquier script
que opere una pestana existente via CDP.
## Gotchas
- Actua sobre el elemento **enfocado** de la pagina: CDP `Input.dispatchKeyEvent` no
apunta a un selector, va al foco actual. Asegura el foco (clic o `el.focus()` via
`cdp_eval_py_browser`) antes de pulsar.
- **Enter en WhatsApp Web envia el mensaje** (no inserta salto de linea). Para newline
dentro del composer usa Shift+Enter (`modifiers=8`) o no uses esta funcion para eso.
- `modifiers` es el bitmask de CDP, no un string: Alt=1, Ctrl=2, Meta/Cmd=4, Shift=8;
combina con OR (ej. Ctrl+Shift = 10).
- No se emite evento `char` aparte: el par `rawKeyDown`+`keyUp` con el
`windowsVirtualKeyCode` correcto (Enter=13) basta para disparar el envio en WhatsApp
(validado via `press_key` del MCP del navegador).
- Requiere un Chrome lanzado con `--remote-debugging-port=9222` (o el puerto que pases).
Sin remote debugging, `GET /json` falla y devuelve `ok=False`.
- Nunca lanza: errores de red, conexion WS, transport o tecla no soportada se reportan
en el campo `error` con `ok=False`.
+144
View File
@@ -0,0 +1,144 @@
"""Pulsa una tecla nombrada sobre el elemento enfocado de una pestana de Chrome via CDP.
Primitiva de input CDP: localiza un target (pestana) por substring de su URL, abre el
WebSocket de depuracion y emite el par de eventos `Input.dispatchKeyEvent`
(rawKeyDown + keyUp) de una tecla con nombre canonico (Enter, Escape, Backspace, Tab,
ArrowDown, ...) con modificadores opcionales. Base reutilizable para enviar mensajes
(Enter en el composer de WhatsApp), cerrar overlays (Escape), navegar resultados
(ArrowDown), borrar (Backspace) o combos (Ctrl+A con modifiers).
"""
import json
import urllib.request
import websocket
# Mapa de teclas soportadas -> {key, code, windowsVirtualKeyCode}.
# vk = windowsVirtualKeyCode == nativeVirtualKeyCode (codigos VK de Windows).
_KEY_MAP = {
"Enter": {"key": "Enter", "code": "Enter", "vk": 13},
"Escape": {"key": "Escape", "code": "Escape", "vk": 27},
"Backspace": {"key": "Backspace", "code": "Backspace", "vk": 8},
"Tab": {"key": "Tab", "code": "Tab", "vk": 9},
"Delete": {"key": "Delete", "code": "Delete", "vk": 46},
"ArrowDown": {"key": "ArrowDown", "code": "ArrowDown", "vk": 40},
"ArrowUp": {"key": "ArrowUp", "code": "ArrowUp", "vk": 38},
"ArrowLeft": {"key": "ArrowLeft", "code": "ArrowLeft", "vk": 37},
"ArrowRight": {"key": "ArrowRight", "code": "ArrowRight", "vk": 39},
"Home": {"key": "Home", "code": "Home", "vk": 36},
"End": {"key": "End", "code": "End", "vk": 35},
}
def cdp_press_key(
key: str,
*,
port: int = 9222,
target_url_substr: str = "",
modifiers: int = 0,
timeout_s: float = 10.0,
) -> dict:
"""Pulsa una tecla nombrada sobre el elemento enfocado de una pestana de Chrome.
Args:
key: Nombre canonico de la tecla. Soportadas: Enter, Escape, Backspace,
Tab, Delete, ArrowDown, ArrowUp, ArrowLeft, ArrowRight, Home, End.
port: Puerto de remote debugging de Chrome. Default 9222.
target_url_substr: Substring que debe contener la URL del target (pestana).
Si "", usa el primer target de tipo "page".
modifiers: Bitmask de modificadores CDP (Alt=1, Ctrl=2, Meta/Cmd=4,
Shift=8; combinables con OR). Default 0 (sin modificadores).
timeout_s: Timeout (segundos) para la conexion WebSocket. Default 10.0.
Returns:
dict con claves:
ok: bool — True si los eventos de tecla se emitieron sin error.
error: str — mensaje de error (vacio si ok).
Nunca lanza: errores de red/conexion/transport se devuelven en "error"
con ok=False.
"""
# 1. Validar la tecla contra el mapa interno (antes de tocar la red).
entry = _KEY_MAP.get(key)
if entry is None:
return {"ok": False, "error": f"unsupported key: {key}"}
k = entry["key"]
c = entry["code"]
vk = entry["vk"]
# 2. Listar targets via HTTP.
try:
with urllib.request.urlopen(
f"http://127.0.0.1:{port}/json", timeout=5
) as resp:
targets = json.loads(resp.read().decode())
except Exception as e: # noqa: BLE001 — red/HTTP/JSON, no relanzar
return {"ok": False, "error": str(e)}
# 3. Elegir el primer target type=="page" cuya url contenga el substring.
chosen = None
for t in targets:
if t.get("type") != "page":
continue
url = t.get("url", "")
if target_url_substr == "" or target_url_substr in url:
chosen = t
break
if chosen is None:
return {"ok": False, "error": f"no target matching {target_url_substr}"}
ws_url = chosen.get("webSocketDebuggerUrl", "")
# 4. Abrir WS y emitir rawKeyDown + keyUp.
try:
ws = websocket.create_connection(ws_url, timeout=timeout_s)
except Exception as e: # noqa: BLE001 — conexion WS
return {"ok": False, "error": str(e)}
try:
for msg_id, ev_type in ((1, "rawKeyDown"), (2, "keyUp")):
ws.send(json.dumps({
"id": msg_id,
"method": "Input.dispatchKeyEvent",
"params": {
"type": ev_type,
"key": k,
"code": c,
"windowsVirtualKeyCode": vk,
"nativeVirtualKeyCode": vk,
"modifiers": modifiers,
},
}))
# Drenar respuestas hasta ver el id==2 (o agotar el stream). Ignoramos
# eventos del server y frames no-JSON.
while True:
raw = ws.recv()
if not raw:
break
try:
parsed = json.loads(raw)
except Exception: # noqa: BLE001 — frame no-JSON, ignorar
continue
if parsed.get("id") == 2:
break
except Exception as e: # noqa: BLE001 — fallo de transport durante send/recv
return {"ok": False, "error": str(e)}
finally:
try:
ws.close()
except Exception: # noqa: BLE001 — cierre best-effort
pass
return {"ok": True, "error": ""}
if __name__ == "__main__":
import sys
key_arg = sys.argv[1] if len(sys.argv) > 1 else "Enter"
substr = sys.argv[2] if len(sys.argv) > 2 else ""
out = cdp_press_key(key_arg, port=9222, target_url_substr=substr)
print(json.dumps(out, ensure_ascii=False, indent=2))
@@ -0,0 +1,143 @@
"""Tests para cdp_press_key — mockean urlopen + create_connection.
Mockean la capa de red de CDP: urllib.request.urlopen (lista de targets) y
websocket.create_connection (un fake que captura los mensajes enviados y devuelve
las respuestas CDP con el id correspondiente).
"""
import json
import os
import sys
from contextlib import contextmanager
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from browser import cdp_press_key as mod # noqa: E402
from browser.cdp_press_key import cdp_press_key # noqa: E402
class _FakeResp:
"""Context manager que imita la respuesta de urllib.request.urlopen."""
def __init__(self, payload: list):
self._payload = payload
def __enter__(self):
return self
def __exit__(self, *exc):
return False
def read(self):
return json.dumps(self._payload).encode()
class _FakeWS:
"""WebSocket falso: captura los mensajes enviados y responde por id."""
def __init__(self):
self.sent = []
self._inbox = []
self.closed = False
def send(self, raw: str):
msg = json.loads(raw)
self.sent.append(msg)
# Encola una respuesta CDP vacia con el mismo id (como Chrome devuelve).
self._inbox.append(json.dumps({"id": msg["id"], "result": {}}))
def recv(self):
if self._inbox:
return self._inbox.pop(0)
return ""
def close(self):
self.closed = True
@contextmanager
def _patch(monkeypatch_targets, ws_obj=None, urlopen_exc=None, create_conn_exc=None):
"""Parchea urlopen y create_connection del modulo. Restaura al salir."""
orig_urlopen = mod.urllib.request.urlopen
orig_create = mod.websocket.create_connection
def fake_urlopen(url, timeout=5):
if urlopen_exc is not None:
raise urlopen_exc
return _FakeResp(monkeypatch_targets)
def fake_create(ws_url, timeout=10):
if create_conn_exc is not None:
raise create_conn_exc
return ws_obj
mod.urllib.request.urlopen = fake_urlopen
mod.websocket.create_connection = fake_create
try:
yield
finally:
mod.urllib.request.urlopen = orig_urlopen
mod.websocket.create_connection = orig_create
_TARGETS = [
{"type": "page", "url": "https://web.whatsapp.com/", "webSocketDebuggerUrl": "ws://x/1"},
]
def test_golden_enter_emite_rawkeydown_y_keyup_vk13():
"""Enter envia rawKeyDown + keyUp con windowsVirtualKeyCode 13."""
ws = _FakeWS()
with _patch(_TARGETS, ws_obj=ws):
res = cdp_press_key("Enter", target_url_substr="whatsapp")
assert res == {"ok": True, "error": ""}
assert len(ws.sent) == 2
down, up = ws.sent
assert down["method"] == "Input.dispatchKeyEvent"
assert down["params"]["type"] == "rawKeyDown"
assert down["params"]["key"] == "Enter"
assert down["params"]["code"] == "Enter"
assert down["params"]["windowsVirtualKeyCode"] == 13
assert down["params"]["nativeVirtualKeyCode"] == 13
assert down["params"]["modifiers"] == 0
assert up["params"]["type"] == "keyUp"
assert up["params"]["windowsVirtualKeyCode"] == 13
assert ws.closed is True
def test_edge_tecla_no_soportada_ok_false_sin_abrir_ws():
"""Una tecla fuera del mapa devuelve ok=False sin tocar la red ni abrir WS."""
ws = _FakeWS()
def fail_create(ws_url, timeout=10):
raise AssertionError("create_connection no debe llamarse para tecla no soportada")
def fail_urlopen(url, timeout=5):
raise AssertionError("urlopen no debe llamarse para tecla no soportada")
orig_urlopen = mod.urllib.request.urlopen
orig_create = mod.websocket.create_connection
mod.urllib.request.urlopen = fail_urlopen
mod.websocket.create_connection = fail_create
try:
res = cdp_press_key("F13", target_url_substr="whatsapp")
finally:
mod.urllib.request.urlopen = orig_urlopen
mod.websocket.create_connection = orig_create
assert res["ok"] is False
assert "unsupported key: F13" in res["error"]
assert ws.sent == []
def test_error_create_connection_lanza_ok_false():
"""Si create_connection lanza, se captura y devuelve ok=False sin relanzar."""
with _patch(_TARGETS, create_conn_exc=ConnectionRefusedError("ws down")):
res = cdp_press_key("Escape", target_url_substr="whatsapp")
assert res["ok"] is False
assert "ws down" in res["error"]
@@ -0,0 +1,87 @@
---
name: cdp_type_chars
kind: function
lang: py
domain: browser
version: "1.0.0"
purity: impure
signature: "def cdp_type_chars(text: str, *, port: int = 9222, target_url_substr: str = '', delay_ms: int = 12, timeout_s: float = 10.0) -> dict"
description: "Escribe texto caracter a caracter en el elemento ENFOCADO de una pestana via CDP (Input.dispatchKeyEvent keyDown/keyUp por char). Unico metodo validado para el editor Lexical de WhatsApp Web, donde document.execCommand/el.value no funcionan. El caller debe enfocar el elemento antes."
tags: [cdp, browser, automation, python, navegator]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: ["urllib.request", "json", "time", "websocket"]
tested: true
tests: ["escribir ab envia 4 dispatchKeyEvent con text correctos", "text vacio no envia nada y devuelve ok", "fallo de create_connection devuelve ok False"]
test_file_path: "python/functions/browser/cdp_type_chars_test.py"
file_path: "python/functions/browser/cdp_type_chars.py"
params:
- name: text
desc: "Texto a escribir, caracter a caracter. Cada char emite un par keyDown/keyUp."
- name: port
desc: "Puerto de depuracion remota de Chrome (DevTools). Default 9222."
- name: target_url_substr
desc: "Substring para elegir el target page por su URL. Si vacio, usa el primer page disponible."
- name: delay_ms
desc: "Pausa en milisegundos entre cada par de teclas. Humaniza y da tiempo al re-render de editores como Lexical. Default 12."
- name: timeout_s
desc: "Timeout en segundos para el GET /json y la apertura del WebSocket. Default 10.0."
output: "dict {ok: bool, chars_sent: int, error: str}. ok True si todos los chars se enviaron; chars_sent cuenta los enviados (incluso si falla a mitad); error con el mensaje o cadena vacia. Nunca lanza."
---
## Ejemplo
Requiere Chrome lanzado con remote debugging (`--remote-debugging-port=9222`)
y un elemento editable ENFOCADO en la pestana. Primero se enfoca con cdp_eval
(ejecutando `.focus()` sobre el composer), luego se escribe:
```python
import sys, os
sys.path.insert(0, os.path.join("python", "functions"))
from browser.cdp_eval import cdp_eval # enfoca el elemento
from browser.cdp_type_chars import cdp_type_chars
# 1. Enfocar el composer (contenteditable de Lexical) de WhatsApp Web.
cdp_eval(
"document.querySelector('footer div[contenteditable=\"true\"]').focus()",
target_url_substr="whatsapp",
)
# 2. Escribir caracter a caracter en el elemento enfocado.
res = cdp_type_chars("hola", target_url_substr="whatsapp")
print(res) # {'ok': True, 'chars_sent': 4, 'error': ''}
```
> Nota: esta funcion NO enfoca por si misma. El enfoque previo lo hace el
> caller con `cdp_eval_py_browser` (ejecutando `.focus()` sobre el selector).
> El contrato de `cdp_type_chars` es solo "escribir en lo que ya esta
> enfocado".
## Cuando usarla
- Para escribir en inputs, textarea y contenteditable de una pestana ya
abierta: el composer Lexical de WhatsApp Web, su search box, o cualquier
campo que requiera key events reales en vez de asignacion directa de valor.
- SIEMPRE despues de enfocar el elemento destino con cdp_eval ejecutando
`.focus()` sobre el selector.
- Cuando `el.value = ...` o `document.execCommand('insertText')` no surten
efecto porque el editor escucha eventos de teclado (caso Lexical).
## Gotchas
- Escribe en el elemento ACTUALMENTE ENFOCADO, no recibe selector: enfoca
antes con cdp_eval (`...focus()`). Sin foco, los chars se pierden o van al
body.
- NO mezclar con `document.execCommand('insertText')`: en Lexical produce
texto duplicado o intercalado (gotcha real observado). Usa solo una via.
- `delay_ms` muy bajo (cerca de 0) puede perder caracteres en Lexical, que
necesita un re-render entre teclas. 12 ms es un default seguro; subir si
ves chars perdidos.
- Solo inserta texto plano via el campo `text`. Para Enter, Backspace, flechas
u otras teclas especiales (incluido enviar el mensaje) usa
`cdp_press_key_py_browser`.
- Impura: depende de un Chrome con remote debugging accesible en `port`. Si no
hay target page valido devuelve `{"ok": False, ...}` (no lanza).
+123
View File
@@ -0,0 +1,123 @@
"""Escribe texto caracter a caracter en el elemento enfocado via CDP.
Primitiva de input de bajo nivel: envia pares keyDown/keyUp de
`Input.dispatchKeyEvent` por cada caracter del texto al target `page`
seleccionado. El campo `text` del keyDown es lo que inserta el caracter
en el elemento actualmente enfocado (inputs, textarea, contenteditable).
Es el unico metodo validado para el editor Lexical de WhatsApp Web:
`document.execCommand('insertText')` y `el.value = ...` no disparan los
listeners internos de Lexical y el texto no persiste. El caller debe
enfocar el elemento ANTES (p.ej. con cdp_eval ejecutando `.focus()`).
"""
import json
import time
import urllib.request
import websocket
def cdp_type_chars(
text: str,
*,
port: int = 9222,
target_url_substr: str = "",
delay_ms: int = 12,
timeout_s: float = 10.0,
) -> dict:
"""Escribe texto caracter a caracter en el elemento enfocado de una pestana.
Localiza el target `page` por substring de URL, abre el WebSocket CDP y
envia un par keyDown/keyUp de Input.dispatchKeyEvent por cada caracter,
con `delay_ms` de pausa entre pares (humaniza y deja re-renderizar a
editores como Lexical). Escribe en el elemento ACTUALMENTE ENFOCADO; el
caller debe enfocarlo antes (cdp_eval ejecutando `.focus()`).
Args:
text: Texto a escribir, caracter a caracter.
port: Puerto de depuracion remota de Chrome. Default 9222.
target_url_substr: Substring para elegir el target page. Si vacio,
usa el primer page disponible.
delay_ms: Pausa en milisegundos entre cada par de teclas. Default 12.
timeout_s: Timeout de apertura del WebSocket en segundos. Default 10.0.
Returns:
dict con:
ok (bool): True si todos los caracteres se enviaron sin error.
chars_sent (int): Numero de caracteres efectivamente enviados.
error (str): Mensaje de error si lo hubo, "" en caso contrario.
No lanza excepciones: cualquier error de red/WS se devuelve en el dict.
"""
# 1. Localizar el target page por substring de URL.
try:
with urllib.request.urlopen(
f"http://127.0.0.1:{port}/json", timeout=timeout_s
) as resp:
targets = json.loads(resp.read().decode())
except Exception as e:
return {"ok": False, "chars_sent": 0,
"error": f"no se pudo conectar a Chrome en port {port}: {e}"}
ws_url = ""
for t in targets:
if t.get("type") != "page":
continue
url = t.get("url", "")
if target_url_substr and target_url_substr not in url:
continue
ws_url = t.get("webSocketDebuggerUrl", "")
if ws_url:
break
if not ws_url:
hint = f" matching '{target_url_substr}'" if target_url_substr else ""
return {"ok": False, "chars_sent": 0,
"error": f"no target page{hint} con webSocketDebuggerUrl"}
# 2. Abrir WebSocket y enviar las teclas.
chars_sent = 0
ws = None
try:
ws = websocket.create_connection(ws_url, timeout=timeout_s)
# Lectura no bloqueante para drenar respuestas/eventos sin colgarse.
ws.settimeout(0.1)
msg_id = 1
for ch in text:
ws.send(json.dumps({
"id": msg_id,
"method": "Input.dispatchKeyEvent",
"params": {"type": "keyDown", "text": ch},
}))
ws.send(json.dumps({
"id": msg_id + 1,
"method": "Input.dispatchKeyEvent",
"params": {"type": "keyUp", "text": ch},
}))
msg_id += 2
chars_sent += 1
# Drenar el socket sin bloquear: descarta lo que haya llegado.
try:
while True:
ws.recv()
except Exception:
pass
time.sleep(delay_ms / 1000.0)
return {"ok": True, "chars_sent": chars_sent, "error": ""}
except Exception as e:
return {"ok": False, "chars_sent": chars_sent, "error": str(e)}
finally:
if ws is not None:
try:
ws.close()
except Exception:
pass
if __name__ == "__main__":
import sys
txt = sys.argv[1] if len(sys.argv) > 1 else "hola"
substr = sys.argv[2] if len(sys.argv) > 2 else ""
print(json.dumps(cdp_type_chars(txt, target_url_substr=substr), ensure_ascii=False))
@@ -0,0 +1,123 @@
"""Tests para cdp_type_chars.
Mockea urllib.request.urlopen (GET /json) y websocket.create_connection
(conexion fake que acumula los mensajes enviados) para validar el transporte
CDP sin un Chrome real.
"""
import io
import json
import cdp_type_chars as mod
from cdp_type_chars import cdp_type_chars
class _FakeResp:
"""Context manager que imita la respuesta de urllib.request.urlopen."""
def __init__(self, payload):
self._buf = io.BytesIO(json.dumps(payload).encode())
def __enter__(self):
return self
def __exit__(self, *exc):
return False
def read(self):
return self._buf.read()
class _FakeWS:
"""Conexion WebSocket fake: acumula los mensajes enviados y nunca recibe."""
def __init__(self):
self.sent = []
self.closed = False
def settimeout(self, _):
pass
def send(self, payload):
self.sent.append(json.loads(payload))
def recv(self):
# Socket vacio: simula timeout no bloqueante para drenar.
raise TimeoutError("empty")
def close(self):
self.closed = True
def _patch_targets(monkeypatch, payload):
monkeypatch.setattr(
mod.urllib.request, "urlopen", lambda *a, **k: _FakeResp(payload)
)
def _patch_sleep(monkeypatch):
# Evita esperas reales por delay_ms.
monkeypatch.setattr(mod.time, "sleep", lambda _s: None)
_TARGETS = [
{"type": "page", "url": "https://web.whatsapp.com/",
"webSocketDebuggerUrl": "ws://127.0.0.1:9222/devtools/page/AB12"},
]
def test_escribir_ab_envia_4_dispatchkeyevent_con_text_correctos(monkeypatch):
"""Golden: 'ab' produce 4 Input.dispatchKeyEvent con los text correctos."""
fake_ws = _FakeWS()
_patch_targets(monkeypatch, _TARGETS)
_patch_sleep(monkeypatch)
monkeypatch.setattr(mod.websocket, "create_connection", lambda *a, **k: fake_ws)
res = cdp_type_chars("ab", target_url_substr="whatsapp")
assert res["ok"] is True
assert res["chars_sent"] == 2
assert res["error"] == ""
methods = [m["method"] for m in fake_ws.sent]
assert methods == ["Input.dispatchKeyEvent"] * 4
types = [m["params"]["type"] for m in fake_ws.sent]
assert types == ["keyDown", "keyUp", "keyDown", "keyUp"]
texts = [m["params"]["text"] for m in fake_ws.sent]
assert texts == ["a", "a", "b", "b"]
assert fake_ws.closed is True
def test_text_vacio_no_envia_nada_y_devuelve_ok(monkeypatch):
"""Edge: texto vacio -> 0 chars, ok True, sin mensajes enviados."""
fake_ws = _FakeWS()
_patch_targets(monkeypatch, _TARGETS)
_patch_sleep(monkeypatch)
monkeypatch.setattr(mod.websocket, "create_connection", lambda *a, **k: fake_ws)
res = cdp_type_chars("", target_url_substr="whatsapp")
assert res["ok"] is True
assert res["chars_sent"] == 0
assert res["error"] == ""
assert fake_ws.sent == []
def test_fallo_de_create_connection_devuelve_ok_false(monkeypatch):
"""Error: create_connection lanza -> ok False, error poblado."""
_patch_targets(monkeypatch, _TARGETS)
_patch_sleep(monkeypatch)
def _boom(*a, **k):
raise ConnectionRefusedError("connection refused")
monkeypatch.setattr(mod.websocket, "create_connection", _boom)
res = cdp_type_chars("hola", target_url_substr="whatsapp")
assert res["ok"] is False
assert res["chars_sent"] == 0
assert "connection refused" in res["error"]
@@ -0,0 +1,80 @@
---
name: har_extract_calls
kind: function
lang: py
domain: cybersecurity
version: "1.0.0"
purity: pure
signature: "def har_extract_calls(entries: list[dict], *, drop_headers: list[str] | None = None) -> list[dict]"
description: "Normaliza una lista de entries HAR (salida de har_filter_flows) en call specs reproducibles: extrae cookies del header Cookie, limpia headers hop-by-hop, infiere body_type y expone los datos de auth para parametrizar luego con {{param}}. Segundo paso del patron grabar->destilar->reproducir un flujo web. NO auto-parametriza."
tags: [flow-replay, har, http, proxy, cybersecurity, python]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: []
params:
- name: entries
desc: "lista de entries HAR (cada uno con request y, opcional, response). Tipicamente la salida de har_filter_flows."
- name: drop_headers
desc: "nombres extra de headers a eliminar (case-insensitive), aparte de los hop-by-hop por defecto. None = no quitar extras."
output: "lista de call specs (una por entry) con claves: method (upper), url, headers (sin hop-by-hop ni Cookie), cookies (parseadas del header Cookie), body, body_type (json|form|raw|None), status (int|None), sets_cookies (nombres de cookies que setea la respuesta)"
tested: true
tests:
- "test_golden_post_con_cookie_y_body_json"
- "test_edge_get_sin_body"
- "test_drop_headers_extra_respetado"
- "test_form_body_y_set_cookie_desde_headers"
- "test_lista_vacia"
test_file_path: "python/functions/cybersecurity/har_extract_calls_test.py"
file_path: "python/functions/cybersecurity/har_extract_calls.py"
---
## Ejemplo
```python
from har_extract_calls import har_extract_calls
entries = [
{
"request": {
"method": "post",
"url": "https://api.example.com/login",
"headers": [
{"name": "Host", "value": "api.example.com"},
{"name": "Content-Type", "value": "application/json"},
{"name": "Cookie", "value": "session=abc; csrf=xyz"},
],
"postData": {
"mimeType": "application/json",
"text": '{"user":"neo","pass":"secret"}',
},
},
"response": {"status": 200, "cookies": [{"name": "session", "value": "new"}]},
}
]
har_extract_calls(entries)
# [{
# "method": "POST",
# "url": "https://api.example.com/login",
# "headers": {"Content-Type": "application/json"}, # Host (hop-by-hop) y Cookie removidos
# "cookies": {"session": "abc", "csrf": "xyz"}, # parseadas del header Cookie
# "body": '{"user":"neo","pass":"secret"}',
# "body_type": "json", # inferido del mimeType
# "status": 200,
# "sets_cookies": ["session"], # cookies que setea la respuesta
# }]
```
## Cuando usarla
Usala tras `har_filter_flows`, una vez tienes los entries HAR del flujo que te interesa, para obtener el boceto normalizado de los requests. Las call specs resultantes son el punto de partida para: (1) marcar a mano los valores dinamicos con `{{param}}` y guardar el flujo como funcion-accion del registry, o (2) reproducir la secuencia con `http_replay_sequence_py_infra`. Tambien para auditar rapido que cookies/headers de auth lleva cada peticion de un flujo capturado.
## Gotchas
- **NO auto-parametriza.** Deja todos los valores tal cual aparecen en el HAR. La deteccion de CSRF tokens, anti-forgery y otros valores dinamicos es responsabilidad del humano/Claude, que los marca despues con `{{param}}`. La auto-deteccion es v2, fuera de scope.
- **El output contiene secretos.** Las cookies de sesion, tokens `Authorization` y demas auth del HAR viajan tal cual en las call specs. NO commitear el output crudo ni pegarlo en sitios publicos: redactar/parametrizar antes de persistir.
- **Headers hop-by-hop se descartan siempre** (host, content-length, connection, keep-alive, proxy-connection, accept-encoding, te, trailer, transfer-encoding, upgrade). Si necesitas conservar alguno para reproducir un caso especial, tendras que reañadirlo manualmente en la call spec.
- **El header `Cookie` se mueve a `cookies`** y desaparece de `headers`: al reproducir, el cliente HTTP debe re-serializar las cookies (no asumir que siguen en headers).
@@ -0,0 +1,168 @@
"""Normaliza entries HAR en call specs reproducibles.
Segundo paso del patron "grabar -> destilar -> reproducir" un flujo web como
funcion del registry. Toma la salida de `har_filter_flows` (lista de entries
HAR) y produce call specs limpias, con auth (cookies/headers) expuesta para
que el humano/Claude marque luego los valores dinamicos con `{{param}}`.
Funcion PURA: sin I/O, transforma listas/dicts de forma determinista.
"""
# Headers hop-by-hop / ruidosos que se eliminan por defecto (case-insensitive).
# `cookie` se trata aparte: se extrae a `cookies` y se quita de `headers`.
_HOP_BY_HOP = frozenset(
{
"host",
"content-length",
"connection",
"keep-alive",
"proxy-connection",
"accept-encoding",
"te",
"trailer",
"transfer-encoding",
"upgrade",
}
)
def _parse_cookie_header(value: str) -> dict:
"""Parsea el valor de un header `Cookie` en un dict {name: value}.
Formato: `a=1; b=2; c=3`. El ultimo gana si hay nombres repetidos.
"""
cookies: dict = {}
for pair in value.split(";"):
pair = pair.strip()
if not pair:
continue
name, sep, val = pair.partition("=")
name = name.strip()
if not name:
continue
cookies[name] = val.strip() if sep else ""
return cookies
def _infer_body_type(mime_type: str | None) -> str | None:
"""Infiere el tipo de body a partir del mimeType de postData.
application/json -> "json"
application/x-www-form-... -> "form"
multipart/* -> "raw"
otro / None -> "raw" si hay body, None lo decide el caller.
"""
if not mime_type:
return None
mt = mime_type.split(";", 1)[0].strip().lower()
if mt == "application/json":
return "json"
if mt == "application/x-www-form-urlencoded":
return "form"
if mt.startswith("multipart/"):
return "raw"
return "raw"
def _set_cookie_names_from_headers(headers: list[dict]) -> list[str]:
"""Extrae nombres de cookies de los headers `Set-Cookie` de la respuesta."""
names: list[str] = []
for h in headers:
if str(h.get("name", "")).lower() != "set-cookie":
continue
raw = str(h.get("value", ""))
# Set-Cookie: name=value; Path=/; HttpOnly -> nos quedamos con `name`.
first = raw.split(";", 1)[0].strip()
name = first.split("=", 1)[0].strip()
if name:
names.append(name)
return names
def har_extract_calls(
entries: list[dict],
*,
drop_headers: list[str] | None = None,
) -> list[dict]:
"""Convierte entries HAR en call specs normalizadas y reproducibles.
Por cada entry HAR produce un dict call spec con method, url, headers
(sin hop-by-hop, sin Cookie), cookies (parseadas del header Cookie),
body, body_type inferido, status de respuesta y nombres de cookies que
la respuesta setea. NO auto-parametriza: deja los valores tal cual para
que el humano marque despues los dinamicos con `{{param}}`.
Args:
entries: lista de entries HAR (cada uno con `request` y, opcional,
`response`). Tipicamente la salida de `har_filter_flows`.
drop_headers: nombres extra de headers a eliminar (case-insensitive),
aparte de los hop-by-hop por defecto. None = no quitar extras.
Returns:
lista de call specs, una por entry, con las claves: method, url,
headers, cookies, body, body_type, status, sets_cookies.
"""
extra_drop = {h.lower() for h in (drop_headers or [])}
specs: list[dict] = []
for entry in entries:
request = entry.get("request") or {}
response = entry.get("response") or {}
method = str(request.get("method", "")).upper()
url = request.get("url", "")
# Headers: lista [{name, value}] -> dict, con drop de hop-by-hop +
# extras, y extraccion del header Cookie a `cookies`.
headers: dict = {}
cookies: dict = {}
for h in request.get("headers") or []:
name = str(h.get("name", ""))
value = str(h.get("value", ""))
lname = name.lower()
if lname == "cookie":
cookies.update(_parse_cookie_header(value))
continue
if lname in _HOP_BY_HOP or lname in extra_drop:
continue
headers[name] = value # ultimo gana si repetidos
# Body desde postData.
post_data = request.get("postData") or {}
body = post_data.get("text")
mime_type = post_data.get("mimeType")
body_type = _infer_body_type(mime_type) if body is not None else None
# Status de respuesta.
raw_status = response.get("status")
status = int(raw_status) if isinstance(raw_status, (int, float)) and raw_status else None
if isinstance(raw_status, str) and raw_status.isdigit():
status = int(raw_status)
# Cookies que setea la respuesta: preferir response.cookies (HAR);
# si no, parsear los headers Set-Cookie.
sets_cookies: list[str] = []
resp_cookies = response.get("cookies")
if resp_cookies:
sets_cookies = [
str(c.get("name", "")) for c in resp_cookies if c.get("name")
]
else:
sets_cookies = _set_cookie_names_from_headers(
response.get("headers") or []
)
specs.append(
{
"method": method,
"url": url,
"headers": headers,
"cookies": cookies,
"body": body,
"body_type": body_type,
"status": status,
"sets_cookies": sets_cookies,
}
)
return specs
@@ -0,0 +1,150 @@
"""Tests para har_extract_calls."""
from har_extract_calls import har_extract_calls
def test_golden_post_con_cookie_y_body_json():
"""Golden: POST con header Cookie + body json -> spec correcta,
cookie extraida, hop-by-hop dropeados, set-cookie de respuesta."""
entries = [
{
"request": {
"method": "post",
"url": "https://api.example.com/login",
"headers": [
{"name": "Host", "value": "api.example.com"},
{"name": "Content-Length", "value": "42"},
{"name": "Accept-Encoding", "value": "gzip"},
{"name": "Content-Type", "value": "application/json"},
{"name": "Authorization", "value": "Bearer tok123"},
{"name": "Cookie", "value": "session=abc; csrf=xyz"},
],
"postData": {
"mimeType": "application/json",
"text": '{"user":"neo","pass":"secret"}',
},
},
"response": {
"status": 200,
"cookies": [
{"name": "session", "value": "newsess"},
{"name": "remember", "value": "1"},
],
"headers": [],
},
}
]
[spec] = har_extract_calls(entries)
assert spec["method"] == "POST"
assert spec["url"] == "https://api.example.com/login"
# Hop-by-hop dropeados, Cookie extraido fuera de headers.
assert spec["headers"] == {
"Content-Type": "application/json",
"Authorization": "Bearer tok123",
}
assert "Host" not in spec["headers"]
assert "Content-Length" not in spec["headers"]
assert "Accept-Encoding" not in spec["headers"]
assert "Cookie" not in spec["headers"]
# Cookies parseadas del header Cookie.
assert spec["cookies"] == {"session": "abc", "csrf": "xyz"}
# Body + tipo inferido.
assert spec["body"] == '{"user":"neo","pass":"secret"}'
assert spec["body_type"] == "json"
# Status de respuesta.
assert spec["status"] == 200
# Cookies que setea la respuesta.
assert spec["sets_cookies"] == ["session", "remember"]
def test_edge_get_sin_body():
"""Edge: GET sin body -> body None, body_type None, sin cookies."""
entries = [
{
"request": {
"method": "get",
"url": "https://api.example.com/me",
"headers": [
{"name": "Accept", "value": "application/json"},
],
},
"response": {"status": 304, "headers": []},
}
]
[spec] = har_extract_calls(entries)
assert spec["method"] == "GET"
assert spec["body"] is None
assert spec["body_type"] is None
assert spec["cookies"] == {}
assert spec["headers"] == {"Accept": "application/json"}
assert spec["status"] == 304
assert spec["sets_cookies"] == []
def test_drop_headers_extra_respetado():
"""drop_headers extra elimina headers adicionales (case-insensitive)."""
entries = [
{
"request": {
"method": "GET",
"url": "https://api.example.com/data",
"headers": [
{"name": "User-Agent", "value": "curl/8"},
{"name": "X-Trace-Id", "value": "noise-123"},
{"name": "Accept", "value": "*/*"},
],
},
"response": {"status": 200, "headers": []},
}
]
[spec] = har_extract_calls(entries, drop_headers=["x-trace-id"])
assert "X-Trace-Id" not in spec["headers"]
assert spec["headers"] == {"User-Agent": "curl/8", "Accept": "*/*"}
def test_form_body_y_set_cookie_desde_headers():
"""Body form-urlencoded -> body_type form; set-cookie parseado de headers
cuando no hay response.cookies."""
entries = [
{
"request": {
"method": "POST",
"url": "https://api.example.com/form",
"headers": [
{
"name": "Content-Type",
"value": "application/x-www-form-urlencoded",
},
],
"postData": {
"mimeType": "application/x-www-form-urlencoded",
"text": "a=1&b=2",
},
},
"response": {
"status": 201,
"headers": [
{"name": "Set-Cookie", "value": "auth=tok; Path=/; HttpOnly"},
{"name": "Content-Type", "value": "text/html"},
{"name": "Set-Cookie", "value": "lang=es; Path=/"},
],
},
}
]
[spec] = har_extract_calls(entries)
assert spec["body_type"] == "form"
assert spec["body"] == "a=1&b=2"
assert spec["sets_cookies"] == ["auth", "lang"]
def test_lista_vacia():
"""Sin entries -> lista vacia."""
assert har_extract_calls([]) == []
@@ -0,0 +1,77 @@
---
name: har_filter_flows
kind: function
lang: py
domain: cybersecurity
version: "1.0.0"
purity: pure
signature: "def har_filter_flows(har: dict, *, hosts: list[str] | None = None, methods: list[str] | None = None, drop_static: bool = True, drop_analytics: bool = True) -> list[dict]"
description: "Filtra un HAR (formato W3C, el que exporta query_mitm_flows --har) dejando solo los flujos relevantes para reconstruir una accion HTTP: descarta recursos estaticos y dominios de analytics, y restringe por host/metodo. Primer paso del patron grabar->destilar->reproducir."
tags: [flow-replay, har, proxy, cybersecurity, web-proxy]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: [urllib.parse]
params:
- name: har
desc: "HAR ya parseado como dict (formato W3C). Se leen los entries de har['log']['entries']; si la estructura no existe devuelve []"
- name: hosts
desc: "lista de hosts a conservar (match exacto del host de cada URL). None = no filtra por host"
- name: methods
desc: "lista de metodos HTTP a conservar (GET/POST/...). Se normaliza a mayusculas en ambos lados. None = no filtra por metodo"
- name: drop_static
desc: "si True (default) descarta recursos estaticos: mimeType image/*, font/*, text/css, application|text/javascript, o ruta terminada en .css .js .mjs .map .png .jpg .jpeg .gif .svg .webp .ico .woff .woff2 .ttf .eot"
- name: drop_analytics
desc: "si True (default) descarta entries cuyo host caiga en la blocklist heuristica de telemetria (google-analytics, googletagmanager, doubleclick, sentry.io, segment, mixpanel, hotjar, datadoghq, etc.) por substring sobre el host"
output: "lista de dicts: subconjunto de los entries HAR de entrada (sin mutarlos) que pasan todos los filtros, en su orden original"
tested: true
tests: ["test_golden_solo_sobrevive_el_post_de_api", "test_har_vacio_devuelve_lista_vacia", "test_har_sin_log_entries_devuelve_lista_vacia", "test_filtro_por_hosts", "test_filtro_por_methods"]
test_file_path: "python/functions/cybersecurity/har_filter_flows_test.py"
file_path: "python/functions/cybersecurity/har_filter_flows.py"
---
## Ejemplo
```python
har = {
"log": {
"entries": [
{ # estatico: CSS -> se descarta
"request": {"method": "GET", "url": "https://app.example.com/styles/main.css"},
"response": {"content": {"mimeType": "text/css"}},
},
{ # analytics -> se descarta
"request": {"method": "POST", "url": "https://www.google-analytics.com/collect"},
"response": {"content": {"mimeType": "application/json"}},
},
{ # el POST de API que queremos reproducir -> sobrevive
"request": {"method": "POST", "url": "https://api.example.com/v1/login"},
"response": {"content": {"mimeType": "application/json"}},
},
]
}
}
flows = har_filter_flows(har)
print(len(flows)) # 1
print(flows[0]["request"]["url"]) # https://api.example.com/v1/login
# Restringir aun mas al host y metodo de interes:
flows = har_filter_flows(har, hosts=["api.example.com"], methods=["POST"])
print(len(flows)) # 1
```
## Cuando usarla
- Justo despues de capturar trafico con `web_proxy` / `query_mitm_flows --har`, cuando quieres **destilar** un HAR ruidoso a los pocos flujos que componen una accion (login, alta, transferencia) antes de convertirla en una funcion reproducible del registry.
- Cuando necesitas quedarte solo con las llamadas de API (`hosts=[...]`, `methods=["POST","PUT"]`) y descartar de un golpe estaticos y telemetria.
- Como primer paso del patron **grabar -> destilar -> reproducir** un flujo web: graba con el proxy, filtra con esta funcion, y reproduce los entries resultantes.
## Gotchas
- Funcion pura: no hace I/O. Recibe el HAR ya parseado (carga el `.har` con `json.load` antes de llamarla).
- La blocklist de analytics es **heuristica y ampliable** (substring sobre el host). Si un dominio de telemetria propio no esta en la lista, no se descarta; pasa `drop_analytics=False` y filtra a mano, o amplia la blocklist en el codigo.
- El filtro `hosts` es **match exacto de host** (no substring, no subdominios): `api.example.com` no captura `www.api.example.com`. Lista cada host que quieras conservar.
- El host se obtiene con `urllib.parse.urlsplit(...).hostname`; URLs sin host valido cuentan como host vacio `""`.
@@ -0,0 +1,148 @@
"""Filtra los flujos relevantes de un HAR para reconstruir una accion HTTP.
Primer paso del patron "grabar -> destilar -> reproducir": dado un HAR
(formato estandar W3C, el que exporta `query_mitm_flows --har` de mitmproxy),
descarta el ruido (recursos estaticos, dominios de analytics/telemetria) y
opcionalmente restringe por host y metodo, dejando solo los entries que
importan para reproducir una accion como funcion del registry.
Funcion pura: recibe el HAR ya parseado como dict, no hace I/O y no muta los
dicts de entrada (devuelve un subconjunto de los entries originales tal cual).
"""
from urllib.parse import urlsplit
# Extensiones de recursos estaticos (sobre el path, ignorando querystring).
_STATIC_EXTENSIONS = (
".css",
".js",
".mjs",
".map",
".png",
".jpg",
".jpeg",
".gif",
".svg",
".webp",
".ico",
".woff",
".woff2",
".ttf",
".eot",
)
# Prefijos de mimeType considerados estaticos.
_STATIC_MIME_PREFIXES = (
"image/",
"font/",
"text/css",
)
# mimeTypes exactos considerados estaticos (JavaScript en sus dos formas).
_STATIC_MIME_EXACT = (
"application/javascript",
"text/javascript",
)
# Blocklist heuristica de dominios de telemetria/analytics (substring sobre host).
_ANALYTICS_BLOCKLIST = (
"google-analytics.com",
"googletagmanager.com",
"analytics.google.com",
"doubleclick.net",
"facebook.com/tr",
"connect.facebook.net",
"sentry.io",
"segment.io",
"segment.com",
"mixpanel.com",
"hotjar.com",
"fullstory.com",
"clarity.ms",
"cdn.amplitude.com",
"stats.g.doubleclick.net",
"datadoghq.com",
"bugsnag.com",
)
def _is_static(entry: dict) -> bool:
"""True si el entry HAR es un recurso estatico (por mimeType o extension)."""
mime = (
entry.get("response", {})
.get("content", {})
.get("mimeType", "")
)
mime = (mime or "").split(";", 1)[0].strip().lower()
if mime:
if mime.startswith(_STATIC_MIME_PREFIXES):
return True
if mime in _STATIC_MIME_EXACT:
return True
url = entry.get("request", {}).get("url", "") or ""
path = urlsplit(url).path.lower()
return path.endswith(_STATIC_EXTENSIONS)
def _is_analytics(host: str) -> bool:
"""True si el host coincide (substring) con la blocklist de analytics."""
host = (host or "").lower()
return any(blocked in host for blocked in _ANALYTICS_BLOCKLIST)
def har_filter_flows(
har: dict,
*,
hosts: list[str] | None = None,
methods: list[str] | None = None,
drop_static: bool = True,
drop_analytics: bool = True,
) -> list[dict]:
"""Devuelve solo los entries HAR relevantes para reproducir una accion HTTP.
Args:
har: HAR ya parseado como dict (formato W3C). Se leen los entries de
`har["log"]["entries"]`; si la estructura no existe, devuelve [].
hosts: si no es None, mantiene solo entries cuyo host (de urlsplit)
este en la lista (match exacto de host).
methods: si no es None, mantiene solo entries cuyo metodo HTTP este en
la lista (ambos lados normalizados a mayusculas).
drop_static: si True, descarta recursos estaticos (CSS/JS/imagenes/
fuentes) por mimeType o por extension de la ruta.
drop_analytics: si True, descarta entries cuyo host caiga en la
blocklist de dominios de telemetria/analytics.
Returns:
Sublista de los dicts de entrada (los entries HAR originales, sin
mutar) que pasan todos los filtros.
"""
entries = har.get("log", {}).get("entries")
if not isinstance(entries, list):
return []
methods_upper = (
{m.upper() for m in methods} if methods is not None else None
)
hosts_set = set(hosts) if hosts is not None else None
result: list[dict] = []
for entry in entries:
request = entry.get("request", {})
url = request.get("url", "") or ""
host = urlsplit(url).hostname or ""
if drop_static and _is_static(entry):
continue
if drop_analytics and _is_analytics(host):
continue
if hosts_set is not None and host not in hosts_set:
continue
if methods_upper is not None:
method = (request.get("method", "") or "").upper()
if method not in methods_upper:
continue
result.append(entry)
return result
@@ -0,0 +1,93 @@
"""Tests para har_filter_flows."""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from har_filter_flows import har_filter_flows
def _entry(method: str, url: str, mime: str = "application/json") -> dict:
return {
"request": {"method": method, "url": url},
"response": {"content": {"mimeType": mime}},
}
def _mixed_har() -> dict:
return {
"log": {
"entries": [
# Recursos estaticos por mimeType.
_entry("GET", "https://app.example.com/styles/main.css", "text/css"),
_entry("GET", "https://app.example.com/bundle.js", "application/javascript"),
_entry("GET", "https://cdn.example.com/logo.png", "image/png"),
_entry("GET", "https://cdn.example.com/font.woff2", "font/woff2"),
# Estatico por extension de la ruta aunque el mime no lo delate.
_entry("GET", "https://cdn.example.com/icons/star.svg?v=2", "application/octet-stream"),
# Analytics / telemetria.
_entry("POST", "https://www.google-analytics.com/collect", "application/json"),
_entry("GET", "https://browser.sentry.io/api/123/envelope/", "application/json"),
# El POST de API que queremos reproducir.
_entry("POST", "https://api.example.com/v1/login", "application/json"),
]
}
}
def test_golden_solo_sobrevive_el_post_de_api():
flows = har_filter_flows(_mixed_har())
assert len(flows) == 1
assert flows[0]["request"]["method"] == "POST"
assert flows[0]["request"]["url"] == "https://api.example.com/v1/login"
def test_har_vacio_devuelve_lista_vacia():
assert har_filter_flows({}) == []
def test_har_sin_log_entries_devuelve_lista_vacia():
assert har_filter_flows({"log": {}}) == []
assert har_filter_flows({"log": {"entries": None}}) == []
def test_filtro_por_hosts():
har = {
"log": {
"entries": [
_entry("POST", "https://api.example.com/v1/login"),
_entry("POST", "https://other.example.com/v1/track"),
]
}
}
flows = har_filter_flows(har, hosts=["api.example.com"])
assert len(flows) == 1
assert flows[0]["request"]["url"] == "https://api.example.com/v1/login"
def test_filtro_por_methods():
har = {
"log": {
"entries": [
_entry("GET", "https://api.example.com/v1/me"),
_entry("POST", "https://api.example.com/v1/login"),
_entry("post", "https://api.example.com/v1/refresh"),
]
}
}
flows = har_filter_flows(har, methods=["post"])
assert len(flows) == 2
assert {f["request"]["url"] for f in flows} == {
"https://api.example.com/v1/login",
"https://api.example.com/v1/refresh",
}
def test_no_muta_los_entries_de_entrada():
har = _mixed_har()
original_count = len(har["log"]["entries"])
flows = har_filter_flows(har)
assert len(har["log"]["entries"]) == original_count
# El entry devuelto es el mismo objeto, no una copia.
assert flows[0] is har["log"]["entries"][-1]
+2
View File
@@ -1,8 +1,10 @@
from .setup_logger import setup_logger, get_logger
from .generate_app_icon import generate_app_icon
from .http_replay_sequence import http_replay_sequence
__all__ = [
"setup_logger",
"get_logger",
"generate_app_icon",
"http_replay_sequence",
]
+8 -1
View File
@@ -12,9 +12,12 @@ import io
import os
from pathlib import Path
import cairosvg
from PIL import Image, ImageDraw
# cairosvg se importa de forma perezosa dentro de los renderers que lo usan
# (ver _render_glyph_*). Asi el modulo (y el paquete infra que lo reexporta)
# se importa sin requerir cairosvg instalado; solo rasterizar SVGs lo exige.
DEFAULT_SIZES = [16, 24, 32, 48, 64, 128, 256]
@@ -62,6 +65,8 @@ def _luminance(accent_hex: str) -> float:
def _render_glyph_colored(svg_path: Path, size: int, fill: str) -> Image.Image:
"""Renderiza un SVG Phosphor reemplazando currentColor por `fill`."""
import cairosvg
svg = svg_path.read_text(encoding="utf-8")
svg = svg.replace('fill="currentColor"', f'fill="{fill}"')
png_bytes = cairosvg.svg2png(
@@ -82,6 +87,8 @@ def _render_glyph_white(svg_path: Path, size: int) -> Image.Image:
Returns:
Imagen RGBA con el glyph en blanco sobre fondo transparente.
"""
import cairosvg
svg = svg_path.read_text(encoding="utf-8")
# Phosphor usa fill="currentColor" — forzar blanco.
svg = svg.replace('fill="currentColor"', 'fill="#ffffff"')
@@ -0,0 +1,87 @@
---
name: http_replay_sequence
kind: function
lang: py
domain: infra
version: "1.0.0"
purity: impure
signature: "def http_replay_sequence(calls: list[dict], *, params: dict | None = None, extract: list[dict] | None = None, timeout_s: float = 30.0, verify_tls: bool = True, allow_redirects: bool = True, base_headers: dict | None = None) -> dict"
description: "Motor de replay HTTP: ejecuta en orden una secuencia de call specs (las que produce har_extract_calls_py_cybersecurity) compartiendo una sesion (cookie jar) entre pasos, con substitucion de parametros {{param}} y extraccion de valores de una respuesta para usarlos en pasos siguientes (p.ej. token CSRF del GET inicial -> header del POST). Pieza reutilizable del Nivel 1 (HTTP puro) del patron grabar->destilar->reproducir."
tags: [flow-replay, http, replay, client, infra]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: [re, requests]
tested: true
tests: ["test_golden_extract_and_subst", "test_edge_missing_param", "test_error_path_request_exception"]
test_file_path: "python/functions/infra/http_replay_sequence_test.py"
file_path: "python/functions/infra/http_replay_sequence.py"
params:
- name: calls
desc: "Lista ordenada de call specs. Cada spec: {method, url, headers(dict), cookies(dict opc), body(str|None), body_type:'json'|'form'|'raw'|None}. El body ya es texto (no se re-serializa). Es el formato de salida de har_extract_calls_py_cybersecurity."
- name: params
desc: "Dict inicial de contexto para la substitucion {{param}}. Se copia (no se muta el original). Pasa aqui secretos/tokens desde un vault/pass, nunca hardcodeados en los call specs."
- name: extract
desc: "Lista de reglas de extraccion {from: int|'last', type: 'json'|'regex'|'header'|'set_cookie', expr: str, as: str}. Se aplican justo tras ejecutar el step indicado en 'from' ('last' = el step recien ejecutado) y guardan el valor en ctx[as] para los pasos siguientes."
- name: timeout_s
desc: "Timeout por request en segundos (default: 30.0)."
- name: verify_tls
desc: "Verificar certificados TLS; se setea en la sesion (default: True). No desactivar salvo entorno de pruebas controlado."
- name: allow_redirects
desc: "Si los requests siguen redirects (default: True)."
- name: base_headers
desc: "Headers por defecto que se mezclan en la sesion (se aplican a todos los pasos). Util para User-Agent / Accept comunes."
output: "Dict {status: 'ok'|'error', steps: [{idx, method, url, status_code, ok, extracted, missing_params, error}], params_final: dict (ctx tras todos los pasos), error: str}. status='error' solo ante excepcion de transporte (requests.RequestException) o entrada invalida; en ese caso corta y deja de ejecutar. Un 4xx/5xx NO corta: el step queda con ok=False y status global sigue 'ok'."
---
## Ejemplo
```python
from infra import http_replay_sequence
# Requiere red: usa httpbin.org (publico). 2 pasos:
# 1) GET /uuid -> extrae el uuid del JSON como param "u"
# 2) POST /anything -> manda header X-Token: {{u}} (el uuid del paso 1)
calls = [
{"method": "GET", "url": "https://httpbin.org/uuid",
"headers": {"Accept": "application/json"}, "body": None, "body_type": None},
{"method": "POST", "url": "https://httpbin.org/anything",
"headers": {"X-Token": "{{u}}", "Content-Type": "application/json"},
"body": '{"hello": "world"}', "body_type": "json"},
]
extract = [
{"from": 0, "type": "json", "expr": "uuid", "as": "u"},
]
result = http_replay_sequence(calls, extract=extract)
print(result["status"]) # "ok"
token = result["params_final"]["u"] # el uuid extraido del paso 0
print("token:", token)
# httpbin /anything devuelve los headers que recibio; comprobamos que el
# paso 2 llevo el valor substituido:
print(result["steps"][1]["status_code"]) # 200
print(result["steps"][1]["ok"]) # True
# El header X-Token: {{u}} se substituyo por el uuid antes de enviarse.
```
## Cuando usarla
Usala tras `har_extract_calls_py_cybersecurity`, para validar que un flujo capturado se reproduce SIN navegador (Nivel 1 del patron grabar->destilar->reproducir). Es la base de las funciones-accion guardadas en el registry: cuando una secuencia HTTP demuestra reproducir un login + accion, se promueve a una funcion/pipeline dedicada. Tambien sirve para encadenar requests dependientes (token CSRF, session id, paginacion con cursor) compartiendo cookie jar y propagando valores entre pasos.
## Gotchas
- **Seguridad — secretos via params, nunca hardcodeados.** Los call specs pueden contener cookies/tokens. El caller debe inyectarlos via `{{param}}` desde un vault/pass (`params={...}`), no escribirlos en los specs ni commitearlos.
- **Seguridad — replay con efectos es PELIGROSO.** Reproducir una secuencia con efectos (POST que reinicia un server, borra, paga, envia) ejecuta esos efectos de verdad. El caller debe confirmar antes de lanzar una secuencia mutante.
- **Seguridad — `verify_tls` default True.** No lo pongas en False salvo en un entorno de pruebas controlado; desactivar la verificacion TLS abre la puerta a MITM.
- **Extraccion JSON es dot-path simple, NO jsonpath completo.** `"data.items.0.token"` funciona (claves + indices de lista por digito), pero no hay filtros, wildcards ni expresiones. Para casos complejos, usa `type: regex` o post-procesa.
- **Sigue redirects por defecto** (`allow_redirects=True`). Si la secuencia capturada depende del 302 explicito (p.ej. para leer el Location o una cookie intermedia), pon `allow_redirects=False`.
- **Params faltantes NO abortan.** Si un `{{nombre}}` no esta en ctx, se deja el literal `{{nombre}}` y se anade a `step.missing_params`. El request se envia igual; solo una excepcion de transporte corta la ejecucion.
- **El body no se re-serializa.** `body_type: "json"` solo documenta el tipo; el body ya es texto y se manda como `data=body`. Asegurate de incluir el header `Content-Type` adecuado en el spec.
- **4xx/5xx no es error global.** El step queda `ok=False` con su `status_code`, pero `status` global sigue `"ok"`. Solo `requests.RequestException` (DNS, conexion, timeout) marca `status="error"` y corta.
## Capability growth log
v1.0.0 — version inicial.
@@ -0,0 +1,252 @@
"""HTTP replay engine: reproduce an ordered sequence of captured HTTP calls.
This is the reusable core of Level 1 ("pure HTTP") of the record -> distill ->
replay pattern. It takes the call specs produced by
``har_extract_calls_py_cybersecurity`` and replays them in order over a single
``requests.Session`` (shared cookie jar), supporting ``{{param}}`` substitution
and extracting values from one response to feed later steps (e.g. a CSRF token
from the initial GET injected as a header in a subsequent POST).
"""
import re
import requests
_PLACEHOLDER_RE = re.compile(r"\{\{\s*([A-Za-z0-9_]+)\s*\}\}")
def _subst(value, ctx, missing):
"""Replace every ``{{name}}`` occurrence in ``value`` using ``ctx``.
If a referenced param is missing from ``ctx``, the literal ``{{name}}`` is
kept untouched and the name is appended to ``missing`` (deduplicated).
Non-string values are returned unchanged.
"""
if not isinstance(value, str):
return value
def repl(match: "re.Match") -> str:
name = match.group(1)
if name in ctx and ctx[name] is not None:
return str(ctx[name])
if name not in missing:
missing.append(name)
return match.group(0)
return _PLACEHOLDER_RE.sub(repl, value)
def _subst_dict(d, ctx, missing):
"""Apply ``_subst`` to every value of a dict, returning a new dict."""
if not d:
return {}
out = {}
for k, v in d.items():
out[k] = _subst(v, ctx, missing)
return out
def _json_dot_path(data, expr: str):
"""Walk a simple dot-path over a parsed JSON value.
Supports dict keys and list indices: ``"data.items.0.token"``. A segment
that is all digits is treated as a list index. Returns the value or ``None``
if any segment cannot be resolved.
"""
cur = data
for seg in expr.split("."):
if seg == "":
continue
if isinstance(cur, list) and seg.isdigit():
idx = int(seg)
if 0 <= idx < len(cur):
cur = cur[idx]
else:
return None
elif isinstance(cur, dict) and seg in cur:
cur = cur[seg]
else:
return None
return cur
def _apply_extract_rule(rule, resp, session):
"""Resolve a single extract rule against a response. Returns str value or "".
Rule types:
- json: dot-path over ``resp.json()``.
- regex: ``re.search`` over ``resp.text``; group(1) if present, else group(0).
- header: ``resp.headers.get(expr)``.
- set_cookie: ``session.cookies.get(expr)``.
"""
rtype = rule.get("type", "json")
expr = rule.get("expr", "")
try:
if rtype == "json":
value = _json_dot_path(resp.json(), expr)
return "" if value is None else str(value)
if rtype == "regex":
m = re.search(expr, resp.text)
if not m:
return ""
if m.groups():
return "" if m.group(1) is None else str(m.group(1))
return str(m.group(0))
if rtype == "header":
value = resp.headers.get(expr)
return "" if value is None else str(value)
if rtype == "set_cookie":
value = session.cookies.get(expr)
return "" if value is None else str(value)
except (ValueError, TypeError):
return ""
return ""
def http_replay_sequence(
calls: list[dict],
*,
params: dict | None = None,
extract: list[dict] | None = None,
timeout_s: float = 30.0,
verify_tls: bool = True,
allow_redirects: bool = True,
base_headers: dict | None = None,
) -> dict:
"""Replay an ordered sequence of HTTP call specs over a shared session.
Args:
calls: List of call specs, each
``{"method","url","headers"(dict),"cookies"(dict opc),"body"(str|None),
"body_type":"json"|"form"|"raw"|None}``.
params: Initial context dict for ``{{param}}`` substitution (copied).
extract: List of extract rules
``{"from": int|"last", "type": "json"|"regex"|"header"|"set_cookie",
"expr": str, "as": str}``. Applied right after the referenced step runs.
timeout_s: Per-request timeout in seconds.
verify_tls: Whether to verify TLS certificates (set on the session).
allow_redirects: Whether requests should follow redirects.
base_headers: Default headers merged into the session.
Returns:
Dict with ``status`` ("ok"|"error"), ``steps`` (per-step records),
``params_final`` (the context after all steps) and ``error`` (message
when ``status == "error"``).
"""
ctx: dict = dict(params) if params else {}
extract = extract or []
steps: list[dict] = []
# Validate input shape before opening a session.
if not isinstance(calls, list):
return {
"status": "error",
"steps": [],
"params_final": ctx,
"error": "calls must be a list of call specs",
}
session = requests.Session()
session.verify = verify_tls
if base_headers:
session.headers.update(base_headers)
status = "ok"
error_msg = ""
try:
for i, call in enumerate(calls):
if not isinstance(call, dict):
status = "error"
error_msg = f"step {i}: call spec must be a dict"
steps.append(
{
"idx": i,
"method": "",
"url": "",
"status_code": 0,
"ok": False,
"extracted": {},
"missing_params": [],
"error": "call spec must be a dict",
}
)
break
missing: list[str] = []
method = (call.get("method") or "GET").upper()
url = _subst(call.get("url") or "", ctx, missing)
headers = _subst_dict(call.get("headers"), ctx, missing)
cookies = _subst_dict(call.get("cookies"), ctx, missing)
body = _subst(call.get("body"), ctx, missing)
body_type = call.get("body_type")
kwargs: dict = {
"headers": headers or None,
"cookies": cookies or None,
"timeout": timeout_s,
"allow_redirects": allow_redirects,
}
# json/form/raw all send the body as-is via data= (the body is
# already a serialized string; do NOT re-serialize JSON).
if body is not None:
kwargs["data"] = body
try:
resp = session.request(method, url, **kwargs)
except requests.RequestException as exc:
status = "error"
error_msg = f"step {i}: {exc}"
steps.append(
{
"idx": i,
"method": method,
"url": url,
"status_code": 0,
"ok": False,
"extracted": {},
"missing_params": missing,
"error": str(exc),
}
)
break
code = resp.status_code
ok = 200 <= code < 400
# Apply extract rules targeting this step. "last" == the step just run.
extracted: dict = {}
extract_notes: list[str] = []
for rule in extract:
frm = rule.get("from")
if frm == "last" or frm == i:
as_name = rule.get("as")
if not as_name:
continue
value = _apply_extract_rule(rule, resp, session)
ctx[as_name] = value
extracted[as_name] = value
if value == "":
extract_notes.append(f"extract '{as_name}' not found")
steps.append(
{
"idx": i,
"method": method,
"url": url,
"status_code": code,
"ok": ok,
"extracted": extracted,
"missing_params": missing,
"error": "; ".join(extract_notes),
}
)
finally:
session.close()
return {
"status": status,
"steps": steps,
"params_final": ctx,
"error": error_msg,
}
@@ -0,0 +1,120 @@
"""Tests para http_replay_sequence.
No dependen de red: mockean requests.Session.request con unittest.mock para
verificar substitucion, extraccion y manejo de errores de transporte.
"""
from unittest.mock import patch
import requests
from .http_replay_sequence import http_replay_sequence
class _FakeResp:
"""Respuesta minima que imita lo que usa la funcion de requests.Response."""
def __init__(self, status_code=200, json_data=None, text="", headers=None):
self.status_code = status_code
self._json = json_data if json_data is not None else {}
self.text = text
self.headers = headers or {}
def json(self):
return self._json
def test_golden_extract_and_subst():
"""2 pasos: extract json del paso 0 -> usado en {{token}} del paso 1.
Verifica que la url y el header del paso 1 llevaron el valor substituido.
"""
sent = [] # captura (method, url, kwargs) de cada request
def fake_request(self, method, url, **kwargs):
sent.append((method, url, kwargs))
if "/uuid" in url:
return _FakeResp(200, json_data={"data": {"items": [{"token": "ABC123"}]}})
return _FakeResp(200, json_data={"echo": True})
calls = [
{"method": "GET", "url": "https://api.example/uuid",
"headers": {"Accept": "application/json"}, "body": None, "body_type": None},
{"method": "POST", "url": "https://api.example/use/{{token}}",
"headers": {"X-Token": "{{token}}"}, "body": '{"k": "v"}', "body_type": "json"},
]
extract = [
{"from": 0, "type": "json", "expr": "data.items.0.token", "as": "token"},
]
with patch.object(requests.Session, "request", fake_request):
result = http_replay_sequence(calls, extract=extract)
assert result["status"] == "ok"
assert result["error"] == ""
assert result["params_final"]["token"] == "ABC123"
# Paso 0 extrajo el token.
assert result["steps"][0]["extracted"] == {"token": "ABC123"}
assert result["steps"][0]["ok"] is True
# Paso 1: la URL fue substituida.
method1, url1, kwargs1 = sent[1]
assert method1 == "POST"
assert url1 == "https://api.example/use/ABC123"
# El header X-Token llevo el valor substituido.
assert kwargs1["headers"]["X-Token"] == "ABC123"
# El body se manda como data= sin re-serializar.
assert kwargs1["data"] == '{"k": "v"}'
assert result["steps"][1]["ok"] is True
assert result["steps"][1]["missing_params"] == []
def test_edge_missing_param():
"""Param faltante -> missing_params poblado y literal {{x}} intacto."""
sent = []
def fake_request(self, method, url, **kwargs):
sent.append((method, url, kwargs))
return _FakeResp(200, json_data={})
calls = [
{"method": "GET", "url": "https://api.example/path/{{missing}}",
"headers": {"X-H": "{{missing}}"}, "body": None, "body_type": None},
]
with patch.object(requests.Session, "request", fake_request):
result = http_replay_sequence(calls)
assert result["status"] == "ok"
# El literal {{missing}} queda intacto tanto en url como en header.
method0, url0, kwargs0 = sent[0]
assert url0 == "https://api.example/path/{{missing}}"
assert kwargs0["headers"]["X-H"] == "{{missing}}"
# El step registra el param faltante (deduplicado, una sola vez).
assert result["steps"][0]["missing_params"] == ["missing"]
def test_error_path_request_exception():
"""La sesion lanza requests.RequestException -> status=error, corta, step.error poblado."""
def fake_request(self, method, url, **kwargs):
raise requests.RequestException("connection refused")
calls = [
{"method": "GET", "url": "https://down.example/a", "headers": {},
"body": None, "body_type": None},
{"method": "GET", "url": "https://down.example/b", "headers": {},
"body": None, "body_type": None},
]
with patch.object(requests.Session, "request", fake_request):
result = http_replay_sequence(calls)
assert result["status"] == "error"
assert "connection refused" in result["error"]
# Corta tras la excepcion: solo se registro el primer step.
assert len(result["steps"]) == 1
assert result["steps"][0]["ok"] is False
assert result["steps"][0]["status_code"] == 0
assert "connection refused" in result["steps"][0]["error"]