feat: Implement cookie extraction script for Chrome v20 and enhance browser interaction

This commit is contained in:
2025-06-01 15:31:13 +02:00
parent 628cddc3ae
commit e1b756ac99
8 changed files with 717 additions and 64 deletions
+179
View File
@@ -0,0 +1,179 @@
import os
import sys
import json
import binascii
import ctypes
import base64
import sqlite3
import pandas as pd
import pathlib
from Crypto.Cipher import AES, ChaCha20_Poly1305
from pypsexec.client import Client
"""
Este script extrae cookies v20 de Google Chrome y las guarda en un archivo CSV.
Requiere privilegios de administrador para acceder a los datos de Chrome.
Conseguido para poder extraer cookies de Chrome v20, que utiliza un nuevo formato de cifrado.
"""
def is_admin():
    """Return True when the current process has Windows administrator rights.

    Returns:
        bool: Result of ``shell32.IsUserAnAdmin()``; False when the call is
        unavailable (for example ``ctypes.windll`` does not exist) or fails.
    """
    try:
        return ctypes.windll.shell32.IsUserAnAdmin() != 0
    except Exception:
        # Deliberately best-effort, but no longer a bare ``except`` so that
        # KeyboardInterrupt and SystemExit are not swallowed.
        return False
def get_app_bound_key(local_state_path):
    """Return the ``os_crypt.app_bound_encrypted_key`` entry of a Local State file.

    Args:
        local_state_path: Path to a UTF-8 encoded JSON file.

    Returns:
        The value stored under ``["os_crypt"]["app_bound_encrypted_key"]``.

    Raises:
        KeyError: If either key is missing from the JSON document.
    """
    contenido = pathlib.Path(local_state_path).read_text(encoding="utf-8")
    seccion_os_crypt = json.loads(contenido)["os_crypt"]
    return seccion_os_crypt["app_bound_encrypted_key"]
def decrypt_app_bound_key(encrypted_key_b64):
    """Run two ``CryptUnprotectData`` passes over the app-bound key blob.

    A ``pypsexec`` client connects to localhost, creates its service, and
    launches the current Python interpreter twice with the same ``-c``
    program: first with ``use_system_account=True`` on the blob without its
    4-byte ``APPB`` prefix, then with ``use_system_account=False`` on the
    output of the first run. The service is removed and the client
    disconnected even when a step fails.

    Args:
        encrypted_key_b64: Base64 text whose decoded bytes start with
            ``b"APPB"``.

    Returns:
        bytes: The last 61 bytes of the decoded output of the second run.

    Raises:
        AssertionError: If the decoded blob does not start with ``b"APPB"``.
            NOTE(review): ``assert`` is stripped under ``python -O``.
    """
    # Child program passed to ``python -c``; newlines are replaced by ';' so it
    # fits on one command line. '{}' is filled with the base64 input per run.
    arguments = "-c \"" + """import win32crypt
import binascii
encrypted_key = win32crypt.CryptUnprotectData(binascii.a2b_base64('{}'), None, None, None, 0)
print(binascii.b2a_base64(encrypted_key[1]).decode())
""".replace("\n", ";") + "\""
    c = Client("localhost")
    c.connect()
    decrypted_key = None
    try:
        c.create_service()
        assert(binascii.a2b_base64(encrypted_key_b64)[:4] == b"APPB")
        # Drop the 4-byte prefix and re-encode for embedding in the command.
        stripped_key_b64 = binascii.b2a_base64(binascii.a2b_base64(encrypted_key_b64)[4:]).decode().strip()
        # First pass: executed with use_system_account=True.
        encrypted_key_b64_sys, _, _ = c.run_executable(
            sys.executable,
            arguments=arguments.format(stripped_key_b64),
            use_system_account=True
        )
        # Second pass: executed with use_system_account=False on the first output.
        decrypted_key_b64, _, _ = c.run_executable(
            sys.executable,
            arguments=arguments.format(encrypted_key_b64_sys.decode().strip()),
            use_system_account=False
        )
        # Only the trailing 61 bytes are kept (1 + 12 + 32 + 16 in decrypt_final_key).
        decrypted_key = binascii.a2b_base64(decrypted_key_b64)[-61:]
    finally:
        # Always tear down the service and the connection.
        c.remove_service()
        c.disconnect()
    return decrypted_key
def decrypt_final_key(encrypted_key):
    """Decrypt a blob laid out as ``flag | nonce | ciphertext | tag``.

    Layout read by this function: byte 0 is the algorithm flag, bytes 1-12 the
    nonce, bytes 13-44 the ciphertext, and everything from byte 45 on the
    authentication tag.

    Args:
        encrypted_key: Bytes in the layout described above.

    Returns:
        The result of ``decrypt_and_verify`` on the ciphertext and tag.

    Raises:
        ValueError: If the flag is neither 1 nor 2 (tag verification failures
            are raised by ``decrypt_and_verify``).
    """
    # Hard-coded keys, one per supported flag value.
    aes_key = bytes.fromhex("B31C6E241AC846728DA9C1FAC4936651CFFB944D143AB816276BCC6DA0284787")
    chacha20_key = bytes.fromhex("E98F37D7F4E1FA433D19304DC2258042090E2D1D7EEA7670D41F738D08729660")
    flag = encrypted_key[0]
    iv = encrypted_key[1:13]
    ciphertext = encrypted_key[13:45]
    tag = encrypted_key[45:]
    if flag == 1:
        # Flag 1: AES in GCM mode.
        cipher = AES.new(aes_key, AES.MODE_GCM, nonce=iv)
    elif flag == 2:
        # Flag 2: ChaCha20-Poly1305.
        cipher = ChaCha20_Poly1305.new(key=chacha20_key, nonce=iv)
    else:
        raise ValueError(f"Unsupported flag: {flag}")
    return cipher.decrypt_and_verify(ciphertext, tag)
