"""mitmproxy addon that rotates the capture file every N minutes. Load with: mitmdump -s rotate_capture_flows.py --set rotate_min=20 --set capture_dir=/path/to/dir """ import glob import os import time from datetime import datetime from mitmproxy import ctx import mitmproxy.io class Rotator: """Addon that writes flows to a rotating series of .mitm files. A new file is opened whenever the current one has been open for at least ``rotate_min`` minutes. The rollover check happens on every ``response`` event, so the file only rotates when traffic is actually flowing through the proxy. """ def __init__(self) -> None: self._writer: mitmproxy.io.FlowWriter | None = None self._fh = None # file handle opened in "wb" self._opened_at: float = 0.0 self._current_path: str = "" self._rotate_min: int = 20 self._capture_dir: str = "." self._exclude: set = set() self._max_total_mb: int = 0 self._max_age_days: int = 0 # ------------------------------------------------------------------ # mitmproxy lifecycle hooks # ------------------------------------------------------------------ def load(self, loader) -> None: """Register addon options.""" loader.add_option( name="rotate_min", typespec=int, default=20, help="Minutes per capture file before rolling over.", ) loader.add_option( name="capture_dir", typespec=str, default=".", help="Directory where rotating .mitm files are written.", ) loader.add_option( name="exclude_hosts", typespec=str, default="", help=( "Comma-separated hosts or host:port that must never be " "recorded (e.g. the proxy's own web UI). Flows matching " "either the host alone or host:port are dropped silently." ), ) loader.add_option( name="max_total_mb", typespec=int, default=0, help=( "Retention cap on total size of the capture directory in MB. " "When a new file is rolled, the oldest .mitm files are deleted " "until the total drops below this limit. 0 disables the cap." ), ) loader.add_option( name="max_age_days", typespec=int, default=0, help=( "Retention cap on capture age in days. On rollover, .mitm files " "older than this are deleted. 0 disables the age cap." ), ) def configure(self, updated) -> None: """Read option values and ensure the capture directory exists.""" self._rotate_min = ctx.options.rotate_min self._capture_dir = ctx.options.capture_dir self._exclude = { h.strip() for h in ctx.options.exclude_hosts.split(",") if h.strip() } self._max_total_mb = ctx.options.max_total_mb self._max_age_days = ctx.options.max_age_days if self._capture_dir: os.makedirs(self._capture_dir, exist_ok=True) def response(self, flow) -> None: """Called for every completed HTTP response. Rolls the file over if the rotation interval has elapsed, then records the flow. Flows whose host (or host:port) is in the exclude list are dropped without being written. """ if self._exclude: host = flow.request.pretty_host if host in self._exclude or f"{host}:{flow.request.port}" in self._exclude: return now = time.time() if self._writer is None or (now - self._opened_at) >= self._rotate_min * 60: self._roll() self._writer.add(flow) # Flush after every flow so the capture survives an abrupt kill # (SIGKILL, crash, power loss). A capture proxy must never lose its # window of traffic just because the process died without cleanup. if self._fh is not None: self._fh.flush() def done(self) -> None: """Clean up when mitmdump shuts down.""" self._close() # ------------------------------------------------------------------ # Internal helpers # ------------------------------------------------------------------ def _roll(self) -> None: """Close the current file (if any) and open a new one.""" self._close() timestamp = datetime.now().strftime("%Y%m%d-%H%M%S") filename = f"traffic-{timestamp}.mitm" self._current_path = os.path.join(self._capture_dir, filename) self._fh = open(self._current_path, "wb") self._writer = mitmproxy.io.FlowWriter(self._fh) self._opened_at = time.time() ctx.log.info(f"rotate_capture_flows: opened new capture file {self._current_path}") # Enforce retention after opening the new active file, so the file # currently being written is never a deletion candidate. self._enforce_retention() def _enforce_retention(self) -> None: """Delete old capture files that exceed the age or size limits. Two independent caps, whichever applies first: - age: files older than ``max_age_days`` are removed. - size: oldest files are removed until the directory total drops below ``max_total_mb``. The currently active file is never deleted. Both caps are skipped when set to 0. """ if self._max_age_days <= 0 and self._max_total_mb <= 0: return try: pattern = os.path.join(self._capture_dir, "traffic-*.mitm") files = [f for f in glob.glob(pattern) if f != self._current_path] except Exception: return # Pair each file with its mtime/size once, skipping vanished files. stats = [] for f in files: try: st = os.stat(f) stats.append((f, st.st_mtime, st.st_size)) except OSError: continue stats.sort(key=lambda t: t[1]) # oldest first # 1. Age cap. if self._max_age_days > 0: cutoff = time.time() - self._max_age_days * 86400 kept = [] for f, mtime, size in stats: if mtime < cutoff: self._safe_remove(f) else: kept.append((f, mtime, size)) stats = kept # 2. Size cap. if self._max_total_mb > 0: limit = self._max_total_mb * 1024 * 1024 total = sum(size for _, _, size in stats) for f, _mtime, size in stats: # oldest first if total <= limit: break if self._safe_remove(f): total -= size @staticmethod def _safe_remove(path: str) -> bool: try: os.remove(path) ctx.log.info(f"rotate_capture_flows: pruned old capture {path}") return True except OSError: return False def _close(self) -> None: """Flush and close the current writer and file handle.""" if self._fh is not None: try: self._fh.flush() self._fh.close() except Exception: pass finally: self._fh = None self._writer = None addons = [Rotator()]