149 lines
5.1 KiB
Python
149 lines
5.1 KiB
Python
import requests
|
|
from bs4 import BeautifulSoup
|
|
from urllib.parse import urljoin, urlparse
|
|
from sqlalchemy.orm import sessionmaker
|
|
from crawler_db_model import WebsPorVisitar, WebsVisitadas, engine
|
|
from dotenv import load_dotenv
|
|
import os
|
|
import hashlib
|
|
import re
|
|
|
|
# Cargar variables de entorno
|
|
load_dotenv()
|
|
|
|
# Crear sesión de base de datos
|
|
Session = sessionmaker(bind=engine)
|
|
session = Session()
|
|
|
|
def extract_urls(url):
|
|
"""Extrae todas las URLs de una página web."""
|
|
try:
|
|
headers = {
|
|
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
|
|
}
|
|
response = requests.get(url, headers=headers, timeout=10)
|
|
if response.status_code != 200:
|
|
return [], response.status_code
|
|
|
|
soup = BeautifulSoup(response.text, 'html.parser')
|
|
urls = set()
|
|
for link in soup.find_all('a', href=True):
|
|
full_url = urljoin(url, link['href'])
|
|
parsed_url = urlparse(full_url)
|
|
# Filtrar URLs inválidas o fuera de dominio
|
|
if parsed_url.scheme in ['http', 'https']:
|
|
urls.add(full_url)
|
|
|
|
return list(urls), response.status_code
|
|
except Exception as e:
|
|
print(f"Error al procesar {url}: {e}")
|
|
return [], None
|
|
|
|
def hash_content(content):
|
|
"""Genera un hash SHA-256 del contenido para verificar duplicados."""
|
|
return hashlib.sha256(content.encode('utf-8')).hexdigest()
|
|
|
|
def generate_summary(soup):
|
|
"""Genera un resumen más detallado del contenido de la página."""
|
|
# Extraer título
|
|
titulo = soup.title.string if soup.title else "Sin título"
|
|
|
|
# Extraer meta descripción
|
|
meta_description = soup.find('meta', attrs={'name': 'description'})
|
|
descripcion = meta_description['content'] if meta_description else "No hay descripción disponible."
|
|
|
|
# Extraer encabezados principales
|
|
headers = {
|
|
'h1': [h1.get_text(strip=True) for h1 in soup.find_all('h1')],
|
|
'h2': [h2.get_text(strip=True) for h2 in soup.find_all('h2')],
|
|
'h3': [h3.get_text(strip=True) for h3 in soup.find_all('h3')]
|
|
}
|
|
|
|
# Extraer texto de párrafos principales
|
|
paragraphs = [p.get_text(strip=True) for p in soup.find_all('p')][:5]
|
|
|
|
# Generar un resumen en texto plano
|
|
resumen_texto = f"Título: {titulo}. Descripción: {descripcion}. Encabezados: {', '.join(headers['h1'][:3])}."
|
|
|
|
# Resumen estructurado
|
|
summary_json = {
|
|
"titulo": titulo,
|
|
"descripcion": descripcion,
|
|
"encabezados": headers,
|
|
"parrafos": paragraphs
|
|
}
|
|
|
|
return resumen_texto, summary_json
|
|
|
|
def process_url(url):
|
|
"""Procesa una URL: extrae datos y las inserta en las tablas correspondientes."""
|
|
try:
|
|
headers = {
|
|
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
|
|
}
|
|
urls, status_code = extract_urls(url)
|
|
|
|
# Obtener datos del sitio
|
|
response = requests.get(url, headers=headers, timeout=10)
|
|
if response.status_code != 200:
|
|
return
|
|
|
|
soup = BeautifulSoup(response.text, 'html.parser')
|
|
dominio = urlparse(url).netloc
|
|
resumen_texto, resumen_json = generate_summary(soup)
|
|
ip = requests.get(f"https://api.ipify.org?domain={dominio}&format=json").json().get('ip', "")
|
|
contenido_hash = hash_content(response.text)
|
|
|
|
# Insertar en WebsVisitadas
|
|
visitada = WebsVisitadas(
|
|
url=url,
|
|
dominio=dominio,
|
|
titulo=resumen_json['titulo'],
|
|
resumen=resumen_texto, # Resumen en texto plano
|
|
ip=ip,
|
|
codigo_http=status_code,
|
|
contenido_hash=contenido_hash
|
|
)
|
|
session.add(visitada)
|
|
|
|
# Insertar nuevas URLs en WebsPorVisitar si no están repetidas
|
|
for new_url in urls:
|
|
if not session.query(WebsPorVisitar).filter_by(url=new_url).first() and \
|
|
not session.query(WebsVisitadas).filter_by(url=new_url).first():
|
|
session.add(WebsPorVisitar(url=new_url))
|
|
|
|
session.commit()
|
|
|
|
except Exception as e:
|
|
print(f"Error procesando {url}: {e}")
|
|
session.rollback()
|
|
|
|
def crawl():
|
|
"""Crawl principal que procesa URLs hasta agotar la lista de webs por visitar."""
|
|
while True:
|
|
# Obtener la siguiente URL a visitar
|
|
web_por_visitar = session.query(WebsPorVisitar).first()
|
|
if not web_por_visitar:
|
|
print("No hay más webs por visitar.")
|
|
break
|
|
|
|
url = web_por_visitar.url
|
|
print(f"Visitando: {url}")
|
|
|
|
# Procesar la URL y eliminarla de la lista de pendientes
|
|
process_url(url)
|
|
session.delete(web_por_visitar)
|
|
session.commit()
|
|
|
|
if __name__ == "__main__":
|
|
# URL inicial
|
|
url_inicial = "https://datos.bancomundial.org/"
|
|
|
|
# Insertar URL inicial si no existe
|
|
if not session.query(WebsPorVisitar).filter_by(url=url_inicial).first():
|
|
session.add(WebsPorVisitar(url=url_inicial))
|
|
session.commit()
|
|
|
|
# Iniciar el crawler
|
|
crawl()
|