feat(browser): auto-commit con 178 cambios

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-20 18:22:23 +02:00
parent 7d100e7f3e
commit 763e06c127
178 changed files with 19917 additions and 317 deletions
@@ -0,0 +1,86 @@
---
name: add_contact_dav
kind: pipeline
lang: py
domain: pipelines
version: "1.0.0"
purity: impure
signature: "def add_contact_dav(name: str, *, tels=None, emails=None, adrs=None, org='', note='', uid='', base_url=DEFAULT_BASE_URL, username=DEFAULT_USERNAME, collection_path=DEFAULT_COLLECTION, secret_path='dav/xandikos-enmanuel', timeout_s=20.0, verify_tls=True) -> dict"
description: "One-shot que anade UN contacto a la libreta CardDAV de Enmanuel (Xandikos) en una sola llamada. Compone build_vcard + contact_import_key + pass_get_secret + carddav_put_vcard. Idempotente por uid: re-anadir el mismo contacto sobrescribe, no duplica. La contrasena se resuelve desde pass y nunca se logea."
tags: [dav, carddav, vcard, contact, contacts, pipelines]
params:
- name: name
desc: "Nombre completo del contacto (FN del vCard). Obligatorio."
- name: tels
desc: "Telefono(s). Acepta lista, string suelto o None."
- name: emails
desc: "Email(s). Acepta lista, string suelto o None."
- name: adrs
desc: "Direccion(es). Acepta lista, string suelto o None."
- name: org
desc: "Organizacion (ORG). Vacio = se omite."
- name: note
desc: "Nota libre (NOTE). Vacio = se omite."
- name: uid
desc: "UID explicito del vCard. Vacio => se calcula con contact_import_key (telefono > email > nombre) para idempotencia."
- name: base_url
desc: "URL base del servidor DAV. Default = libreta CardDAV de Enmanuel."
- name: username
desc: "Usuario HTTP Basic. Default = enmanuel."
- name: collection_path
desc: "Ruta de la coleccion CardDAV destino."
- name: secret_path
desc: "Ruta del secreto en pass cuya primera linea es la contrasena CardDAV."
- name: timeout_s
desc: "Timeout del PUT en segundos. Default 20.0."
- name: verify_tls
desc: "Si True (default) verifica el certificado TLS. No desactivar fuera de pruebas."
output: "dict. Exito: {status:'ok', http_status:int, url:str, uid:str}. Error (sin lanzar): {status:'error', error:str, uid:str, http_status:int|None}. Si la pass no se encuentra, devuelve error sin tocar la red."
uses_functions: [build_vcard_py_core, contact_import_key_py_core, carddav_put_vcard_py_infra, pass_get_secret_py_infra]
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: []
tested: false
tests: []
test_file_path: ""
file_path: "python/functions/pipelines/add_contact_dav.py"
---
## Ejemplo
```bash
# Anadir un contacto en una sola llamada (uid determinista por telefono):
./fn run add_contact_dav --name "Juan Perez" --tel +34600111222 --email juan@example.com --org "ACME"
# Multi-valor: --tel y --email son repetibles -> se serializan como listas.
./fn run add_contact_dav --name "Maria Lopez" \
--tel +34611000111 --tel +34922000222 \
--email maria@example.com --email m.lopez@work.com \
--note "Conocida del evento OSINT"
```
Salida (JSON): `{"status": "ok", "http_status": 201, "url": "https://dav-.../enmanuel/contacts/addressbook/v1-<hash>.vcf", "uid": "v1-<hash>"}`.
## Cuando usarla
Usala cuando quieras dar de alta o actualizar UN contacto en la libreta CardDAV
de Enmanuel sin montar el flujo a mano (serializar vCard, sacar la pass, PUT).
Si re-ejecutas con el mismo telefono/email (o el mismo `--uid`), el contacto se
sobrescribe en vez de duplicarse. Para importar muchos contactos de golpe, este
pipeline no es lo idoneo: llamalo en bucle o construye un pipeline batch.
## Gotchas
- **Escritura remota real**: hace un HTTP PUT contra el servidor DAV. No es un
dry-run. Cada llamada con `status:'ok'` ha creado/actualizado un recurso real.
- **Idempotencia por uid**: si no pasas `--uid`, el UID se deriva de forma
determinista (telefono > email > nombre). Mismo telefono/email = mismo recurso
= sobrescritura. Distinto telefono pero mismo nombre = recurso distinto.
- **Secreto desde pass, nunca hardcode**: la contrasena se lee de
`pass show dav/xandikos-enmanuel` (configurable con `secret_path`). Nunca se
logea ni aparece en el dict de retorno. Si `pass` falla o la entry no existe,
devuelve `{status:'error'}` sin tocar la red.
- **verify_tls**: por defecto verifica el certificado TLS. `--no-verify-tls`
solo para pruebas controladas; nunca contra el servidor real de produccion.
@@ -0,0 +1,179 @@
"""Pipeline: anade UN contacto a la libreta CardDAV de Enmanuel en una llamada.
Compone funciones del registry: genera un UID determinista cuando el caller no
da uno (contact_import_key) para que re-anadir el mismo contacto sobrescriba en
vez de duplicar, serializa el dict de contacto a VCARD 3.0 (build_vcard),
resuelve la contrasena CardDAV desde `pass` (pass_get_secret) y sube el VCARD
via HTTP PUT (carddav_put_vcard). Impuro (red + lectura de `pass`). Solo stdlib.
La contrasena resuelta NUNCA se logea ni se incluye en el dict de retorno.
"""
import os
import sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from core.build_vcard import build_vcard
from core.contact_import_key import contact_import_key
from infra.carddav_put_vcard import carddav_put_vcard
from infra.pass_get_secret import pass_get_secret
# Config destino embebida (libreta CardDAV de Enmanuel en Xandikos self-hosted).
DEFAULT_BASE_URL = "https://dav-eedeb681c4ab89ab8e444ac9.organic-machine.com"
DEFAULT_USERNAME = "enmanuel"
DEFAULT_COLLECTION = "/enmanuel/contacts/addressbook/"
def _as_list(value) -> list:
"""Normaliza None / string suelto / lista a lista de strings.
None -> []; string suelto -> [string]; lista|tupla -> lista. Cualquier otro
valor escalar se envuelve en una lista de un elemento.
"""
if value is None:
return []
if isinstance(value, str):
return [value]
if isinstance(value, (list, tuple)):
return list(value)
return [value]
def add_contact_dav(
name: str,
*,
tels=None,
emails=None,
adrs=None,
org: str = "",
note: str = "",
uid: str = "",
base_url: str = DEFAULT_BASE_URL,
username: str = DEFAULT_USERNAME,
collection_path: str = DEFAULT_COLLECTION,
secret_path: str = "dav/xandikos-enmanuel",
timeout_s: float = 20.0,
verify_tls: bool = True,
) -> dict:
"""Anade un contacto a la libreta CardDAV en una sola llamada (one-shot).
Args:
name: nombre completo del contacto (FN del vCard). Obligatorio.
tels: telefono(s). Acepta lista, string suelto o None.
emails: email(s). Acepta lista, string suelto o None.
adrs: direccion(es). Acepta lista, string suelto o None.
org: organizacion (ORG). Vacio = se omite.
note: nota libre (NOTE). Vacio = se omite.
uid: UID del vCard. Si se deja vacio se calcula con contact_import_key
(telefono > email > nombre), de modo que re-anadir el mismo contacto
sobrescribe el recurso en vez de duplicarlo (idempotencia).
base_url: URL base del servidor DAV. Default = libreta de Enmanuel.
username: usuario HTTP Basic. Default = enmanuel.
collection_path: ruta de la coleccion CardDAV destino.
secret_path: ruta del secreto en `pass` con la contrasena (primera linea).
timeout_s: timeout del PUT en segundos. Default 20.0.
verify_tls: si True (default) verifica el certificado TLS.
Returns:
dict. En exito reusa el dict de carddav_put_vcard mas el uid usado:
{status:'ok', http_status:int, url:str, uid:str}. En error (sin lanzar):
{status:'error', error:str, uid:str, http_status:int|None}. Si la
contrasena no se encuentra en `pass`, devuelve {status:'error',
error:..., uid:...} sin tocar la red.
"""
tels_list = _as_list(tels)
emails_list = _as_list(emails)
adrs_list = _as_list(adrs)
used_uid = uid.strip() if uid else ""
if not used_uid:
used_uid = contact_import_key(name, phones=tels_list, emails=emails_list)
contact = {"uid": used_uid, "fn": name}
if tels_list:
contact["tels"] = tels_list
if emails_list:
contact["emails"] = emails_list
if adrs_list:
contact["adrs"] = adrs_list
if org:
contact["org"] = org
if note:
contact["note"] = note
vcard_text = build_vcard(contact)
secret = pass_get_secret(secret_path)
if secret.get("status") != "ok":
return {
"status": "error",
"error": "pass: %s" % secret.get("error", "secret not found"),
"uid": used_uid,
"http_status": None,
}
password = secret["value"]
result = carddav_put_vcard(
base_url,
username,
password,
collection_path,
used_uid,
vcard_text,
timeout_s=timeout_s,
verify_tls=verify_tls,
)
# Reusar el dict de carddav_put_vcard + asegurar el uid usado.
result["uid"] = used_uid
return result
if __name__ == "__main__":
import argparse
import json
parser = argparse.ArgumentParser(
description="Anade UN contacto a la libreta CardDAV de Enmanuel."
)
parser.add_argument("--name", required=True, help="Nombre completo (FN).")
parser.add_argument(
"--tel", action="append", default=[], help="Telefono (repetible)."
)
parser.add_argument(
"--email", action="append", default=[], help="Email (repetible)."
)
parser.add_argument(
"--adr", action="append", default=[], help="Direccion (repetible)."
)
parser.add_argument("--org", default="", help="Organizacion (ORG).")
parser.add_argument("--note", default="", help="Nota libre (NOTE).")
parser.add_argument("--uid", default="", help="UID explicito (opcional).")
parser.add_argument("--base-url", default=DEFAULT_BASE_URL)
parser.add_argument("--username", default=DEFAULT_USERNAME)
parser.add_argument("--collection-path", default=DEFAULT_COLLECTION)
parser.add_argument("--secret-path", default="dav/xandikos-enmanuel")
parser.add_argument("--timeout-s", type=float, default=20.0)
parser.add_argument(
"--no-verify-tls",
action="store_true",
help="Desactiva la verificacion TLS (solo pruebas).",
)
args = parser.parse_args()
out = add_contact_dav(
args.name,
tels=args.tel,
emails=args.email,
adrs=args.adr,
org=args.org,
note=args.note,
uid=args.uid,
base_url=args.base_url,
username=args.username,
collection_path=args.collection_path,
secret_path=args.secret_path,
timeout_s=args.timeout_s,
verify_tls=not args.no_verify_tls,
)
print(json.dumps(out, ensure_ascii=False))
+105
View File
@@ -0,0 +1,105 @@
---
name: add_event_dav
kind: pipeline
lang: py
domain: pipelines
version: "1.0.0"
purity: impure
signature: "def add_event_dav(summary: str, start: str, end: str = '', *, location: str = '', description: str = '', all_day: bool = False, rrule: str = '', alarm_minutes: int = 0, uid: str = '', base_url: str = DEFAULT_BASE_URL, username: str = DEFAULT_USERNAME, collection_path: str = DEFAULT_COLLECTION, secret_path: str = 'dav/xandikos-enmanuel', timeout_s: float = 20.0, verify_tls: bool = True) -> dict"
description: "One-shot que anade UN evento al calendario CalDAV de Enmanuel (Xandikos self-hosted) en una sola llamada. Compone build_vevent (componer el VCALENDAR), extract_or_make_uid (UID si falta), pass_get_secret (resolver la contrasena DAV desde pass) y caldav_put_event (HTTP PUT). Impuro: escritura remota real. Idempotente por UID. La contrasena nunca se logea ni aparece en el resultado. Defaults apuntan al calendario de Enmanuel."
tags: [dav, caldav, calendar, event, pipelines]
params:
- name: summary
desc: "titulo del evento (-> SUMMARY). Obligatorio."
- name: start
desc: "fecha/hora de inicio, p.ej. '2026-06-20T17:00' (naive local), con sufijo 'Z' para UTC, o '2026-06-20' para all_day. Obligatorio."
- name: end
desc: "fecha/hora de fin. Si vacio y no es all_day, se deriva +1h del start; si all_day, el dia siguiente."
- name: location
desc: "lugar del evento (-> LOCATION)."
- name: description
desc: "descripcion del evento (-> DESCRIPTION)."
- name: all_day
desc: "bool. Si True, evento de dia completo (DTSTART;VALUE=DATE)."
- name: rrule
desc: "regla de recurrencia RRULE, p.ej. 'FREQ=WEEKLY;BYDAY=MO'."
- name: alarm_minutes
desc: "int. Si > 0, anade un recordatorio (VALARM display) N minutos antes."
- name: uid
desc: "UID explicito del evento. Si vacio, se sintetiza determinista del VCALENDAR (re-subir el mismo evento sobrescribe = idempotente)."
- name: base_url
desc: "URL base del servidor DAV. Default = Xandikos de Enmanuel."
- name: username
desc: "usuario para HTTP Basic auth. Default 'enmanuel'."
- name: collection_path
desc: "ruta de la coleccion CalDAV destino. Default '/enmanuel/calendars/calendar/'."
- name: secret_path
desc: "ruta del secreto en pass con la contrasena DAV. Default 'dav/xandikos-enmanuel'."
- name: timeout_s
desc: "timeout del PUT en segundos. Default 20.0."
- name: verify_tls
desc: "si True (default) verifica el certificado TLS. No desactivar salvo entornos de prueba."
output: "dict. En exito: {status: 'ok', http_status: int, uid: str, url: str}. En error (sin lanzar): {status: 'error', error: str, uid: str|None, http_status: int|None}. La contrasena nunca aparece en el resultado."
uses_functions: [build_vevent_py_core, extract_or_make_uid_py_infra, pass_get_secret_py_infra, caldav_put_event_py_infra]
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: [os, sys, argparse, json]
tested: false
tests: []
test_file_path: ""
file_path: "python/functions/pipelines/add_event_dav.py"
---
## Ejemplo
```bash
# Anadir un evento con hora, lugar y recordatorio (UID sintetico determinista):
./fn run add_event_dav --summary "Cita dentista" --start 2026-06-20T17:00 \
--end 2026-06-20T18:00 --location "Clinica" --alarm-minutes 30
# {"status": "ok", "http_status": 201, "uid": "evt-<md5>", "url": "https://dav-.../enmanuel/calendars/calendar/evt-<md5>.ics"}
# Evento de dia completo recurrente:
./fn run add_event_dav --summary "Cumpleanos" --start 2026-06-20 --all-day \
--rrule "FREQ=YEARLY"
```
```python
import sys, os
sys.path.insert(0, os.path.join("python", "functions"))
from pipelines.add_event_dav import add_event_dav
res = add_event_dav(
"Reunion equipo", "2026-06-22T09:00", "2026-06-22T10:00",
location="Sala A", description="Sprint review", alarm_minutes=15,
)
print(res["status"], res["uid"]) # 'ok' evt-...
```
## Cuando usarla
Cuando quieras anadir un evento al calendario de Enmanuel sin orquestar a mano
los pasos (componer el iCal, resolver el secreto, hacer el PUT). Es la operacion
one-shot del grupo `dav` para CalDAV. Para subir un `.ics` entero con N eventos
usa `import_ics_to_caldav_py_pipelines`; para un solo evento parametrizado, esta.
Pasa `uid` explicito si quieres controlar/actualizar un evento concreto; dejalo
vacio para crear uno nuevo con UID derivado del contenido.
## Gotchas
- **Accion con efecto real (impura)**: hace un HTTP PUT que escribe en el
calendario remoto de Enmanuel. No es un dry-run. Verifica `start`/`end` antes
de lanzar; un PUT con datos erroneos crea el evento igualmente.
- **Idempotente por UID**: el nombre del recurso es `<uid>.ics`. Re-subir el
mismo UID SOBRESCRIBE el evento existente (no duplica). Con `uid` vacio el UID
es determinista (md5 de summary+start): re-lanzar el mismo evento exacto pisa
el anterior; cambiar summary o start crea un recurso nuevo.
- **Secreto desde `pass`, nunca hardcode**: la contrasena se resuelve con
`pass_get_secret('dav/xandikos-enmanuel')` y NUNCA se logea ni se incluye en el
dict de retorno. Si `pass` no esta instalado o la entry no existe, devuelve
`{status:'error', error:'pass: ...'}` sin lanzar y sin hacer el PUT.
- **`verify_tls=True` por defecto**: no uses `--no-verify-tls` salvo en pruebas
controladas. El servidor de Enmanuel tiene certificado valido.
- **ValueError de build_vevent**: si falta `summary` o `start`, el pipeline lo
captura y devuelve `{status:'error'}` (no propaga la excepcion).
+168
View File
@@ -0,0 +1,168 @@
"""Pipeline: anade UN evento al calendario CalDAV de Enmanuel en una llamada.
Compone funciones del registry:
- build_vevent (core): compone el dict de evento -> texto VCALENDAR.
- extract_or_make_uid (infra): resuelve/sintetiza el UID si no se da.
- pass_get_secret (infra): resuelve la contrasena DAV desde `pass`.
- caldav_put_event (infra): hace el HTTP PUT a la coleccion CalDAV.
Impuro (red + lectura de secreto via subproceso). La contrasena NUNCA se logea
ni aparece en el resultado. Solo stdlib + funciones del registry.
"""
import os
import sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from core.build_vevent import build_vevent
from infra.extract_or_make_uid import extract_or_make_uid
from infra.pass_get_secret import pass_get_secret
from infra.caldav_put_event import caldav_put_event
DEFAULT_BASE_URL = "https://dav-eedeb681c4ab89ab8e444ac9.organic-machine.com"
DEFAULT_USERNAME = "enmanuel"
DEFAULT_COLLECTION = "/enmanuel/calendars/calendar/"
def add_event_dav(
summary: str,
start: str,
end: str = "",
*,
location: str = "",
description: str = "",
all_day: bool = False,
rrule: str = "",
alarm_minutes: int = 0,
uid: str = "",
base_url: str = DEFAULT_BASE_URL,
username: str = DEFAULT_USERNAME,
collection_path: str = DEFAULT_COLLECTION,
secret_path: str = "dav/xandikos-enmanuel",
timeout_s: float = 20.0,
verify_tls: bool = True,
) -> dict:
"""Anade un evento al calendario CalDAV de Enmanuel en un solo paso.
Args:
summary: titulo del evento (-> SUMMARY). Obligatorio.
start: fecha/hora de inicio (p.ej. '2026-06-20T17:00'). Obligatorio.
end: fecha/hora de fin. Si vacio y no es all_day, build_vevent deriva +1h.
location: lugar del evento (-> LOCATION).
description: descripcion (-> DESCRIPTION).
all_day: si True, evento de dia completo (DTSTART;VALUE=DATE).
rrule: regla de recurrencia (p.ej. 'FREQ=WEEKLY;BYDAY=MO').
alarm_minutes: si > 0, anade un recordatorio N minutos antes (VALARM).
uid: UID explicito del evento. Si vacio, se sintetiza determinista a
partir del VCALENDAR generado (idempotente: re-subir sobrescribe).
base_url: URL base del servidor DAV. Default = Xandikos de Enmanuel.
username: usuario para HTTP Basic auth. Default 'enmanuel'.
collection_path: ruta de la coleccion CalDAV destino.
secret_path: ruta del secreto en `pass` con la contrasena DAV.
timeout_s: timeout del PUT en segundos.
verify_tls: si True (default) verifica el certificado TLS.
Returns:
dict. En exito: {status: 'ok', http_status: int, uid: str, url: str}.
En error (sin lanzar): {status: 'error', error: str, uid: str|None,
http_status: int|None}. La contrasena NUNCA aparece en el resultado.
"""
event = {
"summary": summary,
"start": start,
"end": end or None,
"location": location or None,
"description": description or None,
"all_day": all_day,
"rrule": rrule or None,
"alarm_minutes": alarm_minutes or None,
"uid": uid or None,
}
try:
vcalendar = build_vevent(event)
except ValueError as e:
return {"status": "error", "error": str(e), "uid": None, "http_status": None}
# UID definitivo: el explicito si vino, o el (sintetico) del VCALENDAR.
final_uid = uid.strip() if uid else extract_or_make_uid(vcalendar, prefix="evt-")
secret = pass_get_secret(secret_path)
if secret.get("status") != "ok":
return {
"status": "error",
"error": "pass: %s" % secret.get("error", "unknown"),
"uid": final_uid,
"http_status": None,
}
password = secret["value"]
res = caldav_put_event(
base_url,
username,
password,
collection_path,
final_uid,
vcalendar,
timeout_s=timeout_s,
verify_tls=verify_tls,
)
# Reusa el dict de caldav_put_event y le anade el uid usado.
out = dict(res)
out["uid"] = final_uid
return out
if __name__ == "__main__":
import argparse
import json
parser = argparse.ArgumentParser(
description="Anade un evento al calendario CalDAV de Enmanuel."
)
parser.add_argument("--summary", required=True, help="Titulo del evento.")
parser.add_argument(
"--start", required=True, help="Inicio, p.ej. 2026-06-20T17:00."
)
parser.add_argument("--end", default="", help="Fin, p.ej. 2026-06-20T18:00.")
parser.add_argument("--location", default="", help="Lugar.")
parser.add_argument("--description", default="", help="Descripcion.")
parser.add_argument(
"--all-day", action="store_true", help="Evento de dia completo."
)
parser.add_argument(
"--rrule", default="", help="Recurrencia, p.ej. FREQ=WEEKLY;BYDAY=MO."
)
parser.add_argument(
"--alarm-minutes", type=int, default=0, help="Recordatorio N min antes."
)
parser.add_argument("--uid", default="", help="UID explicito (opcional).")
parser.add_argument("--base-url", default=DEFAULT_BASE_URL)
parser.add_argument("--username", default=DEFAULT_USERNAME)
parser.add_argument("--collection-path", default=DEFAULT_COLLECTION)
parser.add_argument("--secret-path", default="dav/xandikos-enmanuel")
parser.add_argument("--timeout-s", type=float, default=20.0)
parser.add_argument(
"--no-verify-tls", action="store_true", help="Desactiva verificacion TLS."
)
args = parser.parse_args()
result = add_event_dav(
args.summary,
args.start,
args.end,
location=args.location,
description=args.description,
all_day=args.all_day,
rrule=args.rrule,
alarm_minutes=args.alarm_minutes,
uid=args.uid,
base_url=args.base_url,
username=args.username,
collection_path=args.collection_path,
secret_path=args.secret_path,
timeout_s=args.timeout_s,
verify_tls=not args.no_verify_tls,
)
print(json.dumps(result))
@@ -0,0 +1,108 @@
---
name: ingest_gsc_search_analytics
kind: pipeline
lang: py
domain: pipelines
version: "1.0.0"
purity: impure
signature: "def ingest_gsc_search_analytics(site_url: str = '', duckdb_path: str = '', pg_dsn: str = '', start_date: str = '', end_date: str = '', lookback_days: int = 5, credentials_path: str = '') -> dict"
description: "Pipeline de ingesta diaria de Google Search Console (Search Analytics): GSC -> DuckDB -> PostgreSQL. Autentica con una service account (gsc_auth), extrae las filas de Search Analytics por las dimensiones date/query/page (pull_gsc_search_analytics), crea la tabla DuckDB si no existe con una restriccion UNIQUE (duckdb_execute), transforma cada fila renombrando 'date'->'data_date' y rellenando defaults estables (country='', device='', search_type='web') para las dimensiones no pedidas, hace upsert idempotente en DuckDB (duckdb_upsert) y espeja la tabla completa a PostgreSQL en modo replace para que Metabase la lea (duckdb_to_postgres). DuckDB es la verdad acumulada (historico append idempotente); PostgreSQL es un espejo regenerado por completo cada corrida. Resuelve defaults de site_url/pg_dsn/duckdb_path desde env (GSC_SITE_URL, SEO_DSN, SEO_DUCKDB con fallback ~/.fn_seo/seo.duckdb). Resuelve fechas teniendo en cuenta el lag de ~3 dias de la API: end=hoy-3, start=hoy-(3+lookback_days), re-pulleando los ultimos dias para que el upsert corrija lo que GSC ajusta a posteriori. Devuelve un dict sin lanzar: {status:'ok', site_url, start_date, end_date, rows_pulled, duckdb, postgres} en exito, {status:'error', error} en fallo."
tags: [seo, gsc, search-console, pipelines, duckdb]
uses_functions:
- gsc_auth_py_infra
- pull_gsc_search_analytics_py_datascience
- duckdb_execute_py_infra
- duckdb_upsert_py_infra
- duckdb_to_postgres_py_pipelines
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: [os, datetime]
params:
- name: site_url
desc: "propiedad de Search Console: 'sc-domain:ejemplo.com' (propiedad de dominio) o la URL de prefijo 'https://ejemplo.com/'. Si esta vacio se lee de la env var GSC_SITE_URL. Obligatorio: ValueError si falta."
- name: duckdb_path
desc: "ruta al archivo DuckDB de la fuente de verdad acumulada. Si esta vacio se lee de la env var SEO_DUCKDB y, en su defecto, ~/.fn_seo/seo.duckdb. El directorio padre se crea (os.makedirs exist_ok=True)."
- name: pg_dsn
desc: "cadena de conexion PostgreSQL del espejo BI, p.ej. 'postgresql://user:pass@host:5432/db'. Si esta vacio se lee de la env var SEO_DSN. Obligatorio: ValueError si falta."
- name: start_date
desc: "fecha inicial inclusiva 'YYYY-MM-DD'. Si esta vacia se calcula como hoy-(3+lookback_days)."
- name: end_date
desc: "fecha final inclusiva 'YYYY-MM-DD'. Si esta vacia se calcula como hoy-3 (lag de la API de GSC)."
- name: lookback_days
desc: "numero de dias extra hacia atras que se re-pullean para que el upsert idempotente corrija los datos que GSC ajusta a posteriori (hasta ~3 dias). Default 5."
- name: credentials_path
desc: "ruta al JSON de la service account. Se pasa tal cual a gsc_auth, que ya hace su propio fallback a la env var GSC_SA_JSON."
output: "dict. En exito: {status:'ok', site_url:str, start_date:str, end_date:str, rows_pulled:int, duckdb:dict (resultado de duckdb_upsert), postgres:dict (resultado de duckdb_to_postgres)}. En error (sin lanzar): {status:'error', error:str}."
tested: true
tests:
- "test_renombra_date_a_data_date_y_persiste_en_duckdb"
- "test_resolucion_fechas_por_defecto"
- "test_upsert_idempotente_no_duplica"
- "test_falta_site_url_da_value_error"
- "test_falta_pg_dsn_da_value_error"
test_file_path: "python/functions/pipelines/ingest_gsc_search_analytics_test.py"
file_path: "python/functions/pipelines/ingest_gsc_search_analytics.py"
---
## Ejemplo
```bash
# Con las 3 env seteadas, una sola corrida hace el snapshot diario completo:
export GSC_SITE_URL="sc-domain:ejemplo.com"
export SEO_DSN="postgresql://seo:****@127.0.0.1:5432/seo"
export GSC_SA_JSON="$HOME/.fn_seo/service_account.json"
# (SEO_DUCKDB opcional; por defecto ~/.fn_seo/seo.duckdb)
./fn run ingest_gsc_search_analytics
# -> {"status": "ok", "site_url": "sc-domain:ejemplo.com",
# "start_date": "2026-06-09", "end_date": "2026-06-17",
# "rows_pulled": 1280, "duckdb": {...}, "postgres": {...}}
```
```python
import sys
sys.path.insert(0, "python/functions")
from pipelines.ingest_gsc_search_analytics import ingest_gsc_search_analytics
# Variante explicita: rango de fechas fijo y rutas pasadas como args.
res = ingest_gsc_search_analytics(
site_url="sc-domain:ejemplo.com",
duckdb_path="/home/me/.fn_seo/seo.duckdb",
pg_dsn="postgresql://seo:****@127.0.0.1:5432/seo",
start_date="2026-06-01",
end_date="2026-06-17",
credentials_path="/home/me/.fn_seo/service_account.json",
)
print(res["rows_pulled"], res["status"]) # 4210 ok
```
## Cuando usarla
Cuando quieras un snapshot diario de Google Search Console acumulado y consultable
desde Metabase: cada corrida añade/actualiza los datos del rango en DuckDB y
regenera el espejo PostgreSQL. La invoca el DAG `seo-gsc-daily` de dag_engine una
vez al dia (no uses cron ni systemd timers: usa dag_engine). Para un re-pull manual
puntual de un rango concreto, pásale `start_date`/`end_date` a mano.
## Gotchas
- **Lag de ~3 dias**: la API de GSC no consolida datos hasta ~3 dias despues. Por
eso `end_date` por defecto es hoy-3 y `start_date` retrocede `lookback_days` extra.
Pedir hasta hoy devolveria filas vacias o incompletas.
- **Re-pull idempotente**: se re-piden a proposito los ultimos `lookback_days` dias.
La restriccion `UNIQUE (site_url, data_date, query, page, country, device,
search_type)` + `duckdb_upsert` actualizan esas filas sin duplicarlas, recogiendo
las correcciones que GSC aplica a posteriori. El `snapshot_date` se sobrescribe al
valor de la ultima corrida.
- **DuckDB es la verdad; PostgreSQL es un espejo**: la ingesta acumula histórico solo
en DuckDB. El espejo a Postgres usa `mode='replace'` -> hace DROP + CREATE + INSERT
de la tabla completa cada vez. NO escribas en la tabla Postgres ni esperes acumular
alli: se borra y reescribe en cada corrida. Si quieres histórico, leelo de DuckDB.
- **Dimensiones**: este pull pide solo `date`/`query`/`page`. `country` y `device`
quedan vacios y `search_type='web'` como defaults estables para que la tupla UNIQUE
sea consistente. Si necesitas desglose por pais/dispositivo, es otro pull/tabla.
- **Requisitos de entorno**: necesita las 3 env (`GSC_SITE_URL`, `SEO_DSN`,
`GSC_SA_JSON`) o sus args equivalentes, y la service account debe estar añadida como
usuario con permiso sobre la propiedad en Search Console. Faltar `site_url` o
`pg_dsn` devuelve `{status:'error'}` (ValueError capturado, no crash).
@@ -0,0 +1,196 @@
"""Pipeline de ingesta diaria de Google Search Console (Search Analytics).
Orquesta el snapshot diario de Search Console de una propiedad: autentica con
una service account, extrae las filas de Search Analytics, las acumula de forma
idempotente en una tabla DuckDB (la fuente de verdad histórica) y espeja la
tabla completa a PostgreSQL para que herramientas BI como Metabase la lean.
Es un pipeline (kind: pipeline -> siempre impuro): compone funciones del
registry sin reescribir su lógica. Devuelve un dict sin lanzar excepciones,
siguiendo el estilo del grupo duckdb/etl del registry:
{status:'ok', ...} en éxito y {status:'error', error:str} en fallo.
"""
import os
from datetime import date, timedelta
from infra import gsc_auth, duckdb_execute
from infra.duckdb_upsert import duckdb_upsert
from datascience import pull_gsc_search_analytics
from pipelines.duckdb_to_postgres import duckdb_to_postgres
# DDL de la tabla acumulada. La restricción UNIQUE es exactamente la clave que
# duckdb_upsert necesita para que el re-pull de los últimos días actualice en
# lugar de duplicar (GSC corrige datos hasta ~3 días atrás).
_TABLE_DDL = """
CREATE TABLE IF NOT EXISTS gsc_search_analytics (
snapshot_date DATE, data_date DATE, site_url TEXT, query TEXT, page TEXT,
country TEXT, device TEXT, search_type TEXT,
clicks INTEGER, impressions INTEGER, ctr DOUBLE, position DOUBLE,
UNIQUE (site_url, data_date, query, page, country, device, search_type)
);
"""
# Columnas que forman la clave única; también las que usa el upsert.
_KEY_COLS = ["site_url", "data_date", "query", "page", "country", "device", "search_type"]
# Lag de la API de GSC: los datos no están consolidados hasta ~3 días después.
_GSC_LAG_DAYS = 3
def ingest_gsc_search_analytics(
site_url: str = "",
duckdb_path: str = "",
pg_dsn: str = "",
start_date: str = "",
end_date: str = "",
lookback_days: int = 5,
credentials_path: str = "",
) -> dict:
"""Ingesta diaria de Google Search Console: GSC -> DuckDB -> PostgreSQL.
Pasos en orden: (1) resuelve defaults desde env; (2) resuelve fechas teniendo
en cuenta el lag de ~3 días de la API; (3) crea la tabla DuckDB si no existe;
(4) autentica con la service account; (5) extrae Search Analytics por las
dimensiones date/query/page; (6) transforma cada fila a la forma de la tabla
(renombrando ``date`` -> ``data_date`` y rellenando defaults estables para
las dimensiones no pedidas); (7) hace upsert idempotente en DuckDB; (8) espeja
la tabla completa a PostgreSQL en modo ``replace``.
DuckDB es la verdad acumulada (histórico append idempotente). PostgreSQL es
un espejo regenerado por completo en cada corrida (mode='replace') para que
Metabase tenga siempre el snapshot íntegro sin acumular duplicados.
Args:
site_url: propiedad de Search Console (``sc-domain:ejemplo.com`` o la
URL de prefijo ``https://ejemplo.com/``). Si está vacío, se lee de la
env var ``GSC_SITE_URL``. Obligatorio (ValueError si falta).
duckdb_path: ruta al archivo DuckDB de la fuente de verdad. Si está vacío,
se lee de la env var ``SEO_DUCKDB`` y, en su defecto, se usa
``~/.fn_seo/seo.duckdb``. El directorio padre se crea si no existe.
pg_dsn: cadena de conexión PostgreSQL del espejo BI. Si está vacío, se lee
de la env var ``SEO_DSN``. Obligatorio (ValueError si falta).
start_date: fecha inicial inclusiva ``YYYY-MM-DD``. Si está vacía, se
calcula como hoy - (3 + lookback_days).
end_date: fecha final inclusiva ``YYYY-MM-DD``. Si está vacía, se calcula
como hoy - 3 (lag de la API).
lookback_days: nº de días extra hacia atrás que se re-pullean para que el
upsert corrija los datos que GSC ajusta a posteriori. Default 5.
credentials_path: ruta al JSON de la service account. Se pasa tal cual a
``gsc_auth``, que ya hace su propio fallback a la env var
``GSC_SA_JSON``.
Returns:
dict. En éxito: ``{"status": "ok", "site_url", "start_date", "end_date",
"rows_pulled", "duckdb", "postgres"}`` donde ``duckdb`` es el resultado
del upsert y ``postgres`` el del espejo. En error (sin lanzar):
``{"status": "error", "error": str}``.
"""
try:
# (1) Defaults desde env.
site_url = site_url or os.environ.get("GSC_SITE_URL", "")
pg_dsn = pg_dsn or os.environ.get("SEO_DSN", "")
duckdb_path = (
duckdb_path
or os.environ.get("SEO_DUCKDB", "")
or os.path.expanduser("~/.fn_seo/seo.duckdb")
)
if not site_url:
raise ValueError(
"ingest_gsc_search_analytics: falta site_url. Pásalo o define la "
"env var GSC_SITE_URL con la propiedad de Search Console."
)
if not pg_dsn:
raise ValueError(
"ingest_gsc_search_analytics: falta pg_dsn. Pásalo o define la "
"env var SEO_DSN con la cadena de conexión PostgreSQL del espejo."
)
# (2) Fechas: la API de GSC tiene ~3 días de lag.
today = date.today()
if not end_date:
end_date = (today - timedelta(days=_GSC_LAG_DAYS)).isoformat()
if not start_date:
start_date = (
today - timedelta(days=_GSC_LAG_DAYS + int(lookback_days))
).isoformat()
# (3) Crear la tabla DuckDB si no existe (y su directorio padre).
parent = os.path.dirname(duckdb_path)
if parent:
os.makedirs(parent, exist_ok=True)
ddl_res = duckdb_execute(duckdb_path, _TABLE_DDL)
if ddl_res.get("status") != "ok":
return {"status": "error", "error": f"create table: {ddl_res.get('error')}"}
# (4) Autenticar.
service = gsc_auth(credentials_path)
# (5) Extraer. Con dimensions=["date","query","page"] cada fila trae las
# claves "date", "query", "page" más las métricas.
raw = pull_gsc_search_analytics(
service,
site_url,
start_date,
end_date,
dimensions=["date", "query", "page"],
)
# (6) Transformar a la forma de la tabla. La columna se llama data_date,
# no date -> renombrar. country/device se dejan vacíos y search_type="web"
# como defaults estables para que la tupla UNIQUE sea consistente.
snapshot_date = today.isoformat()
rows = [
{
"snapshot_date": snapshot_date,
"data_date": row["date"],
"site_url": site_url,
"query": row.get("query", ""),
"page": row.get("page", ""),
"country": "",
"device": "",
"search_type": "web",
"clicks": row.get("clicks"),
"impressions": row.get("impressions"),
"ctr": row.get("ctr"),
"position": row.get("position"),
}
for row in raw
]
# (7) Upsert idempotente en DuckDB (la verdad acumulada).
duckdb_res = duckdb_upsert(
duckdb_path,
"gsc_search_analytics",
rows,
key_cols=_KEY_COLS,
)
# (8) Espejo completo a PostgreSQL (regenerado cada vez).
pg_res = duckdb_to_postgres(
duckdb_path,
"gsc_search_analytics",
pg_dsn,
pg_table="gsc_search_analytics",
mode="replace",
)
return {
"status": "ok",
"site_url": site_url,
"start_date": start_date,
"end_date": end_date,
"rows_pulled": len(raw),
"duckdb": duckdb_res,
"postgres": pg_res,
}
except Exception as e: # noqa: BLE001
# Pipeline impuro de borde: nunca propagamos el crash, lo reportamos.
return {"status": "error", "error": str(e)}
if __name__ == "__main__":
import json
print(json.dumps(ingest_gsc_search_analytics(), indent=2, default=str))
@@ -0,0 +1,140 @@
"""Tests para ingest_gsc_search_analytics.
Sin red ni credenciales: se mockean `gsc_auth`, `pull_gsc_search_analytics` y
`duckdb_to_postgres` sobre los símbolos ya importados en el módulo del pipeline.
DuckDB es embebido, así que `duckdb_execute` + `duckdb_upsert` se ejercitan de
verdad sobre un archivo temporal (tmp_path) y se verifican leyendo con
`duckdb_query_readonly`.
"""
import os
from datetime import date, timedelta
from unittest.mock import patch
import pytest
from pipelines import ingest_gsc_search_analytics as mod
from pipelines.ingest_gsc_search_analytics import ingest_gsc_search_analytics
from infra.duckdb_query_readonly import duckdb_query_readonly
# Filas fake tal como las devuelve pull_gsc_search_analytics con
# dimensions=["date","query","page"]: clave "date" (a renombrar), no "data_date".
_FAKE_RAW = [
{
"date": "2026-06-10",
"query": "zapatillas running",
"page": "https://ejemplo.com/running",
"clicks": 12,
"impressions": 340,
"ctr": 0.0353,
"position": 4.2,
},
{
"date": "2026-06-11",
"query": "ofertas verano",
"page": "https://ejemplo.com/ofertas",
"clicks": 5,
"impressions": 120,
"ctr": 0.0417,
"position": 7.1,
},
]
def _run(db_path, **kwargs):
"""Invoca el pipeline con todos los externos mockeados."""
with patch.object(mod, "gsc_auth", return_value=object()) as m_auth, patch.object(
mod, "pull_gsc_search_analytics", return_value=list(_FAKE_RAW)
) as m_pull, patch.object(
mod, "duckdb_to_postgres", return_value={"status": "ok", "rows_synced": 2}
) as m_pg:
res = ingest_gsc_search_analytics(
site_url="sc-domain:ejemplo.com",
duckdb_path=db_path,
pg_dsn="postgresql://u:p@localhost:5432/seo",
**kwargs,
)
return res, m_auth, m_pull, m_pg
def test_renombra_date_a_data_date_y_persiste_en_duckdb(tmp_path):
db = os.path.join(str(tmp_path), "seo.duckdb")
res, _, _, m_pg = _run(db)
assert res["status"] == "ok"
assert res["rows_pulled"] == 2
# El espejo a Postgres se invocó en modo replace.
m_pg.assert_called_once()
assert m_pg.call_args.kwargs.get("mode") == "replace"
# (1)+(2) El renombrado date->data_date y la persistencia: leemos la tabla.
q = duckdb_query_readonly(
db,
"SELECT data_date, query, page, clicks, search_type, country "
"FROM gsc_search_analytics ORDER BY data_date",
)
assert q["status"] == "ok"
assert q["row_count"] == 2
first = q["rows"][0]
# data_date existe y vale la fecha de la fila raw "date" renombrada.
assert str(first["data_date"]) == "2026-06-10"
assert first["query"] == "zapatillas running"
assert first["clicks"] == 12
# Defaults estables para dims no pedidas.
assert first["search_type"] == "web"
assert first["country"] == ""
def test_resolucion_fechas_por_defecto(tmp_path):
db = os.path.join(str(tmp_path), "seo.duckdb")
res, _, m_pull, _ = _run(db, lookback_days=5)
today = date.today()
expected_end = (today - timedelta(days=3)).isoformat()
expected_start = (today - timedelta(days=3 + 5)).isoformat()
assert res["end_date"] == expected_end
assert res["start_date"] == expected_start
# Y se pasaron a pull en ese orden (service, site_url, start, end).
args = m_pull.call_args.args
assert args[2] == expected_start
assert args[3] == expected_end
def test_upsert_idempotente_no_duplica(tmp_path):
db = os.path.join(str(tmp_path), "seo.duckdb")
# Dos corridas con las mismas filas fake: la clave UNIQUE evita duplicados.
_run(db)
_run(db)
q = duckdb_query_readonly(db, "SELECT COUNT(*) AS n FROM gsc_search_analytics")
assert q["status"] == "ok"
assert q["rows"][0]["n"] == 2
def test_falta_site_url_da_value_error(tmp_path, monkeypatch):
monkeypatch.delenv("GSC_SITE_URL", raising=False)
db = os.path.join(str(tmp_path), "seo.duckdb")
res = ingest_gsc_search_analytics(
site_url="",
duckdb_path=db,
pg_dsn="postgresql://u:p@localhost:5432/seo",
)
assert res["status"] == "error"
assert "site_url" in res["error"]
def test_falta_pg_dsn_da_value_error(tmp_path, monkeypatch):
monkeypatch.delenv("SEO_DSN", raising=False)
db = os.path.join(str(tmp_path), "seo.duckdb")
res = ingest_gsc_search_analytics(
site_url="sc-domain:ejemplo.com",
duckdb_path=db,
pg_dsn="",
)
assert res["status"] == "error"
assert "pg_dsn" in res["error"]
if __name__ == "__main__":
raise SystemExit(pytest.main([__file__, "-v"]))
@@ -34,12 +34,16 @@ from datascience import ( # noqa: E402
scrape_competitor_prices,
)
from infra import pg_insert_rows # noqa: E402
from browser.scrape_aliexpress_cdp import scrape_aliexpress_cdp # noqa: E402
from browser.scrape_amazon_movers_cdp import scrape_amazon_movers_cdp # noqa: E402
from browser.scrape_amazon_search_saturation_cdp import scrape_amazon_search_saturation_cdp # noqa: E402
PROJECT_DIR = os.path.join(ROOT, "projects", "captacion_clientes")
DEFAULT_CONFIG = os.path.join(PROJECT_DIR, "config", "sources.json")
DEFAULT_ENV = os.path.join(PROJECT_DIR, ".env")
SOURCES = ("amazon", "google_trends", "tiktok", "aliexpress", "competitor")
SOURCES = ("amazon", "google_trends", "tiktok", "aliexpress",
"aliexpress_cdp", "amazon_movers_cdp", "amazon_saturation_cdp", "competitor")
def resolve_dsn(cli_dsn: str | None) -> str:
@@ -101,21 +105,39 @@ def _dispatch(source: str, config: dict, dsn: str) -> dict:
list_type=list_type,
max_items=cfg.get("max_items", 50),
)
niche_map = cfg.get("niche_map", {})
for r in batch:
if not r.get("category"):
r["category"] = category or "general"
if category and category in niche_map:
r["niche"] = niche_map[category]
rows += batch
inserted = pg_insert_rows(dsn, "amazon_bestsellers", rows)
return {"source": source, "scraped": len(rows), "inserted": inserted}
if source == "google_trends":
cfg = config.get("google_trends", {})
rows = scrape_google_trends(
keywords=cfg.get("keywords", []),
geo=cfg.get("geo", "ES"),
timeframe=cfg.get("timeframe", "now 7-d"),
include_related=cfg.get("include_related", True),
)
niches = cfg.get("niches")
if niches:
# Modo por nicho: un grupo de keywords por nicho, etiquetando cada fila.
rows = []
for niche, kws in niches.items():
batch = scrape_google_trends(
keywords=kws,
geo=cfg.get("geo", "ES"),
timeframe=cfg.get("timeframe", "now 7-d"),
include_related=cfg.get("include_related", True),
)
for r in batch:
r["niche"] = niche
rows += batch
else:
rows = scrape_google_trends(
keywords=cfg.get("keywords", []),
geo=cfg.get("geo", "ES"),
timeframe=cfg.get("timeframe", "now 7-d"),
include_related=cfg.get("include_related", True),
)
inserted = pg_insert_rows(dsn, "google_trends", rows)
return {"source": source, "scraped": len(rows), "inserted": inserted}
@@ -145,6 +167,76 @@ def _dispatch(source: str, config: dict, dsn: str) -> dict:
inserted = pg_insert_rows(dsn, "aliexpress_trends", rows)
return {"source": source, "scraped": len(rows), "inserted": inserted}
if source == "aliexpress_cdp":
# Coste en China + nº de pedidos por nicho/producto, vía navegador (CDP 9222).
cfg = config.get("aliexpress_cdp", {})
rows = []
for niche, queries in cfg.get("niches", {}).items():
for query in queries:
res = scrape_aliexpress_cdp(
query,
sort=cfg.get("sort", "total_tranpro_desc"),
limit=cfg.get("limit", 20),
port=cfg.get("port", 9222),
)
if res.get("status") != "ok":
continue
for p in res.get("products", []):
if not p.get("price"):
continue
rows.append({
"category": niche, "niche": niche, "query": query,
"product_id": p.get("item_id"), "title": p.get("title"),
"price": p.get("price"), "price_orig": p.get("price_orig"),
"orders": p.get("orders_num"), "orders_raw": p.get("orders"),
"rating": p.get("rating"), "url": p.get("url"), "currency": "EUR",
})
inserted = pg_insert_rows(dsn, "aliexpress_trends", rows) if rows else 0
return {"source": source, "scraped": len(rows), "inserted": inserted}
if source == "amazon_movers_cdp":
# Productos que más suben en ranking de ventas (señal emergente), vía CDP.
cfg = config.get("amazon_movers_cdp", {})
rows = []
for niche, cats in cfg.get("niches", {}).items():
res = scrape_amazon_movers_cdp(
marketplace=cfg.get("marketplace", "amazon.es"),
categories=cats,
port=cfg.get("port", 9222),
max_items=cfg.get("max_items", 30),
)
if res.get("status") != "ok":
continue
for p in res.get("products", []):
p["niche"] = niche
p["list_type"] = "movers_shakers"
rows.append(p)
inserted = pg_insert_rows(dsn, "amazon_bestsellers", rows) if rows else 0
return {"source": source, "scraped": len(rows), "inserted": inserted}
if source == "amazon_saturation_cdp":
# Oferta de mercado por producto: nº de resultados en Amazon.es (saturación), vía CDP.
cfg = config.get("amazon_saturation_cdp", {})
rows = []
for niche, queries in cfg.get("niches", {}).items():
for query in queries:
res = scrape_amazon_search_saturation_cdp(
query,
marketplace=cfg.get("marketplace", "amazon.es"),
port=cfg.get("port", 9222),
)
if res.get("status") != "ok":
continue
rows.append({
"niche": niche, "query": query,
"marketplace": res.get("marketplace", "amazon.es"),
"total_results": res.get("total_results"),
"sponsored_top": res.get("sponsored_top"),
"n_cards": res.get("n_cards"),
})
inserted = pg_insert_rows(dsn, "amazon_saturation", rows) if rows else 0
return {"source": source, "scraped": len(rows), "inserted": inserted}
if source == "competitor":
targets = _read_competitor_targets(dsn)
if not targets:
@@ -0,0 +1,118 @@
---
name: monitor_freelance_projects
kind: pipeline
lang: py
domain: pipelines
version: "1.0.0"
purity: impure
signature: "def monitor_freelance_projects(category: str = 'it-programming', language: str = 'es', query: str = '', pages: int = 1, include_upwork: bool = False, upwork_query: str = 'custom software', duckdb_path: str = '', xlsx_path: str = '', port: int = 9222, timeout_s: float = 25.0) -> dict"
description: "Monitor de captacion de clientes freelance: scrapea proyectos nuevos de Workana (+ Upwork opcional) via CDP, los persiste en DuckDB con dedup por url, marca los de software a medida y exporta a Excel (hojas Nuevos y Todos)."
tags: [market-intel, recon, launcher, pipelines, freelance, workana, upwork, duckdb, excel]
uses_functions:
- scrape_workana_projects_py_browser
- scrape_upwork_projects_py_browser
- duckdb_execute_py_infra
- duckdb_upsert_py_infra
- duckdb_query_readonly_py_infra
- write_xlsx_sheets_py_infra
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: []
tested: false
tests: []
test_file_path: ""
file_path: "python/functions/pipelines/monitor_freelance_projects.py"
params:
- name: category
desc: "Categoria de Workana (segmento ?category= de la URL de listado). Default 'it-programming'."
- name: language
desc: "Idioma de los proyectos de Workana (?language=). Default 'es'."
- name: query
desc: "Query libre aplicada a ambas fuentes. En Workana va como extra_query; en Upwork sobrescribe upwork_query si no esta vacia."
- name: pages
desc: "Numero de paginas de listado a recorrer por fuente. Default 1."
- name: include_upwork
desc: "Si True, scrapea Upwork ademas de Workana. Default False (selectores Upwork sin validar en vivo + requiere login); si Upwork falla, el pipeline sigue solo con Workana."
- name: upwork_query
desc: "Query para Upwork cuando include_upwork. Default 'custom software'. El param 'query' lo sobrescribe si se pasa."
- name: duckdb_path
desc: "Ruta del archivo DuckDB de persistencia. Si vacia, usa ~/.fn_freelance/freelance.duckdb (crea el directorio)."
- name: xlsx_path
desc: "Ruta del .xlsx de salida. Si vacia, usa ~/.fn_freelance/freelance_projects.xlsx (crea el directorio). Se sobrescribe en cada corrida."
- name: port
desc: "Puerto de remote debugging del Chrome que usan los scrapers (CDP). Default 9222 (chromium-personal logueado). Usa 9333 para el Chrome aislado del browser_mcp."
- name: timeout_s
desc: "Timeout en segundos por pagina para los scrapers (navegacion + espera de cards). Default 25.0."
output: "dict. En exito: {status:'ok', new_count:int (proyectos nuevos de esta corrida), total_in_db:int, new_projects:[...], xlsx_path:'<abs>', duckdb_path:'<abs>', sources:{workana:{count,status}, upwork:{count,status}|'skipped'}}. En error (sin lanzar): {status:'error', error:str, sources:{...}}."
---
## Ejemplo
```bash
# Requiere un Chrome con remote debugging vivo en el puerto indicado.
# Produccion (chromium-personal logueado, port 9222) con los paths por defecto:
fn run monitor_freelance_projects
# Probar contra el Chrome aislado del browser_mcp (port 9333) con paths efimeros:
fn run monitor_freelance_projects --port 9333 \
--duckdb-path /tmp/freelance.duckdb --xlsx-path /tmp/freelance.xlsx
```
```python
import os, sys
sys.path.insert(0, os.path.join("python", "functions"))
from pipelines.monitor_freelance_projects import monitor_freelance_projects
out = monitor_freelance_projects(
category="it-programming",
language="es",
pages=1,
port=9222, # chromium-personal logueado
)
print(out["new_count"], "proyectos nuevos;", out["total_in_db"], "en la DB")
print("Excel:", out["xlsx_path"])
```
## Cuando usarla
Monitor de captacion de clientes: detecta proyectos freelance NUEVOS de Workana
(programacion / software a medida) y los deja en DuckDB + Excel para revisar de un
vistazo. Resalta los que pintan a "software a medida" (`is_custom_software`) sin
filtrar el resto. Idempotente por `url`: re-correrlo no duplica ni pisa el
`first_seen_at`. Agendable con dag_engine (step `function:`) para una foto diaria de
oportunidades nuevas.
## Gotchas
- **Requiere un Chrome con CDP vivo en `port`**: los scrapers (Workana/Upwork son
SPAs) renderizan via Chrome DevTools Protocol. Sin remote debugging escuchando en
ese puerto el pipeline devuelve `status:'error'` con el detalle. Produccion = 9222
(chromium-personal logueado); Chrome aislado = 9333 (browser_mcp).
- **Upwork OFF por defecto**: sus selectores no estan validados en vivo (sin sesion
Upwork). Con `include_upwork=True`, si Upwork devuelve `status:'error'` el pipeline
loguea un WARN a stderr y sigue solo con Workana — nunca aborta por Upwork.
- **El Excel se sobrescribe** por completo en cada corrida (`write_xlsx_sheets`). La
fuente de verdad acumulativa es la DuckDB, no el .xlsx.
- **`first_seen_at` lo posee la DB**: el upsert usa ownership selectivo (no esta en
`update_cols`), asi que una re-corrida conserva la primera vez que se vio cada
proyecto. `new_count` cuenta solo urls que no existian antes de esta corrida.
- **Rate-limit / anti-bot**: scrapear muchas paginas seguidas puede disparar
defensas de las plataformas. Mantener `pages` bajo y espaciar las corridas.
- **Skills se guardan como `skills_json`** (TEXT con JSON) porque DuckDB no usa una
columna lista aqui; en el Excel se re-expanden a una cadena separada por comas.
## Notas
Pipeline impuro: compone seis funciones del registry sin reescribir su logica
(2 scrapers CDP del dominio browser + 3 primitivas del grupo `duckdb` + el exporter
`write_xlsx_sheets`). El flag `is_custom_software` se calcula con la constante
`CUSTOM_SW_KEYWORDS` (keywords fuertes de desarrollo a medida) sobre title + snippet
+ skills, normalizados a minusculas y sin acentos.
Validado end-to-end contra Workana real (CDP 9333) el 17/06/2026:
- Golden: `new_count=9`, `total_in_db=9`, 4 proyectos `is_custom_software=True`,
.xlsx con hojas "Nuevos" (9 filas + cabecera) y "Todos", DuckDB con 9 filas.
- Edge dedup: 2a corrida identica -> `new_count=0`, `total_in_db` sigue en 9 (no
duplica) y `first_seen_at` preservado (ownership del upsert por `url`).
@@ -0,0 +1,478 @@
"""monitor_freelance_projects — monitor de captacion de clientes freelance.
Pipeline one-shot que detecta proyectos freelance NUEVOS, los persiste con dedup en
DuckDB y los exporta a Excel para revisar. Es la pieza de orquestacion de un monitor
de captacion de clientes: convierte el patron "scrapear -> normalizar -> persistir
con dedup -> exportar" en una sola invocacion, agendable con dag_engine.
NO reescribe ninguna logica de scraping, persistencia ni exportacion: compone SEIS
funciones del registry que ya existen, importandolas tal cual.
Funciones del registry compuestas (importadas, no reimplementadas):
scrape_workana_projects (browser) — scrapea Workana via CDP.
scrape_upwork_projects (browser) — scrapea Upwork via CDP (opcional, tolerante).
duckdb_execute (infra) — DDL: CREATE TABLE IF NOT EXISTS.
duckdb_query_readonly (infra) — lee urls existentes + tabla completa para el Excel.
duckdb_upsert (infra) — UPSERT idempotente por url (dedup + ownership de first_seen_at).
write_xlsx_sheets (infra) — escribe el .xlsx con hojas "Nuevos" y "Todos".
Devuelve SIEMPRE un dict (estilo de los grupos recon/market-intel): nunca lanza.
NUNCA inventa datos: si Workana falla, propaga el error con contexto.
"""
import json
import os
import sys
import unicodedata
ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", ".."))
sys.path.insert(0, os.path.join(ROOT, "python", "functions"))
from browser.scrape_workana_projects import scrape_workana_projects # noqa: E402
from browser.scrape_upwork_projects import scrape_upwork_projects # noqa: E402
from infra.duckdb_execute import duckdb_execute # noqa: E402
from infra.duckdb_query_readonly import duckdb_query_readonly # noqa: E402
from infra.duckdb_upsert import duckdb_upsert # noqa: E402
from infra.write_xlsx_sheets import write_xlsx_sheets # noqa: E402
# Directorio por defecto para la DuckDB y el Excel del monitor. Se deriva con
# expanduser para no hardcodear ningun home concreto.
_DEFAULT_DIR = os.path.expanduser(os.path.join("~", ".fn_freelance"))
_DEFAULT_DB = os.path.join(_DEFAULT_DIR, "freelance.duckdb")
_DEFAULT_XLSX = os.path.join(_DEFAULT_DIR, "freelance_projects.xlsx")
_TABLE = "freelance_projects"
# Columnas de la tabla, en el orden del DDL. El upsert usa este orden estable.
_COLUMNS = [
"url", # PRIMARY KEY (clave de dedup)
"source",
"job_id",
"title",
"budget",
"posted",
"bids",
"skills_json",
"snippet",
"country",
"is_custom_software",
"scraped_at",
"first_seen_at", # ownership de la DB: se setea al insertar, no se pisa al re-upsert
]
# Columnas que el UPSERT refresca en conflicto: TODAS menos la clave (url) y
# first_seen_at (la DB es dueña — la primera vez que se vio el proyecto no cambia).
_UPDATE_COLS = [c for c in _COLUMNS if c not in ("url", "first_seen_at")]
# DDL idempotente. url es PRIMARY KEY: imprescindible para que el ON CONFLICT del
# upsert deduplique por url.
_DDL = f"""
CREATE TABLE IF NOT EXISTS {_TABLE} (
url VARCHAR PRIMARY KEY,
source VARCHAR,
job_id VARCHAR,
title VARCHAR,
budget VARCHAR,
posted VARCHAR,
bids VARCHAR,
skills_json VARCHAR,
snippet VARCHAR,
country VARCHAR,
is_custom_software BOOLEAN,
scraped_at VARCHAR,
first_seen_at VARCHAR
)
""".strip()
# Keywords fuertes que marcan un proyecto como "software a medida". Se buscan sobre
# title + snippet + skills, todo en minusculas y sin acentos. El flag SOLO marca
# (resalta) — no filtra: el usuario quiere ver todo lo de programacion.
CUSTOM_SW_KEYWORDS = [
"a medida",
"custom software",
"desarrollo de software",
"mvp",
"saas",
"aplicacion web",
"web app",
"aplicacion movil",
"app movil",
"automatizacion",
"bot",
"scraping",
"integracion api",
"api rest",
"sistema de gestion",
"plataforma",
"crm",
"erp",
"dashboard",
"backend",
"fullstack",
"full stack",
"microservicio",
]
# Headers legibles (espanol) de las hojas del Excel y el orden de sus columnas.
_XLSX_HEADERS = [
"Fuente",
"Título",
"Presupuesto",
"A medida",
"Publicado",
"Propuestas",
"Skills",
"País",
"URL",
"Snippet",
]
def _strip_accents(text: str) -> str:
"""Devuelve `text` en minusculas y sin tildes/diacriticos.
Normaliza con NFKD y descarta los caracteres combinantes para que el match de
keywords funcione igual con "aplicación" que con "aplicacion".
"""
norm = unicodedata.normalize("NFKD", text)
return "".join(c for c in norm if not unicodedata.combining(c)).lower()
def _is_custom_software(project: dict) -> bool:
"""Decide si un proyecto es "software a medida" por sus keywords.
Concatena title + snippet + skills del proyecto, lo normaliza (minusculas, sin
acentos) y devuelve True si alguna de las CUSTOM_SW_KEYWORDS aparece como
substring. Solo MARCA el proyecto; no lo filtra.
"""
skills = project.get("skills") or []
if not isinstance(skills, list):
skills = []
haystack_parts = [
str(project.get("title") or ""),
str(project.get("snippet") or ""),
" ".join(str(s) for s in skills),
]
haystack = _strip_accents(" ".join(haystack_parts))
return any(kw in haystack for kw in CUSTOM_SW_KEYWORDS)
def _normalize_project(project: dict) -> dict:
"""Convierte un project del scraper en una fila lista para DuckDB.
Serializa `skills` (list) a JSON string `skills_json`, calcula
`is_custom_software` y setea `first_seen_at = scraped_at` (solo se usa al
insertar; el upsert no lo pisa en conflicto). Devuelve un dict con EXACTAMENTE
las claves de `_COLUMNS`, en ese orden.
"""
skills = project.get("skills") or []
if not isinstance(skills, list):
skills = []
scraped_at = project.get("scraped_at") or ""
return {
"url": project.get("url") or "",
"source": project.get("source") or "",
"job_id": project.get("job_id") or "",
"title": project.get("title") or "",
"budget": project.get("budget") or "",
"posted": project.get("posted") or "",
"bids": project.get("bids") or "",
"skills_json": json.dumps(skills, ensure_ascii=False),
"snippet": project.get("snippet") or "",
"country": project.get("country") or "",
"is_custom_software": _is_custom_software(project),
"scraped_at": scraped_at,
"first_seen_at": scraped_at,
}
def _row_to_xlsx(row: dict) -> list:
"""Convierte una fila de la tabla en la lista de celdas del Excel.
Acepta tanto un dict recien normalizado (skills_json string) como una fila
leida de la DB. Convierte is_custom_software a ""/"No" y skills_json (JSON
string) de vuelta a una cadena legible separada por comas.
"""
skills_json = row.get("skills_json") or "[]"
try:
skills = json.loads(skills_json)
if not isinstance(skills, list):
skills = []
except (ValueError, TypeError):
skills = []
skills_str = ", ".join(str(s) for s in skills)
a_medida = "" if row.get("is_custom_software") else "No"
return [
row.get("source") or "",
row.get("title") or "",
row.get("budget") or "",
a_medida,
row.get("posted") or "",
row.get("bids") or "",
skills_str,
row.get("country") or "",
row.get("url") or "",
row.get("snippet") or "",
]
def monitor_freelance_projects(
category: str = "it-programming",
language: str = "es",
query: str = "",
pages: int = 1,
include_upwork: bool = False,
upwork_query: str = "custom software",
duckdb_path: str = "",
xlsx_path: str = "",
port: int = 9222,
timeout_s: float = 25.0,
) -> dict:
"""Detecta proyectos freelance nuevos, los persiste con dedup y exporta a Excel.
Pipeline IMPURO: requiere un Chrome con remote debugging escuchando en `port`
(los scrapers renderizan SPAs via CDP) y escribe en disco (DuckDB + .xlsx).
Compone seis funciones del registry y nunca lanza: cualquier fallo se refleja en
la clave `status` del dict devuelto. NUNCA inventa datos.
Pasos:
1. Scrapea Workana (siempre). Si include_upwork, scrapea Upwork tambien; si
Upwork falla (status='error'), se loguea y se sigue solo con Workana.
2. Normaliza cada project: skills -> skills_json (TEXT), anade
is_custom_software (BOOLEAN) por keywords, first_seen_at = scraped_at.
3. DDL idempotente (CREATE TABLE IF NOT EXISTS) via duckdb_execute.
4. Lee las urls ya existentes para identificar QUE proyectos son nuevos, y
hace UPSERT idempotente por url (dedup; first_seen_at no se pisa).
5. Lee la tabla completa y escribe un .xlsx con dos hojas: "Nuevos" (solo los
de esta corrida) y "Todos".
Args:
category: categoria de Workana (?category=). Default "it-programming".
language: idioma de los proyectos de Workana (?language=). Default "es".
query: query libre aplicada a ambas fuentes. En Workana se pasa como
extra_query; en Upwork sobrescribe upwork_query si no esta vacia.
pages: numero de paginas de listado a recorrer por fuente. Default 1.
include_upwork: si True, scrapea Upwork ademas de Workana. Default False
(sus selectores no estan validados en vivo y requiere login).
upwork_query: query para Upwork cuando include_upwork. Default
"custom software". `query` lo sobrescribe si se pasa.
duckdb_path: ruta del archivo DuckDB. Si "", usa ~/.fn_freelance/freelance.duckdb
(creando el directorio).
xlsx_path: ruta del .xlsx de salida. Si "", usa
~/.fn_freelance/freelance_projects.xlsx (creando el directorio).
port: puerto de remote debugging del Chrome a usar por los scrapers.
Default 9222 (chromium-personal logueado).
timeout_s: timeout en segundos por pagina para los scrapers. Default 25.0.
Returns:
dict. En exito::
{
"status": "ok",
"new_count": int, # proyectos nuevos de esta corrida
"total_in_db": int, # filas totales en la tabla
"new_projects": [ {...}, ], # los proyectos nuevos (normalizados)
"xlsx_path": "<abs>",
"duckdb_path": "<abs>",
"sources": {
"workana": {"count": int, "status": str},
"upwork": {"count": int, "status": str} | "skipped",
},
}
En error (sin lanzar): {"status": "error", "error": str, "sources": {...}}.
"""
sources_report: dict = {}
try:
# Resolver rutas: si vienen vacias, usar los defaults y crear el directorio.
db_path = os.path.abspath(duckdb_path) if duckdb_path else _DEFAULT_DB
out_xlsx = os.path.abspath(xlsx_path) if xlsx_path else _DEFAULT_XLSX
os.makedirs(os.path.dirname(db_path), exist_ok=True)
os.makedirs(os.path.dirname(out_xlsx), exist_ok=True)
# --- Paso 1: scrape Workana (siempre). Su fallo es error duro. ---
wk = scrape_workana_projects(
category=category,
language=language,
extra_query=query,
pages=pages,
port=port,
timeout_s=timeout_s,
)
wk_status = wk.get("status", "error")
wk_projects = wk.get("projects", []) if isinstance(wk, dict) else []
sources_report["workana"] = {
"count": len(wk_projects),
"status": wk_status,
}
if wk_status != "ok":
return {
"status": "error",
"error": f"Workana scrape fallo: {wk.get('error', 'sin detalle')}",
"sources": sources_report,
}
# --- Paso 1b: scrape Upwork (opcional, tolerante a fallo). ---
all_projects = list(wk_projects)
if include_upwork:
uw_q = query or upwork_query
uw = scrape_upwork_projects(
query=uw_q,
pages=pages,
port=port,
timeout_s=timeout_s,
)
uw_status = uw.get("status", "error") if isinstance(uw, dict) else "error"
uw_projects = uw.get("projects", []) if isinstance(uw, dict) else []
sources_report["upwork"] = {
"count": len(uw_projects),
"status": uw_status,
}
if uw_status == "ok":
all_projects.extend(uw_projects)
else:
# No abortamos: seguimos solo con Workana.
print(
f"[monitor_freelance_projects] WARN Upwork no devolvio datos "
f"(status={uw_status}, error={uw.get('error') if isinstance(uw, dict) else 'n/a'}); "
f"se continua solo con Workana.",
file=sys.stderr,
)
else:
sources_report["upwork"] = "skipped"
# --- Paso 2: normalizar + enriquecer. Dedup intra-corrida por url. ---
rows_by_url: dict = {}
for project in all_projects:
if not isinstance(project, dict):
continue
url = project.get("url")
if not url:
continue
rows_by_url[url] = _normalize_project(project)
rows = list(rows_by_url.values())
# --- Paso 3: DDL idempotente. ---
ddl_res = duckdb_execute(db_path, _DDL)
if ddl_res.get("status") != "ok":
return {
"status": "error",
"error": f"DDL fallo: {ddl_res.get('error', 'sin detalle')}",
"sources": sources_report,
}
# --- Paso 4a: leer urls ya existentes para saber cuales son nuevas. ---
existing_urls: set = set()
if rows:
q_urls = duckdb_query_readonly(
db_path,
f"SELECT url FROM {_TABLE}",
max_rows=1_000_000,
)
if q_urls.get("status") != "ok":
return {
"status": "error",
"error": f"lectura de urls existentes fallo: {q_urls.get('error', 'sin detalle')}",
"sources": sources_report,
}
existing_urls = {r.get("url") for r in q_urls.get("rows", [])}
new_projects = [r for r in rows if r["url"] not in existing_urls]
# --- Paso 4b: UPSERT idempotente por url. ---
if rows:
up = duckdb_upsert(
db_path,
_TABLE,
rows,
key_cols=["url"],
update_cols=_UPDATE_COLS,
)
if up.get("status") != "ok":
return {
"status": "error",
"error": f"upsert fallo: {up.get('error', 'sin detalle')}",
"sources": sources_report,
}
# --- Paso 5: leer toda la tabla y exportar a Excel. ---
q_all = duckdb_query_readonly(
db_path,
f"SELECT {', '.join(_COLUMNS)} FROM {_TABLE} ORDER BY scraped_at DESC",
max_rows=1_000_000,
)
if q_all.get("status") != "ok":
return {
"status": "error",
"error": f"lectura de la tabla para Excel fallo: {q_all.get('error', 'sin detalle')}",
"sources": sources_report,
}
all_rows_db = q_all.get("rows", [])
total_in_db = len(all_rows_db)
new_urls = {r["url"] for r in new_projects}
sheet_nuevos = [_row_to_xlsx(r) for r in all_rows_db if r.get("url") in new_urls]
sheet_todos = [_row_to_xlsx(r) for r in all_rows_db]
abs_xlsx = write_xlsx_sheets(
out_xlsx,
{
"Nuevos": {"headers": _XLSX_HEADERS, "rows": sheet_nuevos},
"Todos": {"headers": _XLSX_HEADERS, "rows": sheet_todos},
},
)
return {
"status": "ok",
"new_count": len(new_projects),
"total_in_db": total_in_db,
"new_projects": new_projects,
"xlsx_path": abs_xlsx,
"duckdb_path": db_path,
"sources": sources_report,
}
except Exception as e: # noqa: BLE001 — el pipeline nunca lanza
return {
"status": "error",
"error": f"{type(e).__name__}: {e}",
"sources": sources_report,
}
def main() -> int:
import argparse
ap = argparse.ArgumentParser(
description="Monitor de captacion de clientes freelance (Workana + Upwork -> DuckDB + Excel)."
)
ap.add_argument("--category", default="it-programming")
ap.add_argument("--language", default="es")
ap.add_argument("--query", default="")
ap.add_argument("--pages", type=int, default=1)
ap.add_argument("--include-upwork", action="store_true")
ap.add_argument("--upwork-query", default="custom software")
ap.add_argument("--duckdb-path", default="")
ap.add_argument("--xlsx-path", default="")
ap.add_argument("--port", type=int, default=9222)
ap.add_argument("--timeout-s", type=float, default=25.0)
args = ap.parse_args()
out = monitor_freelance_projects(
category=args.category,
language=args.language,
query=args.query,
pages=args.pages,
include_upwork=args.include_upwork,
upwork_query=args.upwork_query,
duckdb_path=args.duckdb_path,
xlsx_path=args.xlsx_path,
port=args.port,
timeout_s=args.timeout_s,
)
print(json.dumps(out, ensure_ascii=False, indent=2))
return 0 if out.get("status") == "ok" else 1
if __name__ == "__main__":
sys.exit(main())
@@ -0,0 +1,103 @@
---
name: profile_database
kind: pipeline
lang: py
domain: pipelines
purity: impure
version: "1.0.0"
signature: "def profile_database(db_path: str, tables: list = None, sample: int = 5000, report_dir: str = \"reports\", write_report: bool = True, min_inclusion: float = 0.9) -> dict"
description: "Orquestador one-shot del grupo eda a nivel de BASE: perfila TODA una base DuckDB (todas las tablas o las indicadas) componiendo profile_table por tabla, infiere las relaciones FK inter-tabla por containment y construye el join graph con diagrama Mermaid. Ensambla un DatabaseProfile (resumen por tabla + TableProfiles completos + fk_candidates + join_graph) y opcionalmente emite un report markdown DB-level + JSON sidecar. Es la composicion canonica para hazme un EDA de esta base de datos y entender su esquema relacional."
tags: [eda, relations, duckdb, profiling, data-quality, pipeline, dataops]
uses_functions:
- profile_table_py_pipelines
- infer_fk_containment_duckdb_py_datascience
- build_join_graph_py_datascience
- duckdb_list_tables_py_infra
- render_eda_markdown_py_datascience
uses_types: []
returns: []
returns_optional: false
error_type: error_go_core
imports: []
tested: true
tests:
- "profile_database_two_related_tables"
- "profile_database_writes_report"
test_file_path: "python/functions/pipelines/profile_database_test.py"
file_path: "python/functions/pipelines/profile_database.py"
params:
- name: db_path
desc: "Ruta al archivo DuckDB (read-only, debe existir; no se crea)."
- name: tables
desc: "Lista de tablas a perfilar. None (default) usa todas las del esquema main via duckdb_list_tables."
- name: sample
desc: "Maximo de valores no nulos muestreados por columna en el perfil de cada tabla (se pasa a profile_table). Default 5000."
- name: report_dir
desc: "Directorio donde escribir los reports DB-level si write_report. Default 'reports'. Se crea si no existe."
- name: write_report
desc: "Si True (default) escribe report markdown DB-level + JSON sidecar timestamped en report_dir; si False no toca disco y los paths del retorno son None."
- name: min_inclusion
desc: "Umbral minimo de inclusion (0-1) para emitir una FK candidata (se pasa a infer_fk_containment_duckdb). Default 0.9."
output: "dict {status:'ok', db_profile:<DatabaseProfile con db_path, profiled_at, n_tables, tables[resumen], table_profiles[completos], fk_candidates, join_graph{nodes,edges,mermaid,hubs}, errors>, report_md_path:str|None, report_json_path:str|None} o {status:'error', error:str} (dict-no-throw)."
---
## Ejemplo
```python
import os
import tempfile
import duckdb
from pipelines.profile_database import profile_database
# Base DuckDB de juguete en /tmp: customers <- orders (relacionadas).
db = os.path.join(tempfile.mkdtemp(), "shop.duckdb")
con = duckdb.connect(db)
con.execute("CREATE TABLE customers (id INTEGER, name VARCHAR, city VARCHAR)")
con.execute("INSERT INTO customers VALUES (1,'Ana','Madrid'),(2,'Luis','Sevilla'),(3,'Marta','Bilbao')")
con.execute("CREATE TABLE orders (order_id INTEGER, customer_id INTEGER, total DOUBLE)")
con.execute("INSERT INTO orders VALUES (10,1,99.5),(11,1,12.0),(12,2,45.0),(13,3,7.25)")
con.close()
r = profile_database(db, write_report=False)
print(r["status"], r["db_profile"]["n_tables"]) # ok 2
print([fk["from_table"]+"."+fk["from_col"]+"->"+fk["to_table"]+"."+fk["to_col"]
for fk in r["db_profile"]["fk_candidates"]])
# ['orders.customer_id->customers.id'] -> FK inferida por containment
print(r["db_profile"]["join_graph"]["mermaid"].splitlines()[0]) # graph LR
# Con report DB-level a disco (markdown con diagrama Mermaid + JSON sidecar):
r = profile_database(db, report_dir="reports")
print(r["report_md_path"], r["report_json_path"])
# reports/eda_db_20260620-101500.md reports/eda_db_20260620-101500.json
```
## Cuando usarla
Cuando necesites entender una BASE de datos entera de un golpe: el perfil de
todas sus tablas mas su esquema relacional (que tabla referencia a cual, con que
cardinalidad) en una sola llamada. Usala al recibir una base DuckDB desconocida,
para documentar un data warehouse, para descubrir el star schema (las tablas hub
del join graph) o antes de escribir joins sin tener el modelo declarado. Es el
escalon DB-level sobre `profile_table` (que perfila una sola tabla): aqui ademas
se infieren las FK y se dibuja el diagrama de relaciones.
## Gotchas
- Impura: con `write_report=True` (default) ESCRIBE dos archivos a `report_dir`
(markdown DB-level + JSON sidecar). Pasa `write_report=False` para un dry-run
sin tocar disco.
- Las FK se infieren por CONTAINMENT, es una HEURISTICA: A->B es candidata si los
valores distintos de A estan contenidos en B (>= `min_inclusion`) y B parece
clave (alta unicidad en su tabla). Puede dar falsos positivos (columnas que
comparten dominio sin ser FK real, p.ej. dos columnas de codigos de pais) o
perder FK reales si `min_inclusion` es muy alto o los datos estan sucios. Es un
punto de partida para mapear el esquema, no un DDL autoritativo.
- Perfila TODAS las tablas por defecto: en bases grandes (muchas tablas o tablas
muy anchas) puede TARDAR. Acota con `tables=[...]` o baja `sample`. La
inferencia de FK ademas salta pares hacia tablas con mas de 200k filas (lado
caro del INTERSECT); esas relaciones quedan sin evaluar.
- Tolera fallos por tabla: si el perfil de una tabla concreta falla, se anota en
`db_profile["errors"]` y se sigue con las demas; `n_tables` cuenta solo las
perfiladas con exito. Revisa `errors` para saber que quedo fuera.
- `db_path` debe existir: DuckDB read-only NO crea la base. El muestreo de cada
tabla usa el sandbox read-only por defecto (sin acceso a FS/red).
@@ -0,0 +1,227 @@
"""profile_database — orquestador one-shot del grupo `eda` a nivel de BASE.
Pipeline impuro: perfila TODA una base DuckDB (todas las tablas o las indicadas)
componiendo el grupo de capacidad `eda` y, encima, infiere las relaciones FK
entre tablas y construye el join graph. Es la composicion canonica para "hazme
un EDA de esta base de datos": una sola llamada en vez de orquestar el perfil de
cada tabla + la inferencia de relaciones a mano.
Funciones del registry compuestas (NO se reimplementa su logica):
- profile_table : perfila UNA tabla end-to-end (a su vez compone el grupo eda).
- infer_fk_containment_duckdb : infiere FK candidatas por containment de valores.
- build_join_graph : grafo de relaciones inter-tabla + diagrama Mermaid.
- duckdb_list_tables : introspeccion "que tablas hay" (read-only).
- render_eda_markdown : report legible de un TableProfile.
Aporta una capa propia de AGREGACION A NIVEL DE BASE: ensambla un DatabaseProfile
con el resumen de cada tabla, los TableProfiles completos, las FK candidatas y el
join graph, y opcionalmente emite un report markdown DB-level (con un diagrama
Mermaid) + un JSON sidecar a disco.
Estilo dict-no-throw del grupo: nunca lanza; captura cualquier error y devuelve
{status:'error', error:str}. Los fallos por tabla individual se toleran: se anota
el error en errors[] y se sigue con las demas tablas.
"""
import json
import os
from datetime import datetime, timezone
from datascience import (
build_join_graph,
infer_fk_containment_duckdb,
render_eda_markdown,
)
from infra import duckdb_list_tables
from pipelines.profile_table import profile_table
def _table_summary(prof: dict) -> dict:
"""Extrae el resumen de cabecera de un TableProfile para la vista DB-level."""
return {
"table": prof.get("table"),
"n_rows": prof.get("n_rows"),
"n_cols": prof.get("n_cols"),
"quality_score": prof.get("quality_score"),
"key_candidates": prof.get("key_candidates", []),
"type_breakdown": prof.get("type_breakdown", {}),
}
def _render_db_markdown(db_profile: dict) -> str:
"""Renderiza el report markdown a nivel de base.
Tabla resumen de tablas, tabla de relaciones inter-tabla (FK candidatas),
diagrama Mermaid del join graph, y un detalle por tabla reusando
render_eda_markdown sobre cada TableProfile completo.
"""
lines = []
lines.append(f"# EDA base — {db_profile.get('db_path')}")
lines.append("")
lines.append(f"- profiled_at: {db_profile.get('profiled_at')}")
lines.append(f"- n_tables: {db_profile.get('n_tables')}")
lines.append("")
# ## Tablas
lines.append("## Tablas")
lines.append("")
lines.append("| Tabla | Filas | Cols | Calidad | key_candidates |")
lines.append("|---|---|---|---|---|")
for t in db_profile.get("tables", []):
keys = ", ".join(t.get("key_candidates") or []) or ""
lines.append(
f"| {t.get('table')} | {t.get('n_rows')} | {t.get('n_cols')} "
f"| {t.get('quality_score')} | {keys} |"
)
lines.append("")
# ## Relaciones inter-tabla
lines.append("## Relaciones inter-tabla")
lines.append("")
fks = db_profile.get("fk_candidates", [])
if fks:
lines.append("| From | To | Inclusion | Cardinalidad |")
lines.append("|---|---|---|---|")
for fk in fks:
frm = f"{fk.get('from_table')}.{fk.get('from_col')}"
to = f"{fk.get('to_table')}.{fk.get('to_col')}"
inc = fk.get("inclusion")
inc_s = f"{inc:.3f}" if isinstance(inc, (int, float)) else str(inc)
lines.append(f"| {frm} | {to} | {inc_s} | {fk.get('cardinality')} |")
else:
lines.append("_Sin relaciones FK candidatas detectadas._")
lines.append("")
# ## Diagrama
lines.append("## Diagrama")
lines.append("")
mermaid = (db_profile.get("join_graph") or {}).get("mermaid", "")
lines.append("```mermaid")
lines.append(mermaid)
lines.append("```")
lines.append("")
# ## Detalle por tabla
lines.append("## Detalle por tabla")
lines.append("")
for prof in db_profile.get("table_profiles", []):
lines.append(render_eda_markdown(prof))
lines.append("")
return "\n".join(lines)
def profile_database(
db_path: str,
tables: list = None,
sample: int = 5000,
report_dir: str = "reports",
write_report: bool = True,
min_inclusion: float = 0.9,
) -> dict:
"""Perfila una base DuckDB entera + sus relaciones inter-tabla.
Args:
db_path: ruta al archivo DuckDB (read-only, debe existir).
tables: lista de tablas a perfilar. None (default) usa todas las del
esquema main (duckdb_list_tables).
sample: maximo de valores no nulos muestreados por columna en el perfil
de cada tabla (se pasa a profile_table). Default 5000.
report_dir: directorio donde escribir los reports DB-level si
write_report. Default "reports". Se crea si no existe.
write_report: si True (default), escribe un report markdown DB-level + un
JSON sidecar timestamped en report_dir. Si False, no toca disco y los
paths del retorno son None.
min_inclusion: umbral minimo de inclusion (0-1) para emitir una FK
candidata (se pasa a infer_fk_containment_duckdb). Default 0.9.
Returns:
dict dict-no-throw. En exito:
{status:'ok', db_profile:<DatabaseProfile>,
report_md_path:str|None, report_json_path:str|None}.
En error (sin lanzar): {status:'error', error:str}.
DatabaseProfile = {
db_path, profiled_at, n_tables,
tables:[{table, n_rows, n_cols, quality_score, key_candidates,
type_breakdown}, ...],
table_profiles:[<TableProfile completo>, ...],
fk_candidates:[...], join_graph:{nodes, edges, mermaid, hubs},
errors:[...]
}
"""
try:
# 1) Resolver lista de tablas.
if tables is None:
lst = duckdb_list_tables(db_path)
if lst.get("status") != "ok":
return {"status": "error", "error": lst.get("error", "list failed")}
tables = lst.get("tables", [])
if not isinstance(tables, list):
return {"status": "error", "error": "tables debe ser una lista o None"}
errors = []
table_profiles = []
table_summaries = []
# 2) Perfilar cada tabla (tolerando fallos individuales).
for table in tables:
r = profile_table(db_path, table, sample=sample, write_report=False)
if r.get("status") == "ok":
prof = r["profile"]
table_profiles.append(prof)
table_summaries.append(_table_summary(prof))
else:
errors.append(
{"table": table, "error": r.get("error", "profile failed")}
)
# 3) Inferir FK candidatas por containment.
fk = infer_fk_containment_duckdb(
db_path, tables=tables, min_inclusion=min_inclusion
)
if fk.get("status") == "ok":
fk_candidates = fk.get("fk_candidates", [])
else:
fk_candidates = []
errors.append({"step": "infer_fk", "error": fk.get("error", "fk failed")})
# 4) Construir el join graph.
graph = build_join_graph(fk_candidates, tables=tables)
# 5) Ensamblar el DatabaseProfile.
db_profile = {
"db_path": db_path,
"profiled_at": datetime.now(timezone.utc).isoformat(),
"n_tables": len(table_profiles),
"tables": table_summaries,
"table_profiles": table_profiles,
"fk_candidates": fk_candidates,
"join_graph": graph,
"errors": errors,
}
# 6) Reports opcionales.
report_md_path = None
report_json_path = None
if write_report:
os.makedirs(report_dir, exist_ok=True)
ts = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S")
report_json_path = os.path.join(report_dir, f"eda_db_{ts}.json")
report_md_path = os.path.join(report_dir, f"eda_db_{ts}.md")
with open(report_json_path, "w", encoding="utf-8") as fh:
fh.write(
json.dumps(db_profile, ensure_ascii=False, indent=1, default=str)
)
with open(report_md_path, "w", encoding="utf-8") as fh:
fh.write(_render_db_markdown(db_profile))
return {
"status": "ok",
"db_profile": db_profile,
"report_md_path": report_md_path,
"report_json_path": report_json_path,
}
except Exception as e: # noqa: BLE001
return {"status": "error", "error": str(e)}
@@ -0,0 +1,96 @@
"""Tests para profile_database — perfilado de una base DuckDB + relaciones."""
import os
import sys
import tempfile
import duckdb
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", ".."))
from pipelines.profile_database import profile_database
def _build_related_db(path: str) -> None:
"""Crea una DuckDB con 2 tablas relacionadas: customers <- orders.
customers.id es clave; orders.customer_id contiene solo ids de customers,
de modo que orders.customer_id -> customers.id es una FK detectable por
containment.
"""
conn = duckdb.connect(path)
try:
conn.execute(
"CREATE TABLE customers (id INTEGER, name VARCHAR, city VARCHAR)"
)
conn.execute(
"INSERT INTO customers VALUES "
"(1,'Ana','Madrid'),(2,'Luis','Sevilla'),"
"(3,'Marta','Bilbao'),(4,'Jon','Vigo')"
)
conn.execute(
"CREATE TABLE orders (order_id INTEGER, customer_id INTEGER, total DOUBLE)"
)
conn.execute(
"INSERT INTO orders VALUES "
"(10,1,99.5),(11,1,12.0),(12,2,45.0),"
"(13,3,7.25),(14,4,200.0),(15,2,33.3)"
)
finally:
conn.close()
def test_profile_database_two_related_tables():
with tempfile.TemporaryDirectory() as d:
db_path = os.path.join(d, "shop.duckdb")
_build_related_db(db_path)
res = profile_database(db_path, write_report=False)
# status ok y dos tablas perfiladas
assert res["status"] == "ok", res
prof = res["db_profile"]
assert prof["n_tables"] == 2
# los TableProfiles completos llegan para ambas tablas
assert len(prof["table_profiles"]) == 2
profiled_tables = {tp["table"] for tp in prof["table_profiles"]}
assert profiled_tables == {"customers", "orders"}
# se detecta la relacion orders.customer_id -> customers.id
fks = prof["fk_candidates"]
assert any(
fk.get("from_table") == "orders"
and fk.get("from_col") == "customer_id"
and fk.get("to_table") == "customers"
and fk.get("to_col") == "id"
for fk in fks
), fks
# el join graph trae un diagrama mermaid
graph = prof["join_graph"]
assert "mermaid" in graph
assert isinstance(graph["mermaid"], str)
assert graph["mermaid"].startswith("graph LR")
# no se reportan paths cuando write_report=False
assert res["report_md_path"] is None
assert res["report_json_path"] is None
def test_profile_database_writes_report(tmp_path):
db_path = os.path.join(str(tmp_path), "shop2.duckdb")
_build_related_db(db_path)
report_dir = os.path.join(str(tmp_path), "reports")
res = profile_database(db_path, report_dir=report_dir, write_report=True)
assert res["status"] == "ok", res
assert res["report_md_path"] is not None
assert res["report_json_path"] is not None
assert os.path.exists(res["report_md_path"])
assert os.path.exists(res["report_json_path"])
md = open(res["report_md_path"], encoding="utf-8").read()
assert "# EDA base —" in md
assert "## Relaciones inter-tabla" in md
assert "```mermaid" in md
@@ -0,0 +1,89 @@
---
name: profile_table
kind: pipeline
lang: py
domain: pipelines
purity: impure
version: "1.0.0"
signature: "def profile_table(db_path: str, table: str, sample: int = 5000, report_dir: str = \"reports\", write_report: bool = True) -> dict"
description: "Orquestador one-shot del grupo de capacidad eda: perfila UNA tabla DuckDB end-to-end componiendo las 7 funciones del grupo (perfil base SQL + muestreo read-only + inferencia semantica + promocion de tipo + estadistica numerica/categorica + score de calidad + render markdown) y emite el TableProfile completo mas (opcional) un report markdown y un JSON sidecar. Es la composicion canonica para hazme un EDA de esta tabla."
tags: [eda, duckdb, profiling, data-quality, pipeline, dataops]
uses_functions:
- summarize_table_duckdb_py_datascience
- describe_numeric_py_datascience
- summarize_categorical_py_datascience
- infer_semantic_type_py_datascience
- column_quality_score_py_datascience
- render_eda_markdown_py_datascience
- duckdb_query_readonly_py_infra
uses_types: []
returns: []
returns_optional: false
error_type: error_go_core
imports: []
tested: true
tests:
- "VARCHAR-entera se promociona a numeric con bloque numeric y key_candidates es lista"
test_file_path: "python/functions/pipelines/profile_table_test.py"
file_path: "python/functions/pipelines/profile_table.py"
params:
- name: db_path
desc: "Ruta al archivo DuckDB (read-only, debe existir; no se crea)."
- name: table
desc: "Nombre de la tabla a perfilar."
- name: sample
desc: "Maximo de valores no nulos muestreados por columna para el enriquecimiento (describe_numeric / summarize_categorical / infer_semantic_type). Default 5000."
- name: report_dir
desc: "Directorio donde escribir los reports si write_report. Default 'reports'. Se crea si no existe."
- name: write_report
desc: "Si True (default) escribe report markdown + JSON sidecar timestamped en report_dir; si False no toca disco y los paths del retorno son None."
output: "dict {status:'ok', profile:<TableProfile enriquecido con quality_score, key_candidates y type_breakdown recalculado>, report_md_path:str|None, report_json_path:str|None} o {status:'error', error:str} (dict-no-throw)."
---
## Ejemplo
```python
import os
from pipelines.profile_table import profile_table
# Tabla real: freelance_projects (35 filas) en la DuckDB del monitor de captacion.
db = os.path.expanduser("~/.fn_freelance/freelance.duckdb")
r = profile_table(db, "freelance_projects", sample=5000, write_report=False)
print(r["status"], r["profile"]["quality_score"], r["profile"]["type_breakdown"])
# ok 98.9 {'numeric': 1, 'categorical': 9, 'datetime': 2, 'text': 0, 'boolean': 1}
# ^ 'bids' (VARCHAR '1'..'107') se promociono a numeric via semantic_type=integer.
# Con report a disco (markdown + JSON sidecar en reports/):
r = profile_table(db, "freelance_projects")
print(r["report_md_path"], r["report_json_path"])
# reports/eda_freelance_projects_20260620-101500.md reports/eda_freelance_projects_20260620-101500.json
```
## Cuando usarla
Cuando necesites un EDA completo de una tabla DuckDB en una sola llamada: perfil
por columna + estadistica fina + calidad + report listo para leer. Usala como
primer paso al recibir un dataset desconocido, antes de modelar o limpiar, o
para auditar la calidad de una tabla ya productiva. Reemplaza orquestar a mano
`summarize_table_duckdb` -> muestreo -> `describe_numeric`/`summarize_categorical`
-> `column_quality_score` -> `render_eda_markdown` columna por columna.
## Gotchas
- Impura: con `write_report=True` (default) ESCRIBE dos archivos a `report_dir`
(markdown + JSON). Pasa `write_report=False` para un dry-run sin tocar disco.
- La promocion de tipo es una HEURISTICA sobre la muestra: una columna VARCHAR se
reclasifica a `numeric` solo si su `semantic_type` es integer/decimal/currency
y al menos el 80% de la muestra parsea a float; a `datetime` si el
`semantic_type` es datetime_iso/date_eu. Tablas con datos sucios o muestras no
representativas pueden quedar mal clasificadas; sube `sample` para muestras mas
fiables (coste: mas filas traidas a RAM por columna).
- Las columnas promovidas a `datetime` aun NO reciben perfil fino:
`col["datetime"]` queda en `None` (la funcion `profile_datetime` del grupo
llega en otra fase). Su `semantic_type` si se conserva.
- El parseo numerico limpia simbolos de moneda (€/$/£/EUR/USD/GBP), espacios y
separadores de miles; con coma y punto juntos asume punto=miles, coma=decimal.
Formatos exoticos pueden descartarse silenciosamente del calculo numerico.
- `db_path` debe existir: DuckDB read-only NO crea la base. El muestreo usa el
sandbox por defecto de `duckdb_query_readonly` (sin acceso a FS/red).
+296
View File
@@ -0,0 +1,296 @@
"""profile_table — orquestador one-shot del grupo de capacidad `eda`.
Pipeline impuro: perfila UNA tabla DuckDB end-to-end componiendo las funciones
puras e impuras del grupo `eda` y, opcionalmente, escribe un report markdown +
JSON sidecar a disco. Es la composicion canonica para "hazme un EDA de esta
tabla": una sola llamada en vez de orquestar 7 funciones a mano.
Funciones del registry compuestas (NO se reimplementa su logica):
- summarize_table_duckdb : perfil base por columna (push-down SQL, sin RAM).
- duckdb_query_readonly : muestra read-only de valores no nulos por columna.
- infer_semantic_type : clasifica VARCHAR (email, integer, currency, ...).
- describe_numeric : estadistica fina sobre la muestra numerica.
- summarize_categorical : top-k, moda, entropia sobre la muestra categorica.
- column_quality_score : score 0-100 de calidad por columna.
- render_eda_markdown : report legible del TableProfile.
Aporta una capa propia de PROMOCION DE TIPO: muchas tablas guardan numeros y
fechas como VARCHAR. Tras el perfil base, se muestrea cada columna textual, se
infiere su semantic_type y, si encaja, se promociona inferred_type a "numeric"
o "datetime" antes de enriquecer. Asi una columna '10','20' (VARCHAR) recibe su
bloque numeric en vez de quedarse como categorica.
Estilo dict-no-throw del grupo: nunca lanza; captura cualquier error y devuelve
{status:'error', error:str}.
"""
import json
import os
from datetime import datetime, timezone
from datascience import (
association_matrix,
column_quality_score,
describe_numeric,
eda_llm_insights,
infer_semantic_type,
render_eda_markdown,
run_eda_models,
summarize_categorical,
summarize_table_duckdb,
)
from infra import duckdb_query_readonly
# semantic_types que justifican promocionar inferred_type -> "numeric".
_NUMERIC_SEMANTIC = ("integer", "decimal", "currency")
# semantic_types que justifican promocionar inferred_type -> "datetime".
_DATETIME_SEMANTIC = ("datetime_iso", "date_eu")
# Fraccion minima de la muestra que debe parsear a float para confirmar la
# promocion a numeric (evita promocionar columnas mayormente no parseables).
_PROMOTE_MIN_PARSE = 0.8
def _to_float(value):
"""Parsea un valor a float limpiando simbolos de moneda y separadores.
Quita simbolos de divisa (EUR/USD/GBP/€/$/£), espacios y separadores de
miles, y normaliza la coma decimal. Devuelve None si no parsea.
"""
if value is None:
return None
if isinstance(value, bool):
return None
if isinstance(value, (int, float)):
return float(value)
s = str(value).strip()
if not s:
return None
# Limpia simbolos de moneda y unidades textuales.
for tok in ("", "$", "£", "EUR", "USD", "GBP", "eur", "usd", "gbp"):
s = s.replace(tok, "")
s = s.strip()
# Normaliza separadores: si hay coma y punto, asume punto=miles, coma=decimal.
if "," in s and "." in s:
s = s.replace(".", "").replace(",", ".")
elif "," in s:
# Solo coma: tratar como separador decimal.
s = s.replace(",", ".")
s = s.replace(" ", "")
try:
return float(s)
except (TypeError, ValueError):
return None
def _sample_values(db_path: str, table: str, name: str, sample: int) -> list:
"""Trae hasta `sample` valores no nulos de una columna (read-only)."""
q = duckdb_query_readonly(
db_path,
f'SELECT "{name}" AS v FROM "{table}" WHERE "{name}" IS NOT NULL '
f"LIMIT {int(sample)}",
)
if q.get("status") != "ok":
return []
return [row.get("v") for row in q.get("rows", [])]
def _sample_rows(db_path: str, table: str, names: list, sample: int) -> list:
"""Trae hasta `sample` filas completas con las columnas alineadas por fila.
A diferencia de _sample_values (una columna, solo no nulos), esto preserva la
alineacion por fila entre columnas, requisito de la matriz de asociacion
(los pares (a_i, b_i) deben venir de la misma fila).
"""
if not names:
return []
cols_sql = ", ".join(f'"{n}"' for n in names)
q = duckdb_query_readonly(
db_path, f'SELECT {cols_sql} FROM "{table}" LIMIT {int(sample)}'
)
if q.get("status") != "ok":
return []
return q.get("rows", [])
def profile_table(
db_path: str,
table: str,
sample: int = 5000,
run_models: bool = False,
run_llm: bool = False,
report_dir: str = "reports",
write_report: bool = True,
) -> dict:
"""Perfila una tabla DuckDB end-to-end y emite el TableProfile completo.
Args:
db_path: ruta al archivo DuckDB (read-only, debe existir).
table: nombre de la tabla a perfilar.
sample: maximo de valores no nulos muestreados por columna para el
enriquecimiento (describe_numeric / summarize_categorical /
infer_semantic_type). Default 5000.
report_dir: directorio donde escribir los reports si write_report.
Default "reports". Se crea si no existe.
write_report: si True (default), escribe un report markdown + un JSON
sidecar timestamped en report_dir. Si False, no toca disco y los
paths del retorno son None.
Returns:
dict. En exito: {status:'ok', profile: <TableProfile>,
report_md_path: str|None, report_json_path: str|None}. En error (sin
lanzar): {status:'error', error:str}.
"""
try:
# 1) Perfil base por columna (push-down SQL).
r = summarize_table_duckdb(db_path, table)
if r.get("status") != "ok":
return {"status": "error", "error": r.get("error", "summarize failed")}
prof = r["profile"]
cols = prof.get("columns", [])
for col in cols:
name = col.get("name")
inferred = col.get("inferred_type")
# 2) Muestra de valores no nulos.
vals = _sample_values(db_path, table, name, sample)
# 3) Promocion de tipo sobre columnas textuales.
if inferred in ("categorical", "text"):
sem = infer_semantic_type(vals)
semantic = sem.get("semantic_type", "")
col["semantic_type"] = semantic
if semantic in _NUMERIC_SEMANTIC:
parsed = [_to_float(v) for v in vals]
ok = [f for f in parsed if f is not None]
if vals and (len(ok) / len(vals)) >= _PROMOTE_MIN_PARSE:
col["inferred_type"] = "numeric"
inferred = "numeric"
elif semantic in _DATETIME_SEMANTIC:
col["inferred_type"] = "datetime"
inferred = "datetime"
# 4) Enriquecer segun el inferred_type final.
if inferred == "numeric":
vals_float = [f for f in (_to_float(v) for v in vals) if f is not None]
col["numeric"] = describe_numeric(vals_float)
elif inferred in ("categorical", "text"):
col["categorical"] = summarize_categorical(vals)
# Para columnas no promovidas que ya eran categorical/text y no
# habian pasado por infer arriba, asegurar semantic_type seteado.
if not col.get("semantic_type"):
col["semantic_type"] = infer_semantic_type(vals).get(
"semantic_type", ""
)
elif inferred == "datetime":
# profile_datetime llega en otra fase; conserva semantic_type.
col["datetime"] = None
# 5) Score de calidad por columna.
col["quality_score"] = column_quality_score(col).get("score")
# 6) Score agregado de la tabla (media de columnas).
scores = [
c["quality_score"] for c in cols if c.get("quality_score") is not None
]
prof["quality_score"] = round(sum(scores) / len(scores), 1) if scores else None
# 7) Candidatos a clave.
key_candidates = []
for c in cols:
flags = c.get("flags") or []
unique_pct = c.get("unique_pct") or 0.0
null_pct = c.get("null_pct") or 0.0
if "possible_id" in flags or (unique_pct >= 0.99 and null_pct == 0):
key_candidates.append(c["name"])
prof["key_candidates"] = key_candidates
# 8) Recalcular type_breakdown tras la promocion.
type_breakdown = {
"numeric": 0,
"categorical": 0,
"datetime": 0,
"text": 0,
"boolean": 0,
}
for c in cols:
it = c.get("inferred_type")
if it in type_breakdown:
type_breakdown[it] += 1
prof["type_breakdown"] = type_breakdown
# 8.5) Matriz de correlacion/asociacion sobre una muestra de filas
# alineadas. Elige la metrica por par de tipos (Pearson/Spearman,
# Cramer's V/Theil's U, correlation ratio, MI) via association_matrix.
# Se salta el text de alta cardinalidad (ids/urls): solo mete ruido.
try:
corr_sample = min(int(sample), 5000)
# Excluye columnas id-like (possible_id / high_cardinality) de tipo
# categorical/text: su cardinalidad ~ n filas infla Cramer's V y MI
# con asociaciones espurias (cada valor unico empareja perfecto).
# Las numericas de alta cardinalidad SI se conservan (p.ej. precios).
def _skip_for_assoc(c):
it = c.get("inferred_type")
flags = c.get("flags") or []
return it in ("categorical", "text") and (
"possible_id" in flags or "high_cardinality" in flags
)
assoc_cols = [c for c in cols if not _skip_for_assoc(c)]
rows = _sample_rows(
db_path, table, [c["name"] for c in assoc_cols], corr_sample
)
assoc_input = {}
for c in assoc_cols:
name = c["name"]
it = c.get("inferred_type") or "categorical"
raw = [row.get(name) for row in rows]
if it == "numeric":
assoc_input[name] = {
"values": [_to_float(v) for v in raw],
"type": "numeric",
}
else:
assoc_input[name] = {"values": raw, "type": it}
prof["correlations"] = (
association_matrix(assoc_input) if len(assoc_input) >= 2 else None
)
# Modelos baratos opt-in (PCA/KMeans/IsolationForest/normalidad).
if run_models:
prof["models"] = run_eda_models(assoc_input)
except Exception: # noqa: BLE001
prof["correlations"] = None
prof["models"] = None
# 8.6) Capa LLM opcional: interpreta el perfil ya calculado en UNA
# llamada (data dictionary, resumen, granularidad de fila, PII, limpieza,
# analisis sugeridos). Solo envia el perfil agregado, nunca filas crudas.
if run_llm:
try:
res = eda_llm_insights(prof)
prof["llm"] = res.get("llm") if res.get("status") == "ok" else None
except Exception: # noqa: BLE001
prof["llm"] = None
# 9) Reports opcionales.
report_md_path = None
report_json_path = None
if write_report:
os.makedirs(report_dir, exist_ok=True)
ts = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S")
report_json_path = os.path.join(report_dir, f"eda_{table}_{ts}.json")
report_md_path = os.path.join(report_dir, f"eda_{table}_{ts}.md")
with open(report_json_path, "w", encoding="utf-8") as fh:
fh.write(json.dumps(prof, ensure_ascii=False, indent=1, default=str))
with open(report_md_path, "w", encoding="utf-8") as fh:
fh.write(render_eda_markdown(prof))
return {
"status": "ok",
"profile": prof,
"report_md_path": report_md_path,
"report_json_path": report_json_path,
}
except Exception as e: # noqa: BLE001
return {"status": "error", "error": str(e)}
@@ -0,0 +1,83 @@
"""Tests para profile_table — pipeline EDA one-shot del grupo `eda`.
Crea una DuckDB temporal con tres columnas representativas:
- id_str: enteros guardados como VARCHAR ('10','20',...) -> debe promocionarse
a inferred_type "numeric" y recibir un bloque col["numeric"].
- precio: numerica nativa (DOUBLE).
- categoria: categorica textual.
Luego corre profile_table(write_report=False) y verifica el contrato.
"""
import os
import tempfile
import duckdb
from pipelines.profile_table import profile_table
def _make_db() -> str:
"""Crea una DuckDB temporal con la tabla de prueba y devuelve su path."""
tmp_dir = tempfile.mkdtemp(prefix="profile_table_test_")
db_path = os.path.join(tmp_dir, "t.duckdb")
con = duckdb.connect(db_path)
con.execute(
"CREATE TABLE items ("
" id_str VARCHAR," # enteros guardados como texto
" precio DOUBLE," # numerica nativa
" categoria VARCHAR" # categorica
")"
)
rows = [
("10", 9.5, "alfa"),
("20", 12.0, "beta"),
("30", 7.25, "alfa"),
("40", 15.75, "gamma"),
("50", 3.0, "beta"),
("60", 22.4, "alfa"),
]
con.executemany("INSERT INTO items VALUES (?, ?, ?)", rows)
con.close()
return db_path
def _col(profile: dict, name: str) -> dict:
return next(c for c in profile["columns"] if c["name"] == name)
def test_varchar_integer_promotes_to_numeric():
db_path = _make_db()
r = profile_table(db_path, "items", sample=5000, write_report=False)
# status ok y sin tocar disco.
assert r["status"] == "ok", r
assert r["report_md_path"] is None
assert r["report_json_path"] is None
prof = r["profile"]
# La columna VARCHAR-entera se promociono a numeric con bloque numeric.
id_col = _col(prof, "id_str")
assert id_col["inferred_type"] == "numeric", id_col["inferred_type"]
assert id_col["numeric"] is not None
assert id_col["numeric"]["min"] == 10.0
assert id_col["numeric"]["max"] == 60.0
# La numerica nativa sigue siendo numeric con su bloque.
precio_col = _col(prof, "precio")
assert precio_col["inferred_type"] == "numeric"
assert precio_col["numeric"] is not None
# La categorica recibe su bloque categorical.
cat_col = _col(prof, "categoria")
assert cat_col["inferred_type"] in ("categorical", "text")
assert cat_col["categorical"] is not None
assert cat_col["categorical"]["mode"] == "alfa"
# key_candidates es una lista; quality_score existe (tabla y columnas).
assert isinstance(prof["key_candidates"], list)
assert prof["quality_score"] is not None
assert id_col["quality_score"] is not None
# type_breakdown recalculado refleja la promocion (>=2 numeric).
assert prof["type_breakdown"]["numeric"] >= 2