def decrypt_cookie_v20(encrypted_value, key):
    """Decrypt one cookie value with AES-GCM and return it as text.

    Layout read by this function: bytes 0-2 are skipped, bytes 3-14 are the
    nonce, the last 16 bytes are the tag, and everything in between is the
    ciphertext.

    Args:
        encrypted_value: Raw encrypted cookie bytes.
        key: AES key passed to ``AES.new``.

    Returns:
        str: The plaintext after its first 32 bytes, decoded as UTF-8.
    """
    cookie_iv = encrypted_value[3:15]
    encrypted_cookie = encrypted_value[15:-16]
    cookie_tag = encrypted_value[-16:]
    cookie_cipher = AES.new(key, AES.MODE_GCM, nonce=cookie_iv)
    decrypted_cookie = cookie_cipher.decrypt_and_verify(encrypted_cookie, cookie_tag)
    # The first 32 plaintext bytes are discarded.
    # NOTE(review): presumably a fixed-size header; confirm what it contains.
    return decrypted_cookie[32:].decode('utf-8')
def extract_all_v20_cookies():
    """Collect every ``v20``-prefixed cookie from all Chrome profiles.

    Reads the key from ``Local State`` under ``%USERPROFILE%``, then opens each
    profile's ``Network/Cookies`` database read-only. Rows whose encrypted
    value does not start with ``b"v20"`` are ignored. A row that fails to
    decrypt is still recorded, with ``is_decrypted`` False and the error text
    in ``decrypt_error``.

    Returns:
        pandas.DataFrame: One row per ``v20`` cookie with the columns host,
        name, path, value, encrypted_value_b64, expires_utc, is_secure,
        is_httponly, last_access_utc, profile, is_decrypted, decrypt_error.

    Raises:
        KeyError: If the ``USERPROFILE`` environment variable is not set.
    """
    user_profile = os.environ['USERPROFILE']
    local_state_path = rf"{user_profile}\AppData\Local\Google\Chrome\User Data\Local State"
    base_profile_path = rf"{user_profile}\AppData\Local\Google\Chrome\User Data"
    app_bound_key_b64 = get_app_bound_key(local_state_path)
    decrypted_key_raw = decrypt_app_bound_key(app_bound_key_b64)
    final_key = decrypt_final_key(decrypted_key_raw)

    # Directories that are never treated as user profiles.
    perfiles_invalidos = {"System Profile", "Guest Profile", "CrashpadMetrics"}
    perfiles = [
        name for name in os.listdir(base_profile_path)
        if os.path.isdir(os.path.join(base_profile_path, name))
        and name not in perfiles_invalidos
        and os.path.exists(os.path.join(base_profile_path, name, "Network", "Cookies"))
    ]

    all_cookies = []
    for profile in perfiles:
        db_path = os.path.join(base_profile_path, profile, "Network", "Cookies")
        con = sqlite3.connect(pathlib.Path(db_path).as_uri() + "?mode=ro", uri=True)
        try:
            cookies = con.execute("SELECT host_key, name, path, is_secure, is_httponly, expires_utc, last_access_utc, CAST(encrypted_value AS BLOB) from cookies;").fetchall()
        finally:
            # Close the handle even when the query fails (e.g. locked database).
            con.close()

        for row in cookies:
            host, name, path, is_secure, is_httponly, expires_utc, last_access_utc, encrypted_value = row
            if not encrypted_value.startswith(b"v20"):
                continue
            try:
                value = decrypt_cookie_v20(encrypted_value, final_key)
                print(f"[✓] {host} {name}: {value}")
                is_decrypted, decrypt_error = True, ""
            except Exception as e:
                # Best-effort: keep the row and record why decryption failed.
                print(f"[x] Error decrypting {host} {name}: {e}")
                value, is_decrypted, decrypt_error = "", False, str(e)

            # Single record literal; key order defines the CSV column order.
            all_cookies.append({
                "host": host,
                "name": name,
                "path": path,
                "value": value,
                "encrypted_value_b64": base64.b64encode(encrypted_value).decode(),
                "expires_utc": expires_utc,
                "is_secure": is_secure,
                "is_httponly": is_httponly,
                "last_access_utc": last_access_utc,
                "profile": profile,
                "is_decrypted": is_decrypted,
                "decrypt_error": decrypt_error,
            })
    return pd.DataFrame(all_cookies)
if __name__ == "__main__":
    # Script entry point: relaunch elevated when needed, then export to CSV.
    if not is_admin():
        import subprocess

        input("Este script necesita ejecutarse como administrador. Presiona Enter para reiniciar con privilegios...")
        # list2cmdline quotes each argument, so a script path or argument that
        # contains spaces survives the relaunch (a plain " ".join splits it).
        ctypes.windll.shell32.ShellExecuteW(None, "runas", sys.executable, subprocess.list2cmdline(sys.argv), None, 1)
        sys.exit()
    print("[*] Extrayendo cookies v20 desde todos los perfiles...")
    df = extract_all_v20_cookies()
    df.to_csv("cookies_extraidas.csv", index=False, encoding="utf-8")
    print(f"[✓] Cookies v20 extraídas: {len(df)}")
    print("[✓] Guardado en 'cookies_extraidas.csv'")
