merge: quick/apps-table-metabase-dashboard — tabla apps, dashboard Metabase, funciones Python

This commit is contained in:
2026-03-29 00:14:21 +01:00
73 changed files with 3433 additions and 50 deletions
+1 -1
View File
@@ -66,7 +66,7 @@ fn-registry/
frontend/types/ # .ts + .md por tipo
registry/ # Paquete Go: modelos, SQLite, parser, indexer, validacion, migraciones
fn_operations/ # Paquete Go: operations database (libreria)
apps/ # Apps ejecutables (TUIs, CLIs) — modulos Go independientes, cada una con su operations.db
apps/ # Apps ejecutables (TUIs, CLIs, scripts) — codigo NO reutilizable, cada una con su operations.db
cmd/fn/ # CLI principal
docs/ # Specs de diseño
docs/templates/ # Plantillas de frontmatter
+1
View File
@@ -13,3 +13,4 @@ Reglas operativas del proyecto. Cada archivo es una regla independiente.
| 07 | [proposals.md](proposals.md) | Quien crea proposals y cuando |
| 08 | [tag_launcher.md](tag_launcher.md) | Tag launcher para Pipeline Launcher TUI |
| 09 | [go_packages.md](go_packages.md) | Nombre de paquete Go = nombre del directorio |
| 10 | [apps_vs_functions.md](apps_vs_functions.md) | Codigo reutilizable en functions/, no reutilizable en apps/ |
+9
View File
@@ -0,0 +1,9 @@
Solo codigo reutilizable y componible va en `functions/`, `python/functions/`, `bash/functions/`, `frontend/functions/`.
Scripts especificos, dashboards hardcodeados, CLIs de un solo uso, y cualquier codigo que no sea una primitiva componible va en `apps/`. Cada app en `apps/` es independiente: puede importar funciones del registry pero nunca al reves.
Criterios para decidir:
- **functions/**: firma generica, sin credenciales ni config hardcodeada, util en multiples contextos
- **apps/**: orquesta funciones del registry para un caso concreto, tiene config/credenciales, layout fijo
Las apps Python importan funciones del registry con: `sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "python", "functions"))` y luego `from <paquete> import ...` (sin prefijo `functions.`).
+28
View File
@@ -0,0 +1,28 @@
---
name: docker_tui
lang: go
domain: infra
description: "TUI interactiva para gestion de contenedores, imagenes, volumenes y redes Docker."
tags: [docker, tui, bubbletea, containers]
uses_functions:
- docker_pull_image_go_infra
- docker_list_containers_go_infra
- docker_remove_container_go_infra
- docker_stop_container_go_infra
- docker_start_container_go_infra
- docker_list_images_go_infra
- docker_remove_image_go_infra
- docker_remove_network_go_infra
- docker_create_network_go_infra
- docker_inspect_container_go_infra
- docker_run_container_go_infra
- docker_container_logs_go_infra
uses_types: []
framework: bubbletea
entry_point: "main.go"
dir_path: "apps/docker_tui"
---
## Notas
Aplicacion TUI con pestanas para contenedores, imagenes, volumenes, redes y compose. Construida con Bubble Tea (Charmbracelet).
+24
View File
@@ -0,0 +1,24 @@
---
name: metabase_registry
lang: py
domain: analytics
description: "Setup y dashboards automaticos de Metabase para visualizar metricas del fn-registry."
tags: [metabase, dashboard, analytics, visualization]
uses_functions:
- metabase_auth_py_infra
- metabase_create_card_py_infra
- metabase_create_dashboard_py_infra
- metabase_update_dashboard_py_infra
- metabase_list_databases_py_infra
- metabase_add_database_py_infra
- metabase_list_dashboards_py_infra
- metabase_create_user_py_infra
uses_types: []
framework: httpx
entry_point: "main.py"
dir_path: "apps/metabase_registry"
---
## Notas
Scripts Python que conectan con la API REST de Metabase para crear datasources, cards SQL y dashboards automaticamente. Usa las funciones del paquete python/functions/metabase/ del registry. Credenciales en .env local.
@@ -0,0 +1,252 @@
"""Crea un dashboard en Metabase con metricas del fn-registry."""
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "python", "functions"))
from metabase.client import metabase_auth
from metabase import (
metabase_list_databases,
metabase_create_card,
metabase_create_dashboard,
metabase_update_dashboard,
metabase_list_dashboards,
)
# --- Config ---
METABASE_URL = "http://localhost:3000"
EMAIL = "admin@fnregistry.local"
PASSWORD = "FnRegistry2024!"
# --- SQL Queries ---
CARDS = [
{
"name": "Total de Funciones",
"display": "scalar",
"sql": "SELECT COUNT(*) AS total FROM functions;",
"size_x": 4, "size_y": 3, "col": 0, "row": 0,
},
{
"name": "Funciones con Tests",
"display": "scalar",
"sql": "SELECT COUNT(*) AS con_tests FROM functions WHERE tested = 1;",
"size_x": 4, "size_y": 3, "col": 4, "row": 0,
},
{
"name": "Funciones sin Tests",
"display": "scalar",
"sql": "SELECT COUNT(*) AS sin_tests FROM functions WHERE tested = 0;",
"size_x": 4, "size_y": 3, "col": 8, "row": 0,
},
{
"name": "Total de Tipos",
"display": "scalar",
"sql": "SELECT COUNT(*) AS total FROM types;",
"size_x": 3, "size_y": 3, "col": 12, "row": 0,
},
{
"name": "Proposals Pendientes",
"display": "scalar",
"sql": "SELECT COUNT(*) AS pendientes FROM proposals WHERE status = 'pending';",
"size_x": 3, "size_y": 3, "col": 15, "row": 0,
},
{
"name": "Funciones por Lenguaje",
"display": "bar",
"sql": "SELECT lang, COUNT(*) AS cantidad FROM functions GROUP BY lang ORDER BY cantidad DESC;",
"size_x": 6, "size_y": 5, "col": 0, "row": 3,
},
{
"name": "Funciones por Dominio",
"display": "pie",
"sql": "SELECT domain, COUNT(*) AS cantidad FROM functions GROUP BY domain ORDER BY cantidad DESC;",
"size_x": 6, "size_y": 5, "col": 6, "row": 3,
},
{
"name": "Funciones por Kind",
"display": "bar",
"sql": "SELECT kind, COUNT(*) AS cantidad FROM functions GROUP BY kind ORDER BY cantidad DESC;",
"size_x": 6, "size_y": 5, "col": 12, "row": 3,
},
{
"name": "Puras vs Impuras",
"display": "pie",
"sql": "SELECT purity, COUNT(*) AS cantidad FROM functions GROUP BY purity ORDER BY cantidad DESC;",
"size_x": 6, "size_y": 5, "col": 0, "row": 8,
},
{
"name": "Funciones Mas Usadas por Otras",
"display": "row",
"sql": """
WITH RECURSIVE split_uses(fn_id, rest, val) AS (
SELECT id, uses_functions || ',', NULL FROM functions WHERE uses_functions != '[]' AND uses_functions != ''
UNION ALL
SELECT fn_id,
SUBSTR(rest, INSTR(rest, ',') + 1),
TRIM(SUBSTR(rest, 1, INSTR(rest, ',') - 1), ' "[]')
FROM split_uses WHERE rest != ''
)
SELECT val AS funcion_usada, COUNT(*) AS veces_usada
FROM split_uses
WHERE val IS NOT NULL AND val != '' AND val != ']'
GROUP BY val
ORDER BY veces_usada DESC
LIMIT 15;
""",
"size_x": 6, "size_y": 5, "col": 6, "row": 8,
},
{
"name": "Funciones Mas Complejas (mas dependencias)",
"display": "row",
"sql": """
SELECT
name || ' (' || lang || ')' AS funcion,
(LENGTH(uses_functions) - LENGTH(REPLACE(uses_functions, ',', ''))
+ CASE WHEN uses_functions != '[]' AND uses_functions != '' THEN 1 ELSE 0 END) AS num_dependencias,
(LENGTH(uses_types) - LENGTH(REPLACE(uses_types, ',', ''))
+ CASE WHEN uses_types != '[]' AND uses_types != '' THEN 1 ELSE 0 END) AS num_tipos
FROM functions
WHERE uses_functions != '[]' AND uses_functions != ''
ORDER BY num_dependencias DESC
LIMIT 15;
""",
"size_x": 6, "size_y": 5, "col": 12, "row": 8,
},
{
"name": "Cobertura de Tests por Dominio",
"display": "bar",
"sql": """
SELECT
domain,
SUM(CASE WHEN tested = 1 THEN 1 ELSE 0 END) AS con_tests,
SUM(CASE WHEN tested = 0 THEN 1 ELSE 0 END) AS sin_tests
FROM functions
GROUP BY domain
ORDER BY domain;
""",
"size_x": 9, "size_y": 5, "col": 0, "row": 13,
},
{
"name": "Funciones por Lenguaje y Dominio",
"display": "table",
"sql": """
SELECT
domain,
SUM(CASE WHEN lang = 'go' THEN 1 ELSE 0 END) AS go,
SUM(CASE WHEN lang = 'py' THEN 1 ELSE 0 END) AS python,
SUM(CASE WHEN lang = 'bash' THEN 1 ELSE 0 END) AS bash,
SUM(CASE WHEN lang = 'ts' THEN 1 ELSE 0 END) AS typescript,
COUNT(*) AS total
FROM functions
GROUP BY domain
ORDER BY total DESC;
""",
"size_x": 9, "size_y": 5, "col": 9, "row": 13,
},
{
"name": "Tipos por Dominio y Algebraic",
"display": "table",
"sql": """
SELECT
domain,
algebraic,
COUNT(*) AS cantidad
FROM types
GROUP BY domain, algebraic
ORDER BY domain, cantidad DESC;
""",
"size_x": 9, "size_y": 4, "col": 0, "row": 18,
},
{
"name": "Funciones Recientes (ultimas 20 indexadas)",
"display": "table",
"sql": """
SELECT name, lang, domain, kind, purity, tested
FROM functions
ORDER BY created_at DESC
LIMIT 20;
""",
"size_x": 9, "size_y": 4, "col": 9, "row": 18,
},
]
def main():
print("Autenticando en Metabase...")
client = metabase_auth(METABASE_URL, EMAIL, PASSWORD)
# Encontrar la database registry.db
dbs = metabase_list_databases(client)
registry_db_id = None
for db in dbs:
if "registry" in db.get("name", "").lower() or (
db.get("engine") == "sqlite"
and "registry" in db.get("details", {}).get("db", "")
):
registry_db_id = db["id"]
print(f" Database encontrada: {db['name']} (id={db['id']})")
break
if not registry_db_id:
print("ERROR: No se encontro registry.db en Metabase.")
print("Databases disponibles:")
for db in dbs:
print(f" - {db['id']}: {db['name']} ({db['engine']})")
sys.exit(1)
# Verificar si ya existe un dashboard con este nombre
existing = metabase_list_dashboards(client)
for d in existing:
if d.get("name") == "fn-registry Overview":
print(f" Dashboard ya existe (id={d['id']}), recreando...")
from metabase import metabase_delete_dashboard
metabase_delete_dashboard(client, d["id"])
# Crear cards
print("Creando cards...")
created_cards = []
for i, card_def in enumerate(CARDS):
card = metabase_create_card(
client,
name=card_def["name"],
dataset_query={
"database": registry_db_id,
"type": "native",
"native": {"query": card_def["sql"]},
},
display=card_def["display"],
description=f"fn-registry: {card_def['name']}",
)
created_cards.append((card, card_def))
print(f" [{i+1}/{len(CARDS)}] {card_def['name']} (id={card['id']})")
# Crear dashboard
print("Creando dashboard...")
dashboard = metabase_create_dashboard(
client,
name="fn-registry Overview",
description="Dashboard de metricas del registry: funciones, tipos, tests, dependencias y complejidad.",
)
dash_id = dashboard["id"]
print(f" Dashboard creado: id={dash_id}")
# Agregar cards al dashboard con posiciones
dashcards = []
for idx, (card, card_def) in enumerate(created_cards):
dashcards.append({
"id": -(idx + 1),
"card_id": card["id"],
"size_x": card_def["size_x"],
"size_y": card_def["size_y"],
"col": card_def["col"],
"row": card_def["row"],
})
metabase_update_dashboard(client, dash_id, dashcards=dashcards)
print(f"\nDashboard listo: {METABASE_URL}/dashboard/{dash_id}")
client.close()
if __name__ == "__main__":
main()
+16
View File
@@ -0,0 +1,16 @@
---
name: pipeline_launcher
lang: go
domain: tools
description: "TUI para lanzar y monitorear pipelines del fn-registry con historial de ejecuciones."
tags: [pipeline, tui, bubbletea, runner, launcher]
uses_functions: []
uses_types: []
framework: bubbletea
entry_point: "main.go"
dir_path: "apps/pipeline_launcher"
---
## Notas
Aplicacion TUI que lista pipelines con tag `launcher` del registry, permite ejecutarlos y muestra historial de ejecuciones desde operations.db.
+74 -4
View File
@@ -102,7 +102,10 @@ func cmdIndex() {
os.Exit(1)
}
fmt.Printf("Indexed %d functions, %d types\n", result.Functions, result.Types)
// Flush WAL to main db file so external readers (e.g. Metabase) see changes.
db.WalCheckpoint()
fmt.Printf("Indexed %d functions, %d types, %d apps\n", result.Functions, result.Types, result.Apps)
for _, e := range result.ValidationErrors {
fmt.Fprintf(os.Stderr, " INVALID: %s\n", e)
}
@@ -151,7 +154,13 @@ func cmdSearch(args []string) {
os.Exit(1)
}
if len(fns) == 0 && len(types) == 0 {
apps, err := db.SearchApps(query, lang, domain)
if err != nil {
fmt.Fprintf(os.Stderr, "error: %v\n", err)
os.Exit(1)
}
if len(fns) == 0 && len(types) == 0 && len(apps) == 0 {
fmt.Println("No results.")
return
}
@@ -174,6 +183,16 @@ func cmdSearch(args []string) {
fmt.Fprintf(w, "%s\t%s\t%s\n", t.Algebraic, t.ID, desc)
}
}
if len(apps) > 0 {
if len(fns) > 0 || len(types) > 0 {
fmt.Fprintln(w)
}
fmt.Fprintln(w, "APP\tID\tLANG\tDESCRIPTION")
for _, a := range apps {
desc := truncate(a.Description, 60)
fmt.Fprintf(w, "app\t%s\t%s\t%s\n", a.ID, a.Lang, desc)
}
}
w.Flush()
}
@@ -212,6 +231,12 @@ func cmdList(args []string) {
os.Exit(1)
}
apps, err := db.SearchApps("", lang, domain)
if err != nil {
fmt.Fprintf(os.Stderr, "error: %v\n", err)
os.Exit(1)
}
w := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
if len(fns) > 0 {
fmt.Fprintln(w, "KIND\tID\tPURITY\tVERSION\tDOMAIN")
@@ -228,7 +253,16 @@ func cmdList(args []string) {
fmt.Fprintf(w, "%s\t%s\t%s\t%s\n", t.Algebraic, t.ID, t.Version, t.Domain)
}
}
if len(fns) == 0 && len(types) == 0 {
if len(apps) > 0 {
if len(fns) > 0 || len(types) > 0 {
fmt.Fprintln(w)
}
fmt.Fprintln(w, "APP\tID\tLANG\tDOMAIN")
for _, a := range apps {
fmt.Fprintf(w, "app\t%s\t%s\t%s\n", a.ID, a.Lang, a.Domain)
}
}
if len(fns) == 0 && len(types) == 0 && len(apps) == 0 {
fmt.Println("Registry is empty. Run 'fn index' first.")
}
w.Flush()
@@ -258,6 +292,12 @@ func cmdShow(args []string) {
return
}
a, errA := db.GetApp(id)
if errA == nil {
printApp(a)
return
}
fmt.Fprintf(os.Stderr, "not found: %s\n", id)
os.Exit(1)
}
@@ -342,6 +382,34 @@ func printType(t *registry.Type) {
}
}
func printApp(a *registry.App) {
fmt.Printf("ID: %s\n", a.ID)
fmt.Printf("Name: %s\n", a.Name)
fmt.Printf("Lang: %s\n", a.Lang)
fmt.Printf("Domain: %s\n", a.Domain)
fmt.Printf("Description: %s\n", a.Description)
fmt.Printf("Tags: %s\n", strings.Join(a.Tags, ", "))
fmt.Printf("Dir: %s\n", a.DirPath)
if a.Framework != "" {
fmt.Printf("Framework: %s\n", a.Framework)
}
if a.EntryPoint != "" {
fmt.Printf("Entry point: %s\n", a.EntryPoint)
}
if len(a.UsesFunctions) > 0 {
fmt.Printf("Uses fns: %s\n", strings.Join(a.UsesFunctions, ", "))
}
if len(a.UsesTypes) > 0 {
fmt.Printf("Uses types: %s\n", strings.Join(a.UsesTypes, ", "))
}
if a.Notes != "" {
fmt.Printf("\nNotes:\n%s\n", a.Notes)
}
if a.Documentation != "" {
fmt.Printf("\nDocumentation:\n%s\n", a.Documentation)
}
}
// --- add ---
func cmdAdd(args []string) {
@@ -367,8 +435,10 @@ func cmdAdd(args []string) {
templatePath = filepath.Join(r, "docs", "templates", "pipeline.md")
case "component":
templatePath = filepath.Join(r, "docs", "templates", "component.md")
case "app":
templatePath = filepath.Join(r, "docs", "templates", "app.md")
default:
fmt.Fprintf(os.Stderr, "unknown kind: %s (use function, pipeline, or component)\n", kind)
fmt.Fprintf(os.Stderr, "unknown kind: %s (use function, pipeline, component, or app)\n", kind)
os.Exit(1)
}
+459
View File
@@ -0,0 +1,459 @@
package main
import (
"fmt"
"os"
"os/exec"
"path/filepath"
"regexp"
"strings"
"fn-registry/registry"
)
// pyParam represents a parsed parameter from a Python function signature.
type pyParam struct {
Name string
Type string // "str", "int", "bool", "dict", "list", "MetabaseClient", etc.
Default string // empty if required
IsKwargs bool // **kwargs
IsRegistry bool // type is a registry type (needs factory)
}
// pyFactory links a registry type to the function that creates it.
type pyFactory struct {
FuncID string
FuncName string
FilePath string // relative to registryRoot
Params []pyParam // factory's own params (all should be primitives)
}
// parsePySignature extracts parameters from a Python signature string like:
// "def func_name(client: MetabaseClient, name: str, count: int = 0) -> dict"
func parsePySignature(sig string) []pyParam {
// Extract the params part between ( and )
re := regexp.MustCompile(`\(([^)]*)\)`)
m := re.FindStringSubmatch(sig)
if len(m) < 2 {
return nil
}
raw := strings.TrimSpace(m[1])
if raw == "" {
return nil
}
// Split by comma, respecting nested brackets
parts := splitParams(raw)
var params []pyParam
for _, part := range parts {
part = strings.TrimSpace(part)
if part == "" || part == "self" || part == "cls" {
continue
}
p := parseSingleParam(part)
params = append(params, p)
}
return params
}
// splitParams splits a comma-separated param string, respecting brackets.
func splitParams(s string) []string {
var parts []string
depth := 0
start := 0
for i, c := range s {
switch c {
case '[', '(':
depth++
case ']', ')':
depth--
case ',':
if depth == 0 {
parts = append(parts, s[start:i])
start = i + 1
}
}
}
parts = append(parts, s[start:])
return parts
}
// parseSingleParam parses "name: Type = default" or "**kwargs".
func parseSingleParam(s string) pyParam {
s = strings.TrimSpace(s)
if strings.HasPrefix(s, "**") {
return pyParam{Name: strings.TrimPrefix(s, "**"), IsKwargs: true}
}
p := pyParam{}
// Split on "=" for default value
if eqIdx := strings.Index(s, "="); eqIdx != -1 {
p.Default = strings.TrimSpace(s[eqIdx+1:])
s = strings.TrimSpace(s[:eqIdx])
}
// Split on ":" for type annotation
if colIdx := strings.Index(s, ":"); colIdx != -1 {
p.Name = strings.TrimSpace(s[:colIdx])
p.Type = strings.TrimSpace(s[colIdx+1:])
} else {
p.Name = s
}
return p
}
// isPrimitiveType checks if a Python type annotation is a primitive/builtin.
func isPrimitiveType(t string) bool {
t = strings.TrimSpace(t)
// Handle Optional, list[...], dict[...], etc.
base := t
if idx := strings.Index(t, "["); idx != -1 {
base = t[:idx]
}
// Handle "X | None"
if strings.Contains(base, "|") {
base = strings.TrimSpace(strings.Split(base, "|")[0])
}
switch strings.ToLower(base) {
case "str", "int", "float", "bool", "dict", "list", "tuple", "set",
"bytes", "none", "nonetype", "any", "optional":
return true
}
return false
}
// findFactory searches the registry for a function that returns the given type name.
// It looks for functions in the same language whose signature ends with "-> TypeName".
func findFactory(db *registry.DB, typeName, lang string) (*pyFactory, error) {
// Search for functions that return this type
pattern := "-> " + typeName
fns, err := db.SearchFunctions(typeName, "", "", lang, "")
if err != nil {
return nil, err
}
for _, fn := range fns {
if strings.Contains(fn.Signature, pattern) {
params := parsePySignature(fn.Signature)
// Factory should only have primitive params
allPrimitive := true
for _, p := range params {
if !isPrimitiveType(p.Type) && p.Type != "" {
allPrimitive = false
break
}
}
if allPrimitive {
return &pyFactory{
FuncID: fn.ID,
FuncName: fn.Name,
FilePath: fn.FilePath,
Params: params,
}, nil
}
}
}
return nil, fmt.Errorf("no factory function found that returns %s", typeName)
}
// paramToEnvVar converts a param name to an env var name.
// Convention: UPPER(type_name) + "_" + UPPER(param_name)
// e.g., for MetabaseClient factory: METABASE_BASE_URL, METABASE_EMAIL, METABASE_PASSWORD
func paramToEnvVar(typeName, paramName string) string {
// Extract prefix from type name: "MetabaseClient" -> "METABASE"
prefix := camelToUpperSnake(strings.TrimSuffix(typeName, "Client"))
return prefix + "_" + strings.ToUpper(paramName)
}
// camelToUpperSnake converts "MetabaseClient" -> "METABASE_CLIENT", "Metabase" -> "METABASE".
func camelToUpperSnake(s string) string {
re := regexp.MustCompile("([a-z])([A-Z])")
snake := re.ReplaceAllString(s, "${1}_${2}")
return strings.ToUpper(snake)
}
// generatePyRunner creates a temporary Python script that:
// 1. Imports the target function
// 2. Resolves registry type dependencies via factory functions + env vars
// 3. Parses CLI args for primitive parameters
// 4. Calls the function and prints result as JSON
func generatePyRunner(fn *registry.Function, db *registry.DB, registryRoot string) (string, error) {
params := parsePySignature(fn.Signature)
if params == nil {
// No params — simple call
return generateSimpleRunner(fn, registryRoot)
}
// Classify params
var factoryImports []string // import lines for factories
var factorySetup []string // code to create factory objects
var argLines []string // code to parse CLI args
var callArgs []string // arguments to pass to the function
cliArgIdx := 0
for _, p := range params {
if p.IsKwargs {
// Skip **kwargs for now — can't auto-resolve from CLI
continue
}
if p.Type != "" && !isPrimitiveType(p.Type) {
// Registry type — find factory
factory, err := findFactory(db, p.Type, fn.Lang)
if err != nil {
return "", fmt.Errorf("param %q of type %q: %w", p.Name, p.Type, err)
}
// Generate import for factory
factoryMod := filePathToModule(factory.FilePath)
factoryImports = append(factoryImports,
fmt.Sprintf("from %s import %s", factoryMod, factory.FuncName))
// Generate env var resolution for factory params
var factoryArgs []string
var envChecks []string
for _, fp := range factory.Params {
envName := paramToEnvVar(p.Type, fp.Name)
envChecks = append(envChecks, envName)
factoryArgs = append(factoryArgs,
fmt.Sprintf("os.environ[%q]", envName))
}
// Add check for missing env vars
factorySetup = append(factorySetup,
fmt.Sprintf("# Factory for %s: %s", p.Type, factory.FuncName))
factorySetup = append(factorySetup,
fmt.Sprintf("_missing = [k for k in %s if k not in os.environ]",
pythonList(envChecks)))
factorySetup = append(factorySetup,
fmt.Sprintf(`if _missing: sys.exit(f"error: missing env vars for %s: {', '.join(_missing)}")`,
p.Type))
factorySetup = append(factorySetup,
fmt.Sprintf("%s = %s(%s)", p.Name, factory.FuncName,
strings.Join(factoryArgs, ", ")))
callArgs = append(callArgs, p.Name)
} else {
// Primitive type — from CLI args
if p.Default != "" {
// Optional param with default
argLines = append(argLines,
fmt.Sprintf("%s = _args[%d] if len(_args) > %d else %s",
p.Name, cliArgIdx, cliArgIdx, convertDefault(p.Type, p.Default)))
argLines = append(argLines,
convertArg(p.Name, p.Type, true))
} else {
// Required param
argLines = append(argLines,
fmt.Sprintf("if len(_args) <= %d: sys.exit('error: missing required arg: %s (%s)')",
cliArgIdx, p.Name, p.Type))
argLines = append(argLines,
fmt.Sprintf("%s = _args[%d]", p.Name, cliArgIdx))
argLines = append(argLines,
convertArg(p.Name, p.Type, false))
}
callArgs = append(callArgs, p.Name)
cliArgIdx++
}
}
// Build the target function import
targetMod := filePathToModule(fn.FilePath)
targetImport := fmt.Sprintf("from %s import %s", targetMod, fn.Name)
// Assemble script
var sb strings.Builder
sb.WriteString("#!/usr/bin/env python3\n")
sb.WriteString("\"\"\"Auto-generated runner for fn run.\"\"\"\n")
sb.WriteString("import json, os, sys\n\n")
// Imports
sb.WriteString(targetImport + "\n")
for _, imp := range factoryImports {
sb.WriteString(imp + "\n")
}
sb.WriteString("\n")
// CLI args
sb.WriteString("_args = sys.argv[1:]\n\n")
// Factory setup (env vars → registry type instances)
if len(factorySetup) > 0 {
sb.WriteString("# --- resolve dependencies from env vars ---\n")
for _, line := range factorySetup {
sb.WriteString(line + "\n")
}
sb.WriteString("\n")
}
// Arg parsing
if len(argLines) > 0 {
sb.WriteString("# --- parse CLI args ---\n")
for _, line := range argLines {
sb.WriteString(line + "\n")
}
sb.WriteString("\n")
}
// Call
sb.WriteString("# --- execute ---\n")
sb.WriteString(fmt.Sprintf("_result = %s(%s)\n", fn.Name, strings.Join(callArgs, ", ")))
sb.WriteString("\n")
// Output
sb.WriteString("# --- output ---\n")
sb.WriteString("if _result is not None:\n")
sb.WriteString(" if isinstance(_result, (dict, list)):\n")
sb.WriteString(" print(json.dumps(_result, indent=2, default=str))\n")
sb.WriteString(" else:\n")
sb.WriteString(" print(_result)\n")
return sb.String(), nil
}
// generateSimpleRunner creates a runner for functions with no parameters.
func generateSimpleRunner(fn *registry.Function, _ string) (string, error) {
targetMod := filePathToModule(fn.FilePath)
var sb strings.Builder
sb.WriteString("#!/usr/bin/env python3\n")
sb.WriteString("import json\n\n")
sb.WriteString(fmt.Sprintf("from %s import %s\n\n", targetMod, fn.Name))
sb.WriteString(fmt.Sprintf("_result = %s()\n", fn.Name))
sb.WriteString("if _result is not None:\n")
sb.WriteString(" if isinstance(_result, (dict, list)):\n")
sb.WriteString(" print(json.dumps(_result, indent=2, default=str))\n")
sb.WriteString(" else:\n")
sb.WriteString(" print(_result)\n")
return sb.String(), nil
}
// filePathToModule converts "python/functions/metabase/databases.py" -> "metabase.databases".
func filePathToModule(filePath string) string {
// Strip "python/functions/" prefix
mod := filePath
mod = strings.TrimPrefix(mod, "python/functions/")
mod = strings.TrimSuffix(mod, ".py")
mod = strings.ReplaceAll(filepath.ToSlash(mod), "/", ".")
return mod
}
// convertArg generates Python code to convert a string arg to the right type.
func convertArg(name, typ string, _ bool) string {
switch strings.ToLower(typ) {
case "int":
return fmt.Sprintf("%s = int(%s)", name, name)
case "float":
return fmt.Sprintf("%s = float(%s)", name, name)
case "bool":
return fmt.Sprintf("%s = %s.lower() in ('true', '1', 'yes') if isinstance(%s, str) else %s",
name, name, name, name)
case "dict":
return fmt.Sprintf("%s = json.loads(%s) if isinstance(%s, str) else %s",
name, name, name, name)
case "list":
return fmt.Sprintf("%s = json.loads(%s) if isinstance(%s, str) else %s",
name, name, name, name)
case "bytes":
return fmt.Sprintf("%s = %s.encode('utf-8') if isinstance(%s, str) else %s",
name, name, name, name)
default:
// str or unknown — no conversion
return ""
}
}
// convertDefault ensures the default value is valid Python for the given type.
func convertDefault(_, def string) string {
// Most defaults from the signature are already valid Python
// Just handle the None case for Optional types
if def == "None" || def == "" {
return "None"
}
return def
}
// pythonList creates a Python list literal from strings: ["a", "b", "c"]
func pythonList(items []string) string {
quoted := make([]string, len(items))
for i, item := range items {
quoted[i] = fmt.Sprintf("%q", item)
}
return "[" + strings.Join(quoted, ", ") + "]"
}
// buildPyRunnerCommand creates the exec.Cmd that runs a generated Python runner script.
func buildPyRunnerCommand(fn *registry.Function, db *registry.DB, registryRoot string, args []string) (*exec.Cmd, error) {
// Check for __main__.py first — explicit package runner takes priority
dir := filepath.Join(registryRoot, filepath.Dir(fn.FilePath))
mainPy := filepath.Join(dir, "__main__.py")
if _, err := os.Stat(mainPy); err == nil {
return buildPyModuleCommand(fn, registryRoot, args)
}
// Generate runner script
script, err := generatePyRunner(fn, db, registryRoot)
if err != nil {
return nil, fmt.Errorf("generating runner: %w", err)
}
// Write to temp file
tmpFile, err := os.CreateTemp("", "fn_run_*.py")
if err != nil {
return nil, fmt.Errorf("creating temp file: %w", err)
}
if _, err := tmpFile.WriteString(script); err != nil {
tmpFile.Close()
os.Remove(tmpFile.Name())
return nil, err
}
tmpFile.Close()
// Log the generated script for debugging (only the runner path)
fmt.Fprintf(os.Stderr, "[fn run] generated runner: %s\n", tmpFile.Name())
// Build command
venvPython := filepath.Join(registryRoot, "python", ".venv", "bin", "python3")
pythonBin := "python3"
if _, err := os.Stat(venvPython); err == nil {
pythonBin = venvPython
}
pythonPath := filepath.Join(registryRoot, "python", "functions")
cmdArgs := append([]string{tmpFile.Name()}, args...)
cmd := exec.Command(pythonBin, cmdArgs...)
cmd.Dir = registryRoot
cmd.Env = append(os.Environ(), "PYTHONPATH="+pythonPath)
return cmd, nil
}
// buildPyModuleCommand runs a Python package with __main__.py (existing behavior).
func buildPyModuleCommand(fn *registry.Function, registryRoot string, args []string) (*exec.Cmd, error) {
venvPython := filepath.Join(registryRoot, "python", ".venv", "bin", "python3")
pythonBin := "python3"
if _, err := os.Stat(venvPython); err == nil {
pythonBin = venvPython
}
pythonPath := filepath.Join(registryRoot, "python", "functions")
absPath := filepath.Join(registryRoot, fn.FilePath)
dir := filepath.Dir(absPath)
relToRoot, _ := filepath.Rel(pythonPath, absPath)
modPath := strings.TrimSuffix(relToRoot, ".py")
modPath = strings.ReplaceAll(filepath.ToSlash(modPath), "/", ".")
// Check if it's a package directory with __main__.py — use package name
if _, err := os.Stat(filepath.Join(dir, "__main__.py")); err == nil {
modPath = strings.TrimSuffix(modPath, "."+filepath.Base(modPath))
}
cmdArgs := append([]string{"-m", modPath}, args...)
cmd := exec.Command(pythonBin, cmdArgs...)
cmd.Dir = pythonPath
cmd.Env = append(os.Environ(), "PYTHONPATH="+pythonPath)
return cmd, nil
}
+3 -43
View File
@@ -46,7 +46,7 @@ func cmdRun(args []string) {
os.Exit(1)
}
cmd, err := buildCommand(fn, registryRoot, absPath, passArgs)
cmd, err := buildCommand(fn, db, registryRoot, absPath, passArgs)
if err != nil {
fmt.Fprintf(os.Stderr, "error: %v\n", err)
os.Exit(1)
@@ -93,12 +93,12 @@ func resolveFunction(db *registry.DB, idOrName string) (*registry.Function, erro
return nil, fmt.Errorf("%s", b.String())
}
func buildCommand(fn *registry.Function, registryRoot, absPath string, args []string) (*exec.Cmd, error) {
func buildCommand(fn *registry.Function, db *registry.DB, registryRoot, absPath string, args []string) (*exec.Cmd, error) {
switch fn.Lang {
case "go":
return buildGoCommand(fn, registryRoot, absPath, args)
case "py":
return buildPyCommand(registryRoot, absPath, args)
return buildPyRunnerCommand(fn, db, registryRoot, args)
case "bash":
return buildBashCommand(absPath, args)
case "ts":
@@ -147,46 +147,6 @@ func buildGoCommand(fn *registry.Function, registryRoot, absPath string, args []
return cmd, nil
}
func buildPyCommand(registryRoot, absPath string, args []string) (*exec.Cmd, error) {
venvPython := filepath.Join(registryRoot, "python", ".venv", "bin", "python3")
pythonBin := "python3"
if _, err := os.Stat(venvPython); err == nil {
pythonBin = venvPython
}
dir := filepath.Dir(absPath)
// If the file is inside a package (has __init__.py), use python -m
// so relative imports work. PYTHONPATH points to python/functions/ or
// the equivalent parent that contains the domain packages.
initPy := filepath.Join(dir, "__init__.py")
if _, err := os.Stat(initPy); err == nil {
// The pythonPath is the well-known python/functions/ directory
// which contains domain packages (metabase/, etc.)
pythonPath := filepath.Join(registryRoot, "python", "functions")
if _, err := os.Stat(pythonPath); os.IsNotExist(err) {
// Fallback: walk up from dir to find the parent of the top package
pythonPath = filepath.Dir(dir)
}
// Build module path: metabase/databases.py → metabase.databases
relToRoot, _ := filepath.Rel(pythonPath, absPath)
modPath := strings.TrimSuffix(relToRoot, ".py")
modPath = strings.ReplaceAll(filepath.ToSlash(modPath), "/", ".")
cmdArgs := append([]string{"-m", modPath}, args...)
cmd := exec.Command(pythonBin, cmdArgs...)
cmd.Dir = pythonPath
cmd.Env = append(os.Environ(), "PYTHONPATH="+pythonPath)
return cmd, nil
}
// Standalone script (no __init__.py)
cmdArgs := append([]string{absPath}, args...)
cmd := exec.Command(pythonBin, cmdArgs...)
cmd.Dir = dir
return cmd, nil
}
func buildBashCommand(absPath string, args []string) (*exec.Cmd, error) {
cmdArgs := append([]string{absPath}, args...)
+16
View File
@@ -0,0 +1,16 @@
---
name: my_app
lang: go
domain: tools
description: "Descripcion breve de la aplicacion."
tags: []
uses_functions: []
uses_types: []
framework: ""
entry_point: "main.go"
dir_path: "apps/my_app"
---
## Notas
Notas adicionales sobre la aplicacion.
+43
View File
@@ -0,0 +1,43 @@
"""Core functional programming utilities."""
from .core import (
all_of,
any_of,
chunk,
compose,
drop,
filter_list,
find,
find_index,
flat_map,
flatten,
group_by,
map_list,
partition,
pipe,
reduce_list,
take,
unique,
zip_with,
)
__all__ = [
"all_of",
"any_of",
"chunk",
"compose",
"drop",
"filter_list",
"find",
"find_index",
"flat_map",
"flatten",
"group_by",
"map_list",
"partition",
"pipe",
"reduce_list",
"take",
"unique",
"zip_with",
]
+32
View File
@@ -0,0 +1,32 @@
---
name: all_of
kind: function
lang: py
domain: core
version: "1.0.0"
purity: pure
signature: "def all_of(xs: list, pred: callable) -> bool"
description: "Retorna True si todos los elementos de la lista cumplen el predicado."
tags: [list, functional, predicate, all, python]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: []
tested: false
tests: []
test_file_path: ""
file_path: "python/functions/core/core.py"
---
## Ejemplo
```python
result = all_of([2, 4, 6], lambda n: n % 2 == 0)
# True
```
## Notas
Funcion pura. Retorna True para lista vacia (vacuamente verdadero). Cortocircuita al primer False.
+32
View File
@@ -0,0 +1,32 @@
---
name: any_of
kind: function
lang: py
domain: core
version: "1.0.0"
purity: pure
signature: "def any_of(xs: list, pred: callable) -> bool"
description: "Retorna True si al menos un elemento de la lista cumple el predicado."
tags: [list, functional, predicate, any, python]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: []
tested: false
tests: []
test_file_path: ""
file_path: "python/functions/core/core.py"
---
## Ejemplo
```python
result = any_of([1, 3, 5, 4], lambda n: n % 2 == 0)
# True
```
## Notas
Funcion pura. Retorna False para lista vacia. Cortocircuita al primer True.
+32
View File
@@ -0,0 +1,32 @@
---
name: chunk
kind: function
lang: py
domain: core
version: "1.0.0"
purity: pure
signature: "def chunk(xs: list, size: int) -> list"
description: "Divide una lista en sublistas de tamanio fijo. El ultimo chunk puede ser menor."
tags: [list, functional, chunk, partition, python]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: []
tested: false
tests: []
test_file_path: ""
file_path: "python/functions/core/core.py"
---
## Ejemplo
```python
result = chunk([1, 2, 3, 4, 5], 2)
# [[1, 2], [3, 4], [5]]
```
## Notas
Funcion pura. Si size <= 0 retorna lista vacia.
+33
View File
@@ -0,0 +1,33 @@
---
name: compose
kind: function
lang: py
domain: core
version: "1.0.0"
purity: pure
signature: "def compose(*fns) -> callable"
description: "Compone funciones de derecha a izquierda. compose(f, g)(x) == f(g(x))."
tags: [functional, compose, composition, python]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: []
tested: false
tests: []
test_file_path: ""
file_path: "python/functions/core/core.py"
---
## Ejemplo
```python
double_then_str = compose(str, lambda n: n * 2)
result = double_then_str(5)
# "10"
```
## Notas
Funcion pura. Composicion matematica: la ultima funcion se aplica primero. Inverso de pipe.
+135
View File
@@ -0,0 +1,135 @@
"""Core functional programming utilities — pure functions for list/collection operations."""
from functools import reduce as _reduce
from typing import Any, Callable, Dict, List, Tuple
def filter_list(xs: list, pred: Callable) -> list:
"""Filter list by predicate. Does not mutate the original."""
return [x for x in xs if pred(x)]
def map_list(xs: list, fn: Callable) -> list:
"""Map function over list. Does not mutate the original."""
return [fn(x) for x in xs]
def reduce_list(xs: list, initial: Any, fn: Callable) -> Any:
"""Reduce list with accumulator. fn(acc, x) -> acc."""
return _reduce(fn, xs, initial)
def flat_map(xs: list, fn: Callable) -> list:
"""Map function over list then flatten one level."""
result = []
for x in xs:
result.extend(fn(x))
return result
def flatten(xss: list) -> list:
"""Flatten a list of lists one level."""
result = []
for xs in xss:
result.extend(xs)
return result
def chunk(xs: list, size: int) -> list:
"""Split list into chunks of given size. Last chunk may be smaller."""
if size <= 0:
return []
return [xs[i : i + size] for i in range(0, len(xs), size)]
def take(xs: list, n: int) -> list:
"""Take first n elements from list."""
return xs[:n]
def drop(xs: list, n: int) -> list:
"""Drop first n elements from list."""
return xs[n:]
def unique(xs: list) -> list:
"""Remove duplicates preserving order. Uses identity for hashable elements."""
seen = set()
result = []
for x in xs:
if x not in seen:
seen.add(x)
result.append(x)
return result
def group_by(xs: list, key_fn: Callable) -> Dict:
"""Group elements by key function. Returns dict of key -> list."""
groups: Dict = {}
for x in xs:
k = key_fn(x)
if k not in groups:
groups[k] = []
groups[k].append(x)
return groups
def partition(xs: list, pred: Callable) -> Tuple[list, list]:
"""Split list into (matches, non_matches) based on predicate."""
matches = []
non_matches = []
for x in xs:
if pred(x):
matches.append(x)
else:
non_matches.append(x)
return (matches, non_matches)
def find(xs: list, pred: Callable) -> Any:
"""Find first element matching predicate. Returns None if not found."""
for x in xs:
if pred(x):
return x
return None
def find_index(xs: list, pred: Callable) -> int:
"""Find index of first element matching predicate. Returns -1 if not found."""
for i, x in enumerate(xs):
if pred(x):
return i
return -1
def zip_with(xs: list, ys: list, fn: Callable) -> list:
"""Zip two lists with a combining function. Stops at shorter list."""
return [fn(x, y) for x, y in zip(xs, ys)]
def all_of(xs: list, pred: Callable) -> bool:
"""Return True if all elements match predicate."""
return all(pred(x) for x in xs)
def any_of(xs: list, pred: Callable) -> bool:
"""Return True if any element matches predicate."""
return any(pred(x) for x in xs)
def pipe(value: Any, *fns: Callable) -> Any:
"""Pipe a value through a sequence of functions left-to-right."""
result = value
for fn in fns:
result = fn(result)
return result
def compose(*fns: Callable) -> Callable:
"""Compose functions right-to-left. compose(f, g)(x) == f(g(x))."""
def composed(x: Any) -> Any:
result = x
for fn in reversed(fns):
result = fn(result)
return result
return composed
+32
View File
@@ -0,0 +1,32 @@
---
name: drop
kind: function
lang: py
domain: core
version: "1.0.0"
purity: pure
signature: "def drop(xs: list, n: int) -> list"
description: "Descarta los primeros n elementos de una lista."
tags: [list, functional, drop, slice, python]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: []
tested: false
tests: []
test_file_path: ""
file_path: "python/functions/core/core.py"
---
## Ejemplo
```python
result = drop([1, 2, 3, 4, 5], 2)
# [3, 4, 5]
```
## Notas
Funcion pura. Si n > len(xs), retorna lista vacia. No muta la original.
+32
View File
@@ -0,0 +1,32 @@
---
name: filter_list
kind: function
lang: py
domain: core
version: "1.0.0"
purity: pure
signature: "def filter_list(xs: list, pred: callable) -> list"
description: "Filtra una lista aplicando un predicado sin mutar la original."
tags: [list, functional, filter, python]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: []
tested: false
tests: []
test_file_path: ""
file_path: "python/functions/core/core.py"
---
## Ejemplo
```python
evens = filter_list([1, 2, 3, 4], lambda n: n % 2 == 0)
# [2, 4]
```
## Notas
Funcion pura. No muta la lista original. Equivalente a `[x for x in xs if pred(x)]`.
+32
View File
@@ -0,0 +1,32 @@
---
name: find
kind: function
lang: py
domain: core
version: "1.0.0"
purity: pure
signature: "def find(xs: list, pred: callable)"
description: "Encuentra el primer elemento que cumple el predicado. Retorna None si no hay coincidencia."
tags: [list, functional, find, search, python]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: []
tested: false
tests: []
test_file_path: ""
file_path: "python/functions/core/core.py"
---
## Ejemplo
```python
result = find([1, 2, 3, 4], lambda n: n > 2)
# 3
```
## Notas
Funcion pura. Retorna None si ningun elemento cumple el predicado. Cortocircuita al primer match.
+32
View File
@@ -0,0 +1,32 @@
---
name: find_index
kind: function
lang: py
domain: core
version: "1.0.0"
purity: pure
signature: "def find_index(xs: list, pred: callable) -> int"
description: "Encuentra el indice del primer elemento que cumple el predicado. Retorna -1 si no hay coincidencia."
tags: [list, functional, find, index, search, python]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: []
tested: false
tests: []
test_file_path: ""
file_path: "python/functions/core/core.py"
---
## Ejemplo
```python
idx = find_index([10, 20, 30, 40], lambda n: n > 25)
# 2
```
## Notas
Funcion pura. Retorna -1 si ningun elemento cumple el predicado.
+32
View File
@@ -0,0 +1,32 @@
---
name: flat_map
kind: function
lang: py
domain: core
version: "1.0.0"
purity: pure
signature: "def flat_map(xs: list, fn: callable) -> list"
description: "Aplica una funcion que retorna listas a cada elemento y aplana el resultado un nivel."
tags: [list, functional, flatmap, python]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: []
tested: false
tests: []
test_file_path: ""
file_path: "python/functions/core/core.py"
---
## Ejemplo
```python
result = flat_map([1, 2, 3], lambda n: [n, n * 10])
# [1, 10, 2, 20, 3, 30]
```
## Notas
Funcion pura. Equivalente a flatten(map_list(xs, fn)). Solo aplana un nivel.
+32
View File
@@ -0,0 +1,32 @@
---
name: flatten
kind: function
lang: py
domain: core
version: "1.0.0"
purity: pure
signature: "def flatten(xss: list) -> list"
description: "Aplana una lista de listas un nivel, concatenando las sublistas."
tags: [list, functional, flatten, python]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: []
tested: false
tests: []
test_file_path: ""
file_path: "python/functions/core/core.py"
---
## Ejemplo
```python
result = flatten([[1, 2], [3], [4, 5]])
# [1, 2, 3, 4, 5]
```
## Notas
Funcion pura. Solo aplana un nivel de anidamiento.
+32
View File
@@ -0,0 +1,32 @@
---
name: group_by
kind: function
lang: py
domain: core
version: "1.0.0"
purity: pure
signature: "def group_by(xs: list, key_fn: callable) -> dict"
description: "Agrupa elementos de una lista por una funcion clave. Retorna dict de clave a lista."
tags: [list, functional, group, classify, python]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: []
tested: false
tests: []
test_file_path: ""
file_path: "python/functions/core/core.py"
---
## Ejemplo
```python
result = group_by(["hi", "hey", "bye"], lambda s: s[0])
# {"h": ["hi", "hey"], "b": ["bye"]}
```
## Notas
Funcion pura. El orden de los elementos dentro de cada grupo se preserva.
+32
View File
@@ -0,0 +1,32 @@
---
name: map_list
kind: function
lang: py
domain: core
version: "1.0.0"
purity: pure
signature: "def map_list(xs: list, fn: callable) -> list"
description: "Aplica una funcion a cada elemento de una lista, retornando una nueva lista."
tags: [list, functional, map, python]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: []
tested: false
tests: []
test_file_path: ""
file_path: "python/functions/core/core.py"
---
## Ejemplo
```python
doubled = map_list([1, 2, 3], lambda n: n * 2)
# [2, 4, 6]
```
## Notas
Funcion pura. No muta la lista original. Equivalente a `[fn(x) for x in xs]`.
+32
View File
@@ -0,0 +1,32 @@
---
name: partition
kind: function
lang: py
domain: core
version: "1.0.0"
purity: pure
signature: "def partition(xs: list, pred: callable) -> tuple"
description: "Divide una lista en dos: (elementos que cumplen el predicado, elementos que no)."
tags: [list, functional, partition, split, python]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: []
tested: false
tests: []
test_file_path: ""
file_path: "python/functions/core/core.py"
---
## Ejemplo
```python
evens, odds = partition([1, 2, 3, 4, 5], lambda n: n % 2 == 0)
# evens = [2, 4], odds = [1, 3, 5]
```
## Notas
Funcion pura. Retorna tupla de dos listas: (matches, non_matches).
+36
View File
@@ -0,0 +1,36 @@
---
name: pipe
kind: function
lang: py
domain: core
version: "1.0.0"
purity: pure
signature: "def pipe(value, *fns)"
description: "Pasa un valor a traves de una secuencia de funciones de izquierda a derecha."
tags: [functional, pipe, composition, python]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: []
tested: false
tests: []
test_file_path: ""
file_path: "python/functions/core/core.py"
---
## Ejemplo
```python
result = pipe(
[1, 2, 3, 4, 5],
lambda xs: filter_list(xs, lambda n: n > 2),
lambda xs: map_list(xs, lambda n: n * 10),
)
# [30, 40, 50]
```
## Notas
Funcion pura. Ejecuta las funciones en orden de izquierda a derecha: f1(value), luego f2(result), etc.
+32
View File
@@ -0,0 +1,32 @@
---
name: reduce_list
kind: function
lang: py
domain: core
version: "1.0.0"
purity: pure
signature: "def reduce_list(xs: list, initial, fn: callable)"
description: "Reduce una lista con un acumulador y una funcion binaria fn(acc, x)."
tags: [list, functional, reduce, fold, python]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: []
tested: false
tests: []
test_file_path: ""
file_path: "python/functions/core/core.py"
---
## Ejemplo
```python
total = reduce_list([1, 2, 3, 4], 0, lambda acc, x: acc + x)
# 10
```
## Notas
Funcion pura. Usa functools.reduce internamente. El valor inicial es obligatorio para evitar errores con listas vacias.
+32
View File
@@ -0,0 +1,32 @@
---
name: take
kind: function
lang: py
domain: core
version: "1.0.0"
purity: pure
signature: "def take(xs: list, n: int) -> list"
description: "Toma los primeros n elementos de una lista."
tags: [list, functional, take, slice, python]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: []
tested: false
tests: []
test_file_path: ""
file_path: "python/functions/core/core.py"
---
## Ejemplo
```python
result = take([1, 2, 3, 4, 5], 3)
# [1, 2, 3]
```
## Notas
Funcion pura. Si n > len(xs), retorna toda la lista. No muta la original.
+32
View File
@@ -0,0 +1,32 @@
---
name: unique
kind: function
lang: py
domain: core
version: "1.0.0"
purity: pure
signature: "def unique(xs: list) -> list"
description: "Elimina duplicados de una lista preservando el orden de aparicion."
tags: [list, functional, unique, deduplicate, python]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: []
tested: false
tests: []
test_file_path: ""
file_path: "python/functions/core/core.py"
---
## Ejemplo
```python
result = unique([1, 2, 2, 3, 1, 4])
# [1, 2, 3, 4]
```
## Notas
Funcion pura. Preserva el orden de la primera aparicion. Requiere elementos hashables.
+32
View File
@@ -0,0 +1,32 @@
---
name: zip_with
kind: function
lang: py
domain: core
version: "1.0.0"
purity: pure
signature: "def zip_with(xs: list, ys: list, fn: callable) -> list"
description: "Combina dos listas elemento a elemento con una funcion. Se detiene en la mas corta."
tags: [list, functional, zip, combine, python]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: []
tested: false
tests: []
test_file_path: ""
file_path: "python/functions/core/core.py"
---
## Ejemplo
```python
result = zip_with([1, 2, 3], [10, 20, 30], lambda a, b: a + b)
# [11, 22, 33]
```
## Notas
Funcion pura. Se detiene en la lista mas corta, como zip() de Python.
@@ -0,0 +1,25 @@
from .cybersecurity import (
hash_sha256,
hash_md5,
entropy_shannon,
detect_sql_injection,
extract_urls,
is_base64,
is_hex,
levenshtein_distance,
jaccard_similarity,
normalize_url,
)
__all__ = [
"hash_sha256",
"hash_md5",
"entropy_shannon",
"detect_sql_injection",
"extract_urls",
"is_base64",
"is_hex",
"levenshtein_distance",
"jaccard_similarity",
"normalize_url",
]
@@ -0,0 +1,167 @@
"""Cybersecurity pure functions: hashing, parsing, and security utilities."""
import hashlib
import math
import re
import base64
from collections import Counter
from urllib.parse import urlparse, urlunparse, parse_qs, urlencode
def hash_sha256(data: bytes) -> str:
"""Calcula el hash SHA-256 de datos binarios. Retorna hex digest."""
return hashlib.sha256(data).hexdigest()
def hash_md5(data: bytes) -> str:
"""Calcula el hash MD5 de datos binarios. Retorna hex digest."""
return hashlib.md5(data).hexdigest()
def entropy_shannon(data: bytes) -> float:
"""Calcula la entropia de Shannon de datos binarios (0-8 bits por byte).
Entropia alta (>7.5) sugiere datos cifrados o comprimidos.
Entropia baja (<3) sugiere datos estructurados o repetitivos.
"""
if not data:
return 0.0
length = len(data)
counts = Counter(data)
entropy = 0.0
for count in counts.values():
p = count / length
if p > 0:
entropy -= p * math.log2(p)
return entropy
_SQL_INJECTION_PATTERNS = [
(r"('\s*OR\s+'[^']*'\s*=\s*'[^']*'?)", "string_tautology"),
(r"('\s*(OR|AND)\s+'?\d+\s*=\s*\d+)", "tautology"),
(r"(;\s*(DROP|DELETE|UPDATE|INSERT)\b)", "stacked_query"),
(r"(UNION\s+(ALL\s+)?SELECT)", "union_select"),
(r"(\b(SELECT|INSERT|UPDATE|DELETE|DROP|ALTER|CREATE|EXEC)\b\s)", "sql_keyword"),
(r"(--\s*$|/\*|\*/)", "comment_injection"),
(r"(BENCHMARK\s*\(|SLEEP\s*\(|WAITFOR\s+DELAY)", "time_based"),
(r"(CHAR\s*\(\s*\d+)", "char_function"),
(r"(CONCAT\s*\()", "concat_function"),
(r"(0x[0-9a-fA-F]{4,})", "hex_literal"),
]
def detect_sql_injection(input_str: str) -> tuple:
"""Detecta patrones de SQL injection en un string.
Retorna (is_threat, pattern) donde pattern es el nombre del patron
detectado o cadena vacia si no hay amenaza.
"""
for pattern, name in _SQL_INJECTION_PATTERNS:
if re.search(pattern, input_str, re.IGNORECASE):
return (True, name)
return (False, "")
_URL_REGEX = re.compile(
r"https?://[^\s<>\"'\)\]]+",
re.IGNORECASE,
)
def extract_urls(text: str) -> list:
"""Extrae todas las URLs (http/https) de un texto."""
return _URL_REGEX.findall(text)
def is_base64(s: str) -> bool:
"""Verifica si un string es base64 valido.
Acepta base64 estandar y URL-safe. Requiere al menos 4 caracteres.
"""
if len(s) < 4:
return False
b64_pattern = re.compile(r"^[A-Za-z0-9+/\-_]*={0,2}$")
if not b64_pattern.match(s):
return False
try:
decoded = base64.b64decode(s, validate=True)
return len(decoded) > 0
except Exception:
try:
decoded = base64.urlsafe_b64decode(s)
return len(decoded) > 0
except Exception:
return False
def is_hex(s: str) -> bool:
"""Verifica si un string es hexadecimal valido.
Acepta con o sin prefijo 0x. Requiere al menos 2 caracteres (sin prefijo).
"""
clean = s.strip()
if clean.startswith(("0x", "0X")):
clean = clean[2:]
if len(clean) < 2:
return False
return bool(re.fullmatch(r"[0-9a-fA-F]+", clean))
def levenshtein_distance(a: str, b: str) -> int:
"""Calcula la distancia de Levenshtein (edit distance) entre dos strings.
Util para deteccion de typosquatting en dominios y fuzzy matching.
"""
if len(a) < len(b):
return levenshtein_distance(b, a)
if len(b) == 0:
return len(a)
prev_row = list(range(len(b) + 1))
for i, ca in enumerate(a):
curr_row = [i + 1]
for j, cb in enumerate(b):
cost = 0 if ca == cb else 1
curr_row.append(
min(
curr_row[j] + 1, # insert
prev_row[j + 1] + 1, # delete
prev_row[j] + cost, # substitute
)
)
prev_row = curr_row
return prev_row[-1]
def jaccard_similarity(a: list, b: list) -> float:
"""Calcula el coeficiente de similitud de Jaccard entre dos listas.
J(A,B) = |A interseccion B| / |A union B|. Retorna 0.0 si ambas vacias.
Util para comparar conjuntos de tokens, features, o IoCs.
"""
set_a = set(a)
set_b = set(b)
if not set_a and not set_b:
return 0.0
intersection = set_a & set_b
union = set_a | set_b
return len(intersection) / len(union)
def normalize_url(raw_url: str) -> str:
"""Normaliza una URL: lowercase del host, elimina fragmentos, ordena parametros.
Util para deduplicacion de URLs y comparacion de IoCs.
"""
parsed = urlparse(raw_url)
scheme = parsed.scheme.lower() or "http"
netloc = parsed.netloc.lower()
path = parsed.path or "/"
# Remove trailing slash except for root
if path != "/" and path.endswith("/"):
path = path.rstrip("/")
# Sort query parameters
params = parse_qs(parsed.query, keep_blank_values=True)
sorted_query = urlencode(sorted(params.items()), doseq=True)
# Drop fragment
return urlunparse((scheme, netloc, path, parsed.params, sorted_query, ""))
@@ -0,0 +1,38 @@
---
name: detect_sql_injection
kind: function
lang: py
domain: cybersecurity
version: "1.0.0"
purity: pure
signature: "def detect_sql_injection(input_str: str) -> tuple"
description: "Detecta patrones de SQL injection en un string. Retorna (is_threat, pattern) con el nombre del patron detectado."
tags: [sqli, injection, detection, security, python]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: [re]
tested: false
tests: []
test_file_path: ""
file_path: "python/functions/cybersecurity/cybersecurity.py"
---
## Ejemplo
```python
detect_sql_injection("' OR '1'='1")
# (True, "string_tautology")
detect_sql_injection("; DROP TABLE users")
# (True, "stacked_query")
detect_sql_injection("hello world")
# (False, "")
```
## Notas
Detecta 10 patrones: sql_keyword, tautology, stacked_query, comment_injection, string_tautology, union_select, hex_literal, char_function, concat_function, time_based. No reemplaza un WAF pero es util para logging y alertas tempranas.
@@ -0,0 +1,41 @@
---
name: entropy_shannon
kind: function
lang: py
domain: cybersecurity
version: "1.0.0"
purity: pure
signature: "def entropy_shannon(data: bytes) -> float"
description: "Calcula la entropia de Shannon de datos binarios (0-8 bits por byte). Util para detectar datos cifrados o comprimidos."
tags: [entropy, shannon, analysis, crypto, python]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: [math, collections]
tested: false
tests: []
test_file_path: ""
file_path: "python/functions/cybersecurity/cybersecurity.py"
---
## Ejemplo
```python
# Datos aleatorios (alta entropia)
entropy_shannon(bytes(range(256)))
# ~8.0
# Datos repetitivos (baja entropia)
entropy_shannon(b"aaaaaaaaaa")
# 0.0
# Texto normal
entropy_shannon(b"hello world")
# ~2.84
```
## Notas
Entropia alta (>7.5) sugiere datos cifrados o comprimidos. Entropia baja (<3) sugiere datos estructurados o repetitivos. Retorna 0.0 para datos vacios.
@@ -0,0 +1,35 @@
---
name: extract_urls
kind: function
lang: py
domain: cybersecurity
version: "1.0.0"
purity: pure
signature: "def extract_urls(text: str) -> list"
description: "Extrae todas las URLs (http/https) de un texto. Util para analisis de IoCs y threat intelligence."
tags: [url, extract, parsing, ioc, python]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: [re]
tested: false
tests: []
test_file_path: ""
file_path: "python/functions/cybersecurity/cybersecurity.py"
---
## Ejemplo
```python
extract_urls("Visit https://example.com and http://test.org/path?q=1")
# ["https://example.com", "http://test.org/path?q=1"]
extract_urls("no urls here")
# []
```
## Notas
Usa regex para extraer URLs con esquema http/https. No valida que las URLs sean alcanzables. Util para extraer indicadores de compromiso (IoCs) de logs, emails o reportes de threat intelligence.
@@ -0,0 +1,32 @@
---
name: hash_md5
kind: function
lang: py
domain: cybersecurity
version: "1.0.0"
purity: pure
signature: "def hash_md5(data: bytes) -> str"
description: "Calcula el hash MD5 de datos binarios. Retorna hex digest."
tags: [hash, md5, crypto, python]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: [hashlib]
tested: false
tests: []
test_file_path: ""
file_path: "python/functions/cybersecurity/cybersecurity.py"
---
## Ejemplo
```python
h = hash_md5(b"hello")
# "5d41402abc4b2a76b9719d911017c592"
```
## Notas
Usa hashlib de stdlib. MD5 no es seguro para propositos criptograficos pero es util para checksums, fingerprinting de archivos e identificacion rapida de IoCs.
@@ -0,0 +1,32 @@
---
name: hash_sha256
kind: function
lang: py
domain: cybersecurity
version: "1.0.0"
purity: pure
signature: "def hash_sha256(data: bytes) -> str"
description: "Calcula el hash SHA-256 de datos binarios. Retorna hex digest."
tags: [hash, sha256, crypto, python]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: [hashlib]
tested: false
tests: []
test_file_path: ""
file_path: "python/functions/cybersecurity/cybersecurity.py"
---
## Ejemplo
```python
h = hash_sha256(b"hello")
# "2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824"
```
## Notas
Usa hashlib de stdlib. Funcion pura sin side effects. SHA-256 produce un digest de 64 caracteres hexadecimales (256 bits).
@@ -0,0 +1,38 @@
---
name: is_base64
kind: function
lang: py
domain: cybersecurity
version: "1.0.0"
purity: pure
signature: "def is_base64(s: str) -> bool"
description: "Verifica si un string es base64 valido. Acepta base64 estandar y URL-safe. Requiere minimo 4 caracteres."
tags: [base64, validation, encoding, python]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: [re, base64]
tested: false
tests: []
test_file_path: ""
file_path: "python/functions/cybersecurity/cybersecurity.py"
---
## Ejemplo
```python
is_base64("aGVsbG8=")
# True
is_base64("not!valid")
# False
is_base64("ab")
# False (menos de 4 caracteres)
```
## Notas
Verifica tanto el formato (regex) como que el decode sea exitoso. Util para detectar datos codificados en payloads sospechosos, headers HTTP o parametros de URL.
+41
View File
@@ -0,0 +1,41 @@
---
name: is_hex
kind: function
lang: py
domain: cybersecurity
version: "1.0.0"
purity: pure
signature: "def is_hex(s: str) -> bool"
description: "Verifica si un string es hexadecimal valido. Acepta con o sin prefijo 0x. Requiere minimo 2 caracteres."
tags: [hex, validation, encoding, python]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: [re]
tested: false
tests: []
test_file_path: ""
file_path: "python/functions/cybersecurity/cybersecurity.py"
---
## Ejemplo
```python
is_hex("4a6f686e")
# True
is_hex("0x4a6f686e")
# True
is_hex("xyz")
# False
is_hex("a")
# False (menos de 2 caracteres)
```
## Notas
Util para validar hashes, direcciones de memoria, shellcode y otros datos hexadecimales en contexto de seguridad.
@@ -0,0 +1,38 @@
---
name: jaccard_similarity
kind: function
lang: py
domain: cybersecurity
version: "1.0.0"
purity: pure
signature: "def jaccard_similarity(a: list, b: list) -> float"
description: "Calcula el coeficiente de similitud de Jaccard entre dos listas. J(A,B) = |A interseccion B| / |A union B|."
tags: [jaccard, similarity, comparison, sets, python]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: []
tested: false
tests: []
test_file_path: ""
file_path: "python/functions/cybersecurity/cybersecurity.py"
---
## Ejemplo
```python
jaccard_similarity(["a", "b", "c"], ["b", "c", "d"])
# 0.5
jaccard_similarity(["a", "b"], ["a", "b"])
# 1.0
jaccard_similarity([], [])
# 0.0
```
## Notas
Convierte las listas a sets internamente. Retorna 0.0 si ambas listas son vacias. Util para comparar conjuntos de tokens, features de malware, IoCs compartidos entre muestras, o tags de vulnerabilidades.
@@ -0,0 +1,38 @@
---
name: levenshtein_distance
kind: function
lang: py
domain: cybersecurity
version: "1.0.0"
purity: pure
signature: "def levenshtein_distance(a: str, b: str) -> int"
description: "Calcula la distancia de Levenshtein (edit distance) entre dos strings. Util para deteccion de typosquatting en dominios."
tags: [levenshtein, distance, fuzzy, typosquatting, python]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: []
tested: false
tests: []
test_file_path: ""
file_path: "python/functions/cybersecurity/cybersecurity.py"
---
## Ejemplo
```python
levenshtein_distance("google.com", "gooogle.com")
# 1
levenshtein_distance("paypal.com", "paypa1.com")
# 1
levenshtein_distance("abc", "abc")
# 0
```
## Notas
Implementacion O(n*m) con optimizacion de espacio (dos filas). Sin dependencias externas. Util para detectar dominios de typosquatting comparando contra dominios legitimos conocidos.
@@ -0,0 +1,35 @@
---
name: normalize_url
kind: function
lang: py
domain: cybersecurity
version: "1.0.0"
purity: pure
signature: "def normalize_url(raw_url: str) -> str"
description: "Normaliza una URL: lowercase del host, elimina fragmentos, ordena parametros. Util para deduplicacion de IoCs."
tags: [url, normalize, ioc, dedup, python]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: [urllib.parse]
tested: false
tests: []
test_file_path: ""
file_path: "python/functions/cybersecurity/cybersecurity.py"
---
## Ejemplo
```python
normalize_url("HTTPS://Example.COM/path?b=2&a=1#frag")
# "https://example.com/path?a=1&b=2"
normalize_url("http://test.org/path/")
# "http://test.org/path"
```
## Notas
Operaciones de normalizacion: lowercase de scheme y host, eliminacion de trailing slash (excepto root), ordenamiento alfabetico de query parameters, eliminacion de fragmentos. Usa urllib.parse de stdlib.
+25
View File
@@ -0,0 +1,25 @@
from .datascience import (
pearson,
standardize,
min_max_scale,
clip,
detect_outliers,
impute,
histogram,
rolling_window,
autocorrelation,
linspace,
)
__all__ = [
"pearson",
"standardize",
"min_max_scale",
"clip",
"detect_outliers",
"impute",
"histogram",
"rolling_window",
"autocorrelation",
"linspace",
]
@@ -0,0 +1,32 @@
---
name: autocorrelation
kind: function
lang: py
domain: datascience
version: "1.0.0"
purity: pure
signature: "def autocorrelation(data: list, lag: int) -> float"
description: "Calcula la autocorrelacion de una serie temporal para un lag dado."
tags: [statistics, timeseries, correlation, python]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: []
tested: false
tests: []
test_file_path: ""
file_path: "python/functions/datascience/datascience.py"
---
## Ejemplo
```python
autocorrelation([1, 2, 3, 4, 5, 4, 3, 2, 1], 1)
# ~0.489
```
## Notas
Autocorrelacion normalizada por la varianza. Retorna 0.0 si lag es invalido o la varianza es cero.
+32
View File
@@ -0,0 +1,32 @@
---
name: clip
kind: function
lang: py
domain: datascience
version: "1.0.0"
purity: pure
signature: "def clip(data: list, lo: float, hi: float) -> list"
description: "Recorta los valores de la lista al rango [lo, hi]."
tags: [clipping, bounds, python]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: []
tested: false
tests: []
test_file_path: ""
file_path: "python/functions/datascience/datascience.py"
---
## Ejemplo
```python
clip([1, 5, 10, -3], 0, 7)
# [1, 5, 7, 0]
```
## Notas
Funcion pura sin dependencias. Equivalente a numpy.clip pero sin numpy.
+123
View File
@@ -0,0 +1,123 @@
"""Pure datascience utilities — statistics and numerical functions.
Uses only math stdlib. No external dependencies.
"""
import math
def pearson(xs: list, ys: list) -> float:
"""Pearson correlation coefficient between two lists of floats."""
n = len(xs)
if n != len(ys) or n == 0:
return 0.0
mean_x = sum(xs) / n
mean_y = sum(ys) / n
num = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
den_x = math.sqrt(sum((x - mean_x) ** 2 for x in xs))
den_y = math.sqrt(sum((y - mean_y) ** 2 for y in ys))
if den_x == 0.0 or den_y == 0.0:
return 0.0
return num / (den_x * den_y)
def standardize(data: list) -> list:
"""Z-score standardization (mean=0, std=1)."""
n = len(data)
if n == 0:
return []
mean = sum(data) / n
std = math.sqrt(sum((x - mean) ** 2 for x in data) / n)
if std == 0.0:
return [0.0] * n
return [(x - mean) / std for x in data]
def min_max_scale(data: list) -> list:
"""Scale values to [0, 1] range."""
if not data:
return []
lo = min(data)
hi = max(data)
if hi == lo:
return [0.0] * len(data)
return [(x - lo) / (hi - lo) for x in data]
def clip(data: list, lo: float, hi: float) -> list:
"""Clip values to [lo, hi]."""
return [max(lo, min(hi, x)) for x in data]
def detect_outliers(data: list, threshold: float) -> list:
"""Returns list of bools, True where |z-score| > threshold."""
n = len(data)
if n == 0:
return []
mean = sum(data) / n
std = math.sqrt(sum((x - mean) ** 2 for x in data) / n)
if std == 0.0:
return [False] * n
return [abs((x - mean) / std) > threshold for x in data]
def impute(data: list) -> list:
"""Replace None/NaN with mean of non-null values."""
valid = [x for x in data if x is not None and not (isinstance(x, float) and math.isnan(x))]
if not valid:
return [0.0] * len(data)
mean = sum(valid) / len(valid)
return [
mean if (x is None or (isinstance(x, float) and math.isnan(x))) else x
for x in data
]
def histogram(data: list, buckets: int) -> list:
"""Returns list of counts per bucket."""
if not data or buckets <= 0:
return []
lo = min(data)
hi = max(data)
if hi == lo:
counts = [0] * buckets
counts[0] = len(data)
return counts
width = (hi - lo) / buckets
counts = [0] * buckets
for x in data:
idx = int((x - lo) / width)
if idx >= buckets:
idx = buckets - 1
counts[idx] += 1
return counts
def rolling_window(xs: list, size: int) -> list:
"""Returns list of sublists (sliding windows of given size)."""
if size <= 0 or size > len(xs):
return []
return [xs[i : i + size] for i in range(len(xs) - size + 1)]
def autocorrelation(data: list, lag: int) -> float:
"""Autocorrelation at given lag."""
n = len(data)
if lag < 0 or lag >= n or n == 0:
return 0.0
mean = sum(data) / n
var = sum((x - mean) ** 2 for x in data) / n
if var == 0.0:
return 0.0
cov = sum((data[i] - mean) * (data[i + lag] - mean) for i in range(n - lag)) / n
return cov / var
def linspace(start: float, stop: float, num: int) -> list:
"""Generate evenly spaced values from start to stop (inclusive)."""
if num <= 0:
return []
if num == 1:
return [start]
step = (stop - start) / (num - 1)
return [start + i * step for i in range(num)]
@@ -0,0 +1,32 @@
---
name: detect_outliers
kind: function
lang: py
domain: datascience
version: "1.0.0"
purity: pure
signature: "def detect_outliers(data: list, threshold: float) -> list"
description: "Detecta outliers por z-score. Retorna lista de bools, True donde |z-score| > threshold."
tags: [statistics, outliers, python]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: [math]
tested: false
tests: []
test_file_path: ""
file_path: "python/functions/datascience/datascience.py"
---
## Ejemplo
```python
detect_outliers([1, 2, 3, 100, 2, 3], 2.0)
# [False, False, False, True, False, False]
```
## Notas
Usa z-score poblacional. Threshold tipico: 2.0 o 3.0. Si la desviacion es cero, no hay outliers.
+32
View File
@@ -0,0 +1,32 @@
---
name: histogram
kind: function
lang: py
domain: datascience
version: "1.0.0"
purity: pure
signature: "def histogram(data: list, buckets: int) -> list"
description: "Calcula histograma con N buckets. Retorna lista de conteos por bucket."
tags: [statistics, histogram, python]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: []
tested: false
tests: []
test_file_path: ""
file_path: "python/functions/datascience/datascience.py"
---
## Ejemplo
```python
histogram([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 5)
# [2, 2, 2, 2, 2]
```
## Notas
Los buckets cubren el rango [min, max] uniformemente. El ultimo bucket incluye el valor maximo. Si todos los valores son iguales, todos caen en el primer bucket.
+32
View File
@@ -0,0 +1,32 @@
---
name: impute
kind: function
lang: py
domain: datascience
version: "1.0.0"
purity: pure
signature: "def impute(data: list) -> list"
description: "Reemplaza None y NaN con la media de los valores validos."
tags: [imputation, missing, python]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: [math]
tested: false
tests: []
test_file_path: ""
file_path: "python/functions/datascience/datascience.py"
---
## Ejemplo
```python
impute([1.0, None, 3.0, float('nan'), 5.0])
# [1.0, 3.0, 3.0, 3.0, 5.0]
```
## Notas
Detecta tanto None como float('nan'). Si no hay valores validos, rellena con 0.0.
+32
View File
@@ -0,0 +1,32 @@
---
name: linspace
kind: function
lang: py
domain: datascience
version: "1.0.0"
purity: pure
signature: "def linspace(start: float, stop: float, num: int) -> list"
description: "Genera una lista de valores equiespaciados entre start y stop (inclusivos)."
tags: [numeric, range, python]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: []
tested: false
tests: []
test_file_path: ""
file_path: "python/functions/datascience/datascience.py"
---
## Ejemplo
```python
linspace(0, 1, 5)
# [0.0, 0.25, 0.5, 0.75, 1.0]
```
## Notas
Equivalente a numpy.linspace pero sin numpy. Si num=1, retorna [start]. Si num<=0, retorna lista vacia.
@@ -0,0 +1,32 @@
---
name: min_max_scale
kind: function
lang: py
domain: datascience
version: "1.0.0"
purity: pure
signature: "def min_max_scale(data: list) -> list"
description: "Escala los valores al rango [0, 1] usando min-max normalization."
tags: [normalization, scaling, python]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: []
tested: false
tests: []
test_file_path: ""
file_path: "python/functions/datascience/datascience.py"
---
## Ejemplo
```python
min_max_scale([2, 4, 6, 8, 10])
# [0.0, 0.25, 0.5, 0.75, 1.0]
```
## Notas
Si todos los valores son iguales, retorna lista de ceros. No requiere imports externos.
+32
View File
@@ -0,0 +1,32 @@
---
name: pearson
kind: function
lang: py
domain: datascience
version: "1.0.0"
purity: pure
signature: "def pearson(xs: list, ys: list) -> float"
description: "Calcula el coeficiente de correlacion de Pearson entre dos listas de floats."
tags: [statistics, correlation, python]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: [math]
tested: false
tests: []
test_file_path: ""
file_path: "python/functions/datascience/datascience.py"
---
## Ejemplo
```python
r = pearson([1, 2, 3], [2, 4, 6])
# r = 1.0
```
## Notas
Usa solo math stdlib. No requiere numpy. Retorna 0.0 si las listas tienen longitud diferente, estan vacias, o la desviacion es cero.
@@ -0,0 +1,32 @@
---
name: rolling_window
kind: function
lang: py
domain: datascience
version: "1.0.0"
purity: pure
signature: "def rolling_window(xs: list, size: int) -> list"
description: "Genera ventanas deslizantes de tamanio fijo sobre una lista."
tags: [windowing, timeseries, python]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: []
tested: false
tests: []
test_file_path: ""
file_path: "python/functions/datascience/datascience.py"
---
## Ejemplo
```python
rolling_window([1, 2, 3, 4, 5], 3)
# [[1, 2, 3], [2, 3, 4], [3, 4, 5]]
```
## Notas
Retorna lista vacia si size <= 0 o size > len(xs). Util para calcular medias moviles u otras metricas sobre ventanas.
@@ -0,0 +1,32 @@
---
name: standardize
kind: function
lang: py
domain: datascience
version: "1.0.0"
purity: pure
signature: "def standardize(data: list) -> list"
description: "Estandarizacion Z-score: transforma los datos a media=0 y desviacion=1."
tags: [statistics, normalization, python]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: [math]
tested: false
tests: []
test_file_path: ""
file_path: "python/functions/datascience/datascience.py"
---
## Ejemplo
```python
standardize([10, 20, 30])
# [-1.2247..., 0.0, 1.2247...]
```
## Notas
Si la desviacion estandar es cero, retorna lista de ceros. Usa desviacion poblacional (N, no N-1).
@@ -0,0 +1,35 @@
---
name: annualized_volatility
kind: function
lang: py
domain: finance
version: "1.0.0"
purity: pure
signature: "def annualized_volatility(returns: list, periods_per_year: float) -> float"
description: "Calcula la volatilidad anualizada de una serie de retornos."
tags: [finance, volatility, risk, python]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: [math]
tested: false
tests: []
test_file_path: ""
file_path: "python/functions/finance/finance.py"
---
## Ejemplo
```python
daily_returns = [0.01, -0.005, 0.008, 0.003, -0.002, 0.006, 0.004]
vol = annualized_volatility(daily_returns, 252.0)
# Volatilidad anualizada (std * sqrt(252))
```
## Notas
Formula: std_muestral(returns) * sqrt(periods_per_year).
Usa desviacion estandar muestral (n-1) para ser consistente con la practica financiera.
Retorna 0.0 si hay menos de 2 retornos o periods_per_year es menor o igual a cero.
@@ -0,0 +1,35 @@
---
name: bollinger_bands
kind: function
lang: py
domain: finance
version: "1.0.0"
purity: pure
signature: "def bollinger_bands(data: list, period: int, num_std: float) -> tuple"
description: "Calcula las Bandas de Bollinger (upper, middle, lower) de una serie de precios."
tags: [finance, bollinger, volatility, indicator, python]
uses_functions: [sma_py_finance]
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: []
tested: false
tests: []
test_file_path: ""
file_path: "python/functions/finance/finance.py"
---
## Ejemplo
```python
prices = [10, 11, 12, 13, 14, 15, 14, 13, 12, 11]
upper, middle, lower = bollinger_bands(prices, 5, 2.0)
# middle es la SMA(5), upper/lower son middle +/- 2*std
```
## Notas
Retorna tupla de tres listas (upper, middle, lower). Cada lista tiene len(data) - period + 1 elementos.
La desviacion estandar se calcula sobre la ventana de tamanio period (poblacional, no muestral).
Usa internamente la funcion sma para la banda media.
+34
View File
@@ -0,0 +1,34 @@
---
name: ema
kind: function
lang: py
domain: finance
version: "1.0.0"
purity: pure
signature: "def ema(data: list, period: int) -> list"
description: "Calcula la media movil exponencial (EMA) de una serie de precios."
tags: [finance, moving-average, exponential, indicator, python]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: []
tested: false
tests: []
test_file_path: ""
file_path: "python/functions/finance/finance.py"
---
## Ejemplo
```python
prices = [10, 11, 12, 13, 14, 15]
result = ema(prices, 3)
# [11.0, 11.5, 12.25, 13.125]
```
## Notas
El primer valor de la EMA es el SMA del primer periodo. El multiplicador es 2 / (period + 1).
Retorna len(data) - period + 1 elementos. Lista vacia si period invalido.
+137
View File
@@ -0,0 +1,137 @@
"""Finance domain — pure functions for financial indicators and calculations."""
import math
def sma(data: list, period: int) -> list:
"""Calcula la media movil simple (SMA) de una serie de precios."""
if period <= 0 or period > len(data):
return []
result = []
for i in range(period - 1, len(data)):
window = data[i - period + 1 : i + 1]
result.append(sum(window) / period)
return result
def ema(data: list, period: int) -> list:
"""Calcula la media movil exponencial (EMA) de una serie de precios."""
if period <= 0 or period > len(data):
return []
multiplier = 2.0 / (period + 1)
# Primer valor es SMA del primer periodo
first_sma = sum(data[:period]) / period
result = [first_sma]
for i in range(period, len(data)):
val = (data[i] - result[-1]) * multiplier + result[-1]
result.append(val)
return result
def rsi(data: list, period: int) -> list:
"""Calcula el Relative Strength Index (RSI) de una serie de precios."""
if period <= 0 or len(data) < period + 1:
return []
deltas = [data[i] - data[i - 1] for i in range(1, len(data))]
gains = [d if d > 0 else 0.0 for d in deltas]
losses = [-d if d < 0 else 0.0 for d in deltas]
avg_gain = sum(gains[:period]) / period
avg_loss = sum(losses[:period]) / period
result = []
if avg_loss == 0:
result.append(100.0)
else:
rs = avg_gain / avg_loss
result.append(100.0 - 100.0 / (1.0 + rs))
for i in range(period, len(deltas)):
avg_gain = (avg_gain * (period - 1) + gains[i]) / period
avg_loss = (avg_loss * (period - 1) + losses[i]) / period
if avg_loss == 0:
result.append(100.0)
else:
rs = avg_gain / avg_loss
result.append(100.0 - 100.0 / (1.0 + rs))
return result
def bollinger_bands(data: list, period: int, num_std: float) -> tuple:
"""Calcula las Bandas de Bollinger (upper, middle, lower)."""
if period <= 0 or period > len(data):
return ([], [], [])
middle = sma(data, period)
upper = []
lower = []
for i in range(len(middle)):
window = data[i : i + period]
mean = middle[i]
variance = sum((x - mean) ** 2 for x in window) / period
std = math.sqrt(variance)
upper.append(mean + num_std * std)
lower.append(mean - num_std * std)
return (upper, middle, lower)
def sharpe_ratio(returns: list, risk_free_rate: float, periods_per_year: float) -> float:
"""Calcula el Sharpe Ratio anualizado."""
if len(returns) == 0 or periods_per_year <= 0:
return 0.0
n = len(returns)
mean_return = sum(returns) / n
excess = mean_return - risk_free_rate / periods_per_year
variance = sum((r - mean_return) ** 2 for r in returns) / n
std = math.sqrt(variance)
if std == 0:
return 0.0
return (excess / std) * math.sqrt(periods_per_year)
def max_drawdown(values: list) -> tuple:
"""Calcula el max drawdown y los indices de inicio y fin."""
if len(values) < 2:
return (0.0, 0, 0)
peak = values[0]
peak_idx = 0
max_dd = 0.0
dd_start = 0
dd_end = 0
for i in range(1, len(values)):
if values[i] > peak:
peak = values[i]
peak_idx = i
dd = (peak - values[i]) / peak if peak != 0 else 0.0
if dd > max_dd:
max_dd = dd
dd_start = peak_idx
dd_end = i
return (max_dd, dd_start, dd_end)
def vwap(prices: list, volumes: list) -> float:
"""Calcula el Volume-Weighted Average Price (VWAP)."""
if len(prices) == 0 or len(prices) != len(volumes):
return 0.0
total_volume = sum(volumes)
if total_volume == 0:
return 0.0
return sum(p * v for p, v in zip(prices, volumes)) / total_volume
def log_return(price_start: float, price_end: float) -> float:
"""Calcula el retorno logaritmico entre dos precios."""
if price_start <= 0 or price_end <= 0:
return 0.0
return math.log(price_end / price_start)
def annualized_volatility(returns: list, periods_per_year: float) -> float:
"""Calcula la volatilidad anualizada de una serie de retornos."""
if len(returns) < 2 or periods_per_year <= 0:
return 0.0
n = len(returns)
mean = sum(returns) / n
variance = sum((r - mean) ** 2 for r in returns) / (n - 1)
return math.sqrt(variance) * math.sqrt(periods_per_year)
+34
View File
@@ -0,0 +1,34 @@
---
name: log_return
kind: function
lang: py
domain: finance
version: "1.0.0"
purity: pure
signature: "def log_return(price_start: float, price_end: float) -> float"
description: "Calcula el retorno logaritmico entre dos precios."
tags: [finance, return, logarithmic, python]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: [math]
tested: false
tests: []
test_file_path: ""
file_path: "python/functions/finance/finance.py"
---
## Ejemplo
```python
r = log_return(100.0, 110.0)
# 0.09531... (aprox 9.53%)
```
## Notas
Formula: ln(price_end / price_start).
Retorna 0.0 si alguno de los precios es menor o igual a cero.
Los retornos logaritmicos son aditivos en el tiempo, a diferencia de los retornos simples.
+35
View File
@@ -0,0 +1,35 @@
---
name: max_drawdown
kind: function
lang: py
domain: finance
version: "1.0.0"
purity: pure
signature: "def max_drawdown(values: list) -> tuple"
description: "Calcula el maximo drawdown y los indices de inicio y fin del peor periodo."
tags: [finance, drawdown, risk, performance, python]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: []
tested: false
tests: []
test_file_path: ""
file_path: "python/functions/finance/finance.py"
---
## Ejemplo
```python
portfolio = [100, 110, 105, 95, 102, 108, 90, 95]
dd, start, end = max_drawdown(portfolio)
# dd = 0.1818..., start = 1, end = 6 (de 110 a 90)
```
## Notas
Retorna tupla (max_dd, start_idx, end_idx) donde max_dd es fraccion (0.0 a 1.0).
start_idx es el indice del pico previo, end_idx es el indice del valle.
Retorna (0.0, 0, 0) si la lista tiene menos de 2 elementos.
+36
View File
@@ -0,0 +1,36 @@
---
name: rsi
kind: function
lang: py
domain: finance
version: "1.0.0"
purity: pure
signature: "def rsi(data: list, period: int) -> list"
description: "Calcula el Relative Strength Index (RSI) de una serie de precios."
tags: [finance, rsi, momentum, indicator, python]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: []
tested: false
tests: []
test_file_path: ""
file_path: "python/functions/finance/finance.py"
---
## Ejemplo
```python
prices = [44, 44.34, 44.09, 43.61, 44.33, 44.83, 45.10, 45.42, 45.84, 46.08,
45.89, 46.03, 45.61, 46.28, 46.28, 46.00, 46.03, 46.41, 46.22, 45.64]
result = rsi(prices, 14)
# Lista de valores RSI entre 0 y 100
```
## Notas
Usa el metodo de suavizado de Wilder (media exponencial modificada).
Requiere al menos period + 1 datos de entrada. Retorna len(data) - period valores.
RSI = 100 si no hay perdidas en el periodo (avg_loss == 0).
+35
View File
@@ -0,0 +1,35 @@
---
name: sharpe_ratio
kind: function
lang: py
domain: finance
version: "1.0.0"
purity: pure
signature: "def sharpe_ratio(returns: list, risk_free_rate: float, periods_per_year: float) -> float"
description: "Calcula el Sharpe Ratio anualizado de una serie de retornos."
tags: [finance, sharpe, risk, performance, python]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: [math]
tested: false
tests: []
test_file_path: ""
file_path: "python/functions/finance/finance.py"
---
## Ejemplo
```python
daily_returns = [0.01, -0.005, 0.008, 0.003, -0.002, 0.006, 0.004]
sr = sharpe_ratio(daily_returns, 0.02, 252.0)
# Sharpe ratio anualizado
```
## Notas
risk_free_rate es la tasa anual (ej: 0.02 para 2%). Se convierte a tasa por periodo internamente.
periods_per_year indica la frecuencia de los retornos (252 para diarios, 12 para mensuales).
Retorna 0.0 si la desviacion estandar es cero o la lista esta vacia.
+34
View File
@@ -0,0 +1,34 @@
---
name: sma
kind: function
lang: py
domain: finance
version: "1.0.0"
purity: pure
signature: "def sma(data: list, period: int) -> list"
description: "Calcula la media movil simple (SMA) de una serie de precios."
tags: [finance, moving-average, indicator, python]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: []
tested: false
tests: []
test_file_path: ""
file_path: "python/functions/finance/finance.py"
---
## Ejemplo
```python
prices = [10, 11, 12, 13, 14, 15]
result = sma(prices, 3)
# [11.0, 12.0, 13.0, 14.0]
```
## Notas
Retorna lista mas corta que la entrada (len - period + 1 elementos).
Si period es mayor que len(data) o menor/igual a 0, retorna lista vacia.
+35
View File
@@ -0,0 +1,35 @@
---
name: vwap
kind: function
lang: py
domain: finance
version: "1.0.0"
purity: pure
signature: "def vwap(prices: list, volumes: list) -> float"
description: "Calcula el Volume-Weighted Average Price (VWAP)."
tags: [finance, vwap, volume, indicator, python]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: []
tested: false
tests: []
test_file_path: ""
file_path: "python/functions/finance/finance.py"
---
## Ejemplo
```python
prices = [100.0, 101.0, 102.0, 101.5]
volumes = [1000, 1500, 1200, 800]
result = vwap(prices, volumes)
# 101.0888...
```
## Notas
Formula: sum(price_i * volume_i) / sum(volume_i).
Retorna 0.0 si las listas estan vacias, tienen distinto tamanio, o el volumen total es cero.
+6
View File
@@ -46,6 +46,12 @@ func (db *DB) Close() error {
return db.conn.Close()
}
// WalCheckpoint flushes the WAL to the main database file so external
// readers (e.g. Metabase via bind mount) see the latest data.
func (db *DB) WalCheckpoint() {
db.conn.Exec("PRAGMA wal_checkpoint(TRUNCATE)")
}
// Drop removes the database file. Used by `fn index` to regenerate.
func (db *DB) Drop() error {
db.Close()
+35
View File
@@ -11,6 +11,7 @@ import (
type IndexResult struct {
Functions int
Types int
Apps int
ValidationErrors []string
Errors []string
}
@@ -76,6 +77,28 @@ func Index(db *DB, root string) (*IndexResult, error) {
})
}
// Parse apps from apps/*/app.md
var apps []*App
appsDir := filepath.Join(root, "apps")
if fi, err := os.Stat(appsDir); err == nil && fi.IsDir() {
entries, _ := os.ReadDir(appsDir)
for _, e := range entries {
if !e.IsDir() {
continue
}
appMD := filepath.Join(appsDir, e.Name(), "app.md")
if _, err := os.Stat(appMD); err != nil {
continue
}
a, err := ParseAppMD(appMD, root)
if err != nil {
result.Errors = append(result.Errors, fmt.Sprintf("parse %s: %v", appMD, err))
continue
}
apps = append(apps, a)
}
}
// Build known ID sets
knownFunctions := make(map[string]bool, len(functions))
for _, f := range functions {
@@ -111,6 +134,18 @@ func Index(db *DB, root string) (*IndexResult, error) {
result.Functions++
}
for _, a := range apps {
if verr := ValidateApp(a, knownFunctions, knownTypes); verr != nil {
result.ValidationErrors = append(result.ValidationErrors, verr.Error())
continue
}
if err := db.InsertApp(a); err != nil {
result.Errors = append(result.Errors, fmt.Sprintf("insert %s: %v", a.ID, err))
continue
}
result.Apps++
}
return result, nil
}
+48
View File
@@ -0,0 +1,48 @@
-- Apps table: applications that consume functions/types from the registry.
CREATE TABLE IF NOT EXISTS apps (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
lang TEXT NOT NULL,
domain TEXT NOT NULL,
description TEXT NOT NULL,
tags TEXT NOT NULL DEFAULT '[]',
uses_functions TEXT NOT NULL DEFAULT '[]',
uses_types TEXT NOT NULL DEFAULT '[]',
framework TEXT NOT NULL DEFAULT '',
entry_point TEXT NOT NULL DEFAULT '',
documentation TEXT NOT NULL DEFAULT '',
notes TEXT NOT NULL DEFAULT '',
dir_path TEXT NOT NULL DEFAULT '',
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
CREATE VIRTUAL TABLE IF NOT EXISTS apps_fts USING fts5(
id,
name,
description,
tags,
domain,
documentation,
notes,
content='apps',
content_rowid='rowid'
);
CREATE TRIGGER IF NOT EXISTS apps_ai AFTER INSERT ON apps BEGIN
INSERT INTO apps_fts(rowid, id, name, description, tags, domain, documentation, notes)
VALUES (new.rowid, new.id, new.name, new.description, new.tags, new.domain, new.documentation, new.notes);
END;
CREATE TRIGGER IF NOT EXISTS apps_ad AFTER DELETE ON apps BEGIN
INSERT INTO apps_fts(apps_fts, rowid, id, name, description, tags, domain, documentation, notes)
VALUES ('delete', old.rowid, old.id, old.name, old.description, old.tags, old.domain, old.documentation, old.notes);
END;
CREATE TRIGGER IF NOT EXISTS apps_au AFTER UPDATE ON apps BEGIN
INSERT INTO apps_fts(apps_fts, rowid, id, name, description, tags, domain, documentation, notes)
VALUES ('delete', old.rowid, old.id, old.name, old.description, old.tags, old.domain, old.documentation, old.notes);
INSERT INTO apps_fts(rowid, id, name, description, tags, domain, documentation, notes)
VALUES (new.rowid, new.id, new.name, new.description, new.tags, new.domain, new.documentation, new.notes);
END;
+19
View File
@@ -94,6 +94,25 @@ type Type struct {
UpdatedAt time.Time `json:"updated_at"`
}
// App represents an entry in the apps table.
type App struct {
ID string `json:"id"`
Name string `json:"name"`
Lang string `json:"lang"`
Domain string `json:"domain"`
Description string `json:"description"`
Tags []string `json:"tags"`
UsesFunctions []string `json:"uses_functions"`
UsesTypes []string `json:"uses_types"`
Framework string `json:"framework"`
EntryPoint string `json:"entry_point"`
Documentation string `json:"documentation"`
Notes string `json:"notes"`
DirPath string `json:"dir_path"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
// ProposalKind classifies a proposal.
type ProposalKind string
+59
View File
@@ -54,6 +54,20 @@ type rawType struct {
FilePath string `yaml:"file_path"`
}
// rawApp mirrors the YAML frontmatter of an app .md file.
type rawApp struct {
Name string `yaml:"name"`
Lang string `yaml:"lang"`
Domain string `yaml:"domain"`
Description string `yaml:"description"`
Tags []string `yaml:"tags"`
UsesFunctions []string `yaml:"uses_functions"`
UsesTypes []string `yaml:"uses_types"`
Framework string `yaml:"framework"`
EntryPoint string `yaml:"entry_point"`
DirPath string `yaml:"dir_path"`
}
// extractFrontmatter splits a .md file into YAML frontmatter and body.
func extractFrontmatter(data []byte) ([]byte, []byte, error) {
content := data
@@ -198,6 +212,51 @@ func ParseTypeMD(path string, root string) (*Type, error) {
return t, nil
}
// ParseAppMD parses an app .md file into an App.
func ParseAppMD(path string, root string) (*App, error) {
data, err := os.ReadFile(path)
if err != nil {
return nil, fmt.Errorf("reading %s: %w", path, err)
}
fm, body, err := extractFrontmatter(data)
if err != nil {
return nil, fmt.Errorf("parsing %s: %w", path, err)
}
var raw rawApp
if err := yaml.Unmarshal(fm, &raw); err != nil {
return nil, fmt.Errorf("parsing YAML in %s: %w", path, err)
}
if raw.Name == "" {
return nil, fmt.Errorf("%s: name is required", path)
}
if raw.Description == "" {
return nil, fmt.Errorf("%s: description is required", path)
}
sections := extractSections(body)
a := &App{
ID: GenerateID(raw.Name, raw.Lang, raw.Domain),
Name: raw.Name,
Lang: raw.Lang,
Domain: raw.Domain,
Description: raw.Description,
Tags: raw.Tags,
UsesFunctions: raw.UsesFunctions,
UsesTypes: raw.UsesTypes,
Framework: raw.Framework,
EntryPoint: raw.EntryPoint,
Documentation: sections.documentation,
Notes: sections.notes,
DirPath: raw.DirPath,
}
return a, nil
}
// bodySections holds the extracted sections from a .md body.
type bodySections struct {
example string // content under ## Ejemplo
+108 -2
View File
@@ -261,12 +261,118 @@ func (db *DB) DeleteType(id string) error {
return err
}
// Purge deletes all data from both tables. Used before re-indexing.
// InsertApp inserts or replaces an app entry.
func (db *DB) InsertApp(a *App) error {
now := time.Now().UTC().Format(time.RFC3339)
if a.CreatedAt.IsZero() {
a.CreatedAt = time.Now().UTC()
}
a.UpdatedAt = time.Now().UTC()
if a.ID == "" {
a.ID = GenerateID(a.Name, a.Lang, a.Domain)
}
_, err := db.conn.Exec(`
INSERT OR REPLACE INTO apps (
id, name, lang, domain, description, tags,
uses_functions, uses_types, framework, entry_point,
documentation, notes, dir_path, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
a.ID, a.Name, a.Lang, a.Domain, a.Description, marshalStrings(a.Tags),
marshalStrings(a.UsesFunctions), marshalStrings(a.UsesTypes), a.Framework, a.EntryPoint,
a.Documentation, a.Notes, a.DirPath, a.CreatedAt.Format(time.RFC3339), now,
)
return err
}
// GetApp returns a single app by ID.
func (db *DB) GetApp(id string) (*App, error) {
rows, err := db.conn.Query("SELECT * FROM apps WHERE id = ?", id)
if err != nil {
return nil, err
}
defer rows.Close()
apps, err := scanApps(rows)
if err != nil {
return nil, err
}
if len(apps) == 0 {
return nil, fmt.Errorf("app %q not found", id)
}
return &apps[0], nil
}
// SearchApps performs FTS search on apps with optional filters.
func (db *DB) SearchApps(query string, lang, domain string) ([]App, error) {
where := []string{}
args := []any{}
if query != "" {
where = append(where, "a.id IN (SELECT id FROM apps_fts WHERE apps_fts MATCH ?)")
args = append(args, query)
}
if lang != "" {
where = append(where, "a.lang = ?")
args = append(args, lang)
}
if domain != "" {
where = append(where, "a.domain = ?")
args = append(args, domain)
}
sql := "SELECT * FROM apps a"
if len(where) > 0 {
sql += " WHERE " + strings.Join(where, " AND ")
}
sql += " ORDER BY a.name"
rows, err := db.conn.Query(sql, args...)
if err != nil {
return nil, fmt.Errorf("search apps: %w", err)
}
defer rows.Close()
return scanApps(rows)
}
func scanApps(rows interface{ Next() bool; Scan(...any) error }) ([]App, error) {
var result []App
for rows.Next() {
var a App
var tagsJSON, usesFnJSON, usesTypJSON string
var createdAt, updatedAt string
err := rows.Scan(
&a.ID, &a.Name, &a.Lang, &a.Domain, &a.Description, &tagsJSON,
&usesFnJSON, &usesTypJSON, &a.Framework, &a.EntryPoint,
&a.Documentation, &a.Notes, &a.DirPath, &createdAt, &updatedAt,
)
if err != nil {
return nil, fmt.Errorf("scanning app: %w", err)
}
a.Tags = unmarshalStrings(tagsJSON)
a.UsesFunctions = unmarshalStrings(usesFnJSON)
a.UsesTypes = unmarshalStrings(usesTypJSON)
a.CreatedAt, _ = time.Parse(time.RFC3339, createdAt)
a.UpdatedAt, _ = time.Parse(time.RFC3339, updatedAt)
result = append(result, a)
}
return result, nil
}
// Purge deletes all data from functions, types and apps. Used before re-indexing.
func (db *DB) Purge() error {
if _, err := db.conn.Exec("DELETE FROM functions"); err != nil {
return err
}
_, err := db.conn.Exec("DELETE FROM types")
if _, err := db.conn.Exec("DELETE FROM types"); err != nil {
return err
}
_, err := db.conn.Exec("DELETE FROM apps")
return err
}
+38
View File
@@ -161,6 +161,44 @@ func ValidateProposal(p *Proposal) *ValidationError {
return nil
}
// ValidateApp checks integrity rules for apps.
func ValidateApp(a *App, knownFunctions, knownTypes map[string]bool) *ValidationError {
var errs []string
if a.Name == "" {
errs = append(errs, "name is required")
}
if a.Lang == "" {
errs = append(errs, "lang is required")
}
if a.Domain == "" {
errs = append(errs, "domain is required")
}
if a.Description == "" {
errs = append(errs, "description is required")
}
if a.DirPath != "" && strings.HasPrefix(a.DirPath, "/") {
errs = append(errs, "dir_path must be relative to registry root")
}
for _, ref := range a.UsesFunctions {
if !knownFunctions[ref] {
errs = append(errs, fmt.Sprintf("uses_functions references unknown function: %s", ref))
}
}
for _, ref := range a.UsesTypes {
if !knownTypes[ref] {
errs = append(errs, fmt.Sprintf("uses_types references unknown type: %s", ref))
}
}
if len(errs) > 0 {
return &ValidationError{ID: a.ID, Errors: errs}
}
return nil
}
// ValidateType checks integrity rules for types.
func ValidateType(t *Type, knownTypes map[string]bool) *ValidationError {
var errs []string