chore: initial sync

This commit is contained in:
fn-registry agent
2026-04-28 22:13:07 +02:00
commit f60da6fa6f
13 changed files with 5974 additions and 0 deletions
+40
View File
@@ -0,0 +1,40 @@
# JUPYTER HABILITADO EN ESTE ANALISIS
## Reglas OBLIGATORIAS para Claude
### 1. CODIGO INMUTABLE — NUNCA MODIFICAR CELDAS EXISTENTES
- **PROHIBIDO** usar NotebookEdit para reemplazar celdas existentes
- **SIEMPRE** anadir celdas NUEVAS al final del notebook
- Si hay un error en una celda, crear celda nueva con la correccion
- El historial de trabajo debe quedar intacto para trazabilidad
### 2. PROGRAMACION FUNCIONAL OBLIGATORIA
- **Funciones puras**: sin efectos secundarios, mismo input -> mismo output
- **Inmutabilidad**: nunca mutar datos, crear copias transformadas
- **Composicion**: funciones pequenas que se combinan
- Preferir: `map`, `filter`, `reduce`, list comprehensions
- Evitar: loops con mutacion, `global`, modificar argumentos in-place
### 3. SIEMPRE usar MCP jupyter para ejecutar codigo Python
- Las ejecuciones se ven en tiempo real en Jupyter Lab del usuario
- Compartimos variables y estado del kernel
- **NUNCA usar bash para ejecutar Python en este analisis**
### 4. Verificar Jupyter activo ANTES de ejecutar
- Si no esta activo: pedir al usuario que ejecute `./run-jupyter-lab.sh`
### 5. Gestion de notebooks
- Notebooks en la carpeta `notebooks/` o subcarpetas
- Si un notebook tiene >50 celdas, crear uno nuevo
- Nombrar descriptivamente: `01_exploracion.ipynb`, `02_limpieza.ipynb`
### 6. Gestion de Python
- **SIEMPRE usar `uv`** para gestionar dependencias
- Anadir paquetes con `uv add nombre_paquete`
### 7. Acceso al fn_registry
- `FN_REGISTRY_ROOT` apunta a la raiz del registry
- Para importar funciones Python: `sys.path.insert(0, os.path.join(os.environ["FN_REGISTRY_ROOT"], "python", "functions"))`
- Para consultar registry.db: `sqlite3` o `import sqlite3` con la ruta `$FN_REGISTRY_ROOT/registry.db`
+12
View File
@@ -0,0 +1,12 @@
.venv/
.mcp.json
.jupyter-port
.jupyter/
.jupyter_ystore.db
.ipython/
__pycache__/
*.pyc
.ipynb_checkpoints/
bin/
data/
.DS_Store
+1
View File
@@ -0,0 +1 @@
3.13
View File
+892
View File
@@ -0,0 +1,892 @@
"""
challenges.py — Challenges de programación de nivel production.
Inspirados en funciones reales del fn_registry. Prueban:
- Programación funcional (pipe, compose, curry, combinators)
- Procesamiento de datos (coercion, parsing, normalization)
- Algoritmos no triviales (árboles, grafos, scheduling)
- Patterns del mundo real (retry, rate limiting, schema validation)
"""
from eval_runner import Challenge
# ══════════════════════════════════════════════════════════════
# FUNCTIONAL PROGRAMMING
# ══════════════════════════════════════════════════════════════
FUNCTIONAL = [
Challenge(
id="fn_pipe",
name="Pipe with error propagation",
category="functional",
difficulty="medium",
prompt="""Write a Python function:
def pipe_safe(value, *fns):
\"\"\"Pipe a value through functions left-to-right. If any function raises,
return a tuple (None, error_string). On success return (result, None).\"\"\"
Example:
pipe_safe(5, lambda x: x*2, lambda x: x+1) == (11, None)
pipe_safe(0, lambda x: 10/x) == (None, "division by zero") # or similar
""",
test_code="""
# Success cases
assert pipe_safe(5, lambda x: x*2, lambda x: x+1) == (11, None)
assert pipe_safe("hello", str.upper, lambda s: s + "!") == ("HELLO!", None)
assert pipe_safe(42) == (42, None) # no functions
assert pipe_safe([3,1,2], sorted, lambda x: x[0]) == (1, None)
# Error propagation
result, err = pipe_safe(0, lambda x: 10/x)
assert result is None
assert err is not None and "division" in err.lower()
result, err = pipe_safe("abc", lambda x: x*2, int)
assert result is None
assert err is not None
print("PASS: pipe_safe")
""",
),
Challenge(
id="fn_group_by_multi",
name="Group by with transform",
category="functional",
difficulty="medium",
prompt="""Write a Python function:
def group_by_transform(xs: list, key_fn, value_fn=None) -> dict:
\"\"\"Group elements by key_fn. Optionally transform values with value_fn.
If value_fn is None, store raw elements. Preserves insertion order within groups.\"\"\"
Example:
group_by_transform(["hello", "hi", "bye"], lambda s: s[0])
# => {"h": ["hello", "hi"], "b": ["bye"]}
group_by_transform(["hello", "hi", "bye"], lambda s: s[0], str.upper)
# => {"h": ["HELLO", "HI"], "b": ["BYE"]}
""",
test_code="""
# Basic grouping
r = group_by_transform(["hello", "hi", "bye"], lambda s: s[0])
assert r == {"h": ["hello", "hi"], "b": ["bye"]}
# With value transform
r = group_by_transform(["hello", "hi", "bye"], lambda s: s[0], str.upper)
assert r == {"h": ["HELLO", "HI"], "b": ["BYE"]}
# Numbers
r = group_by_transform([1,2,3,4,5,6], lambda x: x % 2, lambda x: x**2)
assert r == {1: [1, 9, 25], 0: [4, 16, 36]}
# Empty
assert group_by_transform([], lambda x: x) == {}
# Single element
assert group_by_transform([42], lambda x: "k") == {"k": [42]}
print("PASS: group_by_transform")
""",
),
Challenge(
    # Bounded memoization decorator. The tests only assert behavior that the
    # prompt actually specifies: with the mandated key
    # (args, tuple(sorted(kwargs.items()))), the calls f(1, 2) and f(1, y=2)
    # are DIFFERENT keys, so the suite must not require them to share an entry.
    id="fn_memoize",
    name="Memoize decorator with max size",
    category="functional",
    difficulty="hard",
    prompt="""Write a Python function:
def memoize(max_size: int = 128):
\"\"\"Decorator that memoizes function results. When cache exceeds max_size,
evict the oldest entry (FIFO). The key is (args, tuple(sorted(kwargs.items()))).
Must work with both positional and keyword arguments.\"\"\"
Usage:
@memoize(max_size=3)
def add(a, b):
return a + b
""",
    test_code="""
call_count = 0
@memoize(max_size=3)
def expensive(x, y=0):
    global call_count
    call_count += 1
    return x + y
# First call — computes
call_count = 0
assert expensive(1, 2) == 3
assert call_count == 1
# Cached — no recompute
assert expensive(1, 2) == 3
assert call_count == 1
# Different args
assert expensive(3, 4) == 7
assert call_count == 2
# Kwargs: under the specified key, (1, y=2) need not share an entry with (1, 2),
# so only require that repeating the same keyword call is served from cache
assert expensive(1, y=2) == 3
calls_after_kwargs = call_count
assert expensive(1, y=2) == 3
assert call_count == calls_after_kwargs
# Eviction, checked on a fresh cache that only ever sees positional keys
evict_calls = 0
@memoize(max_size=3)
def tracked(x):
    global evict_calls
    evict_calls += 1
    return x * 10
assert [tracked(v) for v in (1, 2, 3)] == [10, 20, 30] # fills cache to max_size=3
assert evict_calls == 3
assert tracked(4) == 40 # 4th key evicts the oldest entry (1)
assert evict_calls == 4
# 1 was evicted, must recompute
assert tracked(1) == 10
assert evict_calls == 5
print("PASS: memoize")
""",
    max_tokens=1500,
),
Challenge(
id="fn_compose_async",
name="Partition with multiple predicates",
category="functional",
difficulty="medium",
prompt="""Write a Python function:
def multi_partition(xs: list, *predicates) -> list[list]:
\"\"\"Partition a list into N+1 buckets where N is the number of predicates.
Each element goes into the bucket of the FIRST predicate it satisfies.
Elements matching no predicate go into the last bucket.
Returns list of N+1 lists. Does not mutate input.\"\"\"
Example:
multi_partition([1,2,3,4,5,6,7,8,9,10],
lambda x: x % 3 == 0,
lambda x: x % 2 == 0)
# => [[3,6,9], [2,4,8,10], [1,5,7]]
# 6 goes to first bucket (div by 3) even though also div by 2
""",
test_code="""
# Basic
r = multi_partition([1,2,3,4,5,6,7,8,9,10], lambda x: x%3==0, lambda x: x%2==0)
assert r == [[3,6,9], [2,4,8,10], [1,5,7]], f"got {r}"
# No predicates — everything in remainder
assert multi_partition([1,2,3]) == [[1,2,3]]
# One predicate
r = multi_partition(["a","bb","ccc"], lambda s: len(s) > 1)
assert r == [["bb","ccc"], ["a"]]
# All match first
r = multi_partition([2,4,6], lambda x: x%2==0, lambda x: x>0)
assert r == [[2,4,6], [], []]
# Empty
r = multi_partition([], lambda x: True)
assert r == [[], []]
print("PASS: multi_partition")
""",
),
]
# ══════════════════════════════════════════════════════════════
# DATA PROCESSING
# ══════════════════════════════════════════════════════════════
DATA_PROCESSING = [
Challenge(
    # Schema-driven coercion of string values. Each rule listed in the prompt
    # has a matching assertion group in test_code.
    id="dp_coerce",
    name="Type coercion with schema",
    category="data_processing",
    difficulty="hard",
    prompt="""Write a Python function:
def coerce_types(data: dict, schema: dict[str, str]) -> tuple[dict, list[str]]:
\"\"\"Coerce dict values to types specified in schema. Never mutate original.
Schema maps field names to type strings: "int", "float", "str", "bool", "list[str]".
Rules:
- str → int: parse via float first (handle "3.0" → 3), warn if lossy ("3.7" → 3)
- str → float: standard float()
- str → bool: "true/1/yes" → True, "false/0/no" → False (case-insensitive)
- str → list[str]: split by "," and strip whitespace from each item
- Fields not in schema: pass through unchanged
- Fields in schema but not in data: skip
- Failed coercion: keep original value, add warning string to list
Returns (new_dict, warnings_list).\"\"\"
""",
    test_code="""
# Basic coercions
d, w = coerce_types({"age": "25", "score": "3.14", "active": "yes"}, {"age": "int", "score": "float", "active": "bool"})
assert d == {"age": 25, "score": 3.14, "active": True}, f"got {d}"
assert w == []
# Lossy int coercion
d, w = coerce_types({"x": "3.7"}, {"x": "int"})
assert d["x"] == 3
# Parenthesized so the length check always applies: exactly one warning, and it
# must either say "lossy" or quote the offending value
assert len(w) == 1 and ("lossy" in w[0].lower() or "3.7" in w[0]), f"got {w}"
# Bool variants
d, _ = coerce_types({"a": "TRUE", "b": "0", "c": "no"}, {"a": "bool", "b": "bool", "c": "bool"})
assert d == {"a": True, "b": False, "c": False}
# list[str]
d, _ = coerce_types({"tags": "a, b , c"}, {"tags": "list[str]"})
assert d == {"tags": ["a", "b", "c"]}
# Pass through unknown fields
d, _ = coerce_types({"name": "test", "age": "5"}, {"age": "int"})
assert d == {"name": "test", "age": 5}
# Failed coercion
d, w = coerce_types({"x": "not_a_number"}, {"x": "int"})
assert d["x"] == "not_a_number" # kept original
assert len(w) == 1
# No mutation
original = {"x": "5"}
d, _ = coerce_types(original, {"x": "int"})
assert original["x"] == "5"
assert d["x"] == 5
print("PASS: coerce_types")
""",
    max_tokens=2048,
),
Challenge(
id="dp_frontmatter",
name="Extract YAML frontmatter",
category="data_processing",
difficulty="medium",
prompt="""Write a Python function:
def extract_frontmatter(content: str) -> tuple[str, dict | None]:
\"\"\"Extract YAML-like frontmatter delimited by '---' from start of markdown.
Frontmatter format:
---
key: value
another: something
---
Rest of content here.
Parse simple key:value pairs (no nested YAML needed). Values are always strings.
Do NOT use the yaml library.
Returns (content_without_frontmatter, parsed_dict_or_None).
If no frontmatter found, return (original_content, None).\"\"\"
""",
test_code="""
# Basic frontmatter
content = "---\\nname: test\\nversion: 1.0\\n---\\n\\nHello world"
body, meta = extract_frontmatter(content)
assert meta == {"name": "test", "version": "1.0"}, f"got {meta}"
assert body.strip() == "Hello world"
# No frontmatter
body, meta = extract_frontmatter("Just text")
assert meta is None
assert body == "Just text"
# Empty frontmatter
body, meta = extract_frontmatter("---\\n---\\nContent")
assert meta == {} or meta is not None
assert "Content" in body
# Values with colons
body, meta = extract_frontmatter("---\\nurl: http://example.com\\n---\\nBody")
assert meta["url"] == "http://example.com"
# Frontmatter must be at start
body, meta = extract_frontmatter("Some text\\n---\\nkey: val\\n---")
assert meta is None
print("PASS: extract_frontmatter")
""",
),
Challenge(
id="dp_json_extract",
name="Extract JSON from LLM response",
category="data_processing",
difficulty="hard",
prompt="""Write a Python function:
def extract_json_from_llm(content: str) -> dict:
\"\"\"Extract and parse JSON from messy LLM responses.
Must handle:
1. JSON inside ```json ... ``` code blocks
2. JSON inside ``` ... ``` blocks (no language tag)
3. Raw JSON with surrounding text
4. Trailing commas: {"a": 1,} or [1, 2,]
5. Python None instead of null
6. Single-quoted strings converted to double quotes
Returns parsed dict. Returns empty dict {} on failure.
Use only stdlib (json, re).\"\"\"
""",
test_code="""
import json
# Clean JSON block
assert extract_json_from_llm('```json\\n{"name": "test"}\\n```') == {"name": "test"}
# Block without language tag
assert extract_json_from_llm('```\\n{"x": 1}\\n```') == {"x": 1}
# JSON with surrounding text
r = extract_json_from_llm('Here is the result: {"count": 42} hope that helps!')
assert r == {"count": 42}
# Trailing commas
assert extract_json_from_llm('{"a": 1, "b": 2,}') == {"a": 1, "b": 2}
assert extract_json_from_llm('[1, 2, 3,]') == {} or extract_json_from_llm('{"items": [1,2,]}') == {"items": [1, 2]}
# Python None → null
assert extract_json_from_llm('{"value": None}') == {"value": None}
# Garbage input
assert extract_json_from_llm("no json here at all") == {}
assert extract_json_from_llm("") == {}
print("PASS: extract_json_from_llm")
""",
max_tokens=1500,
),
Challenge(
id="dp_smart_split",
name="Smart text splitter with token budget",
category="data_processing",
difficulty="hard",
prompt="""Write a Python function:
def smart_split(text: str, max_chars: int = 500, overlap: int = 50) -> list[str]:
\"\"\"Split text into chunks respecting max_chars with overlap between chunks.
Rules:
- Split at paragraph boundaries (double newline) when possible
- If a single paragraph exceeds max_chars, split at sentence boundaries (. ! ?)
- If a single sentence exceeds max_chars, hard-cut at max_chars
- Each chunk (except the first) starts with the last `overlap` characters of the previous chunk
- Strip leading/trailing whitespace from each chunk
- Never return empty chunks
Returns list of string chunks.\"\"\"
""",
test_code="""
# Simple paragraphs within budget
text = "First paragraph.\\n\\nSecond paragraph.\\n\\nThird paragraph."
chunks = smart_split(text, max_chars=100)
assert len(chunks) == 1
assert text.strip() in chunks[0]
# Force split between paragraphs
text = "A" * 100 + "\\n\\n" + "B" * 100
chunks = smart_split(text, max_chars=120, overlap=10)
assert len(chunks) >= 2
assert "A" * 100 in chunks[0]
assert "B" * 100 in chunks[-1]
# Overlap present
text = "Hello world this is text.\\n\\nAnother paragraph here."
chunks = smart_split(text, max_chars=30, overlap=5)
assert len(chunks) >= 2
for c in chunks:
assert len(c.strip()) > 0 # no empty chunks
# Very long single paragraph splits at sentence
text = "Short sentence. " * 50 # ~850 chars
chunks = smart_split(text, max_chars=200, overlap=20)
assert all(len(c) <= 220 for c in chunks) # max_chars + overlap tolerance
# Hard cut when no sentence boundary
text = "A" * 600
chunks = smart_split(text, max_chars=200, overlap=20)
assert len(chunks) >= 3
assert all(len(c) <= 220 for c in chunks)
# Empty/whitespace
assert smart_split("") == [] or smart_split("") == [""]
assert smart_split(" \\n\\n ") == [] or len(smart_split(" \\n\\n ")) <= 1
print("PASS: smart_split")
""",
max_tokens=2048,
),
]
# ══════════════════════════════════════════════════════════════
# ALGORITHMS
# ══════════════════════════════════════════════════════════════
ALGORITHMS = [
Challenge(
id="alg_topo_sort",
name="Topological sort with cycle detection",
category="algorithm",
difficulty="hard",
prompt="""Write a Python function:
def topo_sort(graph: dict[str, list[str]]) -> tuple[list[str], bool]:
\"\"\"Topological sort of a directed acyclic graph using Kahn's algorithm.
graph is adjacency list: {"a": ["b", "c"]} means a → b, a → c.
Nodes with no edges should also be included.
Returns (sorted_list, has_cycle).
- If no cycle: (topologically_sorted_nodes, False)
- If cycle detected: (partial_result, True)
When multiple valid orderings exist, prefer lexicographic order.\"\"\"
""",
test_code="""
# Simple DAG
order, cycle = topo_sort({"a": ["b", "c"], "b": ["d"], "c": ["d"], "d": []})
assert not cycle
assert order.index("a") < order.index("b")
assert order.index("a") < order.index("c")
assert order.index("b") < order.index("d")
assert order.index("c") < order.index("d")
# Lexicographic preference
order, cycle = topo_sort({"c": [], "b": [], "a": []})
assert not cycle
assert order == ["a", "b", "c"]
# Cycle detection
_, cycle = topo_sort({"a": ["b"], "b": ["c"], "c": ["a"]})
assert cycle
# Single node
order, cycle = topo_sort({"x": []})
assert order == ["x"]
assert not cycle
# Empty graph
order, cycle = topo_sort({})
assert order == []
assert not cycle
# Linear chain
order, cycle = topo_sort({"a": ["b"], "b": ["c"], "c": []})
assert order == ["a", "b", "c"]
assert not cycle
print("PASS: topo_sort")
""",
),
Challenge(
    # Weighted interval scheduling. The tests check the optimal TOTAL priority
    # rather than a specific selection, because several selections can tie.
    id="alg_interval_merge",
    name="Interval scheduler with priorities",
    category="algorithm",
    difficulty="hard",
    prompt="""Write a Python function:
def schedule_intervals(intervals: list[dict]) -> list[dict]:
\"\"\"Schedule non-overlapping intervals maximizing total priority.
Each interval is {"id": str, "start": int, "end": int, "priority": int}.
Intervals are half-open: [start, end). Two intervals [1,3) and [3,5) do NOT overlap.
Use weighted interval scheduling (dynamic programming).
Returns list of selected intervals sorted by start time.\"\"\"
""",
    test_code="""
# Basic: two optimal schedules tie at total priority 5
result = schedule_intervals([
{"id": "a", "start": 0, "end": 3, "priority": 2},
{"id": "b", "start": 1, "end": 4, "priority": 5},
{"id": "c", "start": 3, "end": 6, "priority": 3},
])
# b conflicts with both a and c, so either [b] (5) or [a,c] (2+3=5) is valid;
# only the total is asserted, not which of the tied schedules was chosen
total = sum(r["priority"] for r in result)
assert total == 5, f"got total={total}"
# Non-overlapping, take all
result = schedule_intervals([
{"id": "a", "start": 0, "end": 2, "priority": 3},
{"id": "b", "start": 2, "end": 4, "priority": 3},
{"id": "c", "start": 4, "end": 6, "priority": 3},
])
assert len(result) == 3
assert sum(r["priority"] for r in result) == 9
# Empty
assert schedule_intervals([]) == []
# Single
result = schedule_intervals([{"id": "x", "start": 0, "end": 10, "priority": 7}])
assert len(result) == 1 and result[0]["id"] == "x"
# Prefer two small over one big
result = schedule_intervals([
{"id": "big", "start": 0, "end": 10, "priority": 5},
{"id": "s1", "start": 0, "end": 5, "priority": 3},
{"id": "s2", "start": 5, "end": 10, "priority": 3},
])
total = sum(r["priority"] for r in result)
assert total == 6 # s1 + s2 beats big
# Result sorted by start
for i in range(len(result) - 1):
    assert result[i]["start"] <= result[i+1]["start"]
print("PASS: schedule_intervals")
""",
    max_tokens=2048,
),
Challenge(
id="alg_tree_ops",
name="Tree operations suite",
category="algorithm",
difficulty="expert",
prompt="""Write three Python functions for tree manipulation:
1. def flatten_tree(tree: dict) -> list[dict]:
\"\"\"Flatten nested tree to list. Each node is a dict with optional 'children' key.
DFS pre-order. Remove 'children' key from output nodes. Deep copy nodes.\"\"\"
2. def find_path(tree: dict, target_id: str) -> list[str] | None:
\"\"\"Find path from root to node with given 'id' field. Returns list of ids
from root to target (inclusive), or None if not found.\"\"\"
3. def map_tree(tree: dict, fn) -> dict:
\"\"\"Apply fn to each node (excluding 'children' key), return new tree with
same structure. fn receives a dict without 'children' and returns a new dict.
Must not mutate original.\"\"\"
""",
test_code="""
import copy
tree = {
"id": "root", "name": "Root",
"children": [
{"id": "a", "name": "A", "children": [
{"id": "a1", "name": "A1"},
{"id": "a2", "name": "A2"},
]},
{"id": "b", "name": "B"},
]
}
original = copy.deepcopy(tree)
# flatten_tree
flat = flatten_tree(tree)
ids = [n["id"] for n in flat]
assert ids == ["root", "a", "a1", "a2", "b"], f"got {ids}"
assert all("children" not in n for n in flat)
assert tree == original # no mutation
# find_path
assert find_path(tree, "a2") == ["root", "a", "a2"]
assert find_path(tree, "root") == ["root"]
assert find_path(tree, "b") == ["root", "b"]
assert find_path(tree, "nonexistent") is None
# map_tree
result = map_tree(tree, lambda n: {**n, "name": n["name"].lower()})
assert result["name"] == "root"
assert result["children"][0]["name"] == "a"
assert result["children"][0]["children"][0]["name"] == "a1"
assert tree == original # no mutation
assert result["id"] == "root"
# Edge: leaf node
leaf = {"id": "solo", "val": 1}
flat = flatten_tree(leaf)
assert flat == [{"id": "solo", "val": 1}]
assert find_path(leaf, "solo") == ["solo"]
print("PASS: tree_ops")
""",
max_tokens=2048,
),
]
# ══════════════════════════════════════════════════════════════
# REAL-WORLD PATTERNS
# ══════════════════════════════════════════════════════════════
REAL_WORLD = [
Challenge(
id="rw_retry",
name="Retry with exponential backoff",
category="real_world",
difficulty="hard",
prompt="""Write a Python function:
def compute_backoff_delays(max_retries: int, base_delay: float = 1.0,
max_delay: float = 60.0, jitter: bool = False) -> list[float]:
\"\"\"Compute the sequence of backoff delays for retry logic.
Formula: delay = min(base_delay * 2^attempt, max_delay)
If jitter=True, multiply each delay by a factor between 0.5 and 1.0
(use deterministic half-jitter: factor = 0.75 for testability).
attempt starts at 0.
Returns list of `max_retries` delay values.\"\"\"
Also write:
def classify_error(status_code: int) -> str:
\"\"\"Classify HTTP status code for retry decisions.
Returns: 'permanent' (4xx except 429), 'transient' (5xx, 429, 408), or 'success' (2xx).
Any other code returns 'unknown'.\"\"\"
""",
test_code="""
# Basic exponential backoff
delays = compute_backoff_delays(5, base_delay=1.0, max_delay=60.0)
assert delays == [1.0, 2.0, 4.0, 8.0, 16.0], f"got {delays}"
# Capped at max_delay
delays = compute_backoff_delays(4, base_delay=10.0, max_delay=30.0)
assert delays == [10.0, 20.0, 30.0, 30.0], f"got {delays}"
# With jitter (deterministic 0.75 factor)
delays = compute_backoff_delays(3, base_delay=4.0, jitter=True)
assert delays == [3.0, 6.0, 12.0], f"got {delays}"
# Zero retries
assert compute_backoff_delays(0) == []
# Error classification
assert classify_error(200) == "success"
assert classify_error(201) == "success"
assert classify_error(400) == "permanent"
assert classify_error(403) == "permanent"
assert classify_error(404) == "permanent"
assert classify_error(429) == "transient" # rate limit
assert classify_error(408) == "transient" # timeout
assert classify_error(500) == "transient"
assert classify_error(503) == "transient"
assert classify_error(100) == "unknown"
assert classify_error(302) == "unknown"
print("PASS: retry_backoff")
""",
),
Challenge(
id="rw_schema_validate",
name="Schema validator for dicts",
category="real_world",
difficulty="expert",
prompt="""Write a Python function:
def validate(data: dict, schema: dict) -> list[str]:
\"\"\"Validate a dict against a schema. Return list of error strings (empty = valid).
Schema format — each key maps to a rule dict:
{
"field_name": {
"type": "str" | "int" | "float" | "bool" | "list" | "dict",
"required": True | False, # default False
"min": number, # minimum value (for int/float) or min length (for str/list)
"max": number, # maximum value or max length
"choices": [...], # allowed values
"pattern": "regex", # regex pattern (for str only)
}
}
Error messages should be descriptive: "field_name: expected type str, got int"
Check in order: required → type → min/max → choices → pattern.\"\"\"
""",
test_code="""
import re
schema = {
"name": {"type": "str", "required": True, "min": 1, "max": 50},
"age": {"type": "int", "required": True, "min": 0, "max": 150},
"email": {"type": "str", "pattern": r".+@.+\\..+"},
"role": {"type": "str", "choices": ["admin", "user", "guest"]},
"tags": {"type": "list", "max": 5},
}
# Valid data
errors = validate({"name": "Alice", "age": 30, "email": "a@b.com", "role": "admin", "tags": ["a"]}, schema)
assert errors == [], f"got {errors}"
# Missing required
errors = validate({"age": 25}, schema)
assert any("name" in e and "required" in e.lower() for e in errors), f"got {errors}"
# Wrong type
errors = validate({"name": 123, "age": 25}, schema)
assert any("name" in e and "type" in e.lower() for e in errors)
# Min/max violation
errors = validate({"name": "", "age": 25}, schema)
assert any("name" in e for e in errors) # min length 1
errors = validate({"name": "Bob", "age": -5}, schema)
assert any("age" in e for e in errors) # min 0
# Invalid choice
errors = validate({"name": "X", "age": 1, "role": "superuser"}, schema)
assert any("role" in e and "choices" in e.lower() for e in errors)
# Pattern mismatch
errors = validate({"name": "X", "age": 1, "email": "invalid"}, schema)
assert any("email" in e and "pattern" in e.lower() for e in errors)
# Extra fields ignored (no error)
errors = validate({"name": "X", "age": 1, "extra": "ok"}, schema)
assert not any("extra" in e for e in errors)
# Optional missing is fine
errors = validate({"name": "Test", "age": 50}, schema)
assert not any("email" in e for e in errors)
print("PASS: schema_validate")
""",
max_tokens=2500,
),
Challenge(
id="rw_rate_limiter",
name="Token bucket rate limiter",
category="real_world",
difficulty="expert",
prompt="""Write a Python class:
class TokenBucket:
\"\"\"Token bucket rate limiter (non-threaded, for testing).
Args:
capacity: Maximum tokens in bucket.
refill_rate: Tokens added per second.
Methods:
consume(tokens: int = 1, current_time: float = None) -> bool:
Try to consume tokens. Returns True if allowed, False if not enough tokens.
current_time is injectable for testing (defaults to time.time()).
Before checking, refill based on elapsed time since last refill.
tokens_available(current_time: float = None) -> float:
Return current token count after refill.
wait_time(tokens: int = 1, current_time: float = None) -> float:
Return seconds to wait before `tokens` would be available.
Returns 0.0 if tokens are already available.
\"\"\"
""",
test_code="""
# Basic usage
bucket = TokenBucket(capacity=10, refill_rate=1.0)
# Starts full
assert bucket.tokens_available(current_time=0) == 10
# Consume some
assert bucket.consume(3, current_time=0) == True
assert bucket.tokens_available(current_time=0) == 7
# Consume more than available
assert bucket.consume(8, current_time=0) == False
assert bucket.tokens_available(current_time=0) == 7 # unchanged
# Refill over time
assert bucket.tokens_available(current_time=2) == 9 # 7 + 2*1.0
# Consume after refill
assert bucket.consume(9, current_time=2) == True
assert bucket.tokens_available(current_time=2) == 0
# Don't exceed capacity
assert bucket.tokens_available(current_time=100) == 10 # capped at capacity
# Wait time
bucket2 = TokenBucket(capacity=5, refill_rate=2.0)
bucket2.consume(5, current_time=0)
assert bucket2.tokens_available(current_time=0) == 0
wt = bucket2.wait_time(4, current_time=0)
assert abs(wt - 2.0) < 0.01 # need 4 tokens at 2/s = 2s
# Already available
bucket3 = TokenBucket(capacity=10, refill_rate=1.0)
assert bucket3.wait_time(5, current_time=0) == 0.0
# Consume more than capacity
assert bucket3.consume(11, current_time=0) == False
print("PASS: token_bucket")
""",
max_tokens=1500,
),
Challenge(
id="rw_diff",
name="Simple line differ",
category="real_world",
difficulty="expert",
prompt="""Write a Python function:
def line_diff(old: str, new: str) -> list[str]:
\"\"\"Compute line-by-line diff between old and new text.
Returns list of diff lines:
- Lines only in old: prefixed with "- "
- Lines only in new: prefixed with "+ "
- Common lines: prefixed with " " (two spaces)
Use longest common subsequence (LCS) to produce minimal diff.
Split input on newlines. Empty string = no lines.\"\"\"
""",
test_code="""
# No changes
result = line_diff("a\\nb\\nc", "a\\nb\\nc")
assert result == [" a", " b", " c"]
# Addition
result = line_diff("a\\nc", "a\\nb\\nc")
assert result == [" a", "+ b", " c"], f"got {result}"
# Deletion
result = line_diff("a\\nb\\nc", "a\\nc")
assert result == [" a", "- b", " c"], f"got {result}"
# Replacement
result = line_diff("a\\nb\\nc", "a\\nX\\nc")
assert result == [" a", "- b", "+ X", " c"], f"got {result}"
# Complete change
result = line_diff("a\\nb", "c\\nd")
assert result == ["- a", "- b", "+ c", "+ d"]
# Empty inputs
assert line_diff("", "") == []
assert line_diff("a", "") == ["- a"]
assert line_diff("", "a") == ["+ a"]
# Multiple additions and deletions
result = line_diff("a\\nb\\nc\\nd", "a\\nc\\nd\\ne")
assert "- b" in result
assert "+ e" in result
assert " a" in result
assert " c" in result
assert " d" in result
print("PASS: line_diff")
""",
max_tokens=2048,
),
]
# ── Todos ─────────────────────────────────────────────────
ALL_CHALLENGES = FUNCTIONAL + DATA_PROCESSING + ALGORITHMS + REAL_WORLD
+157
View File
@@ -0,0 +1,157 @@
"""
eval_runner.py — Motor de evaluación de coding para modelos locales.
Prueba si un LLM puede generar funciones de calidad production-ready
al estilo del fn_registry: puras, genéricas, testeables, composables.
"""
import requests
import re
import subprocess
import tempfile
import time
import os
import json
from dataclasses import dataclass, field
# Base URL of the chat-completions HTTP API; query_model posts to
# f"{API_BASE}/chat/completions". Points at a server on the loopback interface.
API_BASE = "http://127.0.0.1:1234/v1"
# ── Tipos ─────────────────────────────────────────────────
@dataclass
class Challenge:
    """One coding task: the prompt sent to the model plus the test that grades it."""

    # Short unique identifier, e.g. "fn_pipe".
    id: str
    # Human-readable title shown in progress output.
    name: str
    # One of: functional, data_processing, algorithm, real_world.
    category: str
    # One of: medium, hard, expert.
    difficulty: str
    # Task description given to the model as the user message.
    prompt: str
    # Python source appended after the generated code and executed as a script.
    test_code: str
    # Completion-token budget for the model's answer.
    max_tokens: int = field(default=2048)