@@ -0,0 +1,87 @@
import asyncio
import os
import pyperclip
import re
from src.ScrappingWeb.Scrapper import Scrapper
def sanitizar(nombre: str) -> str:
    """Turn arbitrary text into a string usable as a file name.

    Surrounding whitespace is removed, then each of ``\\ / * ? : " < > |`` and
    every ASCII control character (including embedded newlines and tabs) is
    replaced by ``_``. The result is limited to 100 characters.

    Args:
        nombre: Text to clean, for example an element's ``textContent``.

    Returns:
        The cleaned name, at most 100 characters long (may be empty).
    """
    # Strip first so leading/trailing newlines vanish instead of becoming "_".
    limpio = re.sub(r'[\\/*?:"<>|\x00-\x1f]', "_", nombre.strip())
    return limpio.strip()[:100]
# Output directory name, relative to the current working directory.
OUTPUT_DIR = "esquemas_json"
# Created as soon as the module is loaded; exist_ok makes repeated runs harmless.
os.makedirs(OUTPUT_DIR, exist_ok=True)
async def main():
    """Save the copied JSON schema of the first 12 resource-tree items.

    Attaches to one already-open tab (hard-coded DevTools page id on port
    9222). For each of the first 12 matching ``li`` elements it clicks the
    item, triggers the page's "copy schema" button, picks "Copiar como JSON"
    in the overlay menu, and writes the clipboard text to
    ``OUTPUT_DIR/<sanitized item text>.txt``.

    The clipboard is emptied before each copy so that a copy that did not
    happen is detected instead of saving the previous item's content under
    the wrong file name.
    """
    ws_id = "F51AC05B27E1DEC4011E67369781596C"
    ws_url = f"ws://127.0.0.1:9222/devtools/page/{ws_id}"
    scrapper = Scrapper(debugging_url="http://127.0.0.1:9222")

    print("🔌 Conectando a pestaña específica...")
    tab = scrapper.get_tab(ws_url) or scrapper.get_tab(ws_id)
    if not tab:
        # Not tracked yet: attach to the tabs Chrome reports and look again.
        nuevas_tabs = await scrapper.obtener_tabs_existentes()
        tab = next((t for t in nuevas_tabs if t.ws_url.rsplit("/", 1)[-1] == ws_id), None)
    if not tab:
        print("⚠️ La pestaña con ese ID no se encontró.")
        return

    elementos = await tab.get_elements_by_css_selector(
        "#_0rif_bq-resource-tree > div.cfctest-tree-main.ng-tns-c3578326070-0 > ul > cfc-virtual-scroller > div > div.item-container > div > li"
    )

    for i, elemento in enumerate(elementos[:12]):
        print(f"🖱️ Click #{i + 1}")
        clickeable = await elemento.encontrar_hijo_clickeable()
        if not clickeable:
            print(f"⚠️ No se encontró subelemento clickeable en #{i+1}")
            continue
        await clickeable.click()
        await asyncio.sleep(1)

        texto_crudo = await elemento.obtener_texto()
        nombre_archivo = sanitizar(texto_crudo or f"esquema_item_{i+1}")
        print(f"📄 Nombre base del archivo: {nombre_archivo}.txt")

        # Empty the clipboard so stale content cannot pass for a fresh copy.
        try:
            pyperclip.copy("")
        except Exception as e:
            print(f"❌ Error al limpiar el portapapeles: {e}")
            continue

        # Run JS in the browser to open the "copy schema" menu.
        await tab.evaluar_js("""
        (() => {
            const boton = document.querySelector('button[id^="_0rif_bqui-table-copy-schema-btn"] span.mdc-button__label > span');
            if (boton) boton.click();
        })()
        """)
        await asyncio.sleep(1)

        # Click the "Copiar como JSON" entry in whichever overlay holds it.
        await tab.evaluar_js("""
        (() => {
            const overlays = document.querySelectorAll("div.cdk-overlay-pane");
            for (let overlay of overlays) {
                const items = overlay.querySelectorAll("cfc-menu-item .cfc-menu-item-label");
                for (let item of items) {
                    if (item.textContent.includes("Copiar como JSON")) {
                        item.click();
                        break;
                    }
                }
            }
        })()
        """)
        await asyncio.sleep(1.5)

        try:
            texto_json = pyperclip.paste()
            if not texto_json:
                # The copy never reached the clipboard; do not write a bogus file.
                print(f"⚠️ El portapapeles está vacío; se omite #{i+1}")
                continue
            file_path = os.path.join(OUTPUT_DIR, f"{nombre_archivo}.txt")
            with open(file_path, "w", encoding="utf-8") as f:
                f.write(texto_json)
            print(f"✅ Guardado: {file_path}")
        except Exception as e:
            print(f"❌ Error al leer el portapapeles o guardar archivo: {e}")


if __name__ == "__main__":
    asyncio.run(main())
