Files
fn_registry/cmd/fn/pyrunner.go
T
egutierrez a4b5651e8c feat: pyrunner mejorado para fn run Python
Refactoriza la ejecución de funciones Python en fn run. Extrae la lógica
a pyrunner.go con soporte para importar dependencias del registry y
ejecutar con el venv del proyecto. Agrega WalCheckpoint en db.go para
que lectores externos vean datos actualizados tras fn index.
2026-03-29 00:13:46 +01:00

460 lines
14 KiB
Go

package main
import (
"fmt"
"os"
"os/exec"
"path/filepath"
"regexp"
"strings"
"fn-registry/registry"
)
// pyParam describes one parameter parsed out of a Python function signature.
//
// Fields:
//   - Name: the parameter identifier.
//   - Type: the annotation text, e.g. "str", "int", "bool", "dict", "list",
//     "MetabaseClient"; empty when the parameter carries no annotation.
//   - Default: the default-value expression; empty when the parameter is required.
//   - IsKwargs: true for a "**kwargs"-style parameter.
//   - IsRegistry: marks a registry type, i.e. one that needs a factory.
type pyParam struct {
	Name, Type, Default  string
	IsKwargs, IsRegistry bool
}
// pyFactory ties a registry type to the registry function that constructs it.
//
// FuncID and FuncName identify the factory function, FilePath is the file that
// defines it (relative to registryRoot), and Params holds the factory's own
// parameters, which are all expected to be primitives.
type pyFactory struct {
	FuncID, FuncName, FilePath string
	Params                     []pyParam
}
// parsePySignature extracts the parameters from a Python signature string such as
//
//	"def func_name(client: MetabaseClient, name: str, count: int = 0) -> dict"
//
// The parameter list is the text between the first "(" and the ")" that
// balances it, so defaults or annotations that themselves contain parentheses
// (for example "x: tuple = ()") do not cut the list short. "self" and "cls"
// entries are dropped.
//
// It returns nil when the signature has no parenthesised parameter list or
// when no parameters remain after filtering.
func parsePySignature(sig string) []pyParam {
	open := strings.Index(sig, "(")
	if open == -1 {
		return nil
	}
	end := pyClosingParen(sig, open)
	if end == -1 {
		return nil
	}
	raw := strings.TrimSpace(sig[open+1 : end])
	if raw == "" {
		return nil
	}

	// Split by comma, respecting nested brackets.
	var params []pyParam
	for _, part := range splitParams(raw) {
		part = strings.TrimSpace(part)
		if part == "" || part == "self" || part == "cls" {
			continue
		}
		params = append(params, parseSingleParam(part))
	}
	return params
}