@dataclass
class Result:
    """Outcome of running a single challenge against a model."""

    # --- identity, copied from the Challenge that was run ---
    challenge_id: str
    name: str
    category: str
    difficulty: str

    # --- verdict ---
    # True when the generated code plus the test script exited with status 0.
    passed: bool
    # Captured failure output, or the exception text; empty on success.
    error: str

    # --- model output ---
    # Code extracted from the response (what was actually executed).
    code: str
    # Unprocessed response text as returned by the model.
    raw_response: str

    # --- performance figures reported for the request ---
    latency_ms: float
    completion_tokens: int
    tokens_per_second: float
# ── Motor ─────────────────────────────────────────────────
# System message sent with every request by query_model. It asks for a single
# fenced ```python block, which is the primary format extract_code looks for.
SYSTEM_PROMPT = """You are a senior software engineer writing production Python code for a function registry.
Rules:
- Return ONLY the function/class code inside a single ```python block
- Use type hints on all parameters and return types
- Functions must be pure when possible: no side effects, no mutation of inputs
- Use descriptive variable names, not single letters
- Handle edge cases (empty inputs, None, boundary values)
- No imports from external packages — only Python stdlib
- No print statements, no logging, no comments explaining obvious code
- Follow the function signature EXACTLY as specified in the prompt"""
def query_model(model: str, prompt: str, max_tokens: int = 4096) -> dict:
    """Send one chat-completion request and return the reply with timing stats.

    Posts SYSTEM_PROMPT as the system message and ``prompt`` as the user
    message to ``{API_BASE}/chat/completions`` using deterministic sampling
    settings (temperature 0.0).

    Args:
        model: Model identifier passed through in the request body.
        prompt: User message content.
        max_tokens: Completion-token limit for the response.

    Returns:
        A dict with keys ``content`` (response text, ``""`` if the server
        returned null), ``latency_ms`` (request round-trip time),
        ``completion_tokens`` (0 if the server reported no usage) and ``tps``
        (completion tokens per second of round-trip time).

    Raises:
        requests.HTTPError: The server answered with a 4xx/5xx status.
        requests.RequestException: Connection failure or the 300 s timeout.
        KeyError, IndexError: The response body has no
            ``choices[0].message.content`` entry.
    """
    # perf_counter is monotonic, so the measurement cannot be skewed by
    # wall-clock adjustments during a long generation.
    started = time.perf_counter()
    resp = requests.post(f"{API_BASE}/chat/completions", json={
        "model": model,
        "messages": [
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": prompt},
        ],
        "max_tokens": max_tokens,
        "temperature": 0.0,
        "top_p": 0.9,
        "top_k": 20,
        "min_p": 0.05,
        "repetition_penalty": 1.0,
        "presence_penalty": 0.0,
        "frequency_penalty": 0.0,
        "stop": ["<|im_end|>", "<|endoftext|>"],
    }, timeout=300)
    latency_ms = (time.perf_counter() - started) * 1000

    # Fail with the real HTTP error instead of a misleading KeyError on
    # "choices" when the server returns an error payload.
    resp.raise_for_status()
    data = resp.json()

    # "content" and "usage" may be present but null; normalise both so callers
    # always get a str and an int.
    content = data["choices"][0]["message"]["content"] or ""
    usage = data.get("usage") or {}
    comp = usage.get("completion_tokens") or 0
    tps = comp / (latency_ms / 1000) if latency_ms > 0 else 0
    return {"content": content, "latency_ms": latency_ms, "completion_tokens": comp, "tps": tps}
