"""HTTP replay engine: reproduce an ordered sequence of captured HTTP calls. This is the reusable core of Level 1 ("pure HTTP") of the record -> distill -> replay pattern. It takes the call specs produced by ``har_extract_calls_py_cybersecurity`` and replays them in order over a single ``requests.Session`` (shared cookie jar), supporting ``{{param}}`` substitution and extracting values from one response to feed later steps (e.g. a CSRF token from the initial GET injected as a header in a subsequent POST). """ import re import requests _PLACEHOLDER_RE = re.compile(r"\{\{\s*([A-Za-z0-9_]+)\s*\}\}") def _subst(value, ctx, missing): """Replace every ``{{name}}`` occurrence in ``value`` using ``ctx``. If a referenced param is missing from ``ctx``, the literal ``{{name}}`` is kept untouched and the name is appended to ``missing`` (deduplicated). Non-string values are returned unchanged. """ if not isinstance(value, str): return value def repl(match: "re.Match") -> str: name = match.group(1) if name in ctx and ctx[name] is not None: return str(ctx[name]) if name not in missing: missing.append(name) return match.group(0) return _PLACEHOLDER_RE.sub(repl, value) def _subst_dict(d, ctx, missing): """Apply ``_subst`` to every value of a dict, returning a new dict.""" if not d: return {} out = {} for k, v in d.items(): out[k] = _subst(v, ctx, missing) return out def _json_dot_path(data, expr: str): """Walk a simple dot-path over a parsed JSON value. Supports dict keys and list indices: ``"data.items.0.token"``. A segment that is all digits is treated as a list index. Returns the value or ``None`` if any segment cannot be resolved. """ cur = data for seg in expr.split("."): if seg == "": continue if isinstance(cur, list) and seg.isdigit(): idx = int(seg) if 0 <= idx < len(cur): cur = cur[idx] else: return None elif isinstance(cur, dict) and seg in cur: cur = cur[seg] else: return None return cur def _apply_extract_rule(rule, resp, session): """Resolve a single extract rule against a response. Returns str value or "". Rule types: - json: dot-path over ``resp.json()``. - regex: ``re.search`` over ``resp.text``; group(1) if present, else group(0). - header: ``resp.headers.get(expr)``. - set_cookie: ``session.cookies.get(expr)``. """ rtype = rule.get("type", "json") expr = rule.get("expr", "") try: if rtype == "json": value = _json_dot_path(resp.json(), expr) return "" if value is None else str(value) if rtype == "regex": m = re.search(expr, resp.text) if not m: return "" if m.groups(): return "" if m.group(1) is None else str(m.group(1)) return str(m.group(0)) if rtype == "header": value = resp.headers.get(expr) return "" if value is None else str(value) if rtype == "set_cookie": value = session.cookies.get(expr) return "" if value is None else str(value) except (ValueError, TypeError): return "" return "" def http_replay_sequence( calls: list[dict], *, params: dict | None = None, extract: list[dict] | None = None, timeout_s: float = 30.0, verify_tls: bool = True, allow_redirects: bool = True, base_headers: dict | None = None, ) -> dict: """Replay an ordered sequence of HTTP call specs over a shared session. Args: calls: List of call specs, each ``{"method","url","headers"(dict),"cookies"(dict opc),"body"(str|None), "body_type":"json"|"form"|"raw"|None}``. params: Initial context dict for ``{{param}}`` substitution (copied). extract: List of extract rules ``{"from": int|"last", "type": "json"|"regex"|"header"|"set_cookie", "expr": str, "as": str}``. Applied right after the referenced step runs. timeout_s: Per-request timeout in seconds. verify_tls: Whether to verify TLS certificates (set on the session). allow_redirects: Whether requests should follow redirects. base_headers: Default headers merged into the session. Returns: Dict with ``status`` ("ok"|"error"), ``steps`` (per-step records), ``params_final`` (the context after all steps) and ``error`` (message when ``status == "error"``). """ ctx: dict = dict(params) if params else {} extract = extract or [] steps: list[dict] = [] # Validate input shape before opening a session. if not isinstance(calls, list): return { "status": "error", "steps": [], "params_final": ctx, "error": "calls must be a list of call specs", } session = requests.Session() session.verify = verify_tls if base_headers: session.headers.update(base_headers) status = "ok" error_msg = "" try: for i, call in enumerate(calls): if not isinstance(call, dict): status = "error" error_msg = f"step {i}: call spec must be a dict" steps.append( { "idx": i, "method": "", "url": "", "status_code": 0, "ok": False, "extracted": {}, "missing_params": [], "error": "call spec must be a dict", } ) break missing: list[str] = [] method = (call.get("method") or "GET").upper() url = _subst(call.get("url") or "", ctx, missing) headers = _subst_dict(call.get("headers"), ctx, missing) cookies = _subst_dict(call.get("cookies"), ctx, missing) body = _subst(call.get("body"), ctx, missing) body_type = call.get("body_type") kwargs: dict = { "headers": headers or None, "cookies": cookies or None, "timeout": timeout_s, "allow_redirects": allow_redirects, } # json/form/raw all send the body as-is via data= (the body is # already a serialized string; do NOT re-serialize JSON). if body is not None: kwargs["data"] = body try: resp = session.request(method, url, **kwargs) except requests.RequestException as exc: status = "error" error_msg = f"step {i}: {exc}" steps.append( { "idx": i, "method": method, "url": url, "status_code": 0, "ok": False, "extracted": {}, "missing_params": missing, "error": str(exc), } ) break code = resp.status_code ok = 200 <= code < 400 # Apply extract rules targeting this step. "last" == the step just run. extracted: dict = {} extract_notes: list[str] = [] for rule in extract: frm = rule.get("from") if frm == "last" or frm == i: as_name = rule.get("as") if not as_name: continue value = _apply_extract_rule(rule, resp, session) ctx[as_name] = value extracted[as_name] = value if value == "": extract_notes.append(f"extract '{as_name}' not found") steps.append( { "idx": i, "method": method, "url": url, "status_code": code, "ok": ok, "extracted": extracted, "missing_params": missing, "error": "; ".join(extract_notes), } ) finally: session.close() return { "status": status, "steps": steps, "params_final": ctx, "error": error_msg, }