Files
fn_registry/python/functions/cybersecurity/rotate_capture_flows.py
T
2026-06-02 21:50:23 +02:00

128 lines
4.4 KiB
Python

"""mitmproxy addon that rotates the capture file every N minutes.
Load with: mitmdump -s rotate_capture_flows.py --set rotate_min=20 --set capture_dir=/path/to/dir
"""
import os
import time
from datetime import datetime
from mitmproxy import ctx
import mitmproxy.io
class Rotator:
"""Addon that writes flows to a rotating series of .mitm files.
A new file is opened whenever the current one has been open for
at least ``rotate_min`` minutes. The rollover check happens on
every ``response`` event, so the file only rotates when traffic
is actually flowing through the proxy.
"""
def __init__(self) -> None:
self._writer: mitmproxy.io.FlowWriter | None = None
self._fh = None # file handle opened in "wb"
self._opened_at: float = 0.0
self._current_path: str = ""
self._rotate_min: int = 20
self._capture_dir: str = "."
self._exclude: set = set()
# ------------------------------------------------------------------
# mitmproxy lifecycle hooks
# ------------------------------------------------------------------
def load(self, loader) -> None:
"""Register addon options."""
loader.add_option(
name="rotate_min",
typespec=int,
default=20,
help="Minutes per capture file before rolling over.",
)
loader.add_option(
name="capture_dir",
typespec=str,
default=".",
help="Directory where rotating .mitm files are written.",
)
loader.add_option(
name="exclude_hosts",
typespec=str,
default="",
help=(
"Comma-separated hosts or host:port that must never be "
"recorded (e.g. the proxy's own web UI). Flows matching "
"either the host alone or host:port are dropped silently."
),
)
def configure(self, updated) -> None:
"""Read option values and ensure the capture directory exists."""
self._rotate_min = ctx.options.rotate_min
self._capture_dir = ctx.options.capture_dir
self._exclude = {
h.strip() for h in ctx.options.exclude_hosts.split(",") if h.strip()
}
if self._capture_dir:
os.makedirs(self._capture_dir, exist_ok=True)
def response(self, flow) -> None:
"""Called for every completed HTTP response.
Rolls the file over if the rotation interval has elapsed, then
records the flow. Flows whose host (or host:port) is in the
exclude list are dropped without being written.
"""
if self._exclude:
host = flow.request.pretty_host
if host in self._exclude or f"{host}:{flow.request.port}" in self._exclude:
return
now = time.time()
if self._writer is None or (now - self._opened_at) >= self._rotate_min * 60:
self._roll()
self._writer.add(flow)
# Flush after every flow so the capture survives an abrupt kill
# (SIGKILL, crash, power loss). A capture proxy must never lose its
# window of traffic just because the process died without cleanup.
if self._fh is not None:
self._fh.flush()
def done(self) -> None:
"""Clean up when mitmdump shuts down."""
self._close()
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
def _roll(self) -> None:
"""Close the current file (if any) and open a new one."""
self._close()
timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
filename = f"traffic-{timestamp}.mitm"
self._current_path = os.path.join(self._capture_dir, filename)
self._fh = open(self._current_path, "wb")
self._writer = mitmproxy.io.FlowWriter(self._fh)
self._opened_at = time.time()
ctx.log.info(f"rotate_capture_flows: opened new capture file {self._current_path}")
def _close(self) -> None:
"""Flush and close the current writer and file handle."""
if self._fh is not None:
try:
self._fh.flush()
self._fh.close()
except Exception:
pass
finally:
self._fh = None
self._writer = None
addons = [Rotator()]