+80
View File
@@ -0,0 +1,80 @@
import subprocess
import os
import time
import signal
def iniciar_chrome(chrome_path,
                   user_data_dir,
                   headless=False,
                   debugging_port=9222,
                   user_agent=None,
                   ):
    """Launch Chrome with remote debugging enabled and block until it exits.

    Args:
        chrome_path: Path to the Chrome executable.
        user_data_dir: Profile directory; created when it does not exist.
        headless: When true, ``--headless=new`` is added.
        debugging_port: Value for ``--remote-debugging-port``.
        user_agent: When given, passed through ``--user-agent``.

    The call polls the process once per second. On Ctrl+C the process is
    terminated, and killed if it has not exited after 5 seconds.

    NOTE(review): ``--no-sandbox`` and ``--disable-web-security`` weaken
    browser security; confirm both are really required.
    """
    # Make sure the profile directory exists.
    os.makedirs(user_data_dir, exist_ok=True)

    chrome_args = [
        f"--remote-debugging-port={debugging_port}",
        f"--user-data-dir={user_data_dir}",
        "--disable-blink-features=AutomationControlled",
        "--no-sandbox",
        "--disable-web-security",
        "--disable-extensions",
        "--disable-dev-shm-usage",
        "--disable-infobars",
        "--disable-popup-blocking",
        "--disable-default-apps",
        "--mute-audio",
        "--window-size=1024,1024",
    ]
    if headless:
        chrome_args.append("--headless=new")  # for recent Chrome versions
    if user_agent:
        chrome_args.append(f"--user-agent={user_agent}")

    chrome_process = subprocess.Popen([chrome_path] + chrome_args)
    try:
        print(f"Chrome iniciado (headless={headless}). Presiona Ctrl+C para salir.")
        while chrome_process.poll() is None:
            time.sleep(1)
        print("Chrome se ha cerrado.")
    except KeyboardInterrupt:
        print("Terminando proceso de Chrome...")
        chrome_process.terminate()
        try:
            chrome_process.wait(timeout=5)
        except subprocess.TimeoutExpired:
            chrome_process.kill()
            # Reap the killed process so it does not linger as a zombie.
            chrome_process.wait()
        print("Chrome cerrado correctamente.")
# Path to the Chrome executable
chrome_path = r"C:\Program Files\Google\Chrome\Application\chrome.exe"
# Directory for the user profile
user_data_dir = os.path.abspath("./Perfiles_usuario/chrome_profile")
# Port for remote debugging
port = 9222
# User-Agent string passed to Chrome
user_agent= "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36"
# Runs as soon as this module is loaded (there is no __main__ guard) and
# blocks until Chrome exits.
iniciar_chrome(chrome_path=chrome_path,
               user_data_dir=user_data_dir,
               debugging_port=port,
               headless=False,
               user_agent=user_agent)  # Set headless=True for headless mode
+122
View File
@@ -0,0 +1,122 @@
import asyncio
import os
import re
from src.ScrappingWeb.Navegador import Navegador
from src.ScrappingWeb.Scrapper import Scrapper
from src.ScrappingWeb.Tab import Tab
import aiohttp
import csv
async def esperar_chrome_listo(port, timeout=10):
    """Wait until Chrome's DevTools HTTP endpoint answers on ``port``.

    Polls ``http://127.0.0.1:<port>/json`` every 0.5 seconds, making
    ``timeout * 2`` attempts in total.

    Args:
        port: Remote-debugging port to probe.
        timeout: Approximate number of seconds to keep trying.

    Raises:
        TimeoutError: If no attempt received an HTTP 200 response.
    """
    url = f"http://127.0.0.1:{port}/json"
    # int() keeps range() valid when a float timeout is passed.
    intentos = int(timeout * 2)
    # One session for all attempts; the short per-request timeout keeps a hung
    # connection from stalling the polling loop.
    limite_peticion = aiohttp.ClientTimeout(total=2)
    async with aiohttp.ClientSession(timeout=limite_peticion) as session:
        for _ in range(intentos):
            try:
                async with session.get(url) as resp:
                    if resp.status == 200:
                        return
            except (aiohttp.ClientError, asyncio.TimeoutError, OSError):
                # Chrome is not accepting connections yet; try again.
                pass
            await asyncio.sleep(0.5)
    raise TimeoutError(f"Chrome en puerto {port} no respondió dentro del tiempo esperado.")
# Path to the Chrome executable used by the browser launcher below.
chrome_path = r"C:\Program Files\Google\Chrome\Application\chrome.exe"


def sanitizar_nombre(nombre: str) -> str:
    """Turn arbitrary text into a string usable as a file name.

    Surrounding whitespace is removed, then each of ``\\ / * ? : " < > |`` and
    every ASCII control character (including embedded newlines and tabs) is
    replaced by ``_``. The result is limited to 100 characters.

    Args:
        nombre: Text to clean.

    Returns:
        The cleaned name, at most 100 characters long (may be empty).
    """
    # Strip first so leading/trailing newlines vanish instead of becoming "_".
    limpio = re.sub(r'[\\/*?:"<>|\x00-\x1f]', "_", nombre.strip())
    return limpio.strip()[:100]
# Strong references to background browser tasks. The event loop only keeps
# weak references to tasks, so a task nobody holds can be garbage-collected
# before it finishes.
_tareas_navegador: set = set()


