a4b5651e8c
Refactoriza la ejecución de funciones Python en fn run. Extrae la lógica a pyrunner.go con soporte para importar dependencias del registry y ejecutar con el venv del proyecto. Agrega WalCheckpoint en db.go para que lectores externos vean datos actualizados tras fn index.
460 lines
14 KiB
Go
460 lines
14 KiB
Go
package main
|
|
|
|
import (
|
|
"fmt"
|
|
"os"
|
|
"os/exec"
|
|
"path/filepath"
|
|
"regexp"
|
|
"strings"
|
|
|
|
"fn-registry/registry"
|
|
)
|
|
|
|
// pyParam describes one parameter parsed out of a Python function signature.
//
// Name, Type and Default hold the raw text of the parameter name, its type
// annotation (e.g. "str", "int", "dict", "MetabaseClient"; empty when the
// parameter is unannotated) and its default expression (empty when the
// parameter is required). IsKwargs marks a "**kwargs" catch-all, and
// IsRegistry marks a parameter whose type is a registry type that has to be
// built by a factory.
type pyParam struct {
	Name, Type, Default  string
	IsKwargs, IsRegistry bool
}
|
|
|
|
// pyFactory links a registry type to the function that creates it.
|
|
type pyFactory struct {
|
|
FuncID string
|
|
FuncName string
|
|
FilePath string // relative to registryRoot
|
|
Params []pyParam // factory's own params (all should be primitives)
|
|
}
|
|
|
|
// parsePySignature extracts the parameters from a Python signature string
// such as:
//
//	"def func_name(client: MetabaseClient, name: str, count: int = 0) -> dict"
//
// The parameter list is the text between the first "(" and its matching ")",
// so parentheses nested in annotations or defaults (e.g. "x: tuple = (1, 2)")
// and brackets inside quoted string defaults do not cut it short. Receiver
// parameters (self, cls), the bare "*" and "/" markers, and "*args" are
// skipped because they are not ordinary named parameters. It returns nil when
// the signature has no balanced parameter list or no parameters.
func parsePySignature(sig string) []pyParam {
	raw, ok := pyParamList(sig)
	if !ok {
		return nil
	}
	raw = strings.TrimSpace(raw)
	if raw == "" {
		return nil
	}

	var params []pyParam
	for _, part := range splitParams(raw) {
		part = strings.TrimSpace(part)
		switch {
		case part == "", part == "self", part == "cls":
			continue // receiver, or an empty slot from a trailing comma
		case part == "*", part == "/":
			continue // keyword-only / positional-only markers
		case strings.HasPrefix(part, "*") && !strings.HasPrefix(part, "**"):
			continue // *args
		}
		params = append(params, parseSingleParam(part))
	}
	return params
}

