feat: app script_navegador y dashboard Metabase

App Go para ejecutar scripts de navegación automatizada usando las
funciones CDP del registry. Incluye script de creación de dashboard
en Metabase para monitoreo.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-03-30 14:24:39 +02:00
parent 9ed0f2e16f
commit 3c250a9252
15 changed files with 1322 additions and 0 deletions
@@ -0,0 +1,195 @@
"""Crea un dashboard en Metabase para monitorear operations de script_navegador."""
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "python", "functions"))
from metabase.client import metabase_auth
from metabase import (
metabase_list_databases,
metabase_create_card,
metabase_create_dashboard,
metabase_update_dashboard,
metabase_list_dashboards,
)
from metabase.databases import metabase_add_database
# --- Config ---
METABASE_URL = "http://localhost:3000"
EMAIL = "admin@fnregistry.local"
PASSWORD = "FnRegistry2024!"
# Path de operations.db dentro del contenedor Docker
# Copiar con: docker exec metabase mkdir -p /data/ops-script-navegador && docker cp apps/script_navegador/operations.db metabase:/data/ops-script-navegador/operations.db
OPS_DB_PATH = "/data/ops-script-navegador/operations.db"
DB_NAME = "ops-script-navegador"
CARDS = [
# ---- Fila 0: KPIs (h=5) ----
{
"name": "Total Ejecuciones",
"display": "scalar",
"sql": "SELECT COUNT(*) AS total FROM executions;",
"size_x": 6, "size_y": 5, "col": 0, "row": 0,
},
{
"name": "Ejecuciones Exitosas",
"display": "scalar",
"sql": "SELECT COUNT(*) AS exitosas FROM executions WHERE status = 'success';",
"size_x": 6, "size_y": 5, "col": 6, "row": 0,
},
{
"name": "Ejecuciones Fallidas",
"display": "scalar",
"sql": "SELECT COUNT(*) AS fallidas FROM executions WHERE status = 'failure';",
"size_x": 6, "size_y": 5, "col": 12, "row": 0,
},
{
"name": "Duracion Promedio (ms)",
"display": "scalar",
"sql": "SELECT ROUND(AVG(duration_ms)) AS avg_ms FROM executions WHERE status = 'success';",
"size_x": 6, "size_y": 5, "col": 18, "row": 0,
},
# ---- Fila 5: Tendencias (h=8) ----
{
"name": "Ejecuciones por Estado",
"display": "pie",
"sql": "SELECT status, COUNT(*) AS cantidad FROM executions GROUP BY status;",
"size_x": 8, "size_y": 8, "col": 0, "row": 5,
},
{
"name": "Duracion por Ejecucion (timeline)",
"display": "line",
"sql": """
SELECT
started_at,
duration_ms,
status
FROM executions
ORDER BY started_at;
""",
"size_x": 16, "size_y": 8, "col": 8, "row": 5,
},
# ---- Fila 13: Detalle de pasos (h=9) ----
{
"name": "Pasos por Script (metricas)",
"display": "table",
"sql": """
SELECT
id,
status,
records_in AS pasos_total,
records_out AS pasos_exitosos,
duration_ms,
CASE WHEN error = '' THEN '-' ELSE error END AS error,
json_extract(metrics, '$.script_name') AS script,
started_at
FROM executions
ORDER BY started_at DESC
LIMIT 20;
""",
"size_x": 24, "size_y": 9, "col": 0, "row": 13,
},
# ---- Fila 22: Logs (h=9) ----
{
"name": "Logs Recientes",
"display": "table",
"sql": """
SELECT
level,
source,
message,
json_extract(metadata, '$.action') AS action,
json_extract(metadata, '$.elapsed_ms') AS elapsed_ms,
created_at
FROM logs
ORDER BY created_at DESC
LIMIT 50;
""",
"size_x": 24, "size_y": 9, "col": 0, "row": 22,
},
]
DASHBOARD_NAME = "script_navegador Operations"
def main():
print("Autenticando en Metabase...")
client = metabase_auth(METABASE_URL, EMAIL, PASSWORD)
# Buscar si ya existe la database
dbs = metabase_list_databases(client)
ops_db_id = None
for db in dbs:
if db.get("name") == DB_NAME:
ops_db_id = db["id"]
print(f" Database ya existe: {DB_NAME} (id={ops_db_id})")
break
if not ops_db_id:
print(f"Registrando {DB_NAME} como datasource SQLite ({OPS_DB_PATH})...")
new_db = metabase_add_database(
client=client,
name=DB_NAME,
engine="sqlite",
details={"db": OPS_DB_PATH},
)
ops_db_id = new_db["id"]
print(f" Database registrada: id={ops_db_id}")
# Eliminar dashboard existente si lo hay
existing = metabase_list_dashboards(client)
for d in existing:
if d.get("name") == DASHBOARD_NAME:
print(f" Dashboard ya existe (id={d['id']}), recreando...")
from metabase import metabase_delete_dashboard
metabase_delete_dashboard(client, d["id"])
# Crear cards
print("Creando cards...")
created_cards = []
for i, card_def in enumerate(CARDS):
card = metabase_create_card(
client,
name=card_def["name"],
dataset_query={
"database": ops_db_id,
"type": "native",
"native": {"query": card_def["sql"]},
},
display=card_def["display"],
description=f"script_navegador: {card_def['name']}",
)
created_cards.append((card, card_def))
print(f" [{i+1}/{len(CARDS)}] {card_def['name']} (id={card['id']})")
# Crear dashboard
print("Creando dashboard...")
dashboard = metabase_create_dashboard(
client,
name=DASHBOARD_NAME,
description="Monitoreo de ejecuciones de script_navegador: KPIs, tendencias, detalle de pasos y logs.",
)
dash_id = dashboard["id"]
print(f" Dashboard creado: id={dash_id}")
# Agregar cards al dashboard
dashcards = []
for idx, (card, card_def) in enumerate(created_cards):
dashcards.append({
"id": -(idx + 1),
"card_id": card["id"],
"size_x": card_def["size_x"],
"size_y": card_def["size_y"],
"col": card_def["col"],
"row": card_def["row"],
})
metabase_update_dashboard(client, dash_id, dashcards=dashcards)
print(f"\nDashboard listo: {METABASE_URL}/dashboard/{dash_id}")
client.close()
if __name__ == "__main__":
main()
+6
View File
@@ -0,0 +1,6 @@
operations.db
operations.db-wal
operations.db-shm
build/
*.exe
script_navegador
+100
View File
@@ -0,0 +1,100 @@
---
name: script_navegador
lang: go
domain: infra
description: "Ejecutor de scripts de navegador CDP sobre Chrome. Lee pasos desde YAML y los ejecuta en secuencia registrando cada resultado en operations.db."
tags: [cdp, chrome, browser, automation, yaml]
uses_functions:
- chrome_launch_go_infra
- cdp_connect_go_infra
- cdp_navigate_go_infra
- cdp_click_go_infra
- cdp_type_text_go_infra
- cdp_wait_element_go_infra
- cdp_evaluate_go_infra
- cdp_get_html_go_infra
- cdp_screenshot_go_infra
- cdp_close_go_infra
uses_types: []
framework: ""
entry_point: "main.go"
dir_path: "apps/script_navegador"
---
## Descripcion
CLI Go que lee un archivo YAML con pasos de navegacion CDP y los ejecuta sobre Chrome, registrando cada paso y su resultado en `operations.db`.
## Uso
```bash
# Conectarse a Chrome ya corriendo en puerto 9222
go run . --script examples/busqueda_google.yaml
# Lanzar Chrome nuevo (headless)
go run . --script examples/busqueda_google.yaml --launch --headless
# Puerto personalizado
go run . --script examples/busqueda_google.yaml --port 9333
```
## Formato del script YAML
```yaml
name: "nombre_del_script"
steps:
- action: navigate
url: "https://ejemplo.com"
- action: wait
selector: "#elemento"
timeout_ms: 5000 # opcional, default 10000
- action: click
selector: "#boton"
continue_on_error: true # opcional, default false
- action: type
selector: "input[name=q]" # hace click primero para enfocar
text: "texto a escribir"
- action: screenshot
path: "/tmp/captura.png"
full_page: false # opcional, default false
- action: evaluate
expr: "document.title"
- action: get_html
# sin parametros adicionales
- action: sleep
ms: 500 # pausa en milisegundos
```
## Acciones soportadas
| Accion | Parametros obligatorios | Parametros opcionales |
|-------------|-------------------------|-------------------------------|
| `navigate` | `url` | |
| `wait` | `selector` | `timeout_ms` (default 10000) |
| `click` | `selector` | `continue_on_error` |
| `type` | `selector`, `text` | `continue_on_error` |
| `screenshot`| `path` | `full_page`, `continue_on_error` |
| `evaluate` | `expr` | `continue_on_error` |
| `get_html` | — | `continue_on_error` |
| `sleep` | `ms` | |
## Registro en operations.db
- **Entity `script_run`**: una por ejecucion del script, con metadata del script y resultado final
- **Execution**: una por ejecucion, con `pipeline_id = "script_navegador"`, duration_ms, records_in=pasos totales, records_out=pasos exitosos
- **Logs**: un log por cada paso ejecutado con nivel info/error
## Notas
- Si Chrome no esta corriendo y no se pasa `--launch`, la conexion falla con error claro
- `continue_on_error: true` por paso permite continuar aunque ese paso falle
- Flag global `--abort-on-error` (default false) aborta todo el script al primer error
- Al terminar (exito o error), siempre se ejecuta `cdp_close` para limpiar recursos
- operations.db se inicializa automaticamente si no existe usando `fn ops init`
@@ -0,0 +1,28 @@
name: "busqueda_google"
steps:
- action: navigate
url: "https://www.google.com"
- action: wait
selector: "textarea[name=q]"
timeout_ms: 8000
- action: type
selector: "textarea[name=q]"
text: "golang cdp automation"
- action: screenshot
path: "/tmp/busqueda_antes.png"
- action: evaluate
expr: "document.title"
- action: sleep
ms: 500
- action: evaluate
expr: "document.querySelector('textarea[name=q]').value"
- action: screenshot
path: "/tmp/busqueda_despues.png"
full_page: false
@@ -0,0 +1,20 @@
name: "demo_continue_on_error"
steps:
- action: navigate
url: "https://example.com"
- action: wait
selector: "h1"
timeout_ms: 5000
# Este paso fallara porque el selector no existe, pero el script continua
- action: click
selector: "#boton-que-no-existe"
continue_on_error: true
# Este paso se ejecuta aunque el anterior fallo
- action: evaluate
expr: "document.title"
- action: screenshot
path: "/tmp/continue_on_error.png"
@@ -0,0 +1,16 @@
name: "scrape_titulo"
steps:
- action: navigate
url: "https://example.com"
- action: wait
selector: "h1"
timeout_ms: 5000
- action: evaluate
expr: "document.querySelector('h1').textContent"
- action: get_html
- action: screenshot
path: "/tmp/example_com.png"
@@ -0,0 +1,6 @@
name: "navegar_youtube"
steps:
- action: navigate
url: "https://www.youtube.com"
- action: screenshot
path: "/tmp/youtube.png"
+12
View File
@@ -0,0 +1,12 @@
module script-navegador
go 1.24.2
require (
fn-registry v0.0.0
gopkg.in/yaml.v3 v3.0.1
)
require github.com/mattn/go-sqlite3 v1.14.37 // indirect
replace fn-registry => /home/lucas/fn_registry
+6
View File
@@ -0,0 +1,6 @@
github.com/mattn/go-sqlite3 v1.14.37 h1:3DOZp4cXis1cUIpCfXLtmlGolNLp2VEqhiB/PARNBIg=
github.com/mattn/go-sqlite3 v1.14.37/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+20
View File
@@ -0,0 +1,20 @@
package main
import (
"crypto/rand"
"fmt"
)
// generateID genera un UUID v4 simple sin dependencias externas.
func generateID() string {
b := make([]byte, 16)
if _, err := rand.Read(b); err != nil {
// Fallback con timestamp si rand falla (muy improbable)
return fmt.Sprintf("fallback-%x", b)
}
// Ajustar bits para UUID v4
b[6] = (b[6] & 0x0f) | 0x40
b[8] = (b[8] & 0x3f) | 0x80
return fmt.Sprintf("%08x-%04x-%04x-%04x-%012x",
b[0:4], b[4:6], b[6:8], b[8:10], b[10:16])
}
+214
View File
@@ -0,0 +1,214 @@
package main
import (
"flag"
"fmt"
"os"
"os/exec"
"path/filepath"
"strings"
"time"
"fn-registry/functions/infra"
)
func main() {
// Flags
scriptPath := flag.String("script", "", "Ruta al archivo YAML con el script de navegacion (obligatorio)")
port := flag.Int("port", 9222, "Puerto CDP de Chrome")
launch := flag.Bool("launch", false, "Lanzar Chrome nuevo en vez de conectarse a uno existente")
headless := flag.Bool("headless", false, "Lanzar Chrome en modo headless (requiere --launch)")
chromePath := flag.String("chrome-path", "", "Ruta al ejecutable de Chrome (ej: '/mnt/c/Program Files/Google/Chrome/Application/chrome.exe')")
userDataDir := flag.String("user-data-dir", "", "Directorio de perfil de Chrome (path WSL, se convierte a Windows automaticamente)")
keepOpen := flag.Bool("keep-open", false, "No cerrar Chrome al terminar")
abortOnError := flag.Bool("abort-on-error", false, "Abortar el script al primer error en cualquier paso")
flag.Parse()
if *scriptPath == "" {
fmt.Fprintln(os.Stderr, "error: --script es obligatorio")
flag.Usage()
os.Exit(1)
}
if err := run(*scriptPath, *port, *launch, *headless, *abortOnError, *userDataDir, *keepOpen, *chromePath); err != nil {
fmt.Fprintf(os.Stderr, "error: %v\n", err)
os.Exit(1)
}
}
func run(scriptPath string, port int, launch, headless, abortOnError bool, userDataDir string, keepOpen bool, chromePath string) error {
// 1. Cargar y validar el script YAML
script, err := LoadScript(scriptPath)
if err != nil {
return fmt.Errorf("cargar script: %w", err)
}
fmt.Printf("[script_navegador] script: %q (%d pasos)\n", script.Name, len(script.Steps))
// 2. Inicializar operations.db
appDir, err := filepath.Abs(filepath.Dir(os.Args[0]))
if err != nil {
// Fallback al directorio de trabajo
appDir, _ = os.Getwd()
}
// Si estamos corriendo con `go run .`, os.Args[0] es un tmp, usar cwd
if cwd, e := os.Getwd(); e == nil {
if _, e2 := os.Stat(filepath.Join(cwd, "app.md")); e2 == nil {
appDir = cwd
}
}
db, err := initOpsDB(appDir)
if err != nil {
// No es fatal: seguir sin operations.db, solo logear
fmt.Fprintf(os.Stderr, "[ops] aviso: no se pudo inicializar operations.db: %v\n", err)
}
if db != nil {
defer db.Close()
}
// 3. Lanzar Chrome o conectarse al existente
var pid int
if launch {
// Convertir path WSL a Windows para chrome.exe
// Si empieza con / es un path Linux (WSL), convertir. Si empieza con letra:\ ya es Windows.
winDataDir := userDataDir
if winDataDir != "" && strings.HasPrefix(winDataDir, "/") {
out, err := exec.Command("wslpath", "-w", winDataDir).Output()
if err == nil {
winDataDir = strings.TrimSpace(string(out))
}
}
fmt.Printf("[chrome] lanzando Chrome en puerto %d (headless=%v, user-data-dir=%q)...\n", port, headless, winDataDir)
pid, err = infra.ChromeLaunch(infra.ChromeLaunchOpts{
Port: port,
Headless: headless,
UserDataDir: winDataDir,
ChromePath: chromePath,
})
if err != nil {
return fmt.Errorf("lanzar Chrome: %w", err)
}
fmt.Printf("[chrome] Chrome lanzado (pid=%d)\n", pid)
} else {
fmt.Printf("[chrome] conectando a Chrome en localhost:%d...\n", port)
}
// 4. Conectar CDP (con mirrored networking, localhost es compartido WSL<->Windows)
fmt.Printf("[cdp] conectando a localhost:%d...\n", port)
conn, err := infra.CdpConnect(port)
if err != nil {
// Si lanzamos Chrome, matar el proceso antes de salir
if pid > 0 {
_ = infra.CdpClose(nil, pid)
}
return fmt.Errorf("conectar CDP en localhost:%d: %w", port, err)
}
fmt.Printf("[cdp] conexion establecida\n")
// Asegurar cierre al salir (respetar --keep-open)
defer func() {
if keepOpen {
fmt.Printf("[cdp] cerrando conexion CDP (Chrome sigue abierto, pid=%d, puerto=%d)\n", pid, port)
// Solo cerrar la conexion WebSocket, no matar Chrome
if err := infra.CdpClose(conn, 0); err != nil {
fmt.Fprintf(os.Stderr, "[cdp] aviso al cerrar conexion: %v\n", err)
}
} else {
fmt.Printf("[cdp] cerrando conexion y limpiando recursos...\n")
if err := infra.CdpClose(conn, pid); err != nil {
fmt.Fprintf(os.Stderr, "[cdp] aviso al cerrar: %v\n", err)
}
}
}()
// 5. Registrar entities y relations en operations.db
var relationID string
if db != nil {
_, _, _, err := EnsureEntities(db, port, chromePath, userDataDir, script.Name, scriptPath)
if err != nil {
fmt.Fprintf(os.Stderr, "[ops] aviso: no se pudieron crear entities: %v\n", err)
} else {
fmt.Printf("[ops] entities registradas\n")
}
relationID, err = EnsureRelations(db, "chrome_instance", "cdp_session", fmt.Sprintf("script_%s", script.Name))
if err != nil {
fmt.Fprintf(os.Stderr, "[ops] aviso: no se pudieron crear relations: %v\n", err)
} else {
fmt.Printf("[ops] relations registradas\n")
}
}
// 6. Ejecutar el script
runner := NewRunner(conn, RunnerOpts{AbortOnError: abortOnError})
startedAt := time.Now()
fmt.Printf("[run] iniciando ejecucion: %s\n", startedAt.Format(time.RFC3339))
results, runErr := runner.Run(script)
endedAt := time.Now()
// 7. Imprimir resumen de pasos
printSummary(script, results, runErr, startedAt, endedAt)
// 8. Registrar execution y actualizar relation en operations.db
if db != nil {
execID, err := RecordRun(db, script, relationID, results, runErr, startedAt, endedAt)
if err != nil {
fmt.Fprintf(os.Stderr, "[ops] aviso: no se pudo registrar ejecucion: %v\n", err)
} else {
fmt.Printf("[ops] ejecucion registrada en operations.db (id=%s)\n", execID[:8])
}
// Registrar cada paso como log
for _, r := range results {
if logErr := LogStep(db, execID, r); logErr != nil {
fmt.Fprintf(os.Stderr, "[ops] aviso: no se pudo registrar log step[%d]: %v\n", r.Index, logErr)
}
}
// Actualizar relation status
if relationID != "" {
UpdateRelationAfterRun(db, relationID, runErr)
}
}
if runErr != nil {
return runErr
}
return nil
}
// printSummary imprime un resumen legible de la ejecucion.
func printSummary(script *Script, results []StepResult, runErr error, startedAt, endedAt time.Time) {
duration := endedAt.Sub(startedAt)
success := 0
for _, r := range results {
if r.Err == nil {
success++
}
}
fmt.Printf("\n--- Resumen: %q ---\n", script.Name)
fmt.Printf("Duracion: %v\n", duration.Round(time.Millisecond))
fmt.Printf("Pasos: %d/%d exitosos\n", success, len(results))
fmt.Println()
for _, r := range results {
status := "ok"
detail := ""
if r.Err != nil {
status = "ERROR"
detail = fmt.Sprintf(" -> %v", r.Err)
} else if r.Output != "" {
detail = fmt.Sprintf(" -> %q", r.Output)
}
fmt.Printf(" [%d] %-12s %s (%dms)%s\n",
r.Index, r.Action, status, r.Elapsed.Milliseconds(), detail)
}
if runErr != nil {
fmt.Printf("\nAbortado: %v\n", runErr)
} else {
fmt.Printf("\nScript completado.\n")
}
}
+333
View File
@@ -0,0 +1,333 @@
package main
import (
"fmt"
"os"
"os/exec"
"path/filepath"
"time"
fn_operations "fn-registry/fn_operations"
)
const opsDBName = "operations.db"
// initOpsDB inicializa o abre operations.db en el directorio de la app.
func initOpsDB(appDir string) (*fn_operations.DB, error) {
dbPath := filepath.Join(appDir, opsDBName)
if _, err := os.Stat(dbPath); os.IsNotExist(err) {
if err := bootstrapOpsDB(appDir, dbPath); err != nil {
return nil, fmt.Errorf("inicializar operations.db: %w", err)
}
}
db, err := fn_operations.Open(dbPath)
if err != nil {
return nil, fmt.Errorf("abrir operations.db: %w", err)
}
return db, nil
}
// bootstrapOpsDB intenta crear operations.db usando el CLI fn o directamente.
func bootstrapOpsDB(appDir, dbPath string) error {
registryRoot := os.Getenv("FN_REGISTRY_ROOT")
if registryRoot == "" {
registryRoot = filepath.Join(appDir, "..", "..")
}
fnBin := filepath.Join(registryRoot, "fn")
if _, err := os.Stat(fnBin); err == nil {
cmd := exec.Command(fnBin, "ops", "init", appDir)
cmd.Env = append(os.Environ(), "FN_REGISTRY_ROOT="+registryRoot)
out, err := cmd.CombinedOutput()
if err != nil {
return fmt.Errorf("fn ops init: %w\n%s", err, out)
}
return nil
}
db, err := fn_operations.Open(dbPath)
if err != nil {
return fmt.Errorf("crear operations.db directamente: %w", err)
}
return db.Close()
}
// --- Entities ---
// EnsureEntities crea o actualiza las entities del pipeline de navegacion.
// Entities:
// - chrome_instance: la instancia de Chrome con CDP
// - cdp_session: la sesion CDP activa
// - script_file: el archivo YAML del script
func EnsureEntities(db *fn_operations.DB, port int, chromePath, userDataDir, scriptName, scriptPath string) (chromeID, cdpID, scriptID string, err error) {
now := time.Now()
chromeID = "chrome_instance"
cdpID = "cdp_session"
scriptID = fmt.Sprintf("script_%s", scriptName)
// Chrome instance
existing, _ := db.GetEntity(chromeID)
if existing == nil {
err = db.InsertEntity(&fn_operations.Entity{
ID: chromeID,
Name: "Chrome Windows",
TypeRef: "chrome_instance",
Status: fn_operations.StatusActive,
Description: "Instancia de Chrome con remote debugging habilitado",
Domain: "infra",
Tags: []string{"chrome", "cdp", "windows"},
Source: "script_navegador",
Metadata: map[string]any{
"port": port,
"chrome_path": chromePath,
"user_data_dir": userDataDir,
},
CreatedAt: now,
UpdatedAt: now,
})
if err != nil {
return "", "", "", fmt.Errorf("insertar entity chrome_instance: %w", err)
}
} else if existing.Status != fn_operations.StatusActive {
existing.Status = fn_operations.StatusActive
existing.UpdatedAt = now
db.UpdateEntity(existing)
}
// CDP session
existing, _ = db.GetEntity(cdpID)
if existing == nil {
err = db.InsertEntity(&fn_operations.Entity{
ID: cdpID,
Name: "CDP Session",
TypeRef: "cdp_session",
Status: fn_operations.StatusActive,
Description: "Sesion CDP WebSocket activa contra Chrome",
Domain: "infra",
Tags: []string{"cdp", "websocket"},
Source: "script_navegador",
Metadata: map[string]any{
"port": port,
"protocol": "CDP 1.3",
},
CreatedAt: now,
UpdatedAt: now,
})
if err != nil {
return "", "", "", fmt.Errorf("insertar entity cdp_session: %w", err)
}
} else if existing.Status != fn_operations.StatusActive {
existing.Status = fn_operations.StatusActive
existing.UpdatedAt = now
db.UpdateEntity(existing)
}
// Script
existing, _ = db.GetEntity(scriptID)
if existing == nil {
err = db.InsertEntity(&fn_operations.Entity{
ID: scriptID,
Name: scriptName,
TypeRef: "nav_script",
Status: fn_operations.StatusActive,
Description: fmt.Sprintf("Script de navegacion: %s", scriptName),
Domain: "automation",
Tags: []string{"script", "yaml", "navegacion"},
Source: scriptPath,
Metadata: map[string]any{
"script_name": scriptName,
"file_path": scriptPath,
},
CreatedAt: now,
UpdatedAt: now,
})
if err != nil {
return "", "", "", fmt.Errorf("insertar entity script: %w", err)
}
}
return chromeID, cdpID, scriptID, nil
}
// --- Relations ---
// EnsureRelations crea las relaciones entre entities si no existen.
// Relations:
// - chrome_to_cdp: Chrome -> CDP Session (via chrome_launch + cdp_connect)
// - cdp_to_script: CDP Session -> Script (via runner)
func EnsureRelations(db *fn_operations.DB, chromeID, cdpID, scriptID string) (string, error) {
now := time.Now()
// chrome -> cdp
chromeToCDP := "chrome_to_cdp"
existing, _ := db.GetRelation(chromeToCDP)
if existing == nil {
err := db.InsertRelation(&fn_operations.Relation{
ID: chromeToCDP,
Name: "chrome_to_cdp",
FromEntity: chromeID,
ToEntity: cdpID,
Via: "cdp_connect_go_infra",
Description: "Chrome expone CDP, la app se conecta via WebSocket",
Purity: "impure",
Direction: fn_operations.DirUnidirectional,
Status: fn_operations.RelImplemented,
Tags: []string{"cdp", "websocket"},
CreatedAt: now,
UpdatedAt: now,
})
if err != nil {
return "", fmt.Errorf("insertar relation chrome_to_cdp: %w", err)
}
}
// cdp -> script execution
cdpToScript := fmt.Sprintf("cdp_to_%s", scriptID)
existing, _ = db.GetRelation(cdpToScript)
if existing == nil {
startedAt := now
err := db.InsertRelation(&fn_operations.Relation{
ID: cdpToScript,
Name: cdpToScript,
FromEntity: cdpID,
ToEntity: scriptID,
Via: "script_navegador_runner",
Description: fmt.Sprintf("CDP ejecuta pasos del script %s", scriptID),
Purity: "impure",
Direction: fn_operations.DirUnidirectional,
Status: fn_operations.RelRunning,
StartedAt: &startedAt,
Tags: []string{"automation", "pipeline"},
CreatedAt: now,
UpdatedAt: now,
})
if err != nil {
return "", fmt.Errorf("insertar relation cdp_to_script: %w", err)
}
} else {
existing.Status = fn_operations.RelRunning
existing.UpdatedAt = now
db.UpdateRelation(existing)
}
return cdpToScript, nil
}
// UpdateRelationAfterRun actualiza el status de la relation segun el resultado.
func UpdateRelationAfterRun(db *fn_operations.DB, relationID string, runErr error) {
rel, err := db.GetRelation(relationID)
if err != nil || rel == nil {
return
}
if runErr != nil {
rel.Status = fn_operations.RelImplemented
} else {
rel.Status = fn_operations.RelTested
}
now := time.Now()
rel.EndedAt = &now
rel.UpdatedAt = now
db.UpdateRelation(rel)
}
// --- Executions ---
// RecordRun registra una ejecucion completa del script en operations.db.
func RecordRun(db *fn_operations.DB, script *Script, relationID string, results []StepResult, runErr error, startedAt, endedAt time.Time) (string, error) {
totalSteps := int64(len(results))
successSteps := int64(0)
for _, r := range results {
if r.Err == nil {
successSteps++
}
}
status := fn_operations.ExecSuccess
errMsg := ""
if runErr != nil {
status = fn_operations.ExecFailure
errMsg = runErr.Error()
} else if successSteps < totalSteps {
status = fn_operations.ExecPartial
}
durationMs := endedAt.Sub(startedAt).Milliseconds()
stepSummary := make([]map[string]any, 0, len(results))
for _, r := range results {
entry := map[string]any{
"index": r.Index,
"action": r.Action,
"elapsed_ms": r.Elapsed.Milliseconds(),
"ok": r.Err == nil,
}
if r.Output != "" {
entry["output"] = r.Output
}
if r.Err != nil {
entry["error"] = r.Err.Error()
}
stepSummary = append(stepSummary, entry)
}
execID := generateID()
execution := &fn_operations.Execution{
ID: execID,
PipelineID: "script_navegador",
RelationID: relationID,
Status: status,
StartedAt: startedAt,
EndedAt: &endedAt,
DurationMs: &durationMs,
RecordsIn: &totalSteps,
RecordsOut: &successSteps,
Error: errMsg,
Metrics: map[string]any{
"script_name": script.Name,
"total_steps": totalSteps,
"success_steps": successSteps,
"steps": stepSummary,
},
}
if err := db.InsertExecution(execution); err != nil {
return "", fmt.Errorf("insertar execution: %w", err)
}
return execID, nil
}
// --- Logs ---
// LogStep registra un paso individual como log en operations.db.
func LogStep(db *fn_operations.DB, execID string, res StepResult) error {
level := fn_operations.LogInfo
msg := fmt.Sprintf("step[%d] %s: ok", res.Index, res.Action)
if res.Err != nil {
level = fn_operations.LogError
msg = fmt.Sprintf("step[%d] %s: %v", res.Index, res.Action, res.Err)
}
meta := map[string]any{
"action": res.Action,
"elapsed_ms": res.Elapsed.Milliseconds(),
}
if res.Output != "" {
meta["output"] = res.Output
}
log := &fn_operations.Log{
ID: generateID(),
Level: level,
Source: "script_navegador",
ExecutionID: execID,
Message: msg,
Metadata: meta,
}
return db.InsertLog(log)
}
+143
View File
@@ -0,0 +1,143 @@
package main
import (
"fmt"
"time"
"fn-registry/functions/infra"
)
// StepResult es el resultado de ejecutar un paso.
type StepResult struct {
Index int
Action string
Output string // resultado de evaluate/get_html, path de screenshot, etc.
Err error
Elapsed time.Duration
}
// RunnerOpts configura la ejecucion del runner.
type RunnerOpts struct {
AbortOnError bool
}
// Runner ejecuta los pasos de un Script sobre una conexion CDP activa.
type Runner struct {
conn *infra.CDPConn
opts RunnerOpts
}
// NewRunner crea un Runner con la conexion CDP dada.
func NewRunner(conn *infra.CDPConn, opts RunnerOpts) *Runner {
return &Runner{conn: conn, opts: opts}
}
// Run ejecuta todos los pasos del script y retorna los resultados de cada paso.
// Siempre retorna todos los resultados procesados hasta el momento, incluso si aborta.
func (r *Runner) Run(script *Script) ([]StepResult, error) {
results := make([]StepResult, 0, len(script.Steps))
for i, step := range script.Steps {
start := time.Now()
output, err := r.runStep(step)
elapsed := time.Since(start)
res := StepResult{
Index: i,
Action: step.Action,
Output: output,
Err: err,
Elapsed: elapsed,
}
results = append(results, res)
if err != nil {
if step.ContinueOnError {
// Continuar con el siguiente paso aunque este fallo
continue
}
if r.opts.AbortOnError {
return results, fmt.Errorf("step[%d] %s: %w", i, step.Action, err)
}
// Por defecto: abortar si el paso fallo y no tiene continue_on_error
return results, fmt.Errorf("step[%d] %s: %w", i, step.Action, err)
}
}
return results, nil
}
// runStep ejecuta un paso individual y retorna su output y error.
func (r *Runner) runStep(step Step) (string, error) {
switch step.Action {
case "navigate":
if err := infra.CdpNavigate(r.conn, step.URL); err != nil {
return "", err
}
// Esperar a que la página cargue completamente
timeout := time.Duration(step.TimeoutMs) * time.Millisecond
if timeout <= 0 {
timeout = 15 * time.Second
}
return "", infra.CdpWaitLoad(r.conn, timeout)
case "wait_load":
timeout := time.Duration(step.TimeoutMs) * time.Millisecond
if timeout <= 0 {
timeout = 15 * time.Second
}
return "", infra.CdpWaitLoad(r.conn, timeout)
case "wait":
timeout := time.Duration(step.TimeoutMs) * time.Millisecond
if timeout <= 0 {
timeout = 10 * time.Second
}
return "", infra.CdpWaitElement(r.conn, step.Selector, timeout)
case "click":
return "", infra.CdpClick(r.conn, step.Selector)
case "type":
// Hacer click primero para enfocar el elemento
if err := infra.CdpClick(r.conn, step.Selector); err != nil {
return "", fmt.Errorf("enfocar elemento para type: %w", err)
}
return "", infra.CdpTypeText(r.conn, step.Text)
case "screenshot":
opts := infra.CdpScreenshotOpts{
FullPage: step.FullPage,
Format: "png",
}
if err := infra.CdpScreenshot(r.conn, step.Path, opts); err != nil {
return "", err
}
return step.Path, nil
case "evaluate":
result, err := infra.CdpEvaluate(r.conn, step.Expr)
if err != nil {
return "", err
}
return result, nil
case "get_html":
html, err := infra.CdpGetHTML(r.conn)
if err != nil {
return "", err
}
// Truncar para el log (el HTML puede ser muy largo)
if len(html) > 200 {
return html[:200] + "...", nil
}
return html, nil
case "sleep":
time.Sleep(time.Duration(step.Ms) * time.Millisecond)
return fmt.Sprintf("slept %dms", step.Ms), nil
default:
return "", fmt.Errorf("accion desconocida: %q", step.Action)
}
}
+121
View File
@@ -0,0 +1,121 @@
package main
import (
"fmt"
"os"
"gopkg.in/yaml.v3"
)
// Script representa un archivo YAML de pasos de navegacion.
type Script struct {
Name string `yaml:"name"`
Steps []Step `yaml:"steps"`
}
// Step es un paso individual dentro del script.
type Step struct {
// Comun a todos los pasos
Action string `yaml:"action"`
ContinueOnError bool `yaml:"continue_on_error"`
// navigate
URL string `yaml:"url"`
// wait
Selector string `yaml:"selector"`
TimeoutMs int `yaml:"timeout_ms"`
// type
Text string `yaml:"text"`
// screenshot
Path string `yaml:"path"`
FullPage bool `yaml:"full_page"`
// evaluate
Expr string `yaml:"expr"`
// sleep
Ms int `yaml:"ms"`
}
// Validate comprueba que el script tiene los campos minimos correctos.
func (s *Script) Validate() error {
if s.Name == "" {
return fmt.Errorf("script: campo 'name' obligatorio")
}
if len(s.Steps) == 0 {
return fmt.Errorf("script %q: sin pasos definidos", s.Name)
}
for i, step := range s.Steps {
if err := step.Validate(i); err != nil {
return err
}
}
return nil
}
// Validate comprueba que el paso tiene los campos requeridos segun su action.
func (s *Step) Validate(idx int) error {
prefix := fmt.Sprintf("step[%d] action=%q", idx, s.Action)
switch s.Action {
case "navigate":
if s.URL == "" {
return fmt.Errorf("%s: campo 'url' obligatorio", prefix)
}
case "wait":
if s.Selector == "" {
return fmt.Errorf("%s: campo 'selector' obligatorio", prefix)
}
case "click":
if s.Selector == "" {
return fmt.Errorf("%s: campo 'selector' obligatorio", prefix)
}
case "type":
if s.Selector == "" {
return fmt.Errorf("%s: campo 'selector' obligatorio", prefix)
}
if s.Text == "" {
return fmt.Errorf("%s: campo 'text' obligatorio", prefix)
}
case "screenshot":
if s.Path == "" {
return fmt.Errorf("%s: campo 'path' obligatorio", prefix)
}
case "evaluate":
if s.Expr == "" {
return fmt.Errorf("%s: campo 'expr' obligatorio", prefix)
}
case "get_html":
// sin parametros requeridos
case "wait_load":
// sin parametros requeridos (timeout_ms opcional)
case "sleep":
if s.Ms <= 0 {
return fmt.Errorf("%s: campo 'ms' debe ser mayor que 0", prefix)
}
default:
return fmt.Errorf("%s: accion desconocida (navigate|wait|wait_load|click|type|screenshot|evaluate|get_html|sleep)", prefix)
}
return nil
}
// LoadScript lee y parsea un archivo YAML de script de navegador.
func LoadScript(path string) (*Script, error) {
data, err := os.ReadFile(path)
if err != nil {
return nil, fmt.Errorf("leer script %q: %w", path, err)
}
var s Script
if err := yaml.Unmarshal(data, &s); err != nil {
return nil, fmt.Errorf("parsear script %q: %w", path, err)
}
if err := s.Validate(); err != nil {
return nil, err
}
return &s, nil
}
+102
View File
@@ -0,0 +1,102 @@
package main
import (
"fmt"
"io"
"net"
"os"
"strings"
"time"
)
// getWindowsHostIP obtiene la IP del host Windows desde WSL2.
// Lee /etc/resolv.conf que WSL2 configura con la IP del host.
func getWindowsHostIP() string {
data, err := os.ReadFile("/etc/resolv.conf")
if err != nil {
return ""
}
for _, line := range strings.Split(string(data), "\n") {
line = strings.TrimSpace(line)
if strings.HasPrefix(line, "nameserver ") {
ip := strings.TrimPrefix(line, "nameserver ")
ip = strings.TrimSpace(ip)
if ip != "" {
return ip
}
}
}
return ""
}
// getWindowsGatewayIP obtiene la IP del gateway (host Windows) desde la tabla de rutas.
func getWindowsGatewayIP() string {
data, err := os.ReadFile("/proc/net/route")
if err != nil {
return ""
}
for _, line := range strings.Split(string(data), "\n") {
fields := strings.Fields(line)
if len(fields) >= 3 && fields[1] == "00000000" { // default route
hexIP := fields[2]
if len(hexIP) == 8 {
// /proc/net/route stores IPs as little-endian 32-bit hex
// "011017AC" -> bytes [01,10,17,AC] -> IP 172.23.16.1 (reversed)
var a, b, c, d uint8
fmt.Sscanf(hexIP[0:2], "%02x", &a)
fmt.Sscanf(hexIP[2:4], "%02x", &b)
fmt.Sscanf(hexIP[4:6], "%02x", &c)
fmt.Sscanf(hexIP[6:8], "%02x", &d)
return fmt.Sprintf("%d.%d.%d.%d", d, c, b, a)
}
}
}
return ""
}
// waitForCDP espera a que el puerto CDP esté accesible desde WSL.
func waitForCDP(host string, port int, timeout time.Duration) error {
deadline := time.Now().Add(timeout)
addr := fmt.Sprintf("%s:%d", host, port)
for time.Now().Before(deadline) {
conn, err := net.DialTimeout("tcp", addr, 300*time.Millisecond)
if err == nil {
conn.Close()
return nil
}
time.Sleep(300 * time.Millisecond)
}
return fmt.Errorf("CDP %s no disponible despues de %s", addr, timeout)
}
// startCDPProxy levanta un proxy TCP local que reenvía conexiones al host Windows.
// Chrome CDP solo acepta conexiones desde localhost, así que el proxy en WSL
// conecta al host Windows vía portproxy/netsh y expone el puerto localmente.
// Retorna el puerto local del proxy y una función para cerrarlo.
func startCDPProxy(windowsHost string, remotePort, localPort int) (net.Listener, error) {
ln, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", localPort))
if err != nil {
return nil, fmt.Errorf("proxy listen: %w", err)
}
go func() {
for {
client, err := ln.Accept()
if err != nil {
return // listener cerrado
}
go proxyConn(client, windowsHost, remotePort)
}
}()
return ln, nil
}
func proxyConn(client net.Conn, host string, port int) {
defer client.Close()
remote, err := net.DialTimeout("tcp", fmt.Sprintf("%s:%d", host, port), 5*time.Second)
if err != nil {
return
}
defer remote.Close()
go io.Copy(remote, client)
io.Copy(client, remote)
}