async def iniciar_y_scrapear(id: int):
    """Start one browser instance and print its User-Agent and page title.

    Each ``id`` gets its own profile directory
    (``./Perfiles_usuario/chrome_profile_<id>``) and debugging port
    (``9222 + id``). The browser is started as a background task, then a new
    empty tab is opened through ``Scrapper`` once the DevTools endpoint
    responds.

    Args:
        id: Instance number. NOTE(review): shadows the ``id`` builtin; kept
            because callers may pass it by keyword.
    """
    user_data_dir = os.path.abspath(f"./Perfiles_usuario/chrome_profile_{id}")
    port = 9222 + id
    navegador = Navegador(
        chrome_path=chrome_path,
        user_data_dir=user_data_dir,
        id=id,
        download_dir=os.path.join(user_data_dir, "downloads"),
        debugging_port=port,
        headless=False,
        user_agent=f"Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/{100+id}.0.0.0 Safari/537.36"
    )

    # Start the browser in the background and keep the task referenced until
    # it completes.
    tarea_navegador = asyncio.create_task(navegador.iniciar())
    _tareas_navegador.add(tarea_navegador)
    tarea_navegador.add_done_callback(_tareas_navegador.discard)

    # Wait until the browser accepts DevTools connections.
    await esperar_chrome_listo(port)

    # Connect the scraper and open an empty tab.
    scrapper = Scrapper(debugging_url=f"http://127.0.0.1:{port}")
    tab = await scrapper.nueva_tab("", wait_time=6)

    ua = await tab.obtener_user_agent()
    print(f"🧭 [{id}] User-Agent:", ua)
    title = await tab.evaluar_js("document.title")
    print(f"📄 [{id}] Título:", title)
async def main():
    """Run every scraping job concurrently and wait for all of them."""
    await asyncio.gather(*(iniciar_y_scrapear(indice) for indice in range(1)))


if __name__ == "__main__":
    asyncio.run(main())
+100 -26
View File
@@ -1,54 +1,58 @@
from typing import TYPE_CHECKING, Optional
import random
import asyncio
import json
if TYPE_CHECKING:
from src.ScrappingWeb.Tab import Tab
from .Tab import Tab
class ElementoWeb:
def __init__(self, tab: "Tab", object_id: str):
def __init__(self, tab: "Tab", object_id: Optional[str]):
self.tab = tab
self.object_id = object_id
self._node_id = None # Lazy resolved
@classmethod
def from_node(cls, tab: "Tab", node_id: int) -> "ElementoWeb":
inst = cls(tab, object_id=None)
inst._node_id = node_id
return inst
async def _asegurar_object_id(self):
    """Fill in ``object_id`` from ``_node_id`` when it has not been resolved yet.

    Does nothing when ``object_id`` is already set or there is no node id.
    Otherwise sends ``DOM.resolveNode`` through the tab and stores the returned
    ``objectId``. Failures are reported on stdout and leave ``object_id``
    unchanged.
    """
    if self.object_id or not self._node_id:
        return
    try:
        respuesta = await self.tab._enviar("DOM.resolveNode", {"nodeId": self._node_id})
        self.object_id = respuesta["object"]["objectId"]
    except Exception as e:
        print(f"⚠️ No se pudo resolver objectId desde nodeId: {e}")
