Files
agents_and_robots/dev/issues/0036-claude-code-streaming.md
T
egutierrez 52d5632d89 docs: crear issues 0036-0041 — nuevas features del sistema
Issues planificados:
- 0036: Claude Code streaming de progreso en Matrix
- 0037: Agente que crea otros agentes/bots via Matrix
- 0038: Webapps y dashboards embebidos en Element via widgets
- 0039: Recordatorios dinámicos y crons que invocan agentes
- 0040: Soporte para mensajes de voz (audio → STT)
- 0041: Videollamadas con agentes via LiveKit

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-09 21:19:09 +00:00

13 KiB

0036 — Agente Claude Code completo con streaming de progreso

Estado: pendiente

Objetivo

Transformar el provider claude-code para soportar streaming en tiempo real, permitiendo que los agentes que lo usan muestren en Matrix el progreso de uso de herramientas (e.g. "🔧 Ejecutando Bash: ls..." → "📝 Editando file.go..." → respuesta final). El usuario debe ver al agente trabajar como una sesion completa de Claude Code, no solo esperar en silencio hasta que termine.

Contexto

  • El provider claude-code vive en shell/llm/claudecode.go y ejecuta claude -p --output-format json como subproceso.
  • Actualmente usa bytes.Buffer para capturar stdout completo, espera a que el proceso termine, y luego parsea el JSON final. Durante todo este tiempo el usuario solo ve el typing indicator.
  • Claude CLI soporta --output-format stream-json que emite lineas JSON individuales conforme trabaja: eventos de tool_use, tool_result, text parcial, y result final.
  • El cliente Matrix (shell/matrix/client.go) tiene SendMarkdown, SendReplyMarkdown, SendThreadMarkdown pero no tiene EditMessage (m.replace). Las relaciones m.relates_to ya se manejan para threads.
  • El runner de effects (shell/effects/runner.go) ejecuta []decision.Action pero no tiene concepto de mensajes progresivos.
  • ClaudeCodeCfg en internal/config/schema.go ya tiene campos para binary, timeout, tools, working_dir, permission_mode, model, etc. No tiene campos de streaming.

Arquitectura

Patron pure core / impure shell

  • pkg/llm/types.go — tipos puros para streaming events (solo datos, sin I/O)
  • shell/llm/claudecode.go — impuro: pipe de stdout, parsing de stream JSON, invocacion de callbacks
  • shell/matrix/client.go — impuro: nuevo metodo EditMessage usando m.replace
  • devagents/handler.go — composicion: conectar progress reporter cuando el provider es claude-code con streaming
  • internal/config/schema.go — datos puros: nuevos campos en ClaudeCodeCfg

Fase 1 — Streaming del subproceso

claude -p --output-format stream-json
  │
  ├── {"type":"tool_use", "tool":"Bash", "input":"ls -la"}       ← tool empezando
  ├── {"type":"tool_result", "tool":"Bash", "output":"..."}      ← tool terminó
  ├── {"type":"text", "content":"Analizando...", "partial":true}  ← texto parcial
  └── {"type":"result", "result":"...", "usage":{...}}           ← resultado final

Se parsea linea a linea y se emite un StreamEvent por cada linea. El caller recibe eventos via callback StreamFunc.

Fase 2 — Mensajes progresivos en Matrix

StreamEvent(tool_use, "Bash", "ls -la")
  → sender.SendMarkdown("⏳ Procesando...")           ← mensaje inicial
  → sender.EditMessage(eventID, "🔧 Bash: ls -la")   ← editar con progreso
StreamEvent(tool_use, "Read", "main.go")
  → sender.EditMessage(eventID, "📖 Read: main.go")   ← editar de nuevo
StreamEvent(result, content)
  → sender.EditMessage(eventID, content)              ← reemplazar con resultado final

Un solo mensaje que se edita progresivamente. Evita spam de multiples mensajes.

Fase 3 — Config y UX

Nuevos campos en ClaudeCodeCfg:

claude_code:
  streaming: true              # usar stream-json en vez de json
  show_tool_progress: true     # mostrar progreso de tools en Matrix

Tareas