// pyClosingParen returns the index of the ")" that balances the "(" at
// s[open], ignoring parentheses inside quoted string literals. When the input
// is unbalanced it falls back to the first ")" after the opener, and returns
// -1 if there is none.
func pyClosingParen(s string, open int) int {
	depth := 0
	var quote byte // active string delimiter; 0 while outside a literal
	for i := open; i < len(s); i++ {
		c := s[i]
		if quote != 0 {
			switch {
			case c == '\\':
				i++ // skip the escaped character
			case c == quote:
				quote = 0
			}
			continue
		}
		switch c {
		case '\'', '"':
			quote = c
		case '(':
			depth++
		case ')':
			depth--
			if depth == 0 {
				return i
			}
		}
	}
	// Unbalanced input: keep the lenient "first closing paren" behaviour.
	if rel := strings.Index(s[open:], ")"); rel != -1 {
		return open + rel
	}
	return -1
}
// splitParams splits a comma-separated parameter string into its raw pieces.
//
// A comma only separates parameters when it is outside every bracket pair
// ("[]", "()", "{}") and outside every quoted string literal, so values such
// as "dict[str, int]", "{'a': 1, 'b': 2}" or "\",\"" stay in one piece.
// Pieces are returned untrimmed; an empty input yields a single empty piece.
func splitParams(s string) []string {
	var parts []string
	depth := 0
	start := 0
	var quote rune // active string delimiter; 0 while outside a literal
	escaped := false
	for i, c := range s {
		if quote != 0 {
			switch {
			case escaped:
				escaped = false
			case c == '\\':
				escaped = true
			case c == quote:
				quote = 0
			}
			continue
		}
		switch c {
		case '\'', '"':
			quote = c
		case '[', '(', '{':
			depth++
		case ']', ')', '}':
			// A stray closer must not push depth below zero, otherwise every
			// later top-level comma would be ignored.
			if depth > 0 {
				depth--
			}
		case ',':
			if depth == 0 {
				parts = append(parts, s[start:i])
				start = i + 1
			}
		}
	}
	parts = append(parts, s[start:])
	return parts
}
// parseSingleParam parses one parameter of the form "name: Type = default",
// "name = default", "name", or "**kwargs" (optionally annotated).
//
// The default is whatever follows the first "=" that is outside brackets and
// quotes, so an "=" inside an annotation such as
// "Annotated[int, Field(default=0)]" is not mistaken for the default
// separator. For "**name: Type" the stars and the annotation are stripped
// from Name and IsKwargs is set.
func parseSingleParam(s string) pyParam {
	s = strings.TrimSpace(s)
	p := pyParam{}
	if strings.HasPrefix(s, "**") {
		p.IsKwargs = true
		s = strings.TrimSpace(strings.TrimPrefix(s, "**"))
	}

	// Split on the top-level "=" for the default value.
	if eqIdx := indexTopLevelEquals(s); eqIdx != -1 {
		p.Default = strings.TrimSpace(s[eqIdx+1:])
		s = strings.TrimSpace(s[:eqIdx])
	}

	// Split on ":" for the type annotation.
	if colIdx := strings.Index(s, ":"); colIdx != -1 {
		p.Name = strings.TrimSpace(s[:colIdx])
		p.Type = strings.TrimSpace(s[colIdx+1:])
	} else {
		p.Name = s
	}
	return p
}

// indexTopLevelEquals returns the byte index of the first "=" in s that lies
// outside every bracket pair and outside every quoted string, or -1 if there
// is none.
func indexTopLevelEquals(s string) int {
	depth := 0
	var quote byte // active string delimiter; 0 while outside a literal
	for i := 0; i < len(s); i++ {
		c := s[i]
		if quote != 0 {
			switch {
			case c == '\\':
				i++ // skip the escaped character
			case c == quote:
				quote = 0
			}
			continue
		}
		switch c {
		case '\'', '"':
			quote = c
		case '(', '[', '{':
			depth++
		case ')', ']', '}':
			if depth > 0 {
				depth--
			}
		case '=':
			if depth == 0 {
				return i
			}
		}
	}
	return -1
}
// isPrimitiveType reports whether a Python type annotation names a
// primitive/builtin type.
//
// Only the head of the annotation is inspected: anything from the first "["
// onwards is discarded (so "list[str]" is judged as "list"), and for a union
// written with "|" only the first alternative counts. The comparison is
// case-insensitive.
func isPrimitiveType(t string) bool {
	head := strings.TrimSpace(t)
	if open := strings.IndexByte(head, '['); open >= 0 {
		head = head[:open]
	}
	if bar := strings.IndexByte(head, '|'); bar >= 0 {
		head = strings.TrimSpace(head[:bar])
	}

	builtins := [...]string{
		"str", "int", "float", "bool", "dict", "list", "tuple", "set",
		"bytes", "none", "nonetype", "any", "optional",
	}
	lowered := strings.ToLower(head)
	for _, name := range builtins {
		if lowered == name {
			return true
		}
	}
	return false
}
// findFactory searches the registry for a function that returns the given
// type name and can therefore act as its factory.
//
// Candidates come from db.SearchFunctions for typeName restricted to lang. A
// candidate is accepted when its signature contains "-> typeName" as a whole
// identifier (so "-> MetabaseClientFactory" does not match "MetabaseClient")
// and every annotated parameter is primitive; un-annotated parameters and
// "**kwargs" are tolerated. The first acceptable candidate wins.
//
// It returns the search error unchanged, or an error naming typeName when no
// candidate qualifies.
func findFactory(db *registry.DB, typeName, lang string) (*pyFactory, error) {
	fns, err := db.SearchFunctions(typeName, "", "", lang, "")
	if err != nil {
		return nil, err
	}
	for _, fn := range fns {
		if !signatureReturnsType(fn.Signature, typeName) {
			continue
		}
		params := parsePySignature(fn.Signature)
		if !factoryParamsArePrimitive(params) {
			continue
		}
		return &pyFactory{
			FuncID:   fn.ID,
			FuncName: fn.Name,
			FilePath: fn.FilePath,
			Params:   params,
		}, nil
	}
	return nil, fmt.Errorf("no factory function found that returns %s", typeName)
}

