763e06c127
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
479 lines
18 KiB
Python
479 lines
18 KiB
Python
"""monitor_freelance_projects — monitor de captacion de clientes freelance.
|
|
|
|
Pipeline one-shot que detecta proyectos freelance NUEVOS, los persiste con dedup en
|
|
DuckDB y los exporta a Excel para revisar. Es la pieza de orquestacion de un monitor
|
|
de captacion de clientes: convierte el patron "scrapear -> normalizar -> persistir
|
|
con dedup -> exportar" en una sola invocacion, agendable con dag_engine.
|
|
|
|
NO reescribe ninguna logica de scraping, persistencia ni exportacion: compone SEIS
|
|
funciones del registry que ya existen, importandolas tal cual.
|
|
|
|
Funciones del registry compuestas (importadas, no reimplementadas):
|
|
scrape_workana_projects (browser) — scrapea Workana via CDP.
|
|
scrape_upwork_projects (browser) — scrapea Upwork via CDP (opcional, tolerante).
|
|
duckdb_execute (infra) — DDL: CREATE TABLE IF NOT EXISTS.
|
|
duckdb_query_readonly (infra) — lee urls existentes + tabla completa para el Excel.
|
|
duckdb_upsert (infra) — UPSERT idempotente por url (dedup + ownership de first_seen_at).
|
|
write_xlsx_sheets (infra) — escribe el .xlsx con hojas "Nuevos" y "Todos".
|
|
|
|
Devuelve SIEMPRE un dict (estilo de los grupos recon/market-intel): nunca lanza.
|
|
NUNCA inventa datos: si Workana falla, propaga el error con contexto.
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import sys
|
|
import unicodedata
|
|
|
|
ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", ".."))
|
|
sys.path.insert(0, os.path.join(ROOT, "python", "functions"))
|
|
|
|
from browser.scrape_workana_projects import scrape_workana_projects # noqa: E402
|
|
from browser.scrape_upwork_projects import scrape_upwork_projects # noqa: E402
|
|
from infra.duckdb_execute import duckdb_execute # noqa: E402
|
|
from infra.duckdb_query_readonly import duckdb_query_readonly # noqa: E402
|
|
from infra.duckdb_upsert import duckdb_upsert # noqa: E402
|
|
from infra.write_xlsx_sheets import write_xlsx_sheets # noqa: E402
|
|
|
|
|
|
# Default directory for the monitor's DuckDB file and Excel export. It is derived
# with expanduser so that no concrete home directory is hard-coded.
_DEFAULT_DIR = os.path.expanduser(os.path.join("~", ".fn_freelance"))
_DEFAULT_DB = os.path.join(_DEFAULT_DIR, "freelance.duckdb")
_DEFAULT_XLSX = os.path.join(_DEFAULT_DIR, "freelance_projects.xlsx")

_TABLE = "freelance_projects"

# Table columns, in the same order as the DDL. The upsert relies on this stable order.
_COLUMNS = [
    "url",  # PRIMARY KEY (dedup key)
    "source",
    "job_id",
    "title",
    "budget",
    "posted",
    "bids",
    "skills_json",
    "snippet",
    "country",
    "is_custom_software",
    "scraped_at",
    "first_seen_at",  # owned by the DB: set on insert, never overwritten on re-upsert
]

# Columns the UPSERT refreshes on conflict: every column except the key (url) and
# first_seen_at (the DB owns it — the first time a project was seen never changes).
_UPDATE_COLS = [
    name for name in _COLUMNS if name != "url" and name != "first_seen_at"
]

# Idempotent DDL. url is the PRIMARY KEY: required so that the upsert's
# ON CONFLICT deduplicates by url.
_DDL = f"""
CREATE TABLE IF NOT EXISTS {_TABLE} (
    url VARCHAR PRIMARY KEY,
    source VARCHAR,
    job_id VARCHAR,
    title VARCHAR,
    budget VARCHAR,
    posted VARCHAR,
    bids VARCHAR,
    skills_json VARCHAR,
    snippet VARCHAR,
    country VARCHAR,
    is_custom_software BOOLEAN,
    scraped_at VARCHAR,
    first_seen_at VARCHAR
)
""".strip()
|
|
|
|
# Strong keywords that flag a project as "custom software". They are searched in
# title + snippet + skills, all lowercased and with accents stripped. The flag
# only MARKS (highlights) a project — it does not filter: the user wants to see
# every programming project.
CUSTOM_SW_KEYWORDS = [
    "a medida", "custom software", "desarrollo de software",
    "mvp", "saas",
    "aplicacion web", "web app", "aplicacion movil", "app movil",
    "automatizacion", "bot", "scraping",
    "integracion api", "api rest",
    "sistema de gestion", "plataforma", "crm", "erp", "dashboard",
    "backend", "fullstack", "full stack", "microservicio",
]

# Human-readable (Spanish) headers of the Excel sheets, in column order.
_XLSX_HEADERS = [
    "Fuente", "Título", "Presupuesto", "A medida", "Publicado",
    "Propuestas", "Skills", "País", "URL", "Snippet",
]
|
|
|
|
|
|
def _strip_accents(text: str) -> str:
    """Return ``text`` lowercased and without accents or other diacritics.

    The text is decomposed with NFKD and every combining character is dropped,
    so keyword matching behaves the same for "aplicación" and "aplicacion".
    """
    decomposed = unicodedata.normalize("NFKD", text)
    base_chars = [ch for ch in decomposed if unicodedata.combining(ch) == 0]
    return "".join(base_chars).lower()
|
|
|
|
|
|
def _is_custom_software(project: dict) -> bool:
    """Tell whether a project looks like "custom software" based on keywords.

    Joins the project's title, snippet and skills with single spaces, normalizes
    the result with ``_strip_accents`` (lowercase, no accents) and reports whether
    any entry of ``CUSTOM_SW_KEYWORDS`` occurs in it as a substring. The result is
    only a marker; nothing is filtered here.

    Args:
        project: scraper record; ``title``, ``snippet`` and ``skills`` are read
            with ``.get``. A ``skills`` value that is not a list is ignored.

    Returns:
        True when at least one keyword is found, False otherwise.
    """
    skill_names = project.get("skills") or []
    if not isinstance(skill_names, list):
        skill_names = []

    title = str(project.get("title") or "")
    snippet = str(project.get("snippet") or "")
    skills_text = " ".join(map(str, skill_names))

    normalized = _strip_accents(f"{title} {snippet} {skills_text}")
    for keyword in CUSTOM_SW_KEYWORDS:
        if keyword in normalized:
            return True
    return False
|
|
|
|
|
|
def _normalize_project(project: dict) -> dict:
    """Turn a scraper project into a row ready to be stored in DuckDB.

    ``skills`` (a list) is serialized into the JSON string ``skills_json``,
    ``is_custom_software`` is computed with ``_is_custom_software`` and
    ``first_seen_at`` is set to the same value as ``scraped_at``. Missing or
    falsy fields become the empty string; a ``skills`` value that is not a list
    is treated as an empty list.

    Args:
        project: record produced by a scraper.

    Returns:
        A dict whose keys are, in order: url, source, job_id, title, budget,
        posted, bids, skills_json, snippet, country, is_custom_software,
        scraped_at, first_seen_at.
    """
    skill_list = project.get("skills") or []
    if not isinstance(skill_list, list):
        skill_list = []
    seen_at = project.get("scraped_at") or ""

    # Plain pass-through fields that precede skills_json in the column order.
    leading_fields = ("url", "source", "job_id", "title", "budget", "posted", "bids")
    row = {field: project.get(field) or "" for field in leading_fields}

    row["skills_json"] = json.dumps(skill_list, ensure_ascii=False)
    row["snippet"] = project.get("snippet") or ""
    row["country"] = project.get("country") or ""
    row["is_custom_software"] = _is_custom_software(project)
    row["scraped_at"] = seen_at
    row["first_seen_at"] = seen_at
    return row
|
|
|
|
|
|
def _row_to_xlsx(row: dict) -> list:
    """Convert a table row into the list of cells for one Excel line.

    ``skills_json`` is decoded from JSON and rendered as a comma-separated
    string; anything that does not decode to a list yields an empty string.
    ``is_custom_software`` is rendered as "Sí" or "No". Every other missing or
    falsy field becomes the empty string.

    Args:
        row: dict with the table's column names as keys (read with ``.get``).

    Returns:
        Ten cells in this order: source, title, budget, custom-software flag,
        posted, bids, skills, country, url, snippet.
    """
    raw_skills = row.get("skills_json") or "[]"
    try:
        decoded = json.loads(raw_skills)
    except (ValueError, TypeError):
        decoded = []
    else:
        if not isinstance(decoded, list):
            decoded = []
    skills_text = ", ".join(map(str, decoded))

    custom_flag = "Sí" if row.get("is_custom_software") else "No"

    def cell(key: str):
        # Missing and falsy values are both shown as an empty cell.
        return row.get(key) or ""

    return [
        cell("source"),
        cell("title"),
        cell("budget"),
        custom_flag,
        cell("posted"),
        cell("bids"),
        skills_text,
        cell("country"),
        cell("url"),
        cell("snippet"),
    ]
|
|
|
|
|
|
def monitor_freelance_projects(
    category: str = "it-programming",
    language: str = "es",
    query: str = "",
    pages: int = 1,
    include_upwork: bool = False,
    upwork_query: str = "custom software",
    duckdb_path: str = "",
    xlsx_path: str = "",
    port: int = 9222,
    timeout_s: float = 25.0,
) -> dict:
    """Detect new freelance projects, persist them with dedup and export to Excel.

    IMPURE pipeline: it needs a Chrome with remote debugging listening on `port`
    (the scrapers render SPAs through CDP) and it writes to disk (DuckDB + .xlsx).
    It composes six registry functions. Any exception derived from ``Exception``
    is converted into an error dict, so failures show up in the `status` key of
    the returned dict instead of propagating. It never fabricates data.

    Steps:
      1. Scrape Workana (always). If include_upwork, scrape Upwork as well; when
         Upwork does not return status 'ok' a warning is printed to stderr and
         the run continues with Workana only.
      2. Normalize every project: skills -> skills_json, add is_custom_software
         by keywords, first_seen_at = scraped_at. Projects without a url are
         skipped; for a url repeated within the run the last occurrence wins.
      3. Idempotent DDL (CREATE TABLE IF NOT EXISTS) through duckdb_execute.
      4. Read the urls already stored to work out WHICH projects are new, then
         UPSERT by url (first_seen_at is not among the updated columns).
      5. Read the whole table and write an .xlsx with two sheets: "Nuevos" (only
         the projects new in this run) and "Todos".

    Args:
        category: Workana category (?category=). Default "it-programming".
        language: language of the Workana projects (?language=). Default "es".
        query: free-text query applied to both sources. Passed to Workana as
            extra_query; for Upwork it replaces upwork_query when non-empty.
        pages: number of listing pages to walk per source. Default 1.
        include_upwork: when True, Upwork is scraped in addition to Workana.
            Default False (per the original note, its selectors are not
            validated live and it requires login).
        upwork_query: query for Upwork when include_upwork is set. Default
            "custom software". `query` overrides it when given.
        duckdb_path: path of the DuckDB file. If "", ~/.fn_freelance/freelance.duckdb
            is used. The parent directory is created.
        xlsx_path: path of the output .xlsx. If "",
            ~/.fn_freelance/freelance_projects.xlsx is used. The parent
            directory is created.
        port: remote-debugging port of the Chrome used by the scrapers.
            Default 9222.
        timeout_s: per-page timeout in seconds for the scrapers. Default 25.0.

    Returns:
        dict. On success::

            {
              "status": "ok",
              "new_count": int,             # projects new in this run
              "total_in_db": int,           # rows read back from the table
              "new_projects": [ {...}, ],   # the new projects (normalized rows)
              "xlsx_path": ...,             # value returned by write_xlsx_sheets
              "duckdb_path": "<abs>",
              "sources": {
                "workana": {"count": int, "status": str},
                "upwork":  {"count": int, "status": str} | "skipped",
              },
            }

        On error (without raising): {"status": "error", "error": str, "sources": {...}}.
    """
    sources_report: dict = {}
    try:
        # Resolve paths: empty arguments fall back to the defaults; the parent
        # directory of each target is created if missing.
        db_path = os.path.abspath(duckdb_path) if duckdb_path else _DEFAULT_DB
        out_xlsx = os.path.abspath(xlsx_path) if xlsx_path else _DEFAULT_XLSX
        os.makedirs(os.path.dirname(db_path), exist_ok=True)
        os.makedirs(os.path.dirname(out_xlsx), exist_ok=True)

        # --- Step 1: scrape Workana (always). Its failure is a hard error. ---
        wk = scrape_workana_projects(
            category=category,
            language=language,
            extra_query=query,
            pages=pages,
            port=port,
            timeout_s=timeout_s,
        )
        # Guard every access the same way the Upwork branch does: a non-dict
        # result is reported as a Workana failure (with the source listed in
        # `sources`) instead of surfacing as an AttributeError.
        wk_is_dict = isinstance(wk, dict)
        wk_status = wk.get("status", "error") if wk_is_dict else "error"
        wk_projects = wk.get("projects", []) if wk_is_dict else []
        sources_report["workana"] = {
            "count": len(wk_projects),
            "status": wk_status,
        }
        if wk_status != "ok":
            wk_error = wk.get("error", "sin detalle") if wk_is_dict else "sin detalle"
            return {
                "status": "error",
                "error": f"Workana scrape fallo: {wk_error}",
                "sources": sources_report,
            }

        # --- Step 1b: scrape Upwork (optional, failure-tolerant). ---
        all_projects = list(wk_projects)
        if include_upwork:
            uw_q = query or upwork_query
            uw = scrape_upwork_projects(
                query=uw_q,
                pages=pages,
                port=port,
                timeout_s=timeout_s,
            )
            uw_status = uw.get("status", "error") if isinstance(uw, dict) else "error"
            uw_projects = uw.get("projects", []) if isinstance(uw, dict) else []
            sources_report["upwork"] = {
                "count": len(uw_projects),
                "status": uw_status,
            }
            if uw_status == "ok":
                all_projects.extend(uw_projects)
            else:
                # Do not abort: carry on with Workana only.
                print(
                    f"[monitor_freelance_projects] WARN Upwork no devolvio datos "
                    f"(status={uw_status}, error={uw.get('error') if isinstance(uw, dict) else 'n/a'}); "
                    f"se continua solo con Workana.",
                    file=sys.stderr,
                )
        else:
            sources_report["upwork"] = "skipped"

        # --- Step 2: normalize + enrich. In-run dedup by url (last one wins). ---
        rows_by_url: dict = {}
        for project in all_projects:
            if not isinstance(project, dict):
                continue
            url = project.get("url")
            if not url:
                continue
            rows_by_url[url] = _normalize_project(project)
        rows = list(rows_by_url.values())

        # --- Step 3: idempotent DDL. ---
        ddl_res = duckdb_execute(db_path, _DDL)
        if ddl_res.get("status") != "ok":
            return {
                "status": "error",
                "error": f"DDL fallo: {ddl_res.get('error', 'sin detalle')}",
                "sources": sources_report,
            }

        # --- Step 4a: read the urls already stored to know which ones are new. ---
        # Skipped when nothing was scraped: there is nothing to compare against.
        existing_urls: set = set()
        if rows:
            q_urls = duckdb_query_readonly(
                db_path,
                f"SELECT url FROM {_TABLE}",
                max_rows=1_000_000,
            )
            if q_urls.get("status") != "ok":
                return {
                    "status": "error",
                    "error": f"lectura de urls existentes fallo: {q_urls.get('error', 'sin detalle')}",
                    "sources": sources_report,
                }
            existing_urls = {r.get("url") for r in q_urls.get("rows", [])}

        new_projects = [r for r in rows if r["url"] not in existing_urls]

        # --- Step 4b: UPSERT by url. ---
        if rows:
            up = duckdb_upsert(
                db_path,
                _TABLE,
                rows,
                key_cols=["url"],
                update_cols=_UPDATE_COLS,
            )
            if up.get("status") != "ok":
                return {
                    "status": "error",
                    "error": f"upsert fallo: {up.get('error', 'sin detalle')}",
                    "sources": sources_report,
                }

        # --- Step 5: read the whole table and export it to Excel. ---
        q_all = duckdb_query_readonly(
            db_path,
            f"SELECT {', '.join(_COLUMNS)} FROM {_TABLE} ORDER BY scraped_at DESC",
            max_rows=1_000_000,
        )
        if q_all.get("status") != "ok":
            return {
                "status": "error",
                "error": f"lectura de la tabla para Excel fallo: {q_all.get('error', 'sin detalle')}",
                "sources": sources_report,
            }
        all_rows_db = q_all.get("rows", [])
        total_in_db = len(all_rows_db)

        # The "Nuevos" sheet is built from the rows read back from the DB, so
        # both sheets share the same source and ordering.
        new_urls = {r["url"] for r in new_projects}
        sheet_nuevos = [_row_to_xlsx(r) for r in all_rows_db if r.get("url") in new_urls]
        sheet_todos = [_row_to_xlsx(r) for r in all_rows_db]

        abs_xlsx = write_xlsx_sheets(
            out_xlsx,
            {
                "Nuevos": {"headers": _XLSX_HEADERS, "rows": sheet_nuevos},
                "Todos": {"headers": _XLSX_HEADERS, "rows": sheet_todos},
            },
        )

        return {
            "status": "ok",
            "new_count": len(new_projects),
            "total_in_db": total_in_db,
            "new_projects": new_projects,
            "xlsx_path": abs_xlsx,
            "duckdb_path": db_path,
            "sources": sources_report,
        }
    except Exception as e:  # noqa: BLE001 — the pipeline never raises
        return {
            "status": "error",
            "error": f"{type(e).__name__}: {e}",
            "sources": sources_report,
        }
|
|
|
|
|
|
def main() -> int:
    """Command-line entry point for the monitor.

    Parses the CLI flags, runs ``monitor_freelance_projects`` with them, prints
    the resulting dict as indented JSON on stdout and maps the outcome to a
    process exit code.

    Returns:
        0 when the result's ``status`` is "ok", 1 otherwise.
    """
    import argparse

    parser = argparse.ArgumentParser(
        description="Monitor de captacion de clientes freelance (Workana + Upwork -> DuckDB + Excel)."
    )
    parser.add_argument("--category", default="it-programming")
    parser.add_argument("--language", default="es")
    parser.add_argument("--query", default="")
    parser.add_argument("--pages", type=int, default=1)
    parser.add_argument("--include-upwork", action="store_true")
    parser.add_argument("--upwork-query", default="custom software")
    parser.add_argument("--duckdb-path", default="")
    parser.add_argument("--xlsx-path", default="")
    parser.add_argument("--port", type=int, default=9222)
    parser.add_argument("--timeout-s", type=float, default=25.0)
    options = parser.parse_args()

    # argparse turns each flag into an attribute with dashes replaced by
    # underscores, which are exactly the keyword parameters of the pipeline.
    result = monitor_freelance_projects(**vars(options))

    print(json.dumps(result, ensure_ascii=False, indent=2))
    if result.get("status") == "ok":
        return 0
    return 1
|
|
|
|
|
|
# Script entry point: run the CLI and use its return value as the exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|