// pyParamList returns the text between the first "(" in sig and its matching
// ")", ignoring brackets inside quoted strings; ok is false if there is none.
func pyParamList(sig string) (string, bool) {
	start := strings.Index(sig, "(")
	if start == -1 {
		return "", false
	}
	depth := 0
	var quote byte // active string delimiter; 0 outside string literals
	for i := start; i < len(sig); i++ {
		c := sig[i]
		if quote != 0 {
			switch c {
			case '\\':
				i++ // skip the escaped character
			case quote:
				quote = 0
			}
			continue
		}
		switch c {
		case '"', '\'':
			quote = c
		case '(', '[', '{':
			depth++
		case ')', ']', '}':
			depth--
			if depth == 0 {
				return sig[start+1 : i], true
			}
		}
	}
	return "", false
}
|
|
|
|
// splitParams splits a Python parameter list at top-level commas only.
// Commas nested inside (), [] or {} (as in "list[int, str]" or a dict
// default like {"a": 1, "b": 2}) and commas inside single- or double-quoted
// string literals (as in `sep: str = ", "`) do not split. The parts are
// returned untrimmed; an empty input yields a single empty part.
func splitParams(s string) []string {
	var parts []string
	depth := 0
	start := 0
	var quote byte // active string delimiter; 0 outside string literals
	for i := 0; i < len(s); i++ {
		c := s[i]
		if quote != 0 {
			switch c {
			case '\\':
				i++ // skip the escaped character
			case quote:
				quote = 0
			}
			continue
		}
		switch c {
		case '"', '\'':
			quote = c
		case '[', '(', '{':
			depth++
		case ']', ')', '}':
			depth--
		case ',':
			if depth == 0 {
				parts = append(parts, s[start:i])
				start = i + 1
			}
		}
	}
	return append(parts, s[start:])
}
|
|
|
|
// parseSingleParam parses "name: Type = default" or "**kwargs".
|
|
func parseSingleParam(s string) pyParam {
|
|
s = strings.TrimSpace(s)
|
|
if strings.HasPrefix(s, "**") {
|
|
return pyParam{Name: strings.TrimPrefix(s, "**"), IsKwargs: true}
|
|
}
|
|
|
|
p := pyParam{}
|
|
|
|
// Split on "=" for default value
|
|
if eqIdx := strings.Index(s, "="); eqIdx != -1 {
|
|
p.Default = strings.TrimSpace(s[eqIdx+1:])
|
|
s = strings.TrimSpace(s[:eqIdx])
|
|
}
|
|
|
|
// Split on ":" for type annotation
|
|
if colIdx := strings.Index(s, ":"); colIdx != -1 {
|
|
p.Name = strings.TrimSpace(s[:colIdx])
|
|
p.Type = strings.TrimSpace(s[colIdx+1:])
|
|
} else {
|
|
p.Name = s
|
|
}
|
|
|
|
return p
|
|
}
|
|
|
|
// isPrimitiveType checks if a Python type annotation is a primitive/builtin.
|
|
func isPrimitiveType(t string) bool {
|
|
t = strings.TrimSpace(t)
|
|
// Handle Optional, list[...], dict[...], etc.
|
|
base := t
|
|
if idx := strings.Index(t, "["); idx != -1 {
|
|
base = t[:idx]
|
|
}
|
|
// Handle "X | None"
|
|
if strings.Contains(base, "|") {
|
|
base = strings.TrimSpace(strings.Split(base, "|")[0])
|
|
}
|
|
switch strings.ToLower(base) {
|
|
case "str", "int", "float", "bool", "dict", "list", "tuple", "set",
|
|
"bytes", "none", "nonetype", "any", "optional":
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
// findFactory searches the registry for a function that returns the given type name.
|
|
// It looks for functions in the same language whose signature ends with "-> TypeName".
|
|
func findFactory(db *registry.DB, typeName, lang string) (*pyFactory, error) {
|
|
// Search for functions that return this type
|
|
pattern := "-> " + typeName
|
|
fns, err := db.SearchFunctions(typeName, "", "", lang, "")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
for _, fn := range fns {
|
|
if strings.Contains(fn.Signature, pattern) {
|
|
params := parsePySignature(fn.Signature)
|
|
// Factory should only have primitive params
|
|
allPrimitive := true
|
|
for _, p := range params {
|
|
if !isPrimitiveType(p.Type) && p.Type != "" {
|
|
allPrimitive = false
|
|
break
|
|
}
|
|
}
|
|
if allPrimitive {
|
|
return &pyFactory{
|
|
FuncID: fn.ID,
|
|
FuncName: fn.Name,
|
|
FilePath: fn.FilePath,
|
|
Params: params,
|
|
}, nil
|
|
}
|
|
}
|
|
}
|
|
return nil, fmt.Errorf("no factory function found that returns %s", typeName)
|
|
}
|
|
|
|
// paramToEnvVar derives the environment variable that supplies a factory
// parameter. It strips a trailing "Client" from the type name, converts the
// rest to upper snake case, and appends "_" plus the upper-cased parameter
// name. For example, a MetabaseClient factory's base_url, email and password
// map to METABASE_BASE_URL, METABASE_EMAIL and METABASE_PASSWORD.
func paramToEnvVar(typeName, paramName string) string {
	service := strings.TrimSuffix(typeName, "Client")
	return fmt.Sprintf("%s_%s", camelToUpperSnake(service), strings.ToUpper(paramName))
}
|
|
|
|
// camelCaseBoundary matches a lower-case letter immediately followed by an
// upper-case one, i.e. a word boundary inside a CamelCase identifier. It is
// compiled once at package scope instead of on every call.
var camelCaseBoundary = regexp.MustCompile("([a-z])([A-Z])")

// camelToUpperSnake converts CamelCase to UPPER_SNAKE_CASE by inserting "_"
// at each lower-to-upper boundary: "MetabaseClient" -> "METABASE_CLIENT",
// "Metabase" -> "METABASE". Runs of capitals are not split
// ("HTTPServer" -> "HTTPSERVER").
func camelToUpperSnake(s string) string {
	return strings.ToUpper(camelCaseBoundary.ReplaceAllString(s, "${1}_${2}"))
}
|
|
|
|
// generatePyRunner builds the source of a Python script that:
//  1. imports the target function,
//  2. builds registry-typed arguments by calling their factory functions with
//     values read from environment variables (see findFactory, paramToEnvVar),
//  3. reads primitive arguments positionally from sys.argv[1:] and converts
//     them according to their annotations (see convertArg), and
//  4. calls the function and prints the result: dicts and lists as indented
//     JSON, anything else via print, and nothing for None.
//
// If parsePySignature returns nil for the signature, generation is delegated
// to generateSimpleRunner. "**kwargs" parameters are not filled. An error is
// returned when no factory exists for a registry-typed parameter.
//
// Messages embedded in the script are emitted as escaped string literals. An
// optional argument is converted only when it came from the command line;
// otherwise the default is used verbatim as written in the signature.
func generatePyRunner(fn *registry.Function, db *registry.DB, registryRoot string) (string, error) {
	params := parsePySignature(fn.Signature)
	if params == nil {
		// No params — simple call.
		return generateSimpleRunner(fn, registryRoot)
	}

	var (
		factoryImports []string // "from <module> import <factory>" lines, without duplicates
		factorySetup   []string // code that builds registry-typed arguments from env vars
		argLines       []string // code that reads primitive arguments from the CLI
		callArgs       []string // variable names passed to the target function, in order
	)
	seenImport := make(map[string]bool)
	cliArgIdx := 0

	for _, p := range params {
		switch {
		case p.IsKwargs:
			// **kwargs cannot be resolved from positional CLI args.
			continue

		case p.Type != "" && !isPrimitiveType(p.Type):
			// Registry type: build it with a factory fed from env vars.
			factory, err := findFactory(db, p.Type, fn.Lang)
			if err != nil {
				return "", fmt.Errorf("param %q of type %q: %w", p.Name, p.Type, err)
			}
			imp := fmt.Sprintf("from %s import %s", filePathToModule(factory.FilePath), factory.FuncName)
			if !seenImport[imp] {
				seenImport[imp] = true
				factoryImports = append(factoryImports, imp)
			}
			factorySetup = append(factorySetup, pyFactorySetupLines(p, factory)...)

		default:
			// Primitive type: read from the next positional CLI arg.
			argLines = append(argLines, pyCLIArgLines(p, cliArgIdx)...)
			cliArgIdx++
		}
		callArgs = append(callArgs, p.Name)
	}

	var sb strings.Builder
	sb.WriteString("#!/usr/bin/env python3\n")
	sb.WriteString("\"\"\"Auto-generated runner for fn run.\"\"\"\n")
	sb.WriteString("import json, os, sys\n\n")

	// Target function first, then the factories it depends on.
	fmt.Fprintf(&sb, "from %s import %s\n", filePathToModule(fn.FilePath), fn.Name)
	for _, imp := range factoryImports {
		sb.WriteString(imp + "\n")
	}
	sb.WriteString("\n")

	sb.WriteString("_args = sys.argv[1:]\n\n")

	writeSection := func(header string, lines []string) {
		if len(lines) == 0 {
			return
		}
		sb.WriteString(header + "\n")
		for _, line := range lines {
			sb.WriteString(line + "\n")
		}
		sb.WriteString("\n")
	}
	writeSection("# --- resolve dependencies from env vars ---", factorySetup)
	writeSection("# --- parse CLI args ---", argLines)

	sb.WriteString("# --- execute ---\n")
	fmt.Fprintf(&sb, "_result = %s(%s)\n\n", fn.Name, strings.Join(callArgs, ", "))

	// Nested blocks use 4-space indents: each body must sit deeper than its
	// if/else header, or Python rejects the script with IndentationError.
	sb.WriteString("# --- output ---\n")
	sb.WriteString("if _result is not None:\n")
	sb.WriteString("    if isinstance(_result, (dict, list)):\n")
	sb.WriteString("        print(json.dumps(_result, indent=2, default=str))\n")
	sb.WriteString("    else:\n")
	sb.WriteString("        print(_result)\n")

	return sb.String(), nil
}

// pyFactorySetupLines returns the Python lines that check the env vars the
// factory needs (exiting with a message naming the missing ones) and bind
// p.Name to the factory's result.
func pyFactorySetupLines(p pyParam, factory *pyFactory) []string {
	envNames := make([]string, 0, len(factory.Params))
	factoryArgs := make([]string, 0, len(factory.Params))
	for _, fp := range factory.Params {
		envName := paramToEnvVar(p.Type, fp.Name)
		envNames = append(envNames, envName)
		factoryArgs = append(factoryArgs, fmt.Sprintf("os.environ[%q]", envName))
	}
	return []string{
		fmt.Sprintf("# Factory for %s: %s", p.Type, factory.FuncName),
		fmt.Sprintf("_missing = [k for k in %s if k not in os.environ]", pythonList(envNames)),
		fmt.Sprintf(`if _missing: sys.exit(%q + ", ".join(_missing))`,
			"error: missing env vars for "+p.Type+": "),
		fmt.Sprintf("%s = %s(%s)", p.Name, factory.FuncName, strings.Join(factoryArgs, ", ")),
	}
}

// pyCLIArgLines returns the Python lines that bind p.Name from positional CLI
// argument idx. A required argument exits with a usage error when missing;
// an optional one falls back to its signature default.
func pyCLIArgLines(p pyParam, idx int) []string {
	conv := convertArg(p.Name, p.Type, p.Default != "")
	if p.Default == "" {
		lines := []string{
			fmt.Sprintf("if len(_args) <= %d: sys.exit(%q)", idx,
				fmt.Sprintf("error: missing required arg: %s (%s)", p.Name, p.Type)),
			fmt.Sprintf("%s = _args[%d]", p.Name, idx),
		}
		if conv != "" {
			lines = append(lines, conv)
		}
		return lines
	}

	// Convert only values that came from the command line; the default is
	// used verbatim, so e.g. "count: int = None" never reaches int(None).
	lines := []string{
		fmt.Sprintf("if len(_args) > %d:", idx),
		fmt.Sprintf("    %s = _args[%d]", p.Name, idx),
	}
	if conv != "" {
		lines = append(lines, "    "+conv)
	}
	return append(lines,
		"else:",
		fmt.Sprintf("    %s = %s", p.Name, convertDefault(p.Type, p.Default)))
}
|
|
|
|
// generateSimpleRunner returns the source of a Python script that imports fn
// from its module, calls it with no arguments and prints the result. Dicts
// and lists are printed as indented JSON, anything else via print, and
// nothing is printed for None. The second argument is ignored.
func generateSimpleRunner(fn *registry.Function, _ string) (string, error) {
	var sb strings.Builder
	sb.WriteString("#!/usr/bin/env python3\n")
	sb.WriteString("import json\n\n")
	fmt.Fprintf(&sb, "from %s import %s\n\n", filePathToModule(fn.FilePath), fn.Name)
	fmt.Fprintf(&sb, "_result = %s()\n", fn.Name)

	// Nested blocks use 4-space indents: each body must sit deeper than its
	// if/else header, or Python rejects the script with IndentationError.
	sb.WriteString("if _result is not None:\n")
	sb.WriteString("    if isinstance(_result, (dict, list)):\n")
	sb.WriteString("        print(json.dumps(_result, indent=2, default=str))\n")
	sb.WriteString("    else:\n")
	sb.WriteString("        print(_result)\n")
	return sb.String(), nil
}
|
|
|
|
// filePathToModule converts a registry file path into a Python import path
// relative to python/functions:
//
//	"python/functions/metabase/databases.py" -> "metabase.databases"
//	"python/functions/metabase/__init__.py"  -> "metabase"
//
// The path goes through filepath.ToSlash before the prefix is stripped. On
// Windows this lets OS-separated paths match the prefix too.
//
// A package's __init__.py maps to the package itself. Importing
// "pkg.__init__" would execute the file a second time as a separate module,
// with its own copies of every class.
func filePathToModule(filePath string) string {
	mod := filepath.ToSlash(filePath)
	mod = strings.TrimPrefix(mod, "python/functions/")
	mod = strings.TrimSuffix(mod, ".py")
	mod = strings.TrimSuffix(mod, "/__init__")
	return strings.ReplaceAll(mod, "/", ".")
}
|
|
|
|
// convertArg generates Python code to convert a string arg to the right type.
|
|
func convertArg(name, typ string, _ bool) string {
|
|
switch strings.ToLower(typ) {
|
|
case "int":
|
|
return fmt.Sprintf("%s = int(%s)", name, name)
|
|
case "float":
|
|
return fmt.Sprintf("%s = float(%s)", name, name)
|
|
case "bool":
|
|
return fmt.Sprintf("%s = %s.lower() in ('true', '1', 'yes') if isinstance(%s, str) else %s",
|
|
name, name, name, name)
|
|
case "dict":
|
|
return fmt.Sprintf("%s = json.loads(%s) if isinstance(%s, str) else %s",
|
|
name, name, name, name)
|
|
case "list":
|
|
return fmt.Sprintf("%s = json.loads(%s) if isinstance(%s, str) else %s",
|
|
name, name, name, name)
|
|
case "bytes":
|
|
return fmt.Sprintf("%s = %s.encode('utf-8') if isinstance(%s, str) else %s",
|
|
name, name, name, name)
|
|
default:
|
|
// str or unknown — no conversion
|
|
return ""
|
|
}
|
|
}
|
|
|
|
// convertDefault returns the Python expression used for a parameter's
// default value. Defaults taken from the signature are already Python source
// and are returned unchanged. An empty default, or the literal "None", yields
// "None". The first parameter (the annotation) is not used.
func convertDefault(_, def string) string {
	switch def {
	case "", "None":
		return "None"
	default:
		return def
	}
}
|
|
|
|
// pythonList creates a Python list literal from strings: ["a", "b", "c"]
|
|
func pythonList(items []string) string {
|
|
quoted := make([]string, len(items))
|
|
for i, item := range items {
|
|
quoted[i] = fmt.Sprintf("%q", item)
|
|
}
|
|
return "[" + strings.Join(quoted, ", ") + "]"
|
|
}
|
|
|
|
// buildPyRunnerCommand returns the command that runs the registry Python
// function fn, with args as its command-line arguments.
//
// If fn's directory contains a __main__.py, that package runner takes
// priority and the command comes from buildPyModuleCommand. Otherwise:
//   - a runner script is generated with generatePyRunner;
//   - the script is written to a temporary "fn_run_*.py" file, whose path is
//     logged to stderr;
//   - the script runs from registryRoot with
//     PYTHONPATH=<registryRoot>/python/functions;
//   - the interpreter is <registryRoot>/python/.venv/bin/python3 when that
//     file exists, and python3 from PATH otherwise.
//
// NOTE(review): this function does not remove the temporary script, because
// the returned command still needs it. Confirm whether callers clean it up.
func buildPyRunnerCommand(fn *registry.Function, db *registry.DB, registryRoot string, args []string) (*exec.Cmd, error) {
	// An explicit package runner (__main__.py) takes priority.
	mainPy := filepath.Join(registryRoot, filepath.Dir(fn.FilePath), "__main__.py")
	if _, err := os.Stat(mainPy); err == nil {
		return buildPyModuleCommand(fn, registryRoot, args)
	}

	script, err := generatePyRunner(fn, db, registryRoot)
	if err != nil {
		return nil, fmt.Errorf("generating runner: %w", err)
	}
	runnerPath, err := writePyRunnerScript(script)
	if err != nil {
		return nil, err
	}
	// Log only the path so the generated script can be inspected when debugging.
	fmt.Fprintf(os.Stderr, "[fn run] generated runner: %s\n", runnerPath)

	pythonBin := "python3"
	venvPython := filepath.Join(registryRoot, "python", ".venv", "bin", "python3")
	if _, err := os.Stat(venvPython); err == nil {
		pythonBin = venvPython
	}

	cmd := exec.Command(pythonBin, append([]string{runnerPath}, args...)...)
	cmd.Dir = registryRoot
	// Appended last, so it takes precedence over any inherited PYTHONPATH.
	cmd.Env = append(os.Environ(), "PYTHONPATH="+filepath.Join(registryRoot, "python", "functions"))
	return cmd, nil
}

// writePyRunnerScript writes script to a new temporary "fn_run_*.py" file and
// returns its path. The file is removed again if writing or closing fails.
func writePyRunnerScript(script string) (string, error) {
	f, err := os.CreateTemp("", "fn_run_*.py")
	if err != nil {
		return "", fmt.Errorf("creating temp file: %w", err)
	}
	if _, err := f.WriteString(script); err != nil {
		_ = f.Close()          // best-effort cleanup; the write error is what matters
		_ = os.Remove(f.Name()) // best-effort cleanup
		return "", fmt.Errorf("writing runner %s: %w", f.Name(), err)
	}
	// Close can report a failed flush, which would leave a truncated script.
	if err := f.Close(); err != nil {
		_ = os.Remove(f.Name()) // best-effort cleanup
		return "", fmt.Errorf("closing runner %s: %w", f.Name(), err)
	}
	return f.Name(), nil
}
|
|
|
|
// buildPyModuleCommand returns a command that runs fn's code as
// "python3 -m <module> args..." from <registryRoot>/python/functions. PYTHONPATH
// points at that directory, and <registryRoot>/python/.venv/bin/python3 is
// used when it exists.
//
// The module name is fn's file path relative to python/functions, with
// separators turned into dots and ".py" dropped. When the file's directory
// contains a __main__.py, the last component is dropped so that the enclosing
// package is run ("metabase/databases.py" -> "metabase"); "python -m pkg"
// executes pkg/__main__.py. A file outside python/functions has no importable
// module name and yields an error.
func buildPyModuleCommand(fn *registry.Function, registryRoot string, args []string) (*exec.Cmd, error) {
	pythonBin := "python3"
	venvPython := filepath.Join(registryRoot, "python", ".venv", "bin", "python3")
	if _, err := os.Stat(venvPython); err == nil {
		pythonBin = venvPython
	}

	pythonPath := filepath.Join(registryRoot, "python", "functions")
	absPath := filepath.Join(registryRoot, fn.FilePath)

	rel, err := filepath.Rel(pythonPath, absPath)
	if err != nil {
		return nil, fmt.Errorf("resolving module for %s: %w", fn.FilePath, err)
	}
	rel = filepath.ToSlash(rel)
	if rel == ".." || strings.HasPrefix(rel, "../") {
		return nil, fmt.Errorf("resolving module for %s: not under %s", fn.FilePath, pythonPath)
	}
	modPath := strings.ReplaceAll(strings.TrimSuffix(rel, ".py"), "/", ".")

	// Package with __main__.py: run the package, not the file's own module.
	// The path is already dot-separated, so cut at the last "." (filepath.Base
	// would see no separator and strip nothing).
	if _, err := os.Stat(filepath.Join(filepath.Dir(absPath), "__main__.py")); err == nil {
		if idx := strings.LastIndex(modPath, "."); idx != -1 {
			modPath = modPath[:idx]
		}
	}

	cmd := exec.Command(pythonBin, append([]string{"-m", modPath}, args...)...)
	cmd.Dir = pythonPath
	cmd.Env = append(os.Environ(), "PYTHONPATH="+pythonPath)
	return cmd, nil
}
|