feat: scaffold claude_session — daemon de sesion claude caliente (NDJSON)

Daemon de larga vida que mantiene una TUI claude interactiva viva y responde
prompts en ~2.7s, embebible como subproceso via NDJSON por stdin/stdout.

Arranca mitmproxy (addon tee_anthropic_sse) + claude TUI en PTY (creack/pty
directo, persistente) una vez. Cada prompt se teclea en la TUI viva; la
respuesta se lee del SSE de la red (exacta, corta en message_stop). El cold
start (~7s) se paga una vez; los siguientes mensajes ~2.7s, con memoria entre
turnos. Protocolo: send/restart/shutdown -> ready/text_delta/result/restarted.

Validado: 2.7s por mensaje en caliente (vs 15s parseando TUI, vs 9s one-shot),
restart relanza la conversacion. Reusa tee_anthropic_sse_py_cybersecurity +
vt_render_go_tui.
This commit is contained in:
agent
2026-06-04 00:40:29 +02:00
commit d9982d853d
6 changed files with 632 additions and 0 deletions
+381
View File
@@ -0,0 +1,381 @@
// Command claude_session is a long-lived daemon that keeps an interactive claude
// TUI session hot and answers prompts fast, intended to be embedded in an app as a
// subprocess and driven over stdin/stdout with NDJSON.
//
// It starts a mitmproxy (with the tee_anthropic_sse addon) and a claude TUI in a
// PTY once, then keeps both alive. Each prompt is typed into the live TUI; the
// answer is read off the wire (the model's SSE), so it is exact and finishes at
// message_stop with no blind idle. Because the TUI stays alive, the first prompt
// pays the cold start (~7s) and subsequent prompts only pay generation (~2-3s),
// and the conversation keeps context across turns.
//
// Protocol (NDJSON, one JSON object per line):
//
// stdin (commands):
// {"cmd":"send","prompt":"..."} type a prompt, stream the answer
// {"cmd":"restart"} kill+relaunch the TUI (fresh conversation, proxy kept)
// {"cmd":"shutdown"} stop everything and exit
// stdout (events):
// {"type":"ready"} the TUI is ready for a prompt
// {"type":"text_delta","text":"..."}
// {"type":"result","result":"..."}
// {"type":"restarted"}
// {"type":"error","message":"..."}
//
// This daemon NEVER uses `claude -p`; it drives the real interactive TUI.
package main
import (
"bufio"
"encoding/json"
"flag"
"fmt"
"net"
"os"
"os/exec"
"path/filepath"
"strings"
"sync"
"time"
"github.com/creack/pty"
"fn-registry/functions/tui"
)
const (
ptyRows = 40
ptyCols = 120
)
// wireEvent is one NDJSON line emitted by the tee_anthropic_sse addon.
type wireEvent struct {
Type string `json:"type"`
StreamID int `json:"stream_id"`
HasTools bool `json:"has_tools"`
Text string `json:"text"`
StopReason string `json:"stop_reason"`
}
// cmdIn is one NDJSON command read from stdin.
type cmdIn struct {
Cmd string `json:"cmd"`
Prompt string `json:"prompt"`
}
// evOut is one NDJSON event written to stdout.
type evOut struct {
Type string `json:"type"`
Text string `json:"text,omitempty"`
Result string `json:"result,omitempty"`
Message string `json:"message,omitempty"`
}
type config struct {
cwd string
port string
addon string
ca string
bin string
warmupS time.Duration
}
type daemon struct {
cfg config
mitm *exec.Cmd
wireCh chan wireEvent
ptmx *os.File
claude *exec.Cmd
rawMu sync.Mutex
raw []byte
outMu sync.Mutex
out *json.Encoder
}
func main() {
cfg := config{}
flag.StringVar(&cfg.cwd, "cwd", "/home/enmanuel/fn_registry", "cwd for claude (MCP-approved repo)")
flag.StringVar(&cfg.port, "port", "8901", "mitmproxy port")
root := flag.String("root", "/home/enmanuel/fn_registry", "registry root (to locate the addon)")
flag.StringVar(&cfg.addon, "addon", "", "tee_anthropic_sse addon path")
flag.StringVar(&cfg.ca, "ca", os.Getenv("HOME")+"/.mitmproxy/mitmproxy-ca-cert.pem", "mitmproxy CA cert")
flag.StringVar(&cfg.bin, "bin", "claude", "claude binary")
warmup := flag.Duration("warmup", 12*time.Second, "max wait for the TUI to become ready")
flag.Parse()
cfg.warmupS = *warmup
if cfg.addon == "" {
cfg.addon = filepath.Join(*root, "python/functions/cybersecurity/tee_anthropic_sse.py")
}
d := &daemon{cfg: cfg, out: json.NewEncoder(os.Stdout)}
if err := d.startProxy(); err != nil {
d.emit(evOut{Type: "error", Message: "start proxy: " + err.Error()})
os.Exit(1)
}
if err := d.startClaude(); err != nil {
d.emit(evOut{Type: "error", Message: "start claude: " + err.Error()})
os.Exit(1)
}
if !d.waitReady(cfg.warmupS) {
d.emit(evOut{Type: "error", Message: "claude TUI did not become ready"})
// keep going — the user can still try; but signal it
}
d.emit(evOut{Type: "ready"})
d.loop()
}
// startProxy launches mitmdump with the SSE tee addon and starts a goroutine that
// parses its NDJSON stdout into wireCh. FN_WIRE_ONLY_TOOLS isolates the main reply.
func (d *daemon) startProxy() error {
cmd := exec.Command("mitmdump", "-p", d.cfg.port, "-s", d.cfg.addon, "-q")
cmd.Env = append(os.Environ(), "FN_WIRE_ONLY_TOOLS=1")
cmd.Stderr = os.Stderr
stdout, err := cmd.StdoutPipe()
if err != nil {
return err
}
if err := cmd.Start(); err != nil {
return err
}
d.mitm = cmd
d.wireCh = make(chan wireEvent, 256)
go func() {
sc := bufio.NewScanner(stdout)
sc.Buffer(make([]byte, 1024*1024), 1024*1024)
for sc.Scan() {
line := strings.TrimSpace(sc.Text())
if line == "" {
continue
}
var ev wireEvent
if json.Unmarshal([]byte(line), &ev) == nil {
d.wireCh <- ev
}
}
}()
if !waitPort("127.0.0.1:"+d.cfg.port, 10*time.Second) {
return fmt.Errorf("proxy did not listen on %s", d.cfg.port)
}
return nil
}
// startClaude launches the interactive claude TUI in a PTY, routed through the
// proxy, and starts a goroutine that accumulates the raw render into d.raw.
func (d *daemon) startClaude() error {
cmd := exec.Command(d.cfg.bin)
cmd.Dir = d.cfg.cwd
cmd.Env = append(os.Environ(),
"HTTPS_PROXY=http://127.0.0.1:"+d.cfg.port,
"HTTP_PROXY=http://127.0.0.1:"+d.cfg.port,
"NODE_EXTRA_CA_CERTS="+d.cfg.ca,
"SSL_CERT_FILE="+d.cfg.ca,
"REQUESTS_CA_BUNDLE="+d.cfg.ca,
)
ptmx, err := pty.Start(cmd)
if err != nil {
return err
}
_ = pty.Setsize(ptmx, &pty.Winsize{Rows: ptyRows, Cols: ptyCols})
d.ptmx = ptmx
d.claude = cmd
go func() {
buf := make([]byte, 4096)
for {
n, err := ptmx.Read(buf)
if n > 0 {
d.rawMu.Lock()
d.raw = append(d.raw, buf[:n]...)
// Cap the buffer so it does not grow without bound; keep the tail
// (the current screen lives at the end).
if len(d.raw) > 256*1024 {
d.raw = d.raw[len(d.raw)-128*1024:]
}
d.rawMu.Unlock()
}
if err != nil {
return
}
}
}()
return nil
}
// screen renders the current PTY buffer to a 2D screen via vt_render.
func (d *daemon) screen() string {
d.rawMu.Lock()
raw := string(d.raw)
d.rawMu.Unlock()
return tui.VTRender(raw, ptyRows, ptyCols)
}
// isReady reports whether the TUI shows an idle input box (ready for a prompt):
// the prompt marker is present and no generation spinner is active.
func isReady(screen string) bool {
if screen == "" {
return false
}
if strings.Contains(screen, "esc to interrupt") {
return false // generating
}
return strings.Contains(screen, "")
}
// waitReady polls the rendered screen until the input box is idle or timeout.
func (d *daemon) waitReady(timeout time.Duration) bool {
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
if isReady(d.screen()) {
// small settle so the box is fully drawn
time.Sleep(250 * time.Millisecond)
return true
}
time.Sleep(200 * time.Millisecond)
}
return isReady(d.screen())
}
func (d *daemon) emit(ev evOut) {
d.outMu.Lock()
_ = d.out.Encode(ev)
d.outMu.Unlock()
}
// drainWire empties any buffered wire events (residue from a previous turn).
func (d *daemon) drainWire() {
for {
select {
case <-d.wireCh:
default:
return
}
}
}
// handleSend types the prompt into the live TUI and streams the answer read from
// the wire until message_stop.
func (d *daemon) handleSend(prompt string) {
if prompt == "" {
d.emit(evOut{Type: "error", Message: "empty prompt"})
return
}
d.drainWire()
// Type the prompt, settle, then Enter (a glued \r is treated as a newline).
_, _ = d.ptmx.Write([]byte(prompt))
time.Sleep(450 * time.Millisecond)
_, _ = d.ptmx.Write([]byte("\r"))
// Follow the next has_tools stream to its message_stop.
mainStream := 0
var answer strings.Builder
timeout := time.After(120 * time.Second)
for {
select {
case ev := <-d.wireCh:
switch ev.Type {
case "message_start":
if mainStream == 0 && ev.HasTools {
mainStream = ev.StreamID
}
case "text_delta":
if ev.StreamID == mainStream {
answer.WriteString(ev.Text)
d.emit(evOut{Type: "text_delta", Text: ev.Text})
}
case "message_stop":
if ev.StreamID == mainStream {
d.emit(evOut{Type: "result", Result: answer.String()})
// Let the TUI redraw the idle input box before the next prompt.
d.waitReady(8 * time.Second)
d.emit(evOut{Type: "ready"})
return
}
}
case <-timeout:
d.emit(evOut{Type: "error", Message: "timeout waiting for answer"})
d.emit(evOut{Type: "ready"})
return
}
}
}
// handleRestart kills the claude TUI and relaunches it (fresh conversation). The
// proxy is kept alive.
func (d *daemon) handleRestart() {
if d.claude != nil && d.claude.Process != nil {
_ = d.claude.Process.Kill()
_, _ = d.claude.Process.Wait()
}
if d.ptmx != nil {
_ = d.ptmx.Close()
}
d.rawMu.Lock()
d.raw = nil
d.rawMu.Unlock()
if err := d.startClaude(); err != nil {
d.emit(evOut{Type: "error", Message: "restart: " + err.Error()})
return
}
d.waitReady(d.cfg.warmupS)
d.emit(evOut{Type: "restarted"})
d.emit(evOut{Type: "ready"})
}
func (d *daemon) shutdown() {
if d.claude != nil && d.claude.Process != nil {
_ = d.claude.Process.Kill()
}
if d.mitm != nil && d.mitm.Process != nil {
_ = d.mitm.Process.Kill()
}
}
// loop reads NDJSON commands from stdin and dispatches them.
func (d *daemon) loop() {
sc := bufio.NewScanner(os.Stdin)
sc.Buffer(make([]byte, 1024*1024), 1024*1024)
for sc.Scan() {
line := strings.TrimSpace(sc.Text())
if line == "" {
continue
}
var c cmdIn
if json.Unmarshal([]byte(line), &c) != nil {
d.emit(evOut{Type: "error", Message: "bad command json"})
continue
}
switch c.Cmd {
case "send":
d.handleSend(c.Prompt)
case "restart":
d.handleRestart()
case "shutdown":
d.shutdown()
return
default:
d.emit(evOut{Type: "error", Message: "unknown cmd: " + c.Cmd})
}
}
d.shutdown()
}
func waitPort(addr string, timeout time.Duration) bool {
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
c, err := net.DialTimeout("tcp", addr, 300*time.Millisecond)
if err == nil {
_ = c.Close()
return true
}
time.Sleep(200 * time.Millisecond)
}
return false
}