Files
agent_coding_eval/notebooks/01_coding_eval.py
T
fn-registry agent f60da6fa6f chore: initial sync
2026-04-28 22:13:07 +02:00

622 lines
23 KiB
Python

"""
Agent Coding Evaluation - Script de evaluación de modelos locales
Evalúa capacidades de programación de modelos LLM locales via LM Studio API.
Modelos disponibles:
- qwen/qwen3-coder-next (especializado en código)
- qwen/qwen3.5-9b (general)
- nvidia/nemotron-3-nano-4b (pequeño)
- bitnet-b1.58-2b-4t (ultra-ligero)
"""
import requests
import json
import time
import re
import subprocess
import tempfile
import os
import traceback
from dataclasses import dataclass, field
from typing import Optional
# ── Config ────────────────────────────────────────────────
# Base URL of the OpenAI-compatible HTTP API; query_model() posts to
# f"{API_BASE}/chat/completions".
API_BASE = "http://127.0.0.1:1234/v1"
# Model identifiers evaluated by default when run_eval() receives no explicit
# model list; the command-line filter in the __main__ block selects from these.
MODELS = [
    "qwen/qwen3-coder-next",
    "qwen/qwen3.5-9b",
    "nvidia/nemotron-3-nano-4b",
    "bitnet-b1.58-2b-4t",
]
# ── Tipos ─────────────────────────────────────────────────
@dataclass
class Challenge:
    """One coding task posed to a model, plus the code used to validate the answer."""

    id: str  # unique key; stored in Result.challenge_id and used for lookups in reports
    name: str  # human-readable title printed in progress lines and summaries
    difficulty: str  # easy, medium, hard
    language: str  # python, go, bash (selects the entry in RUNNERS)
    prompt: str  # text sent to the model as the user message
    test_code: str  # code that validates the answer
    max_tokens: int = 1024  # forwarded to query_model() as the completion budget
@dataclass
class Result:
    """Outcome of running one challenge against one model."""

    model: str  # model identifier that was queried
    challenge_id: str  # Challenge.id this result belongs to
    raw_response: str  # message content as returned by the API ("" on error)
    extracted_code: str  # output of extract_code() on the raw response
    compiled: bool  # first element of the runner's return tuple
    tests_passed: bool  # second element of the runner's return tuple
    error: str  # runner error text, or the exception message when the query failed
    latency_ms: float  # measured duration of the query_model() call, in milliseconds
    tokens_used: int  # usage.total_tokens reported by the API
    reasoning_tokens: int = 0  # usage.completion_tokens_details.reasoning_tokens
    completion_tokens: int = 0  # usage.completion_tokens
    prompt_tokens: int = 0  # usage.prompt_tokens
    tokens_per_second: float = 0.0  # completion tokens / latency
# ── Helpers ───────────────────────────────────────────────
def query_model(model: str, prompt: str, max_tokens: int = 1024, temperature: float = 0) -> dict:
    """Send one chat-completion request to the OpenAI-compatible API.

    Args:
        model: Model identifier placed in the request body.
        prompt: Text sent as the user message.
        max_tokens: Completion token limit placed in the request body.
        temperature: Sampling temperature placed in the request body.

    Returns:
        The decoded JSON response body.

    Raises:
        Whatever ``requests`` raises for transport failures, the 120-second
        timeout, or (via ``raise_for_status``) a non-success HTTP status.
    """
    system_message = {
        "role": "system",
        "content": "You are a coding assistant. Return ONLY code inside a single code block. No explanations.",
    }
    user_message = {"role": "user", "content": prompt}
    payload = {
        "model": model,
        "messages": [system_message, user_message],
        "max_tokens": max_tokens,
        "temperature": temperature,
    }
    response = requests.post(f"{API_BASE}/chat/completions", json=payload, timeout=120)
    response.raise_for_status()
    return response.json()