def extract_code(text: str) -> str:
    """Pull the Python source out of a model response.

    Strategies are tried in order and the first hit wins:

    1. A closed fenced block. Blocks tagged as Python (``python``, ``python3``
       or ``py``, any letter case) are preferred over blocks with any other
       tag or no tag.
    2. An unclosed fenced block (the model ran out of tokens before writing
       the closing fence), with the same Python-first preference.
    3. No fence at all: everything from the first line that starts with a
       decorator, ``def``, ``async def``, ``class``, ``import`` or ``from``.
    4. Otherwise the whole text.

    Args:
        text: Raw response text.

    Returns:
        The extracted code with surrounding whitespace stripped.
    """
    # Tried in this order so that a Python block wins over, say, a preceding
    # ```bash block. The second pattern accepts any (or no) language tag.
    fence_tags = (r"(?:python3?|py)", r"[\w+#.-]*")
    flags = re.DOTALL | re.IGNORECASE

    # 1. Closed code block
    for tag in fence_tags:
        m = re.search(rf"```{tag}\s*\n(.*?)```", text, flags)
        if m:
            return m.group(1).strip()

    # 2. Unclosed code block (model hit max_tokens before closing ```)
    for tag in fence_tags:
        m = re.search(rf"```{tag}\s*\n(.*)", text, flags)
        if m:
            return m.group(1).strip()

    # 3. No code block — take from the first code-looking line to the end.
    #    "@" keeps a leading decorator attached to its def/class.
    m = re.search(
        r"^((?:@|async def |def |class |import |from ).*)",
        text,
        re.DOTALL | re.MULTILINE,
    )
    if m:
        return m.group(1).strip()

    return text.strip()
