// Command claude_session is a long-lived daemon that keeps an interactive claude
// TUI session hot and answers prompts fast, intended to be embedded in an app as a
// subprocess and driven over stdin/stdout with NDJSON.
//
// It starts a mitmproxy (with the tee_anthropic_sse addon) and a claude TUI in a
// PTY once, then keeps both alive. Each prompt is typed into the live TUI; the
// answer is read off the wire (the model's SSE), so it is exact and finishes at
// message_stop with no blind idle. Because the TUI stays alive, the first prompt
// pays the cold start (~7s) and subsequent prompts only pay generation (~2-3s),
// and the conversation keeps context across turns.
//
// Protocol (NDJSON, one JSON object per line):
//
//	stdin (commands):
//	  {"cmd":"send","prompt":"..."}   type a prompt, stream the answer
//	  {"cmd":"restart"}               kill+relaunch the TUI (fresh conversation, proxy kept)
//	  {"cmd":"shutdown"}              stop everything and exit
//	stdout (events):
//	  {"type":"ready"}                the TUI is ready for a prompt
//	  {"type":"text_delta","text":"..."}
//	  {"type":"result","result":"..."}
//	  {"type":"restarted"}
//	  {"type":"error","message":"..."}
//
// This daemon NEVER uses `claude -p`; it drives the real interactive TUI.
package main

import (
	"bufio"
	"encoding/json"
	"flag"
	"fmt"
	"net"
	"os"
	"os/exec"
	"path/filepath"
	"strings"
	"sync"
	"time"

	"github.com/creack/pty"

	"fn-registry/functions/tui"
)

// PTY geometry. The same rows/cols are applied to the claude PTY and used when
// the captured output is rendered to a screen, so the two must stay in sync.
const (
	ptyRows = 40
	ptyCols = 120
)

// wireEvent is one NDJSON line emitted by the tee_anthropic_sse addon.
type wireEvent struct {
	// Type is the event kind; the daemon acts on "message_start",
	// "text_delta" and "message_stop" and ignores anything else.
	Type string `json:"type"`
	// StreamID ties an event to one response stream so a single stream can be
	// followed from message_start to message_stop.
	StreamID int `json:"stream_id"`
	// HasTools marks the stream the daemon selects as the main reply when it
	// sees a message_start.
	HasTools bool `json:"has_tools"`
	// Text is the fragment carried by a text_delta event.
	Text string `json:"text"`
	// StopReason is decoded from the wire but not consulted by the code
	// visible in this file.
	StopReason string `json:"stop_reason"`
}

// cmdIn is one NDJSON command read from stdin.
type cmdIn struct {
	// Cmd selects the action: "send", "restart" or "shutdown".
	Cmd string `json:"cmd"`
	// Prompt is the text to type into the TUI; only used by "send".
	Prompt string `json:"prompt"`
}

