# fn_registry/python/functions/cybersecurity/tee_anthropic_sse.py
# 2026-06-04 23:44:39 +02:00 · 279 lines · 9.8 KiB · Python

"""mitmproxy addon that tees Anthropic SSE streams to stdout as NDJSON.

Load with: mitmdump -p 8901 -s tee_anthropic_sse.py -q

For each POST /v1/messages response that streams text/event-stream, the addon
emits one NDJSON line per meaningful SSE event to stdout:

    {"type":"message_start","stream_id":1,"model":"claude-opus-4-8","has_tools":true}
    {"type":"text_delta","stream_id":1,"text":"Hello"}
    {"type":"tool_use_start","stream_id":1,"tool_name":"Bash","tool_id":"toolu_01..."}
    {"type":"tool_json_delta","stream_id":1,"partial_json":"{\"command\":\"ls"}
    {"type":"message_stop","stream_id":1,"stop_reason":"end_turn"}

stdout is EXCLUSIVELY NDJSON — suitable for piping. All addon diagnostics go
to stderr.

Set FN_WIRE_ONLY_TOOLS=1 to suppress streams whose request body has no "tools"
array (title generators, classifiers, etc.).
"""
from __future__ import annotations
import json
import os
import sys
from typing import Iterator
# ---------------------------------------------------------------------------
# Pure helpers — testable without mitmproxy
# ---------------------------------------------------------------------------
def split_sse_events(buf: bytes) -> tuple[list[str], bytes]:
    """Split a byte buffer into complete SSE event blocks and a leftover tail.

    SSE events are separated by a blank line (``\\n\\n``).  The split is done
    on the raw bytes and only *complete* events are decoded, so a multi-byte
    UTF-8 character that straddles a chunk boundary is carried forward intact
    in *leftover* instead of being turned into U+FFFD replacement characters.

    Args:
        buf: Raw bytes accumulated from one or more SSE chunks.

    Returns:
        A 2-tuple ``(events, leftover)`` where *events* is a list of complete
        event block strings (without the trailing ``\\n\\n``; blocks that are
        empty or whitespace-only are skipped) and *leftover* is the remaining
        bytes, unchanged, that do not yet form a complete event.
    """
    # b"\n\n" is pure ASCII, so it can never occur inside a multi-byte UTF-8
    # sequence: splitting before decoding is safe.
    parts = buf.split(b"\n\n")
    # The last element is either empty (buffer ended exactly on \n\n) or an
    # incomplete event that must be carried forward byte-for-byte.
    leftover = bytes(parts.pop())

    events: list[str] = []
    for raw in parts:
        block = raw.decode("utf-8", errors="replace")
        if block.strip():
            events.append(block)
    return events, leftover
def event_to_ndjson(
    event_block: str,
    stream_id: int,
    stop_holder: dict,
) -> list[dict]:
    """Parse one SSE event block and return zero or more NDJSON dicts.

    Args:
        event_block: A single SSE event block string (the content between two
            ``\\n\\n`` separators), e.g. ``"event: content_block_delta\\ndata: {...}"``.
        stream_id: Monotonic integer that identifies the current stream.
        stop_holder: A mutable dict used to carry ``stop_reason`` across calls.
            The caller passes the same dict for all events of one stream; this
            function writes ``stop_holder["stop_reason"]`` on ``message_delta``
            events and reads it on ``message_stop``.

    Returns:
        A (possibly empty) list of dicts ready to be JSON-serialised as NDJSON.
        Blocks without a ``data:`` field, with undecodable JSON, or with an
        event type that is not handled yield an empty list.
    """
    event_type = ""
    data_parts: list[str] = []

    # Split on the SSE line terminators only (CRLF, LF, CR).  str.splitlines()
    # would also break on U+2028/U+2029/U+0085/VT/FF, which may legally appear
    # unescaped inside a JSON string and would truncate the data line.
    normalised = event_block.replace("\r\n", "\n").replace("\r", "\n")
    for line in normalised.split("\n"):
        if line.startswith("event:"):
            event_type = line[len("event:"):].strip()
        elif line.startswith("data:"):
            data_parts.append(line[len("data:"):].strip())

    # Several data: lines in one event form a single payload joined by "\n".
    data_str = "\n".join(data_parts).strip()
    if not data_str:
        return []
    try:
        data = json.loads(data_str)
    except json.JSONDecodeError:
        return []
    if not isinstance(data, dict):
        # Valid JSON but not an object: there are no fields to read.
        data = {}

    def _section(key: str) -> dict:
        """Return ``data[key]`` when it is a dict, otherwise an empty dict."""
        value = data.get(key)
        return value if isinstance(value, dict) else {}

    results: list[dict] = []
    if event_type == "content_block_delta":
        delta = _section("delta")
        delta_type = delta.get("type", "")
        if delta_type == "text_delta":
            results.append(
                {
                    "type": "text_delta",
                    "stream_id": stream_id,
                    "text": delta.get("text", ""),
                }
            )
        elif delta_type == "input_json_delta":
            results.append(
                {
                    "type": "tool_json_delta",
                    "stream_id": stream_id,
                    "partial_json": delta.get("partial_json", ""),
                }
            )
    elif event_type == "content_block_start":
        cb = _section("content_block")
        # content_block_start for text blocks → nothing to emit
        if cb.get("type") == "tool_use":
            results.append(
                {
                    "type": "tool_use_start",
                    "stream_id": stream_id,
                    "tool_name": cb.get("name", ""),
                    "tool_id": cb.get("id", ""),
                }
            )
    elif event_type == "message_delta":
        # Nothing is emitted here; the reason is only remembered for the
        # message_stop event that follows.
        reason = _section("delta").get("stop_reason")
        if reason:
            stop_holder["stop_reason"] = reason
    elif event_type == "message_stop":
        results.append(
            {
                "type": "message_stop",
                "stream_id": stream_id,
                "stop_reason": stop_holder.get("stop_reason", "end_turn"),
            }
        )
    return results
