aef8791151
- Added Appshell component with responsive navbar and main content area - Integrated ColorSchemeToggle for light/dark mode switching - Created Welcome component with styled title and introductory text - Developed ChatPage for LLM interaction with WebSocket support - Implemented Biblioteca for managing notes with rich text editor - Added LoginPage for user authentication with error handling - Introduced MessageList and MessageBubble components for chat messages - Styled components with CSS modules for consistent design
207 lines
7.8 KiB
Python
207 lines
7.8 KiB
Python
import asyncio
|
|
import json
|
|
import base64
|
|
import websockets
|
|
from typing import Optional, List
|
|
from .ElementoWeb import ElementoWeb
|
|
import os
|
|
|
|
|
|
class Tab:
    """Client for a single browser tab driven over a DevTools-protocol WebSocket.

    Commands are sent as JSON objects carrying an incrementing ``id``; a
    background task (``_recibir_eventos``) reads incoming messages, resolves
    the future registered for each reply ``id`` and records
    ``Page.loadEventFired`` events.

    The class is an async context manager: leaving the ``async with`` block
    calls ``cerrar()``.
    """

    def __init__(self, websocket: "websockets.WebSocketClientProtocol", ws_url: str, verbose: bool = True):
        """Wrap an already-open WebSocket connection.

        Args:
            websocket: Open connection to the tab's debugger endpoint.
            ws_url: URL the connection was opened with (stored for reference).
            verbose: When true, progress messages are printed.
        """
        self.websocket = websocket
        self.ws_url = ws_url
        # Last command id handed out; incremented before every send.
        self._message_id = 0
        # Command id -> future awaiting the matching reply.
        self._pending = {}
        # Set by the receiver when a Page.loadEventFired event arrives.
        self._load_event = asyncio.Event()
        self.verbose = verbose
        # Strong reference to the background receiver task (see
        # crear_desde_websocket); None when no receiver was started here.
        self._receiver_task = None

    async def __aenter__(self):
        """Return the tab itself for use in ``async with``."""
        return self

    async def __aexit__(self, exc_type, exc, tb):
        """Close the tab when the ``async with`` block ends."""
        await self.cerrar()

    @classmethod
    async def crear_desde_websocket(cls, ws_url: str) -> "Tab":
        """Connect to ``ws_url``, start the receiver and enable Page/Network.

        Args:
            ws_url: WebSocket debugger URL of the tab.

        Returns:
            A ready-to-use ``Tab``.

        Raises:
            Exception: Whatever connecting or the two enable commands raise
                (including ``asyncio.TimeoutError``); the connection is closed
                before the error propagates.
        """
        websocket = await websockets.connect(ws_url, max_size=10 * 1024 * 1024)
        tab = cls(websocket, ws_url)
        # The event loop only keeps weak references to tasks, so the task
        # must be stored or it may be garbage-collected while running.
        tab._receiver_task = asyncio.create_task(tab._recibir_eventos())
        try:
            await tab._enviar("Page.enable")
            await tab._enviar("Network.enable")
        except BaseException:
            # Do not leak the socket or the receiver if setup fails.
            await tab.cerrar()
            raise
        return tab

    async def _recibir_eventos(self):
        """Read messages until the connection ends, dispatching each one.

        Replies whose ``id`` is registered in ``_pending`` resolve (or fail)
        the corresponding future; ``Page.loadEventFired`` sets the load event;
        everything else is ignored.
        """
        async for mensaje in self.websocket:
            data = json.loads(mensaje)
            if "id" in data and data["id"] in self._pending:
                future = self._pending.pop(data["id"])
                if future.done():
                    # The waiter already timed out or was cancelled; touching
                    # the future now would raise InvalidStateError and kill
                    # this loop.
                    continue
                if "result" in data:
                    future.set_result(data["result"])
                elif "error" in data:
                    future.set_exception(Exception(data["error"]))
            elif data.get("method") == "Page.loadEventFired":
                self._load_event.set()

    async def _enviar(self, metodo: str, parametros: Optional[dict] = None, timeout: float = 10.0) -> dict:
        """Send one command and wait for its reply.

        Args:
            metodo: Protocol method name, e.g. ``"Page.navigate"``.
            parametros: Method parameters; an empty object is sent when omitted.
            timeout: Seconds to wait for the reply.

        Returns:
            The ``result`` object of the reply.

        Raises:
            asyncio.TimeoutError: No reply arrived within ``timeout``.
            Exception: The reply carried an ``error`` object.
        """
        self._message_id += 1
        msg_id = self._message_id
        mensaje = {
            "id": msg_id,
            "method": metodo,
            "params": parametros or {},
        }

        future = asyncio.get_running_loop().create_future()
        self._pending[msg_id] = future
        try:
            await self.websocket.send(json.dumps(mensaje))
            return await asyncio.wait_for(future, timeout=timeout)
        finally:
            # Always unregister: on timeout or send failure the entry would
            # otherwise stay in the table forever.
            self._pending.pop(msg_id, None)

    async def navegar(self, url: str, wait_time: float = 5.0):
        """Navigate to ``url`` and wait up to ``wait_time`` seconds for the load event.

        A load timeout is reported with a printed warning, not an exception.
        Errors from the ``Page.navigate`` command itself propagate.
        """
        # Reset first so a load event from a previous page is not mistaken
        # for this navigation.
        self._load_event.clear()
        if self.verbose:
            print(f"🌍 Navegando a: {url}")
        await self._enviar("Page.navigate", {"url": url})
        try:
            await asyncio.wait_for(self._load_event.wait(), timeout=wait_time)
            if self.verbose:
                print("✅ Página cargada correctamente.")
        except asyncio.TimeoutError:
            print(f"⚠️ Tiempo de espera agotado ({wait_time}s) al cargar la página.")

    async def _evaluar_por_valor(self, expresion: str):
        """Run ``Runtime.evaluate`` with ``returnByValue`` and return the value.

        Raises:
            Exception: The reply contains ``exceptionDetails``, or the command
                itself failed.
        """
        result = await self._enviar("Runtime.evaluate", {
            "expression": expresion,
            "returnByValue": True,
        })
        if "exceptionDetails" in result:
            raise Exception(result["exceptionDetails"])
        return result.get("result", {}).get("value")

    async def evaluar_js(self, js_code: str) -> Optional[str]:
        """Evaluate ``js_code`` in the page and return its value.

        Returns:
            The evaluated value, or ``None`` when evaluation fails (the error
            is printed).
        """
        try:
            return await self._evaluar_por_valor(js_code)
        except Exception as e:
            print(f"⚠️ Error al ejecutar JS: {e}")
            return None

    async def inyectar_archivo_js(self, ruta_archivo: str, reemplazos: Optional[dict] = None) -> Optional[str]:
        """Load a JavaScript file, apply placeholder substitutions and run it.

        Args:
            ruta_archivo: Path of the script file (read as UTF-8).
            reemplazos: Mapping ``name -> value``; every ``{{name}}`` occurrence
                in the script is replaced by ``str(value)``.

        Returns:
            The value reported for the evaluated expression, or ``None`` when
            the file is missing or evaluation fails (a message is printed).
        """
        if not os.path.exists(ruta_archivo):
            print(f"❌ Archivo JS no encontrado: {ruta_archivo}")
            return None

        with open(ruta_archivo, "r", encoding="utf-8") as f:
            js_code = f.read()

        if reemplazos:
            for key, value in reemplazos.items():
                js_code = js_code.replace(f"{{{{{key}}}}}", str(value))

        # The script is wrapped in an immediately-invoked async function; no
        # outer `return` is emitted.
        # NOTE(review): the expression therefore evaluates to a Promise and
        # `awaitPromise` is not requested, so the value returned here is
        # presumably the serialized Promise rather than what the script
        # returns -- confirm what callers expect before changing it.
        js_code_final = f"(async () => {{\n{js_code}\n}})();"

        try:
            return await self._evaluar_por_valor(js_code_final)
        except Exception as e:
            print(f"⚠️ Error al inyectar JS desde {ruta_archivo}: {e}")
            return None

    async def obtener_user_agent(self) -> Optional[str]:
        """Return ``navigator.userAgent`` of the page, or ``None`` on failure."""
        return await self.evaluar_js("navigator.userAgent")

    async def capturar_screenshot(self, output_path: str = "screenshot.png"):
        """Capture the page and write the decoded image bytes to ``output_path``.

        Failures are printed, not raised.
        """
        try:
            result = await self._enviar("Page.captureScreenshot")
            # The reply carries the image base64-encoded under "data".
            data = result["data"]
            with open(output_path, "wb") as f:
                f.write(base64.b64decode(data))
            if self.verbose:
                print(f"📸 Screenshot guardado como {output_path}")
        except Exception as e:
            print(f"⚠️ Error al capturar screenshot: {e}")

    async def cerrar(self):
        """Close the WebSocket and stop the receiver task.

        Errors while closing are printed, not raised.
        """
        try:
            # Not every connection class exposes `closed`; when it is absent
            # fall through and call close() anyway.
            if not getattr(self.websocket, "closed", False):
                await self.websocket.close()
                if self.verbose:
                    print("🛑 WebSocket cerrado.")
        except Exception as e:
            print(f"⚠️ Error al cerrar pestaña: {e}")
        finally:
            task = self._receiver_task
            if task is not None and not task.done():
                task.cancel()

    async def obtener_html_completo(self) -> Optional[str]:
        """Return ``document.documentElement.outerHTML``, or ``None`` on failure."""
        try:
            result = await self._enviar("Runtime.evaluate", {
                "expression": "document.documentElement.outerHTML",
                "returnByValue": True,
            })
            return result.get("result", {}).get("value")
        except Exception as e:
            print(f"⚠️ Error al obtener HTML: {e}")
            return None

    async def obtener_dominio(self) -> Optional[str]:
        """Return ``window.location.hostname`` of the page, or ``None`` on failure."""
        try:
            dominio = await self.evaluar_js("window.location.hostname")
            if self.verbose and dominio:
                print(f"🌐 Dominio actual: {dominio}")
            return dominio
        except Exception as e:
            print(f"⚠️ Error al obtener dominio: {e}")
            return None

    async def get_element_by_selector_node(self, selector: str) -> Optional["ElementoWeb"]:
        """Find the first node matching ``selector`` via ``DOM.querySelector``.

        Returns:
            Whatever ``ElementoWeb.from_node`` produces for the matched node
            id, or ``None`` when nothing matches or a command fails (a message
            is printed).
        """
        try:
            doc = await self._enviar("DOM.getDocument")
            root_node_id = doc["root"]["nodeId"]

            result = await self._enviar("DOM.querySelector", {
                "nodeId": root_node_id,
                "selector": selector,
            })
            node_id = result.get("nodeId")

            # A missing or zero node id means no match.
            if not node_id:
                print(f"⚠️ Nodo no encontrado con selector: {selector}")
                return None

            return ElementoWeb.from_node(self, node_id=node_id)
        except Exception as e:
            print(f"⚠️ Error al buscar nodo desde DOM.querySelector: {e}")
            return None

    async def get_elements_by_css_selector(self, selector: str) -> List["ElementoWeb"]:
        """Return an ``ElementoWeb`` for every element matching ``selector``.

        The matches are collected page-side into an array, whose own
        properties are then listed to obtain one remote object id per element.

        Returns:
            The wrapped elements; an empty list when nothing matches or any
            step fails (the error is printed).
        """
        try:
            # json.dumps yields a correctly escaped JavaScript string literal,
            # so selectors containing quotes or backslashes stay valid.
            selector_js = json.dumps(selector)
            result = await self._enviar("Runtime.evaluate", {
                "expression": f"Array.from(document.querySelectorAll({selector_js}))",
                "objectGroup": "grupo_elementos_css",
                "includeCommandLineAPI": True,
                "returnByValue": False,
            })
            if "exceptionDetails" in result:
                # e.g. a syntactically invalid selector.
                raise Exception(result["exceptionDetails"])
            array_id = result["result"]["objectId"]
            props = await self._enviar("Runtime.getProperties", {
                "objectId": array_id,
                "ownProperties": True,
            })
            # Only properties holding a remote object are kept; entries such
            # as the array's numeric `length` carry no objectId.
            elementos = [
                ElementoWeb(self, prop["value"]["objectId"])
                for prop in props["result"]
                if "value" in prop and "objectId" in prop["value"]
            ]
            if self.verbose:
                print(f"🔍 Se encontraron {len(elementos)} elementos con el selector CSS '{selector}'.")
            return elementos
        except Exception as e:
            print(f"⚠️ Error al buscar elementos por selector CSS '{selector}': {e}")
            return []

    async def enfocar(self):
        """Bring the tab to the front; failures are printed, not raised."""
        try:
            await self._enviar("Page.bringToFront")
            if self.verbose:
                print("🪟 Pestaña enfocada (bringToFront).")
        except Exception as e:
            print(f"⚠️ Error al enfocar pestaña: {e}")
|