// signatureReturnsType reports whether sig contains "-> typeName" where the
// type name is not merely a prefix of a longer identifier.
func signatureReturnsType(sig, typeName string) bool {
	pattern := "-> " + typeName
	for from := 0; from < len(sig); {
		rel := strings.Index(sig[from:], pattern)
		if rel == -1 {
			return false
		}
		end := from + rel + len(pattern)
		if end == len(sig) || !isPyIdentByte(sig[end]) {
			return true
		}
		from += rel + 1
	}
	return false
}

// isPyIdentByte reports whether c can continue a Python identifier. Bytes of
// multi-byte UTF-8 sequences are treated as identifier characters.
func isPyIdentByte(c byte) bool {
	return c == '_' || c >= 0x80 ||
		('0' <= c && c <= '9') ||
		('a' <= c && c <= 'z') ||
		('A' <= c && c <= 'Z')
}

// factoryParamsArePrimitive reports whether every annotated parameter is a
// primitive type. "**kwargs" is skipped because its annotation has no bearing
// on whether the remaining parameters can be supplied as plain values.
func factoryParamsArePrimitive(params []pyParam) bool {
	for _, p := range params {
		if p.IsKwargs || p.Type == "" {
			continue
		}
		if !isPrimitiveType(p.Type) {
			return false
		}
	}
	return true
}
// paramToEnvVar builds the environment variable name for a factory parameter.
//
// The name is the type name with a trailing "Client" removed, converted to
// upper snake case, then "_", then the upper-cased parameter name. For the
// MetabaseClient factory this gives METABASE_BASE_URL, METABASE_EMAIL and
// METABASE_PASSWORD.
func paramToEnvVar(typeName, paramName string) string {
	base := strings.TrimSuffix(typeName, "Client") // "MetabaseClient" -> "Metabase"
	pieces := []string{camelToUpperSnake(base), strings.ToUpper(paramName)}
	return strings.Join(pieces, "_")
}
// camelBoundaryRe matches a lowercase ASCII letter immediately followed by an
// uppercase ASCII letter, i.e. a word boundary inside a camel-case identifier.
// It is compiled once at package scope instead of on every call.
var camelBoundaryRe = regexp.MustCompile("([a-z])([A-Z])")

