feat(cybersecurity): auto-commit con 2 cambios
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -3,6 +3,7 @@
|
||||
Load with: mitmdump -s rotate_capture_flows.py --set rotate_min=20 --set capture_dir=/path/to/dir
|
||||
"""
|
||||
|
||||
import glob
|
||||
import os
|
||||
import time
|
||||
from datetime import datetime
|
||||
@@ -28,6 +29,8 @@ class Rotator:
|
||||
self._rotate_min: int = 20
|
||||
self._capture_dir: str = "."
|
||||
self._exclude: set = set()
|
||||
self._max_total_mb: int = 0
|
||||
self._max_age_days: int = 0
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# mitmproxy lifecycle hooks
|
||||
@@ -57,6 +60,25 @@ class Rotator:
|
||||
"either the host alone or host:port are dropped silently."
|
||||
),
|
||||
)
|
||||
loader.add_option(
|
||||
name="max_total_mb",
|
||||
typespec=int,
|
||||
default=0,
|
||||
help=(
|
||||
"Retention cap on total size of the capture directory in MB. "
|
||||
"When a new file is rolled, the oldest .mitm files are deleted "
|
||||
"until the total drops below this limit. 0 disables the cap."
|
||||
),
|
||||
)
|
||||
loader.add_option(
|
||||
name="max_age_days",
|
||||
typespec=int,
|
||||
default=0,
|
||||
help=(
|
||||
"Retention cap on capture age in days. On rollover, .mitm files "
|
||||
"older than this are deleted. 0 disables the age cap."
|
||||
),
|
||||
)
|
||||
|
||||
def configure(self, updated) -> None:
|
||||
"""Read option values and ensure the capture directory exists."""
|
||||
@@ -65,6 +87,8 @@ class Rotator:
|
||||
self._exclude = {
|
||||
h.strip() for h in ctx.options.exclude_hosts.split(",") if h.strip()
|
||||
}
|
||||
self._max_total_mb = ctx.options.max_total_mb
|
||||
self._max_age_days = ctx.options.max_age_days
|
||||
if self._capture_dir:
|
||||
os.makedirs(self._capture_dir, exist_ok=True)
|
||||
|
||||
@@ -111,6 +135,68 @@ class Rotator:
|
||||
|
||||
ctx.log.info(f"rotate_capture_flows: opened new capture file {self._current_path}")
|
||||
|
||||
# Enforce retention after opening the new active file, so the file
|
||||
# currently being written is never a deletion candidate.
|
||||
self._enforce_retention()
|
||||
|
||||
def _enforce_retention(self) -> None:
|
||||
"""Delete old capture files that exceed the age or size limits.
|
||||
|
||||
Two independent caps, whichever applies first:
|
||||
- age: files older than ``max_age_days`` are removed.
|
||||
- size: oldest files are removed until the directory total drops
|
||||
below ``max_total_mb``.
|
||||
The currently active file is never deleted. Both caps are skipped
|
||||
when set to 0.
|
||||
"""
|
||||
if self._max_age_days <= 0 and self._max_total_mb <= 0:
|
||||
return
|
||||
try:
|
||||
pattern = os.path.join(self._capture_dir, "traffic-*.mitm")
|
||||
files = [f for f in glob.glob(pattern) if f != self._current_path]
|
||||
except Exception:
|
||||
return
|
||||
|
||||
# Pair each file with its mtime/size once, skipping vanished files.
|
||||
stats = []
|
||||
for f in files:
|
||||
try:
|
||||
st = os.stat(f)
|
||||
stats.append((f, st.st_mtime, st.st_size))
|
||||
except OSError:
|
||||
continue
|
||||
stats.sort(key=lambda t: t[1]) # oldest first
|
||||
|
||||
# 1. Age cap.
|
||||
if self._max_age_days > 0:
|
||||
cutoff = time.time() - self._max_age_days * 86400
|
||||
kept = []
|
||||
for f, mtime, size in stats:
|
||||
if mtime < cutoff:
|
||||
self._safe_remove(f)
|
||||
else:
|
||||
kept.append((f, mtime, size))
|
||||
stats = kept
|
||||
|
||||
# 2. Size cap.
|
||||
if self._max_total_mb > 0:
|
||||
limit = self._max_total_mb * 1024 * 1024
|
||||
total = sum(size for _, _, size in stats)
|
||||
for f, _mtime, size in stats: # oldest first
|
||||
if total <= limit:
|
||||
break
|
||||
if self._safe_remove(f):
|
||||
total -= size
|
||||
|
||||
@staticmethod
|
||||
def _safe_remove(path: str) -> bool:
|
||||
try:
|
||||
os.remove(path)
|
||||
ctx.log.info(f"rotate_capture_flows: pruned old capture {path}")
|
||||
return True
|
||||
except OSError:
|
||||
return False
|
||||
|
||||
def _close(self) -> None:
|
||||
"""Flush and close the current writer and file handle."""
|
||||
if self._fh is not None:
|
||||
|
||||
Reference in New Issue
Block a user