# ---------------------------------------------------------------------------
# mitmproxy addon
# ---------------------------------------------------------------------------
class AnthropicSSETee:
    """mitmproxy addon: tee Anthropic /v1/messages SSE streams to stdout.

    One instance is shared across all intercepted flows. Each SSE stream gets
    a monotonically increasing ``stream_id`` so the consumer can correlate
    lines from concurrent or sequential streams.
    """

    def __init__(self) -> None:
        # Number of streams teed so far; the next stream gets counter + 1.
        self._stream_counter: int = 0
        # True only when FN_WIRE_ONLY_TOOLS is exactly "1".
        self._wire_only_tools: bool = os.environ.get("FN_WIRE_ONLY_TOOLS", "") == "1"

    # ------------------------------------------------------------------
    # mitmproxy hooks
    # ------------------------------------------------------------------
    def request(self, flow) -> None:  # noqa: ANN001
        """Called when a request is received (before it is sent upstream).

        For the target /v1/messages endpoint, strip the Accept-Encoding header so
        the API responds with an uncompressed SSE stream. Otherwise the streaming
        tee would see gzip/brotli bytes (which never contain the ``\\n\\n`` event
        delimiter) and a stateful streaming decompressor would be required. The
        extra bytes on the local hop are irrelevant; claude still parses the SSE
        normally.
        """
        req = flow.request
        if req.method == "POST" and req.path.startswith("/v1/messages"):
            req.headers.pop("accept-encoding", None)

    def responseheaders(self, flow) -> None:  # noqa: ANN001
        """Called when response headers are received (before body).

        If the flow is a streaming Anthropic messages endpoint, activate
        mitmproxy's streaming mode and attach the tee function so the response
        body is forwarded to claude in real time while we parse it.

        Flows are left untouched when the request is not a POST to a path
        starting with ``/v1/messages``, when the response is not an event
        stream, when the request body is not a JSON object, or when
        FN_WIRE_ONLY_TOOLS=1 and the request carries no tools.
        """
        req = flow.request
        resp = flow.response

        # Filter: must be POST /v1/messages (with or without query params)
        if req.method != "POST":
            return
        if not req.path.startswith("/v1/messages"):
            return

        # Filter: response must be SSE
        ct = resp.headers.get("content-type", "")
        if "event-stream" not in ct:
            return

        # Parse request body for metadata
        try:
            body = json.loads(req.content or b"{}")
            if not isinstance(body, dict):
                # Valid JSON that is not an object has no "model"/"tools" keys;
                # handle it like any other unparsable body instead of letting
                # body.get() raise inside the hook.
                raise TypeError("request body is not a JSON object")
        except Exception:  # noqa: BLE001
            # Deliberately broad: cannot parse body — skip this flow without
            # breaking the proxy.
            print(
                f"[tee_anthropic_sse] WARN: could not parse request body for {req.path}",
                file=sys.stderr,
                flush=True,
            )
            return

        model: str = body.get("model", "unknown")
        has_tools: bool = bool(body.get("tools"))

        # Optionally suppress non-tool streams (title/classifier calls)
        if self._wire_only_tools and not has_tools:
            return

        self._stream_counter += 1
        stream_id = self._stream_counter

        # Emit the stream-start event so the consumer knows what is coming
        _emit({"type": "message_start", "stream_id": stream_id, "model": model, "has_tools": has_tools})

        # Build the per-stream tee closure and hand it to mitmproxy
        flow.response.stream = _make_tee(stream_id)
# ---------------------------------------------------------------------------
# Private helpers
# ---------------------------------------------------------------------------
def _emit(obj: dict) -> None:
    """Serialise *obj* to JSON and print it to stdout as one flushed line."""
    line = json.dumps(obj)
    print(line, flush=True)
def _make_tee(stream_id: int):
    """Return a mitmproxy streaming function for a single SSE stream.

    The returned callable is assigned to ``flow.response.stream`` and will be
    called by mitmproxy for each chunk of the response body. It MUST return
    the chunk unchanged so claude receives the full stream.

    Args:
        stream_id: Identifier stamped on every NDJSON line emitted for this
            stream.

    Returns:
        A ``tee(chunk) -> chunk`` callable holding its own reassembly buffer
        and ``stop_reason`` holder.
    """
    buf: bytearray = bytearray()
    stop_holder: dict = {}

    def _report(exc: Exception) -> None:
        """Write a tee failure to stderr; stdout stays NDJSON-only."""
        print(
            f"[tee_anthropic_sse] ERROR in tee for stream {stream_id}: {exc}",
            file=sys.stderr,
            flush=True,
        )

    def tee(chunk: bytes) -> bytes:
        nonlocal buf
        buf.extend(chunk)
        try:
            events, leftover = split_sse_events(bytes(buf))
            buf = bytearray(leftover)
        except Exception as exc:  # noqa: BLE001
            _report(exc)
            return chunk

        # The buffer has already advanced past these events, so each one is
        # handled in isolation: a failure on one must not discard the
        # remaining complete events of the same chunk.
        for block in events:
            try:
                for obj in event_to_ndjson(block, stream_id, stop_holder):
                    _emit(obj)
            except Exception as exc:  # noqa: BLE001
                _report(exc)

        # Always return the original chunk — claude must receive its stream
        return chunk

    return tee
# ---------------------------------------------------------------------------
# mitmproxy entrypoint
# ---------------------------------------------------------------------------
# Module-level list read by mitmproxy when this script is loaded with ``-s``;
# it registers a single AnthropicSSETee instance.
addons = [AnthropicSSETee()]