9c7d9705fd
Obtiene el texto del modelo interceptando el stream SSE de api.anthropic.com /v1/messages con un mitmproxy, en vez de parsear el render de la terminal. Dirige la TUI interactiva real (NUNCA claude -p) por el proxy con claude_pipe, y emite el texto exacto token a token como NDJSON. Compone tee_anthropic_sse_py_cybersecurity (addon mitmproxy). Corta por message_stop (sin idle ciego): ~9s vs ~15s de parsear la TUI, y texto exacto sin artefactos. Validado end-to-end contra claude real.
242 lines
7.0 KiB
Go
242 lines
7.0 KiB
Go
// Command claude_wire gets claude's answer by intercepting the model's network
|
|
// stream — the SSE response from api.anthropic.com — instead of parsing the
|
|
// terminal render. It drives the real interactive claude TUI (never `claude -p`)
|
|
// through a mitmproxy that tees the /v1/messages SSE, and emits the exact model
|
|
// text token by token as NDJSON.
|
|
//
|
|
// Why this beats parsing the TUI render:
|
|
// - Exact text, byte for byte (no heuristics, no spinner artifacts, no scroll
|
|
// truncation).
|
|
// - Real token-level streaming (the API's content_block_delta events).
|
|
// - No blind idle wait: the message_stop event tells us precisely when the
|
|
// answer finished, so there is no 4s idle tail.
|
|
// - Stable protocol (Anthropic SSE) instead of a UI that changes between
|
|
// claude versions.
|
|
//
|
|
// Pipeline:
|
|
//
|
|
// mitmdump + tee_anthropic_sse addon ── captures /v1/messages SSE → NDJSON
|
|
// claude_pipe (drives the TUI via PTY) ── sends the prompt, keeps claude alive
|
|
// this runner ── reads the proxy NDJSON, emits text_delta
|
|
// + result, kills everything on message_stop
|
|
//
|
|
// claude_wire does NOT use `claude -p`. The TUI is driven exactly as a human would;
|
|
// the text is read off the wire, not off the screen.
|
|
//
|
|
// Output (NDJSON, same shape as claude_pipe --stream):
|
|
//
|
|
// {"type":"text_delta","text":"..."}
|
|
// {"type":"result","subtype":"success","is_error":false,"result":"<full answer>"}
|
|
package main
|
|
|
|
import (
|
|
"bufio"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"net"
|
|
"os"
|
|
"os/exec"
|
|
"path/filepath"
|
|
"strings"
|
|
"time"
|
|
)
|
|
|
|
// wireEvent mirrors a single NDJSON record written by the tee_anthropic_sse
// addon on mitmdump's stdout. Only Type, StreamID, HasTools and Text are
// consulted by this runner; the remaining fields are decoded but unused.
type wireEvent struct {
	// Type is the event kind; main reacts to "message_start", "text_delta"
	// and "message_stop".
	Type string `json:"type"`
	// StreamID tags the response stream the event belongs to.
	StreamID int `json:"stream_id"`
	// Model is decoded from the "model" key.
	Model string `json:"model"`
	// HasTools is checked on message_start to pick the stream to follow.
	HasTools bool `json:"has_tools"`
	// Text is the payload of a text_delta event.
	Text string `json:"text"`
	// StopReason is decoded from the "stop_reason" key.
	StopReason string `json:"stop_reason"`
}
|
|
|
|
// outEvent is one NDJSON line this runner emits to its own stdout.
//
// Two shapes are produced:
//
//	{"type":"text_delta","text":"..."}
//	{"type":"result","subtype":"...","is_error":false,"result":"..."}
type outEvent struct {
	Type    string `json:"type"`
	Text    string `json:"text,omitempty"`
	Subtype string `json:"subtype,omitempty"`
	IsError bool   `json:"is_error,omitempty"`
	Result  string `json:"result,omitempty"`
}

