Files
fn_registry/dev/issues/0097-data-factory-app.md

128 lines
6.0 KiB
Markdown

---
id: "0097"
title: "data_factory app (Factorio-style data pipeline)"
status: pendiente
type: feature
domain:
- data-ingest
scope: app-scoped
priority: alta
depends: []
blocks: []
related: []
created: 2026-05-17
updated: 2026-05-17
tags: []
---
# 0097 — data_factory app (Factorio-style data pipeline)
**Status:** pendiente
**Created:** 2026-05-15
**Type:** app
**Priority:** alta
**Depends:** 0096 (apps/ standard) — DONE
**Blocks:** 0098 (function dep tree) — diferido tras 0097
## Problema
No hay vista unificada del flujo de datos. Hoy:
- `dag_engine_ui` muestra DAGs corriendo (orquestacion).
- `registry_dashboard` muestra codigo (funciones, dependencias).
- Pero NO hay: que datos extraigo, donde acaban, que volumen tienen, lineage end-to-end.
User analogia: factoria Factorio. Extractors (drills), Transformers (assemblers), Databases (chests), Sinks (rocket silo), Belts (lineage).
## Objetivo v1
App C++ ImGui standalone `apps/data_factory/` que muestra:
1. **Extractors** (funciones tag `extractor`) — tabla: name, source, last_run, rows/min, kb/min, success_rate, schedule. Click -> detalle + sample rows.
2. **Transformers** (tag `transformer`) — tabla con inputs/outputs, drop_ratio, last_run.
3. **Databases** — tabla: kind (sqlite/postgres/bq/parquet/mongo), uri, table_count, size, freshness.
4. **Sinks** (tag `sink`) — Metabase cards, FastAPI endpoints, alerts.
5. **Health** — KPIs derivados de runs: rows_24h, success_rate, freshness_alerts.
6. **Map (placeholder v1)** — lista plana de nodes + connections. Grafo visual sale en v2 (no en 0097).
## Fuera de v1
- **Map grafico con `imgui_node_editor`** — v2.
- **Models tab (trainers ML)** — v2.
- **Integracion automatica con dag_engine** — manual en v1 (usuario decide schedules en DAGs).
- **Multi-tenant** — un PC, un `data_factory.db`.
## Arquitectura
```
data_factory.db (per-PC, en apps/data_factory/)
├── nodes (kind=extractor|transformer|database|sink, function_id FK registry, config_json)
├── connections (src_node, dst_node, payload_schema)
├── runs (node_id, started_at, finished_at, status, rows_in, rows_out, kb_in, kb_out, error)
└── databases (kind, uri, label)
sqlite_api (anade DataFactoryHub WS)
└── /api/ws/datafactory (replica patron CallMonitorHub + DagRunHub)
C++ ImGui (apps/data_factory/)
├── data_http.{cpp,h} (REST a sqlite_api)
├── ws_client.{cpp,h} (reusado de dag_engine_ui)
├── tabs.{cpp,h} (Extractors, Transformers, Databases, Sinks, Health, Map)
└── main.cpp (fn::run_app + cfg.panels)
```
## Fases
| Fase | Que entra |
|---|---|
| **A. Schema + migrations** | `apps/data_factory/migrations/001_init.sql` con 4 tablas. Idempotente (embed.FS). |
| **B. Tags registry** | Audit + asignar tags `extractor/transformer/sink` a funciones existentes. Target: >=3 funciones por tag (capability group threshold). |
| **C. Scaffold app C++** | `fn run init_cpp_app data_factory --desc "Factorio-style data pipeline factory"`. apps/data_factory/. |
| **D. DataFactoryHub** | `projects/fn_monitoring/apps/sqlite_api/datafactory_events.go` — clon de CallMonitorHub. Endpoint `/api/ws/datafactory`. |
| **E. Wrappers `data_factory_record_run`** | Pipeline Python: envuelve `fn run <id>` capturando rows/kb/duration -> INSERT en `runs` table. Reusa `subprocess` + `psutil`. |
| **F. Tabs Extractors+Transformers+Databases+Sinks** | Reusa `data_table_cpp_viz`. Lista filtrable por tag. Click row -> panel lateral con detalle (FnInfo pattern de dag_engine_ui). |
| **G. Health tab** | KPIs derivados de runs (rows_24h, success_rate, freshness). Reusa pattern dag_engine_ui health. |
| **H. e2e_checks + deploy** | build_cmake, binary_exists, self-test (HTTP probe a sqlite_api), cpp_apps_conformance. `redeploy_cpp_app_windows`. |
## Funciones nuevas a delegar (estimacion)
| ID | Lang | Que hace |
|---|---|---|
| `data_factory_record_run_py_pipelines` | py pipeline | Wrappea `fn run <id> [args]`. Captura rows (parse stdout) + kb (size of output) + duration. INSERT en data_factory.db.runs. |
| `data_factory_list_nodes_go_infra` | go infra | Lista nodes desde data_factory.db filtrado por kind. |
| `data_factory_open_db_go_infra` | go infra | Abre data_factory.db con embed.FS migrations. |
Tags nuevos: `extractor`, `transformer`, `sink`, `database`, `validator` (no v1).
## Aceptacion v1
- `apps/data_factory/` existe con scaffolding canonico (cumple cpp/PATTERNS.md).
- `data_factory.db` se crea al primer run con tablas init.
- `sqlite_api` expone `/api/ws/datafactory` y `GET /api/datafactory/nodes`.
- Al menos 3 funciones tageadas `extractor` y 3 `transformer` (mover de existentes).
- App C++ arranca, conecta sqlite_api, muestra tabs Extractors/Transformers/Databases/Sinks/Health.
- `fn doctor cpp-apps` clean.
- `fn doctor uses-functions` clean para data_factory.
- Build Linux + Windows pass.
- e2e_check `self_test` exit 0.
## No-objetivos
- Visual lineage graph (deferido a 0098 o v2).
- Edicion de DAGs/extractors desde la UI (solo lectura + run-now).
- ML trainers (v2).
- Sync multi-PC del `data_factory.db` (igual que dag_engine.db: per-PC).
## Riesgos
| Riesgo | Mitigacion |
|---|---|
| Tags `extractor`/etc se solapan con tags existentes (`metabase`, `bigquery`) | Aceptado: una funcion puede tener varios tags. Tag flat, no jerarquia. |
| Asignar tags rompe FTS o consumers existentes | Tags son aditivos, no destructivos. `fn index` valida. |
| Wrapper Python `data_factory_record_run` interfiere con telemetria call_monitor (issue 0085) | Telemetria PostToolUse sigue activa. Wrapper escribe a tabla distinta (data_factory.runs vs call_monitor.calls). Cero conflicto. |
| Schema `data_factory.db` cambia mucho en v1 | Migrations aditivas estrictamente (regla db_migrations.md). Si cambio tipo: branch-by-abstraction. |
## Telemetria objetivo
- `data_factory` aparece en `pc_locations` activo.
- 3+ funciones nuevas creadas + usadas en mismo turno (capability-growth ratio bueno).
- runs persisten en data_factory.db; visibles desde tab Health en <500ms tras `fn run <extractor>`.