"""mitmproxy addon that tees Anthropic SSE streams to stdout as NDJSON. Load with: mitmdump -p 8901 -s tee_anthropic_sse.py -q For each POST /v1/messages response that streams text/event-stream, the addon emits one NDJSON line per meaningful SSE event to stdout: {"type":"message_start","stream_id":1,"model":"claude-opus-4-8","has_tools":true} {"type":"text_delta","stream_id":1,"text":"Hello"} {"type":"tool_use_start","stream_id":1,"tool_name":"Bash","tool_id":"toolu_01..."} {"type":"tool_json_delta","stream_id":1,"partial_json":"{\"command\":\"ls"} {"type":"message_stop","stream_id":1,"stop_reason":"end_turn"} stdout is EXCLUSIVELY NDJSON — suitable for piping. All addon diagnostics go to stderr. Set FN_WIRE_ONLY_TOOLS=1 to suppress streams whose request body has no "tools" array (title generators, classifiers, etc.). """ from __future__ import annotations import json import os import sys from typing import Iterator # --------------------------------------------------------------------------- # Pure helpers — testable without mitmproxy # --------------------------------------------------------------------------- def split_sse_events(buf: bytes) -> tuple[list[str], bytes]: """Split a byte buffer into complete SSE event blocks and a leftover tail. SSE events are separated by a blank line (``\\n\\n``). Any bytes after the last complete event are returned unchanged as *leftover* so they can be prepended to the next chunk. Args: buf: Raw bytes accumulated from one or more SSE chunks. Returns: A 2-tuple ``(events, leftover)`` where *events* is a list of complete event block strings (without the trailing ``\\n\\n``) and *leftover* is the remaining bytes that do not yet form a complete event. """ text = buf.decode("utf-8", errors="replace") # Split on the blank-line delimiter that separates SSE events. parts = text.split("\n\n") # The last element is either empty (buffer ended exactly on \n\n) or an # incomplete event that must be carried forward. complete = [p for p in parts[:-1] if p.strip()] leftover_str = parts[-1] return complete, leftover_str.encode("utf-8") def event_to_ndjson( event_block: str, stream_id: int, stop_holder: dict, ) -> list[dict]: """Parse one SSE event block and return zero or more NDJSON dicts. Args: event_block: A single SSE event block string (the content between two ``\\n\\n`` separators), e.g. ``"event: content_block_delta\\ndata: {...}"``. stream_id: Monotonic integer that identifies the current stream. stop_holder: A mutable dict used to carry ``stop_reason`` across calls. The caller passes the same dict for all events of one stream; this function writes ``stop_holder["stop_reason"]`` on ``message_delta`` events and reads it on ``message_stop``. Returns: A (possibly empty) list of dicts ready to be JSON-serialised as NDJSON. """ event_type = "" data_str = "" for line in event_block.splitlines(): if line.startswith("event:"): event_type = line[len("event:"):].strip() elif line.startswith("data:"): data_str = line[len("data:"):].strip() if not data_str: return [] try: data = json.loads(data_str) except json.JSONDecodeError: return [] results: list[dict] = [] if event_type == "content_block_delta": delta = data.get("delta", {}) delta_type = delta.get("type", "") if delta_type == "text_delta": results.append( { "type": "text_delta", "stream_id": stream_id, "text": delta.get("text", ""), } ) elif delta_type == "input_json_delta": results.append( { "type": "tool_json_delta", "stream_id": stream_id, "partial_json": delta.get("partial_json", ""), } ) elif event_type == "content_block_start": cb = data.get("content_block", {}) if cb.get("type") == "tool_use": results.append( { "type": "tool_use_start", "stream_id": stream_id, "tool_name": cb.get("name", ""), "tool_id": cb.get("id", ""), } ) # content_block_start for text blocks → nothing to emit elif event_type == "message_delta": delta = data.get("delta", {}) reason = delta.get("stop_reason") if reason: stop_holder["stop_reason"] = reason elif event_type == "message_stop": results.append( { "type": "message_stop", "stream_id": stream_id, "stop_reason": stop_holder.get("stop_reason", "end_turn"), } ) return results # --------------------------------------------------------------------------- # mitmproxy addon # --------------------------------------------------------------------------- class AnthropicSSETee: """mitmproxy addon: tee Anthropic /v1/messages SSE streams to stdout. One instance is shared across all intercepted flows. Each SSE stream gets a monotonically increasing ``stream_id`` so the consumer can correlate lines from concurrent or sequential streams. """ def __init__(self) -> None: self._stream_counter: int = 0 self._wire_only_tools: bool = os.environ.get("FN_WIRE_ONLY_TOOLS", "") == "1" # ------------------------------------------------------------------ # mitmproxy hooks # ------------------------------------------------------------------ def request(self, flow) -> None: # noqa: ANN001 """Called when a request is received (before it is sent upstream). For the target /v1/messages endpoint, strip the Accept-Encoding header so the API responds with an uncompressed SSE stream. Otherwise the streaming tee would see gzip/brotli bytes (which never contain the ``\\n\\n`` event delimiter) and a stateful streaming decompressor would be required. The extra bytes on the local hop are irrelevant; claude still parses the SSE normally. """ req = flow.request if req.method == "POST" and req.path.startswith("/v1/messages"): req.headers.pop("accept-encoding", None) def responseheaders(self, flow) -> None: # noqa: ANN001 """Called when response headers are received (before body). If the flow is a streaming Anthropic messages endpoint, activate mitmproxy's streaming mode and attach the tee function so the response body is forwarded to claude in real time while we parse it. """ req = flow.request resp = flow.response # Filter: must be POST /v1/messages (with or without query params) if req.method != "POST": return if not req.path.startswith("/v1/messages"): return # Filter: response must be SSE ct = resp.headers.get("content-type", "") if "event-stream" not in ct: return # Parse request body for metadata try: body = json.loads(req.content or b"{}") except (json.JSONDecodeError, Exception): # Cannot parse body — skip this flow without breaking the proxy print( f"[tee_anthropic_sse] WARN: could not parse request body for {req.path}", file=sys.stderr, flush=True, ) return model: str = body.get("model", "unknown") has_tools: bool = bool(body.get("tools")) # Optionally suppress non-tool streams (title/classifier calls) if self._wire_only_tools and not has_tools: return self._stream_counter += 1 stream_id = self._stream_counter # Emit the stream-start event so the consumer knows what is coming _emit({"type": "message_start", "stream_id": stream_id, "model": model, "has_tools": has_tools}) # Build the per-stream tee closure and hand it to mitmproxy flow.response.stream = _make_tee(stream_id) # --------------------------------------------------------------------------- # Private helpers # --------------------------------------------------------------------------- def _emit(obj: dict) -> None: """Write one NDJSON line to stdout (flush immediately).""" print(json.dumps(obj), flush=True) def _make_tee(stream_id: int): """Return a mitmproxy streaming function for a single SSE stream. The returned callable is assigned to ``flow.response.stream`` and will be called by mitmproxy for each chunk of the response body. It MUST return the chunk unchanged so claude receives the full stream. """ buf: bytearray = bytearray() stop_holder: dict = {} def tee(chunk: bytes) -> bytes: nonlocal buf buf.extend(chunk) try: events, leftover = split_sse_events(bytes(buf)) buf = bytearray(leftover) for block in events: for obj in event_to_ndjson(block, stream_id, stop_holder): _emit(obj) except Exception as exc: # noqa: BLE001 print( f"[tee_anthropic_sse] ERROR in tee for stream {stream_id}: {exc}", file=sys.stderr, flush=True, ) # Always return the original chunk — claude must receive its stream return chunk return tee # --------------------------------------------------------------------------- # mitmproxy entrypoint # --------------------------------------------------------------------------- addons = [AnthropicSSETee()]