// MarshalJSON keeps the compact omitempty encoding for streaming events but
// writes "subtype", "is_error" and "result" unconditionally on result events.
// With plain omitempty a successful result lost its "is_error":false key and an
// empty answer lost "result", so consumers could not rely on the documented
// result shape.
func (e outEvent) MarshalJSON() ([]byte, error) {
	if e.Type == "result" {
		return json.Marshal(struct {
			Type    string `json:"type"`
			Text    string `json:"text,omitempty"`
			Subtype string `json:"subtype"`
			IsError bool   `json:"is_error"`
			Result  string `json:"result"`
		}{e.Type, e.Text, e.Subtype, e.IsError, e.Result})
	}
	// plain has outEvent's fields and tags but none of its methods, so
	// marshalling it cannot recurse back into MarshalJSON.
	type plain outEvent
	return json.Marshal(plain(e))
}
|
|
|
|
func main() {
|
|
var (
|
|
prompt = arg("--prompt", "")
|
|
cwd = arg("--cwd", "/home/enmanuel/fn_registry")
|
|
port = arg("--port", "8901")
|
|
root = arg("--root", "/home/enmanuel/fn_registry")
|
|
addon = arg("--addon", "")
|
|
caPath = arg("--ca", os.Getenv("HOME")+"/.mitmproxy/mitmproxy-ca-cert.pem")
|
|
pipeBin = arg("--pipe", "/home/enmanuel/fn_registry/apps/claude_pipe/claude_pipe")
|
|
warmup = arg("--warmup", "5s")
|
|
maxStr = arg("--max", "120s")
|
|
)
|
|
// The prompt may also be a trailing positional arg.
|
|
if prompt == "" {
|
|
if p := positional(); p != "" {
|
|
prompt = p
|
|
}
|
|
}
|
|
if prompt == "" {
|
|
fmt.Fprintln(os.Stderr, "claude_wire: no prompt (use --prompt or a positional arg)")
|
|
os.Exit(2)
|
|
}
|
|
if addon == "" {
|
|
addon = filepath.Join(root, "python/functions/cybersecurity/tee_anthropic_sse.py")
|
|
}
|
|
|
|
maxDur, err := time.ParseDuration(maxStr)
|
|
if err != nil {
|
|
maxDur = 120 * time.Second
|
|
}
|
|
ctx, cancel := context.WithTimeout(context.Background(), maxDur)
|
|
defer cancel()
|
|
|
|
// 1. Start mitmdump with the SSE tee addon. FN_WIRE_ONLY_TOOLS isolates the
|
|
// main Claude Code response (has_tools) from title/classifier calls.
|
|
mitm := exec.CommandContext(ctx, "mitmdump", "-p", port, "-s", addon, "-q")
|
|
mitm.Env = append(os.Environ(), "FN_WIRE_ONLY_TOOLS=1")
|
|
mitm.Stderr = os.Stderr
|
|
ndjson, err := mitm.StdoutPipe()
|
|
if err != nil {
|
|
fail("mitmdump stdout pipe", err)
|
|
}
|
|
if err := mitm.Start(); err != nil {
|
|
fail("start mitmdump", err)
|
|
}
|
|
defer kill(mitm)
|
|
|
|
// 2. Wait for the proxy to listen.
|
|
if !waitPort("127.0.0.1:"+port, 10*time.Second) {
|
|
fail("proxy did not come up", fmt.Errorf("port %s", port))
|
|
}
|
|
|
|
// 3. Drive the interactive claude TUI through the proxy with claude_pipe. Its
|
|
// own parsed output is irrelevant here — we only need it to launch claude
|
|
// and type the prompt. A long idle keeps it from cutting before message_stop;
|
|
// we kill it as soon as the wire reports the answer is done.
|
|
pipe := exec.CommandContext(ctx, pipeBin,
|
|
"--cwd", cwd, "--warmup", warmup, "--idle", "30s", "--max", maxStr,
|
|
"--format", "text", prompt)
|
|
pipe.Env = append(os.Environ(),
|
|
"HTTPS_PROXY=http://127.0.0.1:"+port,
|
|
"HTTP_PROXY=http://127.0.0.1:"+port,
|
|
"NODE_EXTRA_CA_CERTS="+caPath,
|
|
"SSL_CERT_FILE="+caPath,
|
|
"REQUESTS_CA_BUNDLE="+caPath,
|
|
)
|
|
pipe.Stdout = nil
|
|
pipe.Stderr = nil
|
|
if err := pipe.Start(); err != nil {
|
|
fail("start claude_pipe", err)
|
|
}
|
|
defer kill(pipe)
|
|
|
|
// 4. Read the proxy NDJSON. The addon already filtered to the main stream, so
|
|
// we follow the first stream we see and stop at its message_stop.
|
|
enc := json.NewEncoder(os.Stdout)
|
|
sc := bufio.NewScanner(ndjson)
|
|
sc.Buffer(make([]byte, 1024*1024), 1024*1024)
|
|
|
|
var answer strings.Builder
|
|
mainStream := 0
|
|
|
|
for sc.Scan() {
|
|
line := strings.TrimSpace(sc.Text())
|
|
if line == "" {
|
|
continue
|
|
}
|
|
var ev wireEvent
|
|
if json.Unmarshal([]byte(line), &ev) != nil {
|
|
continue
|
|
}
|
|
switch ev.Type {
|
|
case "message_start":
|
|
if mainStream == 0 && ev.HasTools {
|
|
mainStream = ev.StreamID
|
|
}
|
|
case "text_delta":
|
|
if ev.StreamID == mainStream {
|
|
answer.WriteString(ev.Text)
|
|
_ = enc.Encode(outEvent{Type: "text_delta", Text: ev.Text})
|
|
}
|
|
case "message_stop":
|
|
if ev.StreamID == mainStream {
|
|
_ = enc.Encode(outEvent{
|
|
Type: "result",
|
|
Subtype: "success",
|
|
IsError: answer.Len() == 0,
|
|
Result: answer.String(),
|
|
})
|
|
return // defers kill mitmdump + claude_pipe
|
|
}
|
|
}
|
|
}
|
|
|
|
// Stream ended without a message_stop (timeout / claude died). Emit whatever
|
|
// we have so the consumer is not left hanging.
|
|
_ = enc.Encode(outEvent{
|
|
Type: "result",
|
|
Subtype: "incomplete",
|
|
IsError: answer.Len() == 0,
|
|
Result: answer.String(),
|
|
})
|
|
}
|
|
|
|
// --- tiny flag/util helpers (no external deps) ---
|
|
|
|
// arg looks up a command-line option in os.Args and returns its value, or def
// when the option is absent. Both "name value" and "name=value" spellings are
// recognised; the first match in argument order wins. A bare "name" that is the
// very last argument has no value and is ignored.
func arg(name, def string) string {
	inline := name + "="
	for off, tok := range os.Args[1:] {
		// off indexes os.Args[1:], so the token after tok sits at os.Args[off+2].
		next := off + 2
		if tok == name && next < len(os.Args) {
			return os.Args[next]
		}
		if strings.HasPrefix(tok, inline) {
			return tok[len(inline):]
		}
	}
	return def
}
|
|
|
|
// positional reports the final command-line argument when it can be read as a
// free-standing value. It yields "" when there are no arguments, when the final
// argument starts with "--", or when the argument before it is a "--flag"
// written without "=" (the final argument is then that flag's value).
func positional() string {
	rest := os.Args[1:]
	count := len(rest)
	if count == 0 {
		return ""
	}
	last := rest[count-1]
	if strings.HasPrefix(last, "--") {
		return ""
	}
	if count > 1 {
		before := rest[count-2]
		isBareFlag := strings.HasPrefix(before, "--") && !strings.Contains(before, "=")
		if isBareFlag {
			return ""
		}
	}
	return last
}
|
|
|
|
// waitPort polls addr over TCP until a connection succeeds or timeout elapses.
// Each attempt dials with a 300ms limit and failed attempts are spaced 200ms
// apart. It returns true as soon as one dial succeeds (the probe connection is
// closed immediately) and false once the deadline has passed.
func waitPort(addr string, timeout time.Duration) bool {
	const (
		dialLimit = 300 * time.Millisecond
		retryGap  = 200 * time.Millisecond
	)
	until := time.Now().Add(timeout)
	for time.Until(until) > 0 {
		conn, dialErr := net.DialTimeout("tcp", addr, dialLimit)
		if dialErr != nil {
			time.Sleep(retryGap)
			continue
		}
		_ = conn.Close() // probe only; a close error carries no information here
		return true
	}
	return false
}
|
|
|
|
// kill sends a kill to the process behind c. A nil command or one with no
// Process set is ignored, and the result of Kill is discarded.
func kill(c *exec.Cmd) {
	if c == nil {
		return
	}
	proc := c.Process
	if proc == nil {
		return
	}
	_ = proc.Kill() // best effort: Kill's error is not acted upon
}
|
|
|
|
// fail reports a fatal condition on stderr as "claude_wire: <what>: <err>" and
// terminates the process with exit status 1. It does not return, and because it
// exits through os.Exit the caller's deferred functions do not run.
func fail(what string, err error) {
	msg := fmt.Sprintf("claude_wire: %s: %v\n", what, err)
	_, _ = os.Stderr.WriteString(msg)
	os.Exit(1)
}
|