"""mitmproxy addon that rotates the capture file every N minutes.

Load with:

    mitmdump -s rotate_capture_flows.py --set rotate_min=20 --set capture_dir=/path/to/dir
"""

import os
import time
from datetime import datetime

from mitmproxy import ctx
import mitmproxy.io


class Rotator:
    """Addon that writes flows to a rotating series of .mitm files.

    A new file is opened whenever the current one has been open for at least
    ``rotate_min`` minutes. The rollover check happens on every ``response``
    event, so the file only rotates when traffic is actually flowing through
    the proxy.

    Capture files are named ``traffic-YYYYmmdd-HHMMSS.mitm`` using local time.
    An existing file is never overwritten: if the name is already taken, a
    numeric suffix (``-1``, ``-2``, ...) is appended.
    """

    def __init__(self) -> None:
        self._writer: mitmproxy.io.FlowWriter | None = None
        self._fh = None  # binary file handle backing ``_writer``
        self._opened_at: float = 0.0  # time.time() when the current file was opened
        self._current_path: str = ""
        self._rotate_min: int = 20
        self._capture_dir: str = "."
        self._exclude: set[str] = set()  # lower-cased "host" / "host:port" entries

    # ------------------------------------------------------------------
    # mitmproxy lifecycle hooks
    # ------------------------------------------------------------------

    def load(self, loader) -> None:
        """Register addon options."""
        loader.add_option(
            name="rotate_min",
            typespec=int,
            default=20,
            help="Minutes per capture file before rolling over.",
        )
        loader.add_option(
            name="capture_dir",
            typespec=str,
            default=".",
            help="Directory where rotating .mitm files are written.",
        )
        loader.add_option(
            name="exclude_hosts",
            typespec=str,
            default="",
            help=(
                "Comma-separated hosts or host:port that must never be "
                "recorded (e.g. the proxy's own web UI). Flows matching "
                "either the host alone or host:port are dropped silently."
            ),
        )

    def configure(self, updated) -> None:
        """Read option values and ensure the capture directory exists.

        All three options are re-read on every call, regardless of which
        option names are listed in ``updated``.
        """
        self._rotate_min = ctx.options.rotate_min
        self._capture_dir = ctx.options.capture_dir
        # Host names are case-insensitive, so entries are normalised to
        # lower case here and compared against a lower-cased host later.
        self._exclude = {
            entry.strip().lower()
            for entry in ctx.options.exclude_hosts.split(",")
            if entry.strip()
        }
        if self._capture_dir:
            os.makedirs(self._capture_dir, exist_ok=True)

    def response(self, flow) -> None:
        """Called for every completed HTTP response.

        Rolls the file over if the rotation interval has elapsed, then records
        the flow. Flows whose host (or host:port) is in the exclude list are
        dropped without being written.
        """
        if self._exclude:
            host = flow.request.pretty_host.lower()
            if host in self._exclude or f"{host}:{flow.request.port}" in self._exclude:
                return

        now = time.time()
        if self._writer is None or (now - self._opened_at) >= self._rotate_min * 60:
            self._roll()

        self._writer.add(flow)

        # Flush after every flow so buffered data is handed to the operating
        # system immediately; the capture then survives the process dying
        # without cleanup (SIGKILL, crash). This is not an fsync, so it does
        # not by itself guarantee durability across power loss.
        if self._fh is not None:
            self._fh.flush()

    def done(self) -> None:
        """Clean up when mitmdump shuts down."""
        self._close()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _roll(self) -> None:
        """Close the current file (if any) and open a new one.

        The new file is created with exclusive-create mode so that an existing
        capture with the same timestamped name (for example after a restart
        within the same second, or when local time repeats at a DST change)
        is never truncated. On a name collision a numeric suffix is appended.

        Raises:
            OSError: if the new capture file cannot be created. The addon is
                left with no open writer, so the next ``response`` retries.
        """
        self._close()

        timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
        stem = os.path.join(self._capture_dir, f"traffic-{timestamp}")
        path = f"{stem}.mitm"
        attempt = 0
        while True:
            try:
                fh = open(path, "xb")
            except FileExistsError:
                attempt += 1
                path = f"{stem}-{attempt}.mitm"
            else:
                break

        self._current_path = path
        self._fh = fh
        self._writer = mitmproxy.io.FlowWriter(fh)
        self._opened_at = time.time()
        ctx.log.info(f"rotate_capture_flows: opened new capture file {self._current_path}")

    def _close(self) -> None:
        """Flush and close the current writer and file handle.

        Closing is best-effort: an I/O error is logged rather than raised, and
        the handle and writer are always reset so a fresh file can be opened.
        """
        fh = self._fh
        if fh is None:
            return
        try:
            # close() flushes pending data first and releases the descriptor
            # even when that flush fails, so no separate flush() is needed.
            fh.close()
        except OSError as exc:
            ctx.log.warn(
                f"rotate_capture_flows: error closing capture file "
                f"{self._current_path}: {exc}"
            )
        finally:
            self._fh = None
            self._writer = None


addons = [Rotator()]