3c250a9252
App Go para ejecutar scripts de navegación automatizada usando las funciones CDP del registry. Incluye script de creación de dashboard en Metabase para monitoreo. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
334 lines
9.0 KiB
Go
334 lines
9.0 KiB
Go
package main
|
|
|
|
import (
|
|
"fmt"
|
|
"os"
|
|
"os/exec"
|
|
"path/filepath"
|
|
"time"
|
|
|
|
fn_operations "fn-registry/fn_operations"
|
|
)
|
|
|
|
// opsDBName is the file name of the operations database; initOpsDB joins it
// onto the app directory to build the database path.
const opsDBName = "operations.db"
|
|
|
|
// initOpsDB inicializa o abre operations.db en el directorio de la app.
|
|
func initOpsDB(appDir string) (*fn_operations.DB, error) {
|
|
dbPath := filepath.Join(appDir, opsDBName)
|
|
|
|
if _, err := os.Stat(dbPath); os.IsNotExist(err) {
|
|
if err := bootstrapOpsDB(appDir, dbPath); err != nil {
|
|
return nil, fmt.Errorf("inicializar operations.db: %w", err)
|
|
}
|
|
}
|
|
|
|
db, err := fn_operations.Open(dbPath)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("abrir operations.db: %w", err)
|
|
}
|
|
|
|
return db, nil
|
|
}
|
|
|
|
// bootstrapOpsDB intenta crear operations.db usando el CLI fn o directamente.
|
|
func bootstrapOpsDB(appDir, dbPath string) error {
|
|
registryRoot := os.Getenv("FN_REGISTRY_ROOT")
|
|
if registryRoot == "" {
|
|
registryRoot = filepath.Join(appDir, "..", "..")
|
|
}
|
|
|
|
fnBin := filepath.Join(registryRoot, "fn")
|
|
if _, err := os.Stat(fnBin); err == nil {
|
|
cmd := exec.Command(fnBin, "ops", "init", appDir)
|
|
cmd.Env = append(os.Environ(), "FN_REGISTRY_ROOT="+registryRoot)
|
|
out, err := cmd.CombinedOutput()
|
|
if err != nil {
|
|
return fmt.Errorf("fn ops init: %w\n%s", err, out)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
db, err := fn_operations.Open(dbPath)
|
|
if err != nil {
|
|
return fmt.Errorf("crear operations.db directamente: %w", err)
|
|
}
|
|
return db.Close()
|
|
}
|
|
|
|
// --- Entities ---
|
|
|
|
// EnsureEntities crea o actualiza las entities del pipeline de navegacion.
|
|
// Entities:
|
|
// - chrome_instance: la instancia de Chrome con CDP
|
|
// - cdp_session: la sesion CDP activa
|
|
// - script_file: el archivo YAML del script
|
|
func EnsureEntities(db *fn_operations.DB, port int, chromePath, userDataDir, scriptName, scriptPath string) (chromeID, cdpID, scriptID string, err error) {
|
|
now := time.Now()
|
|
|
|
chromeID = "chrome_instance"
|
|
cdpID = "cdp_session"
|
|
scriptID = fmt.Sprintf("script_%s", scriptName)
|
|
|
|
// Chrome instance
|
|
existing, _ := db.GetEntity(chromeID)
|
|
if existing == nil {
|
|
err = db.InsertEntity(&fn_operations.Entity{
|
|
ID: chromeID,
|
|
Name: "Chrome Windows",
|
|
TypeRef: "chrome_instance",
|
|
Status: fn_operations.StatusActive,
|
|
Description: "Instancia de Chrome con remote debugging habilitado",
|
|
Domain: "infra",
|
|
Tags: []string{"chrome", "cdp", "windows"},
|
|
Source: "script_navegador",
|
|
Metadata: map[string]any{
|
|
"port": port,
|
|
"chrome_path": chromePath,
|
|
"user_data_dir": userDataDir,
|
|
},
|
|
CreatedAt: now,
|
|
UpdatedAt: now,
|
|
})
|
|
if err != nil {
|
|
return "", "", "", fmt.Errorf("insertar entity chrome_instance: %w", err)
|
|
}
|
|
} else if existing.Status != fn_operations.StatusActive {
|
|
existing.Status = fn_operations.StatusActive
|
|
existing.UpdatedAt = now
|
|
db.UpdateEntity(existing)
|
|
}
|
|
|
|
// CDP session
|
|
existing, _ = db.GetEntity(cdpID)
|
|
if existing == nil {
|
|
err = db.InsertEntity(&fn_operations.Entity{
|
|
ID: cdpID,
|
|
Name: "CDP Session",
|
|
TypeRef: "cdp_session",
|
|
Status: fn_operations.StatusActive,
|
|
Description: "Sesion CDP WebSocket activa contra Chrome",
|
|
Domain: "infra",
|
|
Tags: []string{"cdp", "websocket"},
|
|
Source: "script_navegador",
|
|
Metadata: map[string]any{
|
|
"port": port,
|
|
"protocol": "CDP 1.3",
|
|
},
|
|
CreatedAt: now,
|
|
UpdatedAt: now,
|
|
})
|
|
if err != nil {
|
|
return "", "", "", fmt.Errorf("insertar entity cdp_session: %w", err)
|
|
}
|
|
} else if existing.Status != fn_operations.StatusActive {
|
|
existing.Status = fn_operations.StatusActive
|
|
existing.UpdatedAt = now
|
|
db.UpdateEntity(existing)
|
|
}
|
|
|
|
// Script
|
|
existing, _ = db.GetEntity(scriptID)
|
|
if existing == nil {
|
|
err = db.InsertEntity(&fn_operations.Entity{
|
|
ID: scriptID,
|
|
Name: scriptName,
|
|
TypeRef: "nav_script",
|
|
Status: fn_operations.StatusActive,
|
|
Description: fmt.Sprintf("Script de navegacion: %s", scriptName),
|
|
Domain: "automation",
|
|
Tags: []string{"script", "yaml", "navegacion"},
|
|
Source: scriptPath,
|
|
Metadata: map[string]any{
|
|
"script_name": scriptName,
|
|
"file_path": scriptPath,
|
|
},
|
|
CreatedAt: now,
|
|
UpdatedAt: now,
|
|
})
|
|
if err != nil {
|
|
return "", "", "", fmt.Errorf("insertar entity script: %w", err)
|
|
}
|
|
}
|
|
|
|
return chromeID, cdpID, scriptID, nil
|
|
}
|
|
|
|
// --- Relations ---
|
|
|
|
// EnsureRelations crea las relaciones entre entities si no existen.
|
|
// Relations:
|
|
// - chrome_to_cdp: Chrome -> CDP Session (via chrome_launch + cdp_connect)
|
|
// - cdp_to_script: CDP Session -> Script (via runner)
|
|
func EnsureRelations(db *fn_operations.DB, chromeID, cdpID, scriptID string) (string, error) {
|
|
now := time.Now()
|
|
|
|
// chrome -> cdp
|
|
chromeToCDP := "chrome_to_cdp"
|
|
existing, _ := db.GetRelation(chromeToCDP)
|
|
if existing == nil {
|
|
err := db.InsertRelation(&fn_operations.Relation{
|
|
ID: chromeToCDP,
|
|
Name: "chrome_to_cdp",
|
|
FromEntity: chromeID,
|
|
ToEntity: cdpID,
|
|
Via: "cdp_connect_go_infra",
|
|
Description: "Chrome expone CDP, la app se conecta via WebSocket",
|
|
Purity: "impure",
|
|
Direction: fn_operations.DirUnidirectional,
|
|
Status: fn_operations.RelImplemented,
|
|
Tags: []string{"cdp", "websocket"},
|
|
CreatedAt: now,
|
|
UpdatedAt: now,
|
|
})
|
|
if err != nil {
|
|
return "", fmt.Errorf("insertar relation chrome_to_cdp: %w", err)
|
|
}
|
|
}
|
|
|
|
// cdp -> script execution
|
|
cdpToScript := fmt.Sprintf("cdp_to_%s", scriptID)
|
|
existing, _ = db.GetRelation(cdpToScript)
|
|
if existing == nil {
|
|
startedAt := now
|
|
err := db.InsertRelation(&fn_operations.Relation{
|
|
ID: cdpToScript,
|
|
Name: cdpToScript,
|
|
FromEntity: cdpID,
|
|
ToEntity: scriptID,
|
|
Via: "script_navegador_runner",
|
|
Description: fmt.Sprintf("CDP ejecuta pasos del script %s", scriptID),
|
|
Purity: "impure",
|
|
Direction: fn_operations.DirUnidirectional,
|
|
Status: fn_operations.RelRunning,
|
|
StartedAt: &startedAt,
|
|
Tags: []string{"automation", "pipeline"},
|
|
CreatedAt: now,
|
|
UpdatedAt: now,
|
|
})
|
|
if err != nil {
|
|
return "", fmt.Errorf("insertar relation cdp_to_script: %w", err)
|
|
}
|
|
} else {
|
|
existing.Status = fn_operations.RelRunning
|
|
existing.UpdatedAt = now
|
|
db.UpdateRelation(existing)
|
|
}
|
|
|
|
return cdpToScript, nil
|
|
}
|
|
|
|
// UpdateRelationAfterRun actualiza el status de la relation segun el resultado.
|
|
func UpdateRelationAfterRun(db *fn_operations.DB, relationID string, runErr error) {
|
|
rel, err := db.GetRelation(relationID)
|
|
if err != nil || rel == nil {
|
|
return
|
|
}
|
|
if runErr != nil {
|
|
rel.Status = fn_operations.RelImplemented
|
|
} else {
|
|
rel.Status = fn_operations.RelTested
|
|
}
|
|
now := time.Now()
|
|
rel.EndedAt = &now
|
|
rel.UpdatedAt = now
|
|
db.UpdateRelation(rel)
|
|
}
|
|
|
|
// --- Executions ---
|
|
|
|
// RecordRun registra una ejecucion completa del script en operations.db.
|
|
func RecordRun(db *fn_operations.DB, script *Script, relationID string, results []StepResult, runErr error, startedAt, endedAt time.Time) (string, error) {
|
|
totalSteps := int64(len(results))
|
|
successSteps := int64(0)
|
|
for _, r := range results {
|
|
if r.Err == nil {
|
|
successSteps++
|
|
}
|
|
}
|
|
|
|
status := fn_operations.ExecSuccess
|
|
errMsg := ""
|
|
if runErr != nil {
|
|
status = fn_operations.ExecFailure
|
|
errMsg = runErr.Error()
|
|
} else if successSteps < totalSteps {
|
|
status = fn_operations.ExecPartial
|
|
}
|
|
|
|
durationMs := endedAt.Sub(startedAt).Milliseconds()
|
|
|
|
stepSummary := make([]map[string]any, 0, len(results))
|
|
for _, r := range results {
|
|
entry := map[string]any{
|
|
"index": r.Index,
|
|
"action": r.Action,
|
|
"elapsed_ms": r.Elapsed.Milliseconds(),
|
|
"ok": r.Err == nil,
|
|
}
|
|
if r.Output != "" {
|
|
entry["output"] = r.Output
|
|
}
|
|
if r.Err != nil {
|
|
entry["error"] = r.Err.Error()
|
|
}
|
|
stepSummary = append(stepSummary, entry)
|
|
}
|
|
|
|
execID := generateID()
|
|
execution := &fn_operations.Execution{
|
|
ID: execID,
|
|
PipelineID: "script_navegador",
|
|
RelationID: relationID,
|
|
Status: status,
|
|
StartedAt: startedAt,
|
|
EndedAt: &endedAt,
|
|
DurationMs: &durationMs,
|
|
RecordsIn: &totalSteps,
|
|
RecordsOut: &successSteps,
|
|
Error: errMsg,
|
|
Metrics: map[string]any{
|
|
"script_name": script.Name,
|
|
"total_steps": totalSteps,
|
|
"success_steps": successSteps,
|
|
"steps": stepSummary,
|
|
},
|
|
}
|
|
|
|
if err := db.InsertExecution(execution); err != nil {
|
|
return "", fmt.Errorf("insertar execution: %w", err)
|
|
}
|
|
|
|
return execID, nil
|
|
}
|
|
|
|
// --- Logs ---
|
|
|
|
// LogStep registra un paso individual como log en operations.db.
|
|
func LogStep(db *fn_operations.DB, execID string, res StepResult) error {
|
|
level := fn_operations.LogInfo
|
|
msg := fmt.Sprintf("step[%d] %s: ok", res.Index, res.Action)
|
|
if res.Err != nil {
|
|
level = fn_operations.LogError
|
|
msg = fmt.Sprintf("step[%d] %s: %v", res.Index, res.Action, res.Err)
|
|
}
|
|
|
|
meta := map[string]any{
|
|
"action": res.Action,
|
|
"elapsed_ms": res.Elapsed.Milliseconds(),
|
|
}
|
|
if res.Output != "" {
|
|
meta["output"] = res.Output
|
|
}
|
|
|
|
log := &fn_operations.Log{
|
|
ID: generateID(),
|
|
Level: level,
|
|
Source: "script_navegador",
|
|
ExecutionID: execID,
|
|
Message: msg,
|
|
Metadata: meta,
|
|
}
|
|
|
|
return db.InsertLog(log)
|
|
}
|