{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Data collection: Binance + Bitstamp L3\n", "\n", "**Goal:** a dataset of 1M+ rows saved in `data/`\n", "\n", "| Source | Type | Method | What we get |\n", "|---|---|---|---|\n", "| Binance | aggTrades (fills aggregated per taker order) | Paginated REST | 1M+ historical trades |\n", "| Binance | L2 order book | REST snapshots | Book depth |\n", "| Bitstamp | L3 live_orders | WebSocket | Every individual order with its ID |" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Data dir: /home/lucas/fn_registry/analysis/estudio_mercados/data\n" ] } ], "source": [ "import aiohttp\n", "import asyncio\n", "import websockets\n", "import json\n", "import time\n", "import polars as pl\n", "import numpy as np\n", "from datetime import datetime, timedelta\n", "from pathlib import Path\n", "\n", "DATA_DIR = Path('../data')\n", "DATA_DIR.mkdir(parents=True, exist_ok=True)\n", "\n", "BINANCE_BASE = 'https://api.binance.com'\n", "BITSTAMP_WS = 'wss://ws.bitstamp.net'\n", "\n", "print(f'Data dir: {DATA_DIR.resolve()}')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "## 1. 
Binance aggTrades: 1M+ rows\n", "\n", "`aggTrades` aggregate the fills of a single taker order:\n", "- Each row = the fills of one taker order at one price (an order that crosses several price levels yields several rows)\n", "- Field `a` = aggregate trade ID\n", "- Field `m` = true if the buyer is the maker (the taker is the seller)\n", "- We paginate with `fromId` to walk backward in time" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "async def fetch_binance_agg_trades(\n", "    symbol: str = 'BTCUSDT',\n", "    target_rows: int = 1_000_000,\n", "    batch_size: int = 1000,\n", ") -> pl.DataFrame:\n", "    \"\"\"Download Binance aggTrades, paginating backward in time.\n", "\n", "    Each aggTrade aggregates fills of the same taker order:\n", "    - a: aggregate trade id\n", "    - p: price\n", "    - q: quantity\n", "    - f: first trade id\n", "    - l: last trade id\n", "    - T: timestamp (ms since epoch)\n", "    - m: was the buyer the maker? (true = taker sold, false = taker bought)\n", "    \"\"\"\n", "    all_records = []\n", "    from_id = None\n", "    total = 0\n", "    start_time = time.time()\n", "\n", "    async with aiohttp.ClientSession() as session:\n", "        while total < target_rows:\n", "            params = {'symbol': symbol, 'limit': batch_size}\n", "            if from_id is not None:\n", "                params['fromId'] = from_id\n", "\n", "            async with session.get(f'{BINANCE_BASE}/api/v3/aggTrades', params=params) as resp:\n", "                if resp.status != 200:\n", "                    text = await resp.text()\n", "                    print(f'Error {resp.status}: {text}')\n", "                    break\n", "                data = await resp.json()\n", "\n", "            if not data:\n", "                break\n", "\n", "            for row in data:\n", "                all_records.append({\n", "                    'agg_trade_id': row['a'],\n", "                    'price': float(row['p']),\n", "                    'qty': float(row['q']),\n", "                    'first_trade_id': row['f'],\n", "                    'last_trade_id': row['l'],\n", "                    'timestamp': row['T'],\n", "                    'is_buyer_maker': row['m'],  # True = taker sold\n", "                    'side': 'sell' if row['m'] else 'buy',  # taker side\n", "                    'n_fills': row['l'] - row['f'] + 1,  # fills in this aggTrade\n", "                })\n", "\n", "            # Next page: fromId is set to 
the first ID of this batch minus batch_size\n", "            # (the first request, sent without fromId, returns the most recent trades,\n", "            # so older data is only reachable by stepping fromId backward)\n", "            from_id = data[0]['a'] - batch_size\n", "            total += len(data)\n", "\n", "            if total % 50_000 == 0:\n", "                elapsed = time.time() - start_time\n", "                rate = total / elapsed\n", "                eta = (target_rows - total) / rate if rate > 0 else 0\n", "                ts = datetime.fromtimestamp(data[0]['T'] / 1000)\n", "                print(f'  {total:>8,} rows | {rate:,.0f} rows/s | ETA {eta:.0f}s | back to {ts}')\n", "\n", "            # Rate limit: Binance applies weight-based per-IP limits (see rateLimits in\n", "            # /api/v3/exchangeInfo); a short pause between requests helps stay under them\n", "            await asyncio.sleep(0.05)\n", "\n", "    elapsed = time.time() - start_time\n", "    print(f'\\nDownloaded {total:,} aggTrades in {elapsed:.1f}s ({total/elapsed:,.0f} rows/s)')\n", "\n", "    # Pages arrive newest-first, so restore chronological order before returning\n", "    df = pl.DataFrame(all_records)\n", "    return df.sort('agg_trade_id') if all_records else df\n", "\n", "\n", "print('fetch_binance_agg_trades() defined')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Download 1M+ BTC/USDT aggTrades\n", "binance_trades = await fetch_binance_agg_trades('BTCUSDT', target_rows=1_000_000)\n", "\n", "print(f'\\nShape: {binance_trades.shape}')\n", "print(f'Columns: {binance_trades.columns}')\n", "print(binance_trades.head(5))\n", "print(f'\\nTime range:')\n", "t_min = datetime.fromtimestamp(binance_trades['timestamp'].min() / 1000)\n", "t_max = datetime.fromtimestamp(binance_trades['timestamp'].max() / 1000)\n", "print(f'  {t_min} → {t_max}')\n", "print(f'  Duration: {t_max - t_min}')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Save Binance aggTrades\n", "out_path = DATA_DIR / 'binance_btcusdt_aggtrades.csv'\n", "binance_trades.write_csv(str(out_path))\n", "size_mb = out_path.stat().st_size / 1024 / 1024\n", "print(f'Saved: {out_path}')\n", "print(f'  {binance_trades.shape[0]:,} rows, {size_mb:.1f} MB')\n", "\n", "# Quick statistics\n", "print(f'\\nStatistics:')\n", "print(f'  Buys (taker):  {binance_trades.filter(pl.col(\"side\") == \"buy\").shape[0]:,}')\n", "print(f'  Sells (taker): {binance_trades.filter(pl.col(\"side\") == \"sell\").shape[0]:,}')\n", "print(f'  Min price: ${binance_trades[\"price\"].min():,.2f}')\n", "print(f'  Max price: ${binance_trades[\"price\"].max():,.2f}')\n", "print(f'  Median qty: {binance_trades[\"qty\"].median():.6f} BTC')\n", "print(f'  Max qty: {binance_trades[\"qty\"].max():.4f} BTC')\n", "print(f'  Median fills/aggTrade: {binance_trades[\"n_fills\"].median():.0f}')\n", "print(f'  Max fills/aggTrade: {binance_trades[\"n_fills\"].max()}')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "## 2. Bitstamp L3: individual orders via WebSocket\n", "\n", "Each message carries:\n", "- `id`: unique order ID\n", "- `order_type`: 0 = buy, 1 = sell\n", "- `price`, `amount`\n", "- `datetime`, `microtimestamp`\n", "\n", "Channels:\n", "- `live_orders_btcusd`: every order event (created, changed, deleted)\n", "- `live_trades_btcusd`: every execution, with the IDs of the buy and sell orders involved" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "async def record_bitstamp_l3(\n", "    pair: str = 'btcusd',\n", "    duration_seconds: int = 300,\n", ") -> tuple[pl.DataFrame, pl.DataFrame]:\n", "    \"\"\"Record Bitstamp L3 data via WebSocket.\n", "\n", "    Returns (orders_df, trades_df) with every captured order and trade.\n", "    \"\"\"\n", "    orders = []\n", "    trades = []\n", "    start = time.time()\n", "    msg_count = 0\n", "\n", "    async with websockets.connect(BITSTAMP_WS) as ws:\n", "        # Subscribe to individual orders + trades\n", "        for channel in [f'live_orders_{pair}', f'live_trades_{pair}']:\n", "            await ws.send(json.dumps({\n", "                'event': 'bts:subscribe',\n", "                'data': {'channel': channel}\n", "            }))\n", "\n", "        print(f'Recording Bitstamp L3 ({pair}) for {duration_seconds}s...')\n", "\n", "        while time.time() - start < duration_seconds:\n", "            try:\n", "                raw = await asyncio.wait_for(ws.recv(), timeout=5.0)\n", "                msg = json.loads(raw)\n", "                msg_count += 1\n", "\n", "                event = msg.get('event', '')\n", "                channel = msg.get('channel', '')\n", "                data = 
msg.get('data', {})\n", "\n", "                # Some payloads arrive as a JSON-encoded string; decode or skip\n", "                if isinstance(data, str):\n", "                    try:\n", "                        data = json.loads(data)\n", "                    except json.JSONDecodeError:\n", "                        continue\n", "\n", "                # Orders (L3)\n", "                if 'live_orders' in channel and event in ('order_created', 'order_changed', 'order_deleted'):\n", "                    orders.append({\n", "                        'event': event,\n", "                        'order_id': data.get('id', ''),\n", "                        'side': 'buy' if data.get('order_type') == 0 else 'sell',\n", "                        'price': float(data.get('price', 0)),\n", "                        'amount': float(data.get('amount', 0)),\n", "                        'datetime': data.get('datetime', ''),\n", "                        'microtimestamp': data.get('microtimestamp', ''),\n", "                    })\n", "\n", "                # Trades\n", "                elif 'live_trades' in channel and event == 'trade':\n", "                    trades.append({\n", "                        'trade_id': data.get('id', ''),\n", "                        'side': 'buy' if data.get('type') == 0 else 'sell',\n", "                        'price': float(data.get('price', 0)),\n", "                        'amount': float(data.get('amount', 0)),\n", "                        'buy_order_id': data.get('buy_order_id', ''),\n", "                        'sell_order_id': data.get('sell_order_id', ''),\n", "                        'timestamp': data.get('timestamp', ''),\n", "                        'microtimestamp': data.get('microtimestamp', ''),\n", "                    })\n", "\n", "                if msg_count % 5000 == 0:\n", "                    elapsed = time.time() - start\n", "                    print(f'  {elapsed:.0f}s: {len(orders):,} orders, {len(trades):,} trades ({msg_count:,} msgs)')\n", "\n", "            except asyncio.TimeoutError:\n", "                # No message within 5s: re-check the deadline and keep listening\n", "                continue\n", "\n", "    elapsed = time.time() - start\n", "    print(f'\\nRecording finished: {elapsed:.0f}s')\n", "    print(f'  L3 orders: {len(orders):,}')\n", "    print(f'  Trades: {len(trades):,}')\n", "    print(f'  Total msgs: {msg_count:,}')\n", "\n", "    orders_df = pl.DataFrame(orders) if orders else pl.DataFrame()\n", "    trades_df = pl.DataFrame(trades) if trades else pl.DataFrame()\n", "\n", "    return orders_df, trades_df\n", "\n", "\n", "print('record_bitstamp_l3() defined')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Record 5 minutes of Bitstamp L3\n", "bs_orders, bs_trades = await record_bitstamp_l3('btcusd', 
duration_seconds=300)\n", "\n", "if bs_orders.shape[0] > 0:\n", "    print(f'\\n=== L3 orders ===')\n", "    print(f'Shape: {bs_orders.shape}')\n", "    print(bs_orders.head(5))\n", "    print(f'\\nEvents:')\n", "    print(bs_orders.group_by('event').agg(pl.len().alias('count')).sort('count', descending=True))\n", "\n", "if bs_trades.shape[0] > 0:\n", "    print(f'\\n=== Trades ===')\n", "    print(f'Shape: {bs_trades.shape}')\n", "    print(bs_trades.head(5))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Save Bitstamp L3\n", "if bs_orders.shape[0] > 0:\n", "    path = DATA_DIR / 'bitstamp_btcusd_l3_orders.csv'\n", "    bs_orders.write_csv(str(path))\n", "    print(f'Saved: {path} ({bs_orders.shape[0]:,} rows, {path.stat().st_size/1024/1024:.1f} MB)')\n", "\n", "if bs_trades.shape[0] > 0:\n", "    path = DATA_DIR / 'bitstamp_btcusd_l3_trades.csv'\n", "    bs_trades.write_csv(str(path))\n", "    print(f'Saved: {path} ({bs_trades.shape[0]:,} rows, {path.stat().st_size/1024/1024:.1f} MB)')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "## 3. Dataset summary" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print('=' * 70)\n", "print('  COLLECTED DATASET')\n", "print('=' * 70)\n", "\n", "total_rows = 0\n", "for f in sorted(DATA_DIR.glob('*.csv')):\n", "    size_mb = f.stat().st_size / 1024 / 1024\n", "    # Fast row count via a lazy scan\n", "    try:\n", "        nrows = pl.scan_csv(str(f)).select(pl.len()).collect().item()\n", "    except Exception:\n", "        nrows = None\n", "    total_rows += nrows or 0\n", "    # Format separately: the ',' format spec would fail on a '?' placeholder\n", "    nrows_str = f'{nrows:,}' if nrows is not None else '?'\n", "    print(f'  {f.name:<45} {nrows_str:>10} rows {size_mb:>7.1f} MB')\n", "\n", "print(f'\\n  TOTAL: {total_rows:>10,} rows')\n", "print('=' * 70)" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "name": "python", "version": "3.13.0" } }, "nbformat": 4, "nbformat_minor": 4 }