def run_test(code: str, test_code: str, timeout: int = 15) -> tuple[bool, str]:
    """Execute generated code followed by its test script in a subprocess.

    ``code`` and ``test_code`` are joined with a blank line, written to a
    temporary ``.py`` file and run with the same interpreter that is running
    this module. The temporary file is always removed afterwards.

    Args:
        code: Source produced by the model.
        test_code: Assertions appended after ``code``.
        timeout: Seconds to wait before the run is abandoned.

    Returns:
        ``(True, "")`` when the script exits with status 0;
        ``(False, "TIMEOUT")`` when it exceeds ``timeout``;
        otherwise ``(False, tail)`` where ``tail`` is the last 800 characters
        of the combined stdout and stderr.
    """
    import sys  # local import: this module's import block does not include sys

    full = code + "\n\n" + test_code
    # UTF-8 is Python's default source encoding; writing with the locale
    # encoding could fail or corrupt non-ASCII characters in generated code.
    script = tempfile.NamedTemporaryFile(
        mode="w", suffix=".py", delete=False, encoding="utf-8"
    )
    try:
        # Close the handle before running so the child can open the file on
        # every platform; being inside the try guarantees cleanup if the
        # write itself fails.
        with script:
            script.write(full)
        r = subprocess.run(
            # Same interpreter as the runner (falls back to PATH lookup if the
            # interpreter path is unavailable).
            [sys.executable or "python3", script.name],
            capture_output=True,
            text=True,
            errors="replace",  # undecodable child output must not crash the runner
            timeout=timeout,
        )
        if r.returncode == 0:
            return True, ""
        # Full error: stdout + stderr, keep last 800 chars for better debugging
        err = (r.stdout + "\n" + r.stderr).strip()
        return False, err[-800:]
    except subprocess.TimeoutExpired:
        return False, "TIMEOUT"
    finally:
        os.unlink(script.name)
