2 Commits

Author SHA1 Message Date
egutierrez d8db05e9c9 chore(dag_engine): app.md v0.2.0 + puerto 4200 (metadata trackeada por el padre)
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-02 23:48:59 +02:00
egutierrez e22c33ee6d refactor(infra): split de drivers pesados a subpaquetes + fix TestSSEHandler
Mueve duckdb_open, clickhouse_open, postgres_open, matrix_* y keyring_token_store
del paquete monolitico functions/infra a subpaquetes propios
(functions/infra/{duckdb,clickhouse,postgres,matrix,keyring}). El paquete infra ya
no importa los drivers (go-duckdb, clickhouse-go, pgx, mautrix, go-keyring), por lo
que las apps que solo usan funciones ligeras (process, cron, http, sqlite) dejan de
arrastrarlos. Reduccion de binarios: dag_engine 72->10MB, registry_api 70->8.7MB,
services_api 70->9MB, call_monitor 68->6.6MB, sqlite_api 70->8.9MB.

Los IDs del registry se mantienen estables (domain: infra en frontmatter). Se
preservan los build tags goolm/libolm de matrix_crypto_init.

Tambien corrige TestSSEHandler: el test leia el body con un unico Read() que con
HTTP chunked solo capturaba el primer evento; ahora usa io.ReadAll hasta EOF.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-02 23:48:59 +02:00
28 changed files with 68 additions and 63 deletions
+6 -6
View File
@@ -16,7 +16,7 @@ Por defecto el systemd unit apunta a `apps/dag_engine/dags/`. Para usar otro dir
```ini
ExecStart=/home/lucas/fn_registry/apps/dag_engine/dag_engine server \
--port 8090 \
--port 4200 \
--dags-dir /home/lucas/fn_registry/apps/dag_engine/dags \
--db /home/lucas/fn_registry/apps/dag_engine/dag_engine.db \
--scheduler
@@ -52,12 +52,12 @@ systemctl --user restart dag_engine.service
Busca la linea `[scheduler] ticker started for <nombre> (<cron>)` en los logs.
5. **Verificar en frontend**:
- C++ ImGui: panel `DAGs` muestra el nuevo DAG. Pulsa `Refresh` si no aparece.
- Web: `http://localhost:8090`.
- Web: `http://localhost:4200`.
### Disparo manual desde curl o frontend
```bash
curl -X POST http://127.0.0.1:8090/api/dags/<nombre>/run
curl -X POST http://127.0.0.1:4200/api/dags/<nombre>/run
```
Devuelve `{"dag":"<nombre>","run_id":"...","status":"accepted"}` y dispara el WS broadcast — los frontends ven la run en `<1s`.
@@ -223,7 +223,7 @@ Flags del `server`:
| Flag | Default | Que |
|---|---|---|
| `--port` | 8090 | Puerto HTTP. |
| `--port` | 4200 | Puerto HTTP. |
| `--dags-dir` | `apps/dag_engine/dags` (via systemd unit) | Dir scaneado para YAMLs. |
| `--db` | `dag_engine.db` | SQLite con `dag_runs` + `dag_step_results`. |
| `--scheduler` | false | Si presente, arranca cron tickers automaticamente. |
@@ -287,7 +287,7 @@ Flags del `server`:
Debes ver `[scheduler] ticker started for <name> (<cron>), next: <ISO8601>`.
3. Si `next:` es muy lejano (ej. en una semana) y necesitas probar -> dispara manual:
```bash
curl -X POST http://127.0.0.1:8090/api/dags/<name>/run
curl -X POST http://127.0.0.1:4200/api/dags/<name>/run
```
4. Hora del sistema descalibrada:
```bash
@@ -302,7 +302,7 @@ Flags del `server`:
| Causa | Fix |
|---|---|
| Servidor caido | `systemctl --user status dag_engine.service`, `restart` si `inactive`. |
| Puerto cambiado | El cliente apunta a `127.0.0.1:8090` por codigo (constante `g_ws_port`). Reedificar si cambiaste el puerto del server. |
| Puerto cambiado | El cliente apunta a `127.0.0.1:4200` por codigo (constante `g_ws_port`). Reedificar si cambiaste el puerto del server. |
| Firewall Windows -> WSL | WSL2 expone `localhost`, normalmente OK. Si falla: `wsl --shutdown` y reabrir. |
### 5.6. Cleanup de runs viejos
+6 -5
View File
@@ -2,7 +2,7 @@
name: dag_engine
lang: go
domain: infra
version: 0.1.0
version: 0.2.0
description: "Motor de ejecucion de DAGs del fn_registry: CLI + servidor HTTP + scheduler cron. Schema YAML propio con `function:` para invocar funciones del registry (`fn run <id>`) y `command:` para shell. Historial en SQLite. Scheduler oficial del ecosistema."
tags: [service, dag, workflow, scheduler, web, cron]
uses_functions:
@@ -29,7 +29,7 @@ framework: "net/http + vite + react"
entry_point: "main.go"
dir_path: "apps/dag_engine"
service:
port: 8090
port: 4200
health_endpoint: /api/dags
health_timeout_s: 3
systemd_unit: dag_engine.service
@@ -88,13 +88,13 @@ cd .. && CGO_ENABLED=1 go build -tags fts5 -o dag-engine .
./dag-engine list apps/dag_engine/dags/
# Servidor web (production: gestionado por dag_engine.service systemd user unit)
./dag-engine server --port 8090 --dags-dir apps/dag_engine/dags/ --scheduler
# Browser: http://localhost:8090
./dag-engine server --port 4200 --dags-dir apps/dag_engine/dags/ --scheduler
# Browser: http://localhost:4200
```
## Notas
Schema YAML propio (ver `README.md` seccion 3 + ejemplos en `dags/`). Steps tipo `function:` invocan `fn run <id>` y propagan `function_id` a `dag_step_results` para el bucle reactivo. Puerto default 8090.
Schema YAML propio (ver `README.md` seccion 3 + ejemplos en `dags/`). Steps tipo `function:` invocan `fn run <id>` y propagan `function_id` a `dag_step_results` para el bucle reactivo. Puerto default 4200.
### 2026-05-16 — Fix function-not-found en steps `function:` + panel Logs en RunDetail `[done]`
@@ -141,3 +141,4 @@ Una linea por bump SemVer. Bump-type segun `.claude/commands/version.md`:
- `patch`: bugfix sin cambio observable.
- v0.1.0 (2026-05-18) — baseline.
- v0.2.0 (2026-06-02) — minor: limpieza de la herencia `dagu` (renombrado `DAGU_ENV``FN_DAG_ENV`, directorio `dags_migrated/``dags/`, eliminado DAGs legacy/ejemplo), historial de ejecuciones reseteado, frontend reescrito con el estilo fn (tema indigo + radius md + `FnProvider` con `@mantine/notifications`, fix de la API `Collapse in``expanded` de Mantine 9.2.1), daemon systemd-user sirviendo React + API en el puerto 4200, y reduccion del binario de ~72MB a ~10MB separando los drivers pesados (duckdb/clickhouse/postgres/matrix/keyring) del paquete `functions/infra` a subpaquetes propios. `go.mod` replace ahora relativo (`../..`).
+1 -1
View File
@@ -6,7 +6,7 @@ export default defineConfig({
server: {
port: 5175,
proxy: {
"/api": "http://localhost:8090",
"/api": "http://localhost:4200",
},
},
build: {
@@ -1,6 +1,4 @@
//go:build !noclickhouse
package infra
package clickhouse
import (
"database/sql"
@@ -29,7 +29,7 @@ output: "conexion sql.DB abierta a ClickHouse con ping verificado"
tested: false
tests: []
test_file_path: ""
file_path: "functions/infra/clickhouse_open.go"
file_path: "functions/infra/clickhouse/clickhouse_open.go"
---
## Ejemplo
@@ -1,6 +1,4 @@
//go:build !noduckdb
package infra
package duckdb
import (
"database/sql"
@@ -21,7 +21,7 @@ output: "conexion sql.DB abierta a DuckDB"
tested: false
tests: []
test_file_path: ""
file_path: "functions/infra/duckdb_open.go"
file_path: "functions/infra/duckdb/duckdb_open.go"
---
## Ejemplo
@@ -1,4 +1,4 @@
package infra
package keyring
import (
"encoding/json"
@@ -20,7 +20,7 @@ type Token struct {
UserID string `json:"user_id"`
DeviceID string `json:"device_id,omitempty"`
HomeserverURL string `json:"homeserver_url"`
Issuer string `json:"issuer,omitempty"` // MAS/OIDC issuer URL
Issuer string `json:"issuer,omitempty"` // MAS/OIDC issuer URL
ClientID string `json:"client_id,omitempty"` // MAS client_id used
}
@@ -54,8 +54,8 @@ tests:
- "Save then Delete then Load returns ErrNotFound"
- "Delete nonexistent is idempotent"
- "Save twice overwrites with second token"
test_file_path: "functions/infra/keyring_token_store_test.go"
file_path: "functions/infra/keyring_token_store.go"
test_file_path: "functions/infra/keyring/keyring_token_store_test.go"
file_path: "functions/infra/keyring/keyring_token_store.go"
---
## Ejemplo
@@ -1,4 +1,4 @@
package infra
package keyring
import (
"errors"
@@ -1,4 +1,4 @@
package infra
package matrix
import (
"context"
@@ -38,8 +38,8 @@ tests:
- "Whoami 401 token invalido"
- "EnableCrypto true devuelve error not implemented"
- "StoreDir se crea con permisos 0700"
test_file_path: "functions/infra/matrix_client_init_test.go"
file_path: "functions/infra/matrix_client_init.go"
test_file_path: "functions/infra/matrix/matrix_client_init_test.go"
file_path: "functions/infra/matrix/matrix_client_init.go"
---
## Ejemplo
@@ -1,4 +1,4 @@
package infra
package matrix
import (
"encoding/json"
@@ -1,6 +1,6 @@
//go:build goolm || libolm
package infra
package matrix
import (
"context"
@@ -38,8 +38,8 @@ tests:
- "directorio del store se crea con permisos 0700"
- "input valido Init exito helper no nil"
- "Synapse 401 en keys upload devuelve error"
test_file_path: "functions/infra/matrix_crypto_init_test.go"
file_path: "functions/infra/matrix_crypto_init.go"
test_file_path: "functions/infra/matrix/matrix_crypto_init_test.go"
file_path: "functions/infra/matrix/matrix_crypto_init.go"
---
## Ejemplo
@@ -1,6 +1,6 @@
//go:build goolm || libolm
package infra
package matrix
import (
"context"
@@ -72,10 +72,10 @@ func newSynapseMock(t *testing.T, uploadStatus int) *httptest.Server {
mux.HandleFunc("/_matrix/client/v3/keys/query", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
resp := map[string]any{
"device_keys": map[string]any{},
"failures": map[string]any{},
"master_keys": map[string]any{},
"user_signing_keys": map[string]any{},
"device_keys": map[string]any{},
"failures": map[string]any{},
"master_keys": map[string]any{},
"user_signing_keys": map[string]any{},
"self_signing_keys": map[string]any{},
}
_ = json.NewEncoder(w).Encode(resp)
@@ -85,9 +85,9 @@ func newSynapseMock(t *testing.T, uploadStatus int) *httptest.Server {
mux.HandleFunc("/_matrix/client/v3/sync", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
resp := map[string]any{
"next_batch": "s0_1",
"rooms": map[string]any{},
"to_device": map[string]any{"events": []any{}},
"next_batch": "s0_1",
"rooms": map[string]any{},
"to_device": map[string]any{"events": []any{}},
"device_one_time_keys_count": map[string]any{},
}
_ = json.NewEncoder(w).Encode(resp)
@@ -1,4 +1,4 @@
package infra
package matrix
import (
"bytes"
@@ -53,8 +53,8 @@ tests:
- "SendReply client nil devuelve error"
- "EditMessage client nil devuelve error"
- "SendReaction client nil devuelve error"
test_file_path: "functions/infra/matrix_message_send_test.go"
file_path: "functions/infra/matrix_message_send.go"
test_file_path: "functions/infra/matrix/matrix_message_send_test.go"
file_path: "functions/infra/matrix/matrix_message_send.go"
---
## Ejemplo
@@ -1,4 +1,4 @@
package infra
package matrix
import (
"context"
@@ -1,4 +1,4 @@
package infra
package matrix
import (
"context"
@@ -23,8 +23,8 @@ type RoomSummary struct {
IsSpace bool `json:"is_space"` // m.room.type == m.space
IsEncrypted bool `json:"is_encrypted"` // m.room.encryption state event presente
MemberCount int `json:"member_count"`
LastEventTs int64 `json:"last_event_ts"` // unix ms del ultimo evento conocido
UnreadCount int `json:"unread_count"` // notifications.unread + highlight
LastEventTs int64 `json:"last_event_ts"` // unix ms del ultimo evento conocido
UnreadCount int `json:"unread_count"` // notifications.unread + highlight
Tags []string `json:"tags,omitempty"` // m.tag account_data
}
@@ -30,8 +30,8 @@ tests:
- "IsDirect set correctamente segun m.direct"
- "IsEncrypted set segun presencia de m.room.encryption"
- "client nil devuelve error"
test_file_path: "functions/infra/matrix_room_list_test.go"
file_path: "functions/infra/matrix_room_list.go"
test_file_path: "functions/infra/matrix/matrix_room_list_test.go"
file_path: "functions/infra/matrix/matrix_room_list.go"
---
## Ejemplo
@@ -1,4 +1,4 @@
package infra
package matrix
import (
"context"
@@ -18,9 +18,9 @@ import (
// los handlers lo decodifican antes de hacer lookup.
type matrixTestServer struct {
*httptest.Server
joinedRooms []string // room IDs que devuelve /joined_rooms
roomNames map[string]string // roomID -> name (no seteado = sin m.room.name)
encryptedRooms map[string]bool // roomID -> tiene encryption event
joinedRooms []string // room IDs que devuelve /joined_rooms
roomNames map[string]string // roomID -> name (no seteado = sin m.room.name)
encryptedRooms map[string]bool // roomID -> tiene encryption event
directContent map[string][]string // userID -> []roomID
roomTags map[string][]string // roomID -> []tag names
}
@@ -1,4 +1,4 @@
package infra
package matrix
import (
"context"
@@ -36,8 +36,8 @@ tests:
- "Error401NoExit"
- "StopIdempotente"
- "CtxCancelCierraChannels"
test_file_path: "functions/infra/matrix_sync_service_test.go"
file_path: "functions/infra/matrix_sync_service.go"
test_file_path: "functions/infra/matrix/matrix_sync_service_test.go"
file_path: "functions/infra/matrix/matrix_sync_service.go"
---
## Ejemplo
@@ -1,6 +1,6 @@
//go:build goolm
package infra
package matrix
import (
"context"
@@ -163,7 +163,7 @@ func TestMatrixSyncService_BackoffRecovery(t *testing.T) {
cli := newTestSyncClient(t, srv.URL)
h, err := MatrixSyncService(context.Background(), MatrixSyncServiceConfig{
Client: cli,
InitialBackoffMS: 50, // backoff corto para tests
InitialBackoffMS: 50, // backoff corto para tests
MaxBackoffMS: 200,
ChannelBuffer: 16,
})
@@ -1,4 +1,4 @@
package infra
package postgres
import (
"database/sql"
@@ -31,7 +31,7 @@ output: "conexion sql.DB abierta a PostgreSQL"
tested: false
tests: []
test_file_path: ""
file_path: "functions/infra/postgres_open.go"
file_path: "functions/infra/postgres/postgres_open.go"
---
## Ejemplo
+11 -3
View File
@@ -2,6 +2,7 @@ package infra
import (
"context"
"io"
"net/http"
"net/http/httptest"
"strings"
@@ -210,9 +211,16 @@ func TestSSEHandler(t *testing.T) {
}
defer resp.Body.Close()
buf := make([]byte, 4096)
n, _ := resp.Body.Read(buf)
body := string(buf[:n])
// Leer el body completo hasta EOF. El canal se cierra antes de la
// peticion, asi que el handler envia ambos eventos y termina, cerrando
// el stream. Un unico Read podria devolver solo el primer chunk
// (event: first), porque io.Reader.Read no garantiza llenar el buffer;
// io.ReadAll consume todos los chunks emitidos por el handler.
raw, err := io.ReadAll(resp.Body)
if err != nil {
t.Fatalf("read body failed: %v", err)
}
body := string(raw)
for _, want := range []string{"event: first", "event: second", "data: 1", "data: 2"} {
if !strings.Contains(body, want) {