async def scroll_into_view(self):
try:
await self._asegurar_object_id()
await self.tab._enviar("Runtime.callFunctionOn", {
"objectId": self.object_id,
"functionDeclaration": "function() { this.scrollIntoView({block: 'center'}); }",
"awaitPromise": True
})
print("📜 Elemento desplazado a la vista.")
if self.tab.verbose:
print("📜 Elemento desplazado a la vista.")
except Exception as e:
print(f"⚠️ Error al hacer scroll hacia el elemento: {e}")
@classmethod
def from_node(cls, tab: "Tab", node_id: int) -> "ElementoWeb":
# Creamos un objectId a partir del nodeId usando DOM.resolveNode
cls._node_id = node_id
cls._resolved_object_id = None # Lazy resolution opcional
return cls(tab, object_id=None)
async def click(self):
try:
await self.scroll_into_view()
# Resolver objectId si es necesario
if not self.object_id and hasattr(self, "_node_id"):
resolved = await self.tab._enviar("DOM.resolveNode", {"nodeId": self._node_id})
self.object_id = resolved["object"]["objectId"]
await self._asegurar_object_id()
if not self.object_id:
raise ValueError("No se puede obtener objectId del elemento para hacer click.")
# Obtener nodeId
# Intenta obtener coordenadas del nodo
node_result = await self.tab._enviar("DOM.describeNode", {
"objectId": self.object_id
})
node_id = node_result["node"]["nodeId"]
# Obtener coordenadas con fallback
try:
box_model = await self.tab._enviar("DOM.getBoxModel", {"nodeId": node_id})
content = box_model["model"]["content"]
@@ -60,7 +64,12 @@ class ElementoWeb:
x = (quad[0] + quad[4]) / 2
y = (quad[1] + quad[5]) / 2
# Simular movimiento humano del mouse
# 🧠 Enfocar el elemento antes de clickear
await self.tab._enviar("DOM.focus", {
"objectId": self.object_id
})
# 🎯 Movimiento humanoide opcional
start_x, start_y = x + random.uniform(-100, 100), y + random.uniform(-100, 100)
steps = random.randint(5, 12)
for i in range(1, steps + 1):
@@ -73,7 +82,7 @@ class ElementoWeb:
})
await asyncio.sleep(random.uniform(0.01, 0.05))
# Click humano
# 👆 Mouse Down
await self.tab._enviar("Input.dispatchMouseEvent", {
"type": "mousePressed",
"x": x,
@@ -81,7 +90,10 @@ class ElementoWeb:
"button": "left",
"clickCount": 1
})
await asyncio.sleep(random.uniform(0.05, 0.15))
# 👇 Mouse Up
await self.tab._enviar("Input.dispatchMouseEvent", {
"type": "mouseReleased",
"x": x,
@@ -90,27 +102,89 @@ class ElementoWeb:
"clickCount": 1
})
print(f"🖱️ Click humano simulado en ({x:.1f}, {y:.1f})")
await asyncio.sleep(random.uniform(0.01, 0.05))
# 🖱️ Click manual adicional
await self.tab._enviar("Input.dispatchMouseEvent", {
"type": "mouseClicked",
"x": x,
"y": y,
"button": "left",
"clickCount": 1
})
if self.tab.verbose:
print(f"🖱️ Click humano simulado en ({x:.1f}, {y:.1f})")
except Exception as e:
print(f"⚠️ Error al hacer click físico: {e}")
print("🧪 Intentando fallback con JavaScript click()...")
await self.click_js()
async def click_js(self):
try:
await self._asegurar_object_id()
if not self.object_id:
print("⚠️ No se puede hacer click JS: objectId no disponible.")
return
await self.tab._enviar("Runtime.callFunctionOn", {
"objectId": self.object_id,
"functionDeclaration": "function() { this.click(); }",
"awaitPromise": True
})
print("🖱️ Click simulado por JavaScript (element.click())")
if self.tab.verbose:
print("🖱️ Click simulado por JavaScript (element.click())")
except Exception as e:
print(f"⚠️ Error al ejecutar click en JS: {e}")
async def obtener_texto(self) -> Optional[str]:
return await self.tab.evaluar_js(f'document.getElementById("{self.object_id}").textContent')
try:
await self._asegurar_object_id()
result = await self.tab._enviar("Runtime.callFunctionOn", {
"objectId": self.object_id,
"functionDeclaration": "function() { return this.textContent; }",
"returnByValue": True
})
return result.get("result", {}).get("value")
except Exception as e:
print(f"⚠️ Error al obtener texto del elemento: {e}")
return None
async def escribir_texto(self, texto: str):
await self.tab.evaluar_js(f'document.getElementById("{self.object_id}").value = "{texto}"')
try:
await self._asegurar_object_id()
await self.tab._enviar("Runtime.callFunctionOn", {
"objectId": self.object_id,
"functionDeclaration": f"function() {{ this.value = {json.dumps(texto)}; this.dispatchEvent(new Event('input')); }}",
"awaitPromise": True
})
if self.tab.verbose:
print(f"⌨️ Texto escrito en elemento: '{texto}'")
except Exception as e:
print(f"⚠️ Error al escribir texto: {e}")
async def encontrar_hijo_clickeable(self) -> Optional["ElementoWeb"]:
    """Return the first visible, pointer-enabled descendant of this element.

    The search runs in the page: among descendant ``span``, ``div``, ``a`` and
    ``button`` nodes it picks the first whose computed style has neither
    ``display: none`` nor ``visibility: hidden`` nor ``pointer-events: none``;
    when none qualifies the element itself is used.

    Returns:
        A new ``ElementoWeb`` for the chosen node, or ``None`` when the
        response carries no ``objectId`` or the call fails (the error is
        printed).
    """
    funcion_js = """
    function() {
    const candidatos = this.querySelectorAll("span, div, a, button");
    for (const el of candidatos) {
    const style = window.getComputedStyle(el);
    const visible = style.display !== "none" && style.visibility !== "hidden";
    const interactivo = style.pointerEvents !== "none";
    if (visible && interactivo) return el;
    }
    return this;
    }
    """
    try:
        await self._asegurar_object_id()
        parametros = {
            "objectId": self.object_id,
            "functionDeclaration": funcion_js,
            "returnByValue": False,
        }
        resultado = await self.tab._enviar("Runtime.callFunctionOn", parametros)
        if "result" not in resultado or "objectId" not in resultado["result"]:
            return None
        return ElementoWeb(self.tab, resultado["result"]["objectId"])
    except Exception as e:
        print(f"⚠️ No se pudo encontrar hijo clickeable: {e}")
        return None
+2 -2
View File
@@ -87,9 +87,9 @@ class Navegador:
f"--user-data-dir={self.user_data_dir}",
"--disable-blink-features=AutomationControlled",
"--no-sandbox",
"--disable-web-security",
# "--disable-web-security",
# "--disable-extensions",
"--disable-dev-shm-usage",
# "--disable-dev-shm-usage",
"--disable-infobars",
"--disable-popup-blocking",
"--disable-default-apps",
+73 -4
View File
@@ -2,7 +2,10 @@ import aiohttp
import websockets
import json
import asyncio
from src.ScrappingWeb.Tab import Tab
from .Tab import Tab
from typing import Optional
class Scrapper:
def __init__(self, debugging_url: str = "http://127.0.0.1:9222"):
@@ -56,14 +59,80 @@ class Scrapper:
raise RuntimeError("No se pudo obtener el WebSocket de la nueva pestaña")
async def nueva_tab(self, url: str, wait_time: float = 5.0) -> Tab:
async def nueva_tab(self, url: str = "", wait_time: float = 5.0) -> Tab:
websocket_url = await self._crear_tab_websocket_url()
tab = await Tab.crear_desde_websocket(websocket_url)
self.tabs.append(tab)
await tab.navegar(url, wait_time)
if url:
print(f"🌍 Navegando a: {url}")
await tab.navegar(url, wait_time)
else:
print("⚠️ No se especificó URL. La pestaña se creó pero no se navegó a ninguna página.")
return tab
async def cerrar_todos(self):
for tab in list(self.tabs):
await tab.cerrar()
self.tabs.clear()
self.tabs.clear()
def get_tab(self, identifier: str) -> Optional[Tab]:
    """Look up an already-tracked tab by WebSocket URL or by page id.

    Args:
        identifier: Either the full URL
            (``ws://127.0.0.1:9222/devtools/page/XYZ``) or only its last path
            segment (``XYZ``).

    Returns:
        The first tab in ``self.tabs`` that matches, or ``None``.
    """
    return next(
        (
            candidata
            for candidata in self.tabs
            if candidata.ws_url == identifier
            or candidata.ws_url.rsplit("/", 1)[-1] == identifier
        ),
        None,
    )