def evaluate(model: str, challenges: list[Challenge]) -> list[Result]:
    """Run each challenge against ``model`` and return one ``Result`` per challenge.

    For every challenge the model is queried, code is extracted from the reply
    and executed together with the challenge's test code; a progress line is
    printed as it goes. Any exception raised while handling a challenge is
    printed and recorded as a failed ``Result`` with empty code/response and
    zeroed metrics, so a single failure does not abort the run.
    """
    outcomes = []
    for ch in challenges:
        print(f" [{ch.id}] {ch.name} ({ch.difficulty})...", end=" ", flush=True)
        try:
            reply = query_model(model, ch.prompt, ch.max_tokens)
            code = extract_code(reply["content"])
            passed, error = run_test(code, ch.test_code)
            status = "FAIL"
            if passed:
                status = "PASS"
            print(f"{status} {reply['latency_ms']:.0f}ms {reply['completion_tokens']}tok {reply['tps']:.1f}t/s")
            if not passed:
                # Echo the last three non-blank error lines for quick diagnosis.
                non_blank = list(filter(str.strip, error.strip().split("\n")))
                for line in non_blank[-3:]:
                    print(f" | {line[:120]}")
            record = Result(
                challenge_id=ch.id,
                name=ch.name,
                category=ch.category,
                difficulty=ch.difficulty,
                passed=passed,
                error=error,
                code=code,
                raw_response=reply["content"],
                latency_ms=reply["latency_ms"],
                completion_tokens=reply["completion_tokens"],
                tokens_per_second=reply["tps"],
            )
        except Exception as e:
            print(f"ERROR: {e}")
            record = Result(
                challenge_id=ch.id,
                name=ch.name,
                category=ch.category,
                difficulty=ch.difficulty,
                passed=False,
                error=str(e),
                code="",
                raw_response="",
                latency_ms=0,
                completion_tokens=0,
                tokens_per_second=0,
            )
        outcomes.append(record)
    return outcomes
+6
View File
@@ -0,0 +1,6 @@
def main():
    """Print the project's greeting line to stdout."""
    greeting = "Hello from agent-coding-eval!"
    print(greeting)


if __name__ == "__main__":
    main()
+621
View File
@@ -0,0 +1,621 @@
"""
Agent Coding Evaluation - Script de evaluación de modelos locales
Evalúa capacidades de programación de modelos LLM locales via LM Studio API.
Modelos disponibles:
- qwen/qwen3-coder-next (especializado en código)
- qwen/qwen3.5-9b (general)
- nvidia/nemotron-3-nano-4b (pequeño)
- bitnet-b1.58-2b-4t (ultra-ligero)
"""
import requests
import json
import time
import re
import subprocess
import tempfile
import os
import traceback
from dataclasses import dataclass, field
from typing import Optional
# ── Config ────────────────────────────────────────────────
# Base URL of the OpenAI-compatible HTTP API; query_model() posts to
# f"{API_BASE}/chat/completions".
API_BASE = "http://127.0.0.1:1234/v1"
# Model identifiers evaluated by default: run_eval() falls back to this list
# when no models are passed, and the command-line filter selects from it.
MODELS = [
"qwen/qwen3-coder-next",
"qwen/qwen3.5-9b",
"nvidia/nemotron-3-nano-4b",
"bitnet-b1.58-2b-4t",
]
# ── Tipos ─────────────────────────────────────────────────
@dataclass
class Challenge:
    """One coding task together with the code that validates an answer.

    Attributes:
        id: Identifier used to match results back to the challenge.
        name: Human-readable title shown in progress lines and tables.
        difficulty: One of ``easy``, ``medium`` or ``hard``.
        language: One of ``python``, ``go`` or ``bash``; selects the runner.
        prompt: Task description sent to the model as the user message.
        test_code: Code that validates the model's answer.
        max_tokens: Completion token limit for the request.
    """

    id: str
    name: str
    difficulty: str
    language: str
    prompt: str
    test_code: str
    max_tokens: int = 1024
@dataclass
class Result:
    """Outcome of running one challenge against one model.

    Attributes:
        model: Identifier of the evaluated model.
        challenge_id: ``id`` of the challenge this result belongs to.
        raw_response: Message content returned by the model.
        extracted_code: Code extracted from ``raw_response``.
        compiled: Whether the runner reported the code as compiling.
        tests_passed: Whether the challenge's tests passed.
        error: Error text from the runner or the caught exception; empty on
            success.
        latency_ms: Duration of the model request in milliseconds.
        tokens_used: ``total_tokens`` reported in the response usage.
        reasoning_tokens: Reasoning tokens reported in the usage details.
        completion_tokens: Completion tokens reported in the usage.
        prompt_tokens: Prompt tokens reported in the usage.
        tokens_per_second: Completion tokens divided by request latency.
    """

    model: str
    challenge_id: str
    raw_response: str
    extracted_code: str
    compiled: bool
    tests_passed: bool
    error: str
    latency_ms: float
    tokens_used: int
    reasoning_tokens: int = 0
    completion_tokens: int = 0
    prompt_tokens: int = 0
    tokens_per_second: float = 0.0
# ── Helpers ───────────────────────────────────────────────
def query_model(model: str, prompt: str, max_tokens: int = 1024, temperature: float = 0) -> dict:
    """Query a model through the OpenAI-compatible chat-completions endpoint.

    Posts a fixed system instruction plus ``prompt`` to
    ``{API_BASE}/chat/completions`` with a 120 s timeout and returns the
    decoded JSON body. Raises ``requests.HTTPError`` on a 4xx/5xx response.
    """
    system_message = {
        "role": "system",
        "content": "You are a coding assistant. Return ONLY code inside a single code block. No explanations.",
    }
    user_message = {"role": "user", "content": prompt}
    payload = {
        "model": model,
        "messages": [system_message, user_message],
        "max_tokens": max_tokens,
        "temperature": temperature,
    }
    response = requests.post(f"{API_BASE}/chat/completions", json=payload, timeout=120)
    response.raise_for_status()
    return response.json()