// evOut is one NDJSON event written to stdout.
type evOut struct { Type string `json:"type"` Text string `json:"text,omitempty"` Result string `json:"result,omitempty"` Message string `json:"message,omitempty"` } type config struct { cwd string port string addon string ca string bin string warmupS time.Duration } type daemon struct { cfg config mitm *exec.Cmd wireCh chan wireEvent ptmx *os.File claude *exec.Cmd rawMu sync.Mutex raw []byte outMu sync.Mutex out *json.Encoder } func main() { cfg := config{} flag.StringVar(&cfg.cwd, "cwd", "/home/enmanuel/fn_registry", "cwd for claude (MCP-approved repo)") flag.StringVar(&cfg.port, "port", "8901", "mitmproxy port") root := flag.String("root", "/home/enmanuel/fn_registry", "registry root (to locate the addon)") flag.StringVar(&cfg.addon, "addon", "", "tee_anthropic_sse addon path") flag.StringVar(&cfg.ca, "ca", os.Getenv("HOME")+"/.mitmproxy/mitmproxy-ca-cert.pem", "mitmproxy CA cert") flag.StringVar(&cfg.bin, "bin", "claude", "claude binary") warmup := flag.Duration("warmup", 12*time.Second, "max wait for the TUI to become ready") flag.Parse() cfg.warmupS = *warmup if cfg.addon == "" { cfg.addon = filepath.Join(*root, "python/functions/cybersecurity/tee_anthropic_sse.py") } d := &daemon{cfg: cfg, out: json.NewEncoder(os.Stdout)} if err := d.startProxy(); err != nil { d.emit(evOut{Type: "error", Message: "start proxy: " + err.Error()}) os.Exit(1) } if err := d.startClaude(); err != nil { d.emit(evOut{Type: "error", Message: "start claude: " + err.Error()}) os.Exit(1) } if !d.waitReady(cfg.warmupS) { d.emit(evOut{Type: "error", Message: "claude TUI did not become ready"}) // keep going — the user can still try; but signal it } d.emit(evOut{Type: "ready"}) d.loop() } // startProxy launches mitmdump with the SSE tee addon and starts a goroutine that // parses its NDJSON stdout into wireCh. FN_WIRE_ONLY_TOOLS isolates the main reply. 
func (d *daemon) startProxy() error { cmd := exec.Command("mitmdump", "-p", d.cfg.port, "-s", d.cfg.addon, "-q") cmd.Env = append(os.Environ(), "FN_WIRE_ONLY_TOOLS=1") cmd.Stderr = os.Stderr stdout, err := cmd.StdoutPipe() if err != nil { return err } if err := cmd.Start(); err != nil { return err } d.mitm = cmd d.wireCh = make(chan wireEvent, 256) go func() { sc := bufio.NewScanner(stdout) sc.Buffer(make([]byte, 1024*1024), 1024*1024) for sc.Scan() { line := strings.TrimSpace(sc.Text()) if line == "" { continue } var ev wireEvent if json.Unmarshal([]byte(line), &ev) == nil { d.wireCh <- ev } } }() if !waitPort("127.0.0.1:"+d.cfg.port, 10*time.Second) { return fmt.Errorf("proxy did not listen on %s", d.cfg.port) } return nil } // startClaude launches the interactive claude TUI in a PTY, routed through the // proxy, and starts a goroutine that accumulates the raw render into d.raw. func (d *daemon) startClaude() error { cmd := exec.Command(d.cfg.bin) cmd.Dir = d.cfg.cwd cmd.Env = append(os.Environ(), "HTTPS_PROXY=http://127.0.0.1:"+d.cfg.port, "HTTP_PROXY=http://127.0.0.1:"+d.cfg.port, "NODE_EXTRA_CA_CERTS="+d.cfg.ca, "SSL_CERT_FILE="+d.cfg.ca, "REQUESTS_CA_BUNDLE="+d.cfg.ca, ) ptmx, err := pty.Start(cmd) if err != nil { return err } _ = pty.Setsize(ptmx, &pty.Winsize{Rows: ptyRows, Cols: ptyCols}) d.ptmx = ptmx d.claude = cmd go func() { buf := make([]byte, 4096) for { n, err := ptmx.Read(buf) if n > 0 { d.rawMu.Lock() d.raw = append(d.raw, buf[:n]...) // Cap the buffer so it does not grow without bound; keep the tail // (the current screen lives at the end). if len(d.raw) > 256*1024 { d.raw = d.raw[len(d.raw)-128*1024:] } d.rawMu.Unlock() } if err != nil { return } } }() return nil } // screen renders the current PTY buffer to a 2D screen via vt_render. 
func (d *daemon) screen() string { d.rawMu.Lock() raw := string(d.raw) d.rawMu.Unlock() return tui.VTRender(raw, ptyRows, ptyCols) } // isReady reports whether the TUI shows an idle input box (ready for a prompt): // the prompt marker is present and no generation spinner is active. func isReady(screen string) bool { if screen == "" { return false } if strings.Contains(screen, "esc to interrupt") { return false // generating } return strings.Contains(screen, "❯") } // waitReady polls the rendered screen until the input box is idle or timeout. func (d *daemon) waitReady(timeout time.Duration) bool { deadline := time.Now().Add(timeout) for time.Now().Before(deadline) { if isReady(d.screen()) { // small settle so the box is fully drawn time.Sleep(250 * time.Millisecond) return true } time.Sleep(200 * time.Millisecond) } return isReady(d.screen()) } func (d *daemon) emit(ev evOut) { d.outMu.Lock() _ = d.out.Encode(ev) d.outMu.Unlock() } // drainWire empties any buffered wire events (residue from a previous turn). func (d *daemon) drainWire() { for { select { case <-d.wireCh: default: return } } } // handleSend types the prompt into the live TUI and streams the answer read from // the wire until message_stop. func (d *daemon) handleSend(prompt string) { if prompt == "" { d.emit(evOut{Type: "error", Message: "empty prompt"}) return } d.drainWire() // Type the prompt, settle, then Enter (a glued \r is treated as a newline). _, _ = d.ptmx.Write([]byte(prompt)) time.Sleep(450 * time.Millisecond) _, _ = d.ptmx.Write([]byte("\r")) // Follow the next has_tools stream to its message_stop. 
mainStream := 0 var answer strings.Builder timeout := time.After(120 * time.Second) for { select { case ev := <-d.wireCh: switch ev.Type { case "message_start": if mainStream == 0 && ev.HasTools { mainStream = ev.StreamID } case "text_delta": if ev.StreamID == mainStream { answer.WriteString(ev.Text) d.emit(evOut{Type: "text_delta", Text: ev.Text}) } case "message_stop": if ev.StreamID == mainStream { d.emit(evOut{Type: "result", Result: answer.String()}) // Let the TUI redraw the idle input box before the next prompt. d.waitReady(8 * time.Second) d.emit(evOut{Type: "ready"}) return } } case <-timeout: d.emit(evOut{Type: "error", Message: "timeout waiting for answer"}) d.emit(evOut{Type: "ready"}) return } } } // handleRestart kills the claude TUI and relaunches it (fresh conversation). The // proxy is kept alive. func (d *daemon) handleRestart() { if d.claude != nil && d.claude.Process != nil { _ = d.claude.Process.Kill() _, _ = d.claude.Process.Wait() } if d.ptmx != nil { _ = d.ptmx.Close() } d.rawMu.Lock() d.raw = nil d.rawMu.Unlock() if err := d.startClaude(); err != nil { d.emit(evOut{Type: "error", Message: "restart: " + err.Error()}) return } d.waitReady(d.cfg.warmupS) d.emit(evOut{Type: "restarted"}) d.emit(evOut{Type: "ready"}) } func (d *daemon) shutdown() { if d.claude != nil && d.claude.Process != nil { _ = d.claude.Process.Kill() } if d.mitm != nil && d.mitm.Process != nil { _ = d.mitm.Process.Kill() } } // loop reads NDJSON commands from stdin and dispatches them. 
func (d *daemon) loop() { sc := bufio.NewScanner(os.Stdin) sc.Buffer(make([]byte, 1024*1024), 1024*1024) for sc.Scan() { line := strings.TrimSpace(sc.Text()) if line == "" { continue } var c cmdIn if json.Unmarshal([]byte(line), &c) != nil { d.emit(evOut{Type: "error", Message: "bad command json"}) continue } switch c.Cmd { case "send": d.handleSend(c.Prompt) case "restart": d.handleRestart() case "shutdown": d.shutdown() return default: d.emit(evOut{Type: "error", Message: "unknown cmd: " + c.Cmd}) } } d.shutdown() } func waitPort(addr string, timeout time.Duration) bool { deadline := time.Now().Add(timeout) for time.Now().Before(deadline) { c, err := net.DialTimeout("tcp", addr, 300*time.Millisecond) if err == nil { _ = c.Close() return true } time.Sleep(200 * time.Millisecond) } return false }