9a7a874a76
El parser Python de audit_uses_functions solo reconocia "from <pkg> import X" con un unico componente de paquete (regex \w+), por lo que: - "from <pkg>.<subpkg> import X" (import anidado) no matcheaba y la funcion se reportaba como falso unused_in_app_md. - Las listas multilinea con parentesis "from <pkg> import (\n a,\n b,\n)" no se parseaban (escaneo linea a linea). Cambios: - Regex acepta puntos en el paquete y bloques parentizados multilinea. - Resolucion validada contra el directorio de paquete del registry derivado de file_path (no del campo domain: las funciones metabase viven en python/functions/metabase/ pero tienen domain=infra). Imports de librerias externas se ignoran -> sin falsos missing. - parsePyImportedSymbols descarta comentarios "# noqa", maneja "as alias" y star imports (tratados como vacio, no soportados por diseno). - auditFnMeta carga file_path; query SELECT anade file_path. Tests (functions/infra/audit_uses_functions_test.go): - TestAuditUsesFunctions_DetectsNestedImport (golden) - TestAuditUsesFunctions_NoFalsePositiveOnNested (edge: nested + multilinea) - TestAuditUsesFunctions_StarImport (error/edge: star import no cuenta) Verificado con fn doctor uses-functions sobre apps reales: drift baja de 11/42 a 9/42. mail_manager (9 falsos por "from infra.X import Y") y demand_radar (3 por lista multilinea) quedan en 0 drift. El residual de osint_db/osint_web es carga dinamica via importlib, documentado como fuera de alcance. audit_uses_functions v1.0.0 -> v1.1.0. CHANGELOG actualizado. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
350 lines
10 KiB
Go
350 lines
10 KiB
Go
package infra
|
|
|
|
import (
|
|
"database/sql"
|
|
"os"
|
|
"path/filepath"
|
|
"testing"
|
|
|
|
_ "github.com/mattn/go-sqlite3"
|
|
)
|
|
|
|
// createTestRegistryDB creates a minimal registry.db with the given apps and
|
|
// a single function (random_hex_id_go_core in domain core, lang go).
|
|
func createTestRegistryDB(t *testing.T, root string, apps []struct {
|
|
id string
|
|
lang string
|
|
dirPath string
|
|
usesFunctions string
|
|
}) {
|
|
t.Helper()
|
|
dbPath := filepath.Join(root, "registry.db")
|
|
db, err := sql.Open("sqlite3", dbPath)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
defer db.Close()
|
|
|
|
_, err = db.Exec(`
|
|
CREATE TABLE functions (
|
|
id TEXT PRIMARY KEY,
|
|
name TEXT,
|
|
domain TEXT,
|
|
lang TEXT,
|
|
signature TEXT,
|
|
file_path TEXT
|
|
);
|
|
CREATE TABLE apps (
|
|
id TEXT PRIMARY KEY,
|
|
lang TEXT,
|
|
dir_path TEXT,
|
|
uses_functions TEXT DEFAULT '[]'
|
|
);
|
|
INSERT INTO functions (id, name, domain, lang, file_path)
|
|
VALUES ('random_hex_id_go_core','random_hex_id','core','go','functions/core/random_hex_id.go');
|
|
`)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
for _, a := range apps {
|
|
_, err = db.Exec(
|
|
`INSERT INTO apps (id, lang, dir_path, uses_functions) VALUES (?,?,?,?)`,
|
|
a.id, a.lang, a.dirPath, a.usesFunctions,
|
|
)
|
|
if err != nil {
|
|
t.Fatalf("insert app %s: %v", a.id, err)
|
|
}
|
|
}
|
|
}
|
|
|
|
// insertTestFunctions appends extra functions to the test registry.db created by
|
|
// createTestRegistryDB. Used by the Python import tests, which need py functions
|
|
// whose file_path maps to a real package directory under python/functions/.
|
|
func insertTestFunctions(t *testing.T, root string, fns []struct {
|
|
id, name, domain, lang, filePath string
|
|
}) {
|
|
t.Helper()
|
|
db, err := sql.Open("sqlite3", filepath.Join(root, "registry.db"))
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
defer db.Close()
|
|
for _, f := range fns {
|
|
if _, err := db.Exec(
|
|
`INSERT INTO functions (id, name, domain, lang, file_path) VALUES (?,?,?,?,?)`,
|
|
f.id, f.name, f.domain, f.lang, f.filePath,
|
|
); err != nil {
|
|
t.Fatalf("insert fn %s: %v", f.id, err)
|
|
}
|
|
}
|
|
}
|
|
|
|
// writePyApp creates a Python app directory with a single source file.
|
|
func writePyApp(t *testing.T, root, dirPath, src string) {
|
|
t.Helper()
|
|
appDir := filepath.Join(root, dirPath)
|
|
if err := os.MkdirAll(appDir, 0755); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
if err := os.WriteFile(filepath.Join(appDir, "main.py"), []byte(src), 0644); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
}
|
|
|
|
// containsID reports whether ids contains target.
|
|
func containsID(ids []string, target string) bool {
|
|
for _, id := range ids {
|
|
if id == target {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// TestAuditUsesFunctions_DetectsNestedImport verifies that a nested import
|
|
// ("from metabase.cards import metabase_get_card") resolves to its function ID.
|
|
// The app declares no uses_functions, so the detected import surfaces as Missing.
|
|
func TestAuditUsesFunctions_DetectsNestedImport(t *testing.T) {
|
|
root := t.TempDir()
|
|
createTestRegistryDB(t, root, []struct {
|
|
id, lang, dirPath, usesFunctions string
|
|
}{
|
|
{"nestedapp_py_tools", "py", "apps/nestedapp", `[]`},
|
|
})
|
|
insertTestFunctions(t, root, []struct {
|
|
id, name, domain, lang, filePath string
|
|
}{
|
|
{"metabase_get_card_py_infra", "metabase_get_card", "infra", "py", "python/functions/metabase/cards.py"},
|
|
})
|
|
|
|
writePyApp(t, root, "apps/nestedapp", `import sys
|
|
from metabase.cards import metabase_get_card # noqa: E402
|
|
|
|
def run():
|
|
return metabase_get_card(1)
|
|
`)
|
|
|
|
results, err := AuditUsesFunctions(root)
|
|
if err != nil {
|
|
t.Fatalf("AuditUsesFunctions: %v", err)
|
|
}
|
|
if len(results) != 1 {
|
|
t.Fatalf("expected 1 result, got %d", len(results))
|
|
}
|
|
got := results[0]
|
|
if !containsID(got.Missing, "metabase_get_card_py_infra") {
|
|
t.Errorf("nested import not detected: Missing = %v, want to contain metabase_get_card_py_infra", got.Missing)
|
|
}
|
|
if len(got.Unused) != 0 {
|
|
t.Errorf("Unused = %v, want []", got.Unused)
|
|
}
|
|
}
|
|
|
|
// TestAuditUsesFunctions_NoFalsePositiveOnNested verifies that when an app
|
|
// imports nested + multi-line parenthesised lists and declares them all in
|
|
// uses_functions, no function is reported as unused (the core regression fixed
|
|
// by this issue: false "unused" hits for nested/multi-line imports).
|
|
func TestAuditUsesFunctions_NoFalsePositiveOnNested(t *testing.T) {
|
|
root := t.TempDir()
|
|
createTestRegistryDB(t, root, []struct {
|
|
id, lang, dirPath, usesFunctions string
|
|
}{
|
|
{"nofp_py_tools", "py", "apps/nofp",
|
|
`["imap_connect_py_infra","smtp_send_py_infra","fetch_reddit_search_py_datascience","score_demand_signal_py_datascience"]`},
|
|
})
|
|
insertTestFunctions(t, root, []struct {
|
|
id, name, domain, lang, filePath string
|
|
}{
|
|
{"imap_connect_py_infra", "imap_connect", "infra", "py", "python/functions/infra/imap_connect.py"},
|
|
{"smtp_send_py_infra", "smtp_send", "infra", "py", "python/functions/infra/smtp_send.py"},
|
|
{"fetch_reddit_search_py_datascience", "fetch_reddit_search", "datascience", "py", "python/functions/datascience/fetch_reddit_search.py"},
|
|
{"score_demand_signal_py_datascience", "score_demand_signal", "datascience", "py", "python/functions/datascience/score_demand_signal.py"},
|
|
})
|
|
|
|
// Nested imports + a parenthesised multi-line list — both previously missed.
|
|
writePyApp(t, root, "apps/nofp", `import sys
|
|
from infra.imap_connect import imap_connect # noqa: E402
|
|
from infra.smtp_send import smtp_send, SMTPConfigPy # noqa: E402
|
|
from datascience import ( # noqa: E402
|
|
fetch_reddit_search,
|
|
score_demand_signal,
|
|
)
|
|
|
|
def run():
|
|
return imap_connect, smtp_send, fetch_reddit_search, score_demand_signal
|
|
`)
|
|
|
|
results, err := AuditUsesFunctions(root)
|
|
if err != nil {
|
|
t.Fatalf("AuditUsesFunctions: %v", err)
|
|
}
|
|
if len(results) != 1 {
|
|
t.Fatalf("expected 1 result, got %d", len(results))
|
|
}
|
|
got := results[0]
|
|
if len(got.Unused) != 0 {
|
|
t.Errorf("false positive unused detected: Unused = %v, want []", got.Unused)
|
|
}
|
|
if len(got.Missing) != 0 {
|
|
t.Errorf("Missing = %v, want []", got.Missing)
|
|
}
|
|
}
|
|
|
|
// TestAuditUsesFunctions_StarImport documents that star imports
|
|
// ("from <pkg> import *") are NOT treated as using any function: a declared
|
|
// function not otherwise referenced is reported as unused.
|
|
func TestAuditUsesFunctions_StarImport(t *testing.T) {
|
|
root := t.TempDir()
|
|
createTestRegistryDB(t, root, []struct {
|
|
id, lang, dirPath, usesFunctions string
|
|
}{
|
|
{"starapp_py_tools", "py", "apps/starapp", `["filter_list_py_core"]`},
|
|
})
|
|
insertTestFunctions(t, root, []struct {
|
|
id, name, domain, lang, filePath string
|
|
}{
|
|
{"filter_list_py_core", "filter_list", "core", "py", "python/functions/core/core.py"},
|
|
})
|
|
|
|
writePyApp(t, root, "apps/starapp", `from core import *
|
|
|
|
def run():
|
|
return None
|
|
`)
|
|
|
|
results, err := AuditUsesFunctions(root)
|
|
if err != nil {
|
|
t.Fatalf("AuditUsesFunctions: %v", err)
|
|
}
|
|
if len(results) != 1 {
|
|
t.Fatalf("expected 1 result, got %d", len(results))
|
|
}
|
|
got := results[0]
|
|
if !containsID(got.Unused, "filter_list_py_core") {
|
|
t.Errorf("star import should not count as usage: Unused = %v, want to contain filter_list_py_core", got.Unused)
|
|
}
|
|
if len(got.Missing) != 0 {
|
|
t.Errorf("Missing = %v, want []", got.Missing)
|
|
}
|
|
}
|
|
|
|
// TestAuditUsesFunctions_DetectsMissing verifies that a Go app that calls
|
|
// RandomHexID in its source but declares empty uses_functions gets
|
|
// random_hex_id_go_core reported as missing.
|
|
func TestAuditUsesFunctions_DetectsMissing(t *testing.T) {
|
|
t.Run("missing function detected for Go app", func(t *testing.T) {
|
|
root := t.TempDir()
|
|
createTestRegistryDB(t, root, []struct {
|
|
id, lang, dirPath, usesFunctions string
|
|
}{
|
|
{"testapp_go_tools", "go", "apps/testapp", `[]`},
|
|
})
|
|
|
|
appDir := filepath.Join(root, "apps", "testapp")
|
|
if err := os.MkdirAll(appDir, 0755); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
goSrc := `package main
|
|
|
|
import (
|
|
"fmt"
|
|
"fn-registry/functions/core"
|
|
)
|
|
|
|
func main() {
|
|
id := core.RandomHexID(8)
|
|
fmt.Println(id)
|
|
}
|
|
`
|
|
if err := os.WriteFile(filepath.Join(appDir, "main.go"), []byte(goSrc), 0644); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
results, err := AuditUsesFunctions(root)
|
|
if err != nil {
|
|
t.Fatalf("AuditUsesFunctions: %v", err)
|
|
}
|
|
if len(results) != 1 {
|
|
t.Fatalf("expected 1 result, got %d", len(results))
|
|
}
|
|
got := results[0]
|
|
if len(got.Missing) != 1 || got.Missing[0] != "random_hex_id_go_core" {
|
|
t.Errorf("Missing = %v, want [random_hex_id_go_core]", got.Missing)
|
|
}
|
|
if len(got.Unused) != 0 {
|
|
t.Errorf("Unused = %v, want []", got.Unused)
|
|
}
|
|
})
|
|
}
|
|
|
|
// TestAuditUsesFunctions_DetectsUnused verifies that a function declared in
|
|
// uses_functions but not called in source is reported as unused.
|
|
func TestAuditUsesFunctions_DetectsUnused(t *testing.T) {
|
|
t.Run("unused function detected for Go app", func(t *testing.T) {
|
|
root := t.TempDir()
|
|
createTestRegistryDB(t, root, []struct {
|
|
id, lang, dirPath, usesFunctions string
|
|
}{
|
|
{"testapp2_go_tools", "go", "apps/testapp2", `["random_hex_id_go_core"]`},
|
|
})
|
|
|
|
appDir := filepath.Join(root, "apps", "testapp2")
|
|
if err := os.MkdirAll(appDir, 0755); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
goSrc := `package main
|
|
|
|
import "fmt"
|
|
|
|
func main() { fmt.Println("hello") }
|
|
`
|
|
if err := os.WriteFile(filepath.Join(appDir, "main.go"), []byte(goSrc), 0644); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
results, err := AuditUsesFunctions(root)
|
|
if err != nil {
|
|
t.Fatalf("AuditUsesFunctions: %v", err)
|
|
}
|
|
if len(results) != 1 {
|
|
t.Fatalf("expected 1 result, got %d", len(results))
|
|
}
|
|
got := results[0]
|
|
if len(got.Unused) != 1 || got.Unused[0] != "random_hex_id_go_core" {
|
|
t.Errorf("Unused = %v, want [random_hex_id_go_core]", got.Unused)
|
|
}
|
|
if len(got.Missing) != 0 {
|
|
t.Errorf("Missing = %v, want []", got.Missing)
|
|
}
|
|
})
|
|
}
|
|
|
|
// TestAuditUsesFunctions_MissingDir verifies that apps whose dir_path does not
|
|
// exist on disk get an entry with nil Missing/Unused slices (cannot inspect).
|
|
func TestAuditUsesFunctions_MissingDir(t *testing.T) {
|
|
t.Run("missing dir returns entry with nil slices", func(t *testing.T) {
|
|
root := t.TempDir()
|
|
createTestRegistryDB(t, root, []struct {
|
|
id, lang, dirPath, usesFunctions string
|
|
}{
|
|
{"testapp3_go_tools", "go", "apps/testapp3", `[]`},
|
|
})
|
|
// intentionally do NOT create apps/testapp3 on disk
|
|
|
|
results, err := AuditUsesFunctions(root)
|
|
if err != nil {
|
|
t.Fatalf("AuditUsesFunctions: %v", err)
|
|
}
|
|
if len(results) != 1 {
|
|
t.Fatalf("expected 1 result, got %d", len(results))
|
|
}
|
|
got := results[0]
|
|
if got.Missing != nil {
|
|
t.Errorf("Missing should be nil for missing dir, got %v", got.Missing)
|
|
}
|
|
if got.Unused != nil {
|
|
t.Errorf("Unused should be nil for missing dir, got %v", got.Unused)
|
|
}
|
|
})
|
|
}
|