docs: cerrar issue 0036 — claude-code streaming completado
Mueve el issue a dev/issues/completed/ tras implementar todas las fases: - Fase 1: streaming del subproceso con stream-json - Fase 2: ProgressReporter con mensajes progresivos en Matrix - Fase 3: config schema y template actualizados - Fase 4: tests de integracion y regresion Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,248 @@
|
||||
# 0036 — Agente Claude Code completo con streaming de progreso
|
||||
|
||||
**Estado:** pendiente
|
||||
|
||||
## Objetivo
|
||||
|
||||
Transformar el provider `claude-code` para soportar streaming en tiempo real, permitiendo que los agentes que lo usan muestren en Matrix el progreso de uso de herramientas (e.g. "🔧 Ejecutando Bash: ls..." → "📝 Editando file.go..." → respuesta final). El usuario debe ver al agente trabajar como una sesion completa de Claude Code, no solo esperar en silencio hasta que termine.
|
||||
|
||||
## Contexto
|
||||
|
||||
- El provider `claude-code` vive en `shell/llm/claudecode.go` y ejecuta `claude -p --output-format json` como subproceso.
|
||||
- Actualmente usa `bytes.Buffer` para capturar stdout completo, espera a que el proceso termine, y luego parsea el JSON final. Durante todo este tiempo el usuario solo ve el typing indicator.
|
||||
- Claude CLI soporta `--output-format stream-json` que emite lineas JSON individuales conforme trabaja: eventos de `tool_use`, `tool_result`, `text` parcial, y `result` final.
|
||||
- El cliente Matrix (`shell/matrix/client.go`) tiene `SendMarkdown`, `SendReplyMarkdown`, `SendThreadMarkdown` pero **no tiene** `EditMessage` (m.replace). Las relaciones `m.relates_to` ya se manejan para threads.
|
||||
- El runner de effects (`shell/effects/runner.go`) ejecuta `[]decision.Action` pero no tiene concepto de mensajes progresivos.
|
||||
- `ClaudeCodeCfg` en `internal/config/schema.go` ya tiene campos para binary, timeout, tools, working_dir, permission_mode, model, etc. No tiene campos de streaming.
|
||||
|
||||
## Arquitectura
|
||||
|
||||
### Patron pure core / impure shell
|
||||
|
||||
- `pkg/llm/types.go` — tipos puros para streaming events (solo datos, sin I/O)
|
||||
- `shell/llm/claudecode.go` — impuro: pipe de stdout, parsing de stream JSON, invocacion de callbacks
|
||||
- `shell/matrix/client.go` — impuro: nuevo metodo `EditMessage` usando m.replace
|
||||
- `devagents/handler.go` — composicion: conectar progress reporter cuando el provider es claude-code con streaming
|
||||
- `internal/config/schema.go` — datos puros: nuevos campos en `ClaudeCodeCfg`
|
||||
|
||||
### Fase 1 — Streaming del subproceso
|
||||
|
||||
```
|
||||
claude -p --output-format stream-json
|
||||
│
|
||||
├── {"type":"tool_use", "tool":"Bash", "input":"ls -la"} ← tool empezando
|
||||
├── {"type":"tool_result", "tool":"Bash", "output":"..."} ← tool terminó
|
||||
├── {"type":"text", "content":"Analizando...", "partial":true} ← texto parcial
|
||||
└── {"type":"result", "result":"...", "usage":{...}} ← resultado final
|
||||
```
|
||||
|
||||
Se parsea linea a linea y se emite un `StreamEvent` por cada linea. El caller recibe eventos via callback `StreamFunc`.
|
||||
|
||||
### Fase 2 — Mensajes progresivos en Matrix
|
||||
|
||||
```
|
||||
StreamEvent(tool_use, "Bash", "ls -la")
|
||||
→ sender.SendMarkdown("⏳ Procesando...") ← mensaje inicial
|
||||
→ sender.EditMessage(eventID, "🔧 Bash: ls -la") ← editar con progreso
|
||||
StreamEvent(tool_use, "Read", "main.go")
|
||||
→ sender.EditMessage(eventID, "📖 Read: main.go") ← editar de nuevo
|
||||
StreamEvent(result, content)
|
||||
→ sender.EditMessage(eventID, content) ← reemplazar con resultado final
|
||||
```
|
||||
|
||||
Un solo mensaje que se edita progresivamente. Evita spam de multiples mensajes.
|
||||
|
||||
### Fase 3 — Config y UX
|
||||
|
||||
Nuevos campos en `ClaudeCodeCfg`:
|
||||
```yaml
|
||||
claude_code:
|
||||
streaming: true # usar stream-json en vez de json
|
||||
show_tool_progress: true # mostrar progreso de tools en Matrix
|
||||
```
|
||||
|
||||
## Tareas
|
||||
|
||||
### Fase 1 — Streaming del subproceso
|
||||
|
||||
- [ ] **1.1** Añadir tipos puros de streaming a `pkg/llm/types.go`:
|
||||
- `StreamEventKind` (string type): `StreamToolUse`, `StreamToolResult`, `StreamText`, `StreamResult`, `StreamError`
|
||||
- `StreamEvent` struct: `Kind StreamEventKind`, `ToolName string`, `ToolInput string`, `Content string`, `IsPartial bool`, `Error error`
|
||||
- `StreamFunc` callback type: `func(event StreamEvent)`
|
||||
- Estos tipos son datos puros, sin side effects — coherente con el resto de `pkg/llm/`
|
||||
|
||||
- [ ] **1.2** Refactorizar `NewClaudeCodeComplete` para soportar modo streaming:
|
||||
- Nueva funcion `NewClaudeCodeStream(cfg, log) (CompleteFunc, StreamFunc setter)` o añadir `StreamFunc` al closure
|
||||
- Cuando `cfg.Streaming == true`: usar `cmd.StdoutPipe()` en vez de `bytes.Buffer`
|
||||
- Leer stdout linea a linea con `bufio.Scanner`
|
||||
- Parsear cada linea JSON e invocar `StreamFunc(event)` si no es nil
|
||||
- Acumular el resultado final para retornarlo como `CompletionResponse` normal (compatibilidad)
|
||||
- Si `cfg.Streaming == false` o `StreamFunc == nil`: mantener comportamiento actual (buffered)
|
||||
|
||||
- [ ] **1.3** Implementar parser de eventos stream-json:
|
||||
- Investigar formato exacto de `claude --output-format stream-json` (ejecutar `claude --help` para confirmar)
|
||||
- Funcion pura `parseStreamLine(line []byte) (StreamEvent, error)` en `shell/llm/claudecode.go`
|
||||
- Mapear los tipos de evento del CLI a `StreamEventKind`
|
||||
- Manejar lineas vacias y JSON malformado sin crash
|
||||
|
||||
- [ ] **1.4** Preservar cleanup de process group:
|
||||
- `cmd.SysProcAttr`, `cmd.Cancel` y el kill post-Run deben funcionar identicamente en modo streaming
|
||||
- El pipe de stdout debe cerrarse correctamente cuando el contexto se cancela
|
||||
|
||||
- [ ] **1.5** Tests unitarios para parsing de eventos:
|
||||
- Test `parseStreamLine` con samples de cada tipo de evento
|
||||
- Test de streaming completo con mock de stdout (io.Pipe)
|
||||
- Test de fallback a modo buffered cuando streaming == false
|
||||
- Test de cancelacion via contexto durante streaming
|
||||
|
||||
### Fase 2 — Mensajes progresivos en Matrix
|
||||
|
||||
- [ ] **2.1** Añadir `EditMessage` a `shell/matrix/client.go`:
|
||||
- Metodo `EditMessage(ctx, roomID, eventID, markdown string) error`
|
||||
- Usar `m.relates_to` con `rel_type: "m.replace"` y `event_id: <original>`
|
||||
- Incluir `m.new_content` con el nuevo body (formatted_body para markdown)
|
||||
- Retornar error si el eventID original no existe o la edicion falla
|
||||
|
||||
- [ ] **2.2** Añadir `SendMarkdownWithID` o modificar `SendMarkdown` para retornar `eventID`:
|
||||
- El progress reporter necesita el eventID del mensaje inicial para poder editarlo
|
||||
- Evaluar: nuevo metodo que retorna `(id.EventID, error)` vs cambiar firma existente (breaking change)
|
||||
- Recomendacion: nuevo metodo `SendMarkdownGetID(ctx, roomID, markdown) (string, error)` para no romper callers existentes
|
||||
|
||||
- [ ] **2.3** Implementar progress reporter en `shell/effects/progress.go` (NEW):
|
||||
- `ProgressReporter` struct con: sender (interface), roomID, eventID del mensaje actual
|
||||
- Metodo `HandleEvent(event StreamEvent)` que:
|
||||
- En primer evento: envia mensaje inicial "⏳ Procesando..." y guarda eventID
|
||||
- En `StreamToolUse`: edita a "🔧 {ToolName}: {ToolInput truncado a 80 chars}"
|
||||
- En `StreamToolResult`: edita a "✅ {ToolName} completado"
|
||||
- En `StreamText` parcial: ignora (demasiadas ediciones)
|
||||
- En `StreamResult`: edita con el contenido final completo
|
||||
- En `StreamError`: edita con "❌ Error: {mensaje}"
|
||||
- Rate limiter interno: maximo 1 edit por segundo para evitar rate limits de Matrix
|
||||
- Formateo con emojis configurable via config
|
||||
|
||||
- [ ] **2.4** Conectar progress reporter en `devagents/handler.go`:
|
||||
- En el flujo LLM del handler, si el provider es `claude-code` y `cfg.ClaudeCode.Streaming`:
|
||||
- Crear `ProgressReporter` con el sender y roomID
|
||||
- Pasar `reporter.HandleEvent` como `StreamFunc` al provider
|
||||
- El resultado final llega como `CompletionResponse` normal (el handler no cambia su flujo)
|
||||
- Si streaming deshabilitado: flujo actual sin cambios
|
||||
|
||||
- [ ] **2.5** Tests del progress reporter:
|
||||
- Test con mock sender que registra llamadas a SendMarkdown y EditMessage
|
||||
- Verificar secuencia: Send → Edit → Edit → Edit(final)
|
||||
- Verificar rate limiting: multiples eventos rapidos → solo 1 edit/segundo
|
||||
- Verificar truncado de ToolInput largo
|
||||
|
||||
### Fase 3 — Config y polish
|
||||
|
||||
- [ ] **3.1** Añadir campos a `ClaudeCodeCfg` en `internal/config/schema.go`:
|
||||
```go
|
||||
Streaming bool `yaml:"streaming"` // use stream-json output format (default false)
|
||||
ShowToolProgress bool `yaml:"show_tool_progress"` // show tool progress via message edits (default false)
|
||||
```
|
||||
|
||||
- [ ] **3.2** Actualizar `buildClaudeArgs` para usar `--output-format stream-json` cuando `cfg.Streaming == true`
|
||||
|
||||
- [ ] **3.3** Actualizar templates de config (`agents/_template/config.yaml`) con opciones de streaming (comentadas):
|
||||
```yaml
|
||||
# claude_code:
|
||||
# streaming: true # streaming en tiempo real (default: false)
|
||||
# show_tool_progress: true # mostrar progreso de tools en Matrix
|
||||
```
|
||||
|
||||
- [ ] **3.4** Documentar en el system prompt template (`.claude/templates/security-prompt.md` o README) que el agente puede mostrar progreso de trabajo
|
||||
|
||||
### Fase 4 — Tests de integracion y cleanup
|
||||
|
||||
- [ ] **4.1** Test de integracion: simular un flujo completo stream → progress reporter → mock sender
|
||||
- [ ] **4.2** Verificar que agentes con `streaming: false` no se ven afectados (regression)
|
||||
- [ ] **4.3** Actualizar `CLAUDE.md` si se añaden nuevas secciones de arquitectura relevantes
|
||||
|
||||
## Ejemplo de uso
|
||||
|
||||
### Config del agente
|
||||
|
||||
```yaml
|
||||
# agents/asistente-2/config.yaml
|
||||
llm:
|
||||
primary:
|
||||
provider: claude-code
|
||||
claude_code:
|
||||
working_dir: "/tmp/claude-agents/asistente-2"
|
||||
permission_mode: "bypassPermissions"
|
||||
streaming: true
|
||||
show_tool_progress: true
|
||||
```
|
||||
|
||||
### Flujo en Matrix
|
||||
|
||||
```
|
||||
Usuario: Analiza la estructura de este proyecto y dame un resumen
|
||||
|
||||
Bot (mensaje inicial):
|
||||
⏳ Procesando...
|
||||
|
||||
Bot (edit 1, ~2s despues):
|
||||
🔧 Bash: find . -name '*.go' | head -20
|
||||
|
||||
Bot (edit 2, ~4s despues):
|
||||
📖 Read: cmd/launcher/main.go
|
||||
|
||||
Bot (edit 3, ~6s despues):
|
||||
🔧 Bash: wc -l pkg/**/*.go
|
||||
|
||||
Bot (edit final, ~15s despues):
|
||||
## Estructura del proyecto
|
||||
|
||||
El proyecto es un monorepo Go con 45 archivos .go organizados en:
|
||||
- `pkg/` — core puro con tipos y reglas de decision
|
||||
- `shell/` — I/O impuro (Matrix, LLM, SSH)
|
||||
- `agents/` — definiciones de agentes
|
||||
[... respuesta completa ...]
|
||||
```
|
||||
|
||||
### Sin streaming (comportamiento actual)
|
||||
|
||||
```
|
||||
Usuario: Analiza la estructura de este proyecto y dame un resumen
|
||||
|
||||
[typing indicator durante 15 segundos]
|
||||
|
||||
Bot:
|
||||
## Estructura del proyecto
|
||||
[... respuesta completa ...]
|
||||
```
|
||||
|
||||
## Decisiones de diseño
|
||||
|
||||
1. **Edit en vez de multiples mensajes**: usar `m.replace` para editar un solo mensaje progresivamente evita spam en el chat. Si el homeserver no soporta ediciones, el fallback es enviar solo el resultado final (sin progreso intermedio).
|
||||
|
||||
2. **StreamFunc como callback, no como channel**: un callback `func(StreamEvent)` es mas simple que un channel y no requiere goroutine de consumo. El caller decide que hacer con cada evento sincrónicamente.
|
||||
|
||||
3. **Rate limit de 1 edit/segundo**: Matrix homeservers tipicamente tienen rate limits de 5-10 requests/segundo. Con 1 edit/segundo dejamos margen para otros mensajes del agente y evitamos 429 Too Many Requests.
|
||||
|
||||
4. **Tipos puros en `pkg/llm/`**: `StreamEvent` y `StreamFunc` son tipos de datos sin I/O. Estan en el package puro porque describen el contrato entre el provider (impuro) y el consumer (impuro). El tipo en si no tiene side effects.
|
||||
|
||||
5. **Backward compatible**: `streaming: false` (default) mantiene el comportamiento actual exacto. La refactorizacion de `claudecode.go` no cambia la firma de `CompleteFunc` — el streaming es un side channel via `StreamFunc`.
|
||||
|
||||
6. **`SendMarkdownGetID` nuevo en vez de cambiar firma**: añadir un metodo nuevo que retorne el eventID evita romper todos los callers existentes de `SendMarkdown`. El progress reporter usa el metodo nuevo; el resto del codigo no cambia.
|
||||
|
||||
## Prerequisitos
|
||||
|
||||
- Verificar que `claude --output-format stream-json` existe y documentar el formato exacto de sus eventos. Si el CLI no soporta `stream-json`, investigar alternativas:
|
||||
- `--output-format json` con lectura line-buffered del proceso (puede no emitir JSON parcial)
|
||||
- Parsear stderr del proceso para eventos de progreso
|
||||
- Usar la API directa de Anthropic con streaming en vez del CLI
|
||||
- El campo `ClaudeCodeCfg` ya existe en el schema — solo se añaden campos nuevos.
|
||||
- `m.relates_to` con `m.replace` ya se parsea para threads en el listener — la logica de edicion es el inverso (enviar en vez de recibir).
|
||||
|
||||
## Riesgos
|
||||
|
||||
| Riesgo | Mitigacion |
|
||||
|--------|------------|
|
||||
| Formato de `stream-json` cambia entre versiones del CLI | Parseo defensivo: lineas no reconocidas se ignoran silenciosamente, el resultado final siempre se captura |
|
||||
| Rate limits de Matrix en message edits | Rate limiter de 1 edit/segundo configurable; si falla un edit, se loguea warning y se continua |
|
||||
| Tareas largas (>5min) timeout del subproceso | Ya manejado via `context.WithTimeout` y `cfg.Timeout`. El progress reporter muestra el ultimo estado antes del timeout |
|
||||
| Homeserver no soporta m.replace | Detectar error 400 en primer edit; si falla, desactivar ediciones para esa sesion y enviar solo resultado final |
|
||||
| Stdout pipe se bloquea si no se lee | `bufio.Scanner` en goroutine consume stdout continuamente; el pipe no se bloquea mientras el scanner este activo |
|
||||
| `claude` CLI no esta instalado o no soporta stream-json | Fallback a modo buffered con warning en logs. La feature es opt-in (`streaming: false` por defecto) |
|
||||
Reference in New Issue
Block a user