Fase 1 — Streaming del subproceso

  • 1.1 Añadir tipos puros de streaming a pkg/llm/types.go:

    • StreamEventKind (string type): StreamToolUse, StreamToolResult, StreamText, StreamResult, StreamError
    • StreamEvent struct: Kind StreamEventKind, ToolName string, ToolInput string, Content string, IsPartial bool, Error error
    • StreamFunc callback type: func(event StreamEvent)
    • Estos tipos son datos puros, sin side effects — coherente con el resto de pkg/llm/
  • 1.2 Refactorizar NewClaudeCodeComplete para soportar modo streaming:

    • Nueva funcion NewClaudeCodeStream(cfg, log) (CompleteFunc, StreamFunc setter) o añadir StreamFunc al closure
    • Cuando cfg.Streaming == true: usar cmd.StdoutPipe() en vez de bytes.Buffer
    • Leer stdout linea a linea con bufio.Scanner
    • Parsear cada linea JSON e invocar StreamFunc(event) si no es nil
    • Acumular el resultado final para retornarlo como CompletionResponse normal (compatibilidad)
    • Si cfg.Streaming == false o StreamFunc == nil: mantener comportamiento actual (buffered)
  • 1.3 Implementar parser de eventos stream-json:

    • Investigar formato exacto de claude --output-format stream-json (ejecutar claude --help para confirmar)
    • Funcion pura parseStreamLine(line []byte) (StreamEvent, error) en shell/llm/claudecode.go
    • Mapear los tipos de evento del CLI a StreamEventKind
    • Manejar lineas vacias y JSON malformado sin crash
  • 1.4 Preservar cleanup de process group:

    • cmd.SysProcAttr, cmd.Cancel y el kill post-Run deben funcionar identicamente en modo streaming
    • El pipe de stdout debe cerrarse correctamente cuando el contexto se cancela
  • 1.5 Tests unitarios para parsing de eventos:

    • Test parseStreamLine con samples de cada tipo de evento
    • Test de streaming completo con mock de stdout (io.Pipe)
    • Test de fallback a modo buffered cuando streaming == false
    • Test de cancelacion via contexto durante streaming

Fase 2 — Mensajes progresivos en Matrix

  • 2.1 Añadir EditMessage a shell/matrix/client.go:

    • Metodo EditMessage(ctx, roomID, eventID, markdown string) error
    • Usar m.relates_to con rel_type: "m.replace" y event_id: <original>
    • Incluir m.new_content con el nuevo body (formatted_body para markdown)
    • Retornar error si el eventID original no existe o la edicion falla
  • 2.2 Añadir SendMarkdownWithID o modificar SendMarkdown para retornar eventID:

    • El progress reporter necesita el eventID del mensaje inicial para poder editarlo
    • Evaluar: nuevo metodo que retorna (id.EventID, error) vs cambiar firma existente (breaking change)
    • Recomendacion: nuevo metodo SendMarkdownGetID(ctx, roomID, markdown) (string, error) para no romper callers existentes
  • 2.3 Implementar progress reporter en shell/effects/progress.go (NEW):

    • ProgressReporter struct con: sender (interface), roomID, eventID del mensaje actual
    • Metodo HandleEvent(event StreamEvent) que:
      • En primer evento: envia mensaje inicial " Procesando..." y guarda eventID
      • En StreamToolUse: edita a "🔧 {ToolName}: {ToolInput truncado a 80 chars}"
      • En StreamToolResult: edita a " {ToolName} completado"
      • En StreamText parcial: ignora (demasiadas ediciones)
      • En StreamResult: edita con el contenido final completo
      • En StreamError: edita con " Error: {mensaje}"
    • Rate limiter interno: maximo 1 edit por segundo para evitar rate limits de Matrix
    • Formateo con emojis configurable via config
  • 2.4 Conectar progress reporter en devagents/handler.go:

    • En el flujo LLM del handler, si el provider es claude-code y cfg.ClaudeCode.Streaming:
      • Crear ProgressReporter con el sender y roomID
      • Pasar reporter.HandleEvent como StreamFunc al provider
      • El resultado final llega como CompletionResponse normal (el handler no cambia su flujo)
    • Si streaming deshabilitado: flujo actual sin cambios
  • 2.5 Tests del progress reporter:

    • Test con mock sender que registra llamadas a SendMarkdown y EditMessage
    • Verificar secuencia: Send → Edit → Edit → Edit(final)
    • Verificar rate limiting: multiples eventos rapidos → solo 1 edit/segundo
    • Verificar truncado de ToolInput largo

Fase 3 — Config y polish

  • 3.1 Añadir campos a ClaudeCodeCfg en internal/config/schema.go:

    Streaming        bool `yaml:"streaming"`          // use stream-json output format (default false)
    ShowToolProgress bool `yaml:"show_tool_progress"` // show tool progress via message edits (default false)
    
  • 3.2 Actualizar buildClaudeArgs para usar --output-format stream-json cuando cfg.Streaming == true

  • 3.3 Actualizar templates de config (agents/_template/config.yaml) con opciones de streaming (comentadas):

    # claude_code:
    #   streaming: true              # streaming en tiempo real (default: false)
    #   show_tool_progress: true     # mostrar progreso de tools en Matrix
    
  • 3.4 Documentar en el system prompt template (.claude/templates/security-prompt.md o README) que el agente puede mostrar progreso de trabajo

