From e1b756ac997d791343fabf8487e5ef9180362d00 Mon Sep 17 00:00:00 2001 From: egutierrez Date: Sun, 1 Jun 2025 15:31:13 +0200 Subject: [PATCH] feat: Implement cookie extraction script for Chrome v20 and enhance browser interaction --- scrappers/devolver_cookies.py | 179 +++++++++++++++++++++ scrappers/ejecucion_iterativa_navegador.py | 87 ++++++++++ scrappers/iniciar_chrome.py | 80 +++++++++ scrappers/prueba_navegadores.py | 122 ++++++++++++++ src/ScrappingWeb/ElementoWeb.py | 126 ++++++++++++--- src/ScrappingWeb/Navegador.py | 4 +- src/ScrappingWeb/Scrapper.py | 77 ++++++++- src/ScrappingWeb/Tab.py | 106 ++++++++---- 8 files changed, 717 insertions(+), 64 deletions(-) create mode 100644 scrappers/devolver_cookies.py create mode 100644 scrappers/ejecucion_iterativa_navegador.py create mode 100644 scrappers/iniciar_chrome.py create mode 100644 scrappers/prueba_navegadores.py diff --git a/scrappers/devolver_cookies.py b/scrappers/devolver_cookies.py new file mode 100644 index 0000000..4e70bce --- /dev/null +++ b/scrappers/devolver_cookies.py @@ -0,0 +1,179 @@ +import os +import sys +import json +import binascii +import ctypes +import base64 +import sqlite3 +import pandas as pd +import pathlib +from Crypto.Cipher import AES, ChaCha20_Poly1305 +from pypsexec.client import Client + +""" +Este script extrae cookies v20 de Google Chrome y las guarda en un archivo CSV. +Requiere privilegios de administrador para acceder a los datos de Chrome. + +Conseguido para poder extraer cookies de Chrome v20, que utiliza un nuevo formato de cifrado. + +""" + + +def is_admin(): + try: + return ctypes.windll.shell32.IsUserAnAdmin() != 0 + except: + return False + + +def get_app_bound_key(local_state_path): + with open(local_state_path, "r", encoding="utf-8") as f: + local_state = json.load(f) + return local_state["os_crypt"]["app_bound_encrypted_key"] + + +def decrypt_app_bound_key(encrypted_key_b64): + arguments = "-c \"" + """import win32crypt +import binascii +encrypted_key = win32crypt.CryptUnprotectData(binascii.a2b_base64('{}'), None, None, None, 0) +print(binascii.b2a_base64(encrypted_key[1]).decode()) +""".replace("\n", ";") + "\"" + + c = Client("localhost") + c.connect() + + decrypted_key = None + try: + c.create_service() + + assert(binascii.a2b_base64(encrypted_key_b64)[:4] == b"APPB") + stripped_key_b64 = binascii.b2a_base64(binascii.a2b_base64(encrypted_key_b64)[4:]).decode().strip() + + encrypted_key_b64_sys, _, _ = c.run_executable( + sys.executable, + arguments=arguments.format(stripped_key_b64), + use_system_account=True + ) + + decrypted_key_b64, _, _ = c.run_executable( + sys.executable, + arguments=arguments.format(encrypted_key_b64_sys.decode().strip()), + use_system_account=False + ) + + decrypted_key = binascii.a2b_base64(decrypted_key_b64)[-61:] + finally: + c.remove_service() + c.disconnect() + + return decrypted_key + + +def decrypt_final_key(encrypted_key): + aes_key = bytes.fromhex("B31C6E241AC846728DA9C1FAC4936651CFFB944D143AB816276BCC6DA0284787") + chacha20_key = bytes.fromhex("E98F37D7F4E1FA433D19304DC2258042090E2D1D7EEA7670D41F738D08729660") + + flag = encrypted_key[0] + iv = encrypted_key[1:13] + ciphertext = encrypted_key[13:45] + tag = encrypted_key[45:] + + if flag == 1: + cipher = AES.new(aes_key, AES.MODE_GCM, nonce=iv) + elif flag == 2: + cipher = ChaCha20_Poly1305.new(key=chacha20_key, nonce=iv) + else: + raise ValueError(f"Unsupported flag: {flag}") + + return cipher.decrypt_and_verify(ciphertext, tag) + + +def decrypt_cookie_v20(encrypted_value, key): + cookie_iv = encrypted_value[3:15] + encrypted_cookie = encrypted_value[15:-16] + cookie_tag = encrypted_value[-16:] + cookie_cipher = AES.new(key, AES.MODE_GCM, nonce=cookie_iv) + decrypted_cookie = cookie_cipher.decrypt_and_verify(encrypted_cookie, cookie_tag) + return decrypted_cookie[32:].decode('utf-8') + + +def extract_all_v20_cookies(): + user_profile = os.environ['USERPROFILE'] + local_state_path = rf"{user_profile}\AppData\Local\Google\Chrome\User Data\Local State" + base_profile_path = rf"{user_profile}\AppData\Local\Google\Chrome\User Data" + + app_bound_key_b64 = get_app_bound_key(local_state_path) + decrypted_key_raw = decrypt_app_bound_key(app_bound_key_b64) + final_key = decrypt_final_key(decrypted_key_raw) + + perfiles_invalidos = {"System Profile", "Guest Profile", "CrashpadMetrics"} + perfiles = [ + name for name in os.listdir(base_profile_path) + if os.path.isdir(os.path.join(base_profile_path, name)) + and name not in perfiles_invalidos + and os.path.exists(os.path.join(base_profile_path, name, "Network", "Cookies")) + ] + + all_cookies = [] + + for profile in perfiles: + db_path = os.path.join(base_profile_path, profile, "Network", "Cookies") + con = sqlite3.connect(pathlib.Path(db_path).as_uri() + "?mode=ro", uri=True) + cur = con.cursor() + r = cur.execute("SELECT host_key, name, path, is_secure, is_httponly, expires_utc, last_access_utc, CAST(encrypted_value AS BLOB) from cookies;") + cookies = cur.fetchall() + con.close() + + for row in cookies: + host, name, path, is_secure, is_httponly, expires_utc, last_access_utc, encrypted_value = row + encrypted_value_b64 = base64.b64encode(encrypted_value).decode() + + if encrypted_value.startswith(b"v20"): + try: + value = decrypt_cookie_v20(encrypted_value, final_key) + print(f"[✓] {host} {name}: {value}") + all_cookies.append({ + "host": host, + "name": name, + "path": path, + "value": value, + "encrypted_value_b64": encrypted_value_b64, + "expires_utc": expires_utc, + "is_secure": is_secure, + "is_httponly": is_httponly, + "last_access_utc": last_access_utc, + "profile": profile, + "is_decrypted": True, + "decrypt_error": "" + }) + except Exception as e: + print(f"[x] Error decrypting {host} {name}: {e}") + all_cookies.append({ + "host": host, + "name": name, + "path": path, + "value": "", + "encrypted_value_b64": encrypted_value_b64, + "expires_utc": expires_utc, + "is_secure": is_secure, + "is_httponly": is_httponly, + "last_access_utc": last_access_utc, + "profile": profile, + "is_decrypted": False, + "decrypt_error": str(e) + }) + + return pd.DataFrame(all_cookies) + + +if __name__ == "__main__": + if not is_admin(): + input("Este script necesita ejecutarse como administrador. Presiona Enter para reiniciar con privilegios...") + ctypes.windll.shell32.ShellExecuteW(None, "runas", sys.executable, " ".join([sys.argv[0]] + sys.argv[1:]), None, 1) + sys.exit() + + print("[*] Extrayendo cookies v20 desde todos los perfiles...") + df = extract_all_v20_cookies() + df.to_csv("cookies_extraidas.csv", index=False, encoding="utf-8") + print(f"[✓] Cookies v20 extraídas: {len(df)}") + print("[✓] Guardado en 'cookies_extraidas.csv'") diff --git a/scrappers/ejecucion_iterativa_navegador.py b/scrappers/ejecucion_iterativa_navegador.py new file mode 100644 index 0000000..d0c3c8c --- /dev/null +++ b/scrappers/ejecucion_iterativa_navegador.py @@ -0,0 +1,87 @@ +import asyncio +import os +import pyperclip +import re +from src.ScrappingWeb.Scrapper import Scrapper + +def sanitizar(nombre: str) -> str: + return re.sub(r'[\\/*?:"<>|]', "_", nombre).strip()[:100] + +OUTPUT_DIR = "esquemas_json" +os.makedirs(OUTPUT_DIR, exist_ok=True) + +async def main(): + ws_id = "F51AC05B27E1DEC4011E67369781596C" + ws_url = f"ws://127.0.0.1:9222/devtools/page/{ws_id}" + scrapper = Scrapper(debugging_url="http://127.0.0.1:9222") + + print("🔌 Conectando a pestaña específica...") + + tab = scrapper.get_tab(ws_url) or scrapper.get_tab(ws_id) + if not tab: + nuevas_tabs = await scrapper.obtener_tabs_existentes() + tab = next((t for t in nuevas_tabs if t.ws_url.rsplit("/", 1)[-1] == ws_id), None) + + if not tab: + print("⚠️ La pestaña con ese ID no se encontró.") + return + + elementos = await tab.get_elements_by_css_selector( + "#_0rif_bq-resource-tree > div.cfctest-tree-main.ng-tns-c3578326070-0 > ul > cfc-virtual-scroller > div > div.item-container > div > li" + ) + + for i, elemento in enumerate(elementos[:12]): + print(f"🖱️ Click #{i + 1}") + + clickeable = await elemento.encontrar_hijo_clickeable() + if clickeable: + await clickeable.click() + else: + print(f"⚠️ No se encontró subelemento clickeable en #{i+1}") + continue + + await asyncio.sleep(1) + + texto_crudo = await elemento.obtener_texto() + nombre_archivo = sanitizar(texto_crudo or f"esquema_item_{i+1}") + print(f"📄 Nombre base del archivo: {nombre_archivo}.txt") + + # ✅ Ejecutar JS en el navegador para simular flujo de copia + await tab.evaluar_js(""" + (() => { + const boton = document.querySelector('button[id^="_0rif_bqui-table-copy-schema-btn"] span.mdc-button__label > span'); + if (boton) boton.click(); + })() + """) + await asyncio.sleep(1) + + await tab.evaluar_js(""" + (() => { + const overlays = document.querySelectorAll("div.cdk-overlay-pane"); + for (let overlay of overlays) { + const items = overlay.querySelectorAll("cfc-menu-item .cfc-menu-item-label"); + for (let item of items) { + if (item.textContent.includes("Copiar como JSON")) { + item.click(); + break; + } + } + } + })() + """) + await asyncio.sleep(1.5) + + try: + texto_json = pyperclip.paste() + file_path = os.path.join(OUTPUT_DIR, f"{nombre_archivo}.txt") + with open(file_path, "w", encoding="utf-8") as f: + f.write(texto_json) + print(f"✅ Guardado: {file_path}") + except Exception as e: + print(f"❌ Error al leer el portapapeles o guardar archivo: {e}") + + + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/scrappers/iniciar_chrome.py b/scrappers/iniciar_chrome.py new file mode 100644 index 0000000..32e6c2d --- /dev/null +++ b/scrappers/iniciar_chrome.py @@ -0,0 +1,80 @@ +import subprocess +import os +import time +import signal + +def iniciar_chrome(chrome_path, + user_data_dir, + headless=False, + debugging_port=9222, + user_agent=None, + ): + + # Asegúrate de que el directorio del perfil exista + os.makedirs(user_data_dir, exist_ok=True) + + # Lista de argumentos para Chrome + chrome_args = [ + f"--remote-debugging-port={debugging_port}", + f"--user-data-dir={user_data_dir}", + "--disable-blink-features=AutomationControlled", + "--no-sandbox", + "--disable-web-security", + "--disable-extensions", + "--disable-dev-shm-usage", + "--disable-infobars", + "--disable-popup-blocking", + "--disable-default-apps", + "--mute-audio", + "--window-size=1024,1024", + + ] + + if not headless: + pass + else: + chrome_args.append("--headless=new") # para versiones recientes de Chrome + + if not user_agent: + pass + else: + chrome_args.append(f"--user-agent={user_agent}") + + # Comando para iniciar Chrome + chrome_process = subprocess.Popen([chrome_path] + chrome_args) + + try: + print(f"Chrome iniciado (headless={headless}). Presiona Ctrl+C para salir.") + while True: + if chrome_process.poll() is not None: + print("Chrome se ha cerrado.") + break + time.sleep(1) + except KeyboardInterrupt: + print("Terminando proceso de Chrome...") + chrome_process.terminate() + try: + chrome_process.wait(timeout=5) + except subprocess.TimeoutExpired: + chrome_process.kill() + print("Chrome cerrado correctamente.") + + +# Ruta al ejecutable de Chrome +chrome_path = r"C:\Program Files\Google\Chrome\Application\chrome.exe" + +# Directorio para el perfil de usuario +user_data_dir = os.path.abspath("./Perfiles_usuario/chrome_profile") + +# Puerto para la depuración remota +port = 9222 + +user_agent= "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36" + + +# Llama a la función con True o False +iniciar_chrome(chrome_path=chrome_path, + user_data_dir=user_data_dir, + debugging_port=port, + headless=False, + user_agent=user_agent) # Cambia a True para modo headless \ No newline at end of file diff --git a/scrappers/prueba_navegadores.py b/scrappers/prueba_navegadores.py new file mode 100644 index 0000000..f656a07 --- /dev/null +++ b/scrappers/prueba_navegadores.py @@ -0,0 +1,122 @@ +import asyncio +import os +import re +from src.ScrappingWeb.Navegador import Navegador +from src.ScrappingWeb.Scrapper import Scrapper +from src.ScrappingWeb.Tab import Tab +import aiohttp +import csv + + +async def esperar_chrome_listo(port, timeout=10): + url = f"http://127.0.0.1:{port}/json" + for _ in range(timeout * 2): + try: + async with aiohttp.ClientSession() as session: + async with session.get(url) as resp: + if resp.status == 200: + return + except Exception: + pass + await asyncio.sleep(0.5) + raise TimeoutError(f"Chrome en puerto {port} no respondió dentro del tiempo esperado.") + +chrome_path = r"C:\Program Files\Google\Chrome\Application\chrome.exe" + +def sanitizar_nombre(nombre: str) -> str: + # Eliminar caracteres inválidos para nombre de archivo + return re.sub(r'[\\/*?:"<>|]', "_", nombre).strip()[:100] + + +async def iniciar_y_scrapear(id: int): + user_data_dir = os.path.abspath(f"./Perfiles_usuario/chrome_profile_{id}") + port = 9222 + id + navegador = Navegador( + chrome_path=chrome_path, + user_data_dir=user_data_dir, + id=id, + download_dir=os.path.join(user_data_dir, "downloads"), + debugging_port=port, + headless=False, + user_agent=f"Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/{100+id}.0.0.0 Safari/537.36" + ) + + # Iniciar navegador en background + asyncio.create_task(navegador.iniciar()) + + # Esperamos a que el navegador esté listo + await esperar_chrome_listo(port) + + # Conectarse con el scraper al navegador + scrapper = Scrapper(debugging_url=f"http://127.0.0.1:{port}") + tab = await scrapper.nueva_tab("", wait_time=6) + + # Ejecutar acciones desde la clase Tab + ua = await tab.obtener_user_agent() + print(f"🧭 [{id}] User-Agent:", ua) + + title = await tab.evaluar_js("document.title") + print(f"📄 [{id}] Título:", title) + + + # botones= await tab.get_elements_by_css_selector("#mw-content-text > div.mw-content-ltr.mw-parser-output > figure:nth-child(27) > a > img") + + # for boton in botones: + # await boton.click() + + + # # Crear carpeta si no existe + # os.makedirs("wikipedia_md", exist_ok=True) + + + # # Guardar el HTML completo + # html = await tab.obtener_html_completo() + # with open(f"contenido.html", "w", encoding="utf-8") as f: + # f.write(html) + + # # Leer enlaces del CSV + # with open("enlaces_extraidos.csv", "r", encoding="utf-8") as f: + # reader = csv.reader(f) + # next(reader) # saltar encabezados + # enlaces = list(reader) + + # for texto, enlace in enlaces: + # nombre_archivo = sanitizar_nombre(texto or "sin_titulo") + ".png" + # ruta_archivo = os.path.join("wikipedia", nombre_archivo) + + # try: + # print(f"🌐 Visitando: {enlace}") + # tab = await scrapper.nueva_tab(enlace, wait_time=6) + + # await tab.capturar_screenshot(ruta_archivo) + # print(f"📸 Captura guardada: {ruta_archivo}") + + # await tab.cerrar() + # except Exception as e: + # print(f"❌ Error con {enlace}: {e}") + + + # await tab.capturar_screenshot(f"screenshot_{id}.png") + + # html = await tab.obtener_html_completo() + # print(html) + + # with open("contenido.html", "w", encoding="utf-8") as f: + # f.write(html) + + # Extraer enlaces y guardarlos en CSV + + + + + # # # Cerrar tab y navegador si quieres + # await asyncio.sleep(10) + # await tab.cerrar() + # await navegador.cerrar() + +async def main(): + tareas = [iniciar_y_scrapear(i) for i in range(1)] + await asyncio.gather(*tareas) + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/src/ScrappingWeb/ElementoWeb.py b/src/ScrappingWeb/ElementoWeb.py index 3735830..0e00700 100644 --- a/src/ScrappingWeb/ElementoWeb.py +++ b/src/ScrappingWeb/ElementoWeb.py @@ -1,54 +1,58 @@ from typing import TYPE_CHECKING, Optional import random import asyncio +import json if TYPE_CHECKING: - from src.ScrappingWeb.Tab import Tab + from .Tab import Tab class ElementoWeb: - def __init__(self, tab: "Tab", object_id: str): + def __init__(self, tab: "Tab", object_id: Optional[str]): self.tab = tab self.object_id = object_id + self._node_id = None # Lazy resolved + + @classmethod + def from_node(cls, tab: "Tab", node_id: int) -> "ElementoWeb": + inst = cls(tab, object_id=None) + inst._node_id = node_id + return inst + + async def _asegurar_object_id(self): + if not self.object_id and self._node_id: + try: + resolved = await self.tab._enviar("DOM.resolveNode", {"nodeId": self._node_id}) + self.object_id = resolved["object"]["objectId"] + except Exception as e: + print(f"⚠️ No se pudo resolver objectId desde nodeId: {e}") async def scroll_into_view(self): try: + await self._asegurar_object_id() await self.tab._enviar("Runtime.callFunctionOn", { "objectId": self.object_id, "functionDeclaration": "function() { this.scrollIntoView({block: 'center'}); }", "awaitPromise": True }) - print("📜 Elemento desplazado a la vista.") + if self.tab.verbose: + print("📜 Elemento desplazado a la vista.") except Exception as e: print(f"⚠️ Error al hacer scroll hacia el elemento: {e}") - @classmethod - def from_node(cls, tab: "Tab", node_id: int) -> "ElementoWeb": - # Creamos un objectId a partir del nodeId usando DOM.resolveNode - cls._node_id = node_id - cls._resolved_object_id = None # Lazy resolution opcional - return cls(tab, object_id=None) - async def click(self): try: await self.scroll_into_view() - - # Resolver objectId si es necesario - if not self.object_id and hasattr(self, "_node_id"): - resolved = await self.tab._enviar("DOM.resolveNode", {"nodeId": self._node_id}) - self.object_id = resolved["object"]["objectId"] - + await self._asegurar_object_id() if not self.object_id: raise ValueError("No se puede obtener objectId del elemento para hacer click.") - # Obtener nodeId + # Intenta obtener coordenadas del nodo node_result = await self.tab._enviar("DOM.describeNode", { "objectId": self.object_id }) - node_id = node_result["node"]["nodeId"] - # Obtener coordenadas con fallback try: box_model = await self.tab._enviar("DOM.getBoxModel", {"nodeId": node_id}) content = box_model["model"]["content"] @@ -60,7 +64,12 @@ class ElementoWeb: x = (quad[0] + quad[4]) / 2 y = (quad[1] + quad[5]) / 2 - # Simular movimiento humano del mouse + # 🧠 Enfocar el elemento antes de clickear + await self.tab._enviar("DOM.focus", { + "objectId": self.object_id + }) + + # 🎯 Movimiento humanoide opcional start_x, start_y = x + random.uniform(-100, 100), y + random.uniform(-100, 100) steps = random.randint(5, 12) for i in range(1, steps + 1): @@ -73,7 +82,7 @@ class ElementoWeb: }) await asyncio.sleep(random.uniform(0.01, 0.05)) - # Click humano + # 👆 Mouse Down await self.tab._enviar("Input.dispatchMouseEvent", { "type": "mousePressed", "x": x, @@ -81,7 +90,10 @@ class ElementoWeb: "button": "left", "clickCount": 1 }) + await asyncio.sleep(random.uniform(0.05, 0.15)) + + # 👇 Mouse Up await self.tab._enviar("Input.dispatchMouseEvent", { "type": "mouseReleased", "x": x, @@ -90,27 +102,89 @@ class ElementoWeb: "clickCount": 1 }) - print(f"🖱️ Click humano simulado en ({x:.1f}, {y:.1f})") + await asyncio.sleep(random.uniform(0.01, 0.05)) + + # 🖱️ Click manual adicional + await self.tab._enviar("Input.dispatchMouseEvent", { + "type": "mouseClicked", + "x": x, + "y": y, + "button": "left", + "clickCount": 1 + }) + + if self.tab.verbose: + print(f"🖱️ Click humano simulado en ({x:.1f}, {y:.1f})") except Exception as e: print(f"⚠️ Error al hacer click físico: {e}") print("🧪 Intentando fallback con JavaScript click()...") await self.click_js() - async def click_js(self): try: + await self._asegurar_object_id() + if not self.object_id: + print("⚠️ No se puede hacer click JS: objectId no disponible.") + return await self.tab._enviar("Runtime.callFunctionOn", { "objectId": self.object_id, "functionDeclaration": "function() { this.click(); }", "awaitPromise": True }) - print("🖱️ Click simulado por JavaScript (element.click())") + if self.tab.verbose: + print("🖱️ Click simulado por JavaScript (element.click())") except Exception as e: print(f"⚠️ Error al ejecutar click en JS: {e}") async def obtener_texto(self) -> Optional[str]: - return await self.tab.evaluar_js(f'document.getElementById("{self.object_id}").textContent') + try: + await self._asegurar_object_id() + result = await self.tab._enviar("Runtime.callFunctionOn", { + "objectId": self.object_id, + "functionDeclaration": "function() { return this.textContent; }", + "returnByValue": True + }) + return result.get("result", {}).get("value") + except Exception as e: + print(f"⚠️ Error al obtener texto del elemento: {e}") + return None async def escribir_texto(self, texto: str): - await self.tab.evaluar_js(f'document.getElementById("{self.object_id}").value = "{texto}"') + try: + await self._asegurar_object_id() + await self.tab._enviar("Runtime.callFunctionOn", { + "objectId": self.object_id, + "functionDeclaration": f"function() {{ this.value = {json.dumps(texto)}; this.dispatchEvent(new Event('input')); }}", + "awaitPromise": True + }) + if self.tab.verbose: + print(f"⌨️ Texto escrito en elemento: '{texto}'") + except Exception as e: + print(f"⚠️ Error al escribir texto: {e}") + + + async def encontrar_hijo_clickeable(self) -> Optional["ElementoWeb"]: + try: + await self._asegurar_object_id() + resultado = await self.tab._enviar("Runtime.callFunctionOn", { + "objectId": self.object_id, + "functionDeclaration": """ + function() { + const candidatos = this.querySelectorAll("span, div, a, button"); + for (const el of candidatos) { + const style = window.getComputedStyle(el); + const visible = style.display !== "none" && style.visibility !== "hidden"; + const interactivo = style.pointerEvents !== "none"; + if (visible && interactivo) return el; + } + return this; + } + """, + "returnByValue": False + }) + if "result" in resultado and "objectId" in resultado["result"]: + return ElementoWeb(self.tab, resultado["result"]["objectId"]) + except Exception as e: + print(f"⚠️ No se pudo encontrar hijo clickeable: {e}") + return None \ No newline at end of file diff --git a/src/ScrappingWeb/Navegador.py b/src/ScrappingWeb/Navegador.py index 02f46f6..e9d69e5 100644 --- a/src/ScrappingWeb/Navegador.py +++ b/src/ScrappingWeb/Navegador.py @@ -87,9 +87,9 @@ class Navegador: f"--user-data-dir={self.user_data_dir}", "--disable-blink-features=AutomationControlled", "--no-sandbox", - "--disable-web-security", + # "--disable-web-security", # "--disable-extensions", - "--disable-dev-shm-usage", + # "--disable-dev-shm-usage", "--disable-infobars", "--disable-popup-blocking", "--disable-default-apps", diff --git a/src/ScrappingWeb/Scrapper.py b/src/ScrappingWeb/Scrapper.py index 6f51cd9..26d3100 100644 --- a/src/ScrappingWeb/Scrapper.py +++ b/src/ScrappingWeb/Scrapper.py @@ -2,7 +2,10 @@ import aiohttp import websockets import json import asyncio -from src.ScrappingWeb.Tab import Tab +from .Tab import Tab +from typing import Optional + + class Scrapper: def __init__(self, debugging_url: str = "http://127.0.0.1:9222"): @@ -56,14 +59,80 @@ class Scrapper: raise RuntimeError("No se pudo obtener el WebSocket de la nueva pestaña") - async def nueva_tab(self, url: str, wait_time: float = 5.0) -> Tab: + async def nueva_tab(self, url: str = "", wait_time: float = 5.0) -> Tab: websocket_url = await self._crear_tab_websocket_url() tab = await Tab.crear_desde_websocket(websocket_url) self.tabs.append(tab) - await tab.navegar(url, wait_time) + + if url: + print(f"🌍 Navegando a: {url}") + await tab.navegar(url, wait_time) + else: + print("⚠️ No se especificó URL. La pestaña se creó pero no se navegó a ninguna página.") + return tab async def cerrar_todos(self): for tab in list(self.tabs): await tab.cerrar() - self.tabs.clear() \ No newline at end of file + self.tabs.clear() + + def get_tab(self, identifier: str) -> Optional[Tab]: + """ + Devuelve una instancia de Tab según su WebSocket URL o su ID final (extraído del WebSocket URL). + Acepta: + - ws_url completo: ws://127.0.0.1:9222/devtools/page/XYZ + - id directo: XYZ + """ + for tab in self.tabs: + # Comparar directamente contra ws_url + if tab.ws_url == identifier: + return tab + + # Comparar contra el ID extraído + ws_id = tab.ws_url.rsplit("/", 1)[-1] + if ws_id == identifier: + return tab + + return None + + async def obtener_tabs_existentes(self) -> list[Tab]: + """ + Recupera todas las pestañas de tipo 'page' que no están ya en self.tabs, + las conecta y devuelve como lista. Muestra resumen limpio por consola. + """ + async with aiohttp.ClientSession() as session: + async with session.get(f"{self.debugging_url}/json") as resp: + if resp.status != 200: + raise RuntimeError("No se pudo obtener la lista de pestañas") + + tabs_info = await resp.json() + + print("\n🧾 Pestañas activas (filtradas: solo 'type': 'page'):\n") + nuevas_tabs = [] + for idx, tab_info in enumerate(tabs_info, start=1): + tipo = tab_info.get("type") + if tipo != "page": + continue # Filtrar todo lo que no sea página visible + + ws_url = tab_info.get("webSocketDebuggerUrl") + tab_id = tab_info.get("id") + title = tab_info.get("title", "") + url = tab_info.get("url", "") + + # Verifica si ya la tienes cargada + if any(t.ws_url == ws_url for t in self.tabs): + continue + + # Conectar + try: + tab = await Tab.crear_desde_websocket(ws_url) + self.tabs.append(tab) + nuevas_tabs.append(tab) + except Exception as e: + print(f"⚠️ No se pudo conectar a pestaña {tab_id}: {e}") + + if not nuevas_tabs: + print("⚠️ No se encontraron nuevas pestañas para agregar.\n") + + return nuevas_tabs \ No newline at end of file diff --git a/src/ScrappingWeb/Tab.py b/src/ScrappingWeb/Tab.py index 0cc1e7f..f380671 100644 --- a/src/ScrappingWeb/Tab.py +++ b/src/ScrappingWeb/Tab.py @@ -2,21 +2,29 @@ import asyncio import json import base64 import websockets -from typing import Optional -from typing import List -from src.ScrappingWeb.ElementoWeb import ElementoWeb +from typing import Optional, List +from .ElementoWeb import ElementoWeb +import os + class Tab: - def __init__(self, websocket: websockets.WebSocketClientProtocol, ws_url: str): + def __init__(self, websocket: websockets.WebSocketClientProtocol, ws_url: str, verbose: bool = True): self.websocket = websocket self.ws_url = ws_url self._message_id = 0 self._pending = {} self._load_event = asyncio.Event() + self.verbose = verbose + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc, tb): + await self.cerrar() @classmethod async def crear_desde_websocket(cls, ws_url: str) -> "Tab": - websocket = await websockets.connect(ws_url) + websocket = await websockets.connect(ws_url, max_size=10 * 1024 * 1024) tab = cls(websocket, ws_url) asyncio.create_task(tab._recibir_eventos()) await tab._enviar("Page.enable") @@ -28,11 +36,14 @@ class Tab: data = json.loads(mensaje) if "id" in data and data["id"] in self._pending: future = self._pending.pop(data["id"]) - future.set_result(data.get("result")) + if "result" in data: + future.set_result(data["result"]) + elif "error" in data: + future.set_exception(Exception(data["error"])) elif data.get("method") == "Page.loadEventFired": self._load_event.set() - async def _enviar(self, metodo: str, parametros: Optional[dict] = None) -> dict: + async def _enviar(self, metodo: str, parametros: Optional[dict] = None, timeout: float = 10.0) -> dict: self._message_id += 1 msg_id = self._message_id mensaje = { @@ -44,15 +55,17 @@ class Tab: future = asyncio.get_event_loop().create_future() self._pending[msg_id] = future await self.websocket.send(json.dumps(mensaje)) - return await future + return await asyncio.wait_for(future, timeout=timeout) async def navegar(self, url: str, wait_time: float = 5.0): self._load_event.clear() - print(f"🌍 Navegando a: {url}") + if self.verbose: + print(f"🌍 Navegando a: {url}") await self._enviar("Page.navigate", {"url": url}) try: await asyncio.wait_for(self._load_event.wait(), timeout=wait_time) - print("✅ Página cargada correctamente.") + if self.verbose: + print("✅ Página cargada correctamente.") except asyncio.TimeoutError: print(f"⚠️ Tiempo de espera agotado ({wait_time}s) al cargar la página.") @@ -62,11 +75,40 @@ class Tab: "expression": js_code, "returnByValue": True }) - return result["result"]["value"] + if "exceptionDetails" in result: + raise Exception(result["exceptionDetails"]) + return result.get("result", {}).get("value") except Exception as e: print(f"⚠️ Error al ejecutar JS: {e}") return None + async def inyectar_archivo_js(self, ruta_archivo: str, reemplazos: dict = None) -> Optional[str]: + if not os.path.exists(ruta_archivo): + print(f"❌ Archivo JS no encontrado: {ruta_archivo}") + return None + + with open(ruta_archivo, "r", encoding="utf-8") as f: + js_code = f.read() + + if reemplazos: + for key, value in reemplazos.items(): + js_code = js_code.replace(f"{{{{{key}}}}}", str(value)) + + # 🔧 Eliminamos el `return` externo + js_code_final = f"(async () => {{\n{js_code}\n}})();" + + try: + result = await self._enviar("Runtime.evaluate", { + "expression": js_code_final, + "returnByValue": True + }) + if "exceptionDetails" in result: + raise Exception(result["exceptionDetails"]) + return result.get("result", {}).get("value") + except Exception as e: + print(f"⚠️ Error al inyectar JS desde {ruta_archivo}: {e}") + return None + async def obtener_user_agent(self) -> Optional[str]: return await self.evaluar_js("navigator.userAgent") @@ -76,66 +118,57 @@ class Tab: data = result["data"] with open(output_path, "wb") as f: f.write(base64.b64decode(data)) - print(f"📸 Screenshot guardado como {output_path}") + if self.verbose: + print(f"📸 Screenshot guardado como {output_path}") except Exception as e: print(f"⚠️ Error al capturar screenshot: {e}") async def cerrar(self): try: - await self.websocket.close() - print("🛑 WebSocket cerrado.") + if not self.websocket.closed: + await self.websocket.close() + if self.verbose: + print("🛑 WebSocket cerrado.") except Exception as e: print(f"⚠️ Error al cerrar pestaña: {e}") async def obtener_html_completo(self) -> Optional[str]: - """ - Devuelve el HTML completo de la página actual. - """ try: result = await self._enviar("Runtime.evaluate", { "expression": "document.documentElement.outerHTML", "returnByValue": True }) - html = result["result"]["value"] - print("📄 HTML completo obtenido.") - return html + return result.get("result", {}).get("value") except Exception as e: print(f"⚠️ Error al obtener HTML: {e}") return None - async def obtener_dominio(self) -> Optional[str]: - """ - Devuelve el dominio (hostname) de la página actual, por ejemplo: 'example.com'. - """ try: dominio = await self.evaluar_js("window.location.hostname") - print(f"🌐 Dominio actual: {dominio}") + if self.verbose and dominio: + print(f"🌐 Dominio actual: {dominio}") return dominio except Exception as e: print(f"⚠️ Error al obtener dominio: {e}") return None - async def get_element_by_selector_node(self, selector: str) -> Optional["ElementoWeb"]: try: - # Obtener nodo raíz del documento doc = await self._enviar("DOM.getDocument") root_node_id = doc["root"]["nodeId"] - # Buscar el nodo desde el DOM (más confiable que Runtime.evaluate) result = await self._enviar("DOM.querySelector", { "nodeId": root_node_id, "selector": selector }) - node_id = result["nodeId"] + node_id = result.get("nodeId") if not node_id: print(f"⚠️ Nodo no encontrado con selector: {selector}") return None return ElementoWeb.from_node(self, node_id=node_id) - except Exception as e: print(f"⚠️ Error al buscar nodo desde DOM.querySelector: {e}") return None @@ -157,8 +190,17 @@ class Tab: for prop in props["result"]: if "value" in prop and "objectId" in prop["value"]: elementos.append(ElementoWeb(self, prop["value"]["objectId"])) - print(f"🔍 Se encontraron {len(elementos)} elementos con el selector CSS '{selector}'.") + if self.verbose: + print(f"🔍 Se encontraron {len(elementos)} elementos con el selector CSS '{selector}'.") return elementos except Exception as e: print(f"⚠️ Error al buscar elementos por selector CSS '{selector}': {e}") - return [] \ No newline at end of file + return [] + + async def enfocar(self): + try: + await self._enviar("Page.bringToFront") + if self.verbose: + print("🪟 Pestaña enfocada (bringToFront).") + except Exception as e: + print(f"⚠️ Error al enfocar pestaña: {e}")