// camelToUpperSnake converts a camel-case identifier to upper snake case:
// "MetabaseClient" -> "METABASE_CLIENT", "Metabase" -> "METABASE".
//
// An underscore is inserted only between a lowercase letter and the uppercase
// letter that follows it, so runs of capitals stay together
// ("HTTPClient" -> "HTTPCLIENT").
func camelToUpperSnake(s string) string {
	return strings.ToUpper(camelBoundaryRe.ReplaceAllString(s, "${1}_${2}"))
}
// generatePyRunner returns the source of a Python script that:
//  1. imports the target function,
//  2. builds registry-type arguments through factory functions fed from
//     environment variables,
//  3. reads primitive arguments positionally from the command line, and
//  4. calls the function and prints the result (dicts and lists as JSON).
//
// Parameters are classified from fn.Signature: "**kwargs" is skipped, an
// annotated non-primitive type is resolved with findFactory, and everything
// else becomes the next positional CLI argument. Functions without parameters
// are delegated to generateSimpleRunner.
//
// For a factory, each parameter without a default must be present in the
// environment (the script exits listing the missing names); a parameter with
// a default is passed by keyword only when its variable is set, so the
// factory's own default applies otherwise. A factory's "**kwargs" is ignored.
//
// It returns an error when no factory can be found for a registry-typed
// parameter.
func generatePyRunner(fn *registry.Function, db *registry.DB, registryRoot string) (string, error) {
	params := parsePySignature(fn.Signature)
	if params == nil {
		// No params — simple call.
		return generateSimpleRunner(fn, registryRoot)
	}

	var (
		factoryImports []string // import lines for factories
		factorySetup   []string // code that creates the factory-built objects
		argLines       []string // code that parses CLI args
		callArgs       []string // arguments passed to the target function
	)
	cliArgIdx := 0
	for _, p := range params {
		if p.IsKwargs {
			// **kwargs cannot be resolved from positional CLI args.
			continue
		}

		if p.Type != "" && !isPrimitiveType(p.Type) {
			// Registry type — build it through its factory.
			factory, err := findFactory(db, p.Type, fn.Lang)
			if err != nil {
				return "", fmt.Errorf("param %q of type %q: %w", p.Name, p.Type, err)
			}
			factoryImports = append(factoryImports,
				fmt.Sprintf("from %s import %s", filePathToModule(factory.FilePath), factory.FuncName))

			var positional []string    // required factory args, read from os.environ
			var requiredEnv []string   // env vars that must be set
			var optionalPairs []string // "param": "ENV" pairs for defaulted factory args
			for _, fp := range factory.Params {
				if fp.IsKwargs {
					continue
				}
				envName := paramToEnvVar(p.Type, fp.Name)
				if fp.Default != "" {
					optionalPairs = append(optionalPairs, fmt.Sprintf("%q: %q", fp.Name, envName))
					continue
				}
				requiredEnv = append(requiredEnv, envName)
				positional = append(positional, fmt.Sprintf("os.environ[%q]", envName))
			}

			factorySetup = append(factorySetup,
				fmt.Sprintf("# Factory for %s: %s", p.Type, factory.FuncName),
				fmt.Sprintf("_missing = [k for k in %s if k not in os.environ]", pythonList(requiredEnv)),
				fmt.Sprintf(`if _missing: sys.exit(f"error: missing env vars for %s: {', '.join(_missing)}")`, p.Type),
			)
			call := strings.Join(positional, ", ")
			if len(optionalPairs) > 0 {
				// Forward only the optional variables that are actually set.
				factorySetup = append(factorySetup, fmt.Sprintf(
					"_optional = {k: os.environ[e] for k, e in {%s}.items() if e in os.environ}",
					strings.Join(optionalPairs, ", ")))
				if call != "" {
					call += ", "
				}
				call += "**_optional"
			}
			factorySetup = append(factorySetup,
				fmt.Sprintf("%s = %s(%s)", p.Name, factory.FuncName, call))
			callArgs = append(callArgs, p.Name)
			continue
		}

		// Primitive type — taken from the next positional CLI arg.
		hasDefault := p.Default != ""
		if hasDefault {
			argLines = append(argLines,
				fmt.Sprintf("%s = _args[%d] if len(_args) > %d else %s",
					p.Name, cliArgIdx, cliArgIdx, convertDefault(p.Type, p.Default)))
		} else {
			argLines = append(argLines,
				fmt.Sprintf("if len(_args) <= %d: sys.exit('error: missing required arg: %s (%s)')",
					cliArgIdx, p.Name, p.Type),
				fmt.Sprintf("%s = _args[%d]", p.Name, cliArgIdx))
		}
		// convertArg returns "" when no conversion is needed; skip those.
		if conv := convertArg(p.Name, p.Type, hasDefault); conv != "" {
			argLines = append(argLines, conv)
		}
		callArgs = append(callArgs, p.Name)
		cliArgIdx++
	}

	// Assemble the script.
	var sb strings.Builder
	sb.WriteString("#!/usr/bin/env python3\n")
	sb.WriteString("\"\"\"Auto-generated runner for fn run.\"\"\"\n")
	sb.WriteString("import json, os, sys\n\n")

	// Imports: target function first, then factories.
	fmt.Fprintf(&sb, "from %s import %s\n", filePathToModule(fn.FilePath), fn.Name)
	for _, imp := range factoryImports {
		sb.WriteString(imp + "\n")
	}
	sb.WriteString("\n")

	// CLI args.
	sb.WriteString("_args = sys.argv[1:]\n\n")

	// Factory setup (env vars -> registry type instances).
	if len(factorySetup) > 0 {
		sb.WriteString("# --- resolve dependencies from env vars ---\n")
		for _, line := range factorySetup {
			sb.WriteString(line + "\n")
		}
		sb.WriteString("\n")
	}

	// Arg parsing.
	if len(argLines) > 0 {
		sb.WriteString("# --- parse CLI args ---\n")
		for _, line := range argLines {
			sb.WriteString(line + "\n")
		}
		sb.WriteString("\n")
	}

	// Call.
	sb.WriteString("# --- execute ---\n")
	fmt.Fprintf(&sb, "_result = %s(%s)\n", fn.Name, strings.Join(callArgs, ", "))
	sb.WriteString("\n")

	// Output. Nested Python blocks need strictly deeper indentation.
	sb.WriteString("# --- output ---\n")
	sb.WriteString("if _result is not None:\n")
	sb.WriteString("    if isinstance(_result, (dict, list)):\n")
	sb.WriteString("        print(json.dumps(_result, indent=2, default=str))\n")
	sb.WriteString("    else:\n")
	sb.WriteString("        print(_result)\n")
	return sb.String(), nil
}
// generateSimpleRunner returns the source of a Python script for a function
// that takes no parameters: it imports the function, calls it, and prints the
// result (dicts and lists as indented JSON, anything else via print; None
// prints nothing).
//
// The second parameter is unused. The returned error is always nil.
func generateSimpleRunner(fn *registry.Function, _ string) (string, error) {
	var sb strings.Builder
	sb.WriteString("#!/usr/bin/env python3\n")
	sb.WriteString("import json\n\n")
	fmt.Fprintf(&sb, "from %s import %s\n\n", filePathToModule(fn.FilePath), fn.Name)
	fmt.Fprintf(&sb, "_result = %s()\n", fn.Name)
	// Nested Python blocks need strictly deeper indentation than their parent.
	sb.WriteString("if _result is not None:\n")
	sb.WriteString("    if isinstance(_result, (dict, list)):\n")
	sb.WriteString("        print(json.dumps(_result, indent=2, default=str))\n")
	sb.WriteString("    else:\n")
	sb.WriteString("        print(_result)\n")
	return sb.String(), nil
}
// filePathToModule converts a registry file path to a dotted Python module
// path: "python/functions/metabase/databases.py" -> "metabase.databases".
//
// Separators are normalised to "/" before the "python/functions/" prefix is
// stripped, so a path written with the platform separator loses the prefix
// too. A path without that prefix is converted as-is.
func filePathToModule(filePath string) string {
	mod := filepath.ToSlash(filePath)
	mod = strings.TrimPrefix(mod, "python/functions/")
	mod = strings.TrimSuffix(mod, ".py")
	return strings.ReplaceAll(mod, "/", ".")
}
// convertArg returns one line of Python that converts the variable name from
// its CLI string form to the annotated type typ, or "" when no conversion is
// needed (str and unrecognised types).
//
// The annotation is reduced to its head before matching: an "Optional[...]"
// wrapper is removed, type arguments are dropped ("list[str]" -> "list") and
// only the first alternative of an "X | Y" union counts. Matching is
// case-insensitive.
//
// optional says the variable may hold a default instead of a CLI string. In
// that case int and float conversions are applied only to str values, so a
// default such as None is passed through instead of raising. The bool, dict,
// list and bytes conversions are always guarded that way.
func convertArg(name, typ string, optional bool) string {
	base := strings.TrimSpace(typ)
	if strings.HasPrefix(base, "Optional[") && strings.HasSuffix(base, "]") {
		base = strings.TrimSpace(base[len("Optional[") : len(base)-1])
	}
	if idx := strings.Index(base, "["); idx != -1 {
		base = strings.TrimSpace(base[:idx])
	}
	if idx := strings.Index(base, "|"); idx != -1 {
		base = strings.TrimSpace(base[:idx])
	}

	// guarded applies expr only when the variable still holds a string.
	guarded := func(expr string) string {
		return fmt.Sprintf("%s = %s if isinstance(%s, str) else %s", name, expr, name, name)
	}

	switch strings.ToLower(base) {
	case "int":
		if optional {
			return guarded(fmt.Sprintf("int(%s)", name))
		}
		return fmt.Sprintf("%s = int(%s)", name, name)
	case "float":
		if optional {
			return guarded(fmt.Sprintf("float(%s)", name))
		}
		return fmt.Sprintf("%s = float(%s)", name, name)
	case "bool":
		return guarded(fmt.Sprintf("%s.lower() in ('true', '1', 'yes')", name))
	case "dict", "list":
		return guarded(fmt.Sprintf("json.loads(%s)", name))
	case "bytes":
		return guarded(fmt.Sprintf("%s.encode('utf-8')", name))
	default:
		// str or unknown — no conversion
		return ""
	}
}
// convertDefault returns the Python expression to use as a parameter's
// default value.
//
// Defaults copied from the signature are passed through verbatim; an empty
// default or the literal "None" both yield "None". The first argument (the
// type) is ignored.
func convertDefault(_, def string) string {
	switch def {
	case "", "None":
		return "None"
	default:
		return def
	}
}
// pythonList renders items as a Python list literal of double-quoted strings,
// e.g. ["a", "b", "c"]. Each item is quoted with Go's %q verb; an empty or nil
// slice renders as [].
func pythonList(items []string) string {
	var sb strings.Builder
	sb.WriteString("[")
	for i, item := range items {
		if i > 0 {
			sb.WriteString(", ")
		}
		fmt.Fprintf(&sb, "%q", item)
	}
	sb.WriteString("]")
	return sb.String()
}
// buildPyRunnerCommand creates the exec.Cmd that runs fn through a generated
// Python runner script.
//
// If the directory containing fn.FilePath has a __main__.py, that explicit
// package runner takes priority and the command comes from
// buildPyModuleCommand. Otherwise the runner script is generated, written to
// a temp file ("fn_run_*.py") and invoked with args appended. The interpreter
// is <registryRoot>/python/.venv/bin/python3 when that file exists, else
// "python3" from PATH. The command runs in registryRoot with PYTHONPATH set
// to <registryRoot>/python/functions.
//
// The temp file path is logged to stderr. On success this function does not
// delete the file; on a write or close failure it is removed and an error is
// returned. The command is built but not started.
func buildPyRunnerCommand(fn *registry.Function, db *registry.DB, registryRoot string, args []string) (*exec.Cmd, error) {
	// Check for __main__.py first — explicit package runner takes priority.
	dir := filepath.Join(registryRoot, filepath.Dir(fn.FilePath))
	if _, err := os.Stat(filepath.Join(dir, "__main__.py")); err == nil {
		return buildPyModuleCommand(fn, registryRoot, args)
	}

	// Generate runner script.
	script, err := generatePyRunner(fn, db, registryRoot)
	if err != nil {
		return nil, fmt.Errorf("generating runner: %w", err)
	}

	// Write to temp file.
	tmpFile, err := os.CreateTemp("", "fn_run_*.py")
	if err != nil {
		return nil, fmt.Errorf("creating temp file: %w", err)
	}
	runnerPath := tmpFile.Name()
	if _, err := tmpFile.WriteString(script); err != nil {
		// Best-effort cleanup; the write error is the one worth reporting.
		_ = tmpFile.Close()
		_ = os.Remove(runnerPath)
		return nil, fmt.Errorf("writing runner script: %w", err)
	}
	// A failed Close can mean the script never fully reached disk.
	if err := tmpFile.Close(); err != nil {
		_ = os.Remove(runnerPath)
		return nil, fmt.Errorf("closing runner script: %w", err)
	}

	// Log the generated script for debugging (only the runner path).
	fmt.Fprintf(os.Stderr, "[fn run] generated runner: %s\n", runnerPath)

	// Build command: prefer the project venv interpreter when present.
	pythonBin := "python3"
	venvPython := filepath.Join(registryRoot, "python", ".venv", "bin", "python3")
	if _, err := os.Stat(venvPython); err == nil {
		pythonBin = venvPython
	}
	pythonPath := filepath.Join(registryRoot, "python", "functions")

	cmdArgs := append([]string{runnerPath}, args...)
	cmd := exec.Command(pythonBin, cmdArgs...)
	cmd.Dir = registryRoot
	cmd.Env = append(os.Environ(), "PYTHONPATH="+pythonPath)
	return cmd, nil
}
// buildPyModuleCommand creates the exec.Cmd that runs fn with "python -m".
//
// The module path is fn.FilePath relative to <registryRoot>/python/functions,
// without ".py" and with separators turned into dots. When the file's
// directory contains a __main__.py, the last component is dropped so the
// enclosing package is run instead ("metabase.databases" -> "metabase"); a
// module path with a single component is left unchanged.
//
// The interpreter is <registryRoot>/python/.venv/bin/python3 when that file
// exists, else "python3" from PATH. The command runs in
// <registryRoot>/python/functions with PYTHONPATH set to the same directory.
// The command is built but not started.
//
// It returns an error when fn.FilePath cannot be expressed relative to the
// functions directory.
func buildPyModuleCommand(fn *registry.Function, registryRoot string, args []string) (*exec.Cmd, error) {
	pythonBin := "python3"
	venvPython := filepath.Join(registryRoot, "python", ".venv", "bin", "python3")
	if _, err := os.Stat(venvPython); err == nil {
		pythonBin = venvPython
	}

	pythonPath := filepath.Join(registryRoot, "python", "functions")
	absPath := filepath.Join(registryRoot, fn.FilePath)
	relToRoot, err := filepath.Rel(pythonPath, absPath)
	if err != nil {
		return nil, fmt.Errorf("resolving module path for %q: %w", fn.FilePath, err)
	}
	modPath := strings.TrimSuffix(relToRoot, ".py")
	modPath = strings.ReplaceAll(filepath.ToSlash(modPath), "/", ".")

	// Package directory with __main__.py — run the package, not the module.
	// modPath is dot-separated here, so the last component is cut at the final
	// dot rather than with filepath.Base, which only splits on path separators.
	if _, err := os.Stat(filepath.Join(filepath.Dir(absPath), "__main__.py")); err == nil {
		if dot := strings.LastIndex(modPath, "."); dot != -1 {
			modPath = modPath[:dot]
		}
	}

	cmdArgs := append([]string{"-m", modPath}, args...)
	cmd := exec.Command(pythonBin, cmdArgs...)
	cmd.Dir = pythonPath
	cmd.Env = append(os.Environ(), "PYTHONPATH="+pythonPath)
	return cmd, nil
}