async def obtener_tabs_existentes(self) -> list[Tab]:
    """Attach to every open 'page' tab that is not tracked yet.

    Fetches ``<debugging_url>/json``, lists the entries whose ``type`` is
    ``"page"`` on stdout, and connects to each one whose WebSocket URL is not
    already present in ``self.tabs``. Newly connected tabs are appended to
    ``self.tabs``.

    Returns:
        The tabs connected by this call (possibly empty).

    Raises:
        RuntimeError: If the endpoint does not answer with HTTP 200.
    """
    async with aiohttp.ClientSession() as session:
        async with session.get(f"{self.debugging_url}/json") as resp:
            if resp.status != 200:
                raise RuntimeError("No se pudo obtener la lista de pestañas")
            tabs_info = await resp.json()

    print("\n🧾 Pestañas activas (filtradas: solo 'type': 'page'):\n")

    # Built once so each entry is an O(1) membership test.
    ya_conectadas = {t.ws_url for t in self.tabs}
    nuevas_tabs = []
    for tab_info in tabs_info:
        if tab_info.get("type") != "page":
            continue  # Only real pages; skip workers, extensions, etc.

        ws_url = tab_info.get("webSocketDebuggerUrl")
        tab_id = tab_info.get("id")
        title = tab_info.get("title", "<Sin título>")
        url = tab_info.get("url", "<Sin URL>")
        print(f"  • [{tab_id}] {title} — {url}")

        if not ws_url:
            # Without a WebSocket URL there is nothing to connect to.
            print(f"⚠️ La pestaña {tab_id} no expone webSocketDebuggerUrl; se omite.")
            continue
        if ws_url in ya_conectadas:
            continue

        try:
            tab = await Tab.crear_desde_websocket(ws_url)
        except Exception as e:
            print(f"⚠️ No se pudo conectar a pestaña {tab_id}: {e}")
            continue
        self.tabs.append(tab)
        ya_conectadas.add(ws_url)
        nuevas_tabs.append(tab)

    if not nuevas_tabs:
        print("⚠️ No se encontraron nuevas pestañas para agregar.\n")
    return nuevas_tabs
+74 -32
View File
@@ -2,21 +2,29 @@ import asyncio
import json
import base64
import websockets
from typing import Optional
from typing import List
from src.ScrappingWeb.ElementoWeb import ElementoWeb
from typing import Optional, List
from .ElementoWeb import ElementoWeb
import os
class Tab:
def __init__(self, websocket: websockets.WebSocketClientProtocol, ws_url: str):
def __init__(self, websocket: websockets.WebSocketClientProtocol, ws_url: str, verbose: bool = True):
self.websocket = websocket
self.ws_url = ws_url
self._message_id = 0
self._pending = {}
self._load_event = asyncio.Event()
self.verbose = verbose
async def __aenter__(self):
    """Enter ``async with``: return this tab unchanged."""
    return self

async def __aexit__(self, exc_type, exc, tb):
    """Leave ``async with``: close the tab through ``cerrar``.

    Returns nothing, so an exception raised inside the block is not suppressed.
    """
    await self.cerrar()
