feat: funciones Python datascience, finance, cybersecurity y pipelines

Datascience: aggregate_by_group, deduplicate_entities/relations, detect_drift,
diff_entities/relations, extract_entities/relations_llm, hotness_score, melt,
merge_graphs, pivot, build_entity/relation_schema_prompt.
Finance: avellaneda_stoikov_quotes, generate_gbm_prices, generate_taker_order,
hawkes_intensity + módulo finance.py.
Cybersecurity: envelope_encrypt/decrypt + módulo cybersecurity.py.
Pipelines: extraction_pipeline, monte_carlo_market, run_market_sim.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-04-05 17:11:32 +02:00
parent 25a392df48
commit 63a9cb5273
62 changed files with 5376 additions and 0 deletions
@@ -0,0 +1,123 @@
---
name: extraction_pipeline
kind: pipeline
lang: py
domain: pipelines
version: "1.0.0"
purity: impure
signature: "def extraction_pipeline(file_path: str, entity_presets: list[dict], relation_types: list[str], llm_chat_json: Callable[[list[dict]], dict], chunk_size: int = 500, chunk_overlap: int = 50, confidence_threshold: float = 0.5, dedup_threshold: float = 0.85, on_progress: Callable[[str, float], None] | None = None) -> ExtractionResult"
description: "Pipeline completa de extraccion de entidades y relaciones desde un documento. Orquesta extract_text_from_file -> preprocess_text -> split_text_into_chunks -> extract_entities_llm por chunk -> deduplicate_entities -> extract_relations_llm por chunk -> deduplicate_relations."
tags: [pipeline, extraction, entities, relations, llm, nlp, fuzzygraph, datascience]
uses_functions:
- extract_text_from_file_py_core
- preprocess_text_py_core
- split_text_into_chunks_py_core
- build_entity_schema_prompt_py_datascience
- build_relation_schema_prompt_py_datascience
- extract_entities_llm_py_datascience
- extract_relations_llm_py_datascience
- deduplicate_entities_py_datascience
- deduplicate_relations_py_datascience
uses_types:
- entity_candidate_py_datascience
- extraction_result_py_datascience
- extraction_stats_py_datascience
- relation_candidate_py_datascience
returns:
- extraction_result_py_datascience
returns_optional: false
error_type: "error_go_core"
imports:
- time
- warnings
- typing.Callable
tested: true
tests:
- "documento con entidades y relaciones retorna ExtractionResult completo"
- "documento vacio retorna ExtractionResult con listas vacias"
- "documento sin entidades detectables retorna listas vacias"
- "archivo no encontrado lanza FileNotFoundError"
- "entity presets vacio lanza ValueError"
- "progress callback se invoca durante la ejecucion"
- "stats se rellenan correctamente con conteos y tiempo"
test_file_path: "python/functions/pipelines/extraction_pipeline_test.py"
file_path: "python/functions/pipelines/extraction_pipeline.py"
---
## Ejemplo
```python
from python.functions.pipelines.extraction_pipeline import extraction_pipeline
entity_presets = [
{
"type_ref": "osint_person_go_cybersecurity",
"label": "Person",
"metadata_fields": ["full_name", "alias", "nationality"],
},
{
"type_ref": "osint_domain_go_cybersecurity",
"label": "Domain",
"metadata_fields": ["fqdn", "registrar"],
},
]
relation_types = ["operates", "owns", "funds", "communicates_with", "related_to"]
# Inyectar un cliente LLM real
def llm_chat_json(messages):
# llamada al proveedor LLM elegido
...
result = extraction_pipeline(
file_path="report.pdf",
entity_presets=entity_presets,
relation_types=relation_types,
llm_chat_json=llm_chat_json,
chunk_size=500,
chunk_overlap=50,
confidence_threshold=0.5,
dedup_threshold=0.85,
on_progress=lambda msg, pct: print(f"[{pct:.0%}] {msg}"),
)
print(f"Entities: {len(result.entities)}, Relations: {len(result.relations)}")
print(f"Stats: {result.stats}")
# Integrar con fuzzygraph / operations.db
for entity in result.entities:
db.add_entity(
name=entity.name,
type_ref=entity.type_ref,
metadata=entity.attributes,
)
for relation in result.relations:
db.add_relation(
name=relation.relation_type,
from_entity=relation.from_id,
to_entity=relation.to_id,
)
```
## Algoritmo
1. **Extract:** `extract_text_from_file(file_path)` — texto crudo desde PDF, TXT, Markdown
2. **Preprocess:** `preprocess_text(text)` — normaliza espacios, caracteres especiales
3. **Split:** `split_text_into_chunks(text, chunk_size, chunk_overlap)` — divide en ventanas solapadas
4. **Extract entities per chunk (0-40%):** Para cada chunk llama `extract_entities_llm` con el schema de presets. Anota `source_chunk_index` en cada candidato
5. **Filter:** filtra por `confidence >= confidence_threshold`
6. **Deduplicate entities (40%):** `deduplicate_entities` con fuzzy matching, produce `entity_id_map`
7. **Extract relations per chunk (40-80%):** Para cada chunk obtiene las entidades de ese chunk y llama `extract_relations_llm`
8. **Deduplicate relations (80-100%):** `deduplicate_relations` resuelve nombres a IDs y colapsa duplicados
9. **Return:** `ExtractionResult` con entidades, relaciones y stats del proceso
## Notas
- El parametro `llm_chat_json` inyecta el cliente LLM, sin acoplamiento a ningun proveedor (OpenAI, Anthropic, Ollama, etc.)
- El progress callback cubre: 0-40% extraccion de entidades, 40-80% extraccion de relaciones, 80-100% deduplicacion
- Si el archivo no existe lanza `FileNotFoundError` antes de cualquier llamada al LLM
- Si `entity_presets` esta vacio lanza `ValueError`
- Errores en chunks individuales se capturan con warnings y continuan (robustez)
- Los `entity_id_map` de `deduplicate_entities` conectan nombres originales del texto con IDs UUID finales para `deduplicate_relations`
- La retorna `ExtractionResult` esta lista para insertar en `operations.db` via `fn ops entity add` / `fn ops relation add`
@@ -0,0 +1,211 @@
"""Pipeline de extraccion de entidades y relaciones desde un documento."""
from __future__ import annotations
import sys
import os
import time
import warnings
from typing import Callable
# Soporte para ejecucion desde la raiz del registry o desde el directorio del archivo
_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", ".."))
if _ROOT not in sys.path:
sys.path.insert(0, _ROOT)
from python.functions.core.extract_text_from_file import extract_text_from_file
from python.functions.core.core import preprocess_text
from python.functions.core.split_text_into_chunks import split_text_into_chunks
from python.functions.datascience.build_entity_schema_prompt import build_entity_schema_prompt
from python.functions.datascience.build_relation_schema_prompt import build_relation_schema_prompt
from python.functions.datascience.extract_entities_llm import extract_entities_llm
from python.functions.datascience.extract_relations_llm import extract_relations_llm
from python.functions.datascience.deduplicate_entities import deduplicate_entities
from python.functions.datascience.deduplicate_relations import deduplicate_relations
from python.types.datascience.entity_candidate import EntityCandidate
from python.types.datascience.extraction_result import ExtractionResult
from python.types.datascience.extraction_stats import ExtractionStats
def extraction_pipeline(
file_path: str,
entity_presets: list[dict],
relation_types: list[str],
llm_chat_json: Callable[[list[dict]], dict],
chunk_size: int = 500,
chunk_overlap: int = 50,
confidence_threshold: float = 0.5,
dedup_threshold: float = 0.85,
on_progress: Callable[[str, float], None] | None = None,
) -> ExtractionResult:
"""Pipeline completa de extraccion de entidades y relaciones desde un documento.
Orquesta extract_text_from_file -> preprocess_text -> split_text_into_chunks
-> extract_entities_llm por chunk -> deduplicate_entities ->
extract_relations_llm por chunk -> deduplicate_relations.
Args:
file_path: ruta al archivo a procesar (PDF, Markdown, TXT).
entity_presets: lista de dicts con type_ref, label y metadata_fields.
Ejemplo: [{"type_ref": "osint_person_go_cybersecurity",
"label": "Person",
"metadata_fields": ["full_name", "nationality"]}]
relation_types: tipos de relacion permitidos para extraccion.
Ejemplo: ["funds", "employs", "communicates_with", "owns"]
llm_chat_json: funcion inyectada que recibe messages OpenAI y retorna dict
con la respuesta JSON ya parseada. Sin acoplamiento a ningun proveedor.
chunk_size: numero de caracteres por chunk (default 500).
chunk_overlap: overlap entre chunks consecutivos (default 50).
confidence_threshold: umbral minimo de confidence para aceptar entidades
candidatas antes de deduplicar (default 0.5).
dedup_threshold: score minimo de similitud para mergear entidades (default 0.85).
on_progress: callback opcional de progreso (message: str, pct: float 0-1).
0-40%: extraccion de entidades, 40-80%: extraccion de relaciones,
80-100%: deduplicacion.
Returns:
ExtractionResult con entidades y relaciones deduplicadas y stats del proceso.
Raises:
FileNotFoundError: si file_path no existe.
ValueError: si entity_presets esta vacio.
"""
if not entity_presets:
raise ValueError("entity_presets no puede estar vacio")
if not os.path.exists(file_path):
raise FileNotFoundError(f"Archivo no encontrado: {file_path}")
def _progress(msg: str, pct: float) -> None:
if on_progress is not None:
try:
on_progress(msg, pct)
except Exception:
pass
start_time = time.monotonic()
stats = ExtractionStats()
# ── Paso 1: Extraer texto ──────────────────────────────────────────────────
_progress("Extracting text from file...", 0.0)
try:
raw_text = extract_text_from_file(file_path)
except Exception as exc:
warnings.warn(f"extraction_pipeline: error al extraer texto: {exc}")
raw_text = ""
# ── Paso 2: Preprocesar ────────────────────────────────────────────────────
clean_text = preprocess_text(raw_text)
stats.total_chars = len(clean_text)
# ── Paso 3: Dividir en chunks ──────────────────────────────────────────────
chunks = split_text_into_chunks(clean_text, chunk_size=chunk_size, overlap=chunk_overlap)
n = len(chunks)
stats.total_chunks = n
if n == 0:
stats.processing_time_seconds = time.monotonic() - start_time
return ExtractionResult(entities=[], relations=[], stats=stats)
# ── Paso 4: Extraer entidades por chunk ────────────────────────────────────
all_raw_entities: list[EntityCandidate] = []
for i, chunk in enumerate(chunks):
_progress(f"Extracting entities from chunk {i + 1}/{n}", (i / n) * 0.4)
try:
candidates = extract_entities_llm(
text=chunk,
entity_schema=entity_presets,
llm_chat_json=llm_chat_json,
)
except Exception as exc:
warnings.warn(
f"extraction_pipeline: error en extract_entities_llm chunk {i}: {exc}"
)
candidates = []
for candidate in candidates:
# Anotar el chunk de origen
if i not in candidate.source_chunk_indices:
candidate.source_chunk_indices.append(i)
all_raw_entities.append(candidate)
# ── Paso 5: Filtrar por confidence ─────────────────────────────────────────
filtered_entities = [
e for e in all_raw_entities if e.confidence >= confidence_threshold
]
stats.raw_entities_count = len(filtered_entities)
# Actualizar stats de tipos
for ent in filtered_entities:
stats.entity_types_found[ent.type_ref] = (
stats.entity_types_found.get(ent.type_ref, 0) + 1
)
# ── Paso 6: Deduplicar entidades ───────────────────────────────────────────
_progress("Deduplicating entities...", 0.4)
dedup_result = deduplicate_entities(filtered_entities, name_threshold=dedup_threshold)
stats.final_entities_count = dedup_result.total_after
stats.entities_merged = dedup_result.total_before - dedup_result.total_after
final_entities = dedup_result.entities
entity_id_map = dedup_result.name_to_id # nombre_original -> entity_id
# ── Paso 7: Extraer relaciones por chunk ───────────────────────────────────
all_raw_relations = []
for i, chunk in enumerate(chunks):
_progress(f"Extracting relations...", 0.4 + (i / n) * 0.4)
# Obtener entidades relevantes de este chunk
chunk_entities = [
e for e in final_entities if i in e.source_chunk_indices
]
# Si no hay entidades en este chunk especifico, usar todas
if not chunk_entities:
chunk_entities = final_entities
if len(chunk_entities) < 2:
continue
try:
chunk_relations = extract_relations_llm(
text=chunk,
entities=chunk_entities,
relation_types=relation_types,
llm_chat_json=llm_chat_json,
)
except Exception as exc:
warnings.warn(
f"extraction_pipeline: error en extract_relations_llm chunk {i}: {exc}"
)
chunk_relations = []
for rel in chunk_relations:
rel.source_chunk_index = i
all_raw_relations.extend(chunk_relations)
stats.raw_relations_count = len(all_raw_relations)
# Actualizar stats de tipos de relacion
for rel in all_raw_relations:
stats.relation_types_found[rel.relation_type] = (
stats.relation_types_found.get(rel.relation_type, 0) + 1
)
# ── Paso 8: Deduplicar relaciones ──────────────────────────────────────────
_progress("Deduplicating relations...", 0.8)
final_relations = deduplicate_relations(all_raw_relations, entity_id_map)
stats.final_relations_count = len(final_relations)
stats.relations_merged = stats.raw_relations_count - len(final_relations)
stats.processing_time_seconds = time.monotonic() - start_time
_progress("Done", 1.0)
return ExtractionResult(
entities=final_entities,
relations=final_relations,
stats=stats,
)
@@ -0,0 +1,227 @@
"""Tests para extraction_pipeline."""
from __future__ import annotations
import os
import sys
import tempfile
_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", ".."))
if _ROOT not in sys.path:
sys.path.insert(0, _ROOT)
from python.functions.pipelines.extraction_pipeline import extraction_pipeline
# ── LLM stubs ─────────────────────────────────────────────────────────────────
def _llm_with_entities(messages: list[dict]) -> dict:
"""LLM stub que retorna entidades fijas para el primer mensaje de extraccion."""
system_content = messages[0]["content"] if messages else ""
if "entity" in system_content.lower() or "entities" in system_content.lower():
return {
"entities": [
{
"name": "John Smith",
"type_ref": "osint_person_go_cybersecurity",
"attributes": {"full_name": "John Smith", "nationality": "US"},
"confidence": 0.95,
},
{
"name": "evil-corp.com",
"type_ref": "osint_domain_go_cybersecurity",
"attributes": {"fqdn": "evil-corp.com"},
"confidence": 0.88,
},
]
}
# Llamada de relaciones
return {
"relations": [
{
"from_name": "John Smith",
"to_name": "evil-corp.com",
"relation_type": "operates",
"description": "John Smith operates evil-corp.com",
"confidence": 0.8,
}
]
}
def _llm_empty(messages: list[dict]) -> dict:
"""LLM stub que retorna siempre resultado vacio."""
system_content = messages[0]["content"] if messages else ""
if "entit" in system_content.lower():
return {"entities": []}
return {"relations": []}
ENTITY_PRESETS = [
{
"type_ref": "osint_person_go_cybersecurity",
"label": "Person",
"metadata_fields": ["full_name", "alias", "nationality"],
},
{
"type_ref": "osint_domain_go_cybersecurity",
"label": "Domain",
"metadata_fields": ["fqdn", "registrar"],
},
]
RELATION_TYPES = ["operates", "owns", "funds", "communicates_with", "related_to"]
# ── Tests ──────────────────────────────────────────────────────────────────────
def test_documento_con_entidades_y_relaciones():
"""documento con entidades y relaciones retorna ExtractionResult completo"""
text = (
"John Smith, a US national, operates the domain evil-corp.com. "
"He was identified as the main administrator of the infrastructure."
)
with tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False, encoding="utf-8") as f:
f.write(text)
tmp_path = f.name
try:
result = extraction_pipeline(
file_path=tmp_path,
entity_presets=ENTITY_PRESETS,
relation_types=RELATION_TYPES,
llm_chat_json=_llm_with_entities,
chunk_size=500,
chunk_overlap=50,
confidence_threshold=0.5,
dedup_threshold=0.85,
)
assert result is not None
assert len(result.entities) >= 1
assert result.stats.total_chunks >= 1
assert result.stats.total_chars > 0
finally:
os.unlink(tmp_path)
def test_documento_vacio():
"""documento vacio retorna ExtractionResult con listas vacias"""
with tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False, encoding="utf-8") as f:
f.write("")
tmp_path = f.name
try:
result = extraction_pipeline(
file_path=tmp_path,
entity_presets=ENTITY_PRESETS,
relation_types=RELATION_TYPES,
llm_chat_json=_llm_empty,
)
assert result is not None
assert result.entities == []
assert result.relations == []
assert result.stats.total_chunks == 0
finally:
os.unlink(tmp_path)
def test_documento_sin_entidades_detectables():
"""documento sin entidades detectables retorna listas vacias"""
text = "The weather is nice today. The sun shines brightly over the mountains."
with tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False, encoding="utf-8") as f:
f.write(text)
tmp_path = f.name
try:
result = extraction_pipeline(
file_path=tmp_path,
entity_presets=ENTITY_PRESETS,
relation_types=RELATION_TYPES,
llm_chat_json=_llm_empty,
confidence_threshold=0.5,
)
assert result is not None
assert result.entities == []
assert result.relations == []
assert result.stats.raw_entities_count == 0
finally:
os.unlink(tmp_path)
def test_archivo_no_encontrado_lanza_filenotfounderror():
"""archivo no encontrado lanza FileNotFoundError"""
import pytest
with pytest.raises(FileNotFoundError):
extraction_pipeline(
file_path="/tmp/no_existe_para_test_extraccion_pipeline.txt",
entity_presets=ENTITY_PRESETS,
relation_types=RELATION_TYPES,
llm_chat_json=_llm_empty,
)
def test_entity_presets_vacio_lanza_valueerror():
"""entity presets vacio lanza ValueError"""
import pytest
with tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False, encoding="utf-8") as f:
f.write("some text")
tmp_path = f.name
try:
with pytest.raises(ValueError):
extraction_pipeline(
file_path=tmp_path,
entity_presets=[],
relation_types=RELATION_TYPES,
llm_chat_json=_llm_empty,
)
finally:
os.unlink(tmp_path)
def test_progress_callback_se_invoca():
"""progress callback se invoca durante la ejecucion"""
calls: list[tuple[str, float]] = []
def _on_progress(msg: str, pct: float) -> None:
calls.append((msg, pct))
text = "John Smith operates evil-corp.com."
with tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False, encoding="utf-8") as f:
f.write(text)
tmp_path = f.name
try:
extraction_pipeline(
file_path=tmp_path,
entity_presets=ENTITY_PRESETS,
relation_types=RELATION_TYPES,
llm_chat_json=_llm_with_entities,
on_progress=_on_progress,
)
assert len(calls) > 0
messages = [c[0] for c in calls]
assert any("Extracting" in m or "Done" in m or "Dedup" in m for m in messages)
finally:
os.unlink(tmp_path)
def test_stats_se_rellenan_correctamente():
"""stats se rellenan correctamente con conteos y tiempo"""
text = "John Smith, a US national, operates the domain evil-corp.com."
with tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False, encoding="utf-8") as f:
f.write(text)
tmp_path = f.name
try:
result = extraction_pipeline(
file_path=tmp_path,
entity_presets=ENTITY_PRESETS,
relation_types=RELATION_TYPES,
llm_chat_json=_llm_with_entities,
)
assert result.stats.total_chars > 0
assert result.stats.total_chunks >= 1
assert result.stats.processing_time_seconds >= 0.0
finally:
os.unlink(tmp_path)
@@ -0,0 +1,74 @@
---
name: monte_carlo_market
kind: pipeline
lang: py
domain: pipelines
version: "1.0.0"
purity: impure
signature: "def monte_carlo_market(n_simulations: int, base_params: dict, vary_params: dict, seed_start: int) -> list[dict]"
description: "Ejecuta N simulaciones de mercado con parámetros variados uniformemente. Cada simulación usa run_market_sim y retorna métricas resumen: spreads, trades por tick, volatilidad realizada y PnL total de makers."
tags: [montecarlo, simulation, market, launcher, finance, microstructure]
uses_functions:
- run_market_sim_py_pipelines
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: [numpy]
tested: false
tests: []
test_file_path: ""
file_path: "python/functions/pipelines/monte_carlo_market.py"
---
## Ejemplo
```bash
# 10 simulaciones con sigma y gamma variables
python python/functions/pipelines/monte_carlo_market.py -n 10
```
```python
from monte_carlo_market import monte_carlo_market
results = monte_carlo_market(
n_simulations=50,
base_params={'n_ticks': 300, 'n_makers': 3},
vary_params={
'sigma': (0.005, 0.05),
'gamma': (0.01, 1.0),
'hawkes_alpha': (0.1, 0.9),
},
seed_start=42,
)
# Cada resultado tiene: sim_id, seed, sigma, gamma, hawkes_alpha,
# total_trades, mean_spread, std_spread, mean_trades_per_tick,
# price_return, maker_total_pnl, realized_vol
```
## Flujo
1. Para cada simulación i en range(n_simulations):
- Tomar `base_params` + `seed = seed_start + i`
- Samplear `vary_params` uniformemente con rng derivado de `seed_start`
- Llamar `run_market_sim(**params)`
- Calcular métricas resumen sobre el resultado
2. Reportar progreso cada 10% de simulaciones
3. Retornar lista de dicts con params usados + métricas
## Métricas por simulación
| Campo | Descripción |
|---|---|
| `total_trades` | Número total de trades en la simulación |
| `mean_spread` | Spread bid-ask medio |
| `std_spread` | Desviación estándar del spread |
| `mean_trades_per_tick` | Intensidad media del flujo de órdenes |
| `price_return` | Retorno % del precio fundamental |
| `maker_total_pnl` | PnL agregado de todos los makers |
| `realized_vol` | Volatilidad realizada de los trade prices (si hay trades) |
## Notas
`vary_params` acepta cualquier parámetro válido de `run_market_sim` como clave, con valor `(min, max)`.
Los parámetros en `base_params` tienen precedencia sobre los defaults pero son sobreescritos por `vary_params`.
@@ -0,0 +1,91 @@
"""Ejecuta N simulaciones de mercado con parámetros variables para análisis Monte Carlo."""
import sys
import os
import json
def monte_carlo_market(
n_simulations: int = 100,
base_params: dict | None = None,
vary_params: dict | None = None,
seed_start: int = 0,
) -> list[dict]:
"""Ejecuta N simulaciones variando parámetros.
base_params: parámetros fijos para run_market_sim
vary_params: dict de param_name -> (min, max) para variar uniformemente
Retorna lista de dicts, cada uno con los params usados + métricas resumen.
"""
import numpy as np
sys.path.insert(0, os.path.join(os.environ.get('FN_REGISTRY_ROOT', os.path.expanduser('~/fn_registry')), 'python', 'functions'))
sys.path.insert(0, os.path.join(os.environ.get('FN_REGISTRY_ROOT', os.path.expanduser('~/fn_registry')), 'python', 'functions', 'pipelines'))
from run_market_sim import run_market_sim
if base_params is None:
base_params = {}
if vary_params is None:
vary_params = {}
rng = np.random.default_rng(seed_start)
results = []
for i in range(n_simulations):
params = dict(base_params)
params['seed'] = seed_start + i
# Variar parámetros
varied = {}
for pname, (pmin, pmax) in vary_params.items():
val = rng.uniform(pmin, pmax)
params[pname] = round(val, 6)
varied[pname] = params[pname]
sim = run_market_sim(**params)
# Métricas resumen
spreads = sim['spreads']
trade_prices = sim['trade_prices']
n_per_tick = sim['n_trades_per_tick']
result = {
'sim_id': i,
'seed': params['seed'],
**varied,
'total_trades': sim['total_trades'],
'mean_spread': round(np.mean(spreads), 6) if spreads else 0,
'std_spread': round(np.std(spreads), 6) if spreads else 0,
'mean_trades_per_tick': round(np.mean(n_per_tick), 2),
'price_return': round((sim['fundamental_prices'][-1] / sim['fundamental_prices'][0] - 1) * 100, 4),
'maker_total_pnl': round(sum(sim['maker_pnls']), 2),
}
if trade_prices:
tp = np.array(trade_prices)
log_ret = np.diff(np.log(tp[tp > 0]))
if len(log_ret) > 1:
result['realized_vol'] = round(float(np.std(log_ret)), 6)
results.append(result)
if (i + 1) % max(1, n_simulations // 10) == 0:
print(f' {i+1}/{n_simulations} simulaciones completadas')
return results
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('-n', type=int, default=10)
args = parser.parse_args()
results = monte_carlo_market(
n_simulations=args.n,
base_params={'n_ticks': 200},
vary_params={'sigma': (0.005, 0.05), 'gamma': (0.01, 1.0)},
)
print(json.dumps(results[-1], indent=2))
print(f'\n{len(results)} simulaciones completadas')
@@ -0,0 +1,65 @@
---
name: run_market_sim
kind: pipeline
lang: py
domain: pipelines
version: "1.0.0"
purity: impure
signature: "def run_market_sim(initial_price: float, n_ticks: int, sigma: float, mu: float, jump_intensity: float, jump_size_std: float, n_makers: int, maker_spread: float, gamma: float, maker_levels: int, maker_qty: float, n_takers_lambda: float, taker_size_alpha: float, taker_size_min: float, taker_size_max: float, hawkes_alpha: float, hawkes_beta: float, seed: int) -> dict"
description: "Simula un mercado completo con matching engine FIFO. Makers usan Avellaneda-Stoikov, takers llegan según proceso Hawkes con tamaños power-law. Retorna trades, spreads, midprices y PnL de makers."
tags: [simulation, market, matching-engine, montecarlo, launcher, finance, microstructure]
uses_functions:
- generate_gbm_prices_py_finance
- avellaneda_stoikov_quotes_py_finance
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: [numpy]
tested: false
tests: []
test_file_path: ""
file_path: "python/functions/pipelines/run_market_sim.py"
---
## Ejemplo
```bash
python python/functions/pipelines/run_market_sim.py
# {
# "total_trades": 1234,
# "mean_spread": 0.4821,
# "maker_pnls": [12.5, -3.2, 8.1, 5.6, -1.4]
# }
```
```python
from run_market_sim import run_market_sim
result = run_market_sim(
initial_price=100.0,
n_ticks=200,
sigma=0.01,
n_makers=3,
seed=0,
)
print(result['total_trades'])
print(result['maker_pnls'])
```
## Flujo
1. `generate_gbm_prices` — genera la serie de precios fundamentales con GBM + saltos
2. Loop por ticks:
- Cada maker coloca quotes via `avellaneda_stoikov_quotes`
- Takers llegan según Poisson con intensidad modulada por excitación Hawkes
- Tamaños de taker siguen distribución Pareto (power-law)
- Matching FIFO sobre el order book simplificado
- Excitación Hawkes decae exponencialmente entre ticks
3. Mark-to-market final de inventarios de makers
## Notas
Los parámetros Hawkes (`hawkes_alpha`, `hawkes_beta`) controlan la autocorrelación del flujo de órdenes.
`branching_ratio = hawkes_alpha / hawkes_beta`; si > 1, el proceso es explosivo.
El matching es simplificado: no hay cancelaciones intra-tick, el book se reconstituye en cada tick.
@@ -0,0 +1,149 @@
"""Ejecuta una simulación de mercado completa con matching engine FIFO."""
import sys
import os
import json
def run_market_sim(
initial_price: float = 100.0,
n_ticks: int = 500,
sigma: float = 0.02,
mu: float = 0.0,
jump_intensity: float = 0.02,
jump_size_std: float = 0.05,
n_makers: int = 5,
maker_spread: float = 0.5,
gamma: float = 0.1,
maker_levels: int = 3,
maker_qty: float = 10.0,
n_takers_lambda: float = 2.0,
taker_size_alpha: float = 2.0,
taker_size_min: float = 1.0,
taker_size_max: float = 100.0,
hawkes_alpha: float = 0.5,
hawkes_beta: float = 1.0,
seed: int = 42,
) -> dict:
"""Simula un mercado con makers (Avellaneda-Stoikov) y takers (Hawkes + power-law).
Retorna dict con:
- trade_prices, trade_times, trade_sizes: listas de trades
- spreads, midprices: series por tick
- n_trades_per_tick: arrivals por tick
- maker_pnls: PnL final de cada maker
- total_trades: conteo total
"""
import numpy as np
# Importar funciones del registry
sys.path.insert(0, os.path.join(os.environ.get('FN_REGISTRY_ROOT', os.path.expanduser('~/fn_registry')), 'python', 'functions'))
from finance.finance import generate_gbm_prices, avellaneda_stoikov_quotes
rng = np.random.default_rng(seed)
# Generar precios fundamentales
fund_prices = generate_gbm_prices(initial_price, n_ticks, sigma, mu, jump_intensity, jump_size_std, seed)
# Order book simplificado: listas de (price, qty, maker_idx)
# Matching inline para no depender del notebook
trade_prices, trade_times, trade_sizes = [], [], []
spreads, midprices = [], []
n_trades_per_tick = []
maker_inventories = [0.0] * n_makers
maker_pnls = [0.0] * n_makers
hawkes_excitation = 0.0
for t in range(n_ticks):
mid = fund_prices[t]
# Makers place orders
all_bids = [] # (price, qty, maker_idx)
all_asks = []
for m in range(n_makers):
noise = rng.uniform(-0.05, 0.05)
quotes = avellaneda_stoikov_quotes(
mid + noise, maker_inventories[m], gamma, sigma, maker_spread, maker_levels, maker_qty
)
for q in quotes:
if q['side'] == 'buy':
all_bids.append((q['price'], q['qty'], m))
else:
all_asks.append((q['price'], q['qty'], m))
all_bids.sort(key=lambda x: -x[0]) # best bid first
all_asks.sort(key=lambda x: x[0]) # best ask first
# Record book state
if all_bids and all_asks:
spreads.append(all_asks[0][0] - all_bids[0][0])
midprices.append((all_bids[0][0] + all_asks[0][0]) / 2)
else:
spreads.append(0.0)
midprices.append(mid)
# Takers arrive (Hawkes)
lam = max(0.1, n_takers_lambda + hawkes_excitation)
n_takers = rng.poisson(lam)
tick_trades = 0
for _ in range(n_takers):
side = 'buy' if rng.random() < 0.5 else 'sell'
raw_size = (rng.pareto(taker_size_alpha) + 1) * taker_size_min
qty_remaining = min(round(raw_size, 1), taker_size_max)
book = list(all_asks) if side == 'buy' else list(all_bids)
for i, (price, available, maker_idx) in enumerate(book):
if qty_remaining <= 0:
break
fill = min(qty_remaining, available)
trade_prices.append(price)
trade_times.append(t)
trade_sizes.append(fill)
tick_trades += 1
qty_remaining -= fill
if side == 'buy':
maker_inventories[maker_idx] -= fill
maker_pnls[maker_idx] += price * fill
else:
maker_inventories[maker_idx] += fill
maker_pnls[maker_idx] -= price * fill
book[i] = (price, available - fill, maker_idx)
if side == 'buy':
all_asks = [(p, q, m) for p, q, m in book if q > 0]
else:
all_bids = [(p, q, m) for p, q, m in book if q > 0]
hawkes_excitation *= np.exp(-hawkes_beta)
hawkes_excitation += hawkes_alpha * tick_trades
n_trades_per_tick.append(tick_trades)
# Mark to market
final_price = fund_prices[-1]
for m in range(n_makers):
maker_pnls[m] += maker_inventories[m] * final_price
return {
'trade_prices': trade_prices,
'trade_times': trade_times,
'trade_sizes': trade_sizes,
'spreads': spreads,
'midprices': midprices,
'n_trades_per_tick': n_trades_per_tick,
'fundamental_prices': fund_prices,
'maker_pnls': [round(p, 2) for p in maker_pnls],
'total_trades': len(trade_prices),
}
if __name__ == '__main__':
result = run_market_sim()
print(json.dumps({
'total_trades': result['total_trades'],
'mean_spread': round(sum(result['spreads']) / len(result['spreads']), 4),
'maker_pnls': result['maker_pnls'],
}, indent=2))