Compare commits
2 Commits
4461875b18
...
d8db05e9c9
| Author | SHA1 | Date | |
|---|---|---|---|
| d8db05e9c9 | |||
| e22c33ee6d |
@@ -16,7 +16,7 @@ Por defecto el systemd unit apunta a `apps/dag_engine/dags/`. Para usar otro dir
|
||||
|
||||
```ini
|
||||
ExecStart=/home/lucas/fn_registry/apps/dag_engine/dag_engine server \
|
||||
--port 8090 \
|
||||
--port 4200 \
|
||||
--dags-dir /home/lucas/fn_registry/apps/dag_engine/dags \
|
||||
--db /home/lucas/fn_registry/apps/dag_engine/dag_engine.db \
|
||||
--scheduler
|
||||
@@ -52,12 +52,12 @@ systemctl --user restart dag_engine.service
|
||||
Busca la linea `[scheduler] ticker started for <nombre> (<cron>)` en los logs.
|
||||
5. **Verificar en frontend**:
|
||||
- C++ ImGui: panel `DAGs` muestra el nuevo DAG. Pulsa `Refresh` si no aparece.
|
||||
- Web: `http://localhost:8090`.
|
||||
- Web: `http://localhost:4200`.
|
||||
|
||||
### Disparo manual desde curl o frontend
|
||||
|
||||
```bash
|
||||
curl -X POST http://127.0.0.1:8090/api/dags/<nombre>/run
|
||||
curl -X POST http://127.0.0.1:4200/api/dags/<nombre>/run
|
||||
```
|
||||
|
||||
Devuelve `{"dag":"<nombre>","run_id":"...","status":"accepted"}` y dispara el WS broadcast — los frontends ven la run en `<1s`.
|
||||
@@ -223,7 +223,7 @@ Flags del `server`:
|
||||
|
||||
| Flag | Default | Que |
|
||||
|---|---|---|
|
||||
| `--port` | 8090 | Puerto HTTP. |
|
||||
| `--port` | 4200 | Puerto HTTP. |
|
||||
| `--dags-dir` | `apps/dag_engine/dags` (via systemd unit) | Dir scaneado para YAMLs. |
|
||||
| `--db` | `dag_engine.db` | SQLite con `dag_runs` + `dag_step_results`. |
|
||||
| `--scheduler` | false | Si presente, arranca cron tickers automaticamente. |
|
||||
@@ -287,7 +287,7 @@ Flags del `server`:
|
||||
Debes ver `[scheduler] ticker started for <name> (<cron>), next: <ISO8601>`.
|
||||
3. Si `next:` es muy lejano (ej. en una semana) y necesitas probar -> dispara manual:
|
||||
```bash
|
||||
curl -X POST http://127.0.0.1:8090/api/dags/<name>/run
|
||||
curl -X POST http://127.0.0.1:4200/api/dags/<name>/run
|
||||
```
|
||||
4. Hora del sistema descalibrada:
|
||||
```bash
|
||||
@@ -302,7 +302,7 @@ Flags del `server`:
|
||||
| Causa | Fix |
|
||||
|---|---|
|
||||
| Servidor caido | `systemctl --user status dag_engine.service`, `restart` si `inactive`. |
|
||||
| Puerto cambiado | El cliente apunta a `127.0.0.1:8090` por codigo (constante `g_ws_port`). Reedificar si cambiaste el puerto del server. |
|
||||
| Puerto cambiado | El cliente apunta a `127.0.0.1:4200` por codigo (constante `g_ws_port`). Reedificar si cambiaste el puerto del server. |
|
||||
| Firewall Windows -> WSL | WSL2 expone `localhost`, normalmente OK. Si falla: `wsl --shutdown` y reabrir. |
|
||||
|
||||
### 5.6. Cleanup de runs viejos
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
name: dag_engine
|
||||
lang: go
|
||||
domain: infra
|
||||
version: 0.1.0
|
||||
version: 0.2.0
|
||||
description: "Motor de ejecucion de DAGs del fn_registry: CLI + servidor HTTP + scheduler cron. Schema YAML propio con `function:` para invocar funciones del registry (`fn run <id>`) y `command:` para shell. Historial en SQLite. Scheduler oficial del ecosistema."
|
||||
tags: [service, dag, workflow, scheduler, web, cron]
|
||||
uses_functions:
|
||||
@@ -29,7 +29,7 @@ framework: "net/http + vite + react"
|
||||
entry_point: "main.go"
|
||||
dir_path: "apps/dag_engine"
|
||||
service:
|
||||
port: 8090
|
||||
port: 4200
|
||||
health_endpoint: /api/dags
|
||||
health_timeout_s: 3
|
||||
systemd_unit: dag_engine.service
|
||||
@@ -88,13 +88,13 @@ cd .. && CGO_ENABLED=1 go build -tags fts5 -o dag-engine .
|
||||
./dag-engine list apps/dag_engine/dags/
|
||||
|
||||
# Servidor web (production: gestionado por dag_engine.service systemd user unit)
|
||||
./dag-engine server --port 8090 --dags-dir apps/dag_engine/dags/ --scheduler
|
||||
# Browser: http://localhost:8090
|
||||
./dag-engine server --port 4200 --dags-dir apps/dag_engine/dags/ --scheduler
|
||||
# Browser: http://localhost:4200
|
||||
```
|
||||
|
||||
## Notas
|
||||
|
||||
Schema YAML propio (ver `README.md` seccion 3 + ejemplos en `dags/`). Steps tipo `function:` invocan `fn run <id>` y propagan `function_id` a `dag_step_results` para el bucle reactivo. Puerto default 8090.
|
||||
Schema YAML propio (ver `README.md` seccion 3 + ejemplos en `dags/`). Steps tipo `function:` invocan `fn run <id>` y propagan `function_id` a `dag_step_results` para el bucle reactivo. Puerto default 4200.
|
||||
|
||||
### 2026-05-16 — Fix function-not-found en steps `function:` + panel Logs en RunDetail `[done]`
|
||||
|
||||
@@ -141,3 +141,4 @@ Una linea por bump SemVer. Bump-type segun `.claude/commands/version.md`:
|
||||
- `patch`: bugfix sin cambio observable.
|
||||
|
||||
- v0.1.0 (2026-05-18) — baseline.
|
||||
- v0.2.0 (2026-06-02) — minor: limpieza de la herencia `dagu` (renombrado `DAGU_ENV`→`FN_DAG_ENV`, directorio `dags_migrated/`→`dags/`, eliminado DAGs legacy/ejemplo), historial de ejecuciones reseteado, frontend reescrito con el estilo fn (tema indigo + radius md + `FnProvider` con `@mantine/notifications`, fix de la API `Collapse in`→`expanded` de Mantine 9.2.1), daemon systemd-user sirviendo React + API en el puerto 4200, y reduccion del binario de ~72MB a ~10MB separando los drivers pesados (duckdb/clickhouse/postgres/matrix/keyring) del paquete `functions/infra` a subpaquetes propios. `go.mod` replace ahora relativo (`../..`).
|
||||
|
||||
@@ -6,7 +6,7 @@ export default defineConfig({
|
||||
server: {
|
||||
port: 5175,
|
||||
proxy: {
|
||||
"/api": "http://localhost:8090",
|
||||
"/api": "http://localhost:4200",
|
||||
},
|
||||
},
|
||||
build: {
|
||||
|
||||
@@ -1,6 +1,4 @@
|
||||
//go:build !noclickhouse
|
||||
|
||||
package infra
|
||||
package clickhouse
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
@@ -29,7 +29,7 @@ output: "conexion sql.DB abierta a ClickHouse con ping verificado"
|
||||
tested: false
|
||||
tests: []
|
||||
test_file_path: ""
|
||||
file_path: "functions/infra/clickhouse_open.go"
|
||||
file_path: "functions/infra/clickhouse/clickhouse_open.go"
|
||||
---
|
||||
|
||||
## Ejemplo
|
||||
@@ -1,6 +1,4 @@
|
||||
//go:build !noduckdb
|
||||
|
||||
package infra
|
||||
package duckdb
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
@@ -21,7 +21,7 @@ output: "conexion sql.DB abierta a DuckDB"
|
||||
tested: false
|
||||
tests: []
|
||||
test_file_path: ""
|
||||
file_path: "functions/infra/duckdb_open.go"
|
||||
file_path: "functions/infra/duckdb/duckdb_open.go"
|
||||
---
|
||||
|
||||
## Ejemplo
|
||||
+2
-2
@@ -1,4 +1,4 @@
|
||||
package infra
|
||||
package keyring
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
@@ -20,7 +20,7 @@ type Token struct {
|
||||
UserID string `json:"user_id"`
|
||||
DeviceID string `json:"device_id,omitempty"`
|
||||
HomeserverURL string `json:"homeserver_url"`
|
||||
Issuer string `json:"issuer,omitempty"` // MAS/OIDC issuer URL
|
||||
Issuer string `json:"issuer,omitempty"` // MAS/OIDC issuer URL
|
||||
ClientID string `json:"client_id,omitempty"` // MAS client_id used
|
||||
}
|
||||
|
||||
+2
-2
@@ -54,8 +54,8 @@ tests:
|
||||
- "Save then Delete then Load returns ErrNotFound"
|
||||
- "Delete nonexistent is idempotent"
|
||||
- "Save twice overwrites with second token"
|
||||
test_file_path: "functions/infra/keyring_token_store_test.go"
|
||||
file_path: "functions/infra/keyring_token_store.go"
|
||||
test_file_path: "functions/infra/keyring/keyring_token_store_test.go"
|
||||
file_path: "functions/infra/keyring/keyring_token_store.go"
|
||||
---
|
||||
|
||||
## Ejemplo
|
||||
+1
-1
@@ -1,4 +1,4 @@
|
||||
package infra
|
||||
package keyring
|
||||
|
||||
import (
|
||||
"errors"
|
||||
+1
-1
@@ -1,4 +1,4 @@
|
||||
package infra
|
||||
package matrix
|
||||
|
||||
import (
|
||||
"context"
|
||||
+2
-2
@@ -38,8 +38,8 @@ tests:
|
||||
- "Whoami 401 token invalido"
|
||||
- "EnableCrypto true devuelve error not implemented"
|
||||
- "StoreDir se crea con permisos 0700"
|
||||
test_file_path: "functions/infra/matrix_client_init_test.go"
|
||||
file_path: "functions/infra/matrix_client_init.go"
|
||||
test_file_path: "functions/infra/matrix/matrix_client_init_test.go"
|
||||
file_path: "functions/infra/matrix/matrix_client_init.go"
|
||||
---
|
||||
|
||||
## Ejemplo
|
||||
+1
-1
@@ -1,4 +1,4 @@
|
||||
package infra
|
||||
package matrix
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
+1
-1
@@ -1,6 +1,6 @@
|
||||
//go:build goolm || libolm
|
||||
|
||||
package infra
|
||||
package matrix
|
||||
|
||||
import (
|
||||
"context"
|
||||
+2
-2
@@ -38,8 +38,8 @@ tests:
|
||||
- "directorio del store se crea con permisos 0700"
|
||||
- "input valido Init exito helper no nil"
|
||||
- "Synapse 401 en keys upload devuelve error"
|
||||
test_file_path: "functions/infra/matrix_crypto_init_test.go"
|
||||
file_path: "functions/infra/matrix_crypto_init.go"
|
||||
test_file_path: "functions/infra/matrix/matrix_crypto_init_test.go"
|
||||
file_path: "functions/infra/matrix/matrix_crypto_init.go"
|
||||
---
|
||||
|
||||
## Ejemplo
|
||||
+8
-8
@@ -1,6 +1,6 @@
|
||||
//go:build goolm || libolm
|
||||
|
||||
package infra
|
||||
package matrix
|
||||
|
||||
import (
|
||||
"context"
|
||||
@@ -72,10 +72,10 @@ func newSynapseMock(t *testing.T, uploadStatus int) *httptest.Server {
|
||||
mux.HandleFunc("/_matrix/client/v3/keys/query", func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
resp := map[string]any{
|
||||
"device_keys": map[string]any{},
|
||||
"failures": map[string]any{},
|
||||
"master_keys": map[string]any{},
|
||||
"user_signing_keys": map[string]any{},
|
||||
"device_keys": map[string]any{},
|
||||
"failures": map[string]any{},
|
||||
"master_keys": map[string]any{},
|
||||
"user_signing_keys": map[string]any{},
|
||||
"self_signing_keys": map[string]any{},
|
||||
}
|
||||
_ = json.NewEncoder(w).Encode(resp)
|
||||
@@ -85,9 +85,9 @@ func newSynapseMock(t *testing.T, uploadStatus int) *httptest.Server {
|
||||
mux.HandleFunc("/_matrix/client/v3/sync", func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
resp := map[string]any{
|
||||
"next_batch": "s0_1",
|
||||
"rooms": map[string]any{},
|
||||
"to_device": map[string]any{"events": []any{}},
|
||||
"next_batch": "s0_1",
|
||||
"rooms": map[string]any{},
|
||||
"to_device": map[string]any{"events": []any{}},
|
||||
"device_one_time_keys_count": map[string]any{},
|
||||
}
|
||||
_ = json.NewEncoder(w).Encode(resp)
|
||||
+1
-1
@@ -1,4 +1,4 @@
|
||||
package infra
|
||||
package matrix
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
+2
-2
@@ -53,8 +53,8 @@ tests:
|
||||
- "SendReply client nil devuelve error"
|
||||
- "EditMessage client nil devuelve error"
|
||||
- "SendReaction client nil devuelve error"
|
||||
test_file_path: "functions/infra/matrix_message_send_test.go"
|
||||
file_path: "functions/infra/matrix_message_send.go"
|
||||
test_file_path: "functions/infra/matrix/matrix_message_send_test.go"
|
||||
file_path: "functions/infra/matrix/matrix_message_send.go"
|
||||
---
|
||||
|
||||
## Ejemplo
|
||||
+1
-1
@@ -1,4 +1,4 @@
|
||||
package infra
|
||||
package matrix
|
||||
|
||||
import (
|
||||
"context"
|
||||
@@ -1,4 +1,4 @@
|
||||
package infra
|
||||
package matrix
|
||||
|
||||
import (
|
||||
"context"
|
||||
@@ -23,8 +23,8 @@ type RoomSummary struct {
|
||||
IsSpace bool `json:"is_space"` // m.room.type == m.space
|
||||
IsEncrypted bool `json:"is_encrypted"` // m.room.encryption state event presente
|
||||
MemberCount int `json:"member_count"`
|
||||
LastEventTs int64 `json:"last_event_ts"` // unix ms del ultimo evento conocido
|
||||
UnreadCount int `json:"unread_count"` // notifications.unread + highlight
|
||||
LastEventTs int64 `json:"last_event_ts"` // unix ms del ultimo evento conocido
|
||||
UnreadCount int `json:"unread_count"` // notifications.unread + highlight
|
||||
Tags []string `json:"tags,omitempty"` // m.tag account_data
|
||||
}
|
||||
|
||||
@@ -30,8 +30,8 @@ tests:
|
||||
- "IsDirect set correctamente segun m.direct"
|
||||
- "IsEncrypted set segun presencia de m.room.encryption"
|
||||
- "client nil devuelve error"
|
||||
test_file_path: "functions/infra/matrix_room_list_test.go"
|
||||
file_path: "functions/infra/matrix_room_list.go"
|
||||
test_file_path: "functions/infra/matrix/matrix_room_list_test.go"
|
||||
file_path: "functions/infra/matrix/matrix_room_list.go"
|
||||
---
|
||||
|
||||
## Ejemplo
|
||||
+4
-4
@@ -1,4 +1,4 @@
|
||||
package infra
|
||||
package matrix
|
||||
|
||||
import (
|
||||
"context"
|
||||
@@ -18,9 +18,9 @@ import (
|
||||
// los handlers lo decodifican antes de hacer lookup.
|
||||
type matrixTestServer struct {
|
||||
*httptest.Server
|
||||
joinedRooms []string // room IDs que devuelve /joined_rooms
|
||||
roomNames map[string]string // roomID -> name (no seteado = sin m.room.name)
|
||||
encryptedRooms map[string]bool // roomID -> tiene encryption event
|
||||
joinedRooms []string // room IDs que devuelve /joined_rooms
|
||||
roomNames map[string]string // roomID -> name (no seteado = sin m.room.name)
|
||||
encryptedRooms map[string]bool // roomID -> tiene encryption event
|
||||
directContent map[string][]string // userID -> []roomID
|
||||
roomTags map[string][]string // roomID -> []tag names
|
||||
}
|
||||
+1
-1
@@ -1,4 +1,4 @@
|
||||
package infra
|
||||
package matrix
|
||||
|
||||
import (
|
||||
"context"
|
||||
+2
-2
@@ -36,8 +36,8 @@ tests:
|
||||
- "Error401NoExit"
|
||||
- "StopIdempotente"
|
||||
- "CtxCancelCierraChannels"
|
||||
test_file_path: "functions/infra/matrix_sync_service_test.go"
|
||||
file_path: "functions/infra/matrix_sync_service.go"
|
||||
test_file_path: "functions/infra/matrix/matrix_sync_service_test.go"
|
||||
file_path: "functions/infra/matrix/matrix_sync_service.go"
|
||||
---
|
||||
|
||||
## Ejemplo
|
||||
+2
-2
@@ -1,6 +1,6 @@
|
||||
//go:build goolm
|
||||
|
||||
package infra
|
||||
package matrix
|
||||
|
||||
import (
|
||||
"context"
|
||||
@@ -163,7 +163,7 @@ func TestMatrixSyncService_BackoffRecovery(t *testing.T) {
|
||||
cli := newTestSyncClient(t, srv.URL)
|
||||
h, err := MatrixSyncService(context.Background(), MatrixSyncServiceConfig{
|
||||
Client: cli,
|
||||
InitialBackoffMS: 50, // backoff corto para tests
|
||||
InitialBackoffMS: 50, // backoff corto para tests
|
||||
MaxBackoffMS: 200,
|
||||
ChannelBuffer: 16,
|
||||
})
|
||||
@@ -1,4 +1,4 @@
|
||||
package infra
|
||||
package postgres
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
@@ -31,7 +31,7 @@ output: "conexion sql.DB abierta a PostgreSQL"
|
||||
tested: false
|
||||
tests: []
|
||||
test_file_path: ""
|
||||
file_path: "functions/infra/postgres_open.go"
|
||||
file_path: "functions/infra/postgres/postgres_open.go"
|
||||
---
|
||||
|
||||
## Ejemplo
|
||||
@@ -2,6 +2,7 @@ package infra
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
@@ -210,9 +211,16 @@ func TestSSEHandler(t *testing.T) {
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
buf := make([]byte, 4096)
|
||||
n, _ := resp.Body.Read(buf)
|
||||
body := string(buf[:n])
|
||||
// Leer el body completo hasta EOF. El canal se cierra antes de la
|
||||
// peticion, asi que el handler envia ambos eventos y termina, cerrando
|
||||
// el stream. Un unico Read podria devolver solo el primer chunk
|
||||
// (event: first), porque io.Reader.Read no garantiza llenar el buffer;
|
||||
// io.ReadAll consume todos los chunks emitidos por el handler.
|
||||
raw, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
t.Fatalf("read body failed: %v", err)
|
||||
}
|
||||
body := string(raw)
|
||||
|
||||
for _, want := range []string{"event: first", "event: second", "data: 1", "data: 2"} {
|
||||
if !strings.Contains(body, want) {
|
||||
|
||||
Reference in New Issue
Block a user