chore: añade directorio dev/ con issues y funciones implementadas

Tracking de issues completados (jupyter tools) y funciones implementadas (specs de diseño ya resueltas).
This commit is contained in:
2026-04-05 18:19:36 +02:00
parent 806c819cf7
commit a9cd28b010
64 changed files with 3680 additions and 0 deletions
@@ -0,0 +1,80 @@
# Parse Markdown — Suite de funciones
Fuente conceptual: OpenViking `openviking/parse/parsers/markdown.py` (AGPL-3.0)
Reimplementar desde cero. No copiar codigo.
## Funciones a implementar
### 1. extract_frontmatter
- **Dominio:** core
- **Lang:** Python
- **Purity:** pure
- **Signature:** `extract_frontmatter(content: str) -> tuple[str, dict | None]`
- **Descripcion:** Extrae YAML frontmatter (delimitado por `---`) del inicio de un string markdown. Retorna el contenido sin frontmatter y el dict parseado (o None si no hay).
- **Algoritmo:**
1. Regex `^---\n(.*?)\n---\n` con DOTALL
2. Si match, parsear cada linea `key: value` (o usar `yaml.safe_load`)
3. Retornar (contenido_restante, dict_frontmatter)
- **Deps:** `re`, opcionalmente `yaml`
- **Tests:** contenido con frontmatter, sin frontmatter, frontmatter vacio, frontmatter con listas
### 2. find_headings
- **Dominio:** core
- **Lang:** Python
- **Purity:** pure
- **Signature:** `find_headings(content: str) -> list[tuple[int, int, str, int]]`
- **Descripcion:** Encuentra todos los headings markdown (# a ######), excluyendo los que estan dentro de code blocks (``` ... ```), HTML comments (<!-- ... -->) y bloques indentados.
- **Retorno:** Lista de `(start_pos, end_pos, title, level)`
- **Algoritmo:**
1. Recopilar rangos excluidos: code blocks (triple backtick), HTML comments, bloques indentados (4 espacios/tab)
2. Regex `^(#{1,6})\s+(.+)$` con MULTILINE
3. Filtrar matches cuya posicion cae en un rango excluido
4. Filtrar headings escapados (`\#`)
- **Deps:** `re`
- **Tests:** headings normales, headings dentro de code blocks (no deben detectarse), headings escapados, headings en HTML comments
### 3. smart_split_content
- **Dominio:** core
- **Lang:** Python
- **Purity:** pure
- **Signature:** `smart_split_content(content: str, max_tokens: int = 1024, max_chars: int = 8000) -> list[str]`
- **Descripcion:** Divide contenido grande en partes respetando limites de tokens y caracteres. Divide por parrafos (doble newline). Si un parrafo individual excede el limite, lo corta por caracteres.
- **Algoritmo:**
1. Split por `\n\n` (parrafos)
2. Acumular parrafos mientras no excedan max_tokens/max_chars
3. Si un parrafo individual excede, cortarlo en chunks de max_chars
4. Retornar lista de partes
- **Deps:** `re`
- **Tests:** contenido corto (1 parte), contenido largo, parrafo gigante que requiere forzar corte
### 4. estimate_token_count
- **Dominio:** core
- **Lang:** Python
- **Purity:** pure
- **Signature:** `estimate_token_count(content: str) -> int`
- **Descripcion:** Estimacion rapida de tokens sin tokenizer. CJK chars ~0.7 token/char, otros ~0.3 token/char.
- **Algoritmo:**
1. Contar chars CJK con regex `[\u4e00-\u9fff\u3040-\u30ff\uac00-\ud7af]`
2. Contar otros non-whitespace chars
3. Retornar `int(cjk * 0.7 + otros * 0.3)`
- **Deps:** `re`
- **Tests:** texto solo latin, texto solo CJK, texto mixto, texto vacio
### 5. sanitize_for_path
- **Dominio:** core
- **Lang:** Python
- **Purity:** pure
- **Signature:** `sanitize_for_path(text: str, max_length: int = 50) -> str`
- **Descripcion:** Convierte texto a nombre seguro para uso en paths. Remueve caracteres especiales, reemplaza espacios con `_`, trunca con hash suffix si excede max_length.
- **Algoritmo:**
1. Regex: mantener solo `\w`, CJK ranges, espacios y guiones
2. Reemplazar espacios por `_`, strip underscores
3. Si vacio, retornar "section"
4. Si excede max_length: truncar y anadir `_` + sha256[:8]
- **Deps:** `re`, `hashlib`
- **Tests:** texto normal, texto con caracteres especiales, texto muy largo, texto vacio, texto CJK
@@ -0,0 +1,74 @@
# Parse PDF to Markdown
Fuente conceptual: OpenViking `openviking/parse/parsers/pdf.py` (AGPL-3.0)
Reimplementar desde cero. No copiar codigo.
## Funciones a implementar
### 1. pdf_to_markdown
- **Dominio:** core
- **Lang:** Python
- **Purity:** impure (I/O: lee archivo PDF)
- **Signature:** `pdf_to_markdown(pdf_path: str, heading_detection: str = "auto") -> tuple[str, dict]`
- **Descripcion:** Convierte un PDF a markdown. Extrae texto, tablas e inyecta headings detectados desde bookmarks o analisis de fuentes. Retorna (markdown_content, metadata_dict).
- **Algoritmo:**
1. Abrir PDF con pdfplumber
2. Detectar headings:
- `"bookmarks"`: extraer outlines/bookmarks del PDF
- `"font"`: analizar distribucion de font sizes para detectar headings
- `"auto"`: intentar bookmarks primero, fallback a font
3. Agrupar headings por pagina
4. Por cada pagina: inyectar headings como `# titulo`, extraer texto, extraer tablas como markdown
5. Unir todo con `\n\n`
- **Deps:** `pdfplumber`
- **Error type:** Exception (archivo no encontrado, PDF corrupto)
- **Tests:** PDF con bookmarks, PDF sin bookmarks, PDF con tablas, PDF vacio
### 2. extract_pdf_bookmarks
- **Dominio:** core
- **Lang:** Python
- **Purity:** impure (requiere objeto PDF abierto)
- **Signature:** `extract_pdf_bookmarks(pdf) -> list[dict]`
- **Descripcion:** Extrae la estructura de bookmarks/outlines de un PDF abierto con pdfplumber. Retorna lista de `{"level": int, "title": str, "page_num": int | None}`.
- **Algoritmo:**
1. Acceder a `pdf.doc.get_outlines()`
2. Construir mapping `objid -> page_number` desde pdf.pages
3. Resolver cada destino de outline a su numero de pagina
4. Limitar nivel a [1, 6]
- **Deps:** `pdfplumber`
- **Tests:** PDF con outlines, PDF sin outlines
### 3. detect_headings_by_font
- **Dominio:** core
- **Lang:** Python
- **Purity:** impure (requiere objeto PDF abierto)
- **Signature:** `detect_headings_by_font(pdf, min_delta: float = 2.0, max_levels: int = 4) -> list[dict]`
- **Descripcion:** Detecta headings analizando la distribucion de font sizes. El font size mas comun es el body; sizes significativamente mayores son headings.
- **Algoritmo:**
1. Samplear font sizes (cada 5ta pagina) → Counter
2. Determinar body_size (most_common)
3. Heading sizes: font sizes >= body_size + min_delta Y con frecuencia < 50% del body
4. Ordenar heading sizes desc → asignar niveles 1,2,3...
5. Recorrer todas las paginas, extraer texto de chars con heading size
6. Deduplicar: filtrar titulos que aparecen en >30% de paginas (son headers/footers)
- **Deps:** `pdfplumber`, `collections.Counter`
- **Tests:** PDF con headings claros por font, PDF con fuente uniforme (sin headings detectados)
### 4. format_table_markdown (reutilizable)
- **Dominio:** core
- **Lang:** Python
- **Purity:** pure
- **Signature:** `format_table_to_markdown(rows: list[list[str]], has_header: bool = True) -> str`
- **Descripcion:** Convierte una lista 2D de celdas a tabla markdown con alineacion de columnas.
- **Algoritmo:**
1. Calcular max width por columna
2. Formatear header row con `|`
3. Anadir separador `| --- | --- |`
4. Formatear data rows
5. Escapar pipes en celdas
- **Deps:** ninguna
- **Tests:** tabla normal, tabla con celdas vacias, tabla con 1 fila, tabla vacia, celdas con pipes
@@ -0,0 +1,66 @@
# Parse HTML to Markdown
Fuente conceptual: OpenViking `openviking/parse/parsers/html.py` (AGPL-3.0)
Reimplementar desde cero. No copiar codigo.
## Funciones a implementar
### 1. html_to_markdown
- **Dominio:** core
- **Lang:** Python
- **Purity:** pure
- **Signature:** `html_to_markdown(html: str) -> str`
- **Descripcion:** Convierte HTML a markdown. Usa readabilipy para extraer contenido principal (filtra nav, ads, boilerplate), luego markdownify para convertir a markdown.
- **Algoritmo:**
1. Preprocesar HTML: manejar contenido oculto (ej: WeChat js_content), lazy loading images (data-src → src)
2. Extraer contenido principal con readabilipy (basado en Mozilla Readability)
3. Convertir a markdown con markdownify (headings ATX, strip script/style)
- **Deps:** `readabilipy`, `markdownify`, `beautifulsoup4`
- **Tests:** HTML con nav/footer (debe filtrarse), HTML limpio, HTML con imagenes lazy-loaded
### 2. detect_url_type
- **Dominio:** core
- **Lang:** Python
- **Purity:** impure (hace HTTP HEAD request)
- **Signature:** `detect_url_type(url: str, timeout: float = 10.0) -> tuple[str, dict]`
- **Descripcion:** Detecta el tipo de contenido de una URL. Retorna tipo ("webpage", "pdf", "markdown", "text", "code_repository") y metadata.
- **Algoritmo:**
1. Verificar patrones de repos de codigo (github.com/org/repo, git@...)
2. Verificar extension en URL (.pdf, .md, .txt, .html, .git)
3. Si no se determino: HTTP HEAD request → leer Content-Type header
4. Default: "webpage"
- **Deps:** `httpx`, `urllib.parse`
- **Error type:** Exception (timeout, network error)
- **Tests:** URL .pdf, URL github repo, URL webpage, URL con Content-Type custom
### 3. fetch_and_parse_url
- **Dominio:** core
- **Lang:** Python
- **Purity:** impure (I/O: HTTP request)
- **Signature:** `fetch_and_parse_url(url: str, timeout: float = 30.0) -> str`
- **Descripcion:** Descarga una pagina web y la convierte a markdown. Combina fetch HTML + html_to_markdown.
- **Algoritmo:**
1. Detectar tipo de URL con detect_url_type
2. Si webpage: fetch HTML con httpx, convertir a markdown
3. Si download link: descargar a archivo temporal, delegar al parser apropiado
4. Si code repository: delegar a parser de codigo
- **Deps:** `httpx`
- **Error type:** Exception
### 4. convert_github_to_raw_url
- **Dominio:** core
- **Lang:** Python
- **Purity:** pure
- **Signature:** `convert_github_to_raw_url(url: str) -> str`
- **Descripcion:** Convierte una URL de blob de GitHub/GitLab a su URL raw. Ej: `github.com/org/repo/blob/main/file.py``raw.githubusercontent.com/org/repo/main/file.py`
- **Algoritmo:**
1. Parsear URL
2. Si GitHub y path contiene `/blob/`: remover `/blob/` y cambiar dominio a raw.githubusercontent.com
3. Si GitLab y path contiene `/blob/`: reemplazar por `/raw/`
4. Si no aplica, retornar URL sin cambiar
- **Deps:** `urllib.parse`
- **Tests:** URL GitHub blob, URL GitLab blob, URL que no es blob, URL no-GitHub
@@ -0,0 +1,30 @@
# Parse DOCX to Markdown
Fuente conceptual: OpenViking `openviking/parse/parsers/word.py` (AGPL-3.0)
Reimplementar desde cero. No copiar codigo.
## Funcion a implementar
### docx_to_markdown
- **Dominio:** core
- **Lang:** Python
- **Purity:** impure (I/O: lee archivo .docx)
- **Signature:** `docx_to_markdown(docx_path: str) -> str`
- **Descripcion:** Convierte un documento Word (.docx) a markdown preservando estructura, formato y tablas en su posicion original.
- **Algoritmo:**
1. Abrir con python-docx
2. Construir mapa de tablas: `{element_xml: Table}` para lookup O(1)
3. Recorrer `doc.element.body` en orden (preserva posicion de tablas):
- Si es parrafo (`w:p`):
- Si estilo es Heading N: `{'#' * level} {text}`
- Si no: convertir runs con formato (bold→`**`, italic→`*`, underline→`<ins>`)
- Si es tabla (`w:tbl`): convertir filas a markdown table con `format_table_to_markdown`
4. Unir partes con `\n\n`
- **Deps:** `python-docx`
- **Error type:** Exception (archivo no encontrado, formato invalido)
- **Notas:**
- Las tablas deben aparecer en su posicion original, no al final
- Los heading levels se extraen del nombre del estilo ("Heading 1" → nivel 1)
- El formato inline (bold/italic/underline) se preserva
- **Tests:** docx con headings y parrafos, docx con tablas intercaladas, docx con formato bold/italic, docx vacio
@@ -0,0 +1,32 @@
# Parse Excel to Markdown
Fuente conceptual: OpenViking `openviking/parse/parsers/excel.py` (AGPL-3.0)
Reimplementar desde cero. No copiar codigo.
## Funcion a implementar
### excel_to_markdown
- **Dominio:** core
- **Lang:** Python
- **Purity:** impure (I/O: lee archivo)
- **Signature:** `excel_to_markdown(path: str, max_rows_per_sheet: int = 1000) -> str`
- **Descripcion:** Convierte un archivo Excel (.xlsx, .xls, .xlsm) a markdown con cada sheet como seccion H2.
- **Algoritmo:**
1. Si extension `.xls`: usar xlrd (legacy)
2. Si `.xlsx`/`.xlsm`: usar openpyxl
3. Para cada sheet:
- Header: `## Sheet: {name}`
- Metadata: `**Dimensions:** {rows} x {cols}`
- Truncar a max_rows_per_sheet
- Convertir filas a tabla markdown
- Si truncado: anadir nota de filas omitidas
4. Manejo de tipos de celda para xlrd:
- EMPTY/BLANK → ""
- DATE → formato ISO (con hora si no es 00:00:00)
- BOOLEAN → "TRUE"/"FALSE"
- ERROR → codigo de error Excel (#NULL!, #DIV/0!, etc.)
- NUMBER → entero si es entero, float si tiene decimales
- **Deps:** `openpyxl` (xlsx), `xlrd` (xls legacy)
- **Error type:** Exception
- **Tests:** xlsx con multiples sheets, xls legacy con fechas, sheet vacio, sheet con formulas (data_only), sheet truncado
@@ -0,0 +1,36 @@
# Parse EPUB to Markdown
Fuente conceptual: OpenViking `openviking/parse/parsers/epub.py` (AGPL-3.0)
Reimplementar desde cero. No copiar codigo.
## Funcion a implementar
### epub_to_markdown
- **Dominio:** core
- **Lang:** Python
- **Purity:** impure (I/O: lee archivo)
- **Signature:** `epub_to_markdown(epub_path: str) -> str`
- **Descripcion:** Convierte un ebook EPUB a markdown. Intenta ebooklib primero, fallback a extraccion manual con zipfile.
- **Algoritmo con ebooklib:**
1. Leer con `epub.read_epub(path)`
2. Extraer metadata: titulo, autor
3. Generar header: `# {titulo}` + `**Author:** {autor}`
4. Para cada ITEM_DOCUMENT: decodificar contenido HTML → convertir a markdown
5. Unir con `\n\n`
- **Algoritmo fallback manual (sin ebooklib):**
1. Abrir como ZIP
2. Listar archivos .html/.xhtml/.htm
3. Para cada uno: decodificar HTML → convertir a markdown basico
- **Conversion HTML basica a markdown:**
1. Remover `<script>` y `<style>` tags
2. `<h1>``# ...`, `<h2>``## ...`, etc.
3. `<strong>`/`<b>``**...**`, `<em>`/`<i>``*...*`
4. `<p>` → contenido + `\n\n`
5. `<br>``\n`
6. Remover tags HTML restantes
7. Unescape HTML entities (`&amp;``&`)
8. Normalizar whitespace
- **Deps:** `ebooklib` (opcional), `zipfile` (stdlib)
- **Error type:** Exception
- **Tests:** epub con ebooklib, epub sin ebooklib (manual), epub con metadata, epub corrupto
@@ -0,0 +1,61 @@
# Directory Scanner
Fuente conceptual: OpenViking `openviking/parse/directory_scan.py` (AGPL-3.0)
Reimplementar desde cero. No copiar codigo.
## Funciones a implementar
### 1. scan_directory
- **Dominio:** infra
- **Lang:** Python
- **Purity:** impure (I/O: filesystem traversal)
- **Signature:** `scan_directory(root: str, supported_extensions: set[str] | None = None, ignore_dirs: set[str] | None = None, include: str | None = None, exclude: str | None = None, strict: bool = False) -> DirectoryScanResult`
- **Descripcion:** Recorre un arbol de directorios y clasifica cada archivo como procesable o no soportado. Util para validacion pre-importacion de directorios.
- **Tipos de retorno:**
```python
@dataclass
class ClassifiedFile:
path: str # path absoluto
rel_path: str # relativo a root, forward slashes
classification: str # "processable" | "unsupported"
@dataclass
class DirectoryScanResult:
root: str
processable: list[ClassifiedFile]
unsupported: list[ClassifiedFile]
skipped: list[str] # "path (reason)"
warnings: list[str]
```
- **Algoritmo:**
1. Validar que root existe y es directorio
2. `os.walk(root, topdown=True)` para recorrer
3. Podar directorios:
- Skip: dot dirs, symlinks, IGNORE_DIRS predefinidos (`__pycache__`, `node_modules`, `.git`, `venv`, etc.)
- Skip: dirs en `ignore_dirs` (por nombre o por path relativo)
4. Por cada archivo:
- Skip: dot files, symlinks, archivos vacios
- Aplicar filtros include/exclude (glob patterns, comma-separated)
- Clasificar: extension en supported_extensions → processable, sino → unsupported
5. Si strict=True y hay unsupported: raise error
6. Ordenar resultados
- **Logica de include/exclude:**
- include: comma-separated globs (ej: `"*.pdf,*.md"`), si vacio incluye todo
- exclude: comma-separated, pattern con `/` final es path prefix (ej: `"drafts/"`), sin `/` es glob de nombre (ej: `"*.tmp"`)
- **Deps:** `os`, `pathlib`, `fnmatch`, `dataclasses`
- **Error type:** FileNotFoundError, NotADirectoryError
- **Tests:** directorio con mezcla de archivos, directorio con dot files, directorio con subdirs ignorados, filtros include/exclude, modo strict
### 2. Constantes IGNORE_DIRS sugeridas
```python
IGNORE_DIRS = {
"__pycache__", "node_modules", ".git", ".svn", ".hg",
"venv", ".venv", "env", ".env", ".tox", ".nox",
".mypy_cache", ".pytest_cache", ".ruff_cache",
"dist", "build", ".next", ".nuxt",
"target", # Rust/Java
"vendor", # Go
}
```
@@ -0,0 +1,38 @@
# Circuit Breaker
Fuente conceptual: OpenViking `openviking/utils/circuit_breaker.py` (AGPL-3.0)
Reimplementar desde cero. No copiar codigo.
## Clase a implementar
### CircuitBreaker
- **Dominio:** core
- **Lang:** Python
- **Purity:** impure (tiene estado mutable + usa time.monotonic)
- **Signature:**
```python
class CircuitBreaker:
def __init__(self, failure_threshold: int = 5, reset_timeout: float = 300.0): ...
def check(self) -> None: ... # raises CircuitBreakerOpen
def record_success(self) -> None: ...
def record_failure(self, error: Exception) -> None: ...
@property
def retry_after(self) -> float: ... # seconds until half-open, capped at 30s
```
- **Descripcion:** Patron circuit breaker thread-safe para proteger llamadas a APIs externas. Tres estados: CLOSED (normal), OPEN (bloqueando), HALF_OPEN (permitiendo 1 request de prueba).
- **Algoritmo:**
- **CLOSED:** Requests pasan normalmente. Si fallan `failure_threshold` veces consecutivas → OPEN. Errores permanentes (401, 403) abren inmediatamente.
- **OPEN:** Requests bloqueados con `CircuitBreakerOpen`. Despues de `reset_timeout` segundos → HALF_OPEN.
- **HALF_OPEN:** Permite 1 request de prueba. Si exito → CLOSED. Si falla → OPEN.
- **Thread safety:** Usar `threading.Lock` para proteger estado interno.
- **Clasificacion de errores:** Integrar con `classify_api_error()` (ver spec 09) para distinguir errores permanentes de transitorios.
- **Deps:** `threading`, `time`
- **Tests:**
- Transicion CLOSED → OPEN despues de N fallos
- Transicion OPEN → HALF_OPEN despues de timeout
- Transicion HALF_OPEN → CLOSED en exito
- Transicion HALF_OPEN → OPEN en fallo
- Error permanente abre inmediatamente
- Thread safety (concurrencia)
- retry_after retorna 0 cuando no esta OPEN
@@ -0,0 +1,57 @@
# Retry with Error Classification
Fuente conceptual: OpenViking `openviking/utils/model_retry.py` (AGPL-3.0)
Reimplementar desde cero. No copiar codigo.
## Funciones a implementar
### 1. classify_api_error
- **Dominio:** core
- **Lang:** Python
- **Purity:** pure
- **Signature:** `classify_api_error(error: Exception) -> str`
- **Retorno:** `"permanent"` | `"transient"` | `"unknown"`
- **Descripcion:** Clasifica un error de API como permanente (no reintentar), transitorio (reintentar) o desconocido.
- **Algoritmo:**
1. Obtener textos: `str(error)` + `str(error.__cause__)` si existe
2. Buscar patrones permanentes: `"400"`, `"401"`, `"403"`, `"Forbidden"`, `"Unauthorized"`
3. Buscar patrones transitorios: `"429"`, `"500"`, `"502"`, `"503"`, `"504"`, `"TooManyRequests"`, `"RateLimit"`, `"timeout"`, `"Timeout"`, `"ConnectionError"`, `"Connection refused"`, `"Connection reset"`
4. Permanente tiene prioridad sobre transitorio
- **Deps:** ninguna
- **Tests:** error 429, error 401, error timeout, error desconocido, error con __cause__
### 2. compute_backoff_delay
- **Dominio:** core
- **Lang:** Python
- **Purity:** pure (con jitter usa random pero es aceptable)
- **Signature:** `compute_backoff_delay(attempt: int, base_delay: float = 0.5, max_delay: float = 8.0, jitter: bool = True) -> float`
- **Descripcion:** Calcula el delay para exponential backoff con jitter opcional.
- **Algoritmo:**
1. `delay = min(base_delay * 2^attempt, max_delay)`
2. Si jitter: `delay += random.uniform(0, min(base_delay, delay))`
- **Deps:** `random`
- **Tests:** attempt 0 (base_delay), attempt alto (capped a max_delay), sin jitter (determinista)
### 3. retry_sync
- **Dominio:** core
- **Lang:** Python
- **Purity:** impure (ejecuta funcion, hace sleep)
- **Signature:** `retry_sync(func: Callable[[], T], max_retries: int, base_delay: float = 0.5, max_delay: float = 8.0, jitter: bool = True, is_retryable: Callable[[Exception], bool] | None = None) -> T`
- **Descripcion:** Reintenta una funcion sincrona en errores transitorios con exponential backoff.
- **Algoritmo:**
1. Loop: llamar func()
2. Si excepcion Y es retryable Y no agotamos intentos: dormir delay, incrementar attempt
3. Si no retryable o agotamos intentos: re-raise
- **Deps:** `time`
### 4. retry_async
- **Dominio:** core
- **Lang:** Python
- **Purity:** impure
- **Signature:** `async retry_async(func: Callable[[], Awaitable[T]], max_retries: int, base_delay: float = 0.5, max_delay: float = 8.0, jitter: bool = True, is_retryable: Callable[[Exception], bool] | None = None) -> T`
- **Descripcion:** Version async de retry_sync. Usa `asyncio.sleep` en vez de `time.sleep`.
- **Deps:** `asyncio`
@@ -0,0 +1,41 @@
# Hotness Score
Fuente conceptual: OpenViking `openviking/retrieve/memory_lifecycle.py` (AGPL-3.0)
Reimplementar desde cero. No copiar codigo.
## Funcion a implementar
### hotness_score
- **Dominio:** datascience
- **Lang:** Python
- **Purity:** pure (si se pasa `now` explicitamente)
- **Signature:** `hotness_score(active_count: int, updated_at: datetime | None, now: datetime | None = None, half_life_days: float = 7.0) -> float`
- **Retorno:** float en [0.0, 1.0]
- **Descripcion:** Calcula un score de "hotness" (calor/relevancia) combinando frecuencia de acceso y recencia. Util para ranking de resultados de busqueda, memoria hot/cold, cache eviction.
- **Formula:**
```
score = sigmoid(log1p(active_count)) * time_decay(updated_at)
```
- **Algoritmo:**
1. **Componente de frecuencia:** `freq = 1 / (1 + exp(-log1p(active_count)))` — sigmoid de log1p mapea conteos a (0,1)
2. **Componente de recencia:** Si updated_at es None → retornar 0.0
- Normalizar a UTC
- `age_days = max((now - updated_at).total_seconds() / 86400, 0)`
- `decay_rate = ln(2) / half_life_days`
- `recency = exp(-decay_rate * age_days)`
3. **Score final:** `freq * recency`
- **Propiedades:**
- active_count=0, cualquier fecha → ~0.5 * recency
- active_count alto, reciente → cercano a 1.0
- active_count alto, antiguo → cercano a 0.0
- updated_at=None → siempre 0.0
- half_life_days controla velocidad de decaimiento (7 = score se reduce a la mitad cada 7 dias)
- **Deps:** `math`, `datetime`
- **Tests:**
- active_count=0, updated_at reciente
- active_count=100, updated_at reciente (score alto)
- active_count=100, updated_at hace 30 dias (score bajo)
- updated_at=None (retorna 0.0)
- now explicito (determinista para tests)
- half_life_days custom
@@ -0,0 +1,48 @@
# Time Utils
Fuente conceptual: OpenViking `openviking/utils/time_utils.py` (AGPL-3.0)
Reimplementar desde cero. No copiar codigo.
## Funciones a implementar
### 1. parse_iso_datetime
- **Dominio:** core
- **Lang:** Python
- **Purity:** pure
- **Signature:** `parse_iso_datetime(value: str) -> datetime`
- **Descripcion:** Parsea un datetime ISO 8601 tolerando >6 digitos en fracciones de segundo (Windows produce timestamps como `2026-02-21T13:20:23.1470042+08:00` donde los microsegundos exceden 6 digitos de Python).
- **Algoritmo:**
1. Regex: truncar fracciones de segundo a 6 digitos: `(\.\d{6})\d+``\1`
2. Reemplazar `Z` final por `+00:00`
3. `datetime.fromisoformat(normalized)`
- **Deps:** `re`, `datetime`
- **Tests:** ISO normal, ISO con Z, ISO con >6 digitos fraccion, ISO con timezone offset
### 2. format_iso8601
- **Dominio:** core
- **Lang:** Python
- **Purity:** pure
- **Signature:** `format_iso8601(dt: datetime) -> str`
- **Descripcion:** Formatea datetime a ISO 8601 UTC con milisegundos. Formato: `yyyy-MM-ddTHH:mm:ss.SSSZ`
- **Algoritmo:**
1. Si naive (sin tzinfo): asumir UTC
2. Si aware: convertir a UTC con `astimezone(timezone.utc)`
3. `dt.isoformat(timespec="milliseconds").replace("+00:00", "Z")`
- **Deps:** `datetime`
- **Tests:** datetime naive, datetime con timezone, datetime UTC
### 3. format_simplified
- **Dominio:** core
- **Lang:** Python
- **Purity:** pure
- **Signature:** `format_simplified(dt: datetime, now: datetime) -> str`
- **Descripcion:** Formato humano simplificado: si es del mismo dia muestra `HH:MM:SS`, si no muestra `YYYY-MM-DD`.
- **Algoritmo:**
1. Remover tzinfo para comparacion simple
2. Si `(now - dt).days < 1`: retornar `dt.strftime("%H:%M:%S")`
3. Si no: retornar `dt.strftime("%Y-%m-%d")`
- **Deps:** `datetime`
- **Tests:** mismo dia (formato hora), dia anterior (formato fecha), exactamente 24h
@@ -0,0 +1,43 @@
# Safe Extract ZIP
Fuente conceptual: OpenViking `openviking/utils/zip_safe.py` (AGPL-3.0)
Reimplementar desde cero. No copiar codigo.
## Funciones a implementar
### 1. safe_extract_zip
- **Dominio:** infra
- **Lang:** Python
- **Purity:** impure (I/O: escribe archivos)
- **Signature:** `safe_extract_zip(zip_path: str, dest_dir: str) -> None`
- **Descripcion:** Extrae un archivo ZIP con proteccion contra Zip Slip (path traversal attack). Valida que cada archivo extraido quede dentro del directorio destino.
- **Algoritmo:**
1. Resolver dest_dir a path absoluto
2. Normalizar filenames del ZIP (reparar encoding UTF-8)
3. Para cada miembro del ZIP:
- Resolver path completo: `(dest_dir / member.filename).resolve()`
- Verificar que `str(member_path).startswith(str(dest_dir) + os.sep)`
- Si no: raise ValueError "Zip Slip attempt detected"
- Si si: extraer
- **Deps:** `zipfile`, `pathlib`, `os`
- **Error type:** ValueError (zip slip), zipfile.BadZipFile
- **Tests:** ZIP normal, ZIP con path traversal (`../../etc/passwd`), ZIP con paths absolutos
### 2. normalize_zip_filenames
- **Dominio:** infra
- **Lang:** Python
- **Purity:** impure (modifica objeto ZipFile in-place)
- **Signature:** `normalize_zip_filenames(zipf: zipfile.ZipFile) -> None`
- **Descripcion:** Repara nombres de archivos UTF-8 en ZIPs que no tienen el flag UTF-8 seteado. Comun en archivos creados en Windows con nombres CJK.
- **Algoritmo:**
1. Para cada miembro sin flag UTF-8 (bit 0x800):
- Intentar: encode como CP437 → decode como UTF-8
- Si el resultado tiene CJK y el original tiene mojibake: reparar
- Si no: dejar como esta
2. Si se reparo algun nombre: setear `zipf.metadata_encoding = "utf-8"`
- **Deteccion de CJK:** chars en rangos `\u3400-\u4dbf`, `\u4e00-\u9fff`, `\u3000-\u303f`, `\uff00-\uffef`
- **Deteccion de mojibake:** chars en rangos Greek (`\u0370-\u03ff`), Math (`\u2200-\u22ff`), Box Drawing (`\u2500-\u257f`)
- **Deps:** `zipfile`
- **Tests:** ZIP con nombres UTF-8 correctos (no cambiar), ZIP con nombres CJK mojibake (reparar)
@@ -0,0 +1,60 @@
# Envelope Encryption
Fuente conceptual: OpenViking `openviking/crypto/encryptor.py` (AGPL-3.0)
Reimplementar desde cero. No copiar codigo.
## Funciones a implementar
### 1. envelope_encrypt
- **Dominio:** cybersecurity
- **Lang:** Python
- **Purity:** impure (genera random bytes)
- **Signature:** `envelope_encrypt(plaintext: bytes, master_key: bytes) -> bytes`
- **Descripcion:** Cifra datos usando patron Envelope Encryption. Genera una clave de archivo aleatoria (file key), cifra los datos con AES-256-GCM usando la file key, luego cifra la file key con la master key.
- **Algoritmo:**
1. Generar file_key aleatorio de 32 bytes
2. Generar data_iv de 12 bytes
3. Cifrar plaintext: `AES-GCM(file_key, data_iv, plaintext)` → encrypted_content
4. Generar key_iv de 12 bytes
5. Cifrar file_key: `AES-GCM(master_key, key_iv, file_key)` → encrypted_file_key
6. Construir envelope: magic + version + lengths + encrypted_file_key + key_iv + data_iv + encrypted_content
- **Formato del envelope:**
```
Magic (4B): b"OVE1" (identificador)
Version (1B): 0x01
Reserved (1B): 0x00
EFK_len (2B): big-endian, longitud de encrypted_file_key
KIV_len (2B): big-endian, longitud de key_iv
DIV_len (2B): big-endian, longitud de data_iv
--- header: 12 bytes total ---
Encrypted File Key (variable)
Key IV (variable)
Data IV (variable)
Encrypted Content (variable, incluye GCM auth tag)
```
- **Deps:** `cryptography` (AESGCM), `secrets`, `struct`
- **Error type:** Exception
- **Tests:** encrypt → decrypt roundtrip, datos vacios, datos grandes
### 2. envelope_decrypt
- **Dominio:** cybersecurity
- **Lang:** Python
- **Purity:** impure (puede fallar en autenticacion)
- **Signature:** `envelope_decrypt(ciphertext: bytes, master_key: bytes) -> bytes`
- **Descripcion:** Descifra datos cifrados con envelope_encrypt.
- **Algoritmo:**
1. Verificar magic bytes `b"OVE1"` al inicio — si no coincide, retornar datos sin cambiar (archivo no cifrado)
2. Parsear header: version, lengths
3. Extraer: encrypted_file_key, key_iv, data_iv, encrypted_content
4. Descifrar file_key: `AES-GCM_decrypt(master_key, key_iv, encrypted_file_key)`
5. Descifrar contenido: `AES-GCM_decrypt(file_key, data_iv, encrypted_content)`
- **Comportamiento especial:** Si ciphertext no empieza con magic, se asume que no esta cifrado y se retorna tal cual. Esto permite usar decrypt en archivos que pueden o no estar cifrados.
- **Deps:** `cryptography`, `struct`
- **Error type:** ValueError (envelope corrupto), AuthenticationError (key incorrecta)
- **Tests:** decrypt de datos cifrados, decrypt de datos no cifrados (passthrough), key incorrecta, envelope truncado, envelope con magic invalido
### 3. build_envelope / parse_envelope (helpers)
Pueden ser funciones internas puras usadas por encrypt/decrypt para construir y parsear el formato binario del envelope.
@@ -0,0 +1,51 @@
# Parse Code AST (multi-language)
Fuente conceptual: OpenViking `openviking/parse/parsers/code/ast/` (AGPL-3.0)
Reimplementar desde cero. No copiar codigo.
## Funcion a implementar
### parse_code_ast
- **Dominio:** core
- **Lang:** Python
- **Purity:** pure (opera sobre strings)
- **Signature:** `parse_code_ast(source_code: str, language: str) -> list[CodeEntity]`
- **Tipo de retorno:**
```python
@dataclass
class CodeEntity:
kind: str # "function" | "class" | "method" | "interface" | "type" | "struct" | "trait"
name: str
start_line: int
end_line: int
signature: str # firma completa
docstring: str | None
children: list["CodeEntity"] # metodos dentro de clases
```
- **Descripcion:** Extrae entidades de codigo (funciones, clases, metodos, tipos) de codigo fuente usando tree-sitter. Soporta multiples lenguajes.
- **Lenguajes soportados:**
| Lenguaje | tree-sitter grammar | Entidades extraidas |
|----------|--------------------|--------------------|
| Python | tree-sitter-python | function_definition, class_definition, decorated_definition |
| JavaScript/TypeScript | tree-sitter-javascript/typescript | function_declaration, class_declaration, interface_declaration, type_alias_declaration |
| Go | tree-sitter-go | function_declaration, method_declaration, type_declaration |
| Rust | tree-sitter-rust | function_item, struct_item, impl_item, trait_item |
| Java | tree-sitter-java | class_declaration, method_declaration, interface_declaration |
| C++ | tree-sitter-cpp | function_definition, class_specifier, template_declaration |
- **Algoritmo:**
1. Seleccionar grammar de tree-sitter segun `language`
2. Parsear source_code → AST
3. Walk del AST buscando nodos relevantes segun el lenguaje
4. Para cada nodo: extraer nombre, lineas, firma, docstring
5. Para clases: recursion para extraer metodos como children
- **Deps:** `tree-sitter`, `tree-sitter-{language}` (uno por lenguaje)
- **Error type:** ValueError (lenguaje no soportado)
- **Notas:**
- tree-sitter >= 0.23 usa nuevo API con `Language.build_library()`
- Los grammars se instalan como paquetes pip: `pip install tree-sitter-python`
- La firma se extrae del texto del nodo hasta el body
- El docstring se extrae del primer string literal hijo (Python) o comentario previo (otros)
- **Tests:** codigo Python con funciones y clases, Go con funciones y tipos, TypeScript con interfaces, codigo vacio, codigo con errores de sintaxis (tree-sitter es tolerante)
@@ -0,0 +1,48 @@
# Git URL Parser
Fuente conceptual: OpenViking `openviking/utils/code_hosting_utils.py` (AGPL-3.0)
Reimplementar desde cero. No copiar codigo.
## Funciones a implementar
### 1. parse_git_url
- **Dominio:** core
- **Lang:** Python
- **Purity:** pure
- **Signature:** `parse_git_url(url: str, known_hosts: list[str] | None = None) -> str | None`
- **Descripcion:** Parsea una URL de code hosting (GitHub, GitLab, etc.) y retorna el path `org/repo`. Soporta HTTP(S), SSH (git@), y git:// protocols.
- **Hosts conocidos por defecto:** `["github.com", "gitlab.com"]`
- **Algoritmo:**
1. Si empieza con `git@`: extraer host:path, split por `:`, tomar primeros 2 segmentos, remover `.git`
2. Si empieza con `http://`, `https://`, `git://`, `ssh://`: parsear con urlparse, verificar netloc en hosts, tomar primeros 2 segmentos del path
3. Sanitizar org y repo: solo `[a-zA-Z0-9_-]`
4. Retornar `org/repo` o None si no es valida
- **Deps:** `urllib.parse`
- **Tests:** URL HTTPS GitHub, URL SSH git@, URL con .git suffix, URL con path extra (issues/123), URL no reconocida
### 2. is_git_repo_url
- **Dominio:** core
- **Lang:** Python
- **Purity:** pure
- **Signature:** `is_git_repo_url(url: str, known_hosts: list[str] | None = None) -> bool`
- **Descripcion:** Verifica estrictamente si una URL apunta a un repositorio git clonable (no a un issue, PR, file, etc.).
- **Algoritmo:**
1. `git@`/`ssh://`/`git://`: si el dominio es conocido → True
2. `http://`/`https://`: dominio conocido Y exactamente 2 segmentos de path (org/repo) → True
3. Tambien acepta: org/repo/tree/<ref> (branch/commit)
4. No acepta: org/repo/issues/123, org/repo/blob/main/file.py
- **Deps:** `urllib.parse`
- **Tests:** URL repo valida, URL de issue (False), URL de blob/file (False), URL con tree/branch (True)
### 3. validate_git_ssh_uri
- **Dominio:** core
- **Lang:** Python
- **Purity:** pure
- **Signature:** `validate_git_ssh_uri(url: str) -> None`
- **Descripcion:** Valida formato de URI SSH de git (`git@host:path`). Raise ValueError si invalida.
- **Algoritmo:** Verificar que empieza con `git@`, contiene `:`, y tiene path no vacio despues del `:`
- **Deps:** ninguna
- **Tests:** URI valida, URI sin git@, URI sin colon, URI con path vacio
@@ -0,0 +1,26 @@
# Calculate Media Strategy
Fuente conceptual: OpenViking `openviking/parse/base.py` (AGPL-3.0)
Reimplementar desde cero. No copiar codigo.
## Funcion a implementar
### calculate_media_strategy
- **Dominio:** core
- **Lang:** Python
- **Purity:** pure
- **Signature:** `calculate_media_strategy(image_count: int, line_count: int) -> str`
- **Retorno:** `"full_page_vlm"` | `"extract"` | `"text_only"`
- **Descripcion:** Determina la estrategia optima de procesamiento de medios para un documento basado en la proporcion de imagenes vs texto.
- **Algoritmo:**
1. Si `line_count > 0` y (`image_count / line_count > 0.3` o `image_count >= 5`): → `"full_page_vlm"` (documento dominado por imagenes, usar vision-language model)
2. Si `image_count > 0`: → `"extract"` (pocas imagenes, extraer y procesar individualmente)
3. Si `image_count == 0`: → `"text_only"` (solo texto)
- **Deps:** ninguna
- **Tests:**
- 0 imagenes → text_only
- 2 imagenes, 100 lineas → extract (ratio bajo)
- 10 imagenes, 20 lineas → full_page_vlm (ratio > 0.3)
- 5 imagenes, 100 lineas → full_page_vlm (>= 5 imagenes)
- 0 lineas, cualquier imagenes → text_only (division por cero evitada)
@@ -0,0 +1,53 @@
# Parser Registry (patron extensible)
Fuente conceptual: OpenViking `openviking/parse/registry.py` (AGPL-3.0)
Reimplementar desde cero. No copiar codigo.
## Clase a implementar
### ParserRegistry
- **Dominio:** core
- **Lang:** Python
- **Purity:** impure (mantiene estado mutable, singleton)
- **Descripcion:** Registry extensible que despacha parsing de archivos al parser correcto basado en extension. Patron plugin: registrar parsers por nombre y extensiones, resolver automaticamente.
- **Signature:**
```python
class ParserRegistry:
def __init__(self): ...
def register(self, name: str, parser: BaseParser) -> None: ...
def unregister(self, name: str) -> None: ...
def get_parser(self, name: str) -> BaseParser | None: ...
def get_parser_for_file(self, path: str) -> BaseParser | None: ...
async def parse(self, source: str, **kwargs) -> ParseResult: ...
def list_parsers(self) -> list[str]: ...
def list_supported_extensions(self) -> list[str]: ...
```
- **Protocolo BaseParser:**
```python
class BaseParser(ABC):
@property
@abstractmethod
def supported_extensions(self) -> list[str]: ...
@abstractmethod
async def parse(self, source: str | Path, **kwargs) -> ParseResult: ...
@abstractmethod
async def parse_content(self, content: str, source_path: str | None = None, **kwargs) -> ParseResult: ...
def can_parse(self, path: str) -> bool:
return Path(path).suffix.lower() in self.supported_extensions
```
- **Algoritmo de despacho (`parse()`):**
1. Si source es URL de repo de codigo → delegar a code parser
2. Si source parece path de archivo (corto, sin newlines) y existe:
- Si es directorio → delegar a directory parser
- Resolver parser por extension → delegar
- Si no hay parser → fallback a text parser
3. Si no es path → parsear como string de contenido con text parser
- **Extension map:** `{".md": "markdown", ".pdf": "pdf", ".html": "html", ...}` construido automaticamente al registrar parsers
- **Singleton:** `get_registry()` retorna instancia global lazy-initialized
- **Deps:** `pathlib`
- **Tests:** registrar parser custom, resolver por extension, parse file, parse string, unregister, extension desconocida (fallback a text)
- **Notas:** Este patron es util mas alla de parsing — cualquier sistema de plugins extensible por tipo de archivo puede usar este patron.
@@ -0,0 +1,22 @@
# Read File with Encoding Detection
Fuente conceptual: OpenViking `openviking/parse/parsers/base_parser.py` (AGPL-3.0)
Reimplementar desde cero. No copiar codigo.
## Funcion a implementar
### read_file_with_encoding
- **Dominio:** infra
- **Lang:** Python
- **Purity:** impure (I/O: lee archivo)
- **Signature:** `read_file_with_encoding(path: str, encodings: list[str] | None = None) -> str`
- **Descripcion:** Lee un archivo de texto intentando multiples encodings en orden hasta encontrar uno que funcione. Util para archivos de origen desconocido (Windows, Latin-1, etc.).
- **Algoritmo:**
1. Encodings por defecto: `["utf-8", "utf-8-sig", "latin-1", "cp1252"]`
2. Para cada encoding: intentar abrir y leer
3. Si UnicodeDecodeError: intentar siguiente
4. Si ninguno funciona: raise ValueError
- **Deps:** ninguna (stdlib)
- **Error type:** ValueError ("Unable to decode file")
- **Tests:** archivo UTF-8, archivo UTF-8 con BOM, archivo Latin-1, archivo binario (falla)
@@ -0,0 +1,77 @@
# Tipo: ParseResult / ResourceNode / NodeType
Fuente conceptual: OpenViking `openviking/parse/base.py` (AGPL-3.0)
Reimplementar desde cero. No copiar codigo.
## Tipos a implementar
### 1. NodeType (enum)
- **Dominio:** core
- **Lang:** Python
- **Algebraic:** sum
- **Definicion:**
```python
class NodeType(Enum):
ROOT = "root"
SECTION = "section"
```
- **Descripcion:** Tipos de nodo en un arbol de documento parseado. Solo ROOT y SECTION — todo el contenido detallado (parrafos, code blocks, tablas, listas) permanece como markdown dentro del campo content del nodo.
- **Notas:** Diseno intencionalmente simple. Evita decomposicion fina de nodos (no hay PARAGRAPH, CODE_BLOCK, TABLE, etc.) — el contenido se preserva en formato markdown.
### 2. ResourceNode (product)
- **Dominio:** core
- **Lang:** Python
- **Algebraic:** product
- **Definicion:**
```python
@dataclass
class ResourceNode:
type: NodeType
title: str | None = None
level: int = 0 # 0=root, 1=top section, etc.
content: str = "" # markdown content de este nodo
children: list["ResourceNode"] = field(default_factory=list)
meta: dict[str, Any] = field(default_factory=dict)
```
- **Metodos:**
- `add_child(child: ResourceNode) -> None`
- `get_text(include_children: bool = True) -> str` — contenido concatenado recursivo
- `get_abstract(max_length: int = 256) -> str` — resumen corto (title o primeros N chars)
- `get_overview(max_length: int = 4000) -> str` — overview con lista de children
- `to_dict() -> dict` — serializacion recursiva
- `from_dict(data: dict) -> ResourceNode` — deserializacion (classmethod)
- **Descripcion:** Nodo en un arbol jerarquico de documento. Preserva la estructura natural del documento (secciones, subsecciones) sin perder contenido. Cada nodo puede tener hijos (subsecciones) y metadata.
- **Uso:** Resultado intermedio de parsing. Los parsers (markdown, pdf, html, docx) producen arboles de ResourceNode.
### 3. ParseResult (product)
- **Dominio:** core
- **Lang:** Python
- **Algebraic:** product
- **Definicion:**
```python
@dataclass
class ParseResult:
root: ResourceNode
source_path: str | None = None
source_format: str | None = None # "pdf", "markdown", "html", "docx", etc.
parser_name: str | None = None # "PDFParser", "MarkdownParser", etc.
parser_version: str | None = None
parse_time: float | None = None # segundos
parse_timestamp: datetime | None = None
meta: dict[str, Any] = field(default_factory=dict)
warnings: list[str] = field(default_factory=list)
```
- **Metodos:**
- `success -> bool` (property) — True si no hay warnings
- `get_all_nodes() -> list[ResourceNode]` — flatten recursivo del arbol
- `get_sections(min_level=0, max_level=10) -> list[ResourceNode]` — filtrar secciones por nivel
- **Descripcion:** Resultado completo de parsear un documento. Contiene el arbol de nodos, metadata del proceso, y warnings. Es el tipo de retorno unificado de todos los parsers.
- **Helper:**
```python
def create_parse_result(root, source_path=None, source_format=None,
parser_name=None, parse_time=None, meta=None,
warnings=None) -> ParseResult
```
@@ -0,0 +1,41 @@
# Tipo: ClassifiedFile / DirectoryScanResult
Fuente conceptual: OpenViking `openviking/parse/directory_scan.py` (AGPL-3.0)
Reimplementar desde cero. No copiar codigo.
## Tipos a implementar
### 1. ClassifiedFile (product)
- **Dominio:** infra
- **Lang:** Python
- **Algebraic:** product
- **Definicion:**
```python
@dataclass
class ClassifiedFile:
path: str # path absoluto
rel_path: str # relativo a root, siempre forward slashes
classification: str # "processable" | "unsupported"
```
- **Descripcion:** Un archivo con su clasificacion tras un scan de directorio. `rel_path` siempre usa forward slashes para consistencia cross-platform.
### 2. DirectoryScanResult (product)
- **Dominio:** infra
- **Lang:** Python
- **Algebraic:** product
- **Definicion:**
```python
@dataclass
class DirectoryScanResult:
root: str
processable: list[ClassifiedFile] = field(default_factory=list)
unsupported: list[ClassifiedFile] = field(default_factory=list)
skipped: list[str] = field(default_factory=list) # "path (reason)"
warnings: list[str] = field(default_factory=list)
```
- **Metodos:**
- `all_processable_files() -> list[ClassifiedFile]` — alias de processable, para API clara
- **Descripcion:** Resultado de un pre-scan de directorio. Clasifica todos los archivos en procesables vs no soportados, y reporta archivos/dirs que se skipearon (dot files, symlinks, dirs ignorados) con la razon del skip.
- **Uso:** Validacion previa antes de importar un directorio completo. Permite reportar archivos no soportados al usuario antes de procesar.
@@ -0,0 +1,40 @@
# Tipo: CodeEntity
Fuente conceptual: OpenViking `openviking/parse/parsers/code/ast/` (AGPL-3.0)
Reimplementar desde cero. No copiar codigo.
## Tipo a implementar
### CodeEntity (product)
- **Dominio:** core
- **Lang:** Python
- **Algebraic:** product
- **Definicion:**
```python
@dataclass
class CodeEntity:
kind: str # "function" | "class" | "method" | "interface" | "type" | "struct" | "trait"
name: str # nombre de la entidad
start_line: int # linea de inicio (1-based)
end_line: int # linea de fin (1-based)
signature: str # firma completa (hasta el body)
docstring: str | None # docstring/comment extraido
language: str # "python" | "go" | "typescript" | "rust" | "java" | "cpp"
children: list["CodeEntity"] = field(default_factory=list) # metodos dentro de clases
```
- **Metodos:**
- `line_count -> int` (property) — `end_line - start_line + 1`
- `has_docstring -> bool` (property)
- `to_dict() -> dict` — serializacion recursiva
- `from_dict(data: dict) -> CodeEntity` — deserializacion (classmethod)
- **Descripcion:** Entidad de codigo extraida por analisis AST (tree-sitter). Representa una funcion, clase, metodo, tipo, interfaz, struct o trait encontrado en codigo fuente. Las clases contienen sus metodos como children.
- **Uso:** Resultado de `parse_code_ast()`. Util para:
- Indexar codigo en un registry
- Generar skeletons/resumen de archivos de codigo
- Extraer funciones candidatas de repos externos
- Navegacion de codigo (jump to definition)
- **Notas:**
- `signature` incluye decoradores/annotations en Python, generic params en TypeScript/Go, visibility modifiers en Java/Rust
- `docstring` es el primer string literal hijo en Python, el comentario `///` o `/** */` previo en otros lenguajes
- `children` solo se usa para metodos dentro de clases/structs/traits/interfaces
@@ -0,0 +1,96 @@
# Tipo: Message / Part (multipart messages)
Fuente conceptual: OpenViking `openviking/message/` (AGPL-3.0)
Reimplementar desde cero. No copiar codigo.
## Tipos a implementar
### 1. TextPart (product)
- **Dominio:** core
- **Lang:** Python
- **Algebraic:** product
- **Definicion:**
```python
@dataclass
class TextPart:
text: str = ""
type: str = "text" # literal discriminator
```
### 2. ContextPart (product)
- **Dominio:** core
- **Lang:** Python
- **Algebraic:** product
- **Definicion:**
```python
@dataclass
class ContextPart:
uri: str = "" # referencia al contexto
context_type: str = "memory" # "memory" | "resource" | "skill"
abstract: str = "" # resumen del contexto
type: str = "context" # literal discriminator
```
### 3. ToolPart (product)
- **Dominio:** core
- **Lang:** Python
- **Algebraic:** product
- **Definicion:**
```python
@dataclass
class ToolPart:
tool_id: str = ""
tool_name: str = ""
tool_input: dict | None = None
tool_output: str = ""
tool_status: str = "pending" # "pending" | "running" | "completed" | "error"
duration_ms: float | None = None
prompt_tokens: int | None = None
completion_tokens: int | None = None
type: str = "tool" # literal discriminator
```
### 4. Part (sum type)
- **Dominio:** core
- **Lang:** Python
- **Algebraic:** sum
- **Definicion:**
```python
Part = Union[TextPart, ContextPart, ToolPart]
def part_from_dict(data: dict) -> Part:
"""Deserializa un Part desde dict, usando el campo 'type' como discriminador."""
```
### 5. Message (product)
- **Dominio:** core
- **Lang:** Python
- **Algebraic:** product
- **Definicion:**
```python
@dataclass
class Message:
id: str
role: str # "user" | "assistant"
parts: list[Part]
created_at: datetime | None = None
```
- **Metodos:**
- `content -> str` (property) — texto del primer TextPart
- `estimated_tokens -> int` (property) — estimacion ceil(total_chars / 4)
- `to_dict() -> dict` — serializacion a JSONL-compatible dict
- `from_dict(data: dict) -> Message` (classmethod) — deserializacion
- `create_user(content: str) -> Message` (classmethod) — factory para mensajes de usuario
- `create_assistant(content: str, context_refs=None, tool_calls=None) -> Message` (classmethod)
- `get_context_parts() -> list[ContextPart]`
- `get_tool_parts() -> list[ToolPart]`
- `find_tool_part(tool_id: str) -> ToolPart | None`
- `to_jsonl() -> str` — serializacion a string JSON
- **Descripcion:** Modelo de mensaje multipart para conversaciones con agentes. Soporta texto, referencias a contexto (memoria/recurso/skill) y llamadas a herramientas, todo en un mensaje unificado. Diseñado para ser serializable a JSONL para persistencia.
- **Estimacion de tokens:** Cuenta chars de text parts, abstracts de context parts, y tool inputs/outputs. Divide entre 4 (heuristica). Util para gestionar budgets de tokens en contexto de LLMs.
- **Uso:** Historial de conversacion, sesiones de agentes, logs de interaccion.
@@ -0,0 +1,144 @@
# Tipos: Retrieval (TypedQuery, MatchedContext, QueryResult, FindResult)
Fuente conceptual: OpenViking `openviking_cli/retrieve/types.py` (AGPL-3.0)
Reimplementar desde cero. No copiar codigo.
## Tipos a implementar
### 1. ContextType (enum/sum)
- **Dominio:** core
- **Lang:** Python
- **Algebraic:** sum
- **Definicion:**
```python
class ContextType(str, Enum):
MEMORY = "memory"
RESOURCE = "resource"
SKILL = "skill"
```
- **Descripcion:** Tipo de contexto para busqueda/retrieval. Permite filtrar resultados por categoria.
### 2. TypedQuery (product)
- **Dominio:** core
- **Lang:** Python
- **Algebraic:** product
- **Definicion:**
```python
@dataclass
class TypedQuery:
query: str # texto de la query
context_type: ContextType | None # tipo de contexto objetivo
intent: str # descripcion de la intencion
priority: int = 3 # 1-5 (1 es mayor prioridad)
target_directories: list[str] = field(default_factory=list) # URIs de directorios objetivo
```
- **Descripcion:** Query tipada que resulta del analisis de intenciones. Un plan de busqueda se descompone en multiples TypedQueries, cada una apuntando a un tipo de contexto diferente.
### 3. QueryPlan (product)
- **Dominio:** core
- **Lang:** Python
- **Algebraic:** product
- **Definicion:**
```python
@dataclass
class QueryPlan:
queries: list[TypedQuery]
session_context: str # resumen del contexto de sesion
reasoning: str # razonamiento del LLM
```
- **Descripcion:** Plan de busqueda generado por analisis de intenciones (LLM). Contiene multiples queries tipadas y el razonamiento detras de la descomposicion.
### 4. RelatedContext (product)
- **Dominio:** core
- **Lang:** Python
- **Algebraic:** product
- **Definicion:**
```python
@dataclass
class RelatedContext:
uri: str
abstract: str
```
### 5. MatchedContext (product)
- **Dominio:** core
- **Lang:** Python
- **Algebraic:** product
- **Definicion:**
```python
@dataclass
class MatchedContext:
uri: str
context_type: ContextType
level: int = 2 # 0=abstract, 1=overview, 2=detail
abstract: str = ""
overview: str | None = None
category: str = ""
score: float = 0.0 # 0-1 relevancia
match_reason: str = ""
relations: list[RelatedContext] = field(default_factory=list)
```
- **Descripcion:** Contexto encontrado por el retriever con su score de relevancia, nivel de detalle, y contextos relacionados.
### 6. ScoreDistribution (product)
- **Dominio:** datascience
- **Lang:** Python
- **Algebraic:** product
- **Definicion:**
```python
@dataclass
class ScoreDistribution:
scores: list[tuple[str, float]] # [(uri, score), ...] ordenados desc
min_score: float = 0.0
max_score: float = 0.0
mean_score: float = 0.0
threshold: float = 0.0
```
- **Metodos:**
- `from_scores(uri_scores, threshold=0.0) -> ScoreDistribution` (classmethod) — calcula stats
- `to_dict() -> dict` — incluye count y above_threshold
- **Descripcion:** Estadisticas de distribucion de scores para visualizacion y debugging de retrieval.
### 7. QueryResult (product)
- **Dominio:** core
- **Lang:** Python
- **Algebraic:** product
- **Definicion:**
```python
@dataclass
class QueryResult:
query: TypedQuery
matched_contexts: list[MatchedContext]
searched_directories: list[str]
```
- **Descripcion:** Resultado de una sola TypedQuery con los contextos encontrados y los directorios que se buscaron.
### 8. FindResult (product)
- **Dominio:** core
- **Lang:** Python
- **Algebraic:** product
- **Definicion:**
```python
@dataclass
class FindResult:
memories: list[MatchedContext]
resources: list[MatchedContext]
skills: list[MatchedContext]
query_plan: QueryPlan | None = None
query_results: list[QueryResult] | None = None
total: int = 0 # auto-calculado en __post_init__
```
- **Metodos:**
- `__iter__` — itera sobre todos los MatchedContext (memories + resources + skills)
- `__post_init__` — calcula total
- `to_dict(include_provenance=False) -> dict` — serializacion
- `from_dict(data: dict) -> FindResult` (classmethod) — deserializacion
- **Descripcion:** Resultado final de una busqueda completa, agrupado por tipo de contexto. Opcionalmente incluye provenance (traza de busqueda) para observabilidad.
@@ -0,0 +1,115 @@
# Tipos: Memory System (MemoryField, MemoryTypeSchema, MemoryData)
Fuente conceptual: OpenViking `openviking/session/memory/dataclass.py` (AGPL-3.0)
Reimplementar desde cero. No copiar codigo.
## Tipos a implementar
### 1. FieldType (enum/sum)
- **Dominio:** core
- **Lang:** Python
- **Algebraic:** sum
- **Definicion:**
```python
class FieldType(str, Enum):
STRING = "string"
INTEGER = "integer"
FLOAT = "float"
BOOLEAN = "boolean"
LIST = "list"
DICT = "dict"
DATETIME = "datetime"
```
### 2. MergeOp (enum/sum)
- **Dominio:** core
- **Lang:** Python
- **Algebraic:** sum
- **Definicion:**
```python
class MergeOp(str, Enum):
PATCH = "patch" # SEARCH/REPLACE para strings, replace directo para otros
SUM = "sum" # suma numerica (contadores)
IMMUTABLE = "immutable" # no se puede modificar despues de crear
```
- **Descripcion:** Estrategia de merge cuando se actualiza un campo de memoria existente. PATCH aplica cambios parciales (search/replace en strings), SUM acumula valores numericos, IMMUTABLE protege contra modificacion.
### 3. MemoryField (product)
- **Dominio:** core
- **Lang:** Python
- **Algebraic:** product
- **Definicion:**
```python
@dataclass
class MemoryField:
name: str # nombre del campo
field_type: FieldType # tipo de dato
description: str = "" # descripcion para el LLM
merge_op: MergeOp = MergeOp.PATCH # estrategia de merge
```
- **Descripcion:** Definicion de un campo dentro de un tipo de memoria. El merge_op determina como se actualizan valores existentes.
### 4. MemoryTypeSchema (product)
- **Dominio:** core
- **Lang:** Python
- **Algebraic:** product
- **Definicion:**
```python
@dataclass
class MemoryTypeSchema:
memory_type: str # nombre del tipo (ej: "profile", "preferences")
description: str = "" # descripcion del tipo
fields: list[MemoryField] = field(default_factory=list)
filename_template: str = "" # template para nombre de archivo
directory: str = "" # directorio donde se almacenan
enabled: bool = True
operation_mode: str = "upsert" # "upsert" | "add_only" | "update_only"
```
- **Descripcion:** Schema de un tipo de memoria. Define la estructura de los datos, la estrategia de merge por campo, y como se organizan en el filesystem. Soporta 3 modos de operacion:
- `upsert`: crear o actualizar (default)
- `add_only`: solo crear, nunca actualizar existentes
- `update_only`: solo actualizar existentes, nunca crear nuevos
### 5. MemoryData (product)
- **Dominio:** core
- **Lang:** Python
- **Algebraic:** product
- **Definicion:**
```python
@dataclass
class MemoryData:
memory_type: str # tipo de memoria
uri: str | None = None # URI para updates
fields: dict[str, Any] = field(default_factory=dict) # datos dinamicos
abstract: str | None = None # L0 resumen corto
overview: str | None = None # L1 resumen medio
content: str | None = None # L2 contenido completo
name: str | None = None
tags: list[str] = field(default_factory=list)
created_at: datetime | None = None
updated_at: datetime | None = None
```
- **Metodos:**
- `get_field(field_name: str) -> Any`
- `set_field(field_name: str, value: Any) -> None`
- **Descripcion:** Instancia concreta de un dato de memoria. Los campos son dinamicos (dict) y se validan contra el MemoryTypeSchema correspondiente. Soporta 3 niveles de contenido (L0 abstract, L1 overview, L2 detail) para retrieval jerarquico.
### 6. Categorias de memoria predefinidas
Categorias estandar que un sistema de memoria de agente deberia soportar:
| Categoria | Descripcion | Ejemplo |
|-----------|-------------|---------|
| profile | Informacion del usuario | nombre, rol, expertise |
| preferences | Preferencias | idioma, estilo de respuesta |
| entities | Entidades conocidas | personas, proyectos, tools |
| events | Eventos ocurridos | reuniones, deployments, incidentes |
| cases | Casos/precedentes | bugs resueltos, decisiones |
| patterns | Patrones observados | workflows, preferencias recurrentes |
| tools | Herramientas conocidas | comandos, APIs, shortcuts |
| skills | Habilidades/capacidades | del agente o del usuario |
@@ -0,0 +1,69 @@
# Tipos: Context (modelo unificado de contexto)
Fuente conceptual: OpenViking `openviking/core/context.py` (AGPL-3.0)
Reimplementar desde cero. No copiar codigo.
## Tipos a implementar
### 1. ResourceContentType (enum/sum)
- **Dominio:** core
- **Lang:** Python
- **Algebraic:** sum
- **Definicion:**
```python
class ResourceContentType(str, Enum):
TEXT = "text"
IMAGE = "image"
VIDEO = "video"
AUDIO = "audio"
BINARY = "binary"
```
### 2. ContextLevel (enum/sum)
- **Dominio:** core
- **Lang:** Python
- **Algebraic:** sum
- **Definicion:**
```python
class ContextLevel(int, Enum):
ABSTRACT = 0 # L0: resumen corto (~256 chars)
OVERVIEW = 1 # L1: overview (~4000 chars)
DETAIL = 2 # L2: contenido completo
```
- **Descripcion:** Nivel de detalle de un contexto para indexacion vectorial y retrieval jerarquico. El retriever primero busca en L0 (rapido, barato), luego refina con L1/L2.
### 3. Context (product)
- **Dominio:** core
- **Lang:** Python
- **Algebraic:** product
- **Definicion:**
```python
@dataclass
class Context:
id: str # UUID
uri: str # identificador unico del recurso
parent_uri: str | None = None # URI del padre (directorio)
is_leaf: bool = False # True si es nodo hoja (archivo)
abstract: str = "" # L0 resumen
context_type: str = "resource" # "memory" | "resource" | "skill"
category: str = "" # subcategoria
level: int | None = None # ContextLevel
created_at: datetime | None = None
updated_at: datetime | None = None
active_count: int = 0 # veces accedido (para hotness)
related_uri: list[str] = field(default_factory=list) # URIs relacionados
meta: dict[str, Any] = field(default_factory=dict)
```
- **Metodos:**
- `to_dict() -> dict` — serializacion
- `from_dict(data: dict) -> Context` (classmethod) — deserializacion
- **Descripcion:** Modelo unificado de contexto que representa cualquier recurso (documento, memoria, skill) indexable y buscable. Combina metadata, jerarquia (parent/children via URI), conteo de accesos (para hotness scoring), y relaciones.
- **Uso:** Almacenado en vector DB con su embedding. El retriever busca por similitud vectorial y luego usa `active_count` + `updated_at` para hotness scoring.
- **Notas:**
- `uri` sigue un esquema tipo filesystem: `viking://resources/docs/readme.md`
- `parent_uri` permite reconstruir la jerarquia de directorios
- `active_count` se incrementa cada vez que el contexto se retrieva — usado por `hotness_score()`
- `related_uri` permite grafos de relaciones entre contextos
@@ -0,0 +1,86 @@
# Validacion y Schemas
Gap identificado en el registry. Reimplementar desde cero.
No hay funciones de validacion de datos estructurados. Necesarias para el pipeline
de ingesta (validar documentos parseados) y para assertions mas ricas en operations.db.
## Funciones a implementar
### validate_json_schema
- **Dominio:** core
- **Lang:** Python
- **Purity:** pure
- **Signature:** `validate_json_schema(data: dict, schema: dict) -> tuple[bool, list[str]]`
- **Retorno:** (valid, errors) — True y lista vacia si cumple, False y lista de errores si no
- **Descripcion:** Valida un dict contra un schema JSON Schema (draft 2020-12) sin dependencias externas. Soporta types, required, properties, items, minimum/maximum, minLength/maxLength, pattern, enum. No pretende cubrir todo el spec — solo lo practico.
- **Algoritmo:**
1. Validar type (string, number, integer, boolean, array, object, null)
2. Para object: validar required, iterar properties recursivamente
3. Para array: validar items recursivamente, minItems/maxItems
4. Para string: pattern (re.match), minLength/maxLength, enum
5. Para number/integer: minimum/maximum, exclusiveMinimum/exclusiveMaximum
6. Acumular errores con path (ej: "$.address.zip: expected string, got int")
- **Deps:** `re` (solo stdlib)
- **Tests:**
- Schema simple con types y required
- Nested object validation
- Array con items schema
- Pattern validation en strings
- Numeric ranges
- Multiples errores acumulados con paths correctos
- Schema vacio (acepta todo)
- Data None contra required field
### validate_struct_fields
- **Dominio:** core
- **Lang:** Go
- **Purity:** pure
- **Signature:** `ValidateStructFields(data map[string]any, rules map[string]string) (bool, []string)`
- **Retorno:** (valid, errors)
- **Descripcion:** Valida campos de un map contra reglas declarativas tipo `"required,min=1,max=100,type=string"`. Pensado para validar metadata de entities en operations.db o resultados de queries sin definir structs Go.
- **Reglas soportadas:**
- `required` — campo debe existir y no ser nil/""
- `type=string|int|float|bool` — validar tipo Go subyacente
- `min=N`, `max=N` — para numericos
- `minlen=N`, `maxlen=N` — para strings
- `oneof=a|b|c` — valor debe ser uno de los listados
- `pattern=regex` — para strings
- **Deps:** `regexp`, `strconv`, `strings` (solo stdlib)
- **Tests:**
- Campo required presente y ausente
- Type validation (string como int falla)
- Numeric ranges
- String lengths
- Oneof validation
- Pattern matching
- Multiples reglas combinadas
- Map vacio con reglas required
### coerce_types
- **Dominio:** core
- **Lang:** Python
- **Purity:** pure
- **Signature:** `coerce_types(data: dict, schema: dict[str, str]) -> tuple[dict, list[str]]`
- **Retorno:** (coerced_data, warnings) — nuevo dict con tipos corregidos, warnings para coerciones lossy
- **Descripcion:** Convierte valores de un dict a los tipos esperados. Schema es `{"field": "int|float|str|bool|datetime|list[str]"}`. Util para normalizar datos que vienen como strings de CSV, JSON, o query params antes de insertarlos en operations.db.
- **Coerciones:**
- str → int: `int(v)`, warning si tiene decimales
- str → float: `float(v)`
- str → bool: "true/1/yes" → True, "false/0/no" → False
- str → datetime: ISO 8601 parse
- str → list[str]: split por "," y strip
- Valor ya del tipo correcto → pass through
- Coercion imposible → mantener original + warning
- **Deps:** `datetime` (solo stdlib)
- **Tests:**
- String "42" → int 42
- String "3.14" → float 3.14
- String "true" → bool True
- String "2024-01-15T10:30:00Z" → datetime
- Coercion fallida genera warning sin crash
- Dict con mix de tipos ya correctos y strings
- Campo ausente en schema → pass through sin tocar
@@ -0,0 +1,122 @@
# Transformacion de datos tabulares
Gap identificado en el registry. Reimplementar desde cero.
Hay estadistica y rolling windows pero no transformaciones tabulares puras.
Necesarias para pipelines Go/Python que procesan CSVs o resultados de queries
sin depender de pandas.
Datos tabulares = `list[dict]` en Python, `[]map[string]any` en Go.
## Funciones a implementar
### pivot
- **Dominio:** datascience
- **Lang:** Python
- **Purity:** pure
- **Signature:** `pivot(rows: list[dict], index: str, columns: str, values: str, agg: str = "sum") -> list[dict]`
- **Retorno:** Lista de dicts donde cada dict tiene el campo index + un campo por cada valor unico de columns
- **Descripcion:** Pivot table sin pandas. Agrupa por `index`, expande valores unicos de `columns` como nuevas columnas, agrega `values` con la funcion indicada.
- **Agregaciones:** sum, count, mean, min, max, first, last
- **Ejemplo:**
```python
rows = [
{"region": "US", "product": "A", "sales": 10},
{"region": "US", "product": "B", "sales": 20},
{"region": "EU", "product": "A", "sales": 15},
]
pivot(rows, index="region", columns="product", values="sales")
# [{"region": "US", "A": 10, "B": 20}, {"region": "EU", "A": 15, "B": 0}]
```
- **Deps:** ninguna (solo stdlib)
- **Tests:**
- Pivot basico con sum
- Pivot con count y mean
- Valores faltantes rellenados con 0 (numericos) o None (strings)
- Una sola fila
- Multiples valores por celda (requiere agregacion)
### melt
- **Dominio:** datascience
- **Lang:** Python
- **Purity:** pure
- **Signature:** `melt(rows: list[dict], id_vars: list[str], value_vars: list[str], var_name: str = "variable", value_name: str = "value") -> list[dict]`
- **Retorno:** Lista de dicts en formato largo (unpivoted)
- **Descripcion:** Inversa de pivot — convierte columnas en filas. Cada combinacion de id_vars + value_var genera una fila.
- **Ejemplo:**
```python
rows = [{"region": "US", "q1": 10, "q2": 20}]
melt(rows, id_vars=["region"], value_vars=["q1", "q2"])
# [{"region": "US", "variable": "q1", "value": 10},
# {"region": "US", "variable": "q2", "value": 20}]
```
- **Deps:** ninguna
- **Tests:**
- Melt basico
- Multiples id_vars
- value_vars con None → melt todas las columnas no-id
- Fila con campo faltante en value_vars
### join_by_key
- **Dominio:** core
- **Lang:** Python
- **Purity:** pure
- **Signature:** `join_by_key(left: list[dict], right: list[dict], key: str, how: str = "inner") -> list[dict]`
- **Retorno:** Lista de dicts con campos de ambos lados mergeados
- **Descripcion:** Join de dos tablas por una clave comun. Soporta inner, left, right, outer. Campos duplicados del right se sufijan con `_right`.
- **Algoritmo:**
1. Indexar right por key en un dict (O(n))
2. Iterar left, buscar match en index (O(m))
3. Merge de campos segun tipo de join
- **Deps:** ninguna
- **Tests:**
- Inner join (solo matches)
- Left join (todos los left, None para right sin match)
- Right join
- Outer join
- Campos duplicados con sufijo
- Key ausente en alguna fila
### aggregate_by_group
- **Dominio:** datascience
- **Lang:** Python
- **Purity:** pure
- **Signature:** `aggregate_by_group(rows: list[dict], group_by: list[str], aggs: dict[str, str]) -> list[dict]`
- **Retorno:** Lista de dicts agrupados con valores agregados
- **Descripcion:** GROUP BY + agregaciones sobre datos tabulares. `aggs` es `{"column": "sum|mean|count|min|max|first|last|collect"}`. `collect` acumula valores en lista.
- **Ejemplo:**
```python
rows = [{"dept": "eng", "salary": 100}, {"dept": "eng", "salary": 120}, {"dept": "sales", "salary": 80}]
aggregate_by_group(rows, group_by=["dept"], aggs={"salary": "mean"})
# [{"dept": "eng", "salary": 110.0}, {"dept": "sales", "salary": 80.0}]
```
- **Deps:** ninguna
- **Tests:**
- Group by una columna con sum
- Group by multiples columnas
- Agregacion mean, count, min, max
- collect acumula en lista
- Grupo con una sola fila
- Campo con None (ignorar en agregaciones numericas)
### pivot (Go)
- **Dominio:** datascience
- **Lang:** Go
- **Purity:** pure
- **Signature:** `Pivot(rows []map[string]any, index, columns, values, agg string) []map[string]any`
- **Descripcion:** Misma semantica que la version Python. Go no tiene generics suficientes para hacerlo elegante, pero con `map[string]any` y type assertions funciona para datos deserializados de JSON/SQL.
- **Tests:** Mismos casos que Python
### join_by_key (Go)
- **Dominio:** core
- **Lang:** Go
- **Purity:** pure
- **Signature:** `JoinByKey(left, right []map[string]any, key, how string) []map[string]any`
- **Descripcion:** Misma semantica que la version Python.
- **Tests:** Mismos casos que Python
@@ -0,0 +1,118 @@
# Serializacion y formato de salida
Gap identificado en el registry. Reimplementar desde cero.
Los pipelines terminan en operations.db pero no hay funciones para exportar
datos a formatos consumibles (CSV, JSONL, HTML, templates).
## Funciones a implementar
### to_csv
- **Dominio:** core
- **Lang:** Python
- **Purity:** pure
- **Signature:** `to_csv(rows: list[dict], columns: list[str] | None = None, delimiter: str = ",", include_header: bool = True) -> str`
- **Retorno:** String CSV completo
- **Descripcion:** Serializa datos tabulares a CSV. Si columns es None, usa las keys de la primera fila. Escapa campos con comillas, newlines y delimiters correctamente (RFC 4180).
- **Deps:** ninguna (no usar csv module para mantener control total)
- **Tests:**
- Lista simple → CSV con header
- Campos con comas, comillas, newlines (escaping correcto)
- columns explicitas (reordena y filtra)
- include_header=False
- Lista vacia → string vacio
- Valores None → campo vacio
### from_csv
- **Dominio:** core
- **Lang:** Python
- **Purity:** pure
- **Signature:** `from_csv(text: str, delimiter: str = ",", has_header: bool = True) -> list[dict]`
- **Retorno:** Lista de dicts con keys del header
- **Descripcion:** Parser CSV a datos tabulares. Complemento de to_csv. Si has_header=False, genera keys col_0, col_1, etc.
- **Deps:** ninguna
- **Tests:**
- CSV simple con header
- Campos con escaping (comillas, comas internas)
- Sin header (keys generadas)
- Lineas vacias ignoradas
- Un solo campo por fila
### to_jsonl
- **Dominio:** core
- **Lang:** Python
- **Purity:** pure
- **Signature:** `to_jsonl(rows: list[dict]) -> str`
- **Retorno:** String con un JSON por linea
- **Descripcion:** Serializa a JSON Lines (newline-delimited JSON). Cada dict se serializa como una linea JSON independiente. Util para streaming, logging estructurado, y formatos de intercambio.
- **Deps:** `json` (stdlib)
- **Tests:**
- Lista de dicts → JSONL
- Valores con unicode, None, nested dicts
- Lista vacia → string vacio
- Cada linea es JSON parseable independientemente
### from_jsonl
- **Dominio:** core
- **Lang:** Python
- **Purity:** pure
- **Signature:** `from_jsonl(text: str) -> list[dict]`
- **Retorno:** Lista de dicts
- **Descripcion:** Parser JSONL a lista de dicts. Ignora lineas vacias. Complemento de to_jsonl.
- **Deps:** `json`
- **Tests:**
- JSONL valido
- Lineas vacias intercaladas
- Linea con JSON invalido → raise con numero de linea
### render_template
- **Dominio:** core
- **Lang:** Python
- **Purity:** pure
- **Signature:** `render_template(template: str, context: dict, missing: str = "") -> str`
- **Retorno:** String con placeholders reemplazados
- **Descripcion:** Motor de templates minimalista sin dependencias. Soporta `{{variable}}`, `{{obj.field}}` (dot access en dicts anidados), `{% for item in list %}...{% endfor %}`, `{% if cond %}...{% endif %}`. No pretende reemplazar Jinja — cubre el 80% de casos simples.
- **Sintaxis:**
- `{{var}}` — sustitucion simple, HTML-escaped por defecto
- `{{{var}}}` — sustitucion sin escape (raw)
- `{{obj.key.subkey}}` — dot-path traversal en dicts
- `{% for x in items %}{{x}}{% endfor %}` — iteracion
- `{% if var %}...{% endif %}` — condicional (truthy check)
- `{% if not var %}...{% endif %}` — negacion
- **Deps:** `re` (solo stdlib)
- **Tests:**
- Sustitucion simple
- Dot-path en nested dicts
- For loop con lista de strings
- For loop con lista de dicts
- If/endif condicional
- Variable missing → string vacio (configurable)
- HTML escaping por defecto
- Triple braces sin escape
### generate_html_report
- **Dominio:** core
- **Lang:** Python
- **Purity:** pure
- **Signature:** `generate_html_report(title: str, sections: list[dict]) -> str`
- **Retorno:** String HTML completo (con DOCTYPE, head, body)
- **Descripcion:** Genera un reporte HTML autocontenido con CSS inline. Cada section es `{"heading": str, "type": "table"|"text"|"kpi"|"list", "data": ...}`. Para exportar resultados de pipelines o assertions a algo compartible sin servidor.
- **Tipos de seccion:**
- `table`: data es `list[dict]` → renderiza <table> con headers
- `text`: data es `str` → renderiza <p> con markdown basico (bold, links)
- `kpi`: data es `list[{"label": str, "value": str|number, "delta": str|None}]` → cards
- `list`: data es `list[str]` → renderiza <ul>
- **CSS:** Inline en <style>, tema minimalista con max-width, font-family sans-serif, tabla con zebra stripes
- **Deps:** ninguna
- **Tests:**
- Reporte con una tabla
- Reporte con multiples secciones mixtas
- KPI con deltas positivos y negativos
- Caracteres especiales HTML escapados en data
- Titulo con caracteres especiales
@@ -0,0 +1,96 @@
# HTTP Client primitivas
Gap identificado en el registry. Reimplementar desde cero.
Hay fetch_ohlcv y stream_ticks pero no un HTTP client generico reutilizable.
Necesario para integraciones con APIs externas sin reimplementar retry/headers cada vez.
## Funciones a implementar
### http_get_json
- **Dominio:** infra
- **Lang:** Python
- **Purity:** impure (I/O de red)
- **Signature:** `http_get_json(url: str, headers: dict[str, str] | None = None, params: dict[str, str] | None = None, timeout: float = 30.0) -> dict`
- **Retorno:** Response body parseado como dict/list
- **Descripcion:** GET request que espera JSON. Agrega `Accept: application/json` automaticamente. Lanza error descriptivo si status >= 400 (incluye status code, url truncada, y primeros 200 chars del body).
- **Deps:** `urllib.request`, `urllib.parse`, `json` (solo stdlib, sin requests)
- **Tests:**
- Mock de respuesta 200 con JSON
- Mock de respuesta 404 → error con status code
- Mock de respuesta con JSON invalido → error descriptivo
- Params serializados como query string
- Headers custom enviados
### http_post_json
- **Dominio:** infra
- **Lang:** Python
- **Purity:** impure
- **Signature:** `http_post_json(url: str, body: dict, headers: dict[str, str] | None = None, timeout: float = 30.0) -> dict`
- **Retorno:** Response body parseado como dict/list
- **Descripcion:** POST request con body JSON. Agrega `Content-Type: application/json` y `Accept: application/json`. Misma politica de errores que http_get_json.
- **Deps:** `urllib.request`, `json` (solo stdlib)
- **Tests:**
- Mock de POST con body serializado correctamente
- Mock de respuesta 201
- Mock de respuesta 500 → error
- Body con unicode
### http_download_file
- **Dominio:** infra
- **Lang:** Python
- **Purity:** impure
- **Signature:** `http_download_file(url: str, dest_path: str, headers: dict[str, str] | None = None, timeout: float = 120.0, chunk_size: int = 8192) -> dict`
- **Retorno:** `{"path": str, "size_bytes": int, "content_type": str}`
- **Descripcion:** Descarga un archivo por HTTP en streaming (sin cargar todo en memoria). Crea directorios intermedios si no existen. Si el archivo destino ya existe, lo sobreescribe.
- **Deps:** `urllib.request`, `os` (solo stdlib)
- **Tests:**
- Mock de descarga con contenido binario
- Directorio destino creado automaticamente
- Retorno con size correcto
- Timeout configurado en el request
### http_get_json (Go)
- **Dominio:** infra
- **Lang:** Go
- **Purity:** impure
- **Signature:** `HttpGetJSON(url string, headers map[string]string, timeout time.Duration) (map[string]any, error)`
- **Descripcion:** Misma semantica que Python. Usa `net/http` + `encoding/json`. Retorna error con status code si >= 400. Cierra body siempre (defer).
- **Deps:** `net/http`, `encoding/json`, `time` (solo stdlib)
- **Tests:**
- httptest.Server con respuesta JSON
- Status 404 → error
- Timeout → error
- Headers custom
### http_post_json (Go)
- **Dominio:** infra
- **Lang:** Go
- **Purity:** impure
- **Signature:** `HttpPostJSON(url string, body any, headers map[string]string, timeout time.Duration) (map[string]any, error)`
- **Descripcion:** Misma semantica que Python. Serializa body con json.Marshal, POST con Content-Type application/json.
- **Deps:** `net/http`, `encoding/json`, `bytes`, `time`
- **Tests:**
- httptest.Server recibe body correcto
- Status 201 → exito
- Status 500 → error con body parcial
### http_download_file (Go)
- **Dominio:** infra
- **Lang:** Go
- **Purity:** impure
- **Signature:** `HttpDownloadFile(url, destPath string, headers map[string]string, timeout time.Duration) (int64, error)`
- **Retorno:** bytes escritos, error
- **Descripcion:** Descarga en streaming con io.Copy. Crea directorios con os.MkdirAll. Usa archivo temporal + rename para atomicidad.
- **Deps:** `net/http`, `os`, `io`, `path/filepath`
- **Tests:**
- httptest.Server sirve archivo binario
- Directorio creado automaticamente
- Archivo temporal + rename (no deja basura si falla)
- Size retornado coincide
@@ -0,0 +1,104 @@
# Scheduling / Cron
Gap identificado en el registry. Reimplementar desde cero.
Los pipelines se lanzan manualmente o desde la TUI. No hay scheduling recurrente
dentro del registry. Complementario a Dagu — estas funciones son primitivas
componibles, no un scheduler completo.
## Funciones a implementar
### parse_cron_expr
- **Dominio:** core
- **Lang:** Go
- **Purity:** pure
- **Signature:** `ParseCronExpr(expr string) (CronSchedule, error)`
- **Retorno:** CronSchedule con campos Minute, Hour, DayOfMonth, Month, DayOfWeek parseados
- **Descripcion:** Parser de expresiones cron estandar (5 campos). Soporta `*`, rangos (`1-5`), listas (`1,3,5`), pasos (`*/15`), y aliases (@hourly, @daily, @weekly, @monthly). No soporta seconds ni years (no es Quartz).
- **Algoritmo:**
1. Check aliases (@hourly → "0 * * * *", etc)
2. Split por espacios, validar 5 campos
3. Por campo: expandir `*` a rango completo, parsear rangos/listas/pasos
4. Validar limites (minute 0-59, hour 0-23, etc)
- **Deps:** `strings`, `strconv`, `fmt` (solo stdlib)
- **Tests:**
- "*/15 * * * *" → minutos [0,15,30,45]
- "0 9 * * 1-5" → 9am weekdays
- "@daily" → "0 0 * * *"
- "0 9 1,15 * *" → dia 1 y 15 a las 9
- Expresion invalida → error descriptivo
- Campo fuera de rango → error
### next_cron_time
- **Dominio:** core
- **Lang:** Go
- **Purity:** pure
- **Signature:** `NextCronTime(schedule CronSchedule, after time.Time) time.Time`
- **Retorno:** Proximo time.Time que cumple el schedule despues de `after`
- **Descripcion:** Calcula la proxima ejecucion de un cron schedule. Avanza minuto a minuto desde `after` hasta encontrar un match. Limit de 366 dias para evitar loops infinitos en schedules imposibles.
- **Algoritmo:**
1. Truncar `after` al minuto, avanzar 1 minuto
2. Check month → si no match, avanzar al primer dia del proximo mes valido
3. Check day of month y day of week → si no match, avanzar al proximo dia
4. Check hour → si no match, avanzar a la proxima hora
5. Check minute → si no match, avanzar al proximo minuto
- **Deps:** `time`
- **Tests:**
- "0 * * * *" desde 14:30 → 15:00
- "0 9 * * 1" desde viernes → proximo lunes 9:00
- "0 0 29 2 *" → proximo 29 de febrero
- Schedule imposible → panic o error despues de limit
### cron_ticker
- **Dominio:** infra
- **Lang:** Go
- **Purity:** impure (goroutine + channel + time)
- **Signature:** `CronTicker(schedule CronSchedule, ctx context.Context) <-chan time.Time`
- **Retorno:** Channel que emite un time.Time en cada tick del schedule
- **Descripcion:** Crea un ticker que emite en los momentos definidos por el cron schedule. Usa time.NewTimer internamente, recalculando el proximo tick despues de cada emision. Se detiene al cancelar el context.
- **Deps:** `time`, `context`
- **Tests:**
- Ticker con schedule "* * * * *" emite cada minuto (test con clock mock o schedule rapido)
- Context cancel detiene el ticker
- Channel se cierra al cancelar
### parse_cron_expr (Python)
- **Dominio:** core
- **Lang:** Python
- **Purity:** pure
- **Signature:** `parse_cron_expr(expr: str) -> dict`
- **Retorno:** `{"minute": list[int], "hour": list[int], "day_of_month": list[int], "month": list[int], "day_of_week": list[int]}`
- **Descripcion:** Misma semantica que Go. Dict en vez de struct.
- **Tests:** Mismos casos que Go
### next_cron_time (Python)
- **Dominio:** core
- **Lang:** Python
- **Purity:** pure
- **Signature:** `next_cron_time(schedule: dict, after: datetime) -> datetime`
- **Descripcion:** Misma semantica que Go.
- **Tests:** Mismos casos que Go
## Tipo a implementar
### CronSchedule (Go)
- **Dominio:** core
- **Lang:** Go
- **Algebraic:** product
- **Definicion:**
```go
type CronSchedule struct {
Minute []int
Hour []int
DayOfMonth []int
Month []int
DayOfWeek []int
Raw string // expresion original
}
```
@@ -0,0 +1,124 @@
# Cache / Memoizacion persistente
Gap identificado en el registry. Reimplementar desde cero.
memoize_go_core es in-memory y se pierde entre ejecuciones. Para pipelines que
corren repetidamente (especialmente con llamadas LLM costosas), un cache
persistente con TTL evita recalculos.
## Funciones a implementar
### cache_to_sqlite
- **Dominio:** infra
- **Lang:** Python
- **Purity:** impure (I/O disco)
- **Signature:** `cache_to_sqlite(db_path: str, namespace: str = "default") -> CacheStore`
- **Retorno:** Objeto CacheStore con metodos get/set/delete/clear/stats
- **Descripcion:** Cache key-value persistido en SQLite. Cada namespace es una tabla logica (filtro en la query, no tabla separada). Keys son strings, values se serializan con json. TTL en segundos, 0 = sin expiracion. Limpieza de expirados en cada get (lazy eviction).
- **Schema SQLite:**
```sql
CREATE TABLE IF NOT EXISTS cache (
namespace TEXT NOT NULL,
key TEXT NOT NULL,
value TEXT NOT NULL, -- JSON serializado
created_at REAL NOT NULL, -- time.time()
expires_at REAL, -- NULL = no expira
PRIMARY KEY (namespace, key)
);
```
- **API del CacheStore:**
```python
class CacheStore:
def get(self, key: str) -> any | None: ... # None si miss o expirado
def set(self, key: str, value: any, ttl: float = 0) -> None: ...
def delete(self, key: str) -> bool: ...
def clear(self) -> int: ... # retorna filas eliminadas
def stats(self) -> dict: ... # {"hits": N, "misses": N, "size": N}
def get_or_set(self, key: str, factory: callable, ttl: float = 0) -> any: ...
```
- **Deps:** `sqlite3`, `json`, `time` (solo stdlib)
- **Tests:**
- Set y get basico
- TTL expirado → None
- TTL 0 → nunca expira
- get_or_set con factory que solo se llama en miss
- Namespaces independientes
- Clear elimina solo el namespace
- Stats contadores correctos
- Concurrencia (threading basico)
### cache_to_file
- **Dominio:** infra
- **Lang:** Python
- **Purity:** impure
- **Signature:** `cache_to_file(cache_dir: str, namespace: str = "default") -> FileCache`
- **Retorno:** Objeto FileCache con misma API que CacheStore
- **Descripcion:** Cache key-value donde cada entry es un archivo JSON en disco. Mas simple que SQLite, mejor para valores grandes (PDFs procesados, embeddings). Key se hashea con SHA-256 para nombre de archivo. Metadata (ttl, created_at) en un sidecar `.meta` file.
- **Estructura en disco:**
```
cache_dir/
namespace/
{sha256_key}.json # valor
{sha256_key}.meta # {"created_at": ..., "expires_at": ..., "original_key": ...}
```
- **API:** Misma interfaz que CacheStore (get, set, delete, clear, stats, get_or_set)
- **Deps:** `os`, `json`, `hashlib`, `time` (solo stdlib)
- **Tests:**
- Set y get basico
- TTL expirado → None
- Archivo .meta con metadata correcta
- Clear elimina el directorio del namespace
- Key con caracteres especiales → hash seguro
### cache_decorator
- **Dominio:** core
- **Lang:** Python
- **Purity:** impure (usa cache store)
- **Signature:** `cache_decorator(store: CacheStore | FileCache, ttl: float = 0, key_fn: callable | None = None)`
- **Retorno:** Decorator que cachea resultados de funciones
- **Descripcion:** Decorator que cachea el resultado de una funcion en un store persistente. Por defecto, la key se genera hasheando `(func.__name__, args, sorted(kwargs))`. `key_fn` permite custom key generation.
- **Uso:**
```python
store = cache_to_sqlite("cache.db")
@cache_decorator(store, ttl=3600)
def call_llm(prompt: str) -> str:
... # llamada costosa
result = call_llm("explain X") # primera vez: llama LLM
result = call_llm("explain X") # segunda vez: desde cache
```
- **Deps:** `functools`, `json`, `hashlib`
- **Tests:**
- Funcion llamada una vez, segunda vez desde cache
- TTL expirado → llama de nuevo
- key_fn custom
- Argumentos distintos → keys distintas
- Funciona con async (detect asyncio.iscoroutinefunction)
### cache_to_sqlite (Go)
- **Dominio:** infra
- **Lang:** Go
- **Purity:** impure
- **Signature:** `CacheToSQLite(dbPath, namespace string) (*SQLiteCache, error)`
- **Retorno:** *SQLiteCache con metodos Get/Set/Delete/Clear
- **Descripcion:** Misma semantica que Python. Valores almacenados como JSON string. Get retorna `([]byte, bool)` — el caller deserializa.
- **API:**
```go
type SQLiteCache struct { ... }
func (c *SQLiteCache) Get(key string) ([]byte, bool) { ... }
func (c *SQLiteCache) Set(key string, value []byte, ttl time.Duration) error { ... }
func (c *SQLiteCache) Delete(key string) error { ... }
func (c *SQLiteCache) Clear() (int64, error) { ... }
func (c *SQLiteCache) GetOrSet(key string, factory func() ([]byte, error), ttl time.Duration) ([]byte, error) { ... }
```
- **Deps:** `database/sql`, `encoding/json`, `time`, `crypto/sha256`
- **Tests:**
- Set/Get basico
- TTL expirado
- GetOrSet con factory
- Concurrencia (goroutines)
@@ -0,0 +1,122 @@
# Diff / Merge para operations.db y grafos
Gap identificado en el registry. Reimplementar desde cero.
Cuando un pipeline corre multiples veces, no hay forma de comparar resultados
entre ejecuciones. Necesario para detectar drift, validar estabilidad, y
mergear grafos de conocimiento de distintas fuentes.
## Funciones a implementar
### diff_entities
- **Dominio:** datascience
- **Lang:** Python
- **Purity:** pure
- **Signature:** `diff_entities(before: list[dict], after: list[dict], key: str = "id") -> dict`
- **Retorno:**
```python
{
"added": list[dict], # entities nuevas (key en after, no en before)
"removed": list[dict], # entities eliminadas (key en before, no en after)
"modified": list[dict], # entities con cambios: {"key": str, "changes": {"field": {"old": ..., "new": ...}}}
"unchanged": int, # count de entities sin cambios
"summary": str # "3 added, 1 removed, 5 modified, 42 unchanged"
}
```
- **Descripcion:** Compara dos snapshots de entities (tipicamente de dos ejecuciones del mismo pipeline). Detecta añadidas, eliminadas, y modificadas con detalle campo a campo. Ignora campos de metadata temporal (created_at, updated_at) por defecto.
- **Parametros opcionales:**
- `ignore_fields: list[str] = ["created_at", "updated_at"]` — campos a excluir de la comparacion
- `compare_fields: list[str] | None = None` — si se da, solo compara estos campos
- **Deps:** ninguna
- **Tests:**
- Entity añadida
- Entity eliminada
- Entity modificada con detalle de campos
- Entities identicas → unchanged
- ignore_fields funciona
- compare_fields filtra correctamente
- Lista vacia vs lista con datos
### diff_relations
- **Dominio:** datascience
- **Lang:** Python
- **Purity:** pure
- **Signature:** `diff_relations(before: list[dict], after: list[dict], key: tuple[str, str, str] = ("source_id", "target_id", "relation_type")) -> dict`
- **Retorno:** Misma estructura que diff_entities pero con key compuesta
- **Descripcion:** Compara relaciones entre dos snapshots. Key compuesta porque las relaciones se identifican por (source, target, type), no por un solo ID.
- **Deps:** ninguna
- **Tests:**
- Relacion añadida
- Relacion eliminada
- Relacion con metadata modificada (mismo source/target/type, distinto weight)
- Key compuesta funciona correctamente
### detect_drift
- **Dominio:** datascience
- **Lang:** Python
- **Purity:** pure
- **Signature:** `detect_drift(history: list[dict], current: dict, fields: list[str], threshold: float = 2.0) -> list[dict]`
- **Retorno:** Lista de `{"field": str, "current": float, "mean": float, "std": float, "z_score": float, "drifted": bool}`
- **Descripcion:** Detecta drift estadistico comparando metricas de la ejecucion actual contra el historial. Usa z-score — si |z| > threshold, el campo ha drifteado. Pensado para comparar metrics de executions sucesivas en operations.db.
- **Ejemplo:**
```python
history = [
{"records_out": 100, "duration_ms": 500},
{"records_out": 105, "duration_ms": 480},
{"records_out": 98, "duration_ms": 510},
]
current = {"records_out": 50, "duration_ms": 2000}
detect_drift(history, current, ["records_out", "duration_ms"])
# [{"field": "records_out", "current": 50, "mean": 101.0, "std": 3.6, "z_score": -14.2, "drifted": True},
# {"field": "duration_ms", "current": 2000, "mean": 496.7, "std": 15.3, "z_score": 98.3, "drifted": True}]
```
- **Deps:** `math` (solo stdlib)
- **Tests:**
- Campo con drift claro (z > threshold)
- Campo estable (z < threshold)
- Historial con un solo punto → std=0, no puede calcular → drifted=False con nota
- Historial vacio → todos drifted=False
- Threshold custom
### merge_graphs
- **Dominio:** datascience
- **Lang:** Python
- **Purity:** pure
- **Signature:** `merge_graphs(graphs: list[dict], entity_key: str = "name", similarity_threshold: float = 0.85) -> dict`
- **Retorno:**
```python
{
"entities": list[dict], # entities mergeadas (deduplicadas)
"relations": list[dict], # relaciones mergeadas
"merge_log": list[dict] # registro de merges: {"merged": [id1, id2], "into": id, "similarity": float}
}
```
- **Descripcion:** Mergea multiples grafos de conocimiento (de distintas fuentes o ejecuciones) en uno solo. Deduplicacion de entities por similitud de nombre (Levenshtein normalizado). Relaciones se re-apuntan a las entities mergeadas. Atributos se combinan (union de campos, ultimo gana en conflictos).
- **Algoritmo:**
1. Juntar todas las entities de todos los grafos
2. Para cada par de entities con similitud de nombre >= threshold, mergear
3. Elegir entity canonica (la que tiene mas campos no-null)
4. Re-apuntar relaciones al ID canonico
5. Deduplicar relaciones identicas (mismo source, target, type)
6. Registrar cada merge en merge_log
- **Deps del registry:** `levenshtein_distance_py_cybersecurity` (o reimplementar inline)
- **Tests:**
- Dos grafos con entity duplicada → merge
- Entities similares pero bajo threshold → no merge
- Relaciones re-apuntadas correctamente
- Merge log registra cada merge
- Tres grafos → merge transitivo
- Grafos sin overlap → concatenacion simple
### diff_entities (Go)
- **Dominio:** datascience
- **Lang:** Go
- **Purity:** pure
- **Signature:** `DiffEntities(before, after []map[string]any, key string, ignoreFields []string) map[string]any`
- **Descripcion:** Misma semantica que Python. Retorno como map para serializacion JSON directa.
- **Tests:** Mismos casos que Python
@@ -0,0 +1,73 @@
# Extract Entities from Text (LLM-based)
Diseño original para fuzzygraph.
## Funcion a implementar
### extract_entities_llm
- **Dominio:** datascience
- **Lang:** Python
- **Purity:** impure (llama LLM)
- **Signature:**
```python
def extract_entities_llm(
text: str,
entity_schema: list[dict],
llm_chat_json: Callable[[list[dict]], dict],
language_instruction: str = "Respond in English.",
) -> list[EntityCandidate]:
```
- **Parametros:**
- `text`: chunk de texto a analizar
- `entity_schema`: lista de tipos con metadata fields, formato:
```python
[
{"type_ref": "osint_person_go_cybersecurity", "label": "Person",
"metadata_fields": ["full_name", "alias", "nationality", "dob", "risk_score"]},
{"type_ref": "osint_domain_go_cybersecurity", "label": "Domain",
"metadata_fields": ["fqdn", "registrar", "created_date"]},
...
]
```
- `llm_chat_json`: funcion que recibe messages y retorna dict (inyeccion de dependencia, no acoplado a OpenAI)
- `language_instruction`: instruccion de idioma para el LLM
- **Retorno:** `list[EntityCandidate]` (ver spec fg_09)
- **Algoritmo:**
1. Construir system prompt con el schema de entity types:
```
You are an entity extraction expert. Given text, extract all entities
matching these types. For each entity, provide: name, type_ref,
attributes (matching the metadata_fields for that type), and a
confidence score (0.0-1.0).
Entity types:
- Person (type_ref: osint_person_go_cybersecurity)
fields: full_name, alias, nationality, dob, risk_score
- Domain (type_ref: osint_domain_go_cybersecurity)
fields: fqdn, registrar, created_date
...
Output JSON: {"entities": [{"name": "...", "type_ref": "...", "attributes": {...}, "confidence": 0.9}]}
Rules:
- Only extract entities explicitly mentioned in the text
- Use the exact type_ref from the schema
- Leave unknown attributes as null
- Confidence: 1.0 = explicitly named, 0.7 = strongly implied, 0.5 = weakly implied
```
2. User message: el texto del chunk
3. Llamar `llm_chat_json([system, user])` → parsear respuesta
4. Validar: cada entity tiene name y type_ref valido (del schema)
5. Retornar lista de EntityCandidate
- **Error handling:**
- JSON invalido del LLM: retornar lista vacia + warning
- type_ref no en schema: descartar esa entidad
- **Deps:** ninguna especifica (el LLM se inyecta)
- **Error type:** ValueError (schema vacio)
- **Tests:**
- Texto con entidades claras (personas, dominios)
- Texto sin entidades (retorna [])
- LLM retorna JSON mal formado (retorna [] con warning)
- type_ref invalido en respuesta (se descarta)
- Confidence se propaga correctamente
@@ -0,0 +1,64 @@
# Extract Relations from Text (LLM-based)
Diseño original para fuzzygraph.
## Funcion a implementar
### extract_relations_llm
- **Dominio:** datascience
- **Lang:** Python
- **Purity:** impure (llama LLM)
- **Signature:**
```python
def extract_relations_llm(
text: str,
entities: list[EntityCandidate],
relation_types: list[str],
llm_chat_json: Callable[[list[dict]], dict],
language_instruction: str = "Respond in English.",
) -> list[RelationCandidate]:
```
- **Parametros:**
- `text`: chunk de texto (mismo que se uso para extraer entidades)
- `entities`: entidades ya extraidas de este chunk (para que el LLM sepa entre que entidades buscar relaciones)
- `relation_types`: tipos de relacion permitidos, ej: `["funds", "employs", "communicates_with", "owns", "operates", "controls", "affiliated_with", "located_at", "resolves_to", "hosts", "exploits", "attributed_to", "related_to"]`
- `llm_chat_json`: funcion inyectada
- **Retorno:** `list[RelationCandidate]` (ver spec fg_09)
- **Algoritmo:**
1. Si `len(entities) < 2`: retornar [] (necesitas al menos 2 entidades para una relacion)
2. Construir system prompt:
```
You are a relation extraction expert. Given text and a list of
entities already extracted, identify relationships between them.
Entities found in this text:
- "John Smith" (Person)
- "Acme Corp" (Organization)
- "192.168.1.1" (IP Address)
Allowed relation types: funds, employs, communicates_with, ...
Output JSON: {"relations": [
{"from_name": "Acme Corp", "to_name": "John Smith",
"relation_type": "employs", "description": "...", "confidence": 0.8}
]}
Rules:
- Only extract relations explicitly stated or strongly implied
- from_name and to_name must match entity names exactly
- relation_type must be from the allowed list
- Confidence: 1.0 = explicitly stated, 0.7 = strongly implied
```
3. User message: el texto
4. Parsear respuesta, validar from/to contra nombres de entities
5. Retornar lista de RelationCandidate
- **Error handling:**
- from_name o to_name no matchean ninguna entidad: descartar relacion
- relation_type no en lista: usar "related_to" como fallback
- **Deps:** ninguna especifica
- **Tests:**
- Texto con 2 entidades relacionadas
- Texto con entidades pero sin relacion (retorna [])
- Menos de 2 entidades (retorna [])
- LLM inventa entidad que no existe (se descarta)
@@ -0,0 +1,50 @@
# Deduplicate Entities (fuzzy matching)
Diseño original para fuzzygraph.
## Funcion a implementar
### deduplicate_entities
- **Dominio:** datascience
- **Lang:** Python
- **Purity:** pure
- **Signature:**
```python
def deduplicate_entities(
candidates: list[EntityCandidate],
name_threshold: float = 0.85,
same_type_only: bool = True,
) -> DeduplicationResult:
```
- **Retorno:** `DeduplicationResult` con entities deduplicadas y merge log
- **Descripcion:** Agrupa entidades candidatas que refieren a la misma entidad real usando fuzzy matching de nombres. Cuando dos entidades son suficientemente similares (y del mismo tipo si `same_type_only=True`), se mergean en una sola combinando atributos.
- **Algoritmo:**
1. **Normalizar nombres:** `normalize_entity_name()` sobre cada candidato
2. **Agrupar por tipo** (si same_type_only): reducir comparaciones
3. **Comparacion pairwise dentro de cada grupo:**
- Calcular similitud: `1.0 - (levenshtein_distance(a, b) / max(len(a), len(b)))`
- Tambien comparar por tokens: `jaccard_similarity(tokens_a, tokens_b)`
- Score final: `max(levenshtein_sim, jaccard_sim)`
- Si score >= `name_threshold`: agrupar como misma entidad
4. **Union-Find** para clusters transitivos: si A~B y B~C, entonces {A,B,C} es un cluster
5. **Merge por cluster:**
- Nombre: el del candidato con mayor confidence
- Atributos: `merge_entity_attributes()` — combinar non-null de todos los candidatos
- Confidence: max del cluster
- Source chunks: union de todos
6. Retornar DeduplicationResult con entidades merged y log de merges
- **Optimizacion:** Para N candidatos, pairwise es O(N^2). Aceptable para <1000 entidades (caso tipico por documento). Si necesita escalar: pre-filtrar por primera letra o n-gram index.
- **Heuristicas adicionales de match:**
- Nombre contenido: "John" matchea "John Smith" si son del mismo tipo (score bonus +0.3)
- Acronimos: "FBI" matchea "Federal Bureau of Investigation" — detectar si un nombre es acronimo de otro
- IPs/emails/domains: matching exacto normalizado (no fuzzy)
- **Deps:** `levenshtein_distance`, `jaccard_similarity` (ya en registry)
- **Tests:**
- "John Smith" y "Smith, John" → merge
- "Google" y "Google LLC" → merge
- "192.168.1.1" y "192.168.1.1" → merge (exacto)
- "John Smith" (Person) y "John Smith" (Organization) → NO merge (distinto tipo)
- Clusters transitivos: A~B, B~C → {A,B,C}
- Entidades sin duplicados → sin cambios
- Confidence y atributos se mergean correctamente
@@ -0,0 +1,36 @@
# Deduplicate Relations
Diseño original para fuzzygraph.
## Funcion a implementar
### deduplicate_relations
- **Dominio:** datascience
- **Lang:** Python
- **Purity:** pure
- **Signature:**
```python
def deduplicate_relations(
relations: list[RelationCandidate],
entity_id_map: dict[str, str],
) -> list[RelationCandidate]:
```
- **Parametros:**
- `relations`: relaciones candidatas (con from_name/to_name originales)
- `entity_id_map`: mapping de nombre normalizado → entity_id final (output de deduplicate_entities). Permite resolver nombres que fueron mergeados: si "John Smith" y "Smith, John" se mergearon en entity_123, ambos nombres mapean a entity_123.
- **Retorno:** lista deduplicada de RelationCandidate con from/to resueltos a IDs finales
- **Algoritmo:**
1. **Resolver nombres a IDs:** Para cada relacion, mapear from_name y to_name al entity_id via entity_id_map. Si un nombre no tiene match, intentar fuzzy match contra las keys del map.
2. **Deduplicar por (from_id, to_id, relation_type):**
- Si multiples relaciones tienen el mismo (from, to, type): mergear
- Descripcion: concatenar descripciones unicas
- Confidence: max
- Self-loops (from_id == to_id): descartar
3. Retornar lista limpia
- **Deps:** `levenshtein_distance` (para fuzzy match de nombres no resueltos)
- **Tests:**
- 2 relaciones identicas (from, to, type) → 1
- Relacion con nombre mergeado → se resuelve al ID correcto
- Self-loop → descartado
- Relacion con nombre no mapeado → intenta fuzzy match, si falla descarta con warning
@@ -0,0 +1,51 @@
# Build Schema Prompts
Diseño original para fuzzygraph.
## Funciones a implementar
### 1. build_entity_schema_prompt
- **Dominio:** datascience
- **Lang:** Python
- **Purity:** pure
- **Signature:** `build_entity_schema_prompt(entity_presets: list[dict]) -> str`
- **Descripcion:** Genera la seccion del system prompt que describe los entity types disponibles para extraccion. Formatea los presets del registry en texto legible para el LLM.
- **Input:** Lista de presets tal como estan en fuzzygraph:
```python
[
{"type_ref": "osint_person_go_cybersecurity", "label": "Person",
"metadata_fields": ["full_name", "alias", "nationality", "dob", "risk_score"]},
...
]
```
- **Output:** String formateado:
```
Entity types available for extraction:
1. Person (type_ref: osint_person_go_cybersecurity)
Attributes: full_name, alias, nationality, dob, risk_score
2. Organization (type_ref: osint_organization_go_cybersecurity)
Attributes: legal_name, country, sector, founded, risk_score
...
```
- **Tests:** lista con varios presets, lista vacia, preset sin metadata_fields
### 2. build_relation_schema_prompt
- **Dominio:** datascience
- **Lang:** Python
- **Purity:** pure
- **Signature:** `build_relation_schema_prompt(relation_types: list[str]) -> str`
- **Descripcion:** Genera la seccion del system prompt con los tipos de relacion permitidos.
- **Input:** `["funds", "employs", "communicates_with", ...]`
- **Output:**
```
Allowed relation types:
funds, employs, communicates_with, owns, operates, controls,
affiliated_with, located_at, resolves_to, registered_by, hosts,
exploits, attributed_to, related_to
```
- **Tests:** lista normal, lista vacia
@@ -0,0 +1,41 @@
# Normalize Entity Name
Diseño original para fuzzygraph.
## Funcion a implementar
### normalize_entity_name
- **Dominio:** core
- **Lang:** Python
- **Purity:** pure
- **Signature:** `normalize_entity_name(name: str, entity_type: str = "") -> str`
- **Descripcion:** Normaliza el nombre de una entidad para comparacion y deduplicacion. Aplica reglas diferentes segun el tipo de entidad.
- **Algoritmo:**
1. Strip whitespace
2. **Si tipo es IP/email/domain/crypto_wallet/phone:** normalizar formato tecnico
- IP: strip, lower (para IPv6)
- Email: lower, strip
- Domain: lower, strip, remover trailing dot, remover "www." prefix
- Crypto wallet: strip (case-sensitive, no lowercase — Bitcoin addresses son case-sensitive)
- Phone: remover espacios, guiones, parentesis, mantener solo digitos y +
3. **Si tipo es persona:** normalizar nombre humano
- Remover titulos: "Dr.", "Mr.", "Mrs.", "Prof.", etc.
- "Apellido, Nombre" → "Nombre Apellido" (detectar formato con coma)
- Colapsar espacios multiples
- Title case
4. **Si tipo es organizacion:** normalizar nombre corporativo
- Remover sufijos legales: "Inc.", "LLC", "Ltd.", "Corp.", "S.A.", "GmbH", etc.
- Strip, colapsar espacios
5. **Default:** lower, strip, colapsar espacios
- **Deps:** `re`
- **Tests:**
- " John Smith " → "John Smith"
- "Smith, John" → "John Smith"
- "Dr. Jane Doe" → "Jane Doe"
- "Google LLC" → "Google"
- "ACME Corp." → "Acme"
- "192.168.1.1" → "192.168.1.1" (sin cambio)
- "user@GMAIL.com" → "user@gmail.com"
- "www.example.com." → "example.com"
- "+1 (555) 123-4567" → "+15551234567"
@@ -0,0 +1,34 @@
# Merge Entity Attributes
Diseño original para fuzzygraph.
## Funcion a implementar
### merge_entity_attributes
- **Dominio:** core
- **Lang:** Python
- **Purity:** pure
- **Signature:** `merge_entity_attributes(attr_list: list[dict]) -> dict`
- **Descripcion:** Combina atributos de multiples candidatos de la misma entidad. Cuando el mismo campo tiene valores diferentes, aplica heuristicas de resolucion.
- **Algoritmo:**
1. Para cada campo presente en cualquier candidato:
- Recopilar todos los valores non-null
- Si 0 valores: null
- Si 1 valor: usar ese
- Si todos iguales: usar ese
- Si diferentes:
- **Numerico** (risk_score, balance, cvss): usar max
- **Fecha** (first_seen, created_date): usar min (mas antigua)
- **Fecha** (last_seen, expires_date): usar max (mas reciente)
- **Lista** (name_servers): union
- **String** (description, alias): usar el mas largo, o concatenar si ambos aportan info
- **Boolean** (verified, exploited): usar True si alguno es True (OR)
2. Retornar dict merged
- **Deps:** ninguna
- **Tests:**
- Atributos complementarios (A tiene full_name, B tiene nationality) → ambos
- Atributos conflictivos en risk_score → max
- Atributos first_seen conflictivos → min
- Todos null → null
- Listas → union sin duplicados
@@ -0,0 +1,71 @@
# Extraction Pipeline (orquestador)
Diseño original para fuzzygraph.
## Pipeline a implementar
### extraction_pipeline
- **Dominio:** pipelines
- **Lang:** Python
- **Kind:** pipeline
- **Purity:** impure
- **Signature:**
```python
def extraction_pipeline(
file_path: str,
entity_presets: list[dict],
relation_types: list[str],
llm_chat_json: Callable[[list[dict]], dict],
chunk_size: int = 500,
chunk_overlap: int = 50,
confidence_threshold: float = 0.5,
dedup_threshold: float = 0.85,
on_progress: Callable[[str, float], None] | None = None,
) -> ExtractionResult:
```
- **Descripcion:** Pipeline completa de extraccion de entidades y relaciones desde un documento. Orquesta todas las funciones del stack.
- **uses_functions:**
- `extract_text_from_file` (mf_09)
- `preprocess_text` (mf_02)
- `split_text_into_chunks` (mf_01)
- `build_entity_schema_prompt` (fg_05)
- `build_relation_schema_prompt` (fg_05)
- `extract_entities_llm` (fg_01)
- `extract_relations_llm` (fg_02)
- `deduplicate_entities` (fg_03)
- `deduplicate_relations` (fg_04)
- **Algoritmo:**
1. **Extract:** `extract_text_from_file(file_path)` → texto crudo
2. **Preprocess:** `preprocess_text(text)` → texto limpio
3. **Split:** `split_text_into_chunks(text, chunk_size, chunk_overlap)` → chunks
4. **Extract entities per chunk:** Para cada chunk (con retry):
- `extract_entities_llm(chunk, entity_presets, llm_chat_json)`
- Anotar `source_chunk_index` en cada candidato
- progress callback: `(f"Extracting entities from chunk {i}/{n}", i/n * 0.4)`
5. **Flatten + filter:** Juntar todas las entidades, filtrar por `confidence >= confidence_threshold`
6. **Deduplicate entities:** `deduplicate_entities(all_entities, dedup_threshold)`
7. **Extract relations per chunk:** Para cada chunk:
- Obtener entidades relevantes de ese chunk
- `extract_relations_llm(chunk, chunk_entities, relation_types, llm_chat_json)`
- progress callback: `("Extracting relations...", 0.4 + i/n * 0.4)`
8. **Deduplicate relations:** `deduplicate_relations(all_relations, entity_id_map)`
9. **Return** ExtractionResult con entities, relations, stats
- **Progress:** 0-40% entity extraction, 40-80% relation extraction, 80-100% dedup
- **Error type:** FileNotFoundError, ValueError
- **Tests:** documento con entidades y relaciones, documento vacio, documento sin entidades detectables
## Integracion con fuzzygraph
La pipeline retorna `ExtractionResult` que contiene listas de entidades y relaciones listas para insertar en `operations.db` via `fn ops entity add` / `fn ops relation add`. El mapping a fuzzygraph seria:
```python
for entity in result.entities:
# entity.name → ops entity name
# entity.type_ref → ops entity type_ref
# entity.attributes → ops entity metadata (JSON)
db.add_entity(name=entity.name, type_ref=entity.type_ref, metadata=entity.attributes)
for relation in result.relations:
db.add_relation(name=relation.relation_type, from_entity=relation.from_id, to_entity=relation.to_id)
```
@@ -0,0 +1,102 @@
# Tipos: Extraction Pipeline
Diseño original para fuzzygraph.
## Tipos a implementar
### 1. EntityCandidate (product)
- **Dominio:** datascience
- **Lang:** Python
- **Algebraic:** product
- **Definicion:**
```python
@dataclass
class EntityCandidate:
name: str # nombre extraido del texto
name_normalized: str = "" # nombre normalizado (se llena en dedup)
type_ref: str = "" # registry type ID (ej: osint_person_go_cybersecurity)
type_label: str = "" # label humano (ej: "Person")
attributes: dict = field(default_factory=dict) # metadata extraida
confidence: float = 0.0 # 0.0-1.0
source_chunk_indices: list[int] = field(default_factory=list) # chunks donde aparecio
merged_from: list[str] = field(default_factory=list) # nombres originales si fue mergeado
```
- **Metodos:** `to_dict() -> dict`
- **Descripcion:** Candidato de entidad extraido por el LLM. Puede venir de un solo chunk o ser el resultado de mergear multiples extracciones. `merged_from` rastrea los nombres originales para debugging.
### 2. RelationCandidate (product)
- **Dominio:** datascience
- **Lang:** Python
- **Algebraic:** product
- **Definicion:**
```python
@dataclass
class RelationCandidate:
from_name: str # nombre de la entidad origen
to_name: str # nombre de la entidad destino
from_id: str = "" # entity ID resuelto (se llena en dedup)
to_id: str = "" # entity ID resuelto
relation_type: str = "" # tipo de relacion (de relationPresets)
description: str = "" # descripcion textual de la relacion
confidence: float = 0.0 # 0.0-1.0
source_chunk_index: int = -1 # chunk donde se extrajo
```
- **Metodos:** `to_dict() -> dict`
### 3. ExtractionResult (product)
- **Dominio:** datascience
- **Lang:** Python
- **Algebraic:** product
- **Definicion:**
```python
@dataclass
class ExtractionResult:
entities: list[EntityCandidate] # entidades finales (deduplicadas)
relations: list[RelationCandidate] # relaciones finales (deduplicadas)
stats: ExtractionStats = field(default_factory=lambda: ExtractionStats())
```
### 4. ExtractionStats (product)
- **Dominio:** datascience
- **Lang:** Python
- **Algebraic:** product
- **Definicion:**
```python
@dataclass
class ExtractionStats:
total_chunks: int = 0
total_chars: int = 0
raw_entities_count: int = 0 # antes de dedup
final_entities_count: int = 0 # despues de dedup
entities_merged: int = 0 # cuantas se mergearon
raw_relations_count: int = 0
final_relations_count: int = 0
relations_merged: int = 0
relations_discarded: int = 0 # self-loops + sin match
entity_types_found: dict[str, int] = field(default_factory=dict) # type_ref → count
relation_types_found: dict[str, int] = field(default_factory=dict)
processing_time_seconds: float = 0.0
```
- **Descripcion:** Estadisticas del proceso de extraccion. Util para reporting y debugging.
### 5. DeduplicationResult (product)
- **Dominio:** datascience
- **Lang:** Python
- **Algebraic:** product
- **Definicion:**
```python
@dataclass
class DeduplicationResult:
entities: list[EntityCandidate] # entidades finales
entity_id_map: dict[str, str] # nombre_normalizado → entity_id (para resolver relations)
name_to_id: dict[str, str] # todos los nombres originales → entity_id (incluyendo aliases)
merge_log: list[dict] = field(default_factory=list) # [{"merged": ["John Smith", "Smith, John"], "into": "John Smith", "score": 0.92}]
total_before: int = 0
total_after: int = 0
```
- **Descripcion:** Resultado de deduplicacion de entidades. El `name_to_id` mapea TODOS los nombres originales (incluyendo los mergeados) a su ID final, permitiendo resolver relaciones que usan cualquier variante del nombre.
@@ -0,0 +1,34 @@
# Split Text into Chunks (sentence-boundary aware)
Fuente conceptual: MiroFish `backend/app/utils/file_parser.py` (AGPL-3.0)
Reimplementar desde cero. No copiar codigo.
## Funcion a implementar
### split_text_into_chunks
- **Dominio:** core
- **Lang:** Python
- **Purity:** pure
- **Signature:** `split_text_into_chunks(text: str, chunk_size: int = 500, overlap: int = 50) -> list[str]`
- **Descripcion:** Divide texto en chunks de tamaño fijo con overlap, intentando cortar en limites de oracion para no romper frases a mitad.
- **Algoritmo:**
1. Si `len(text) <= chunk_size`: retornar `[text]` (o `[]` si vacio)
2. Sliding window con `start = 0`:
- `end = start + chunk_size`
- Si `end < len(text)`: buscar el ultimo separador de oracion en `text[start:end]`
- Separadores en orden de prioridad: `。`, ``, ``, `.\n`, `!\n`, `?\n`, `\n\n`, `. `, `! `, `? `
- Solo aceptar si el separador esta despues del 30% del chunk (evitar chunks muy cortos)
- Si se encuentra: ajustar `end` al separador + len(sep)
- Extraer chunk, strip, anadir si no vacio
- `start = end - overlap` (siguiente chunk empieza con overlap del anterior)
3. Retornar lista de chunks
- **Diferencia con smart_split_content (OpenViking):** Este divide por caracteres con overlap fijo y busca separadores de oracion. El de OpenViking divide por parrafos (doble newline) sin overlap. Son complementarios.
- **Deps:** ninguna
- **Tests:**
- Texto corto (cabe en 1 chunk)
- Texto largo con oraciones (corta en `.`)
- Texto CJK con `。` como separador
- Texto sin separadores (corta en chunk_size exacto)
- Overlap funciona (inicio del chunk N+1 = final del chunk N - overlap)
- Texto vacio → []
@@ -0,0 +1,36 @@
# Preprocess Text + Stats
Fuente conceptual: MiroFish `backend/app/services/text_processor.py` (AGPL-3.0)
Reimplementar desde cero. No copiar codigo.
## Funciones a implementar
### 1. preprocess_text
- **Dominio:** core
- **Lang:** Python
- **Purity:** pure
- **Signature:** `preprocess_text(text: str) -> str`
- **Descripcion:** Normaliza whitespace y newlines de un texto crudo. Util como paso previo a chunking o indexing.
- **Algoritmo:**
1. Normalizar line endings: `\r\n``\n`, `\r``\n`
2. Reducir 3+ newlines consecutivos a maximo 2: `\n{3,}``\n\n`
3. Strip whitespace de cada linea
4. Strip global
- **Deps:** `re`
- **Tests:** texto con \r\n (Windows), texto con muchos newlines, texto con espacios leading/trailing
### 2. get_text_stats
- **Dominio:** core
- **Lang:** Python
- **Purity:** pure
- **Signature:** `get_text_stats(text: str) -> dict`
- **Retorno:** `{"total_chars": int, "total_lines": int, "total_words": int}`
- **Descripcion:** Estadisticas basicas de un texto: caracteres, lineas, palabras.
- **Algoritmo:**
1. `total_chars = len(text)`
2. `total_lines = text.count('\n') + 1`
3. `total_words = len(text.split())`
- **Deps:** ninguna
- **Tests:** texto normal, texto vacio, texto con solo newlines
@@ -0,0 +1,28 @@
# To PascalCase
Fuente conceptual: MiroFish `backend/app/services/ontology_generator.py` (AGPL-3.0)
Reimplementar desde cero. No copiar codigo.
## Funcion a implementar
### to_pascal_case
- **Dominio:** core
- **Lang:** Python
- **Purity:** pure
- **Signature:** `to_pascal_case(name: str) -> str`
- **Descripcion:** Convierte cualquier formato de nombre (snake_case, camelCase, kebab-case, mixto) a PascalCase.
- **Algoritmo:**
1. Split por caracteres no alfanumericos: `re.split(r'[^a-zA-Z0-9]+', name)`
2. Cada parte: split adicional por boundary camelCase: `re.sub(r'([a-z])([A-Z])', r'\1_\2', part).split('_')`
3. Capitalizar primera letra de cada word, join
4. Si resultado vacio: retornar `"Unknown"`
- **Ejemplos:**
- `"works_for"``"WorksFor"`
- `"camelCaseExample"``"CamelCaseExample"`
- `"person"``"Person"`
- `"SCREAMING_SNAKE"``"ScreamingSnake"`
- `"kebab-case"``"KebabCase"`
- `""``"Unknown"`
- **Deps:** `re`
- **Tests:** snake_case, camelCase, UPPER_SNAKE, kebab-case, PascalCase (idempotente), string vacio, con numeros
@@ -0,0 +1,48 @@
# LLM Response Cleaners
Fuente conceptual: MiroFish `backend/app/utils/llm_client.py` (AGPL-3.0)
Reimplementar desde cero. No copiar codigo.
## Funciones a implementar
### 1. strip_think_tags
- **Dominio:** core
- **Lang:** Python
- **Purity:** pure
- **Signature:** `strip_think_tags(text: str) -> str`
- **Descripcion:** Remueve tags `<think>...</think>` que algunos modelos (MiniMax, DeepSeek) incluyen en sus respuestas como "chain of thought" interno.
- **Algoritmo:** `re.sub(r'<think>[\s\S]*?</think>', '', text).strip()`
- **Deps:** `re`
- **Tests:** texto sin tags (sin cambio), texto con think tags, tags multilinea, multiples tags
### 2. strip_markdown_codeblock
- **Dominio:** core
- **Lang:** Python
- **Purity:** pure
- **Signature:** `strip_markdown_codeblock(text: str) -> str`
- **Descripcion:** Remueve wrapping de code blocks markdown que modelos a veces anaden al responder JSON. Ej: ````json\n{...}\n```` → `{...}`
- **Algoritmo:**
1. Strip whitespace
2. Remover prefijo: `re.sub(r'^```(?:json)?\s*\n?', '', text, flags=re.IGNORECASE)`
3. Remover sufijo: `re.sub(r'\n?```\s*$', '', text)`
4. Strip final
- **Deps:** `re`
- **Tests:** JSON sin codeblock, JSON con ```json wrapper, JSON con ``` solo, texto que no es JSON
### 3. parse_llm_json
- **Dominio:** core
- **Lang:** Python
- **Purity:** pure
- **Signature:** `parse_llm_json(response: str) -> dict`
- **Descripcion:** Parsea una respuesta de LLM como JSON, limpiando primero think tags y markdown codeblocks. Combina strip_think_tags + strip_markdown_codeblock + json.loads.
- **Algoritmo:**
1. `strip_think_tags(response)`
2. `strip_markdown_codeblock(result)`
3. `json.loads(result)`
4. Si falla: raise ValueError con el texto limpiado para debugging
- **Deps:** `json`, `re`
- **Error type:** ValueError (JSON invalido)
- **Tests:** JSON limpio, JSON con think tags y codeblock, JSON invalido (error)
@@ -0,0 +1,33 @@
# Setup Logger (file rotation + console)
Fuente conceptual: MiroFish `backend/app/utils/logger.py` (AGPL-3.0)
Reimplementar desde cero. No copiar codigo.
## Funcion a implementar
### setup_logger
- **Dominio:** infra
- **Lang:** Python
- **Purity:** impure (crea archivos, modifica estado global de logging)
- **Signature:** `setup_logger(name: str = "app", log_dir: str = "logs", level: int = logging.DEBUG) -> logging.Logger`
- **Descripcion:** Configura un logger con dual output: archivo con rotacion diaria (detallado, DEBUG+) y consola (simple, INFO+). Maneja encoding UTF-8 en Windows.
- **Algoritmo:**
1. Crear log_dir si no existe
2. Crear logger con `logging.getLogger(name)`
3. `propagate = False` (evitar duplicados en root logger)
4. Si ya tiene handlers: retornar (idempotente)
5. Formato detallado (archivo): `[%(asctime)s] %(levelname)s [%(name)s.%(funcName)s:%(lineno)d] %(message)s`
6. Formato simple (consola): `[%(asctime)s] %(levelname)s: %(message)s`
7. File handler: `RotatingFileHandler` con nombre `YYYY-MM-DD.log`, maxBytes=10MB, backupCount=5, encoding=utf-8
8. Console handler: `StreamHandler(sys.stdout)`, level=INFO
9. En Windows: `sys.stdout.reconfigure(encoding='utf-8', errors='replace')` para evitar mojibake
- **Deps:** `logging`, `logging.handlers`, `os`, `sys`, `datetime`
- **Tests:** logger se crea con 2 handlers, segundo call no duplica handlers, archivo se crea en log_dir
- **Companion:**
```python
def get_logger(name: str = "app") -> logging.Logger:
"""Get or create logger."""
logger = logging.getLogger(name)
return logger if logger.handlers else setup_logger(name)
```
@@ -0,0 +1,52 @@
# Retry Decorator (sync + async)
Fuente conceptual: MiroFish `backend/app/utils/retry.py` (AGPL-3.0)
Reimplementar desde cero. No copiar codigo.
Nota: OpenViking spec 09 documenta retry como funciones. Este es complementario: retry como **decorador** con API mas ergonomica para decorar funciones existentes.
## Funciones a implementar
### 1. retry_with_backoff (decorator)
- **Dominio:** core
- **Lang:** Python
- **Purity:** impure (decorator que modifica comportamiento)
- **Signature:**
```python
def retry_with_backoff(
max_retries: int = 3,
initial_delay: float = 1.0,
max_delay: float = 30.0,
backoff_factor: float = 2.0,
jitter: bool = True,
exceptions: tuple[type[Exception], ...] = (Exception,),
on_retry: Callable[[Exception, int], None] | None = None
) -> Callable:
```
- **Descripcion:** Decorador que reintenta una funcion sincrona con exponential backoff cuando lanza excepciones del tipo especificado.
- **Uso:**
```python
@retry_with_backoff(max_retries=3, exceptions=(ConnectionError, TimeoutError))
def call_api():
...
```
- **Algoritmo:**
1. Wrapper con `functools.wraps`
2. Loop: intentar func(*args, **kwargs)
3. Si excepcion en `exceptions`:
- Si ultimo intento: re-raise
- Calcular delay: `min(delay, max_delay)`, si jitter: `*= (0.5 + random())`
- Llamar `on_retry(error, attempt)` si definido
- `time.sleep(delay)`, `delay *= backoff_factor`
- **Deps:** `functools`, `time`, `random`
### 2. retry_with_backoff_async (decorator)
- **Dominio:** core
- **Lang:** Python
- **Purity:** impure
- **Signature:** Misma firma que `retry_with_backoff` pero retorna async wrapper
- **Descripcion:** Version async. Usa `asyncio.sleep` en vez de `time.sleep`.
- **Deps:** `asyncio`, `functools`, `random`
- **Tests:** funcion que falla 2 veces y luego exito (3 retries), funcion que siempre falla (agota retries y lanza), on_retry callback se llama, jitter produce delays variables, solo reintenta excepciones especificadas
@@ -0,0 +1,47 @@
# Cursor Paginator (generic)
Fuente conceptual: MiroFish `backend/app/utils/zep_paging.py` (AGPL-3.0)
Reimplementar desde cero. No copiar codigo.
## Funcion a implementar
### cursor_paginate
- **Dominio:** core
- **Lang:** Python
- **Purity:** impure (llama API externa)
- **Signature:**
```python
def cursor_paginate(
fetch_page: Callable[..., list[T]],
get_cursor: Callable[[T], str | None],
page_size: int = 100,
max_items: int = 2000,
max_retries: int = 3,
retry_delay: float = 2.0,
retryable_exceptions: tuple[type[Exception], ...] = (ConnectionError, TimeoutError, OSError),
) -> list[T]:
```
- **Descripcion:** Paginador generico basado en cursor que funciona con cualquier API que use cursor-based pagination. Cada pagina se obtiene con retry automatico. Se detiene cuando: pagina vacia, batch < page_size, o se alcanza max_items.
- **Algoritmo:**
1. `all_items = []`, `cursor = None`
2. Loop:
- Llamar `fetch_page(limit=page_size, cursor=cursor)` con retry (exponential backoff)
- Si batch vacio: break
- Extend all_items
- Si `len(all_items) >= max_items`: truncar y break
- Si `len(batch) < page_size`: break (ultima pagina)
- Obtener cursor del ultimo item: `cursor = get_cursor(batch[-1])`
- Si cursor es None: break
3. Retornar all_items
- **Patron:** Abstrae completamente el mecanismo de paginacion. El caller solo provee:
- `fetch_page`: funcion que recibe `limit` y `cursor` kwargs, retorna lista
- `get_cursor`: funcion que extrae el cursor del ultimo item
- **Deps:** `time`
- **Error type:** Exception (si todos los retries fallan en una pagina)
- **Tests:**
- API que retorna 3 paginas de 10 items
- API que falla 1 vez por pagina (retry funciona)
- max_items limita correctamente
- API que retorna pagina parcial (ultima pagina)
- Cursor None en ultimo item (se detiene)
@@ -0,0 +1,42 @@
# Simple i18n Translation
Fuente conceptual: MiroFish `backend/app/utils/locale.py` (AGPL-3.0)
Reimplementar desde cero. No copiar codigo.
## Funciones a implementar
### 1. t (translate)
- **Dominio:** core
- **Lang:** Python
- **Purity:** impure (lee estado thread-local)
- **Signature:** `t(key: str, locale: str | None = None, **kwargs) -> str`
- **Descripcion:** Traduce una clave con dot-path notation al idioma actual. Soporta interpolacion de variables con `{nombre}`.
- **Algoritmo:**
1. Determinar locale (parametro > thread-local > default)
2. Obtener diccionario de traducciones para ese locale
3. Navegar dot-path: `"report.taskStarted"``translations["report"]["taskStarted"]`
4. Si no existe: fallback al locale default
5. Si tampoco existe: retornar la key tal cual
6. Si hay kwargs: reemplazar `{key}``value` para cada kwarg
- **Ejemplo:**
```python
# translations = {"en": {"report": {"sectionStart": "Generating section: {title}"}}}
t("report.sectionStart", locale="en", title="Introduction")
# → "Generating section: Introduction"
```
- **Deps:** ninguna (stdlib)
- **Tests:** key existente, key inexistente (retorna key), interpolacion, dot-path profundo, fallback a locale default
### 2. load_translations
- **Dominio:** core
- **Lang:** Python
- **Purity:** impure (lee archivos)
- **Signature:** `load_translations(locales_dir: str) -> dict[str, dict]`
- **Descripcion:** Carga todos los archivos JSON de un directorio de locales. Cada archivo `{locale}.json` se carga como diccionario.
- **Algoritmo:**
1. Listar archivos `.json` en locales_dir
2. Para cada archivo: cargar JSON, key = nombre sin extension
3. Retornar dict de dicts
- **Deps:** `json`, `os`
@@ -0,0 +1,29 @@
# Extract Text from File (PDF/MD/TXT)
Fuente conceptual: MiroFish `backend/app/utils/file_parser.py` (AGPL-3.0)
Reimplementar desde cero. No copiar codigo.
## Funcion a implementar
### extract_text_from_file
- **Dominio:** core
- **Lang:** Python
- **Purity:** impure (I/O: lee archivos)
- **Signature:** `extract_text_from_file(file_path: str) -> str`
- **Descripcion:** Extrae texto plano de un archivo. Soporta PDF (PyMuPDF), Markdown y TXT (con deteccion automatica de encoding).
- **Algoritmo:**
1. Verificar que archivo existe
2. Determinar tipo por extension:
- `.pdf`: abrir con PyMuPDF (fitz), extraer texto de cada pagina, unir con `\n\n`
- `.md`, `.markdown`, `.txt`: leer con deteccion de encoding
3. Extension no soportada: raise ValueError
- **Deteccion de encoding (para texto):**
1. Intentar UTF-8
2. Si falla: `charset_normalizer.from_bytes(data).best().encoding`
3. Si falla: `chardet.detect(data)["encoding"]`
4. Ultimo recurso: UTF-8 con `errors='replace'`
- **Deps:** `PyMuPDF` (fitz), opcionalmente `charset_normalizer`, `chardet`
- **Error type:** FileNotFoundError, ValueError (extension no soportada), ImportError (PyMuPDF no instalado)
- **Diferencia con OpenViking:** OpenViking tiene parsers completos que convierten a markdown estructurado. Esta funcion es mas simple: solo extrae texto plano sin preservar estructura.
- **Tests:** PDF con texto, archivo MD UTF-8, archivo TXT latin-1, archivo inexistente (error), extension no soportada (error)
@@ -0,0 +1,60 @@
# ReACT Agent Loop
Fuente conceptual: MiroFish `backend/app/services/report_agent.py` (AGPL-3.0)
Reimplementar desde cero. No copiar codigo.
## Funcion a implementar
### react_loop
- **Dominio:** core
- **Lang:** Python
- **Purity:** impure (llama LLM y tools)
- **Signature:**
```python
def react_loop(
llm_chat: Callable[[list[dict]], str],
tools: dict[str, Callable[..., str]],
system_prompt: str,
user_prompt: str,
max_iterations: int = 5,
on_thought: Callable[[str], None] | None = None,
on_action: Callable[[str, dict], None] | None = None,
on_observation: Callable[[str], None] | None = None,
) -> str:
```
- **Descripcion:** Implementa el patron ReACT (Reasoning + Acting) para agentes LLM. El agente razona, decide usar herramientas, observa resultados, y repite hasta producir una respuesta final.
- **Algoritmo:**
1. Construir mensajes iniciales: system + user prompt con descripcion de tools disponibles
2. Loop (max_iterations):
- Llamar `llm_chat(messages)` → response
- Parsear response buscando patrones:
- `Thought:` → razonamiento del agente
- `Action:` → nombre de tool a llamar
- `Action Input:` → parametros (JSON)
- `Observation:` → (se llena con resultado del tool)
- `Final Answer:` → respuesta final, salir del loop
- Si hay Action:
- Llamar `tools[action_name](**action_input)`
- Anadir observacion a mensajes
- Callbacks: on_thought, on_action, on_observation
- Si hay Final Answer: retornar
3. Si se agota max_iterations: retornar ultimo response como fallback
- **Tools spec format:**
```
Available tools:
- search(query: str) -> str: Search knowledge graph
- analyze(entity_id: str) -> str: Deep dive into entity
```
- **Deps:** `json`, `re`
- **Error type:** Exception (tool falla, LLM timeout)
- **Notas:**
- El formato de parseo es flexible (Thought/Action/Observation es un patron comun en LangChain/ReACT)
- Los callbacks permiten logging/UI sin acoplar al agente
- La funcion es generica: funciona con cualquier LLM y cualquier set de tools
- **Tests:**
- Agente que usa 1 tool y da respuesta final
- Agente que usa multiples tools iterativamente
- Agente que da respuesta directa sin tools
- Max iterations alcanzado (fallback)
- Tool que falla (error handling)
@@ -0,0 +1,43 @@
# Batch Operations with Individual Retry
Fuente conceptual: MiroFish `backend/app/utils/retry.py` (AGPL-3.0)
Reimplementar desde cero. No copiar codigo.
## Funcion a implementar
### call_batch_with_retry
- **Dominio:** core
- **Lang:** Python
- **Purity:** impure (ejecuta funciones, hace sleep)
- **Signature:**
```python
def call_batch_with_retry(
items: list[T],
process_func: Callable[[T], R],
max_retries: int = 3,
initial_delay: float = 1.0,
max_delay: float = 30.0,
backoff_factor: float = 2.0,
exceptions: tuple[type[Exception], ...] = (Exception,),
continue_on_failure: bool = True,
) -> tuple[list[R], list[dict]]:
```
- **Retorno:** `(results, failures)` donde failures es `[{"index": int, "item": T, "error": str}]`
- **Descripcion:** Procesa una lista de items con retry individual por item. Si un item falla despues de todos los retries, se registra como failure y se continua con el siguiente (o se detiene, segun `continue_on_failure`).
- **Algoritmo:**
1. Para cada (index, item) en items:
- Intentar `process_func(item)` con retry (exponential backoff)
- Si exito: append a results
- Si falla despues de todos los retries:
- Append a failures con index, item, error
- Si `continue_on_failure=False`: re-raise
2. Retornar (results, failures)
- **Diferencia con retry simple:** El retry simple reintenta una sola llamada. Este maneja **listas** donde cada item se reintenta independientemente, y los fallos individuales no bloquean el resto del batch.
- **Deps:** `time`, `random`
- **Tests:**
- Todos los items exito
- 1 item falla permanentemente, rest exito (continue_on_failure=True)
- 1 item falla, abort (continue_on_failure=False)
- Item que falla 2 veces y luego exito (retry funciona)
- failures contiene index correcto
@@ -0,0 +1,66 @@
# Tipo: Task / TaskStatus / TaskManager
Fuente conceptual: MiroFish `backend/app/models/task.py` (AGPL-3.0)
Reimplementar desde cero. No copiar codigo.
## Tipos a implementar
### 1. TaskStatus (enum/sum)
- **Dominio:** core
- **Lang:** Python
- **Algebraic:** sum
- **Definicion:**
```python
class TaskStatus(str, Enum):
PENDING = "pending"
PROCESSING = "processing"
COMPLETED = "completed"
FAILED = "failed"
```
- **Descripcion:** Estado de una tarea de larga duracion (graph building, simulation, report generation).
### 2. Task (product)
- **Dominio:** core
- **Lang:** Python
- **Algebraic:** product
- **Definicion:**
```python
@dataclass
class Task:
task_id: str # UUID
task_type: str # tipo libre: "graph_build", "simulation", etc.
status: TaskStatus
created_at: datetime
updated_at: datetime
progress: int = 0 # 0-100%
message: str = "" # mensaje de estado legible
result: dict | None = None # resultado al completar
error: str | None = None # error al fallar
metadata: dict = field(default_factory=dict)
progress_detail: dict = field(default_factory=dict) # detalles de progreso (etapa actual, items procesados, etc.)
```
- **Metodos:**
- `to_dict() -> dict` — serializacion con ISO timestamps
### 3. TaskManager (singleton thread-safe)
- **Dominio:** core
- **Lang:** Python
- **Purity:** impure (estado mutable, thread-safe)
- **Descripcion:** Gestor de tareas en memoria con thread safety. Patron singleton para acceso global. Util para tracking de tareas background con polling desde frontend.
- **Signature:**
```python
class TaskManager:
def create_task(self, task_type: str, metadata: dict | None = None) -> str: ...
def get_task(self, task_id: str) -> Task | None: ...
def update_task(self, task_id: str, status=None, progress=None, message=None, result=None, error=None, progress_detail=None) -> None: ...
def complete_task(self, task_id: str, result: dict) -> None: ...
def fail_task(self, task_id: str, error: str) -> None: ...
def list_tasks(self, task_type: str | None = None) -> list[dict]: ...
def cleanup_old_tasks(self, max_age_hours: int = 24) -> None: ...
```
- **Thread safety:** `threading.Lock` protege `_tasks: dict[str, Task]`
- **Singleton:** `__new__` con double-checked locking
- **Uso:** Background thread actualiza progreso via `update_task()`, frontend poll via `get_task()`. Pattern comun en web apps con tareas async.
@@ -0,0 +1,73 @@
# Tipos: AgentAction / RoundSummary / RunnerStatus
Fuente conceptual: MiroFish `backend/app/services/simulation_runner.py` (AGPL-3.0)
Reimplementar desde cero. No copiar codigo.
## Tipos a implementar
### 1. RunnerStatus (enum/sum)
- **Dominio:** core
- **Lang:** Python
- **Algebraic:** sum
- **Definicion:**
```python
class RunnerStatus(str, Enum):
IDLE = "idle"
STARTING = "starting"
RUNNING = "running"
PAUSED = "paused"
STOPPING = "stopping"
STOPPED = "stopped"
COMPLETED = "completed"
FAILED = "failed"
```
- **Descripcion:** Maquina de estados para un runner de simulacion o pipeline de larga duracion. Transiciones validas:
- IDLE → STARTING → RUNNING
- RUNNING → PAUSED → RUNNING (resume)
- RUNNING → STOPPING → STOPPED
- RUNNING → COMPLETED
- RUNNING → FAILED
- STARTING → FAILED
### 2. AgentAction (product)
- **Dominio:** core
- **Lang:** Python
- **Algebraic:** product
- **Definicion:**
```python
@dataclass
class AgentAction:
round_num: int # ronda de la simulacion
timestamp: str # ISO timestamp
platform: str # "twitter" | "reddit" | custom
agent_id: int # ID del agente
agent_name: str # nombre del agente
action_type: str # "CREATE_POST", "LIKE_POST", "COMMENT", etc.
action_args: dict = field(default_factory=dict) # parametros de la accion
result: str | None = None # resultado de la accion
success: bool = True # si la accion fue exitosa
```
- **Metodos:** `to_dict() -> dict`
- **Descripcion:** Registro de una accion individual de un agente en una simulacion multi-agente. Captura que hizo, cuando, en que plataforma, y si fue exitoso.
### 3. RoundSummary (product)
- **Dominio:** core
- **Lang:** Python
- **Algebraic:** product
- **Definicion:**
```python
@dataclass
class RoundSummary:
round_num: int
start_time: str
end_time: str | None = None
simulated_hour: float = 0.0
actions: list[AgentAction] = field(default_factory=list)
active_agents: int = 0
```
- **Metodos:** `to_dict() -> dict`
- **Descripcion:** Resumen de una ronda de simulacion. Agrupa las acciones de todos los agentes en esa ronda con tiempos y conteos.
- **Uso:** Permite replay de simulaciones ronda por ronda, visualizacion de timeline, y analisis post-simulacion.
@@ -0,0 +1,46 @@
# Tipos: EntityNode / FilteredEntities
Fuente conceptual: MiroFish `backend/app/services/zep_entity_reader.py` (AGPL-3.0)
Reimplementar desde cero. No copiar codigo.
## Tipos a implementar
### 1. EntityNode (product)
- **Dominio:** core
- **Lang:** Python
- **Algebraic:** product
- **Definicion:**
```python
@dataclass
class EntityNode:
uuid: str # identificador unico
name: str # nombre de la entidad
labels: list[str] = field(default_factory=list) # tipos/etiquetas (ej: ["Person", "Professor"])
summary: str = "" # resumen textual
attributes: dict = field(default_factory=dict) # atributos adicionales
related_edges: list[dict] = field(default_factory=list) # relaciones
related_nodes: list[dict] = field(default_factory=list) # nodos conectados
```
- **Metodos:**
- `to_dict() -> dict`
- `get_entity_type() -> str | None` — retorna el primer label que no sea generico ("Entity"), o None
- **Descripcion:** Nodo de entidad extraido de un knowledge graph. Contiene la identidad, tipo(s), atributos y relaciones de una entidad. Generico — no acoplado a Zep o ningun graph DB especifico.
### 2. FilteredEntities (product)
- **Dominio:** core
- **Lang:** Python
- **Algebraic:** product
- **Definicion:**
```python
@dataclass
class FilteredEntities:
entities: list[EntityNode]
entity_types: set[str] # tipos unicos encontrados
total_count: int # total antes de filtrar
filtered_count: int # total despues de filtrar
```
- **Metodos:** `to_dict() -> dict`
- **Descripcion:** Resultado de filtrar entidades de un graph por tipos definidos. Captura tanto las entidades filtradas como los conteos para reporting.
- **Uso:** Despues de construir un knowledge graph, se filtran las entidades por tipos definidos en la ontologia para obtener solo las relevantes. El total_count vs filtered_count muestra cuantas entidades se descartaron.