@classmethod
async def crear_desde_websocket(cls, ws_url: str) -> "Tab":
websocket = await websockets.connect(ws_url)
websocket = await websockets.connect(ws_url, max_size=10 * 1024 * 1024)
tab = cls(websocket, ws_url)
asyncio.create_task(tab._recibir_eventos())
await tab._enviar("Page.enable")
@@ -28,11 +36,14 @@ class Tab:
data = json.loads(mensaje)
if "id" in data and data["id"] in self._pending:
future = self._pending.pop(data["id"])
future.set_result(data.get("result"))
if "result" in data:
future.set_result(data["result"])
elif "error" in data:
future.set_exception(Exception(data["error"]))
elif data.get("method") == "Page.loadEventFired":
self._load_event.set()
async def _enviar(self, metodo: str, parametros: Optional[dict] = None) -> dict:
async def _enviar(self, metodo: str, parametros: Optional[dict] = None, timeout: float = 10.0) -> dict:
self._message_id += 1
msg_id = self._message_id
mensaje = {
@@ -44,15 +55,17 @@ class Tab:
future = asyncio.get_event_loop().create_future()
self._pending[msg_id] = future
await self.websocket.send(json.dumps(mensaje))
return await future
return await asyncio.wait_for(future, timeout=timeout)
async def navegar(self, url: str, wait_time: float = 5.0):
self._load_event.clear()
print(f"🌍 Navegando a: {url}")
if self.verbose:
print(f"🌍 Navegando a: {url}")
await self._enviar("Page.navigate", {"url": url})
try:
await asyncio.wait_for(self._load_event.wait(), timeout=wait_time)
print("✅ Página cargada correctamente.")
if self.verbose:
print("✅ Página cargada correctamente.")
except asyncio.TimeoutError:
print(f"⚠️ Tiempo de espera agotado ({wait_time}s) al cargar la página.")
@@ -62,11 +75,40 @@ class Tab:
"expression": js_code,
"returnByValue": True
})
return result["result"]["value"]
if "exceptionDetails" in result:
raise Exception(result["exceptionDetails"])
return result.get("result", {}).get("value")
except Exception as e:
print(f"⚠️ Error al ejecutar JS: {e}")
return None
async def inyectar_archivo_js(self, ruta_archivo: str, reemplazos: Optional[dict] = None) -> Optional[str]:
    """Run a JavaScript file in the page and return the value it produces.

    Every ``{{key}}`` placeholder in the file is replaced by ``str(value)``
    from ``reemplazos``. The code is then wrapped in an async
    immediately-invoked function, so the file may use top-level ``return``
    and ``await``.

    Args:
        ruta_archivo: Path to the UTF-8 encoded JavaScript file.
        reemplazos: Optional mapping of placeholder name to replacement value.

    Returns:
        The value the script returned, or ``None`` when the file is missing,
        the script throws, or the request fails (the reason is printed).
    """
    if not os.path.exists(ruta_archivo):
        print(f"❌ Archivo JS no encontrado: {ruta_archivo}")
        return None

    with open(ruta_archivo, "r", encoding="utf-8") as f:
        js_code = f.read()

    for key, value in (reemplazos or {}).items():
        js_code = js_code.replace(f"{{{{{key}}}}}", str(value))

    js_code_final = f"(async () => {{\n{js_code}\n}})();"
    try:
        result = await self._enviar("Runtime.evaluate", {
            "expression": js_code_final,
            "returnByValue": True,
            # The wrapper evaluates to a Promise; without awaitPromise the
            # reply describes the pending Promise instead of the script's
            # return value.
            "awaitPromise": True
        })
        if "exceptionDetails" in result:
            raise Exception(result["exceptionDetails"])
        return result.get("result", {}).get("value")
    except Exception as e:
        print(f"⚠️ Error al inyectar JS desde {ruta_archivo}: {e}")
        return None
async def obtener_user_agent(self) -> Optional[str]:
return await self.evaluar_js("navigator.userAgent")
@@ -76,66 +118,57 @@ class Tab:
data = result["data"]
with open(output_path, "wb") as f:
f.write(base64.b64decode(data))
print(f"📸 Screenshot guardado como {output_path}")
if self.verbose:
print(f"📸 Screenshot guardado como {output_path}")
except Exception as e:
print(f"⚠️ Error al capturar screenshot: {e}")
async def cerrar(self):
try:
await self.websocket.close()
print("🛑 WebSocket cerrado.")
if not self.websocket.closed:
await self.websocket.close()
if self.verbose:
print("🛑 WebSocket cerrado.")
except Exception as e:
print(f"⚠️ Error al cerrar pestaña: {e}")
async def obtener_html_completo(self) -> Optional[str]:
"""
Devuelve el HTML completo de la página actual.
"""
try:
result = await self._enviar("Runtime.evaluate", {
"expression": "document.documentElement.outerHTML",
"returnByValue": True
})
html = result["result"]["value"]
print("📄 HTML completo obtenido.")
return html
return result.get("result", {}).get("value")
except Exception as e:
print(f"⚠️ Error al obtener HTML: {e}")
return None
async def obtener_dominio(self) -> Optional[str]:
"""
Devuelve el dominio (hostname) de la página actual, por ejemplo: 'example.com'.
"""
try:
dominio = await self.evaluar_js("window.location.hostname")
print(f"🌐 Dominio actual: {dominio}")
if self.verbose and dominio:
print(f"🌐 Dominio actual: {dominio}")
return dominio
except Exception as e:
print(f"⚠️ Error al obtener dominio: {e}")
return None
async def get_element_by_selector_node(self, selector: str) -> Optional["ElementoWeb"]:
try:
# Obtener nodo raíz del documento
doc = await self._enviar("DOM.getDocument")
root_node_id = doc["root"]["nodeId"]
# Buscar el nodo desde el DOM (más confiable que Runtime.evaluate)
result = await self._enviar("DOM.querySelector", {
"nodeId": root_node_id,
"selector": selector
})
node_id = result["nodeId"]
node_id = result.get("nodeId")
if not node_id:
print(f"⚠️ Nodo no encontrado con selector: {selector}")
return None
return ElementoWeb.from_node(self, node_id=node_id)
except Exception as e:
print(f"⚠️ Error al buscar nodo desde DOM.querySelector: {e}")
return None
@@ -157,8 +190,17 @@ class Tab:
for prop in props["result"]:
if "value" in prop and "objectId" in prop["value"]:
elementos.append(ElementoWeb(self, prop["value"]["objectId"]))
print(f"🔍 Se encontraron {len(elementos)} elementos con el selector CSS '{selector}'.")
if self.verbose:
print(f"🔍 Se encontraron {len(elementos)} elementos con el selector CSS '{selector}'.")
return elementos
except Exception as e:
print(f"⚠️ Error al buscar elementos por selector CSS '{selector}': {e}")
return []
return []
async def enfocar(self):
    """Bring this tab to the foreground via ``Page.bringToFront``.

    A confirmation is printed only in verbose mode; a failure is always
    printed and never raised.
    """
    try:
        await self._enviar("Page.bringToFront")
    except Exception as e:
        print(f"⚠️ Error al enfocar pestaña: {e}")
        return
    if self.verbose:
        print("🪟 Pestaña enfocada (bringToFront).")