package effects import ( "context" "fmt" "log/slog" "sync" "time" coretypes "github.com/enmanuel/agents/pkg/llm" ) // ProgressReporter sends real-time progress updates to a Matrix room // by editing a single "status" message as the claude-code subprocess // emits streaming events (tool_use, text, result). // // It rate-limits edits to at most one per second to avoid flooding the // homeserver. type ProgressReporter struct { sender MatrixSender roomID string logger *slog.Logger mu sync.Mutex eventID string // Matrix event ID of the progress message (empty until first send) lastEdit time.Time // timestamp of last edit, for rate limiting minInterval time.Duration } // NewProgressReporter creates a ProgressReporter that sends progress updates // to the given room. The progress message is created lazily on the first event. func NewProgressReporter(sender MatrixSender, roomID string, logger *slog.Logger) *ProgressReporter { return &ProgressReporter{ sender: sender, roomID: roomID, logger: logger, minInterval: time.Second, // max 1 edit/second } } // StreamFunc returns a StreamFunc callback suitable for passing to // CompletionRequest.StreamFunc. It captures streaming events and updates // the progress message in the Matrix room. func (p *ProgressReporter) StreamFunc() coretypes.StreamFunc { return func(evt coretypes.StreamEvent) { p.handleEvent(evt) } } // handleEvent processes a single streaming event and updates the Matrix message. func (p *ProgressReporter) handleEvent(evt coretypes.StreamEvent) { var markdown string switch evt.Kind { case coretypes.StreamToolUse: // Show which tool is being used input := evt.ToolInput if len(input) > 60 { input = input[:57] + "..." } if input != "" { markdown = fmt.Sprintf("\U0001f527 *%s*: `%s`", evt.ToolName, input) } else { markdown = fmt.Sprintf("\U0001f527 *%s*", evt.ToolName) } case coretypes.StreamResult: // Final result — no need to update progress; the handler will send the actual reply return case coretypes.StreamText: // Intermediate text — could be partial thinking, skip to avoid noise return case coretypes.StreamInit: markdown = "\u2699\ufe0f *Procesando...*" default: return } if markdown == "" { return } p.updateMessage(markdown) } // updateMessage sends or edits the progress message, respecting rate limits. func (p *ProgressReporter) updateMessage(markdown string) { p.mu.Lock() defer p.mu.Unlock() ctx := context.Background() // Rate limit: skip if we edited less than minInterval ago if p.eventID != "" && time.Since(p.lastEdit) < p.minInterval { return } if p.eventID == "" { // First message: send a new one and capture the event ID evtID, err := p.sender.SendMarkdownGetID(ctx, p.roomID, markdown) if err != nil { p.logger.Warn("progress_reporter: failed to send initial message", "err", err) return } p.eventID = evtID p.lastEdit = time.Now() return } // Subsequent updates: edit the existing message if err := p.sender.EditMessage(ctx, p.roomID, p.eventID, markdown); err != nil { p.logger.Warn("progress_reporter: failed to edit message", "err", err) return } p.lastEdit = time.Now() } // Finalize edits the progress message with the final content, or deletes it. // Call this after the LLM response is ready. If finalMarkdown is empty, the // progress message is left as-is (the handler will send a separate reply). func (p *ProgressReporter) Finalize(finalMarkdown string) { p.mu.Lock() defer p.mu.Unlock() if p.eventID == "" || finalMarkdown == "" { return } ctx := context.Background() if err := p.sender.EditMessage(ctx, p.roomID, p.eventID, finalMarkdown); err != nil { p.logger.Warn("progress_reporter: failed to finalize message", "err", err) } } // EventID returns the Matrix event ID of the progress message, or empty if // no message was sent yet. func (p *ProgressReporter) EventID() string { p.mu.Lock() defer p.mu.Unlock() return p.eventID }