729921e16e
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
279 lines
9.8 KiB
Python
279 lines
9.8 KiB
Python
"""mitmproxy addon that tees Anthropic SSE streams to stdout as NDJSON.
|
|
|
|
Load with: mitmdump -p 8901 -s tee_anthropic_sse.py -q
|
|
|
|
For each POST /v1/messages response that streams text/event-stream, the addon
|
|
emits one NDJSON line per meaningful SSE event to stdout:
|
|
|
|
{"type":"message_start","stream_id":1,"model":"claude-opus-4-8","has_tools":true}
|
|
{"type":"text_delta","stream_id":1,"text":"Hello"}
|
|
{"type":"tool_use_start","stream_id":1,"tool_name":"Bash","tool_id":"toolu_01..."}
|
|
{"type":"tool_json_delta","stream_id":1,"partial_json":"{\"command\":\"ls"}
|
|
{"type":"message_stop","stream_id":1,"stop_reason":"end_turn"}
|
|
|
|
stdout is EXCLUSIVELY NDJSON — suitable for piping. All addon diagnostics go
|
|
to stderr.
|
|
|
|
Set FN_WIRE_ONLY_TOOLS=1 to suppress streams whose request body has no "tools"
|
|
array (title generators, classifiers, etc.).
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import sys
|
|
from typing import Iterator
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Pure helpers — testable without mitmproxy
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def split_sse_events(buf: bytes) -> tuple[list[str], bytes]:
    """Split a byte buffer into complete SSE event blocks and a leftover tail.

    SSE events are separated by a blank line (``\\n\\n``). The split is done
    on the raw bytes *before* decoding. A multi-byte UTF-8 character that
    straddles a chunk boundary therefore stays intact in *leftover* and is
    decoded correctly once its remaining bytes arrive. CRLF line endings
    (allowed by the SSE spec) are normalised to LF first, so
    ``\\r\\n\\r\\n`` also terminates an event.

    Args:
        buf: Raw bytes accumulated from one or more SSE chunks.

    Returns:
        A 2-tuple ``(events, leftover)``. *events* is a list of complete,
        non-blank event block strings (without the trailing ``\\n\\n``).
        *leftover* is the CRLF-normalised bytes that do not yet form a
        complete event; prepend it to the next chunk.
    """
    # Normalise CRLF before splitting. A lone trailing b"\r" stays in the
    # leftover and pairs with its b"\n" once the next chunk is appended.
    normalised = buf.replace(b"\r\n", b"\n")
    *complete_raw, leftover = normalised.split(b"\n\n")
    # Only complete events are decoded. Each ends on a delimiter, so it can
    # never contain a UTF-8 sequence truncated by a chunk boundary.
    events = [raw.decode("utf-8", errors="replace") for raw in complete_raw]
    return [event for event in events if event.strip()], leftover
|
|
|
|
|
|
def _as_dict(value: object) -> dict:
    """Return *value* when it is a dict, otherwise a fresh empty dict.

    Lets the parser treat ``null`` or non-object JSON fields as absent
    instead of raising ``AttributeError`` on ``.get``.
    """
    return value if isinstance(value, dict) else {}


def event_to_ndjson(
    event_block: str,
    stream_id: int,
    stop_holder: dict,
) -> list[dict]:
    """Parse one SSE event block and return zero or more NDJSON dicts.

    Recognised events and their output:

    * ``content_block_delta`` with a ``text_delta`` delta emits a
      ``text_delta`` line. With an ``input_json_delta`` delta it emits a
      ``tool_json_delta`` line.
    * ``content_block_start`` for a ``tool_use`` block emits
      ``tool_use_start``.
    * ``message_delta`` emits nothing; it records ``stop_reason``.
    * ``message_stop`` emits ``message_stop`` carrying the recorded
      ``stop_reason``, or ``"end_turn"`` if none was seen.

    Any other event, an empty block, invalid JSON, or a non-object payload
    yields an empty list rather than raising. Multiple ``data:`` lines in
    one block are joined with ``\\n``, as the SSE spec prescribes. When the
    block has no ``event:`` line, the payload's own ``type`` field is used
    as the event name (assumes the payload repeats the event name there).

    Args:
        event_block: A single SSE event block string (the content between two
            ``\\n\\n`` separators), e.g. ``"event: content_block_delta\\ndata: {...}"``.
        stream_id: Monotonic integer that identifies the current stream.
        stop_holder: A mutable dict used to carry ``stop_reason`` across calls.
            The caller passes the same dict for all events of one stream; this
            function writes ``stop_holder["stop_reason"]`` on ``message_delta``
            events and reads it on ``message_stop``.

    Returns:
        A (possibly empty) list of dicts ready to be JSON-serialised as NDJSON.
    """
    event_type = ""
    data_lines: list[str] = []

    for line in event_block.splitlines():
        if line.startswith("event:"):
            event_type = line[len("event:"):].strip()
        elif line.startswith("data:"):
            data_lines.append(line[len("data:"):].strip())
        # Other fields (":" comments, "id:", "retry:") are ignored.

    data_str = "\n".join(data_lines).strip()
    if not data_str:
        return []

    try:
        data = json.loads(data_str)
    except json.JSONDecodeError:
        return []

    # Only JSON objects carry fields this parser understands.
    if not isinstance(data, dict):
        return []

    if not event_type:
        event_type = data.get("type", "")

    results: list[dict] = []

    if event_type == "content_block_delta":
        delta = _as_dict(data.get("delta"))
        delta_type = delta.get("type", "")
        if delta_type == "text_delta":
            results.append(
                {
                    "type": "text_delta",
                    "stream_id": stream_id,
                    "text": delta.get("text", ""),
                }
            )
        elif delta_type == "input_json_delta":
            results.append(
                {
                    "type": "tool_json_delta",
                    "stream_id": stream_id,
                    "partial_json": delta.get("partial_json", ""),
                }
            )

    elif event_type == "content_block_start":
        cb = _as_dict(data.get("content_block"))
        if cb.get("type") == "tool_use":
            results.append(
                {
                    "type": "tool_use_start",
                    "stream_id": stream_id,
                    "tool_name": cb.get("name", ""),
                    "tool_id": cb.get("id", ""),
                }
            )
        # content_block_start for text blocks → nothing to emit

    elif event_type == "message_delta":
        reason = _as_dict(data.get("delta")).get("stop_reason")
        if reason:
            stop_holder["stop_reason"] = reason

    elif event_type == "message_stop":
        results.append(
            {
                "type": "message_stop",
                "stream_id": stream_id,
                "stop_reason": stop_holder.get("stop_reason", "end_turn"),
            }
        )

    return results
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# mitmproxy addon
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class AnthropicSSETee:
    """mitmproxy addon: tee Anthropic /v1/messages SSE streams to stdout.

    One instance is shared across all intercepted flows. Each SSE stream gets
    a monotonically increasing ``stream_id`` so the consumer can correlate
    lines from concurrent or sequential streams.
    """

    def __init__(self) -> None:
        # Number of streams teed so far; the latest value is the newest stream_id.
        self._stream_counter: int = 0
        # FN_WIRE_ONLY_TOOLS=1 → skip streams whose request body has no "tools".
        self._wire_only_tools: bool = os.environ.get("FN_WIRE_ONLY_TOOLS", "") == "1"

    @staticmethod
    def _is_messages_post(req) -> bool:  # noqa: ANN001
        """Return True for ``POST /v1/messages`` (query string allowed).

        Shared by both hooks so the request-side header rewrite and the
        response-side tee always target the same flows. This is a prefix
        match on the path.
        """
        return req.method == "POST" and req.path.startswith("/v1/messages")

    # ------------------------------------------------------------------
    # mitmproxy hooks
    # ------------------------------------------------------------------

    def request(self, flow) -> None:  # noqa: ANN001
        """Called when a request is received (before it is sent upstream).

        For the target /v1/messages endpoint, strip the Accept-Encoding header so
        the API responds with an uncompressed SSE stream. Otherwise the streaming
        tee would see gzip/brotli bytes (which never contain the ``\\n\\n`` event
        delimiter) and a stateful streaming decompressor would be required. The
        extra bytes on the local hop are irrelevant; claude still parses the SSE
        normally.
        """
        if self._is_messages_post(flow.request):
            flow.request.headers.pop("accept-encoding", None)

    def responseheaders(self, flow) -> None:  # noqa: ANN001
        """Called when response headers are received (before body).

        If the flow is an uncompressed streaming Anthropic messages response,
        emit a ``message_start`` line, activate mitmproxy's streaming mode, and
        attach the tee function. The response body is then forwarded to claude
        in real time while we parse it. Every other flow is left untouched.
        """
        req = flow.request
        resp = flow.response

        # Filter: must be POST /v1/messages (with or without query params)
        if not self._is_messages_post(req):
            return

        # Filter: response must be SSE (media types are case-insensitive)
        if "event-stream" not in resp.headers.get("content-type", "").lower():
            return

        # The tee parses raw body bytes. request() asks for an uncompressed
        # body, but if the server compresses anyway the parser cannot read it,
        # so pass the stream through without teeing.
        encoding = resp.headers.get("content-encoding", "").strip().lower()
        if encoding and encoding != "identity":
            print(
                f"[tee_anthropic_sse] WARN: skipping {encoding}-encoded stream for {req.path}",
                file=sys.stderr,
                flush=True,
            )
            return

        # Parse request body for metadata. Deliberately broad: a hook must
        # never break the proxy over an unparseable body.
        try:
            body = json.loads(req.content or b"{}")
        except Exception:  # noqa: BLE001
            body = None
        if not isinstance(body, dict):
            print(
                f"[tee_anthropic_sse] WARN: could not parse request body for {req.path}",
                file=sys.stderr,
                flush=True,
            )
            return

        model: str = body.get("model", "unknown")
        has_tools: bool = bool(body.get("tools"))

        # Optionally suppress non-tool streams (title/classifier calls)
        if self._wire_only_tools and not has_tools:
            return

        self._stream_counter += 1
        stream_id = self._stream_counter

        # Emit the stream-start event so the consumer knows what is coming
        _emit(
            {
                "type": "message_start",
                "stream_id": stream_id,
                "model": model,
                "has_tools": has_tools,
            }
        )

        # Build the per-stream tee closure and hand it to mitmproxy
        resp.stream = _make_tee(stream_id)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Private helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _emit(obj: dict) -> None:
    """Serialise *obj* as one JSON line on stdout and flush it at once.

    Flushing every line lets a downstream pipe consumer see each event as
    soon as it is produced, rather than when the stdout buffer fills.
    """
    line = json.dumps(obj)
    print(line, file=sys.stdout, flush=True)
|
|
|
|
|
|
def _make_tee(stream_id: int):
    """Return a mitmproxy streaming function for a single SSE stream.

    The returned callable is assigned to ``flow.response.stream`` and will be
    called by mitmproxy for each chunk of the response body. It MUST return
    the chunk unchanged so claude receives the full stream.

    Parsing is best-effort. Any failure is reported on stderr and never
    affects the pass-through. Failures are isolated per event, so one bad
    event cannot discard the other complete events that arrived in the same
    chunk.

    Args:
        stream_id: Identifier stamped on every NDJSON line from this stream.

    Returns:
        A ``tee(chunk: bytes) -> bytes`` closure. It owns this stream's parse
        buffer and the ``stop_reason`` state shared across its events.
    """
    buf = bytearray()  # bytes received but not yet forming a complete event
    stop_holder: dict = {}

    def _log_error(exc: Exception) -> None:
        # stdout is reserved for NDJSON, so diagnostics go to stderr.
        print(
            f"[tee_anthropic_sse] ERROR in tee for stream {stream_id}: {exc}",
            file=sys.stderr,
            flush=True,
        )

    def tee(chunk: bytes) -> bytes:
        buf.extend(chunk)
        try:
            events, leftover = split_sse_events(bytes(buf))
        except Exception as exc:  # noqa: BLE001
            _log_error(exc)
            # Keep buffering; claude still gets the chunk below.
            return chunk
        # Replace the buffer contents in place with the incomplete tail.
        buf[:] = leftover
        for block in events:
            try:
                for obj in event_to_ndjson(block, stream_id, stop_holder):
                    _emit(obj)
            except Exception as exc:  # noqa: BLE001
                _log_error(exc)
        # Always return the original chunk — claude must receive its stream
        return chunk

    return tee
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# mitmproxy entrypoint
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# mitmproxy entry point (see the ``-s`` usage in the module docstring):
# mitmproxy reads this module-level ``addons`` list. A single instance serves
# every flow, so stream_id values keep increasing for the life of the proxy.
addons = [AnthropicSSETee()]
|