618e3b0295
- CAPABILITIES_TODO.md - demo_e2e/RESUMEN.md - demo_e2e/results/prueba_1_quotes.json - demo_e2e/results/prueba_2_perceive.json - demo_e2e/results/prueba_3_search.json - demo_e2e/results/prueba_4_login_session.json - demo_e2e/results/prueba_5_books.json - demo_e2e/results/prueba_6_session_storage.json - demo_e2e/results/prueba_7_find_honesto.json - demo_e2e/results/prueba_8_verificacion.json - ... Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
378 lines
18 KiB
Python
378 lines
18 KiB
Python
"""Ejecuta 9 pruebas e2e graduadas contra el servidor browser_mcp para validar
|
|
las capacidades de control de navegador (CDP) sobre sitios sandbox estables.
|
|
|
|
Cada prueba se conecta al Chrome aislado del MCP en el puerto 9333 (que debe
|
|
estar ya corriendo) y ejerce un conjunto de tools. Los resultados (pasos,
|
|
respuestas, veredicto y datos extraídos) se guardan en results/.
|
|
|
|
Uso:
|
|
python3 run_demo.py
|
|
Requisitos:
|
|
- Chrome/Chromium headless en CDP 9333 (Chrome aislado del MCP).
|
|
- Binario browser_mcp compilado.
|
|
- FN_REGISTRY_ROOT para que la tool page_perceive pueda invocar fn run.
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import re
|
|
import time
|
|
|
|
from mcp_client import MCPClient
|
|
|
|
# Root of the fn_registry checkout. FN_REGISTRY_ROOT (listed as a requirement in
# the module docstring) takes precedence so the demo is not tied to one machine;
# when it is unset or empty the original hard-coded path is used.
ROOT = os.environ.get("FN_REGISTRY_ROOT") or "/home/enmanuel/fn_registry"
# Location of the browser_mcp executable inside ROOT.
EXE = os.path.join(ROOT, "projects/web_scraping/apps/browser_mcp/browser_mcp")
# Directory next to this script that receives the per-test JSON files and logs.
RESULTS = os.path.join(os.path.dirname(__file__), "results")
# Value sent as "port" in every tool call (the CDP port per the module docstring).
PORT = 9333

# Create the results directory at import time so later writes find it.
os.makedirs(RESULTS, exist_ok=True)
|
|
|
|
|
|
class Recorder:
    """Run tool calls through a client and keep a record of every step.

    Each call made via :meth:`step` is timed, appended to ``self.steps`` as a
    dict and echoed as one line to the log stream.
    """

    def __init__(self, client, log):
        """Store the collaborators and start with an empty step list.

        Args:
            client: Object exposing ``call(tool, args, timeout=...)`` that
                returns a ``(text, is_error)`` pair.
            log: Writable text stream; ``write`` and ``flush`` are used.
        """
        self.c = client
        self.log = log
        self.steps = []

    def step(self, tool, args, timeout=60):
        """Invoke one tool, record the outcome and return it.

        Args:
            tool: Tool name passed to the client.
            args: Argument dict passed to the client.
            timeout: Timeout forwarded to ``client.call``.

        Returns:
            Tuple ``(text, is_error)``. If the client raises, ``text`` is
            ``"EXCEPTION: <message>"`` and ``is_error`` is ``True``.
        """
        # perf_counter is monotonic, so the measured duration cannot jump or
        # go negative when the wall clock is adjusted during the call.
        started = time.perf_counter()
        try:
            text, is_err = self.c.call(tool, args, timeout=timeout)
        except Exception as e:
            # Deliberate catch-all: a failing call becomes an error step
            # instead of aborting the whole test.
            text, is_err = f"EXCEPTION: {e}", True
        dt = round((time.perf_counter() - started) * 1000)
        # Only the first 2000 characters of the response are stored.
        rec = {"tool": tool, "args": args, "ms": dt, "is_error": is_err,
               "response": text[:2000]}
        self.steps.append(rec)
        self.log.write(f" [{tool}] {dt}ms err={is_err} -> {text[:160]}\n")
        # Flush per step so the log is usable while a run is still in progress.
        self.log.flush()
        return text, is_err
|
|
|
|
|
|
def save(name, payload):
    """Write ``payload`` as indented UTF-8 JSON into the results directory.

    Args:
        name: File name, joined onto ``RESULTS``.
        payload: JSON-serializable object to dump.
    """
    destination = os.path.join(RESULTS, name)
    with open(destination, "w", encoding="utf-8") as out:
        # ensure_ascii=False keeps non-ASCII characters literal in the file.
        json.dump(payload, out, ensure_ascii=False, indent=2)
|
|
|
|
|
|
def prueba_1_quotes(c, log):
    """Extraer citas estructuradas (valida fix %v: array JS -> JSON real).

    NOTE: main() writes the first line of this docstring to run.log, so that
    line is kept verbatim.

    Opens quotes.toscrape.com, evaluates a JS expression that maps every
    ``.quote`` element to ``{text, author, tags}`` and parses the tool
    response as JSON. Passes when the call did not error and the response is
    an array of at least 10 objects that all carry an ``author`` key. The
    outcome is written to ``prueba_1_quotes.json``.

    Args:
        c: Client handed to ``Recorder``.
        log: Writable text stream for the step log.

    Returns:
        Tuple ``(verdict, detail)`` with verdict ``"PASS"`` or ``"FAIL"``.
    """
    r = Recorder(c, log)
    r.step("tab_navigate", {"port": PORT, "url": "https://quotes.toscrape.com"})
    r.step("page_wait_load", {"port": PORT, "timeout_ms": 12000})
    expr = ("[...document.querySelectorAll('.quote')].map(q=>({"
            "text:q.querySelector('.text').innerText,"
            "author:q.querySelector('.author').innerText,"
            "tags:[...q.querySelectorAll('.tag')].map(t=>t.innerText)}))")
    text, err = r.step("page_eval_js", {"port": PORT, "expression": expr})
    try:
        parsed = json.loads(text)
    except (TypeError, ValueError):
        # Response is not JSON (e.g. an error message): nothing was extracted.
        parsed = None
    # Valid JSON that is not an array (null, number, object) would break the
    # len()/slice calls below, so it is treated as "no quotes".
    quotes = parsed if isinstance(parsed, list) else []
    ok = (
        (not err)
        and len(quotes) >= 10
        # Require real objects; on a string `"author" in q` would be a
        # substring test and on a number it would raise.
        and all(isinstance(q, dict) and "author" in q for q in quotes)
    )
    verdict = "PASS" if ok else "FAIL"
    save("prueba_1_quotes.json", {
        "name": "1 - Extraer citas estructuradas (quotes.toscrape.com)",
        "verdict": verdict,
        "extracted_count": len(quotes),
        "sample": quotes[:3],
        "steps": r.steps,
    })
    return verdict, f"{len(quotes)} citas"
|
|
|
|
|
|
def prueba_2_perceive(c, log):
    """Percibir página como outline AX accionable (P0.1).

    NOTE: main() writes the first line of this docstring to run.log, so that
    line is kept verbatim.

    Loads the-internet.herokuapp.com and calls ``page_perceive``. Passes when
    the call did not error, the returned text contains ``#ref=``, contains
    "link" (case-insensitive) and is longer than 100 characters. The outcome
    is written to ``prueba_2_perceive.json``.

    Args:
        c: Client handed to ``Recorder``.
        log: Writable text stream for the step log.

    Returns:
        Tuple ``(verdict, detail)`` with verdict ``"PASS"`` or ``"FAIL"``.
    """
    recorder = Recorder(c, log)
    recorder.step("tab_navigate", {"port": PORT, "url": "https://the-internet.herokuapp.com"})
    recorder.step("page_wait_load", {"port": PORT, "timeout_ms": 12000})
    # page_perceive gets a longer client timeout (90) than the default.
    outline, failed = recorder.step(
        "page_perceive", {"port": PORT, "max_chars": 4000}, timeout=90
    )

    outline_len = len(outline)
    has_refs = "#ref=" in outline
    has_link = "link" in outline.lower()

    if failed or not has_refs or not has_link or outline_len <= 100:
        verdict = "FAIL"
    else:
        verdict = "PASS"

    report = {
        "name": "2 - Percibir página (page_perceive AX outline)",
        "verdict": verdict,
        "has_refs": has_refs,
        "has_link": has_link,
        "outline_len": outline_len,
        "outline_preview": outline[:1500],
        "steps": recorder.steps,
    }
    save("prueba_2_perceive.json", report)
    return verdict, f"outline {outline_len} chars, refs={has_refs}"
|
|
|
|
|
|
def prueba_3_search(c, log):
    """Submit de formulario con teclado: type + press_key Enter (sin click submit).

    NOTE: main() writes the first line of this docstring to run.log, so that
    line is kept verbatim.

    Fills the login form on the-internet.herokuapp.com and submits it with the
    Enter key. Passes when reading ``#flash`` did not error and its text
    contains "logged into". The outcome is written to ``prueba_3_search.json``.

    Args:
        c: Client handed to ``Recorder``.
        log: Writable text stream for the step log.

    Returns:
        Tuple ``(verdict, detail)`` with verdict ``"PASS"`` or ``"FAIL"``.
    """
    recorder = Recorder(c, log)
    # Plain HTML form (the-internet/login): after typing the credentials, Enter
    # submits the form. This validates type + press_key Enter reliably, without
    # depending on a JS widget (such as Wikipedia's typeahead, which ignores
    # the key event).
    base = "https://the-internet.herokuapp.com"
    script = [
        ("cookie_clear", {"port": PORT}),
        ("tab_navigate", {"port": PORT, "url": base + "/login"}),
        ("page_wait_load", {"port": PORT, "timeout_ms": 12000}),
        ("dom_click", {"port": PORT, "selector": "#username"}),
        ("dom_type", {"port": PORT, "text": "tomsmith"}),
        ("dom_click", {"port": PORT, "selector": "#password"}),
        ("dom_type", {"port": PORT, "text": "SuperSecretPassword!"}),
        ("press_key", {"port": PORT, "key": "Enter"}),
        ("page_wait_load", {"port": PORT, "timeout_ms": 12000}),
        ("dom_wait_element", {"port": PORT, "selector": "#flash", "timeout_ms": 8000}),
    ]
    for tool, tool_args in script:
        recorder.step(tool, tool_args)

    flash, failed = recorder.step(
        "page_get_text", {"port": PORT, "selector": "#flash", "max_bytes": 200}
    )
    flash_clean = flash.strip()
    if not failed and "logged into" in flash.lower():
        verdict = "PASS"
    else:
        verdict = "FAIL"

    save("prueba_3_search.json", {
        "name": "3 - Submit de formulario con teclado Enter (the-internet/login)",
        "verdict": verdict,
        "flash": flash_clean,
        "steps": recorder.steps,
    })
    return verdict, f"flash='{flash_clean[:40]}'"
|
|
|
|
|
|
def prueba_4_login_session(c, log):
    """Login + persistir sesión: storage_save -> cookie_clear -> storage_load.

    NOTE: main() writes the first line of this docstring to run.log, so that
    line is kept verbatim.

    Logs in on the-internet.herokuapp.com, saves the storage state, clears
    cookies and checks that /secure rejects the visit, then loads the saved
    state and checks that /secure is reachable again. Passes when all three
    observations (logged in, kicked after clear, restored after load) hold.
    The outcome is written to ``prueba_4_login_session.json``.

    Args:
        c: Client handed to ``Recorder``.
        log: Writable text stream for the step log.

    Returns:
        Tuple ``(verdict, detail)`` with verdict ``"PASS"`` or ``"FAIL"``.
    """
    r = Recorder(c, log)
    base = "https://the-internet.herokuapp.com"
    session_path = "/tmp/demo_session.json"
    # A file left behind by an earlier run could be picked up by storage_load
    # and make the restore succeed even if this run's storage_save failed.
    # Best-effort removal; assumes the server writes to this same filesystem.
    try:
        os.remove(session_path)
    except OSError:
        pass  # Not present or not removable: carry on with the test.
    # Clean session: cookies from earlier tests (other domains) must not
    # contaminate the storage_state we are about to save.
    r.step("cookie_clear", {"port": PORT})
    r.step("tab_navigate", {"port": PORT, "url": base + "/login"})
    r.step("page_wait_load", {"port": PORT, "timeout_ms": 12000})
    r.step("dom_click", {"port": PORT, "selector": "#username"})
    r.step("dom_type", {"port": PORT, "text": "tomsmith"})
    r.step("dom_click", {"port": PORT, "selector": "#password"})
    r.step("dom_type", {"port": PORT, "text": "SuperSecretPassword!"})
    r.step("dom_click", {"port": PORT, "selector": "button[type=submit]"})
    r.step("page_wait_load", {"port": PORT, "timeout_ms": 12000})
    r.step("dom_wait_element", {"port": PORT, "selector": "#flash", "timeout_ms": 8000})
    flash1, _ = r.step("page_get_text", {"port": PORT, "selector": "#flash", "max_bytes": 300})
    # "logged into" only appears in the SUCCESS flash; this avoids colliding
    # with the error message "view the secure area", which contains
    # "secure area".
    logged_in = "logged into" in flash1.lower()
    # Save the session, clear cookies, restore.
    r.step("storage_save", {"port": PORT, "path": session_path})
    r.step("cookie_clear", {"port": PORT})
    # After clearing cookies, /secure must bounce back to login.
    r.step("tab_navigate", {"port": PORT, "url": base + "/secure"})
    r.step("page_wait_load", {"port": PORT, "timeout_ms": 12000})
    after_clear, _ = r.step("page_get_text", {"port": PORT, "selector": "#flash", "max_bytes": 300})
    kicked = "must login" in after_clear.lower()
    # Restore the session: navigate to the domain, load, go back to /secure.
    r.step("tab_navigate", {"port": PORT, "url": base})
    r.step("page_wait_load", {"port": PORT, "timeout_ms": 12000})
    r.step("storage_load", {"port": PORT, "path": session_path})
    r.step("tab_navigate", {"port": PORT, "url": base + "/secure"})
    r.step("page_wait_load", {"port": PORT, "timeout_ms": 12000})
    # Robust check (not #flash, which suffers from timing): if we are still on
    # /secure and the body mentions "Secure Area", the session was restored;
    # if we were kicked out, pathname is no longer /secure.
    probe, _ = r.step("page_eval_js", {"port": PORT, "expression":
        "JSON.stringify({path:location.pathname,secure:document.body.innerText.includes('Secure Area')})"})
    try:
        pj = json.loads(probe)
        # The tool may return the stringified object wrapped in one more
        # layer of JSON string quoting; unwrap it in that case.
        if isinstance(pj, str):
            pj = json.loads(pj)
    except (TypeError, ValueError):
        pj = None
    if not isinstance(pj, dict):
        # Non-JSON or unexpected shape counts as "not restored" rather than
        # raising AttributeError on .get below.
        pj = {}
    restored = (pj.get("path") == "/secure") and bool(pj.get("secure"))
    ok = logged_in and kicked and restored
    verdict = "PASS" if ok else "FAIL"
    save("prueba_4_login_session.json", {
        "name": "4 - Login + sesión persistente (storage_state)",
        "verdict": verdict,
        "logged_in": logged_in, "kicked_after_clear": kicked, "restored_after_load": restored,
        "flash_login": flash1[:200], "flash_after_clear": after_clear[:200], "flash_restored": probe[:200],
        "steps": r.steps,
    })
    return verdict, f"login={logged_in} restore={restored}"
|
|
|
|
|
|
def prueba_5_books(c, log):
    """Scraping paginado multi-página + dedup (books.toscrape.com, 3 páginas).

    NOTE: main() writes the first line of this docstring to run.log, so that
    line is kept verbatim.

    Visits catalogue pages 1-3 of books.toscrape.com, extracts
    ``{title, price, stock}`` for every ``.product_pod`` via JS and
    de-duplicates the results by title. Passes when at least 60 unique titles
    were collected. The outcome is written to ``prueba_5_books.json``.

    Args:
        c: Client handed to ``Recorder``.
        log: Writable text stream for the step log.

    Returns:
        Tuple ``(verdict, detail)`` with verdict ``"PASS"`` or ``"FAIL"``.
    """
    r = Recorder(c, log)
    # The extraction expression is the same for every page, so build it once.
    expr = ("[...document.querySelectorAll('.product_pod')].map(b=>({"
            "title:b.querySelector('h3 a').getAttribute('title'),"
            "price:b.querySelector('.price_color').innerText,"
            "stock:b.querySelector('.availability').innerText.trim()}))")
    all_books = []
    for page in (1, 2, 3):
        url = f"https://books.toscrape.com/catalogue/page-{page}.html"
        r.step("tab_navigate", {"port": PORT, "url": url})
        r.step("page_wait_load", {"port": PORT, "timeout_ms": 12000})
        text, _ = r.step("page_eval_js", {"port": PORT, "expression": expr})
        try:
            parsed = json.loads(text)
        except (TypeError, ValueError):
            # Page produced no JSON (e.g. an error message): skip it.
            continue
        # extend() on a dict or string would add keys/characters and inflate
        # total_scraped, so only a JSON array is accepted.
        if isinstance(parsed, list):
            all_books.extend(parsed)
    # Dict keyed by title drops duplicates; the last occurrence wins.
    unique = {b["title"]: b for b in all_books if isinstance(b, dict) and b.get("title")}
    ok = len(unique) >= 60
    verdict = "PASS" if ok else "FAIL"
    save("prueba_5_books.json", {
        "name": "5 - Scraping paginado + dedup (books.toscrape.com, 3 páginas)",
        "verdict": verdict,
        "total_scraped": len(all_books), "unique_count": len(unique),
        "sample": list(unique.values())[:3],
        "steps": r.steps,
    })
    return verdict, f"{len(unique)} libros únicos"
|
|
|
|
|
|
def prueba_6_session_storage(c, log):
    """sessionStorage en storage_state: set -> save -> clear -> load -> get (fix D).

    NOTE: main() writes the first line of this docstring to run.log, so that
    line is kept verbatim.

    Sets ``demo_k=demo_v`` in sessionStorage, saves the storage state, clears
    sessionStorage, loads the saved state and reads the key back. Passes when
    the final read did not error, the value read after clearing was ``null``,
    the value read after loading contains ``demo_v`` and the saved JSON file
    has ``sessionStorage.demo_k == "demo_v"``. The outcome is written to
    ``prueba_6_session_storage.json``.

    Args:
        c: Client handed to ``Recorder``.
        log: Writable text stream for the step log.

    Returns:
        Tuple ``(verdict, detail)`` with verdict ``"PASS"`` or ``"FAIL"``.
    """
    r = Recorder(c, log)
    state_path = "/tmp/demo_ss.json"
    # A file left by an earlier run would satisfy both storage_load and the
    # on-disk check below even if this run's storage_save failed, producing a
    # false PASS. Best-effort removal before starting.
    try:
        os.remove(state_path)
    except OSError:
        pass  # Not present or not removable: carry on with the test.
    r.step("tab_navigate", {"port": PORT, "url": "https://the-internet.herokuapp.com"})
    r.step("page_wait_load", {"port": PORT, "timeout_ms": 12000})
    r.step("page_eval_js", {"port": PORT, "expression":
        "window.sessionStorage.setItem('demo_k','demo_v'); 'set'"})
    r.step("storage_save", {"port": PORT, "path": state_path})
    r.step("page_eval_js", {"port": PORT, "expression": "window.sessionStorage.clear(); 'cleared'"})
    cleared, _ = r.step("page_eval_js", {"port": PORT, "expression":
        "String(window.sessionStorage.getItem('demo_k'))"})
    r.step("storage_load", {"port": PORT, "path": state_path})
    got, err = r.step("page_eval_js", {"port": PORT, "expression":
        "String(window.sessionStorage.getItem('demo_k'))"})
    # Also verify that the saved JSON includes the sessionStorage field.
    saved_has_ss = False
    try:
        with open(state_path, encoding="utf-8") as f:
            state = json.load(f)
    except (OSError, ValueError):
        # Missing, unreadable or invalid file: the check simply stays False.
        state = None
    if isinstance(state, dict):
        session_items = state.get("sessionStorage", {})
        saved_has_ss = (
            isinstance(session_items, dict)
            and session_items.get("demo_k") == "demo_v"
        )
    ok = (not err) and (cleared.strip() == "null") and ("demo_v" in got) and saved_has_ss
    verdict = "PASS" if ok else "FAIL"
    save("prueba_6_session_storage.json", {
        "name": "6 - sessionStorage en storage_state (fix D)",
        "verdict": verdict,
        "cleared_value": cleared.strip(), "restored_value": got.strip(), "json_has_sessionstorage": saved_has_ss,
        "steps": r.steps,
    })
    return verdict, f"clear='{cleared.strip()}' restore='{got.strip()}'"
|
|
|
|
|
|
def prueba_7_find_honesto(c, log):
    """find_by_text con texto inexistente -> error explícito, no vacío (fix E).

    NOTE: main() writes the first line of this docstring to run.log, so that
    line is kept verbatim.

    On quotes.toscrape.com, searches for a text that is present ("Login") and
    one that is not. Passes when the first search returns a non-empty response
    without error and the second is flagged as an error whose message contains
    "no se encontro". The outcome is written to ``prueba_7_find_honesto.json``.

    Args:
        c: Client handed to ``Recorder``.
        log: Writable text stream for the step log.

    Returns:
        Tuple ``(verdict, detail)`` with verdict ``"PASS"`` or ``"FAIL"``.
    """
    recorder = Recorder(c, log)
    recorder.step("tab_navigate", {"port": PORT, "url": "https://quotes.toscrape.com"})
    recorder.step("page_wait_load", {"port": PORT, "timeout_ms": 12000})

    # Present text: must be found (no error).
    found, ferr = recorder.step("dom_find_by_text", {"port": PORT, "text": "Login"})
    # Missing text: must yield an explicit error (previously: empty, no error).
    miss, merr = recorder.step(
        "dom_find_by_text", {"port": PORT, "text": "ZZZ_texto_inexistente_42"}
    )

    present_text = found.strip()
    missing_text = miss.strip()
    present_ok = bool(present_text)
    checks = (not ferr, present_ok, merr, "no se encontro" in miss.lower())
    verdict = "PASS" if all(checks) else "FAIL"

    save("prueba_7_find_honesto.json", {
        "name": "7 - find_by_text honesto: error en no-encontrado (fix E)",
        "verdict": verdict,
        "found_present": present_text[:80],
        "missing_is_error": merr,
        "missing_response": missing_text[:120],
        "steps": recorder.steps,
    })
    return verdict, f"present_ok={present_ok} miss_error={merr}"
|
|
|
|
|
|
def prueba_8_verificacion(c, log):
    """Verificación post-acción: click oculto y type sin foco dan error (fix B / P1).

    NOTE: main() writes the first line of this docstring to run.log, so that
    line is kept verbatim.

    On quotes.toscrape.com, injects a ``display:none`` button and clicks it,
    then blurs the active element and types. Passes when both the click and
    the type call are reported as errors. The outcome is written to
    ``prueba_8_verificacion.json``.

    Args:
        c: Client handed to ``Recorder``.
        log: Writable text stream for the step log.

    Returns:
        Tuple ``(verdict, detail)`` with verdict ``"PASS"`` or ``"FAIL"``.
    """
    recorder = Recorder(c, log)
    recorder.step("tab_navigate", {"port": PORT, "url": "https://quotes.toscrape.com"})
    recorder.step("page_wait_load", {"port": PORT, "timeout_ms": 12000})

    # Inject a hidden button and check that clicking it errors (no click into
    # thin air).
    inject_js = (
        "var b=document.createElement('button');b.id='hidden_btn';b.textContent='x';"
        "b.style.display='none';document.body.appendChild(b);'injected'"
    )
    recorder.step("page_eval_js", {"port": PORT, "expression": inject_js})
    _, click_flag = recorder.step("dom_click", {"port": PORT, "selector": "#hidden_btn"})

    # Remove focus and check that typing errors (no typing into nothing).
    blur_js = (
        "if(document.activeElement){document.activeElement.blur();} document.body.focus(); 'blurred'"
    )
    recorder.step("page_eval_js", {"port": PORT, "expression": blur_js})
    _, type_flag = recorder.step("dom_type", {"port": PORT, "text": "fantasma"})

    hidden_click_failed = bool(click_flag)
    nofocus_type_failed = bool(type_flag)
    if hidden_click_failed and nofocus_type_failed:
        verdict = "PASS"
    else:
        verdict = "FAIL"

    save("prueba_8_verificacion.json", {
        "name": "8 - Verificación post-acción: click oculto / type sin foco dan error (fix B)",
        "verdict": verdict,
        "click_hidden_error": hidden_click_failed,
        "type_nofocus_error": nofocus_type_failed,
        "steps": recorder.steps,
    })
    return verdict, f"click_hidden_err={hidden_click_failed} type_nofocus_err={nofocus_type_failed}"
|
|
|
|
|
|
def prueba_9_ref_loop(c, log):
    """Bucle percibir->actuar por #ref: perceive -> type_ref/click_ref (gap #1).

    NOTE: main() writes the first line of this docstring to run.log, so that
    line is kept verbatim.

    Perceives the login page of the-internet.herokuapp.com, pulls the
    ``#ref=N`` numbers of the Username textbox, Password textbox and Login
    button out of the outline, and logs in using only those refs. Passes when
    all three refs were found, ``#flash`` contains "logged into" and the
    ``dom_click_ref`` response looks like a fresh observation. The outcome is
    written to ``prueba_9_ref_loop.json``.

    Args:
        c: Client handed to ``Recorder``.
        log: Writable text stream for the step log.

    Returns:
        Tuple ``(verdict, detail)`` with verdict ``"PASS"`` or ``"FAIL"``.
    """
    recorder = Recorder(c, log)
    base = "https://the-internet.herokuapp.com"
    recorder.step("cookie_clear", {"port": PORT})
    recorder.step("tab_navigate", {"port": PORT, "url": base + "/login"})
    recorder.step("page_wait_load", {"port": PORT, "timeout_ms": 12000})
    outline, _ = recorder.step("page_perceive", {"port": PORT, "max_chars": 6000}, timeout=90)

    # For each element, look for its outline label followed, on the same line,
    # by "#ref=<digits>". The button name includes a FontAwesome icon before
    # "Login", hence the [^"]* prefix in its pattern.
    label_patterns = {
        "username": r'textbox "Username"',
        "password": r'textbox "Password"',
        "login": r'button "[^"]*Login"',
    }
    refs = {}
    for label, pattern in label_patterns.items():
        hit = re.search(pattern + r'[^\n]*?#ref=(\d+)', outline)
        refs[label] = int(hit.group(1)) if hit else None
    refs_ok = all(value is not None for value in refs.values())

    after = ""
    if refs_ok:
        # Act ONLY through the outline's #ref values, with no CSS selector.
        # This closes the perceive->act loop.
        recorder.step("dom_type_ref", {"port": PORT, "ref": refs["username"], "text": "tomsmith"})
        recorder.step("dom_type_ref", {"port": PORT, "ref": refs["password"], "text": "SuperSecretPassword!"})
        # auto-observe: the response is expected to carry the new outline.
        after, _ = recorder.step("dom_click_ref", {"port": PORT, "ref": refs["login"]})
        recorder.step("page_wait_load", {"port": PORT, "timeout_ms": 12000})

    flash, _ = recorder.step("page_get_text", {"port": PORT, "selector": "#flash", "max_bytes": 200})
    logged = "logged into" in flash.lower()
    # auto-observe: dom_click_ref returned the outline after the action.
    auto_observed = any(("#ref=" in after, "Logout" in after, len(after) > 50))

    verdict = "PASS" if (refs_ok and logged and auto_observed) else "FAIL"
    save("prueba_9_ref_loop.json", {
        "name": "9 - Bucle percibir->actuar por #ref + auto-observe (gap #1)",
        "verdict": verdict,
        "refs": dict(refs),
        "logged_in": logged,
        "auto_observed": auto_observed,
        "flash": flash.strip(),
        "after_observe_preview": after[:300],
        "steps": recorder.steps,
    })
    return verdict, f"refs={refs_ok} logged={logged} auto_observe={auto_observed}"
|
|
|
|
|
|
def main():
    """Run every test against one MCP client and write the run artifacts.

    Opens ``run.log`` in the results directory, starts the client, runs each
    ``prueba_*`` function in order (an exception inside a test is recorded as
    verdict ``"ERROR"`` and the run continues), writes ``summary.json`` and a
    summary table to the log, and prints the summary as JSON on stdout.
    """
    log_path = os.path.join(RESULTS, "run.log")
    # The with-block guarantees the log is closed even if start-up fails.
    with open(log_path, "w", encoding="utf-8") as log:
        log.write(f"=== Demo e2e browser_mcp @ {time.strftime('%d/%m/%Y %H:%M')} ===\n")
        client = MCPClient(EXE, env={"FN_REGISTRY_ROOT": ROOT}, cwd=ROOT,
                           stderr_path=os.path.join(RESULTS, "mcp_stderr.log"))
        summary = []
        try:
            init = client.initialize()
            log.write(f"initialize: {json.dumps(init.get('result', {}).get('serverInfo', {}))}\n")

            pruebas = [prueba_1_quotes, prueba_2_perceive, prueba_3_search,
                       prueba_4_login_session, prueba_5_books,
                       prueba_6_session_storage, prueba_7_find_honesto,
                       prueba_8_verificacion, prueba_9_ref_loop]
            for fn in pruebas:
                # __doc__ is None when docstrings are stripped (python -OO);
                # fall back to the function name instead of crashing.
                name = (fn.__doc__ or fn.__name__).split("\n")[0]
                log.write(f"\n--- {fn.__name__}: {name}\n")
                try:
                    verdict, detail = fn(client, log)
                except Exception as e:
                    # Top-level boundary: one broken test must not stop the
                    # remaining ones.
                    verdict, detail = "ERROR", str(e)
                summary.append({"prueba": fn.__name__, "verdict": verdict, "detail": detail})
                log.write(f" => {verdict} ({detail})\n")
        finally:
            # Always release the client, even if initialize() or the loop
            # raised.
            client.close()

        save("summary.json", summary)
        log.write("\n=== RESUMEN ===\n")
        for s in summary:
            log.write(f"{s['verdict']:6} {s['prueba']:24} {s['detail']}\n")
    print(json.dumps(summary, ensure_ascii=False, indent=2))
|
|
|
|
|
|
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|