Files
claude_wire/main.go
T
agent 9c7d9705fd feat: scaffold claude_wire — respuesta de claude interceptando el SSE de la red
Obtiene el texto del modelo interceptando el stream SSE de api.anthropic.com
/v1/messages con un mitmproxy, en vez de parsear el render de la terminal.
Dirige la TUI interactiva real (NUNCA claude -p) por el proxy con claude_pipe,
y emite el texto exacto token a token como NDJSON.

Compone tee_anthropic_sse_py_cybersecurity (addon mitmproxy). Corta por
message_stop (sin idle ciego): ~9s vs ~15s de parsear la TUI, y texto exacto
sin artefactos. Validado end-to-end contra claude real.
2026-06-04 00:25:25 +02:00

242 lines
7.0 KiB
Go

// Command claude_wire gets claude's answer by intercepting the model's network
// stream — the SSE response from api.anthropic.com — instead of parsing the
// terminal render. It drives the real interactive claude TUI (never `claude -p`)
// through a mitmproxy that tees the /v1/messages SSE, and emits the exact model
// text token by token as NDJSON.
//
// Why this beats parsing the TUI render:
// - Exact text, byte for byte (no heuristics, no spinner artifacts, no scroll
// truncation).
// - Real token-level streaming (the API's content_block_delta events).
// - No blind idle wait: the message_stop event tells us precisely when the
// answer finished, so there is no 4s idle tail.
// - Stable protocol (Anthropic SSE) instead of a UI that changes between
// claude versions.
//
// Pipeline:
//
// mitmdump + tee_anthropic_sse addon ── captures /v1/messages SSE → NDJSON
// claude_pipe (drives the TUI via PTY) ── sends the prompt, keeps claude alive
// this runner ── reads the proxy NDJSON, emits text_delta
// + result, kills everything on message_stop
//
// claude_wire does NOT use `claude -p`. The TUI is driven exactly as a human would;
// the text is read off the wire, not off the screen.
//
// Output (NDJSON, same shape as claude_pipe --stream):
//
// {"type":"text_delta","text":"..."}
// {"type":"result","subtype":"success","is_error":false,"result":"<full answer>"}
package main
import (
"bufio"
"context"
"encoding/json"
"fmt"
"net"
"os"
"os/exec"
"path/filepath"
"strings"
"time"
)
// wireEvent is one NDJSON line emitted by the tee_anthropic_sse addon.
type wireEvent struct {
Type string `json:"type"`
StreamID int `json:"stream_id"`
Model string `json:"model"`
HasTools bool `json:"has_tools"`
Text string `json:"text"`
StopReason string `json:"stop_reason"`
}
// outEvent is one NDJSON line this runner emits to its own stdout.
type outEvent struct {
Type string `json:"type"`
Text string `json:"text,omitempty"`
Subtype string `json:"subtype,omitempty"`
IsError bool `json:"is_error,omitempty"`
Result string `json:"result,omitempty"`
}
func main() {
var (
prompt = arg("--prompt", "")
cwd = arg("--cwd", "/home/enmanuel/fn_registry")
port = arg("--port", "8901")
root = arg("--root", "/home/enmanuel/fn_registry")
addon = arg("--addon", "")
caPath = arg("--ca", os.Getenv("HOME")+"/.mitmproxy/mitmproxy-ca-cert.pem")
pipeBin = arg("--pipe", "/home/enmanuel/fn_registry/apps/claude_pipe/claude_pipe")
warmup = arg("--warmup", "5s")
maxStr = arg("--max", "120s")
)
// The prompt may also be a trailing positional arg.
if prompt == "" {
if p := positional(); p != "" {
prompt = p
}
}
if prompt == "" {
fmt.Fprintln(os.Stderr, "claude_wire: no prompt (use --prompt or a positional arg)")
os.Exit(2)
}
if addon == "" {
addon = filepath.Join(root, "python/functions/cybersecurity/tee_anthropic_sse.py")
}
maxDur, err := time.ParseDuration(maxStr)
if err != nil {
maxDur = 120 * time.Second
}
ctx, cancel := context.WithTimeout(context.Background(), maxDur)
defer cancel()
// 1. Start mitmdump with the SSE tee addon. FN_WIRE_ONLY_TOOLS isolates the
// main Claude Code response (has_tools) from title/classifier calls.
mitm := exec.CommandContext(ctx, "mitmdump", "-p", port, "-s", addon, "-q")
mitm.Env = append(os.Environ(), "FN_WIRE_ONLY_TOOLS=1")
mitm.Stderr = os.Stderr
ndjson, err := mitm.StdoutPipe()
if err != nil {
fail("mitmdump stdout pipe", err)
}
if err := mitm.Start(); err != nil {
fail("start mitmdump", err)
}
defer kill(mitm)
// 2. Wait for the proxy to listen.
if !waitPort("127.0.0.1:"+port, 10*time.Second) {
fail("proxy did not come up", fmt.Errorf("port %s", port))
}
// 3. Drive the interactive claude TUI through the proxy with claude_pipe. Its
// own parsed output is irrelevant here — we only need it to launch claude
// and type the prompt. A long idle keeps it from cutting before message_stop;
// we kill it as soon as the wire reports the answer is done.
pipe := exec.CommandContext(ctx, pipeBin,
"--cwd", cwd, "--warmup", warmup, "--idle", "30s", "--max", maxStr,
"--format", "text", prompt)
pipe.Env = append(os.Environ(),
"HTTPS_PROXY=http://127.0.0.1:"+port,
"HTTP_PROXY=http://127.0.0.1:"+port,
"NODE_EXTRA_CA_CERTS="+caPath,
"SSL_CERT_FILE="+caPath,
"REQUESTS_CA_BUNDLE="+caPath,
)
pipe.Stdout = nil
pipe.Stderr = nil
if err := pipe.Start(); err != nil {
fail("start claude_pipe", err)
}
defer kill(pipe)
// 4. Read the proxy NDJSON. The addon already filtered to the main stream, so
// we follow the first stream we see and stop at its message_stop.
enc := json.NewEncoder(os.Stdout)
sc := bufio.NewScanner(ndjson)
sc.Buffer(make([]byte, 1024*1024), 1024*1024)
var answer strings.Builder
mainStream := 0
for sc.Scan() {
line := strings.TrimSpace(sc.Text())
if line == "" {
continue
}
var ev wireEvent
if json.Unmarshal([]byte(line), &ev) != nil {
continue
}
switch ev.Type {
case "message_start":
if mainStream == 0 && ev.HasTools {
mainStream = ev.StreamID
}
case "text_delta":
if ev.StreamID == mainStream {
answer.WriteString(ev.Text)
_ = enc.Encode(outEvent{Type: "text_delta", Text: ev.Text})
}
case "message_stop":
if ev.StreamID == mainStream {
_ = enc.Encode(outEvent{
Type: "result",
Subtype: "success",
IsError: answer.Len() == 0,
Result: answer.String(),
})
return // defers kill mitmdump + claude_pipe
}
}
}
// Stream ended without a message_stop (timeout / claude died). Emit whatever
// we have so the consumer is not left hanging.
_ = enc.Encode(outEvent{
Type: "result",
Subtype: "incomplete",
IsError: answer.Len() == 0,
Result: answer.String(),
})
}
// --- tiny flag/util helpers (no external deps) ---
func arg(name, def string) string {
for i, a := range os.Args[1:] {
if a == name && i+2 <= len(os.Args)-1 {
return os.Args[i+2]
}
if strings.HasPrefix(a, name+"=") {
return strings.TrimPrefix(a, name+"=")
}
}
return def
}
// positional returns the last argument if it is not a flag or a flag value.
func positional() string {
args := os.Args[1:]
for i := len(args) - 1; i >= 0; i-- {
a := args[i]
if strings.HasPrefix(a, "--") {
return ""
}
// Skip if this is the value of a preceding flag.
if i > 0 && strings.HasPrefix(args[i-1], "--") && !strings.Contains(args[i-1], "=") {
continue
}
return a
}
return ""
}
func waitPort(addr string, timeout time.Duration) bool {
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
c, err := net.DialTimeout("tcp", addr, 300*time.Millisecond)
if err == nil {
_ = c.Close()
return true
}
time.Sleep(200 * time.Millisecond)
}
return false
}
func kill(c *exec.Cmd) {
if c != nil && c.Process != nil {
_ = c.Process.Kill()
}
}
func fail(what string, err error) {
fmt.Fprintf(os.Stderr, "claude_wire: %s: %v\n", what, err)
os.Exit(1)
}