import asyncio import json import base64 import websockets from typing import Optional, List from .ElementoWeb import ElementoWeb import os class Tab: def __init__(self, websocket: websockets.WebSocketClientProtocol, ws_url: str, verbose: bool = True): self.websocket = websocket self.ws_url = ws_url self._message_id = 0 self._pending = {} self._load_event = asyncio.Event() self.verbose = verbose async def __aenter__(self): return self async def __aexit__(self, exc_type, exc, tb): await self.cerrar() @classmethod async def crear_desde_websocket(cls, ws_url: str) -> "Tab": websocket = await websockets.connect(ws_url, max_size=10 * 1024 * 1024) tab = cls(websocket, ws_url) asyncio.create_task(tab._recibir_eventos()) await tab._enviar("Page.enable") await tab._enviar("Network.enable") return tab async def _recibir_eventos(self): async for mensaje in self.websocket: data = json.loads(mensaje) if "id" in data and data["id"] in self._pending: future = self._pending.pop(data["id"]) if "result" in data: future.set_result(data["result"]) elif "error" in data: future.set_exception(Exception(data["error"])) elif data.get("method") == "Page.loadEventFired": self._load_event.set() async def _enviar(self, metodo: str, parametros: Optional[dict] = None, timeout: float = 10.0) -> dict: self._message_id += 1 msg_id = self._message_id mensaje = { "id": msg_id, "method": metodo, "params": parametros or {} } future = asyncio.get_event_loop().create_future() self._pending[msg_id] = future await self.websocket.send(json.dumps(mensaje)) return await asyncio.wait_for(future, timeout=timeout) async def navegar(self, url: str, wait_time: float = 5.0): self._load_event.clear() if self.verbose: print(f"🌍 Navegando a: {url}") await self._enviar("Page.navigate", {"url": url}) try: await asyncio.wait_for(self._load_event.wait(), timeout=wait_time) if self.verbose: print("✅ Página cargada correctamente.") except asyncio.TimeoutError: print(f"⚠️ Tiempo de espera agotado ({wait_time}s) al cargar la página.") async def evaluar_js(self, js_code: str) -> Optional[str]: try: result = await self._enviar("Runtime.evaluate", { "expression": js_code, "returnByValue": True }) if "exceptionDetails" in result: raise Exception(result["exceptionDetails"]) return result.get("result", {}).get("value") except Exception as e: print(f"⚠️ Error al ejecutar JS: {e}") return None async def inyectar_archivo_js(self, ruta_archivo: str, reemplazos: dict = None) -> Optional[str]: if not os.path.exists(ruta_archivo): print(f"❌ Archivo JS no encontrado: {ruta_archivo}") return None with open(ruta_archivo, "r", encoding="utf-8") as f: js_code = f.read() if reemplazos: for key, value in reemplazos.items(): js_code = js_code.replace(f"{{{{{key}}}}}", str(value)) # 🔧 Eliminamos el `return` externo js_code_final = f"(async () => {{\n{js_code}\n}})();" try: result = await self._enviar("Runtime.evaluate", { "expression": js_code_final, "returnByValue": True }) if "exceptionDetails" in result: raise Exception(result["exceptionDetails"]) return result.get("result", {}).get("value") except Exception as e: print(f"⚠️ Error al inyectar JS desde {ruta_archivo}: {e}") return None async def obtener_user_agent(self) -> Optional[str]: return await self.evaluar_js("navigator.userAgent") async def capturar_screenshot(self, output_path: str = "screenshot.png"): try: result = await self._enviar("Page.captureScreenshot") data = result["data"] with open(output_path, "wb") as f: f.write(base64.b64decode(data)) if self.verbose: print(f"📸 Screenshot guardado como {output_path}") except Exception as e: print(f"⚠️ Error al capturar screenshot: {e}") async def cerrar(self): try: if not self.websocket.closed: await self.websocket.close() if self.verbose: print("🛑 WebSocket cerrado.") except Exception as e: print(f"⚠️ Error al cerrar pestaña: {e}") async def obtener_html_completo(self) -> Optional[str]: try: result = await self._enviar("Runtime.evaluate", { "expression": "document.documentElement.outerHTML", "returnByValue": True }) return result.get("result", {}).get("value") except Exception as e: print(f"⚠️ Error al obtener HTML: {e}") return None async def obtener_dominio(self) -> Optional[str]: try: dominio = await self.evaluar_js("window.location.hostname") if self.verbose and dominio: print(f"🌐 Dominio actual: {dominio}") return dominio except Exception as e: print(f"⚠️ Error al obtener dominio: {e}") return None async def get_element_by_selector_node(self, selector: str) -> Optional["ElementoWeb"]: try: doc = await self._enviar("DOM.getDocument") root_node_id = doc["root"]["nodeId"] result = await self._enviar("DOM.querySelector", { "nodeId": root_node_id, "selector": selector }) node_id = result.get("nodeId") if not node_id: print(f"⚠️ Nodo no encontrado con selector: {selector}") return None return ElementoWeb.from_node(self, node_id=node_id) except Exception as e: print(f"⚠️ Error al buscar nodo desde DOM.querySelector: {e}") return None async def get_elements_by_css_selector(self, selector: str) -> List["ElementoWeb"]: try: result = await self._enviar("Runtime.evaluate", { "expression": f'Array.from(document.querySelectorAll("{selector}"))', "objectGroup": "grupo_elementos_css", "includeCommandLineAPI": True, "returnByValue": False }) array_id = result["result"]["objectId"] props = await self._enviar("Runtime.getProperties", { "objectId": array_id, "ownProperties": True }) elementos = [] for prop in props["result"]: if "value" in prop and "objectId" in prop["value"]: elementos.append(ElementoWeb(self, prop["value"]["objectId"])) if self.verbose: print(f"🔍 Se encontraron {len(elementos)} elementos con el selector CSS '{selector}'.") return elementos except Exception as e: print(f"⚠️ Error al buscar elementos por selector CSS '{selector}': {e}") return [] async def enfocar(self): try: await self._enviar("Page.bringToFront") if self.verbose: print("🪟 Pestaña enfocada (bringToFront).") except Exception as e: print(f"⚠️ Error al enfocar pestaña: {e}")