def extract_code(text: str, language: str = "python") -> str:
    """Extract the code from a markdown-formatted model response.

    Fenced blocks are paired in document order, so a closing fence is never
    mistaken for an opening one. The block returned is, in order of preference:

    1. the first block whose info string names ``language`` (case-insensitive),
    2. the first block with no info string,
    3. the first block of any kind (covers aliases such as ``py`` or ``sh``).

    An opening fence that is never closed (for example a response truncated by
    the token limit) counts as a block extending to the end of the text. If the
    text contains no fence at all, the whole text is assumed to be code.

    Args:
        text: Raw response text.
        language: Preferred fence tag; compared literally, never as a regex.

    Returns:
        The selected code with surrounding whitespace stripped.
    """
    wanted = language.strip().lower()

    blocks = []  # (info string, body) in document order
    tail_start = 0
    for match in re.finditer(r"```([^\n`]*)\n(.*?)```", text, re.DOTALL):
        blocks.append((match.group(1), match.group(2)))
        tail_start = match.end()

    # Look for an unterminated fence only after the last complete block.
    open_fence = re.compile(r"```([^\n`]*)\n(.*)\Z", re.DOTALL).search(text, tail_start)
    if open_fence:
        blocks.append((open_fence.group(1), open_fence.group(2)))

    if not blocks:
        # No fence at all: assume the entire response is code.
        return text.strip()

    # Reduce each info string to its first word, lowercased ("Python title=x" -> "python").
    tagged = [((info.split() or [""])[0].lower(), body) for info, body in blocks]
    for accepted in (wanted, ""):
        for tag, body in tagged:
            if tag == accepted:
                return body.strip()
    return tagged[0][1].strip()