def extract_code(text: str, language: str = "python") -> str:
    """Extract source code from a markdown-formatted model reply.

    Patterns are tried in this order and the first match wins:
      1. closed fenced block tagged with ``language`` (case-insensitive),
      2. closed fenced block with any or no tag (covers aliases such as
         ``sh`` for bash or ``golang`` for go),
      3. unclosed block tagged with ``language`` (reply was truncated),
      4. unclosed block with any or no tag.
    When the reply contains no fence at all, the whole text is assumed to be
    code.

    Args:
        text: Raw reply text.
        language: Expected fence tag, e.g. ``python``, ``go`` or ``bash``.

    Returns:
        The extracted code with surrounding whitespace stripped.
    """
    # re.escape keeps tags such as "c++" from being parsed as regex syntax.
    tagged_fence = rf"```{re.escape(language)}\s*\n"
    # Consuming the info string makes the search anchor on the opening fence
    # rather than skipping it and matching from the closing fence.
    any_fence = r"```[^\n`]*\n"
    patterns = (
        tagged_fence + r"(.*?)```",
        any_fence + r"(.*?)```",
        tagged_fence + r"(.*)",
        any_fence + r"(.*)",
    )
    for pat in patterns:
        m = re.search(pat, text, re.DOTALL | re.IGNORECASE)
        if m:
            return m.group(1).strip()
    return text.strip()
