Files
agent d9982d853d feat: scaffold claude_session — daemon de sesion claude caliente (NDJSON)
Daemon de larga vida que mantiene una TUI claude interactiva viva y responde
prompts en ~2.7s, embebible como subproceso via NDJSON por stdin/stdout.

Arranca mitmproxy (addon tee_anthropic_sse) + claude TUI en PTY (creack/pty
directo, persistente) una vez. Cada prompt se teclea en la TUI viva; la
respuesta se lee del SSE de la red (exacta, corta en message_stop). El cold
start (~7s) se paga una vez; los siguientes mensajes ~2.7s, con memoria entre
turnos. Protocolo: send/restart/shutdown -> ready/text_delta/result/restarted.

Validado: 2.7s por mensaje en caliente (vs 15s parseando TUI, vs 9s one-shot),
restart relanza la conversacion. Reusa tee_anthropic_sse_py_cybersecurity +
vt_render_go_tui.
2026-06-04 00:40:29 +02:00

382 lines
9.8 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// Command claude_session is a long-lived daemon that keeps an interactive claude
// TUI session hot and answers prompts fast, intended to be embedded in an app as a
// subprocess and driven over stdin/stdout with NDJSON.
//
// It starts a mitmproxy (with the tee_anthropic_sse addon) and a claude TUI in a
// PTY once, then keeps both alive. Each prompt is typed into the live TUI; the
// answer is read off the wire (the model's SSE), so it is exact and finishes at
// message_stop with no blind idle. Because the TUI stays alive, the first prompt
// pays the cold start (~7s) and subsequent prompts only pay generation (~2-3s),
// and the conversation keeps context across turns.
//
// Protocol (NDJSON, one JSON object per line):
//
// stdin (commands):
// {"cmd":"send","prompt":"..."} type a prompt, stream the answer
// {"cmd":"restart"} kill+relaunch the TUI (fresh conversation, proxy kept)
// {"cmd":"shutdown"} stop everything and exit
// stdout (events):
// {"type":"ready"} the TUI is ready for a prompt
// {"type":"text_delta","text":"..."}
// {"type":"result","result":"..."}
// {"type":"restarted"}
// {"type":"error","message":"..."}
//
// This daemon NEVER uses `claude -p`; it drives the real interactive TUI.
package main
import (
"bufio"
"encoding/json"
"flag"
"fmt"
"net"
"os"
"os/exec"
"path/filepath"
"strings"
"sync"
"time"
"github.com/creack/pty"
"fn-registry/functions/tui"
)
const (
ptyRows = 40
ptyCols = 120
)
// wireEvent is one NDJSON line emitted by the tee_anthropic_sse addon.
type wireEvent struct {
Type string `json:"type"`
StreamID int `json:"stream_id"`
HasTools bool `json:"has_tools"`
Text string `json:"text"`
StopReason string `json:"stop_reason"`
}
// cmdIn is one NDJSON command read from stdin.
type cmdIn struct {
Cmd string `json:"cmd"`
Prompt string `json:"prompt"`
}
// evOut is one NDJSON event written to stdout.
type evOut struct {
Type string `json:"type"`
Text string `json:"text,omitempty"`
Result string `json:"result,omitempty"`
Message string `json:"message,omitempty"`
}
type config struct {
cwd string
port string
addon string
ca string
bin string
warmupS time.Duration
}
type daemon struct {
cfg config
mitm *exec.Cmd
wireCh chan wireEvent
ptmx *os.File
claude *exec.Cmd
rawMu sync.Mutex
raw []byte
outMu sync.Mutex
out *json.Encoder
}
func main() {
cfg := config{}
flag.StringVar(&cfg.cwd, "cwd", "/home/enmanuel/fn_registry", "cwd for claude (MCP-approved repo)")
flag.StringVar(&cfg.port, "port", "8901", "mitmproxy port")
root := flag.String("root", "/home/enmanuel/fn_registry", "registry root (to locate the addon)")
flag.StringVar(&cfg.addon, "addon", "", "tee_anthropic_sse addon path")
flag.StringVar(&cfg.ca, "ca", os.Getenv("HOME")+"/.mitmproxy/mitmproxy-ca-cert.pem", "mitmproxy CA cert")
flag.StringVar(&cfg.bin, "bin", "claude", "claude binary")
warmup := flag.Duration("warmup", 12*time.Second, "max wait for the TUI to become ready")
flag.Parse()
cfg.warmupS = *warmup
if cfg.addon == "" {
cfg.addon = filepath.Join(*root, "python/functions/cybersecurity/tee_anthropic_sse.py")
}
d := &daemon{cfg: cfg, out: json.NewEncoder(os.Stdout)}
if err := d.startProxy(); err != nil {
d.emit(evOut{Type: "error", Message: "start proxy: " + err.Error()})
os.Exit(1)
}
if err := d.startClaude(); err != nil {
d.emit(evOut{Type: "error", Message: "start claude: " + err.Error()})
os.Exit(1)
}
if !d.waitReady(cfg.warmupS) {
d.emit(evOut{Type: "error", Message: "claude TUI did not become ready"})
// keep going — the user can still try; but signal it
}
d.emit(evOut{Type: "ready"})
d.loop()
}
// startProxy launches mitmdump with the SSE tee addon and starts a goroutine that
// parses its NDJSON stdout into wireCh. FN_WIRE_ONLY_TOOLS isolates the main reply.
func (d *daemon) startProxy() error {
cmd := exec.Command("mitmdump", "-p", d.cfg.port, "-s", d.cfg.addon, "-q")
cmd.Env = append(os.Environ(), "FN_WIRE_ONLY_TOOLS=1")
cmd.Stderr = os.Stderr
stdout, err := cmd.StdoutPipe()
if err != nil {
return err
}
if err := cmd.Start(); err != nil {
return err
}
d.mitm = cmd
d.wireCh = make(chan wireEvent, 256)
go func() {
sc := bufio.NewScanner(stdout)
sc.Buffer(make([]byte, 1024*1024), 1024*1024)
for sc.Scan() {
line := strings.TrimSpace(sc.Text())
if line == "" {
continue
}
var ev wireEvent
if json.Unmarshal([]byte(line), &ev) == nil {
d.wireCh <- ev
}
}
}()
if !waitPort("127.0.0.1:"+d.cfg.port, 10*time.Second) {
return fmt.Errorf("proxy did not listen on %s", d.cfg.port)
}
return nil
}
// startClaude launches the interactive claude TUI in a PTY, routed through the
// proxy, and starts a goroutine that accumulates the raw render into d.raw.
func (d *daemon) startClaude() error {
cmd := exec.Command(d.cfg.bin)
cmd.Dir = d.cfg.cwd
cmd.Env = append(os.Environ(),
"HTTPS_PROXY=http://127.0.0.1:"+d.cfg.port,
"HTTP_PROXY=http://127.0.0.1:"+d.cfg.port,
"NODE_EXTRA_CA_CERTS="+d.cfg.ca,
"SSL_CERT_FILE="+d.cfg.ca,
"REQUESTS_CA_BUNDLE="+d.cfg.ca,
)
ptmx, err := pty.Start(cmd)
if err != nil {
return err
}
_ = pty.Setsize(ptmx, &pty.Winsize{Rows: ptyRows, Cols: ptyCols})
d.ptmx = ptmx
d.claude = cmd
go func() {
buf := make([]byte, 4096)
for {
n, err := ptmx.Read(buf)
if n > 0 {
d.rawMu.Lock()
d.raw = append(d.raw, buf[:n]...)
// Cap the buffer so it does not grow without bound; keep the tail
// (the current screen lives at the end).
if len(d.raw) > 256*1024 {
d.raw = d.raw[len(d.raw)-128*1024:]
}
d.rawMu.Unlock()
}
if err != nil {
return
}
}
}()
return nil
}
// screen renders the current PTY buffer to a 2D screen via vt_render.
func (d *daemon) screen() string {
d.rawMu.Lock()
raw := string(d.raw)
d.rawMu.Unlock()
return tui.VTRender(raw, ptyRows, ptyCols)
}
// isReady reports whether the TUI shows an idle input box (ready for a prompt):
// the prompt marker is present and no generation spinner is active.
func isReady(screen string) bool {
if screen == "" {
return false
}
if strings.Contains(screen, "esc to interrupt") {
return false // generating
}
return strings.Contains(screen, "")
}
// waitReady polls the rendered screen until the input box is idle or timeout.
func (d *daemon) waitReady(timeout time.Duration) bool {
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
if isReady(d.screen()) {
// small settle so the box is fully drawn
time.Sleep(250 * time.Millisecond)
return true
}
time.Sleep(200 * time.Millisecond)
}
return isReady(d.screen())
}
func (d *daemon) emit(ev evOut) {
d.outMu.Lock()
_ = d.out.Encode(ev)
d.outMu.Unlock()
}
// drainWire empties any buffered wire events (residue from a previous turn).
func (d *daemon) drainWire() {
for {
select {
case <-d.wireCh:
default:
return
}
}
}
// handleSend types the prompt into the live TUI and streams the answer read from
// the wire until message_stop.
func (d *daemon) handleSend(prompt string) {
if prompt == "" {
d.emit(evOut{Type: "error", Message: "empty prompt"})
return
}
d.drainWire()
// Type the prompt, settle, then Enter (a glued \r is treated as a newline).
_, _ = d.ptmx.Write([]byte(prompt))
time.Sleep(450 * time.Millisecond)
_, _ = d.ptmx.Write([]byte("\r"))
// Follow the next has_tools stream to its message_stop.
mainStream := 0
var answer strings.Builder
timeout := time.After(120 * time.Second)
for {
select {
case ev := <-d.wireCh:
switch ev.Type {
case "message_start":
if mainStream == 0 && ev.HasTools {
mainStream = ev.StreamID
}
case "text_delta":
if ev.StreamID == mainStream {
answer.WriteString(ev.Text)
d.emit(evOut{Type: "text_delta", Text: ev.Text})
}
case "message_stop":
if ev.StreamID == mainStream {
d.emit(evOut{Type: "result", Result: answer.String()})
// Let the TUI redraw the idle input box before the next prompt.
d.waitReady(8 * time.Second)
d.emit(evOut{Type: "ready"})
return
}
}
case <-timeout:
d.emit(evOut{Type: "error", Message: "timeout waiting for answer"})
d.emit(evOut{Type: "ready"})
return
}
}
}
// handleRestart kills the claude TUI and relaunches it (fresh conversation). The
// proxy is kept alive.
func (d *daemon) handleRestart() {
if d.claude != nil && d.claude.Process != nil {
_ = d.claude.Process.Kill()
_, _ = d.claude.Process.Wait()
}
if d.ptmx != nil {
_ = d.ptmx.Close()
}
d.rawMu.Lock()
d.raw = nil
d.rawMu.Unlock()
if err := d.startClaude(); err != nil {
d.emit(evOut{Type: "error", Message: "restart: " + err.Error()})
return
}
d.waitReady(d.cfg.warmupS)
d.emit(evOut{Type: "restarted"})
d.emit(evOut{Type: "ready"})
}
func (d *daemon) shutdown() {
if d.claude != nil && d.claude.Process != nil {
_ = d.claude.Process.Kill()
}
if d.mitm != nil && d.mitm.Process != nil {
_ = d.mitm.Process.Kill()
}
}
// loop reads NDJSON commands from stdin and dispatches them.
func (d *daemon) loop() {
sc := bufio.NewScanner(os.Stdin)
sc.Buffer(make([]byte, 1024*1024), 1024*1024)
for sc.Scan() {
line := strings.TrimSpace(sc.Text())
if line == "" {
continue
}
var c cmdIn
if json.Unmarshal([]byte(line), &c) != nil {
d.emit(evOut{Type: "error", Message: "bad command json"})
continue
}
switch c.Cmd {
case "send":
d.handleSend(c.Prompt)
case "restart":
d.handleRestart()
case "shutdown":
d.shutdown()
return
default:
d.emit(evOut{Type: "error", Message: "unknown cmd: " + c.Cmd})
}
}
d.shutdown()
}
func waitPort(addr string, timeout time.Duration) bool {
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
c, err := net.DialTimeout("tcp", addr, 300*time.Millisecond)
if err == nil {
_ = c.Close()
return true
}
time.Sleep(200 * time.Millisecond)
}
return false
}