def run_python(code: str, test_code: str, timeout: int = 10) -> tuple[bool, bool, str]:
    """Run candidate Python code followed by its tests in a subprocess.

    The candidate is first byte-compiled in-process (not executed), so that a
    "compile" failure means exactly "the candidate does not parse". Runtime
    output that merely mentions SyntaxError is not misclassified.

    Args:
        code: Candidate source extracted from the model response.
        test_code: Assertions appended after the candidate.
        timeout: Seconds allowed for the subprocess.

    Returns:
        ``(compiled, tests_passed, error)``. ``error`` is ``""`` on success,
        ``"TIMEOUT"`` when the time limit is hit, otherwise the last 500
        characters of the relevant error text.
    """
    import sys  # local import: the module only imports sys inside its __main__ block

    try:
        compile(code, "<candidate>", "exec")
    except (SyntaxError, ValueError) as exc:
        # IndentationError/TabError are SyntaxError subclasses; ValueError
        # covers source containing null bytes on older interpreters.
        return False, False, f"{type(exc).__name__}: {exc}"[-500:]

    full_code = code + "\n\n" + test_code
    # Python reads source files as UTF-8 by default, so write UTF-8 regardless
    # of the locale. The file is closed before the child process opens it.
    with tempfile.NamedTemporaryFile(
        mode="w", suffix=".py", delete=False, encoding="utf-8"
    ) as f:
        f.write(full_code)
    try:
        # Use the interpreter running this script so the syntax check above
        # and the execution agree on the language version.
        result = subprocess.run(
            [sys.executable, f.name],
            capture_output=True, text=True, timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        return True, False, "TIMEOUT"
    finally:
        os.unlink(f.name)

    if result.returncode == 0:
        return True, True, ""
    return True, False, result.stderr.strip()[-500:]
def run_go(code: str, test_code: str, timeout: int = 15) -> tuple[bool, bool, str]:
    """Build candidate Go code and run its tests in a throwaway module.

    The candidate and the tests are placed in a non-``main`` package. A
    ``main`` package without ``func main`` fails ``go build`` at link time,
    which would report every function-only answer as a compile error. Any
    ``package`` clause supplied by the model is removed, so the generated
    file has exactly one.

    When the candidate has no import declaration of its own, imports for a
    small set of standard-library packages are added based on the qualified
    identifiers it uses.

    Args:
        code: Candidate source extracted from the model response.
        test_code: Go test functions; ``testing`` is imported for them.
        timeout: Seconds allowed for each ``go`` invocation.

    Returns:
        ``(compiled, tests_passed, error)``. ``error`` is ``""`` on success,
        ``"TIMEOUT"`` when a ``go`` command exceeds the limit, otherwise the
        last 500 characters of the tool output.
    """
    package = "eval"

    # Drop the model's own package clause (first one only), if present.
    body = re.sub(
        r"^[ \t]*package[ \t]+\w+[ \t\r]*$", "", code, count=1, flags=re.MULTILINE
    )

    # Only add imports when the code has no import declaration of its own.
    import_block = ""
    if re.search(r"^[ \t]*import\b", body, re.MULTILINE) is None:
        stdlib_packages = (
            "strings", "fmt", "strconv", "sort", "math", "regexp", "io", "os", "sync",
        )
        # The word boundary keeps identifiers such as `pos.` or `radio.` from
        # pulling in "os"/"io"; an unused import is a compile error in Go.
        needed = sorted(p for p in stdlib_packages if re.search(rf"\b{p}\.", body))
        if needed:
            imports = "\n".join(f'\t"{p}"' for p in needed)
            import_block = f"import (\n{imports}\n)\n\n"

    main_code = f"package {package}\n\n{import_block}{body}\n"
    test_full = f"package {package}\n\nimport \"testing\"\n\n{test_code}\n"

    with tempfile.TemporaryDirectory() as tmpdir:
        # Go source files must be UTF-8.
        with open(os.path.join(tmpdir, "main.go"), "w", encoding="utf-8") as f:
            f.write(main_code)
        with open(os.path.join(tmpdir, "main_test.go"), "w", encoding="utf-8") as f:
            f.write(test_full)
        try:
            # Init module
            subprocess.run(
                ["go", "mod", "init", "eval"], cwd=tmpdir,
                capture_output=True, timeout=timeout,
            )
            # Build check: compiles the candidate only (_test.go files are excluded).
            build = subprocess.run(
                ["go", "build", "."], cwd=tmpdir,
                capture_output=True, text=True, timeout=timeout,
            )
            if build.returncode != 0:
                return False, False, build.stderr.strip()[-500:]
            # Run tests
            test = subprocess.run(
                ["go", "test", "-v", "."], cwd=tmpdir,
                capture_output=True, text=True, timeout=timeout,
            )
        except subprocess.TimeoutExpired:
            return True, False, "TIMEOUT"

    if test.returncode == 0:
        return True, True, ""
    return True, False, (test.stdout + test.stderr).strip()[-500:]
def run_bash(code: str, test_code: str, timeout: int = 10) -> tuple[bool, bool, str]:
    """Run candidate Bash code followed by its tests.

    Pass/fail is decided by actually running the script. Only when it fails
    is the candidate re-checked with ``bash -n`` (parse without executing) to
    decide whether the failure is a syntax error. This avoids classifying
    runtime messages that merely contain "syntax error" as compile errors.

    Args:
        code: Candidate source extracted from the model response.
        test_code: Shell test commands appended after the candidate.
        timeout: Seconds allowed for each ``bash`` invocation.

    Returns:
        ``(compiled, tests_passed, error)``. ``error`` is ``""`` on success,
        ``"TIMEOUT"`` when the time limit is hit, otherwise the last 500
        characters of the relevant output.
    """
    full_code = code + "\n\n" + test_code
    with tempfile.NamedTemporaryFile(
        mode="w", suffix=".sh", delete=False, encoding="utf-8"
    ) as f:
        f.write(full_code)
    try:
        result = subprocess.run(
            ["bash", f.name],
            capture_output=True, text=True, timeout=timeout,
        )
        if result.returncode == 0:
            return True, True, ""
        # Parse-only check of the candidate alone, fed through stdin.
        syntax = subprocess.run(
            ["bash", "-n"], input=code,
            capture_output=True, text=True, timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        return True, False, "TIMEOUT"
    finally:
        os.unlink(f.name)

    err = result.stderr.strip()
    if syntax.returncode != 0:
        return False, False, (err or syntax.stderr.strip())[-500:]
    return True, False, (result.stdout + err)[-500:]
# Maps a Challenge.language value to the function that executes candidate code
# in that language. Every runner takes (code, test_code) and returns
# (compiled, tests_passed, error).
RUNNERS = {
    "python": run_python,
    "go": run_go,
    "bash": run_bash,
}
# ── Challenges ────────────────────────────────────────────
# Challenge catalogue. Each entry pairs a prompt with test code that the
# matching runner executes after the model's extracted answer.
CHALLENGES = [
    # --- EASY ---
    Challenge(
        id="py_easy_1",
        name="Fibonacci",
        difficulty="easy",
        language="python",
        prompt="Write a Python function `fib(n: int) -> int` that returns the nth Fibonacci number (0-indexed). fib(0)=0, fib(1)=1, fib(10)=55.",
        test_code="""
assert fib(0) == 0, f"fib(0)={fib(0)}"
assert fib(1) == 1, f"fib(1)={fib(1)}"
assert fib(10) == 55, f"fib(10)={fib(10)}"
assert fib(20) == 6765, f"fib(20)={fib(20)}"
print("PASS: fibonacci")
""",
    ),
    Challenge(
        id="py_easy_2",
        name="Palindrome check",
        difficulty="easy",
        language="python",
        prompt="Write a Python function `is_palindrome(s: str) -> bool` that checks if a string is a palindrome, ignoring case and non-alphanumeric characters. is_palindrome('A man, a plan, a canal: Panama') == True.",
        test_code="""
assert is_palindrome("A man, a plan, a canal: Panama") == True
assert is_palindrome("racecar") == True
assert is_palindrome("hello") == False
assert is_palindrome("") == True
assert is_palindrome("Was it a car or a cat I saw?") == True
print("PASS: palindrome")
""",
    ),
    Challenge(
        id="py_easy_3",
        name="FizzBuzz list",
        difficulty="easy",
        language="python",
        prompt='Write a Python function `fizzbuzz(n: int) -> list[str]` that returns a list from 1 to n where multiples of 3 are "Fizz", multiples of 5 are "Buzz", multiples of both are "FizzBuzz", and others are the number as string.',
        test_code="""
result = fizzbuzz(15)
assert result[0] == "1", f"got {result[0]}"
assert result[2] == "Fizz", f"got {result[2]}"
assert result[4] == "Buzz", f"got {result[4]}"
assert result[14] == "FizzBuzz", f"got {result[14]}"
assert len(result) == 15
print("PASS: fizzbuzz")
""",
    ),
    # --- MEDIUM ---
    Challenge(
        id="py_med_1",
        name="Two Sum",
        difficulty="medium",
        language="python",
        prompt="Write a Python function `two_sum(nums: list[int], target: int) -> tuple[int, int]` that returns indices of two numbers that add up to target. Each input has exactly one solution. You may not use the same element twice. Return indices in ascending order.",
        test_code="""
assert two_sum([2, 7, 11, 15], 9) == (0, 1)
assert two_sum([3, 2, 4], 6) == (1, 2)
assert two_sum([3, 3], 6) == (0, 1)
assert two_sum([1, 5, 3, 7], 8) == (1, 2) or two_sum([1, 5, 3, 7], 8) == (0, 3)
print("PASS: two_sum")
""",
    ),
    Challenge(
        id="py_med_2",
        name="Matrix transpose",
        difficulty="medium",
        language="python",
        prompt="Write a Python function `transpose(matrix: list[list[int]]) -> list[list[int]]` that transposes a matrix. Do NOT use numpy or zip.",
        test_code="""
assert transpose([[1,2,3],[4,5,6]]) == [[1,4],[2,5],[3,6]]
assert transpose([[1]]) == [[1]]
assert transpose([[1,2],[3,4],[5,6]]) == [[1,3,5],[2,4,6]]
print("PASS: transpose")
""",
    ),
    Challenge(
        id="py_med_3",
        name="Balanced parentheses",
        difficulty="medium",
        language="python",
        prompt="Write a Python function `is_balanced(s: str) -> bool` that checks if a string has balanced parentheses, brackets, and braces. Only these characters matter: ()[]{}. Other characters should be ignored.",
        test_code="""
assert is_balanced("()[]{}") == True
assert is_balanced("([{}])") == True
assert is_balanced("(]") == False
assert is_balanced("([)]") == False
assert is_balanced("hello (world) [test]") == True
assert is_balanced("{[}]") == False
assert is_balanced("") == True
print("PASS: balanced")
""",
    ),
    Challenge(
        id="py_med_4",
        name="Group anagrams",
        difficulty="medium",
        language="python",
        prompt='Write a Python function `group_anagrams(words: list[str]) -> list[list[str]]` that groups anagrams together. Each group should be sorted alphabetically, and the groups should be sorted by their first element.',
        test_code="""
result = group_anagrams(["eat", "tea", "tan", "ate", "nat", "bat"])
# Sort each group and sort groups by first element for deterministic comparison
result = [sorted(g) for g in result]
result.sort(key=lambda g: g[0])
assert result == [["ate", "eat", "tea"], ["bat"], ["nat", "tan"]], f"got {result}"
print("PASS: group_anagrams")
""",
    ),
    # --- HARD ---
    Challenge(
        id="py_hard_1",
        name="LRU Cache",
        difficulty="hard",
        language="python",
        prompt="""Write a Python class `LRUCache` with:
- `__init__(self, capacity: int)` - Initialize with positive capacity.
- `get(self, key: int) -> int` - Return value if key exists, else -1. Marks as recently used.
- `put(self, key: int, value: int) -> None` - Update or insert. If over capacity, evict least recently used.
Both get and put must run in O(1) average time. Do NOT use functools.lru_cache or collections.OrderedDict.""",
        test_code="""
cache = LRUCache(2)
cache.put(1, 1)
cache.put(2, 2)
assert cache.get(1) == 1, f"got {cache.get(1)}"
cache.put(3, 3) # evicts key 2
assert cache.get(2) == -1, f"got {cache.get(2)}"
cache.put(4, 4) # evicts key 1
assert cache.get(1) == -1
assert cache.get(3) == 3
assert cache.get(4) == 4
# Test update
cache2 = LRUCache(2)
cache2.put(1, 10)
cache2.put(1, 20)
assert cache2.get(1) == 20
print("PASS: lru_cache")
""",
        max_tokens=1500,
    ),
    Challenge(
        id="py_hard_2",
        name="Merge intervals",
        difficulty="hard",
        language="python",
        prompt="Write a Python function `merge_intervals(intervals: list[list[int]]) -> list[list[int]]` that merges all overlapping intervals and returns sorted non-overlapping intervals.",
        test_code="""
assert merge_intervals([[1,3],[2,6],[8,10],[15,18]]) == [[1,6],[8,10],[15,18]]
assert merge_intervals([[1,4],[4,5]]) == [[1,5]]
assert merge_intervals([[1,4],[0,4]]) == [[0,4]]
assert merge_intervals([[1,4],[2,3]]) == [[1,4]]
assert merge_intervals([]) == []
assert merge_intervals([[1,1]]) == [[1,1]]
print("PASS: merge_intervals")
""",
    ),
    Challenge(
        id="py_hard_3",
        name="Binary search tree iterator",
        difficulty="hard",
        language="python",
        prompt="""Write Python classes:
1. `TreeNode` with attributes `val`, `left`, `right` (left and right default to None).
2. `BSTIterator` that takes a TreeNode root and implements in-order traversal:
- `has_next() -> bool` - returns True if there is a next element.
- `next_val() -> int` - returns the next smallest number.
Must use O(h) memory where h is tree height (not O(n)). Do not flatten the tree into a list.""",
        test_code="""
# Build tree: 7
# / \\
# 3 15
# / \\
# 9 20
root = TreeNode(7, TreeNode(3), TreeNode(15, TreeNode(9), TreeNode(20)))
it = BSTIterator(root)
assert it.has_next() == True
assert it.next_val() == 3
assert it.next_val() == 7
assert it.has_next() == True
assert it.next_val() == 9
assert it.next_val() == 15
assert it.next_val() == 20
assert it.has_next() == False
print("PASS: bst_iterator")
""",
        max_tokens=1500,
    ),
    # --- GO ---
    Challenge(
        id="go_med_1",
        name="Reverse words in string",
        difficulty="medium",
        language="go",
        prompt='Write a Go function `ReverseWords(s string) string` that reverses the order of words in a string. Words are separated by spaces. Remove leading/trailing spaces and reduce multiple spaces to single. Example: " hello world " -> "world hello".',
        test_code="""
func TestReverseWords(t *testing.T) {
cases := []struct{ in, want string }{
{"hello world", "world hello"},
{" hello world ", "world hello"},
{"a", "a"},
{" Bob Loves Alice ", "Alice Loves Bob"},
}
for _, c := range cases {
got := ReverseWords(c.in)
if got != c.want {
t.Errorf("ReverseWords(%q) = %q, want %q", c.in, got, c.want)
}
}
}
""",
    ),
    # --- BASH ---
    Challenge(
        id="bash_easy_1",
        name="Count lines in files",
        difficulty="easy",
        language="bash",
        prompt='Write a Bash function `count_lines` that takes a filename as argument and prints the number of lines. If the file does not exist, print "ERROR: file not found" to stderr and return 1.',
        test_code="""
# Test setup
tmpfile=$(mktemp)
echo -e "line1\\nline2\\nline3" > "$tmpfile"
result=$(count_lines "$tmpfile")
if [ "$result" != "3" ]; then
echo "FAIL: expected 3, got $result"
exit 1
fi
# Test missing file
if count_lines "/nonexistent/file" 2>/dev/null; then
echo "FAIL: should return non-zero for missing file"
exit 1
fi
rm -f "$tmpfile"
echo "PASS: count_lines"
""",
    ),
]
# ── Evaluator ─────────────────────────────────────────────
def evaluate_model(model: str, challenges: list[Challenge]) -> list[Result]:
    """Evaluate one model against every challenge.

    For each challenge the model is queried, code is extracted from the reply
    and handed to the runner registered for the challenge's language. A
    progress line is printed per challenge.

    Args:
        model: Model identifier passed to ``query_model``.
        challenges: Challenges to run, in order.

    Returns:
        One ``Result`` per challenge, in the same order. If querying or
        running raises, the result carries the exception text in ``error``
        with zeroed metrics, and evaluation continues with the next challenge.
    """
    results = []
    for ch in challenges:
        print(f" [{ch.id}] {ch.name} ({ch.difficulty})...", end=" ", flush=True)
        try:
            # perf_counter is monotonic, so clock adjustments cannot skew latency.
            t0 = time.perf_counter()
            resp = query_model(model, ch.prompt, ch.max_tokens)
            latency = (time.perf_counter() - t0) * 1000

            message = resp["choices"][0]["message"]
            # The API may return null content (e.g. when the token budget is
            # spent before any answer); treat it as an empty answer so the
            # measured latency and token counts are still recorded.
            content = message.get("content") or ""

            usage = resp.get("usage") or {}
            tokens = usage.get("total_tokens", 0)
            completion_tokens = usage.get("completion_tokens", 0)
            prompt_tokens = usage.get("prompt_tokens", 0)
            details = usage.get("completion_tokens_details") or {}
            reasoning_tokens = details.get("reasoning_tokens", 0) or 0
            # Speed: completion tokens over the full request time.
            tps = (completion_tokens / (latency / 1000)) if latency > 0 else 0

            code = extract_code(content, ch.language)
            runner = RUNNERS.get(ch.language)
            if runner:
                compiled, passed, error = runner(code, ch.test_code)
            else:
                compiled, passed, error = False, False, f"No runner for {ch.language}"

            status = "PASS" if passed else ("COMPILE_ERR" if not compiled else "FAIL")
            print(f"{status} ({latency:.0f}ms, {completion_tokens}tok, {tps:.1f} tok/s)")
            results.append(Result(
                model=model,
                challenge_id=ch.id,
                raw_response=content,
                extracted_code=code,
                compiled=compiled,
                tests_passed=passed,
                error=error,
                latency_ms=latency,
                tokens_used=tokens,
                reasoning_tokens=reasoning_tokens,
                completion_tokens=completion_tokens,
                prompt_tokens=prompt_tokens,
                tokens_per_second=tps,
            ))
        except Exception as e:
            # Deliberate boundary: one failing challenge must not abort the run.
            print(f"ERROR: {e}")
            results.append(Result(
                model=model,
                challenge_id=ch.id,
                raw_response="",
                extracted_code="",
                compiled=False,
                tests_passed=False,
                error=str(e),
                latency_ms=0,
                tokens_used=0,
            ))
    return results
def print_summary(all_results: list[Result], challenges: list[Challenge]):
    """Print the evaluation report to stdout.

    The report has three parts: a per-model summary broken down by difficulty,
    a challenge-by-model comparison table, and a speed table.

    Args:
        all_results: Results for every (model, challenge) pair that was run.
        challenges: The challenges that were run; every ``challenge_id`` in
            ``all_results`` must be present here.

    Raises:
        KeyError: If a result refers to a challenge id not in ``challenges``.
    """
    ch_map = {c.id: c for c in challenges}
    models = sorted(set(r.model for r in all_results))
    # First result per (model, challenge), so the comparison table does not
    # rescan every result for every cell.
    first_result = {}
    for r in all_results:
        first_result.setdefault((r.model, r.challenge_id), r)

    # Header
    print("\n" + "=" * 90)
    print("RESULTADOS - EVALUACIÓN DE CODING")
    print("=" * 90)
    # Per-model summary
    for model in models:
        model_results = [r for r in all_results if r.model == model]
        passed = sum(1 for r in model_results if r.tests_passed)
        compiled = sum(1 for r in model_results if r.compiled)
        total = len(model_results)
        avg_latency = sum(r.latency_ms for r in model_results) / max(total, 1)
        avg_tps = sum(r.tokens_per_second for r in model_results) / max(total, 1)
        total_reasoning = sum(r.reasoning_tokens for r in model_results)
        # The separator character was an empty string, which printed nothing.
        print(f"\n{'-' * 100}")
        print(f" {model}")
        print(f" Tests passed: {passed}/{total} ({100*passed/total:.0f}%) | "
              f"Compiled: {compiled}/{total} | "
              f"Avg latency: {avg_latency:.0f}ms | Avg speed: {avg_tps:.1f} tok/s")
        if total_reasoning > 0:
            print(f" Reasoning tokens total: {total_reasoning}")
        print(f"{'-' * 100}")
        for diff in ["easy", "medium", "hard"]:
            diff_results = [r for r in model_results if ch_map[r.challenge_id].difficulty == diff]
            if not diff_results:
                continue
            dp = sum(1 for r in diff_results if r.tests_passed)
            print(f" {diff.upper():8s} {dp}/{len(diff_results)} passed")
            for r in diff_results:
                ch = ch_map[r.challenge_id]
                icon = "✓" if r.tests_passed else ("✗ compile" if not r.compiled else "✗ test")
                err_hint = f" [{r.error[:60]}]" if r.error else ""
                reason = f" (R:{r.reasoning_tokens})" if r.reasoning_tokens > 0 else ""
                print(f" {icon:12s} {ch.name:30s} {r.latency_ms:6.0f}ms {r.completion_tokens:4d}tok {r.tokens_per_second:5.1f}t/s{reason}{err_hint}")
    # Comparison table
    print(f"\n{'=' * 90}")
    print("COMPARATIVA")
    print(f"{'=' * 90}")
    header = f"{'Challenge':35s}"
    for m in models:
        short = m.split("/")[-1][:15]
        header += f" {short:>15s}"
    print(header)
    print("-" * (35 + 16 * len(models)))
    for ch in challenges:
        row = f"{ch.name + ' (' + ch.difficulty[0] + ')':35s}"
        for m in models:
            r = first_result.get((m, ch.id))
            if r and r.tests_passed:
                row += f" {'PASS':>15s}"
            elif r and r.compiled:
                row += f" {'FAIL':>15s}"
            elif r:
                row += f" {'ERR':>15s}"
            else:
                row += f" {'---':>15s}"
        print(row)
    # Speed comparison
    print(f"\n{'=' * 90}")
    print("VELOCIDAD (tokens/segundo)")
    print(f"{'=' * 90}")
    header = f"{'Model':35s} {'Avg tok/s':>10s} {'Min tok/s':>10s} {'Max tok/s':>10s} {'Avg ms':>10s}"
    print(header)
    print("-" * 75)
    for m in models:
        mrs = [r for r in all_results if r.model == m]
        if not mrs:
            continue
        avg_tps = sum(r.tokens_per_second for r in mrs) / len(mrs)
        min_tps = min(r.tokens_per_second for r in mrs)
        max_tps = max(r.tokens_per_second for r in mrs)
        avg_ms = sum(r.latency_ms for r in mrs) / len(mrs)
        short = m.split("/")[-1]
        print(f"{short:35s} {avg_tps:10.1f} {min_tps:10.1f} {max_tps:10.1f} {avg_ms:10.0f}")
# ── Main ──────────────────────────────────────────────────
def run_eval(models: Optional[list[str]] = None, difficulties: Optional[list[str]] = None):
    """Run the full evaluation and print the summary report.

    Args:
        models: Model identifiers to evaluate. ``None`` means every entry in
            ``MODELS``. An explicitly empty list evaluates nothing, rather
            than silently falling back to all models.
        difficulties: If given and non-empty, only challenges whose
            difficulty is in this list are run.

    Returns:
        The list of all ``Result`` objects, grouped by model in run order.
    """
    models = MODELS if models is None else models
    challenges = CHALLENGES
    if difficulties:
        challenges = [c for c in challenges if c.difficulty in difficulties]
    print(f"Evaluando {len(models)} modelos con {len(challenges)} challenges\n")
    all_results = []
    # The separator character was an empty string, which printed nothing.
    rule = "-" * 60
    for model in models:
        print(f"\n{rule}")
        print(f" MODELO: {model}")
        print(rule)
        all_results.extend(evaluate_model(model, challenges))
    print_summary(all_results, challenges)
    return all_results
if __name__ == "__main__":
    import sys

    # Optional command-line arguments are substrings; a model from MODELS is
    # selected when any argument occurs in its identifier.
    models = None
    if len(sys.argv) > 1:
        models = [m for m in MODELS if any(arg in m for arg in sys.argv[1:])]
        if not models:
            # A filter that matches nothing is a user error; stop instead of
            # running an evaluation the user did not ask for.
            sys.exit(f"Ningún modelo coincide con: {' '.join(sys.argv[1:])}")
    run_eval(models=models)