Fase 4 — Tests de integracion y cleanup

  • 4.1 Test de integracion: simular un flujo completo stream → progress reporter → mock sender
  • 4.2 Verificar que agentes con streaming: false no se ven afectados (regression)
  • 4.3 Actualizar CLAUDE.md si se añaden nuevas secciones de arquitectura relevantes

Ejemplo de uso

Config del agente

# agents/asistente-2/config.yaml
llm:
  primary:
    provider: claude-code
    claude_code:
      working_dir: "/tmp/claude-agents/asistente-2"
      permission_mode: "bypassPermissions"
      streaming: true
      show_tool_progress: true

Flujo en Matrix

Usuario: Analiza la estructura de este proyecto y dame un resumen

Bot (mensaje inicial):
  ⏳ Procesando...

Bot (edit 1, ~2s despues):
  🔧 Bash: find . -name '*.go' | head -20

Bot (edit 2, ~4s despues):
  📖 Read: cmd/launcher/main.go

Bot (edit 3, ~6s despues):
  🔧 Bash: wc -l pkg/**/*.go

Bot (edit final, ~15s despues):
  ## Estructura del proyecto

  El proyecto es un monorepo Go con 45 archivos .go organizados en:
  - `pkg/` — core puro con tipos y reglas de decision
  - `shell/` — I/O impuro (Matrix, LLM, SSH)
  - `agents/` — definiciones de agentes
  [... respuesta completa ...]

Sin streaming (comportamiento actual)

Usuario: Analiza la estructura de este proyecto y dame un resumen

[typing indicator durante 15 segundos]

Bot:
  ## Estructura del proyecto
  [... respuesta completa ...]

Decisiones de diseño

  1. Edit en vez de multiples mensajes: usar m.replace para editar un solo mensaje progresivamente evita spam en el chat. Si el homeserver no soporta ediciones, el fallback es enviar solo el resultado final (sin progreso intermedio).

  2. StreamFunc como callback, no como channel: un callback func(StreamEvent) es mas simple que un channel y no requiere goroutine de consumo. El caller decide que hacer con cada evento sincrónicamente.

  3. Rate limit de 1 edit/segundo: Matrix homeservers tipicamente tienen rate limits de 5-10 requests/segundo. Con 1 edit/segundo dejamos margen para otros mensajes del agente y evitamos 429 Too Many Requests.

  4. Tipos puros en pkg/llm/: StreamEvent y StreamFunc son tipos de datos sin I/O. Estan en el package puro porque describen el contrato entre el provider (impuro) y el consumer (impuro). El tipo en si no tiene side effects.

  5. Backward compatible: streaming: false (default) mantiene el comportamiento actual exacto. La refactorizacion de claudecode.go no cambia la firma de CompleteFunc — el streaming es un side channel via StreamFunc.

  6. SendMarkdownGetID nuevo en vez de cambiar firma: añadir un metodo nuevo que retorne el eventID evita romper todos los callers existentes de SendMarkdown. El progress reporter usa el metodo nuevo; el resto del codigo no cambia.

Prerequisitos

  • Verificar que claude --output-format stream-json existe y documentar el formato exacto de sus eventos. Si el CLI no soporta stream-json, investigar alternativas:
    • --output-format json con lectura line-buffered del proceso (puede no emitir JSON parcial)
    • Parsear stderr del proceso para eventos de progreso
    • Usar la API directa de Anthropic con streaming en vez del CLI
  • El campo ClaudeCodeCfg ya existe en el schema — solo se añaden campos nuevos.
  • m.relates_to con m.replace ya se parsea para threads en el listener — la logica de edicion es el inverso (enviar en vez de recibir).

Riesgos

Riesgo Mitigacion
Formato de stream-json cambia entre versiones del CLI Parseo defensivo: lineas no reconocidas se ignoran silenciosamente, el resultado final siempre se captura
Rate limits de Matrix en message edits Rate limiter de 1 edit/segundo configurable; si falla un edit, se loguea warning y se continua
Tareas largas (>5min) timeout del subproceso Ya manejado via context.WithTimeout y cfg.Timeout. El progress reporter muestra el ultimo estado antes del timeout
Homeserver no soporta m.replace Detectar error 400 en primer edit; si falla, desactivar ediciones para esa sesion y enviar solo resultado final
Stdout pipe se bloquea si no se lee bufio.Scanner en goroutine consume stdout continuamente; el pipe no se bloquea mientras el scanner este activo
claude CLI no esta instalado o no soporta stream-json Fallback a modo buffered con warning en logs. La feature es opt-in (streaming: false por defecto)