def run_python(code: str, test_code: str, timeout: int = 10) -> tuple[bool, bool, str]:
    """Run Python code followed by its tests in a subprocess.

    Args:
        code: Candidate solution source.
        test_code: Test source appended after the solution.
        timeout: Seconds before the subprocess is killed.

    Returns:
        ``(compiled, tests_passed, error)``. ``error`` is empty on success,
        ``"TIMEOUT"`` when the time limit is hit, and otherwise the last 500
        characters of stderr. ``compiled`` is False only when the traceback
        ends in a syntax-class error.
    """
    import sys  # local import: the module itself only imports sys under __main__

    full_code = code + "\n\n" + test_code
    # UTF-8 is Python's default source encoding; the locale default may not be.
    tmp = tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False, encoding="utf-8")
    try:
        # Close the file before running it so the child reads complete content.
        with tmp:
            tmp.write(full_code)
        # Run under the interpreter executing this harness (e.g. the project
        # venv) rather than whatever `python3` is first on PATH.
        result = subprocess.run(
            [sys.executable or "python3", tmp.name],
            capture_output=True, text=True, timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        return True, False, "TIMEOUT"
    finally:
        os.unlink(tmp.name)
    if result.returncode == 0:
        return True, True, ""
    err = result.stderr.strip()
    # The final traceback line names the exception type. Checking only that
    # line avoids flagging a test failure as a compile error just because
    # "SyntaxError" appears in an assertion message or a quoted source line.
    last_line = err.splitlines()[-1] if err else ""
    if last_line.startswith(("SyntaxError", "IndentationError", "TabError")):
        return False, False, err[-500:]
    return True, False, err[-500:]
def run_go(code: str, test_code: str, timeout: int = 15) -> tuple[bool, bool, str]:
    """Build Go code and run its tests inside a throwaway module.

    The code and the tests are written as ``main.go`` and ``main_test.go`` of
    a library package (not ``package main``), so a reply that only defines the
    requested function still builds: ``go build`` refuses a ``package main``
    without ``func main``. A package clause present in the reply is dropped in
    favour of the harness's own. When the reply has no import declaration, a
    small set of standard-library imports is added based on which package
    qualifiers the code uses.

    Args:
        code: Candidate Go source, with or without a package clause.
        test_code: Test functions; ``import "testing"`` is added around them.
        timeout: Seconds allowed for each of the build and test steps.

    Returns:
        ``(compiled, tests_passed, error)``. ``error`` is empty on success,
        ``"TIMEOUT"`` when a step exceeds the limit, and otherwise the last
        500 characters of the failing step's output.
    """
    pkg_name = "eval"
    # Drop the reply's own package clause; a second one would be a syntax error.
    body = re.sub(r"^[ \t]*package[ \t]+\w+[^\n]*\n?", "", code, count=1, flags=re.MULTILINE)

    # Only inject imports when the reply declares none of its own.
    stdlib_pkgs = ("strings", "fmt", "strconv", "sort", "math", "regexp", "io", "os", "sync")
    import_block = ""
    if re.search(r"^[ \t]*import\b", body, re.MULTILINE) is None:
        # \b stops "os." from matching inside "pos." (or "io." inside
        # "bufio."), which would inject an unused import and break the build.
        needed = sorted(p for p in stdlib_pkgs if re.search(rf"\b{p}\.", body))
        if needed:
            imports = "\n".join(f'\t"{p}"' for p in needed)
            import_block = f"import (\n{imports}\n)\n\n"
    main_code = f"package {pkg_name}\n\n{import_block}{body}\n"
    test_full = f"package {pkg_name}\n\nimport \"testing\"\n\n{test_code}\n"

    with tempfile.TemporaryDirectory() as tmpdir:
        # Init module
        subprocess.run(["go", "mod", "init", "eval"], cwd=tmpdir, capture_output=True)
        with open(os.path.join(tmpdir, "main.go"), "w", encoding="utf-8") as f:
            f.write(main_code)
        with open(os.path.join(tmpdir, "main_test.go"), "w", encoding="utf-8") as f:
            f.write(test_full)
        try:
            # Build check: separates compile errors from test failures.
            build = subprocess.run(
                ["go", "build", "."], cwd=tmpdir,
                capture_output=True, text=True, timeout=timeout
            )
            if build.returncode != 0:
                return False, False, build.stderr.strip()[-500:]
            # Run tests
            test = subprocess.run(
                ["go", "test", "-v", "."], cwd=tmpdir,
                capture_output=True, text=True, timeout=timeout
            )
            if test.returncode == 0:
                return True, True, ""
            return True, False, (test.stdout + test.stderr).strip()[-500:]
        except subprocess.TimeoutExpired:
            return True, False, "TIMEOUT"
def run_bash(code: str, test_code: str, timeout: int = 10) -> tuple[bool, bool, str]:
    """Run Bash code followed by its tests as a single script.

    Returns ``(compiled, tests_passed, error)``: all-clear on exit status 0,
    ``compiled`` False when stderr mentions a syntax error, ``"TIMEOUT"`` as
    the error when the time limit is hit, and otherwise the last 500
    characters of stdout plus stderr.
    """
    script = code + "\n\n" + test_code
    with tempfile.NamedTemporaryFile(mode="w", suffix=".sh", delete=False) as handle:
        handle.write(script)
        handle.flush()
        script_path = handle.name
        try:
            proc = subprocess.run(
                ["bash", script_path],
                capture_output=True,
                text=True,
                timeout=timeout,
            )
        except subprocess.TimeoutExpired:
            return True, False, "TIMEOUT"
        finally:
            os.unlink(script_path)
        if proc.returncode == 0:
            return True, True, ""
        stderr_text = proc.stderr.strip()
        is_syntax_error = "syntax error" in stderr_text.lower()
        if is_syntax_error:
            return False, False, stderr_text[-500:]
        combined = proc.stdout + stderr_text
        return True, False, combined[-500:]
# Maps a Challenge.language value to the function that executes code in that
# language. Each runner is called as runner(code, test_code) and returns
# (compiled, tests_passed, error); evaluate_model() looks the runner up by
# language and records "No runner for ..." when the key is missing.
RUNNERS = {
"python": run_python,
"go": run_go,
"bash": run_bash,
}
# ── Challenges ────────────────────────────────────────────
# Built-in challenge set, grouped below by difficulty (Python) and then by
# language (Go, Bash). Each entry's test_code is handed to the runner for its
# language together with the code extracted from the model's reply; max_tokens
# is raised above the dataclass default only where an entry sets it explicitly.
CHALLENGES = [
# --- EASY ---
Challenge(
id="py_easy_1",
name="Fibonacci",
difficulty="easy",
language="python",
prompt="Write a Python function `fib(n: int) -> int` that returns the nth Fibonacci number (0-indexed). fib(0)=0, fib(1)=1, fib(10)=55.",
test_code="""
assert fib(0) == 0, f"fib(0)={fib(0)}"
assert fib(1) == 1, f"fib(1)={fib(1)}"
assert fib(10) == 55, f"fib(10)={fib(10)}"
assert fib(20) == 6765, f"fib(20)={fib(20)}"
print("PASS: fibonacci")
""",
),
Challenge(
id="py_easy_2",
name="Palindrome check",
difficulty="easy",
language="python",
prompt="Write a Python function `is_palindrome(s: str) -> bool` that checks if a string is a palindrome, ignoring case and non-alphanumeric characters. is_palindrome('A man, a plan, a canal: Panama') == True.",
test_code="""
assert is_palindrome("A man, a plan, a canal: Panama") == True
assert is_palindrome("racecar") == True
assert is_palindrome("hello") == False
assert is_palindrome("") == True
assert is_palindrome("Was it a car or a cat I saw?") == True
print("PASS: palindrome")
""",
),
Challenge(
id="py_easy_3",
name="FizzBuzz list",
difficulty="easy",
language="python",
prompt='Write a Python function `fizzbuzz(n: int) -> list[str]` that returns a list from 1 to n where multiples of 3 are "Fizz", multiples of 5 are "Buzz", multiples of both are "FizzBuzz", and others are the number as string.',
test_code="""
result = fizzbuzz(15)
assert result[0] == "1", f"got {result[0]}"
assert result[2] == "Fizz", f"got {result[2]}"
assert result[4] == "Buzz", f"got {result[4]}"
assert result[14] == "FizzBuzz", f"got {result[14]}"
assert len(result) == 15
print("PASS: fizzbuzz")
""",
),
# --- MEDIUM ---
Challenge(
id="py_med_1",
name="Two Sum",
difficulty="medium",
language="python",
prompt="Write a Python function `two_sum(nums: list[int], target: int) -> tuple[int, int]` that returns indices of two numbers that add up to target. Each input has exactly one solution. You may not use the same element twice. Return indices in ascending order.",
test_code="""
assert two_sum([2, 7, 11, 15], 9) == (0, 1)
assert two_sum([3, 2, 4], 6) == (1, 2)
assert two_sum([3, 3], 6) == (0, 1)
assert two_sum([1, 5, 3, 7], 8) == (1, 2) or two_sum([1, 5, 3, 7], 8) == (0, 3)
print("PASS: two_sum")
""",
),
Challenge(
id="py_med_2",
name="Matrix transpose",
difficulty="medium",
language="python",
prompt="Write a Python function `transpose(matrix: list[list[int]]) -> list[list[int]]` that transposes a matrix. Do NOT use numpy or zip.",
test_code="""
assert transpose([[1,2,3],[4,5,6]]) == [[1,4],[2,5],[3,6]]
assert transpose([[1]]) == [[1]]
assert transpose([[1,2],[3,4],[5,6]]) == [[1,3,5],[2,4,6]]
print("PASS: transpose")
""",
),
Challenge(
id="py_med_3",
name="Balanced parentheses",
difficulty="medium",
language="python",
prompt="Write a Python function `is_balanced(s: str) -> bool` that checks if a string has balanced parentheses, brackets, and braces. Only these characters matter: ()[]{}. Other characters should be ignored.",
test_code="""
assert is_balanced("()[]{}") == True
assert is_balanced("([{}])") == True
assert is_balanced("(]") == False
assert is_balanced("([)]") == False
assert is_balanced("hello (world) [test]") == True
assert is_balanced("{[}]") == False
assert is_balanced("") == True
print("PASS: balanced")
""",
),
Challenge(
id="py_med_4",
name="Group anagrams",
difficulty="medium",
language="python",
prompt='Write a Python function `group_anagrams(words: list[str]) -> list[list[str]]` that groups anagrams together. Each group should be sorted alphabetically, and the groups should be sorted by their first element.',
test_code="""
result = group_anagrams(["eat", "tea", "tan", "ate", "nat", "bat"])
# Sort each group and sort groups by first element for deterministic comparison
result = [sorted(g) for g in result]
result.sort(key=lambda g: g[0])
assert result == [["ate", "eat", "tea"], ["bat"], ["nat", "tan"]], f"got {result}"
print("PASS: group_anagrams")
""",
),
# --- HARD ---
Challenge(
id="py_hard_1",
name="LRU Cache",
difficulty="hard",
language="python",
prompt="""Write a Python class `LRUCache` with:
- `__init__(self, capacity: int)` - Initialize with positive capacity.
- `get(self, key: int) -> int` - Return value if key exists, else -1. Marks as recently used.
- `put(self, key: int, value: int) -> None` - Update or insert. If over capacity, evict least recently used.
Both get and put must run in O(1) average time. Do NOT use functools.lru_cache or collections.OrderedDict.""",
test_code="""
cache = LRUCache(2)
cache.put(1, 1)
cache.put(2, 2)
assert cache.get(1) == 1, f"got {cache.get(1)}"
cache.put(3, 3) # evicts key 2
assert cache.get(2) == -1, f"got {cache.get(2)}"
cache.put(4, 4) # evicts key 1
assert cache.get(1) == -1
assert cache.get(3) == 3
assert cache.get(4) == 4
# Test update
cache2 = LRUCache(2)
cache2.put(1, 10)
cache2.put(1, 20)
assert cache2.get(1) == 20
print("PASS: lru_cache")
""",
max_tokens=1500,
),
Challenge(
id="py_hard_2",
name="Merge intervals",
difficulty="hard",
language="python",
prompt="Write a Python function `merge_intervals(intervals: list[list[int]]) -> list[list[int]]` that merges all overlapping intervals and returns sorted non-overlapping intervals.",
test_code="""
assert merge_intervals([[1,3],[2,6],[8,10],[15,18]]) == [[1,6],[8,10],[15,18]]
assert merge_intervals([[1,4],[4,5]]) == [[1,5]]
assert merge_intervals([[1,4],[0,4]]) == [[0,4]]
assert merge_intervals([[1,4],[2,3]]) == [[1,4]]
assert merge_intervals([]) == []
assert merge_intervals([[1,1]]) == [[1,1]]
print("PASS: merge_intervals")
""",
),
Challenge(
id="py_hard_3",
name="Binary search tree iterator",
difficulty="hard",
language="python",
prompt="""Write Python classes:
1. `TreeNode` with attributes `val`, `left`, `right` (left and right default to None).
2. `BSTIterator` that takes a TreeNode root and implements in-order traversal:
- `has_next() -> bool` - returns True if there is a next element.
- `next_val() -> int` - returns the next smallest number.
Must use O(h) memory where h is tree height (not O(n)). Do not flatten the tree into a list.""",
test_code="""
# Build tree: 7
# / \\
# 3 15
# / \\
# 9 20
root = TreeNode(7, TreeNode(3), TreeNode(15, TreeNode(9), TreeNode(20)))
it = BSTIterator(root)
assert it.has_next() == True
assert it.next_val() == 3
assert it.next_val() == 7
assert it.has_next() == True
assert it.next_val() == 9
assert it.next_val() == 15
assert it.next_val() == 20
assert it.has_next() == False
print("PASS: bst_iterator")
""",
max_tokens=1500,
),
# --- GO ---
Challenge(
id="go_med_1",
name="Reverse words in string",
difficulty="medium",
language="go",
prompt='Write a Go function `ReverseWords(s string) string` that reverses the order of words in a string. Words are separated by spaces. Remove leading/trailing spaces and reduce multiple spaces to single. Example: " hello world " -> "world hello".',
test_code="""
func TestReverseWords(t *testing.T) {
cases := []struct{ in, want string }{
{"hello world", "world hello"},
{" hello world ", "world hello"},
{"a", "a"},
{" Bob Loves Alice ", "Alice Loves Bob"},
}
for _, c := range cases {
got := ReverseWords(c.in)
if got != c.want {
t.Errorf("ReverseWords(%q) = %q, want %q", c.in, got, c.want)
}
}
}
""",
),
# --- BASH ---
Challenge(
id="bash_easy_1",
name="Count lines in files",
difficulty="easy",
language="bash",
prompt='Write a Bash function `count_lines` that takes a filename as argument and prints the number of lines. If the file does not exist, print "ERROR: file not found" to stderr and return 1.',
test_code="""
# Test setup
tmpfile=$(mktemp)
echo -e "line1\\nline2\\nline3" > "$tmpfile"
result=$(count_lines "$tmpfile")
if [ "$result" != "3" ]; then
echo "FAIL: expected 3, got $result"
exit 1
fi
# Test missing file
if count_lines "/nonexistent/file" 2>/dev/null; then
echo "FAIL: should return non-zero for missing file"
exit 1
fi
rm -f "$tmpfile"
echo "PASS: count_lines"
""",
),
]
# ── Evaluator ─────────────────────────────────────────────
def evaluate_model(model: str, challenges: list[Challenge]) -> list[Result]:
    """Evaluate one model against every challenge.

    For each challenge the model is queried, code is extracted from the reply
    for the challenge's language and handed to the matching runner from
    ``RUNNERS``. A progress line is printed per challenge. Any exception raised
    while handling a challenge is printed and recorded as a failed ``Result``
    with empty response/code and zeroed metrics, so one failure does not stop
    the run.

    Args:
        model: Model identifier passed to ``query_model``.
        challenges: Challenges to run, in order.

    Returns:
        One ``Result`` per challenge, in the same order.
    """
    results = []
    for ch in challenges:
        print(f" [{ch.id}] {ch.name} ({ch.difficulty})...", end=" ", flush=True)
        try:
            t0 = time.time()
            resp = query_model(model, ch.prompt, ch.max_tokens)
            latency = (time.time() - t0) * 1000
            content = resp["choices"][0]["message"]["content"]
            # `or {}` also covers `"usage": null` / `"completion_tokens_details":
            # null`, which would otherwise turn a usable answer into an ERROR.
            usage = resp.get("usage") or {}
            tokens = usage.get("total_tokens", 0)
            completion_tokens = usage.get("completion_tokens", 0)
            prompt_tokens = usage.get("prompt_tokens", 0)
            details = usage.get("completion_tokens_details") or {}
            reasoning_tokens = details.get("reasoning_tokens", 0)
            # Speed: completion tokens per second of total request latency.
            # The latency covers the whole HTTP call, prompt processing included.
            tps = (completion_tokens / (latency / 1000)) if latency > 0 else 0
            code = extract_code(content, ch.language)
            runner = RUNNERS.get(ch.language)
            if runner:
                compiled, passed, error = runner(code, ch.test_code)
            else:
                compiled, passed, error = False, False, f"No runner for {ch.language}"
            status = "PASS" if passed else ("COMPILE_ERR" if not compiled else "FAIL")
            print(f"{status} ({latency:.0f}ms, {completion_tokens}tok, {tps:.1f} tok/s)")
            results.append(Result(
                model=model,
                challenge_id=ch.id,
                raw_response=content,
                extracted_code=code,
                compiled=compiled,
                tests_passed=passed,
                error=error,
                latency_ms=latency,
                tokens_used=tokens,
                reasoning_tokens=reasoning_tokens,
                completion_tokens=completion_tokens,
                prompt_tokens=prompt_tokens,
                tokens_per_second=tps,
            ))
        except Exception as e:
            # Deliberate catch-all at the per-challenge boundary: record the
            # failure and carry on with the next challenge.
            print(f"ERROR: {e}")
            results.append(Result(
                model=model,
                challenge_id=ch.id,
                raw_response="",
                extracted_code="",
                compiled=False,
                tests_passed=False,
                error=str(e),
                latency_ms=0,
                tokens_used=0,
            ))
    return results
# Character used for the thin horizontal rules in the report.
_RULE_CHAR = "─"


def _print_model_section(model: str, model_results: list, ch_map: dict) -> None:
    """Print headline stats and the per-difficulty breakdown for one model."""
    total = len(model_results)
    passed = sum(1 for r in model_results if r.tests_passed)
    compiled = sum(1 for r in model_results if r.compiled)
    avg_latency = sum(r.latency_ms for r in model_results) / max(total, 1)
    avg_tps = sum(r.tokens_per_second for r in model_results) / max(total, 1)
    total_reasoning = sum(r.reasoning_tokens for r in model_results)

    print(f"\n{_RULE_CHAR * 100}")
    print(f" {model}")
    print(f" Tests passed: {passed}/{total} ({100*passed/total:.0f}%) | "
          f"Compiled: {compiled}/{total} | "
          f"Avg latency: {avg_latency:.0f}ms | Avg speed: {avg_tps:.1f} tok/s")
    if total_reasoning > 0:
        print(f" Reasoning tokens total: {total_reasoning}")
    print(_RULE_CHAR * 100)

    for diff in ("easy", "medium", "hard"):
        diff_results = [r for r in model_results if ch_map[r.challenge_id].difficulty == diff]
        if not diff_results:
            continue
        dp = sum(1 for r in diff_results if r.tests_passed)
        print(f" {diff.upper():8s} {dp}/{len(diff_results)} passed")
        for r in diff_results:
            ch = ch_map[r.challenge_id]
            if r.tests_passed:
                icon = "✓"
            elif not r.compiled:
                icon = "✗ compile"
            else:
                icon = "✗ test"
            err_hint = f" [{r.error[:60]}]" if r.error else ""
            reason = f" (R:{r.reasoning_tokens})" if r.reasoning_tokens > 0 else ""
            print(f" {icon:12s} {ch.name:30s} {r.latency_ms:6.0f}ms {r.completion_tokens:4d}tok {r.tokens_per_second:5.1f}t/s{reason}{err_hint}")


def _print_comparison(all_results: list, challenges: list, models: list) -> None:
    """Print the challenge-by-model PASS/FAIL/ERR matrix."""
    print(f"\n{'=' * 90}")
    print("COMPARATIVA")
    print(f"{'=' * 90}")
    header = f"{'Challenge':35s}"
    for m in models:
        short = m.split("/")[-1][:15]
        header += f" {short:>15s}"
    print(header)
    print(_RULE_CHAR * (35 + 16 * len(models)))

    # Index once instead of scanning all results for every table cell;
    # setdefault keeps the first result per (model, challenge) pair.
    by_key = {}
    for r in all_results:
        by_key.setdefault((r.model, r.challenge_id), r)

    for ch in challenges:
        row = f"{ch.name + ' (' + ch.difficulty[0] + ')':35s}"
        for m in models:
            r = by_key.get((m, ch.id))
            if r is None:
                label = "---"
            elif r.tests_passed:
                label = "PASS"
            elif r.compiled:
                label = "FAIL"
            else:
                label = "ERR"
            row += f" {label:>15s}"
        print(row)


def _print_speed(all_results: list, models: list) -> None:
    """Print average/min/max tokens per second and average latency per model."""
    print(f"\n{'=' * 90}")
    print("VELOCIDAD (tokens/segundo)")
    print(f"{'=' * 90}")
    print(f"{'Model':35s} {'Avg tok/s':>10s} {'Min tok/s':>10s} {'Max tok/s':>10s} {'Avg ms':>10s}")
    print(_RULE_CHAR * 75)
    for m in models:
        mrs = [r for r in all_results if r.model == m]
        if not mrs:
            continue
        speeds = [r.tokens_per_second for r in mrs]
        avg_tps = sum(speeds) / len(speeds)
        avg_ms = sum(r.latency_ms for r in mrs) / len(mrs)
        short = m.split("/")[-1]
        print(f"{short:35s} {avg_tps:10.1f} {min(speeds):10.1f} {max(speeds):10.1f} {avg_ms:10.0f}")


def print_summary(all_results: list[Result], challenges: list[Challenge]):
    """Print the evaluation report to stdout.

    The report has three parts: a section per model (pass/compile counts,
    average latency and speed, then each challenge grouped under easy, medium
    and hard), a challenge-by-model comparison table, and a speed table.

    Args:
        all_results: Results of every evaluated model.
        challenges: The challenges that were run; every ``challenge_id`` in
            ``all_results`` must belong to one of them.
    """
    ch_map = {c.id: c for c in challenges}
    models = sorted(set(r.model for r in all_results))

    print("\n" + "=" * 90)
    print("RESULTADOS - EVALUACIÓN DE CODING")
    print("=" * 90)

    for model in models:
        model_results = [r for r in all_results if r.model == model]
        _print_model_section(model, model_results, ch_map)

    _print_comparison(all_results, challenges, models)
    _print_speed(all_results, models)
# ── Main ──────────────────────────────────────────────────
def run_eval(models: list[str] | None = None, difficulties: list[str] | None = None) -> list[Result]:
    """Run the complete evaluation and print the summary report.

    Args:
        models: Model identifiers to evaluate. ``None`` or an empty list means
            every model in ``MODELS``.
        difficulties: When given and non-empty, only challenges whose
            difficulty is in this list are run.

    Returns:
        The results of all evaluated models, in evaluation order.
    """
    models = models or MODELS
    challenges = CHALLENGES
    if difficulties:
        challenges = [c for c in challenges if c.difficulty in difficulties]
    print(f"Evaluando {len(models)} modelos con {len(challenges)} challenges\n")
    # Horizontal rule framing each model banner.
    rule = "─" * 60
    all_results = []
    for model in models:
        print(f"\n{rule}")
        print(f" MODELO: {model}")
        print(rule)
        all_results.extend(evaluate_model(model, challenges))
    print_summary(all_results, challenges)
    return all_results
if __name__ == "__main__":
    import sys

    # Optional command-line filter: each argument is a substring, and a model
    # is selected when any argument occurs in its identifier.
    models = None
    if len(sys.argv) > 1:
        models = [m for m in MODELS if any(arg in m for arg in sys.argv[1:])]
        if not models:
            # An empty selection would make run_eval() fall back to *all*
            # models; stop instead of silently ignoring the filter.
            sys.exit(
                f"No model matches {sys.argv[1:]}; available: {', '.join(MODELS)}"
            )
    run_eval(models=models)
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
+16
View File
@@ -0,0 +1,16 @@
[project]
name = "agent-coding-eval"
version = "0.1.0"
description = "Evaluation harness for the coding abilities of local LLMs served through the LM Studio API"
readme = "README.md"
requires-python = ">=3.13"
dependencies = [
"jupyter>=1.1.1",
"jupyter-collaboration>=4.3.0",
"jupyter-mcp-server>=0.4.0",
"jupyterlab>=4.5.6",
"matplotlib>=3.10.8",
"numpy>=2.4.4",
"pandas>=3.0.2",
"requests>=2.33.1",
]
+45
View File
@@ -0,0 +1,45 @@
#!/bin/bash
# Jupyter Lab — modo colaborativo con autodeteccion de puerto
# Generado por write_jupyter_launcher (fn_registry)
# Print the first port in 8888-8899 that neither `ss` nor `lsof` reports as
# in use. Prints 8888 when every candidate looks taken.
find_free_port() {
    local candidate
    for candidate in {8888..8899}; do
        # Listed as a listening TCP socket by ss: taken.
        if ss -tln 2>/dev/null | grep -q ":${candidate} "; then
            continue
        fi
        # Open according to lsof: taken.
        if lsof -i:"$candidate" >/dev/null 2>&1; then
            continue
        fi
        echo $candidate
        return
    done
    echo 8888
}
# Port to serve on: first command-line argument if given, otherwise the port
# printed by find_free_port.
PORT=${1:-$(find_free_port)}
# Work from the script's own directory. Abort if that fails: everything below
# (.jupyter-port, .venv, root_dir) is resolved relative to it.
cd "$(dirname "$0")" || exit 1
# Record the chosen port in .jupyter-port.
echo "$PORT" > .jupyter-port
# Best effort: activate the local virtualenv when it exists.
source .venv/bin/activate 2>/dev/null || true
if ! python -c "import jupyter_collaboration" 2>/dev/null; then
    echo "ERROR: jupyter-collaboration no esta instalado" >&2
    echo "Instala con: uv add jupyter-collaboration" >&2
    exit 1
fi
echo "════════════════════════════════════════════════"
echo " Jupyter Lab + Colaboracion en puerto $PORT"
echo "════════════════════════════════════════════════"
echo ""
echo " Abre: http://localhost:$PORT"
echo " Ctrl+C para detener"
echo ""
# NOTE(review): the server starts with no token, no password, XSRF checks
# disabled and every origin allowed, so anything that can reach this port can
# use the kernel. Confirm this only ever runs on a trusted, local-only machine.
jupyter lab \
    --port="$PORT" \
    --no-browser \
    --ServerApp.token='' \
    --ServerApp.password='' \
    --ServerApp.disable_check_xsrf=True \
    --ServerApp.allow_origin='*' \
    --ServerApp.root_dir="$(pwd)" \
    --collaborative
Generated
+2551
View File
File diff suppressed because it is too large Load Diff