388 lines
14 KiB
Plaintext
388 lines
14 KiB
Plaintext
{
|
|
"cells": [
|
|
{
|
|
"cell_type": "markdown",
|
|
"metadata": {},
|
|
"source": [
|
|
"# Recolección de datos: Binance + Bitstamp L3\n",
|
|
"\n",
|
|
"**Objetivo:** Dataset de 1M+ filas guardado en `data/`\n",
|
|
"\n",
|
|
"| Fuente | Tipo | Método | Qué obtenemos |\n",
|
|
"|---|---|---|---|\n",
|
|
"| Binance | aggTrades (fills agrupados por taker) | REST paginado | 1M+ trades históricos |\n",
|
|
"| Binance | Order book L2 | REST snapshots | Profundidad del libro (pendiente: este notebook aún no lo recolecta) |\n",
|
|
"| Bitstamp | L3 live_orders | WebSocket | Cada orden individual con ID |"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 1,
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"Data dir: /home/lucas/fn_registry/analysis/estudio_mercados/data\n"
|
|
]
|
|
}
|
|
],
|
|
"source": [
|
|
"import aiohttp\n",
|
|
"import asyncio\n",
|
|
"import websockets\n",
|
|
"import json\n",
|
|
"import time\n",
|
|
"import polars as pl\n",
|
|
"import numpy as np\n",
|
|
"from datetime import datetime, timedelta\n",
|
|
"from pathlib import Path\n",
|
|
"\n",
|
|
"# Folder (relative to the kernel's working directory) that receives every CSV written below.\n",
"DATA_DIR = Path('../data')\n",
"DATA_DIR.mkdir(exist_ok=True)  # a directory that already exists is left untouched\n",
"\n",
"# Endpoints: Binance REST base URL and Bitstamp WebSocket URL.\n",
"BINANCE_BASE = 'https://api.binance.com'\n",
"BITSTAMP_WS = 'wss://ws.bitstamp.net'\n",
"\n",
"print(f'Data dir: {DATA_DIR.resolve()}')"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"metadata": {},
|
|
"source": [
|
|
"---\n",
|
|
"## 1. Binance aggTrades — 1M+ filas\n",
|
|
"\n",
|
|
"Los `aggTrades` agrupan los fills de una misma taker order ejecutados al mismo precio:\n",
|
|
"- Cada fila = 1 taker order (o parte si cruzó muchos niveles)\n",
|
|
"- Campo `a` = aggregate trade ID\n",
|
|
"- Campo `m` = true si el maker es buyer (taker es seller)\n",
|
|
"- Paginamos con `fromId` para ir hacia atrás en el tiempo"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"async def fetch_binance_agg_trades(\n",
"    symbol: str = 'BTCUSDT',\n",
"    target_rows: int = 1_000_000,\n",
"    batch_size: int = 1000,\n",
") -> pl.DataFrame:\n",
"    \"\"\"Download Binance aggTrades, paging backwards in time from the newest trade.\n",
"\n",
"    The first request omits `fromId`, so the API answers with the most recent\n",
"    page. Every later request sets `fromId` just below the oldest id already\n",
"    collected, which walks the history backwards.\n",
"\n",
"    Fields read from each aggTrade:\n",
"    - a: aggregate trade id\n",
"    - p: price\n",
"    - q: quantity\n",
"    - f: first trade id\n",
"    - l: last trade id\n",
"    - T: timestamp\n",
"    - m: was the buyer the maker? (true = taker sold, false = taker bought)\n",
"\n",
"    Args:\n",
"        symbol: Binance symbol to download.\n",
"        target_rows: Stop once at least this many rows were collected.\n",
"        batch_size: Value sent as the `limit` query parameter of every request.\n",
"\n",
"    Returns:\n",
"        DataFrame sorted by ascending `agg_trade_id`, one row per aggTrade\n",
"        (columns: agg_trade_id, price, qty, first_trade_id, last_trade_id,\n",
"        timestamp, is_buyer_maker, side, n_fills). An empty DataFrame when\n",
"        nothing could be downloaded.\n",
"    \"\"\"\n",
"    all_records = []\n",
"    from_id = None      # None: let the API return the most recent page\n",
"    oldest_id = None    # smallest aggregate trade id collected so far\n",
"    total = 0\n",
"    next_report = 50_000\n",
"    start_time = time.time()\n",
"\n",
"    async with aiohttp.ClientSession() as session:\n",
"        while total < target_rows:\n",
"            params = {'symbol': symbol, 'limit': batch_size}\n",
"            if from_id is not None:\n",
"                params['fromId'] = from_id\n",
"\n",
"            async with session.get(f'{BINANCE_BASE}/api/v3/aggTrades', params=params) as resp:\n",
"                if resp.status != 200:\n",
"                    text = await resp.text()\n",
"                    print(f'Error {resp.status}: {text}')\n",
"                    break\n",
"                data = await resp.json()\n",
"\n",
"            # Step back by what the server really returned: if it caps `limit`,\n",
"            # stepping back by `batch_size` would leave holes in the history.\n",
"            page_len = len(data)\n",
"\n",
"            # Drop rows we already hold so overlapping pages never create duplicates.\n",
"            if oldest_id is not None:\n",
"                data = [row for row in data if row['a'] < oldest_id]\n",
"            if not data:\n",
"                break\n",
"\n",
"            for row in data:\n",
"                all_records.append({\n",
"                    'agg_trade_id': row['a'],\n",
"                    'price': float(row['p']),\n",
"                    'qty': float(row['q']),\n",
"                    'first_trade_id': row['f'],\n",
"                    'last_trade_id': row['l'],\n",
"                    'timestamp': row['T'],\n",
"                    'is_buyer_maker': row['m'],  # True = taker sold\n",
"                    'side': 'sell' if row['m'] else 'buy',  # taker side\n",
"                    'n_fills': row['l'] - row['f'] + 1,  # fills inside this aggTrade\n",
"                })\n",
"\n",
"            oldest = min(data, key=lambda row: row['a'])\n",
"            oldest_id = oldest['a']\n",
"            total += len(data)\n",
"\n",
"            # Report roughly every 50k rows; a threshold keeps working after a partial page.\n",
"            if total >= next_report:\n",
"                elapsed = time.time() - start_time\n",
"                rate = total / elapsed if elapsed > 0 else 0\n",
"                eta = (target_rows - total) / rate if rate > 0 else 0\n",
"                ts = datetime.fromtimestamp(oldest['T'] / 1000)\n",
"                print(f' {total:>8,} rows | {rate:,.0f} rows/s | ETA {eta:.0f}s | hasta {ts}')\n",
"                next_report = (total // 50_000 + 1) * 50_000\n",
"\n",
"            # Next page: the ids immediately below the oldest one we hold.\n",
"            from_id = max(oldest_id - page_len, 0)\n",
"\n",
"            # Throttle between requests. Original note: Binance allows 1200 req/min on aggTrades.\n",
"            # NOTE(review): confirm against the current request-weight limits.\n",
"            await asyncio.sleep(0.05)\n",
"\n",
"    elapsed = time.time() - start_time\n",
"    rows_per_s = total / elapsed if elapsed > 0 else 0.0\n",
"    print(f'\\nDescargados {total:,} aggTrades en {elapsed:.1f}s ({rows_per_s:,.0f} rows/s)')\n",
"\n",
"    if not all_records:\n",
"        return pl.DataFrame()\n",
"\n",
"    # Pages were collected newest-first; return the rows in chronological id order.\n",
"    return pl.DataFrame(all_records).sort('agg_trade_id')\n",
"\n",
"\n",
"print('fetch_binance_agg_trades() definida')"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"# Download 1M+ BTC/USDT aggTrades\n",
"binance_trades = await fetch_binance_agg_trades('BTCUSDT', target_rows=1_000_000)\n",
"\n",
"print(f'\\nShape: {binance_trades.shape}')\n",
"print(f'Columnas: {binance_trades.columns}')\n",
"print(binance_trades.head(5))\n",
"\n",
"if binance_trades.shape[0] > 0:\n",
"    # Timestamps are epoch milliseconds; fromtimestamp() renders them in the kernel's local timezone.\n",
"    t_min = datetime.fromtimestamp(binance_trades['timestamp'].min() / 1000)\n",
"    t_max = datetime.fromtimestamp(binance_trades['timestamp'].max() / 1000)\n",
"    print(f'\\nRango temporal:')\n",
"    print(f' {t_min} → {t_max}')\n",
"    print(f' Duración: {t_max - t_min}')\n",
"else:\n",
"    # A failed download yields a frame without a 'timestamp' column, so there is no range to compute.\n",
"    print('\\nSin datos: la descarga no devolvió ningún aggTrade')"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"# Persist the Binance aggTrades as CSV and report the size on disk\n",
"out_path = DATA_DIR / 'binance_btcusdt_aggtrades.csv'\n",
"binance_trades.write_csv(str(out_path))\n",
"size_mb = out_path.stat().st_size / 1024 / 1024\n",
"n_rows = binance_trades.shape[0]\n",
"print(f'Guardado: {out_path}')\n",
"print(f' {n_rows:,} filas, {size_mb:.1f} MB')\n",
"\n",
"# Quick statistics: taker-side counts, then price / quantity / fill summaries\n",
"taker_side = pl.col('side')\n",
"n_buys = binance_trades.filter(taker_side == 'buy').shape[0]\n",
"n_sells = binance_trades.filter(taker_side == 'sell').shape[0]\n",
"price_col = binance_trades['price']\n",
"qty_col = binance_trades['qty']\n",
"fills_col = binance_trades['n_fills']\n",
"\n",
"print(f'\\nEstadísticas:')\n",
"print(f' Buys (taker): {n_buys:,}')\n",
"print(f' Sells (taker): {n_sells:,}')\n",
"print(f' Precio min: ${price_col.min():,.2f}')\n",
"print(f' Precio max: ${price_col.max():,.2f}')\n",
"print(f' Qty mediana: {qty_col.median():.6f} BTC')\n",
"print(f' Qty max: {qty_col.max():.4f} BTC')\n",
"print(f' Fills/aggTrade mediana: {fills_col.median():.0f}')\n",
"print(f' Fills/aggTrade max: {fills_col.max()}')"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"metadata": {},
|
|
"source": [
|
|
"---\n",
|
|
"## 2. Bitstamp L3 — órdenes individuales via WebSocket\n",
|
|
"\n",
|
|
"Cada mensaje tiene:\n",
|
|
"- `id`: ID único de la orden\n",
|
|
"- `order_type`: 0 = buy, 1 = sell\n",
|
|
"- `price`, `amount`\n",
|
|
"- `datetime`, `microtimestamp`\n",
|
|
"\n",
|
|
"Los canales:\n",
|
|
"- `live_orders_btcusd`: cada orden creada, modificada o eliminada\n",
|
|
"- `live_trades_btcusd`: cada ejecución con IDs de maker y taker"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"async def record_bitstamp_l3(\n",
"    pair: str = 'btcusd',\n",
"    duration_seconds: int = 300,\n",
") -> tuple[pl.DataFrame, pl.DataFrame]:\n",
"    \"\"\"Record Bitstamp L3 data (individual orders and trades) over WebSocket.\n",
"\n",
"    Subscribes to `live_orders_{pair}` and `live_trades_{pair}` and collects\n",
"    messages for `duration_seconds`. If the connection is closed before that,\n",
"    recording stops and whatever was captured so far is still returned.\n",
"\n",
"    Args:\n",
"        pair: Currency pair used to build the two channel names.\n",
"        duration_seconds: How long to keep reading messages.\n",
"\n",
"    Returns:\n",
"        (orders_df, trades_df) with every captured order event and trade;\n",
"        either one is an empty DataFrame when nothing was captured for it.\n",
"    \"\"\"\n",
"    orders = []\n",
"    trades = []\n",
"    start = time.time()\n",
"    msg_count = 0\n",
"\n",
"    async with websockets.connect(BITSTAMP_WS) as ws:\n",
"        # Subscribe to individual orders + trades\n",
"        for channel in [f'live_orders_{pair}', f'live_trades_{pair}']:\n",
"            await ws.send(json.dumps({\n",
"                'event': 'bts:subscribe',\n",
"                'data': {'channel': channel}\n",
"            }))\n",
"\n",
"        print(f'Grabando Bitstamp L3 ({pair}) por {duration_seconds}s...')\n",
"\n",
"        while True:\n",
"            remaining = duration_seconds - (time.time() - start)\n",
"            if remaining <= 0:\n",
"                break\n",
"\n",
"            try:\n",
"                # Wake up at least every 5s, but never wait past the requested duration.\n",
"                raw = await asyncio.wait_for(ws.recv(), timeout=min(5.0, remaining))\n",
"            except asyncio.TimeoutError:\n",
"                continue\n",
"            except websockets.ConnectionClosed as exc:\n",
"                # Keep what was captured instead of losing it to the exception.\n",
"                print(f'Conexión cerrada antes de tiempo: {exc}')\n",
"                break\n",
"\n",
"            msg = json.loads(raw)\n",
"            msg_count += 1\n",
"\n",
"            event = msg.get('event', '')\n",
"            channel = msg.get('channel', '')\n",
"            data = msg.get('data', {})\n",
"\n",
"            # The payload may arrive as a JSON-encoded string; skip it if it does not parse.\n",
"            if isinstance(data, str):\n",
"                try:\n",
"                    data = json.loads(data)\n",
"                except json.JSONDecodeError:\n",
"                    continue\n",
"\n",
"            # Orders (L3)\n",
"            if 'live_orders' in channel and event in ('order_created', 'order_changed', 'order_deleted'):\n",
"                orders.append({\n",
"                    'event': event,\n",
"                    'order_id': data.get('id', ''),\n",
"                    'side': 'buy' if data.get('order_type') == 0 else 'sell',\n",
"                    'price': float(data.get('price', 0)),\n",
"                    'amount': float(data.get('amount', 0)),\n",
"                    'datetime': data.get('datetime', ''),\n",
"                    'microtimestamp': data.get('microtimestamp', ''),\n",
"                })\n",
"\n",
"            # Trades\n",
"            elif 'live_trades' in channel and event == 'trade':\n",
"                trades.append({\n",
"                    'trade_id': data.get('id', ''),\n",
"                    'side': 'buy' if data.get('type') == 0 else 'sell',\n",
"                    'price': float(data.get('price', 0)),\n",
"                    'amount': float(data.get('amount', 0)),\n",
"                    'buy_order_id': data.get('buy_order_id', ''),\n",
"                    'sell_order_id': data.get('sell_order_id', ''),\n",
"                    'timestamp': data.get('timestamp', ''),\n",
"                    'microtimestamp': data.get('microtimestamp', ''),\n",
"                })\n",
"\n",
"            if msg_count % 5000 == 0:\n",
"                elapsed = time.time() - start\n",
"                print(f' {elapsed:.0f}s: {len(orders):,} orders, {len(trades):,} trades ({msg_count:,} msgs)')\n",
"\n",
"    elapsed = time.time() - start\n",
"    print(f'\\nGrabación terminada: {elapsed:.0f}s')\n",
"    print(f' Órdenes L3: {len(orders):,}')\n",
"    print(f' Trades: {len(trades):,}')\n",
"    print(f' Msgs total: {msg_count:,}')\n",
"\n",
"    orders_df = pl.DataFrame(orders) if orders else pl.DataFrame()\n",
"    trades_df = pl.DataFrame(trades) if trades else pl.DataFrame()\n",
"\n",
"    return orders_df, trades_df\n",
"\n",
"\n",
"print('record_bitstamp_l3() definida')"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"# Record 5 minutes of Bitstamp L3 data, then preview whatever was captured\n",
"bs_orders, bs_trades = await record_bitstamp_l3('btcusd', duration_seconds=300)\n",
"n_orders, n_trades = bs_orders.shape[0], bs_trades.shape[0]\n",
"\n",
"if n_orders > 0:\n",
"    # Number of captured order events per event type, most frequent first\n",
"    event_counts = (\n",
"        bs_orders\n",
"        .group_by('event')\n",
"        .agg(pl.len().alias('count'))\n",
"        .sort('count', descending=True)\n",
"    )\n",
"    print(f'\\n=== Órdenes L3 ===')\n",
"    print(f'Shape: {bs_orders.shape}')\n",
"    print(bs_orders.head(5))\n",
"    print(f'\\nEventos:')\n",
"    print(event_counts)\n",
"\n",
"if n_trades > 0:\n",
"    print(f'\\n=== Trades ===')\n",
"    print(f'Shape: {bs_trades.shape}')\n",
"    print(bs_trades.head(5))"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"# Save Bitstamp L3: one CSV per stream, skipping streams that captured nothing\n",
"for frame, filename in (\n",
"    (bs_orders, 'bitstamp_btcusd_l3_orders.csv'),\n",
"    (bs_trades, 'bitstamp_btcusd_l3_trades.csv'),\n",
"):\n",
"    if not frame.shape[0] > 0:\n",
"        continue\n",
"    path = DATA_DIR / filename\n",
"    frame.write_csv(str(path))\n",
"    print(f'Guardado: {path} ({frame.shape[0]:,} filas, {path.stat().st_size/1024/1024:.1f} MB)')"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"metadata": {},
|
|
"source": [
|
|
"---\n",
|
|
"## 3. Resumen del dataset"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"import os\n",
|
|
"\n",
|
|
"# Summary table: one line per CSV found in DATA_DIR, plus the total row count\n",
"print('=' * 70)\n",
"print(' DATASET RECOLECTADO')\n",
"print('=' * 70)\n",
"\n",
"total_rows = 0\n",
"for f in sorted(DATA_DIR.glob('*.csv')):\n",
"    size_mb = f.stat().st_size / 1024 / 1024\n",
"    # Count rows through a lazy scan; an unreadable file is listed with '?' instead of aborting the summary\n",
"    try:\n",
"        nrows = pl.scan_csv(str(f)).select(pl.len()).collect().item()\n",
"    except Exception:\n",
"        nrows = None\n",
"\n",
"    # Format the count before aligning it: the ',' format option is rejected for strings,\n",
"    # so the '?' placeholder cannot go through the same numeric format spec.\n",
"    if nrows is None:\n",
"        rows_txt = '?'\n",
"    else:\n",
"        rows_txt = f'{nrows:,}'\n",
"        total_rows += nrows\n",
"    print(f' {f.name:<45} {rows_txt:>10} filas {size_mb:>7.1f} MB')\n",
"\n",
"print(f'\\n TOTAL: {total_rows:>10,} filas')\n",
"print('=' * 70)"
|
|
]
|
|
}
|
|
],
|
|
"metadata": {
|
|
"kernelspec": {
|
|
"display_name": "Python 3 (ipykernel)",
|
|
"language": "python",
|
|
"name": "python3"
|
|
},
|
|
"language_info": {
|
|
"name": "python",
|
|
"version": "3.13.0"
|
|
}
|
|
},
|
|
"nbformat": 4,
|
|
"nbformat_minor": 4
|
|
}
|