Compare commits
2 Commits
4461875b18
...
d8db05e9c9
| Author | SHA1 | Date | |
|---|---|---|---|
| d8db05e9c9 | |||
| e22c33ee6d |
@@ -16,7 +16,7 @@ Por defecto el systemd unit apunta a `apps/dag_engine/dags/`. Para usar otro dir
|
|||||||
|
|
||||||
```ini
|
```ini
|
||||||
ExecStart=/home/lucas/fn_registry/apps/dag_engine/dag_engine server \
|
ExecStart=/home/lucas/fn_registry/apps/dag_engine/dag_engine server \
|
||||||
--port 8090 \
|
--port 4200 \
|
||||||
--dags-dir /home/lucas/fn_registry/apps/dag_engine/dags \
|
--dags-dir /home/lucas/fn_registry/apps/dag_engine/dags \
|
||||||
--db /home/lucas/fn_registry/apps/dag_engine/dag_engine.db \
|
--db /home/lucas/fn_registry/apps/dag_engine/dag_engine.db \
|
||||||
--scheduler
|
--scheduler
|
||||||
@@ -52,12 +52,12 @@ systemctl --user restart dag_engine.service
|
|||||||
Busca la linea `[scheduler] ticker started for <nombre> (<cron>)` en los logs.
|
Busca la linea `[scheduler] ticker started for <nombre> (<cron>)` en los logs.
|
||||||
5. **Verificar en frontend**:
|
5. **Verificar en frontend**:
|
||||||
- C++ ImGui: panel `DAGs` muestra el nuevo DAG. Pulsa `Refresh` si no aparece.
|
- C++ ImGui: panel `DAGs` muestra el nuevo DAG. Pulsa `Refresh` si no aparece.
|
||||||
- Web: `http://localhost:8090`.
|
- Web: `http://localhost:4200`.
|
||||||
|
|
||||||
### Disparo manual desde curl o frontend
|
### Disparo manual desde curl o frontend
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
curl -X POST http://127.0.0.1:8090/api/dags/<nombre>/run
|
curl -X POST http://127.0.0.1:4200/api/dags/<nombre>/run
|
||||||
```
|
```
|
||||||
|
|
||||||
Devuelve `{"dag":"<nombre>","run_id":"...","status":"accepted"}` y dispara el WS broadcast — los frontends ven la run en `<1s`.
|
Devuelve `{"dag":"<nombre>","run_id":"...","status":"accepted"}` y dispara el WS broadcast — los frontends ven la run en `<1s`.
|
||||||
@@ -223,7 +223,7 @@ Flags del `server`:
|
|||||||
|
|
||||||
| Flag | Default | Que |
|
| Flag | Default | Que |
|
||||||
|---|---|---|
|
|---|---|---|
|
||||||
| `--port` | 8090 | Puerto HTTP. |
|
| `--port` | 4200 | Puerto HTTP. |
|
||||||
| `--dags-dir` | `apps/dag_engine/dags` (via systemd unit) | Dir scaneado para YAMLs. |
|
| `--dags-dir` | `apps/dag_engine/dags` (via systemd unit) | Dir scaneado para YAMLs. |
|
||||||
| `--db` | `dag_engine.db` | SQLite con `dag_runs` + `dag_step_results`. |
|
| `--db` | `dag_engine.db` | SQLite con `dag_runs` + `dag_step_results`. |
|
||||||
| `--scheduler` | false | Si presente, arranca cron tickers automaticamente. |
|
| `--scheduler` | false | Si presente, arranca cron tickers automaticamente. |
|
||||||
@@ -287,7 +287,7 @@ Flags del `server`:
|
|||||||
Debes ver `[scheduler] ticker started for <name> (<cron>), next: <ISO8601>`.
|
Debes ver `[scheduler] ticker started for <name> (<cron>), next: <ISO8601>`.
|
||||||
3. Si `next:` es muy lejano (ej. en una semana) y necesitas probar -> dispara manual:
|
3. Si `next:` es muy lejano (ej. en una semana) y necesitas probar -> dispara manual:
|
||||||
```bash
|
```bash
|
||||||
curl -X POST http://127.0.0.1:8090/api/dags/<name>/run
|
curl -X POST http://127.0.0.1:4200/api/dags/<name>/run
|
||||||
```
|
```
|
||||||
4. Hora del sistema descalibrada:
|
4. Hora del sistema descalibrada:
|
||||||
```bash
|
```bash
|
||||||
@@ -302,7 +302,7 @@ Flags del `server`:
|
|||||||
| Causa | Fix |
|
| Causa | Fix |
|
||||||
|---|---|
|
|---|---|
|
||||||
| Servidor caido | `systemctl --user status dag_engine.service`, `restart` si `inactive`. |
|
| Servidor caido | `systemctl --user status dag_engine.service`, `restart` si `inactive`. |
|
||||||
| Puerto cambiado | El cliente apunta a `127.0.0.1:8090` por codigo (constante `g_ws_port`). Reedificar si cambiaste el puerto del server. |
|
| Puerto cambiado | El cliente apunta a `127.0.0.1:4200` por codigo (constante `g_ws_port`). Reedificar si cambiaste el puerto del server. |
|
||||||
| Firewall Windows -> WSL | WSL2 expone `localhost`, normalmente OK. Si falla: `wsl --shutdown` y reabrir. |
|
| Firewall Windows -> WSL | WSL2 expone `localhost`, normalmente OK. Si falla: `wsl --shutdown` y reabrir. |
|
||||||
|
|
||||||
### 5.6. Cleanup de runs viejos
|
### 5.6. Cleanup de runs viejos
|
||||||
|
|||||||
@@ -2,7 +2,7 @@
|
|||||||
name: dag_engine
|
name: dag_engine
|
||||||
lang: go
|
lang: go
|
||||||
domain: infra
|
domain: infra
|
||||||
version: 0.1.0
|
version: 0.2.0
|
||||||
description: "Motor de ejecucion de DAGs del fn_registry: CLI + servidor HTTP + scheduler cron. Schema YAML propio con `function:` para invocar funciones del registry (`fn run <id>`) y `command:` para shell. Historial en SQLite. Scheduler oficial del ecosistema."
|
description: "Motor de ejecucion de DAGs del fn_registry: CLI + servidor HTTP + scheduler cron. Schema YAML propio con `function:` para invocar funciones del registry (`fn run <id>`) y `command:` para shell. Historial en SQLite. Scheduler oficial del ecosistema."
|
||||||
tags: [service, dag, workflow, scheduler, web, cron]
|
tags: [service, dag, workflow, scheduler, web, cron]
|
||||||
uses_functions:
|
uses_functions:
|
||||||
@@ -29,7 +29,7 @@ framework: "net/http + vite + react"
|
|||||||
entry_point: "main.go"
|
entry_point: "main.go"
|
||||||
dir_path: "apps/dag_engine"
|
dir_path: "apps/dag_engine"
|
||||||
service:
|
service:
|
||||||
port: 8090
|
port: 4200
|
||||||
health_endpoint: /api/dags
|
health_endpoint: /api/dags
|
||||||
health_timeout_s: 3
|
health_timeout_s: 3
|
||||||
systemd_unit: dag_engine.service
|
systemd_unit: dag_engine.service
|
||||||
@@ -88,13 +88,13 @@ cd .. && CGO_ENABLED=1 go build -tags fts5 -o dag-engine .
|
|||||||
./dag-engine list apps/dag_engine/dags/
|
./dag-engine list apps/dag_engine/dags/
|
||||||
|
|
||||||
# Servidor web (production: gestionado por dag_engine.service systemd user unit)
|
# Servidor web (production: gestionado por dag_engine.service systemd user unit)
|
||||||
./dag-engine server --port 8090 --dags-dir apps/dag_engine/dags/ --scheduler
|
./dag-engine server --port 4200 --dags-dir apps/dag_engine/dags/ --scheduler
|
||||||
# Browser: http://localhost:8090
|
# Browser: http://localhost:4200
|
||||||
```
|
```
|
||||||
|
|
||||||
## Notas
|
## Notas
|
||||||
|
|
||||||
Schema YAML propio (ver `README.md` seccion 3 + ejemplos en `dags/`). Steps tipo `function:` invocan `fn run <id>` y propagan `function_id` a `dag_step_results` para el bucle reactivo. Puerto default 8090.
|
Schema YAML propio (ver `README.md` seccion 3 + ejemplos en `dags/`). Steps tipo `function:` invocan `fn run <id>` y propagan `function_id` a `dag_step_results` para el bucle reactivo. Puerto default 4200.
|
||||||
|
|
||||||
### 2026-05-16 — Fix function-not-found en steps `function:` + panel Logs en RunDetail `[done]`
|
### 2026-05-16 — Fix function-not-found en steps `function:` + panel Logs en RunDetail `[done]`
|
||||||
|
|
||||||
@@ -141,3 +141,4 @@ Una linea por bump SemVer. Bump-type segun `.claude/commands/version.md`:
|
|||||||
- `patch`: bugfix sin cambio observable.
|
- `patch`: bugfix sin cambio observable.
|
||||||
|
|
||||||
- v0.1.0 (2026-05-18) — baseline.
|
- v0.1.0 (2026-05-18) — baseline.
|
||||||
|
- v0.2.0 (2026-06-02) — minor: limpieza de la herencia `dagu` (renombrado `DAGU_ENV`→`FN_DAG_ENV`, directorio `dags_migrated/`→`dags/`, eliminado DAGs legacy/ejemplo), historial de ejecuciones reseteado, frontend reescrito con el estilo fn (tema indigo + radius md + `FnProvider` con `@mantine/notifications`, fix de la API `Collapse in`→`expanded` de Mantine 9.2.1), daemon systemd-user sirviendo React + API en el puerto 4200, y reduccion del binario de ~72MB a ~10MB separando los drivers pesados (duckdb/clickhouse/postgres/matrix/keyring) del paquete `functions/infra` a subpaquetes propios. `go.mod` replace ahora relativo (`../..`).
|
||||||
|
|||||||
@@ -6,7 +6,7 @@ export default defineConfig({
|
|||||||
server: {
|
server: {
|
||||||
port: 5175,
|
port: 5175,
|
||||||
proxy: {
|
proxy: {
|
||||||
"/api": "http://localhost:8090",
|
"/api": "http://localhost:4200",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
build: {
|
build: {
|
||||||
|
|||||||
@@ -1,6 +1,4 @@
|
|||||||
//go:build !noclickhouse
|
package clickhouse
|
||||||
|
|
||||||
package infra
|
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"database/sql"
|
"database/sql"
|
||||||
@@ -29,7 +29,7 @@ output: "conexion sql.DB abierta a ClickHouse con ping verificado"
|
|||||||
tested: false
|
tested: false
|
||||||
tests: []
|
tests: []
|
||||||
test_file_path: ""
|
test_file_path: ""
|
||||||
file_path: "functions/infra/clickhouse_open.go"
|
file_path: "functions/infra/clickhouse/clickhouse_open.go"
|
||||||
---
|
---
|
||||||
|
|
||||||
## Ejemplo
|
## Ejemplo
|
||||||
@@ -1,6 +1,4 @@
|
|||||||
//go:build !noduckdb
|
package duckdb
|
||||||
|
|
||||||
package infra
|
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"database/sql"
|
"database/sql"
|
||||||
@@ -21,7 +21,7 @@ output: "conexion sql.DB abierta a DuckDB"
|
|||||||
tested: false
|
tested: false
|
||||||
tests: []
|
tests: []
|
||||||
test_file_path: ""
|
test_file_path: ""
|
||||||
file_path: "functions/infra/duckdb_open.go"
|
file_path: "functions/infra/duckdb/duckdb_open.go"
|
||||||
---
|
---
|
||||||
|
|
||||||
## Ejemplo
|
## Ejemplo
|
||||||
+2
-2
@@ -1,4 +1,4 @@
|
|||||||
package infra
|
package keyring
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
@@ -20,7 +20,7 @@ type Token struct {
|
|||||||
UserID string `json:"user_id"`
|
UserID string `json:"user_id"`
|
||||||
DeviceID string `json:"device_id,omitempty"`
|
DeviceID string `json:"device_id,omitempty"`
|
||||||
HomeserverURL string `json:"homeserver_url"`
|
HomeserverURL string `json:"homeserver_url"`
|
||||||
Issuer string `json:"issuer,omitempty"` // MAS/OIDC issuer URL
|
Issuer string `json:"issuer,omitempty"` // MAS/OIDC issuer URL
|
||||||
ClientID string `json:"client_id,omitempty"` // MAS client_id used
|
ClientID string `json:"client_id,omitempty"` // MAS client_id used
|
||||||
}
|
}
|
||||||
|
|
||||||
+2
-2
@@ -54,8 +54,8 @@ tests:
|
|||||||
- "Save then Delete then Load returns ErrNotFound"
|
- "Save then Delete then Load returns ErrNotFound"
|
||||||
- "Delete nonexistent is idempotent"
|
- "Delete nonexistent is idempotent"
|
||||||
- "Save twice overwrites with second token"
|
- "Save twice overwrites with second token"
|
||||||
test_file_path: "functions/infra/keyring_token_store_test.go"
|
test_file_path: "functions/infra/keyring/keyring_token_store_test.go"
|
||||||
file_path: "functions/infra/keyring_token_store.go"
|
file_path: "functions/infra/keyring/keyring_token_store.go"
|
||||||
---
|
---
|
||||||
|
|
||||||
## Ejemplo
|
## Ejemplo
|
||||||
+1
-1
@@ -1,4 +1,4 @@
|
|||||||
package infra
|
package keyring
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
+1
-1
@@ -1,4 +1,4 @@
|
|||||||
package infra
|
package matrix
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
+2
-2
@@ -38,8 +38,8 @@ tests:
|
|||||||
- "Whoami 401 token invalido"
|
- "Whoami 401 token invalido"
|
||||||
- "EnableCrypto true devuelve error not implemented"
|
- "EnableCrypto true devuelve error not implemented"
|
||||||
- "StoreDir se crea con permisos 0700"
|
- "StoreDir se crea con permisos 0700"
|
||||||
test_file_path: "functions/infra/matrix_client_init_test.go"
|
test_file_path: "functions/infra/matrix/matrix_client_init_test.go"
|
||||||
file_path: "functions/infra/matrix_client_init.go"
|
file_path: "functions/infra/matrix/matrix_client_init.go"
|
||||||
---
|
---
|
||||||
|
|
||||||
## Ejemplo
|
## Ejemplo
|
||||||
+1
-1
@@ -1,4 +1,4 @@
|
|||||||
package infra
|
package matrix
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
+1
-1
@@ -1,6 +1,6 @@
|
|||||||
//go:build goolm || libolm
|
//go:build goolm || libolm
|
||||||
|
|
||||||
package infra
|
package matrix
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
+2
-2
@@ -38,8 +38,8 @@ tests:
|
|||||||
- "directorio del store se crea con permisos 0700"
|
- "directorio del store se crea con permisos 0700"
|
||||||
- "input valido Init exito helper no nil"
|
- "input valido Init exito helper no nil"
|
||||||
- "Synapse 401 en keys upload devuelve error"
|
- "Synapse 401 en keys upload devuelve error"
|
||||||
test_file_path: "functions/infra/matrix_crypto_init_test.go"
|
test_file_path: "functions/infra/matrix/matrix_crypto_init_test.go"
|
||||||
file_path: "functions/infra/matrix_crypto_init.go"
|
file_path: "functions/infra/matrix/matrix_crypto_init.go"
|
||||||
---
|
---
|
||||||
|
|
||||||
## Ejemplo
|
## Ejemplo
|
||||||
+8
-8
@@ -1,6 +1,6 @@
|
|||||||
//go:build goolm || libolm
|
//go:build goolm || libolm
|
||||||
|
|
||||||
package infra
|
package matrix
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
@@ -72,10 +72,10 @@ func newSynapseMock(t *testing.T, uploadStatus int) *httptest.Server {
|
|||||||
mux.HandleFunc("/_matrix/client/v3/keys/query", func(w http.ResponseWriter, r *http.Request) {
|
mux.HandleFunc("/_matrix/client/v3/keys/query", func(w http.ResponseWriter, r *http.Request) {
|
||||||
w.Header().Set("Content-Type", "application/json")
|
w.Header().Set("Content-Type", "application/json")
|
||||||
resp := map[string]any{
|
resp := map[string]any{
|
||||||
"device_keys": map[string]any{},
|
"device_keys": map[string]any{},
|
||||||
"failures": map[string]any{},
|
"failures": map[string]any{},
|
||||||
"master_keys": map[string]any{},
|
"master_keys": map[string]any{},
|
||||||
"user_signing_keys": map[string]any{},
|
"user_signing_keys": map[string]any{},
|
||||||
"self_signing_keys": map[string]any{},
|
"self_signing_keys": map[string]any{},
|
||||||
}
|
}
|
||||||
_ = json.NewEncoder(w).Encode(resp)
|
_ = json.NewEncoder(w).Encode(resp)
|
||||||
@@ -85,9 +85,9 @@ func newSynapseMock(t *testing.T, uploadStatus int) *httptest.Server {
|
|||||||
mux.HandleFunc("/_matrix/client/v3/sync", func(w http.ResponseWriter, r *http.Request) {
|
mux.HandleFunc("/_matrix/client/v3/sync", func(w http.ResponseWriter, r *http.Request) {
|
||||||
w.Header().Set("Content-Type", "application/json")
|
w.Header().Set("Content-Type", "application/json")
|
||||||
resp := map[string]any{
|
resp := map[string]any{
|
||||||
"next_batch": "s0_1",
|
"next_batch": "s0_1",
|
||||||
"rooms": map[string]any{},
|
"rooms": map[string]any{},
|
||||||
"to_device": map[string]any{"events": []any{}},
|
"to_device": map[string]any{"events": []any{}},
|
||||||
"device_one_time_keys_count": map[string]any{},
|
"device_one_time_keys_count": map[string]any{},
|
||||||
}
|
}
|
||||||
_ = json.NewEncoder(w).Encode(resp)
|
_ = json.NewEncoder(w).Encode(resp)
|
||||||
+1
-1
@@ -1,4 +1,4 @@
|
|||||||
package infra
|
package matrix
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
+2
-2
@@ -53,8 +53,8 @@ tests:
|
|||||||
- "SendReply client nil devuelve error"
|
- "SendReply client nil devuelve error"
|
||||||
- "EditMessage client nil devuelve error"
|
- "EditMessage client nil devuelve error"
|
||||||
- "SendReaction client nil devuelve error"
|
- "SendReaction client nil devuelve error"
|
||||||
test_file_path: "functions/infra/matrix_message_send_test.go"
|
test_file_path: "functions/infra/matrix/matrix_message_send_test.go"
|
||||||
file_path: "functions/infra/matrix_message_send.go"
|
file_path: "functions/infra/matrix/matrix_message_send.go"
|
||||||
---
|
---
|
||||||
|
|
||||||
## Ejemplo
|
## Ejemplo
|
||||||
+1
-1
@@ -1,4 +1,4 @@
|
|||||||
package infra
|
package matrix
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
@@ -1,4 +1,4 @@
|
|||||||
package infra
|
package matrix
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
@@ -23,8 +23,8 @@ type RoomSummary struct {
|
|||||||
IsSpace bool `json:"is_space"` // m.room.type == m.space
|
IsSpace bool `json:"is_space"` // m.room.type == m.space
|
||||||
IsEncrypted bool `json:"is_encrypted"` // m.room.encryption state event presente
|
IsEncrypted bool `json:"is_encrypted"` // m.room.encryption state event presente
|
||||||
MemberCount int `json:"member_count"`
|
MemberCount int `json:"member_count"`
|
||||||
LastEventTs int64 `json:"last_event_ts"` // unix ms del ultimo evento conocido
|
LastEventTs int64 `json:"last_event_ts"` // unix ms del ultimo evento conocido
|
||||||
UnreadCount int `json:"unread_count"` // notifications.unread + highlight
|
UnreadCount int `json:"unread_count"` // notifications.unread + highlight
|
||||||
Tags []string `json:"tags,omitempty"` // m.tag account_data
|
Tags []string `json:"tags,omitempty"` // m.tag account_data
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -30,8 +30,8 @@ tests:
|
|||||||
- "IsDirect set correctamente segun m.direct"
|
- "IsDirect set correctamente segun m.direct"
|
||||||
- "IsEncrypted set segun presencia de m.room.encryption"
|
- "IsEncrypted set segun presencia de m.room.encryption"
|
||||||
- "client nil devuelve error"
|
- "client nil devuelve error"
|
||||||
test_file_path: "functions/infra/matrix_room_list_test.go"
|
test_file_path: "functions/infra/matrix/matrix_room_list_test.go"
|
||||||
file_path: "functions/infra/matrix_room_list.go"
|
file_path: "functions/infra/matrix/matrix_room_list.go"
|
||||||
---
|
---
|
||||||
|
|
||||||
## Ejemplo
|
## Ejemplo
|
||||||
+4
-4
@@ -1,4 +1,4 @@
|
|||||||
package infra
|
package matrix
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
@@ -18,9 +18,9 @@ import (
|
|||||||
// los handlers lo decodifican antes de hacer lookup.
|
// los handlers lo decodifican antes de hacer lookup.
|
||||||
type matrixTestServer struct {
|
type matrixTestServer struct {
|
||||||
*httptest.Server
|
*httptest.Server
|
||||||
joinedRooms []string // room IDs que devuelve /joined_rooms
|
joinedRooms []string // room IDs que devuelve /joined_rooms
|
||||||
roomNames map[string]string // roomID -> name (no seteado = sin m.room.name)
|
roomNames map[string]string // roomID -> name (no seteado = sin m.room.name)
|
||||||
encryptedRooms map[string]bool // roomID -> tiene encryption event
|
encryptedRooms map[string]bool // roomID -> tiene encryption event
|
||||||
directContent map[string][]string // userID -> []roomID
|
directContent map[string][]string // userID -> []roomID
|
||||||
roomTags map[string][]string // roomID -> []tag names
|
roomTags map[string][]string // roomID -> []tag names
|
||||||
}
|
}
|
||||||
+1
-1
@@ -1,4 +1,4 @@
|
|||||||
package infra
|
package matrix
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
+2
-2
@@ -36,8 +36,8 @@ tests:
|
|||||||
- "Error401NoExit"
|
- "Error401NoExit"
|
||||||
- "StopIdempotente"
|
- "StopIdempotente"
|
||||||
- "CtxCancelCierraChannels"
|
- "CtxCancelCierraChannels"
|
||||||
test_file_path: "functions/infra/matrix_sync_service_test.go"
|
test_file_path: "functions/infra/matrix/matrix_sync_service_test.go"
|
||||||
file_path: "functions/infra/matrix_sync_service.go"
|
file_path: "functions/infra/matrix/matrix_sync_service.go"
|
||||||
---
|
---
|
||||||
|
|
||||||
## Ejemplo
|
## Ejemplo
|
||||||
+2
-2
@@ -1,6 +1,6 @@
|
|||||||
//go:build goolm
|
//go:build goolm
|
||||||
|
|
||||||
package infra
|
package matrix
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
@@ -163,7 +163,7 @@ func TestMatrixSyncService_BackoffRecovery(t *testing.T) {
|
|||||||
cli := newTestSyncClient(t, srv.URL)
|
cli := newTestSyncClient(t, srv.URL)
|
||||||
h, err := MatrixSyncService(context.Background(), MatrixSyncServiceConfig{
|
h, err := MatrixSyncService(context.Background(), MatrixSyncServiceConfig{
|
||||||
Client: cli,
|
Client: cli,
|
||||||
InitialBackoffMS: 50, // backoff corto para tests
|
InitialBackoffMS: 50, // backoff corto para tests
|
||||||
MaxBackoffMS: 200,
|
MaxBackoffMS: 200,
|
||||||
ChannelBuffer: 16,
|
ChannelBuffer: 16,
|
||||||
})
|
})
|
||||||
@@ -1,4 +1,4 @@
|
|||||||
package infra
|
package postgres
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"database/sql"
|
"database/sql"
|
||||||
@@ -31,7 +31,7 @@ output: "conexion sql.DB abierta a PostgreSQL"
|
|||||||
tested: false
|
tested: false
|
||||||
tests: []
|
tests: []
|
||||||
test_file_path: ""
|
test_file_path: ""
|
||||||
file_path: "functions/infra/postgres_open.go"
|
file_path: "functions/infra/postgres/postgres_open.go"
|
||||||
---
|
---
|
||||||
|
|
||||||
## Ejemplo
|
## Ejemplo
|
||||||
@@ -2,6 +2,7 @@ package infra
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
"strings"
|
"strings"
|
||||||
@@ -210,9 +211,16 @@ func TestSSEHandler(t *testing.T) {
|
|||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
|
|
||||||
buf := make([]byte, 4096)
|
// Leer el body completo hasta EOF. El canal se cierra antes de la
|
||||||
n, _ := resp.Body.Read(buf)
|
// peticion, asi que el handler envia ambos eventos y termina, cerrando
|
||||||
body := string(buf[:n])
|
// el stream. Un unico Read podria devolver solo el primer chunk
|
||||||
|
// (event: first), porque io.Reader.Read no garantiza llenar el buffer;
|
||||||
|
// io.ReadAll consume todos los chunks emitidos por el handler.
|
||||||
|
raw, err := io.ReadAll(resp.Body)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("read body failed: %v", err)
|
||||||
|
}
|
||||||
|
body := string(raw)
|
||||||
|
|
||||||
for _, want := range []string{"event: first", "event: second", "data: 1", "data: 2"} {
|
for _, want := range []string{"event: first", "event: second", "data: 1", "data: 2"} {
|
||||||
if !strings.Contains(body, want) {
|
if !strings.Contains(body, want) {
|
||||||
|
|||||||
Reference in New Issue
Block a user