diff --git a/.claude/rules/INDEX.md b/.claude/rules/INDEX.md index e3c5144a..18b83247 100644 --- a/.claude/rules/INDEX.md +++ b/.claude/rules/INDEX.md @@ -39,3 +39,4 @@ Reglas operativas del proyecto. Cada archivo es una regla independiente. | 32 | [../../dev/TAXONOMY.md](../../dev/TAXONOMY.md) | Allowlist canonica para dominios/tipos/scopes/estados/prioridades + flow patterns. Aplica a `dev/issues/` y `dev/flows/`. Issues 0100 + 0103 | | 33 | [project_commands.md](project_commands.md) | Slash commands por project (`.claude/commands//`) expuestos via symlink. Desde fn_registry: `/:foo`. Desde el project: `/foo`. Sin colision. | | 34 | [dod_quality.md](dod_quality.md) | DoD Quality Triada: Mecanica + Cobertura (golden + edge + error path con evidencia ejecutable) + Vida util validada (>=7 dias uso real). Cierra anti-criterios contra checkbox vago. Aplica a `dev/flows/` y issues user-facing. | +| 35 | [llm_invocation.md](llm_invocation.md) | Invocacion de LLM: SIEMPRE `ask_llm` (grupo `claude-direct`, API directa, arranque 0), NUNCA `claude -p` (lento, cold start). One-shot/streaming/tool-loop + legacy `claude_stream_go_core` deprecado. | diff --git a/.claude/rules/llm_invocation.md b/.claude/rules/llm_invocation.md new file mode 100644 index 00000000..4f064644 --- /dev/null +++ b/.claude/rules/llm_invocation.md @@ -0,0 +1,50 @@ +## Invocación de LLM: SIEMPRE `ask_llm`, NUNCA `claude -p` + +**REGLA DURA.** Para ejecutar un modelo LLM desde cualquier código del ecosistema (scripts, heredocs, apps, pipelines, agentes), usa el grupo `claude-direct` — empezando por `ask_llm_py_core`. **NUNCA** uses `claude -p` ni lances el binario `claude` como subproceso para obtener una respuesta del modelo. + +### Por qué + +| | `claude -p` | `ask_llm` / `claude-direct` | +|---|---|---| +| Mecanismo | Lanza Claude Code entero (proceso `claude`) | Habla directo a `api.anthropic.com/v1/messages` | +| Arranque | ~7-15s (carga MCP + `CLAUDE.md` ~100k tokens) | **0 — request HTTP directa** | +| Latencia/msg | ~9-15s | **~2.5s** | +| Coste | Alto (re-carga contexto cada vez) | Mínimo (solo tu prompt) | +| Tools | Las de Claude Code (no controlables) | **Las que tú defines** (`run_claude_tool_loop`) | +| Streaming | indirecto | nativo (`stream_anthropic_messages`) | + +`claude -p` es lento, caro y arranca todo Claude Code para una completion. `ask_llm` es la API directa: arranque 0, rápido, con tus propias tools. Usa el token OAuth que Claude Code ya guarda en `~/.claude/.credentials.json`. + +### Cómo (según el caso) + +| Caso | Usa | +|---|---| +| Pregunta/chat one-shot | `fn run ask_llm "..."` o `from core.ask_llm import ask_llm` | +| Streaming de eventos crudos (text/tool_use deltas) | `stream_anthropic_messages_py_core` | +| Agente con TUS tools (tool-use loop) | `run_claude_tool_loop_py_core` (defines `tools` + `dispatch`) | +| Token OAuth | `load_claude_oauth_token_py_core` (automático dentro de las anteriores) | +| Distribuir fuera del registry | `apps/llm_cli/llm.py` (versión standalone autocontenida) | + +```python +import sys, os +sys.path.insert(0, os.path.join("python", "functions")) +from core.ask_llm import ask_llm +respuesta = ask_llm("resume esto en 3 lineas: ...", model="claude-haiku-4-5-20251001", echo=False) +``` + +### Legacy + +`claude_stream_go_core` (lanza `claude -p --output-format stream-json`) es el **camino antiguo**. No usarlo en código nuevo — preferir las funciones `claude-direct`. Queda solo para compatibilidad de consumidores existentes. + +### Excepción acotada + +Si una tarea necesita **genuinamente las capacidades de Claude Code** (sus tools nativas, los MCP del repo, plan mode, el contexto del proyecto) y no basta con el modelo + tus propias tools via `run_claude_tool_loop`, entonces NO es una "invocación LLM" simple: documenta por qué en el código. El **default sin excepción es `ask_llm`**. + +### Telemetría / auditoría + +Un `claude -p` o un `subprocess(["claude", "-p", ...])` en código nuevo es un antipatrón auditable: sustituir por `ask_llm` / `claude-direct`. Buscar usos: `grep -rn 'claude -p' --include='*.py' --include='*.sh' --include='*.go'`. + +### Relación con otras reglas + +- [[registry_calls]] — patrones canónicos de invocación de funciones; esta regla fija el patrón para la sub-tarea "invocar un LLM". +- [[registry_first]] — reusar antes que reescribir; `ask_llm` es la función reutilizable para LLM. diff --git a/docs/capabilities/INDEX.md b/docs/capabilities/INDEX.md index 4451e000..a5127338 100644 --- a/docs/capabilities/INDEX.md +++ b/docs/capabilities/INDEX.md @@ -46,6 +46,7 @@ Indice de grupos de capacidades del registry. Cada grupo agrupa >=3 funciones qu | [matrix-mas](matrix-mas.md) | 5 | Migración Synapse→MAS: habilitar MSC3861, verificar login flows, parche .well-known, registro clientes OAuth2, syn2mas | | [mesh-3d](mesh-3d.md) | 3 | Carga y upload a GPU de meshes 3D (OBJ, GLB/glTF 2.0): loaders CPU + mesh_gpu_upload OpenGL | | [terminal-capture](terminal-capture.md) | 6 | Automatizar y capturar el texto de una CLI/TUI interactiva via PTY headless: spawn+input scripteado (one-shot y streaming), render del layout 2D (emulador VT), strip ANSI, delta por prefijo, y parseo de la TUI de claude a datos | +| [claude-direct](claude-direct.md) | 3 | Hablar directamente con la API de Anthropic Messages usando el token OAuth de Claude Code (Claude Max): leer token, stream SSE, bucle agentico de tool-use | ## Como anadir grupo diff --git a/docs/capabilities/claude-direct.md b/docs/capabilities/claude-direct.md new file mode 100644 index 00000000..c8a04633 --- /dev/null +++ b/docs/capabilities/claude-direct.md @@ -0,0 +1,91 @@ +# Capability: claude-direct + +Hablar directamente con `https://api.anthropic.com/v1/messages` usando el token OAuth de Claude Code (Claude Max), sin lanzar la CLI `claude` ni necesitar una API key de pago separada. 3 funciones Python en `domain: core`. + +## Funciones + +| ID | Firma | Que hace | +|---|---|---| +| `load_claude_oauth_token_py_core` | `def load_claude_oauth_token(credentials_path: str = "", refresh_if_expired: bool = True) -> str` | Lee el access token OAuth desde `~/.claude/.credentials.json`. Verifica expiry (ms-epoch). Intenta refresh best-effort si expirado. | +| `stream_anthropic_messages_py_core` | `def stream_anthropic_messages(messages: list, model: str = "claude-opus-4-8", ...) -> Iterator[dict]` | POST streaming a `/v1/messages`. Yield de eventos normalizados: `text`, `tool_use_start`, `tool_input_delta`, `done`, `error`. Parser SSE puro testeable por separado. | +| `run_claude_tool_loop_py_core` | `def run_claude_tool_loop(messages, tools, dispatch, ...) -> dict` | Bucle agentico tool-use. Llama `stream_anthropic_messages` en loop, despacha tools via `dispatch{name: callable}`, anade `tool_result`, repite hasta `end_turn` o `max_iters`. | + +## Ejemplo canonico end-to-end + +### Pregunta simple (sin tools) + +```python +import sys +sys.path.insert(0, "python/functions/core") +from stream_anthropic_messages import stream_anthropic_messages + +text = "" +for event in stream_anthropic_messages( + messages=[{"role": "user", "content": "di solo PONG"}], + model="claude-haiku-4-5-20251001", + max_tokens=32, +): + if event["type"] == "text": + text += event["text"] + print(event["text"], end="", flush=True) + elif event["type"] == "done": + print(f"\n[stop={event['stop_reason']}]") +# Output: PONG +# [stop=end_turn] +``` + +### Bucle agentico con tool propia + +```python +import sys +sys.path.insert(0, "python/functions/core") +from run_claude_tool_loop import run_claude_tool_loop +from datetime import datetime + +tools = [ + { + "name": "get_time", + "description": "Devuelve la hora actual en formato HH:MM:SS.", + "input_schema": {"type": "object", "properties": {}, "required": []}, + } +] + +dispatch = { + "get_time": lambda _inp: datetime.now().strftime("%H:%M:%S"), +} + +result = run_claude_tool_loop( + messages=[{"role": "user", "content": "que hora es exactamente ahora?"}], + tools=tools, + dispatch=dispatch, + model="claude-haiku-4-5-20251001", + on_text=lambda d: print(d, end="", flush=True), +) +print(f"\n[iters={result['iterations']} stop={result['stop_reason']}]") +# Claude llama a get_time() -> "14:32:07" +# Luego responde: "Ahora son las 14:32:07." +``` + +### Solo leer el token (para uso manual) + +```python +import sys +sys.path.insert(0, "python/functions/core") +from load_claude_oauth_token import load_claude_oauth_token + +token = load_claude_oauth_token(refresh_if_expired=False) +# Pasar como header: {"authorization": f"Bearer {token}"} +``` + +## Fronteras + +- **NO cubre** el flujo de refresh OAuth (endpoint no documentado publicamente) — el refresh es best-effort y puede fallar silenciosamente. +- **NO es un cliente completo** de la API de Anthropic: solo `/v1/messages` con streaming. Files, embeddings, etc. quedan fuera. +- **NO reemplaza** el uso de API keys oficiales para produccion — este grupo es exclusivamente para uso local del token OAuth de Claude Max. +- **NO gestiona rate limits** — el caller debe manejar errores `{"type": "error"}` con `429` en el mensaje. + +## Prerequisitos + +- Claude Code instalado y usuario logueado (`~/.claude/.credentials.json` debe existir). +- `httpx` disponible en el venv: `python/.venv/bin/python3 -c "import httpx"`. +- Token fresco (Claude Code normalmente lo renueva en background mientras esta abierto). diff --git a/docs/templates/todo.md b/docs/templates/todo.md new file mode 100644 index 00000000..66f08234 --- /dev/null +++ b/docs/templates/todo.md @@ -0,0 +1,44 @@ +--- +title: +artefacto: · +created: DD/MM/AAAA HH:mm +updated: DD/MM/AAAA HH:mm +status: in_progress +related_issues: [] +related_flows: [] +--- + +## Objetivo + + + +## Pendiente + +- [ ] 1. +- [ ] 2. +- [ ] 3. + +## En curso + +- [~] + +## Hecho + +- [x] + - resultado: + - enlace: + +## Enlaces + +- + +## Issues / flows relacionados + +- issue +- flow + +## Notas + +- +- +- diff --git a/projects/fn_monitoring/project.md b/projects/fn_monitoring/project.md index b3f1d29d..d84be5d6 100644 --- a/projects/fn_monitoring/project.md +++ b/projects/fn_monitoring/project.md @@ -2,7 +2,7 @@ name: fn_monitoring description: "Monitoreo y visualizacion del estado del fn_registry. API HTTP read-only sobre las bases de datos SQLite y dashboard ImGui que consume la API." tags: [monitoring, api, dashboard, sqlite, visualization] -repo_url: "" +repo_url: "https://gitea-dgg044oo04woo4ggcsws4gk0.organic-machine.com/dataforge/fn_monitoring" --- ## Apps diff --git a/python/functions/core/ask_llm.md b/python/functions/core/ask_llm.md new file mode 100644 index 00000000..f188ae97 --- /dev/null +++ b/python/functions/core/ask_llm.md @@ -0,0 +1,69 @@ +--- +name: ask_llm +kind: function +lang: py +domain: core +version: "1.0.0" +purity: impure +signature: "def ask_llm(prompt: str, model: str = 'claude-haiku-4-5-20251001', system: str = '', max_tokens: int = 4096, echo: bool = True) -> str" +description: "Atajo de una linea para preguntar al modelo via la API directa de Anthropic con el token OAuth de Claude Max. Arranque 0 (sin proceso claude, sin daemon). Stream a stdout y devuelve el texto. Lanzable como CLI o importable." +error_type: error_go_core +tags: ["claude-direct", "llm", "anthropic", "cli", "oauth", "chat"] +uses_functions: + - stream_anthropic_messages_py_core +uses_types: [] +params: + - name: prompt + desc: "El mensaje del usuario (string). Si vacio y stdin es un pipe, se lee de stdin." + - name: model + desc: "Id del modelo Anthropic. Default claude-haiku-4-5-20251001 (mas cuota, rapido). Otros: claude-opus-4-8, claude-sonnet-4-6." + - name: system + desc: "System prompt opcional (string vacio = ninguno)." + - name: max_tokens + desc: "Maximo de tokens de salida. Default 4096." + - name: echo + desc: "Si True, escribe la respuesta a stdout segun llega (streaming). Si False, solo la devuelve." +output: "El texto completo de la respuesta del modelo (string). Cadena vacia si hubo error (mensaje a stderr)." +file_path: python/functions/core/ask_llm.py +--- + +# ask_llm + +Atajo CLI para ejecutar el modelo **rapido** desde la terminal o un script, usando la API directa +de Anthropic con el token OAuth de Claude Max. No lanza el proceso `claude` ni mantiene ningun +daemon — el arranque es 0. Es el wrapper conveniente del grupo `claude-direct`. + +## Ejemplo + +```bash +# Desde la terminal (fn run) +fn run ask_llm "que es un pseudo-terminal en una frase" + +# Directo con el venv +python/.venv/bin/python3 python/functions/core/ask_llm.py "explica los punteros en Go" --model claude-opus-4-8 + +# Por pipe (lee el prompt de stdin) +echo "resume esto en 2 lineas: ..." | python/.venv/bin/python3 python/functions/core/ask_llm.py + +# Importable en un script +import sys, os +sys.path.insert(0, "python/functions") +from core.ask_llm import ask_llm +texto = ask_llm("dame 3 ideas", model="claude-haiku-4-5-20251001", echo=False) +``` + +## Cuando usarla + +- Cuando quieras preguntar al modelo **rapido** desde la terminal o un script, sin lanzar claude + ni montar un servidor. Respuesta en streaming, arranque 0. +- Para chat one-shot. Si necesitas **tools / loop agentico**, usa `run_claude_tool_loop_py_core`. + Si necesitas los **eventos crudos** (tool_use, deltas), usa `stream_anthropic_messages_py_core`. + +## Gotchas + +- **Rate limits**: el plan limita la frecuencia. En rafagas se reciben `HTTP 429`; espacia las + llamadas o usa `claude-haiku-4-5-20251001` (mas cuota) para tareas frecuentes. +- **Nombre del modelo**: `claude-opus-4-8` es valido; los ids con sufijo de fecha que no existen + dan `404 not_found_error`. Default haiku por cuota y velocidad. +- **No es Claude Code**: es el modelo crudo. Sin tools, MCP, contexto del repo ni plan mode, salvo + los que tu pases (via `run_claude_tool_loop`). diff --git a/python/functions/core/ask_llm.py b/python/functions/core/ask_llm.py new file mode 100644 index 00000000..ffcee5d9 --- /dev/null +++ b/python/functions/core/ask_llm.py @@ -0,0 +1,92 @@ +"""One-line CLI to ask the model fast via the direct Anthropic API (grupo claude-direct). + +No claude process, no daemon — arranque 0. Reuses stream_anthropic_messages, which +authenticates with the Claude Max OAuth token from ~/.claude/.credentials.json. + +Usage: + python ask_llm.py "que es Go en una frase" + python ask_llm.py --model claude-opus-4-8 --system "responde conciso" "explica los punteros" + echo "resume este texto: ..." | python ask_llm.py + fn run ask_llm "tu pregunta" + +For tools / agentic loops, use run_claude_tool_loop_py_core directly in a script. +""" +import os +import sys + +sys.path.insert(0, os.path.dirname(__file__)) + +from stream_anthropic_messages import stream_anthropic_messages # noqa: E402 + +DEFAULT_MODEL = "claude-haiku-4-5-20251001" + + +def ask_llm( + prompt: str, + model: str = DEFAULT_MODEL, + system: str = "", + max_tokens: int = 4096, + echo: bool = True, +) -> str: + """Ask the model and return its full text answer. + + Args: + prompt: The user message. + model: Anthropic model id (e.g. claude-haiku-4-5-20251001, claude-opus-4-8). + system: Optional system prompt. + max_tokens: Max output tokens. + echo: If True, stream the answer to stdout as it arrives. + + Returns: + The complete answer text. Empty string on error (message goes to stderr). + """ + parts = [] + for ev in stream_anthropic_messages( + messages=[{"role": "user", "content": prompt}], + model=model, + system=system, + max_tokens=max_tokens, + ): + t = ev.get("type") + if t == "text": + parts.append(ev["text"]) + if echo: + sys.stdout.write(ev["text"]) + sys.stdout.flush() + elif t == "error": + sys.stderr.write("ask_llm error: " + str(ev.get("message", "")) + "\n") + return "" + if echo: + sys.stdout.write("\n") + sys.stdout.flush() + return "".join(parts) + + +def _main(argv): + model = DEFAULT_MODEL + system = "" + prompt_parts = [] + i = 0 + while i < len(argv): + a = argv[i] + if a in ("--model", "-m") and i + 1 < len(argv): + model = argv[i + 1] + i += 2 + elif a in ("--system", "-s") and i + 1 < len(argv): + system = argv[i + 1] + i += 2 + else: + prompt_parts.append(a) + i += 1 + prompt = " ".join(prompt_parts).strip() + if not prompt and not sys.stdin.isatty(): + prompt = sys.stdin.read().strip() + if not prompt: + sys.stderr.write('uso: ask_llm "prompt" [--model M] [--system S]\n') + return 2 + ask_llm(prompt, model=model, system=system) + return 0 + + +if __name__ == "__main__": + sys.exit(_main(sys.argv[1:])) diff --git a/python/functions/core/load_claude_oauth_token.md b/python/functions/core/load_claude_oauth_token.md new file mode 100644 index 00000000..3592dbaf --- /dev/null +++ b/python/functions/core/load_claude_oauth_token.md @@ -0,0 +1,70 @@ +--- +name: load_claude_oauth_token +kind: function +lang: py +domain: core +version: "1.0.0" +purity: impure +signature: "def load_claude_oauth_token(credentials_path: str = \"\", refresh_if_expired: bool = True) -> str" +description: "Lee el access token OAuth de Claude Code desde ~/.claude/.credentials.json. Extrae claudeAiOauth.accessToken y verifica expiry (expiresAt en milisegundos). Si el token esta expirado y refresh_if_expired=True intenta un refresh best-effort via POST al endpoint OAuth de Anthropic; si el refresh falla devuelve el token actual con warning a stderr." +tags: [claude-direct, anthropic, oauth, credentials, token, auth] +uses_functions: [] +uses_types: [] +returns: [] +returns_optional: false +error_type: "error_go_core" +imports: [json, os, sys, time, pathlib.Path, httpx] +params: + - name: credentials_path + desc: "ruta absoluta o con ~ al archivo de credenciales. Default: ~/.claude/.credentials.json" + - name: refresh_if_expired + desc: "si True, intenta refrescar el token cuando expiresAt <= now. Default: True" +output: "el access token como string (sk-ant-oat-...). Nunca se imprime a stdout" +tested: true +tests: + - "extrae accessToken de un credentials fixture" + - "token no expirado no intenta refresh" + - "token expirado con refresh_if_expired=False devuelve token igual" + - "token expirado con refresh que falla devuelve token y escribe warning a stderr" +test_file_path: "python/functions/core/load_claude_oauth_token_test.py" +file_path: "python/functions/core/load_claude_oauth_token.py" +--- + +## Ejemplo + +```python +import sys +sys.path.insert(0, "python/functions/core") +from load_claude_oauth_token import load_claude_oauth_token + +# Lee desde ~/.claude/.credentials.json por defecto +token = load_claude_oauth_token() +print(token[:20] + "...") # sk-ant-oat01-... + +# Sin intento de refresh (evita llamada de red) +token = load_claude_oauth_token(refresh_if_expired=False) + +# Ruta custom (ej. para testing con fixture) +token = load_claude_oauth_token(credentials_path="/tmp/fake_creds.json") +``` + +## Cuando usarla + +Cuando vayas a hacer requests directos a `https://api.anthropic.com/v1/messages` +usando la cuenta Claude Max del usuario local (sin API key de pago separada). +Llama esta funcion primero; su resultado va en el header +`Authorization: Bearer ` de `stream_anthropic_messages`. + +## Gotchas + +- **Refresh best-effort**: el endpoint de refresh OAuth de Anthropic no esta + documentado publicamente. El intento puede fallar silenciosamente — en ese + caso se devuelve el token actual con un warning a stderr. La CLI `claude` + mantiene el token fresco en background; si tienes Claude Code abierto, + normalmente el token ya estara vigente. +- **expiresAt en milisegundos**: la estructura JSON usa ms-epoch, no s-epoch. + `expiresAt / 1000 <= time.time()` es la comparacion correcta. +- **Nunca imprimir el token a stdout**: el token es una credencial OAuth. + Esta funcion solo lo retorna; el caller decide como usarlo. +- El archivo `.credentials.json` solo existe si Claude Code esta instalado + y el usuario ha hecho login. Si no existe lanza `FileNotFoundError`. diff --git a/python/functions/core/load_claude_oauth_token.py b/python/functions/core/load_claude_oauth_token.py new file mode 100644 index 00000000..879674a3 --- /dev/null +++ b/python/functions/core/load_claude_oauth_token.py @@ -0,0 +1,99 @@ +"""Load the Claude Code OAuth access token from ~/.claude/.credentials.json.""" + +import json +import os +import sys +import time +from pathlib import Path + + +_DEFAULT_CREDENTIALS_PATH = "~/.claude/.credentials.json" + + +def load_claude_oauth_token( + credentials_path: str = "", + refresh_if_expired: bool = True, +) -> str: + """Return the Claude Code OAuth access token. + + Reads ``~/.claude/.credentials.json`` (or the path you supply), + extracts ``claudeAiOauth.accessToken`` and checks expiry. + + If ``refresh_if_expired`` is True and the token appears expired a + best-effort refresh via the Anthropic OAuth token endpoint is attempted + using the stored ``refreshToken``. The refresh may silently fail — in + that case a warning is written to *stderr* and the (possibly expired) + access token is returned anyway. The ``claude`` CLI normally refreshes + the token in the background; if you keep Claude Code open the token + will usually already be fresh. + + Args: + credentials_path: Absolute or ``~``-relative path to the credentials + file. Defaults to ``~/.claude/.credentials.json``. + refresh_if_expired: Attempt a token refresh when the stored + ``expiresAt`` (milliseconds epoch) is in the past. Set to + False to skip the network call entirely. + + Returns: + The raw access token string (``sk-ant-oat-...``). Never printed to + stdout. + + Raises: + FileNotFoundError: If the credentials file does not exist. + KeyError: If the expected keys are missing from the JSON. + json.JSONDecodeError: If the file is not valid JSON. + """ + path = Path(credentials_path or _DEFAULT_CREDENTIALS_PATH).expanduser() + raw = path.read_text(encoding="utf-8") + data = json.loads(raw) + + oauth = data["claudeAiOauth"] + access_token: str = oauth["accessToken"] + expires_at_ms: int = oauth.get("expiresAt", 0) + refresh_token: str = oauth.get("refreshToken", "") + + now_ms = int(time.time() * 1000) + is_expired = expires_at_ms > 0 and expires_at_ms <= now_ms + + if is_expired and refresh_if_expired and refresh_token: + new_token = _try_refresh(refresh_token) + if new_token: + return new_token + print( + "warning: Claude OAuth token may be expired; refresh failed. " + "Returning current token anyway.", + file=sys.stderr, + ) + + return access_token + + +def _try_refresh(refresh_token: str) -> str: + """Attempt to refresh the OAuth token. Returns empty string on failure. + + The Anthropic OAuth refresh endpoint is not publicly documented. + This is a best-effort attempt using standard OAuth 2.0 conventions + (grant_type=refresh_token). It may not work if Anthropic changes + their token endpoint or requires additional client credentials. + """ + try: + import httpx # type: ignore + + resp = httpx.post( + "https://auth.anthropic.com/oauth/token", + data={ + "grant_type": "refresh_token", + "refresh_token": refresh_token, + }, + headers={"content-type": "application/x-www-form-urlencoded"}, + timeout=10.0, + ) + if resp.status_code == 200: + body = resp.json() + new_token: str = body.get("access_token", "") + if new_token: + return new_token + except Exception as exc: # noqa: BLE001 + print(f"warning: token refresh attempt failed: {exc}", file=sys.stderr) + + return "" diff --git a/python/functions/core/load_claude_oauth_token_test.py b/python/functions/core/load_claude_oauth_token_test.py new file mode 100644 index 00000000..5efff775 --- /dev/null +++ b/python/functions/core/load_claude_oauth_token_test.py @@ -0,0 +1,85 @@ +"""Tests para load_claude_oauth_token.""" + +import json +import sys +import os +import time + +sys.path.insert(0, os.path.dirname(__file__)) + +from load_claude_oauth_token import load_claude_oauth_token + + +def _write_creds(path, access_token, expires_at_ms, refresh_token="sk-ant-refresh-fake"): + data = { + "claudeAiOauth": { + "accessToken": access_token, + "refreshToken": refresh_token, + "expiresAt": expires_at_ms, + } + } + with open(path, "w") as f: + json.dump(data, f) + + +def test_extrae_accessToken_de_un_credentials_fixture(tmp_path): + creds = tmp_path / "credentials.json" + _write_creds(str(creds), "sk-ant-oat-TEST123", expires_at_ms=int(time.time() * 1000) + 3_600_000) + + token = load_claude_oauth_token(credentials_path=str(creds), refresh_if_expired=False) + + assert token == "sk-ant-oat-TEST123" + + +def test_token_no_expirado_no_intenta_refresh(tmp_path, monkeypatch): + creds = tmp_path / "credentials.json" + # expiresAt 1 hora en el futuro + future_ms = int(time.time() * 1000) + 3_600_000 + _write_creds(str(creds), "sk-ant-oat-FRESH", expires_at_ms=future_ms) + + refresh_called = {"n": 0} + + import load_claude_oauth_token as mod + original_try_refresh = mod._try_refresh + + def fake_refresh(rt): + refresh_called["n"] += 1 + return "" + + monkeypatch.setattr(mod, "_try_refresh", fake_refresh) + + token = load_claude_oauth_token(credentials_path=str(creds), refresh_if_expired=True) + + assert token == "sk-ant-oat-FRESH" + assert refresh_called["n"] == 0 + + +def test_token_expirado_con_refresh_if_expired_False_devuelve_token_igual(tmp_path): + creds = tmp_path / "credentials.json" + # expiresAt en el pasado + past_ms = int(time.time() * 1000) - 3_600_000 + _write_creds(str(creds), "sk-ant-oat-EXPIRED", expires_at_ms=past_ms) + + token = load_claude_oauth_token(credentials_path=str(creds), refresh_if_expired=False) + + assert token == "sk-ant-oat-EXPIRED" + + +def test_token_expirado_refresh_falla_devuelve_token_y_warning_a_stderr(tmp_path, monkeypatch, capsys): + creds = tmp_path / "credentials.json" + past_ms = int(time.time() * 1000) - 3_600_000 + _write_creds(str(creds), "sk-ant-oat-OLD", expires_at_ms=past_ms, refresh_token="sk-ant-refresh-old") + + import load_claude_oauth_token as mod + + def fake_refresh_fails(rt): + return "" # simula fallo silencioso + + monkeypatch.setattr(mod, "_try_refresh", fake_refresh_fails) + + token = load_claude_oauth_token(credentials_path=str(creds), refresh_if_expired=True) + + assert token == "sk-ant-oat-OLD" + captured = capsys.readouterr() + assert "warning" in captured.err.lower() + assert "expired" in captured.err.lower() or "refresh" in captured.err.lower() diff --git a/python/functions/core/run_claude_tool_loop.md b/python/functions/core/run_claude_tool_loop.md new file mode 100644 index 00000000..a1da1199 --- /dev/null +++ b/python/functions/core/run_claude_tool_loop.md @@ -0,0 +1,112 @@ +--- +name: run_claude_tool_loop +kind: function +lang: py +domain: core +version: "1.0.0" +purity: impure +signature: "def run_claude_tool_loop(messages: list, tools: list, dispatch: dict, model: str = \"claude-opus-4-8\", system: str = \"\", max_tokens: int = 4096, max_iters: int = 8, on_text=None) -> dict" +description: "Bucle agentico de tool-use sobre la API de Anthropic. Llama stream_anthropic_messages en loop: si el modelo responde con tool_use, despacha cada tool via dispatch{name: callable}, anade el tool_result, y continua. Termina cuando stop_reason != tool_use o se alcanza max_iters. Devuelve {messages, final_text, stop_reason, iterations}." +tags: [claude-direct, anthropic, tools, agent, llm, tool-loop, streaming] +uses_functions: [stream_anthropic_messages_py_core, load_claude_oauth_token_py_core] +uses_types: [] +returns: [] +returns_optional: false +error_type: "error_go_core" +imports: [json, typing.Callable] +params: + - name: messages + desc: "conversacion en formato Anthropic [{role, content}]. Se modifica in-place anadiendo mensajes assistant y tool_result" + - name: tools + desc: "lista de tool definitions Anthropic [{name, description, input_schema}]" + - name: dispatch + desc: "mapa {tool_name: callable(input_dict) -> result}. El callable puede retornar cualquier valor serializable o lanzar excepcion" + - name: model + desc: "ID del modelo Anthropic. Default: claude-opus-4-8" + - name: system + desc: "system prompt opcional" + - name: max_tokens + desc: "maximo de tokens por llamada API. Default 4096" + - name: max_iters + desc: "maximo de iteraciones del loop (proteccion contra loops infinitos). Default 8" + - name: on_text + desc: "callback opcional llamado con cada text delta: on_text(delta: str). Util para streaming en tiempo real" +output: "dict con keys: messages (lista actualizada), final_text (texto del ultimo turno assistant), stop_reason (end_turn|tool_use|max_tokens|max_iters|error), iterations (int)" +tested: true +tests: + - "tool_use seguido de end_turn ejecuta dispatch y termina" + - "tool dispatch que falla devuelve is_error True en tool_result" + - "tool desconocido devuelve is_error True" + - "on_text callback recibe deltas de texto" + - "max_iters limita el numero de iteraciones" +test_file_path: "python/functions/core/run_claude_tool_loop_test.py" +file_path: "python/functions/core/run_claude_tool_loop.py" +--- + +## Ejemplo + +```python +import sys +sys.path.insert(0, "python/functions/core") +from run_claude_tool_loop import run_claude_tool_loop + +# Definir una tool propia +tools = [ + { + "name": "add", + "description": "Suma dos numeros enteros y devuelve el resultado.", + "input_schema": { + "type": "object", + "properties": { + "a": {"type": "number", "description": "primer sumando"}, + "b": {"type": "number", "description": "segundo sumando"}, + }, + "required": ["a", "b"], + }, + } +] + +# Implementacion de la tool (la provee el usuario) +dispatch = { + "add": lambda inp: inp["a"] + inp["b"], +} + +messages = [{"role": "user", "content": "cuanto es 17 + 25?"}] + +result = run_claude_tool_loop( + messages=messages, + tools=tools, + dispatch=dispatch, + model="claude-haiku-4-5-20251001", + on_text=lambda d: print(d, end="", flush=True), +) + +print(f"\n[stop={result['stop_reason']} iters={result['iterations']}]") +# Claude llama a add(17, 25) -> 42, luego responde "El resultado es 42." +``` + +## Cuando usarla + +Cuando quieras que Claude ejecute herramientas Python propias en un bucle +agentico: calculos, consultas a BD, llamadas a APIs externas, lectura de +archivos, etc. El loop gestiona automaticamente la negociacion +tool_use/tool_result con la API. Usa `on_text` para mostrar la respuesta +en tiempo real mientras el modelo piensa entre tool calls. + +## Gotchas + +- **messages se modifica in-place**: los mensajes assistant y tool_result se + anaden a la lista pasada. Pasa una copia si necesitas preservar el estado + original: `run_claude_tool_loop(list(messages), ...)`. +- **dispatch debe cubrir todos los tools declarados**: si el modelo invoca + una tool que no esta en dispatch, el tool_result contendra `is_error: True`. + Declara exactamente los tools que dispatch puede manejar. +- **Excepciones en dispatch son capturadas**: si tu callable lanza una + excepcion, se convierte en tool_result con `is_error: True` y el mensaje + del error. El loop continua — el modelo puede corregir y reintentar. +- **max_iters como seguro**: sin este limite, un modelo que siempre pide + tools podria iterar indefinidamente. Default 8 es conservador; subelo + para agentes complejos con muchos pasos. +- **final_text es solo el ultimo turno**: si el modelo genero texto en + iteraciones anteriores (raro pero posible), solo se devuelve el del + ultimo turno. El historial completo esta en `messages`. diff --git a/python/functions/core/run_claude_tool_loop.py b/python/functions/core/run_claude_tool_loop.py new file mode 100644 index 00000000..a44c0225 --- /dev/null +++ b/python/functions/core/run_claude_tool_loop.py @@ -0,0 +1,192 @@ +"""Agentic tool-use loop over the Anthropic Messages API.""" + +import json +import sys +import os +from typing import Callable + +sys.path.insert(0, os.path.dirname(__file__)) + +from stream_anthropic_messages import stream_anthropic_messages # noqa: E402 + + +def run_claude_tool_loop( + messages: list, + tools: list, + dispatch: dict, + model: str = "claude-opus-4-8", + system: str = "", + max_tokens: int = 4096, + max_iters: int = 8, + on_text: Callable[[str], None] = None, +) -> dict: + """Run an agentic tool-use loop against the Anthropic Messages API. + + Repeatedly calls ``stream_anthropic_messages`` until the model stops + requesting tools (``stop_reason != "tool_use"``) or ``max_iters`` is + reached. At each iteration: + + 1. Stream the assistant response, accumulating text and tool_use blocks. + 2. Append the assistant message (text + tool_use content blocks) to + ``messages``. + 3. If ``stop_reason == "tool_use"``: dispatch every requested tool via + the ``dispatch`` mapping, append a ``user`` message with all + ``tool_result`` blocks, and continue. + 4. Otherwise terminate and return the result dict. + + Args: + messages: Conversation so far in Anthropic format. Modified in-place + (tool_result messages are appended). Pass a copy if you want to + preserve the original. + tools: List of tool definitions in Anthropic format:: + + [{"name": "add", "description": "...", + "input_schema": {"type": "object", + "properties": {"a": {"type": "number"}, + "b": {"type": "number"}}, + "required": ["a", "b"]}}] + + dispatch: Mapping from tool name to callable ``(input_dict) -> result``. + The callable may return any JSON-serialisable value or raise an + exception (which is caught and returned as an error tool_result). + model: Anthropic model ID. Default ``claude-opus-4-8``. + system: Optional system prompt. + max_tokens: Max tokens per API call. Default 4096. + max_iters: Hard cap on tool-use iterations. Default 8. + on_text: Optional callback called with each text delta as it streams. + Useful for real-time display. E.g. ``on_text=lambda d: print(d, end="", flush=True)``. + + Returns: + Dict with keys: + - ``messages``: updated conversation (same list, modified in-place). + - ``final_text``: concatenated text from the last assistant turn. + - ``stop_reason``: ``"end_turn"``, ``"tool_use"``, ``"max_tokens"``, + ``"max_iters"``, or ``"error"``. + - ``iterations``: number of loop iterations executed. + """ + iterations = 0 + final_text = "" + stop_reason = "max_iters" + + for _ in range(max_iters): + iterations += 1 + text_parts: list[str] = [] + # tool_uses: list of {id, name, input_json_parts, index} + tool_uses: list[dict] = [] + # index -> tool_use dict (for partial_json accumulation) + index_map: dict[int, dict] = {} + current_stop_reason = "end_turn" + + for event in stream_anthropic_messages( + messages=messages, + model=model, + system=system, + tools=tools, + max_tokens=max_tokens, + ): + etype = event.get("type", "") + + if etype == "text": + delta = event["text"] + text_parts.append(delta) + if on_text is not None: + on_text(delta) + + elif etype == "tool_use_start": + entry = { + "id": event["id"], + "name": event["name"], + "index": event["index"], + "partial_json_parts": [], + } + tool_uses.append(entry) + index_map[event["index"]] = entry + + elif etype == "tool_input_delta": + idx = event["index"] + if idx in index_map: + index_map[idx]["partial_json_parts"].append(event["partial_json"]) + + elif etype == "done": + current_stop_reason = event.get("stop_reason", "end_turn") + + elif etype == "error": + final_text = "".join(text_parts) + return { + "messages": messages, + "final_text": final_text, + "stop_reason": "error", + "iterations": iterations, + "error": event.get("message", "unknown error"), + } + + final_text = "".join(text_parts) + stop_reason = current_stop_reason + + # Build the assistant content blocks + assistant_content: list[dict] = [] + if final_text: + assistant_content.append({"type": "text", "text": final_text}) + for tu in tool_uses: + raw_input = "".join(tu["partial_json_parts"]) + try: + parsed_input = json.loads(raw_input) if raw_input else {} + except json.JSONDecodeError: + parsed_input = {"_raw": raw_input} + assistant_content.append({ + "type": "tool_use", + "id": tu["id"], + "name": tu["name"], + "input": parsed_input, + }) + + messages.append({"role": "assistant", "content": assistant_content}) + + if stop_reason != "tool_use" or not tool_uses: + break + + # Dispatch tools and build tool_result message + tool_results: list[dict] = [] + for tu in tool_uses: + tool_name = tu["name"] + raw_input = "".join(tu["partial_json_parts"]) + try: + parsed_input = json.loads(raw_input) if raw_input else {} + except json.JSONDecodeError: + parsed_input = {"_raw": raw_input} + + if tool_name not in dispatch: + result_content = f"Error: tool '{tool_name}' not found in dispatch" + is_error = True + else: + try: + result_value = dispatch[tool_name](parsed_input) + result_content = ( + result_value + if isinstance(result_value, str) + else json.dumps(result_value) + ) + is_error = False + except Exception as exc: # noqa: BLE001 + result_content = f"Error executing {tool_name}: {exc}" + is_error = True + + tool_result: dict = { + "type": "tool_result", + "tool_use_id": tu["id"], + "content": result_content, + } + if is_error: + tool_result["is_error"] = True + tool_results.append(tool_result) + + messages.append({"role": "user", "content": tool_results}) + else: + stop_reason = "max_iters" + + return { + "messages": messages, + "final_text": final_text, + "stop_reason": stop_reason, + "iterations": iterations, + } diff --git a/python/functions/core/run_claude_tool_loop_test.py b/python/functions/core/run_claude_tool_loop_test.py new file mode 100644 index 00000000..4c9ae56f --- /dev/null +++ b/python/functions/core/run_claude_tool_loop_test.py @@ -0,0 +1,188 @@ +"""Tests para run_claude_tool_loop — usa fake de stream_anthropic_messages, sin HTTP.""" + +import sys +import os +import json + +sys.path.insert(0, os.path.dirname(__file__)) + + +# --------------------------------------------------------------------------- +# Fake de stream_anthropic_messages inyectado via monkeypatch +# --------------------------------------------------------------------------- + +def _make_text_response(text: str, stop_reason: str = "end_turn"): + """Genera la secuencia de eventos de una respuesta de texto pura.""" + for ch in text: + yield {"type": "text", "text": ch} + yield {"type": "done", "stop_reason": stop_reason} + + +def _make_tool_use_response(tool_id: str, tool_name: str, tool_input: dict): + """Genera la secuencia de eventos de un tool_use.""" + yield {"type": "tool_use_start", "id": tool_id, "name": tool_name, "index": 0} + raw = json.dumps(tool_input) + # Simula streaming del JSON en dos partes + mid = len(raw) // 2 + yield {"type": "tool_input_delta", "index": 0, "partial_json": raw[:mid]} + yield {"type": "tool_input_delta", "index": 0, "partial_json": raw[mid:]} + yield {"type": "done", "stop_reason": "tool_use"} + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + +def test_tool_use_seguido_de_end_turn_ejecuta_dispatch_y_termina(monkeypatch): + import run_claude_tool_loop as mod + + call_count = {"n": 0} + + def fake_stream(messages, model, system, tools, max_tokens): + call_count["n"] += 1 + if call_count["n"] == 1: + # Primera llamada: el modelo pide la tool "add" + yield from _make_tool_use_response("toolu_01", "add", {"a": 3, "b": 4}) + else: + # Segunda llamada: el modelo responde con el resultado + yield from _make_text_response("El resultado es 7.") + + monkeypatch.setattr(mod, "stream_anthropic_messages", fake_stream) + + dispatch_called = {"input": None} + def add_tool(inp): + dispatch_called["input"] = inp + return inp["a"] + inp["b"] + + messages = [{"role": "user", "content": "cuanto es 3 + 4?"}] + result = mod.run_claude_tool_loop( + messages=messages, + tools=[{"name": "add", "description": "suma", "input_schema": {}}], + dispatch={"add": add_tool}, + model="claude-haiku-4-5-20251001", + ) + + assert result["stop_reason"] == "end_turn" + assert result["iterations"] == 2 + assert result["final_text"] == "El resultado es 7." + assert dispatch_called["input"] == {"a": 3, "b": 4} + + # Verifica que se anado el tool_result al historial + roles = [m["role"] for m in result["messages"]] + assert roles.count("assistant") == 2 + assert roles.count("user") >= 2 + + # El mensaje user con tool_result debe tener is_error=False (ausente) + tool_result_msg = next( + m for m in result["messages"] + if m["role"] == "user" and isinstance(m["content"], list) + and m["content"] and m["content"][0].get("type") == "tool_result" + ) + assert tool_result_msg["content"][0].get("is_error") is None or \ + tool_result_msg["content"][0].get("is_error") is False + + +def test_tool_dispatch_que_falla_devuelve_is_error_True_en_tool_result(monkeypatch): + import run_claude_tool_loop as mod + + call_count = {"n": 0} + + def fake_stream(messages, model, system, tools, max_tokens): + call_count["n"] += 1 + if call_count["n"] == 1: + yield from _make_tool_use_response("toolu_02", "divide", {"a": 10, "b": 0}) + else: + yield from _make_text_response("No se puede dividir por cero.") + + monkeypatch.setattr(mod, "stream_anthropic_messages", fake_stream) + + def divide_tool(inp): + return inp["a"] / inp["b"] # ZeroDivisionError + + messages = [{"role": "user", "content": "divide 10 entre 0"}] + result = mod.run_claude_tool_loop( + messages=messages, + tools=[{"name": "divide", "description": "division", "input_schema": {}}], + dispatch={"divide": divide_tool}, + ) + + # Debe haber un tool_result con is_error=True + tool_result_msg = next( + m for m in result["messages"] + if m["role"] == "user" and isinstance(m["content"], list) + and m["content"] and m["content"][0].get("type") == "tool_result" + ) + assert tool_result_msg["content"][0].get("is_error") is True + assert "Error" in tool_result_msg["content"][0]["content"] + + +def test_tool_desconocido_devuelve_is_error_True(monkeypatch): + import run_claude_tool_loop as mod + + call_count = {"n": 0} + + def fake_stream(messages, model, system, tools, max_tokens): + call_count["n"] += 1 + if call_count["n"] == 1: + yield from _make_tool_use_response("toolu_03", "nonexistent_tool", {"x": 1}) + else: + yield from _make_text_response("No pude usar esa tool.") + + monkeypatch.setattr(mod, "stream_anthropic_messages", fake_stream) + + messages = [{"role": "user", "content": "usa nonexistent_tool"}] + result = mod.run_claude_tool_loop( + messages=messages, + tools=[], + dispatch={}, # dispatch vacio — tool no existe + ) + + tool_result_msg = next( + m for m in result["messages"] + if m["role"] == "user" and isinstance(m["content"], list) + and m["content"] and m["content"][0].get("type") == "tool_result" + ) + assert tool_result_msg["content"][0].get("is_error") is True + assert "not found" in tool_result_msg["content"][0]["content"] + + +def test_on_text_callback_recibe_deltas_de_texto(monkeypatch): + import run_claude_tool_loop as mod + + def fake_stream(messages, model, system, tools, max_tokens): + yield from _make_text_response("hola mundo") + + monkeypatch.setattr(mod, "stream_anthropic_messages", fake_stream) + + received = [] + messages = [{"role": "user", "content": "saluda"}] + result = mod.run_claude_tool_loop( + messages=messages, + tools=[], + dispatch={}, + on_text=lambda d: received.append(d), + ) + + assert "".join(received) == "hola mundo" + assert result["final_text"] == "hola mundo" + + +def test_max_iters_limita_el_numero_de_iteraciones(monkeypatch): + import run_claude_tool_loop as mod + + # Siempre responde con tool_use para forzar loop infinito + def fake_stream_infinite(messages, model, system, tools, max_tokens): + yield from _make_tool_use_response("toolu_inf", "noop", {}) + + monkeypatch.setattr(mod, "stream_anthropic_messages", fake_stream_infinite) + + messages = [{"role": "user", "content": "loop"}] + result = mod.run_claude_tool_loop( + messages=messages, + tools=[{"name": "noop", "description": "no-op", "input_schema": {}}], + dispatch={"noop": lambda inp: "ok"}, + max_iters=3, + ) + + assert result["iterations"] == 3 + assert result["stop_reason"] == "max_iters" diff --git a/python/functions/core/stream_anthropic_messages.md b/python/functions/core/stream_anthropic_messages.md new file mode 100644 index 00000000..8c811237 --- /dev/null +++ b/python/functions/core/stream_anthropic_messages.md @@ -0,0 +1,86 @@ +--- +name: stream_anthropic_messages +kind: function +lang: py +domain: core +version: "1.0.0" +purity: impure +signature: "def stream_anthropic_messages(messages: list, model: str = \"claude-opus-4-8\", system: str = \"\", tools: list = None, max_tokens: int = 4096, token: str = \"\") -> Iterator[dict]" +description: "Llama a https://api.anthropic.com/v1/messages con stream:true usando el token OAuth de Claude Code y hace yield de eventos normalizados: text, tool_use_start, tool_input_delta, done, error. Parsea el SSE en tiempo real. Si token vacio llama load_claude_oauth_token automaticamente." +tags: [claude-direct, anthropic, streaming, sse, llm, tools, http] +uses_functions: [load_claude_oauth_token_py_core] +uses_types: [] +returns: [] +returns_optional: false +error_type: "error_go_core" +imports: [json, sys, typing.Iterator, httpx] +params: + - name: messages + desc: "lista de mensajes Anthropic [{role: user|assistant, content: ...}]" + - name: model + desc: "ID del modelo Anthropic. Default: claude-opus-4-8" + - name: system + desc: "system prompt opcional. Si vacio no se incluye en el body" + - name: tools + desc: "lista de tool definitions [{name, description, input_schema}]. None para no usar tools" + - name: max_tokens + desc: "maximo de tokens en la respuesta. Default 4096" + - name: token + desc: "access token OAuth. Si vacio se carga automaticamente desde ~/.claude/.credentials.json" +output: "Iterator[dict] — eventos normalizados: {type:text,text}, {type:tool_use_start,id,name,index}, {type:tool_input_delta,index,partial_json}, {type:done,stop_reason}, {type:error,message}" +tested: true +tests: + - "parse_sse_chunk text delta emite evento text" + - "parse_sse_chunk tool_use_start emite evento tool_use_start con id y name" + - "parse_sse_chunk tool_input_delta emite evento tool_input_delta con partial_json" + - "parse_sse_chunk message_delta con stop_reason emite done" + - "parse_sse_chunk secuencia completa text PONG reconstruye el texto" + - "parse_sse_chunk secuencia completa tool_use reconstruye el input JSON" +test_file_path: "python/functions/core/stream_anthropic_messages_test.py" +file_path: "python/functions/core/stream_anthropic_messages.py" +--- + +## Ejemplo + +```python +import sys +sys.path.insert(0, "python/functions/core") +from stream_anthropic_messages import stream_anthropic_messages + +text = "" +for event in stream_anthropic_messages( + messages=[{"role": "user", "content": "di solo PONG"}], + model="claude-haiku-4-5-20251001", + max_tokens=32, +): + if event["type"] == "text": + text += event["text"] + print(event["text"], end="", flush=True) + elif event["type"] == "done": + print(f"\n[stop_reason={event['stop_reason']}]") + elif event["type"] == "error": + print(f"ERROR: {event['message']}", file=sys.stderr) + +print("respuesta completa:", text) +``` + +## Cuando usarla + +Cuando necesites hablar directamente con la API de Anthropic desde Python +sin lanzar la CLI `claude`, especialmente cuando quieras streaming en tiempo +real del texto o necesites ejecutar tools propias via `run_claude_tool_loop`. +Tambien util para integrar Claude en scripts de analisis donde el bucle +de tools se gestiona en Python. + +## Gotchas + +- **El parser SSE es puro y testeable**: `parse_sse_chunk` no hace HTTP. + Puedes testear el parseo completo sin mocks de red. +- **Eventos `raw` son pass-through**: `message_start`, `content_block_stop`, + `ping` y otros eventos no mapeados se emiten como `{type: raw, event: ..., data: ...}`. + El consumidor los puede ignorar o inspeccionar. +- **Reconstruccion de tool input**: los `partial_json` de un mismo `index` + deben concatenarse hasta el evento `done`. `run_claude_tool_loop` hace esto + automaticamente. +- **Timeout 120s**: configurable en el codigo si necesitas respuestas muy largas. +- La funcion `parse_sse_chunk` es pure (importable y testeable por separado). diff --git a/python/functions/core/stream_anthropic_messages.py b/python/functions/core/stream_anthropic_messages.py new file mode 100644 index 00000000..053ae61b --- /dev/null +++ b/python/functions/core/stream_anthropic_messages.py @@ -0,0 +1,195 @@ +"""Stream responses from the Anthropic Messages API using a Claude Code OAuth token.""" + +import json +import sys +from typing import Iterator + + +_API_URL = "https://api.anthropic.com/v1/messages" +_ANTHROPIC_VERSION = "2023-06-01" + + +# --------------------------------------------------------------------------- +# Pure SSE parser — fully testable without HTTP +# --------------------------------------------------------------------------- + +def parse_sse_chunk(chunk: str) -> Iterator[dict]: + """Parse one SSE chunk (possibly containing multiple event/data pairs). + + Yields normalised event dicts: + - ``{"type": "text", "text": ""}`` + - ``{"type": "tool_use_start", "id": "...", "name": "...", "index": N}`` + - ``{"type": "tool_input_delta","index": N, "partial_json": ""}`` + - ``{"type": "done", "stop_reason": "<...>"}`` + - ``{"type": "raw", "event": "...", "data": {...}}`` (pass-through) + + Args: + chunk: Raw SSE text with ``event:`` / ``data:`` lines, separated by + blank lines between events. + """ + current_event = "" + current_data = "" + + for line in chunk.splitlines(): + if line.startswith("event:"): + current_event = line[len("event:"):].strip() + elif line.startswith("data:"): + current_data = line[len("data:"):].strip() + elif line == "": + if current_data and current_data != "[DONE]": + try: + data = json.loads(current_data) + except json.JSONDecodeError: + current_event = "" + current_data = "" + continue + + yield from _normalise_event(current_event, data) + + current_event = "" + current_data = "" + + # flush final event without trailing blank line + if current_data and current_data != "[DONE]": + try: + data = json.loads(current_data) + yield from _normalise_event(current_event, data) + except json.JSONDecodeError: + pass + + +def _normalise_event(event_type: str, data: dict) -> Iterator[dict]: + """Convert a raw SSE event dict into one or more normalised dicts.""" + if event_type == "content_block_start": + block = data.get("content_block", {}) + if block.get("type") == "tool_use": + yield { + "type": "tool_use_start", + "id": block.get("id", ""), + "name": block.get("name", ""), + "index": data.get("index", 0), + } + + elif event_type == "content_block_delta": + delta = data.get("delta", {}) + delta_type = delta.get("type", "") + if delta_type == "text_delta": + yield {"type": "text", "text": delta.get("text", "")} + elif delta_type == "input_json_delta": + yield { + "type": "tool_input_delta", + "index": data.get("index", 0), + "partial_json": delta.get("partial_json", ""), + } + + elif event_type == "message_delta": + stop_reason = data.get("delta", {}).get("stop_reason", "") + if stop_reason: + yield {"type": "done", "stop_reason": stop_reason} + + elif event_type == "message_stop": + # emitted after message_delta — only yield done if not already seen + pass + + else: + # pass-through for message_start, content_block_stop, ping, etc. + yield {"type": "raw", "event": event_type, "data": data} + + +# --------------------------------------------------------------------------- +# Streaming HTTP function +# --------------------------------------------------------------------------- + +def stream_anthropic_messages( + messages: list, + model: str = "claude-opus-4-8", + system: str = "", + tools: list = None, + max_tokens: int = 4096, + token: str = "", +) -> Iterator[dict]: + """Stream an Anthropic Messages API call using a Claude Code OAuth token. + + Posts to ``https://api.anthropic.com/v1/messages`` with ``stream: true`` + and yields normalised event dicts as they arrive: + + - ``{"type": "text", "text": ""}`` + - ``{"type": "tool_use_start", "id": "...", "name": "...", "index": N}`` + - ``{"type": "tool_input_delta","index": N, "partial_json": ""}`` + - ``{"type": "done", "stop_reason": "end_turn|tool_use|..."}`` + - ``{"type": "error", "message": "..."}`` on HTTP error / exception + + Args: + messages: List of message dicts in Anthropic format + (``[{"role": "user", "content": "..."}]``). + model: Anthropic model ID. Default ``claude-opus-4-8``. + system: Optional system prompt string. + tools: Optional list of tool definition dicts. + max_tokens: Maximum tokens in the response. Default 4096. + token: OAuth access token. If empty, ``load_claude_oauth_token()`` + is called automatically. + + Yields: + Normalised event dicts (see above). + """ + if not token: + try: + try: + from core.load_claude_oauth_token import load_claude_oauth_token + except ImportError: + from load_claude_oauth_token import load_claude_oauth_token + token = load_claude_oauth_token() + except Exception as exc: + yield {"type": "error", "message": f"failed to load token: {exc}"} + return + + body: dict = { + "model": model, + "max_tokens": max_tokens, + "messages": messages, + "stream": True, + } + if system: + body["system"] = system + if tools: + body["tools"] = tools + + headers = { + "authorization": f"Bearer {token}", + "anthropic-version": _ANTHROPIC_VERSION, + "content-type": "application/json", + } + + try: + import httpx + + with httpx.stream( + "POST", + _API_URL, + json=body, + headers=headers, + timeout=120.0, + ) as resp: + if resp.status_code != 200: + error_body = resp.read().decode("utf-8", errors="replace") + yield { + "type": "error", + "message": f"HTTP {resp.status_code}: {error_body[:500]}", + } + return + + buffer = "" + for chunk in resp.iter_text(): + buffer += chunk + # Process complete SSE blocks (separated by double newlines) + while "\n\n" in buffer: + block, buffer = buffer.split("\n\n", 1) + block += "\n\n" + yield from parse_sse_chunk(block) + + # Flush any remaining content + if buffer.strip(): + yield from parse_sse_chunk(buffer + "\n\n") + + except Exception as exc: + yield {"type": "error", "message": str(exc)} diff --git a/python/functions/core/stream_anthropic_messages_test.py b/python/functions/core/stream_anthropic_messages_test.py new file mode 100644 index 00000000..aaadd704 --- /dev/null +++ b/python/functions/core/stream_anthropic_messages_test.py @@ -0,0 +1,163 @@ +"""Tests para stream_anthropic_messages — solo el parser SSE puro, sin HTTP.""" + +import sys +import os + +sys.path.insert(0, os.path.dirname(__file__)) + +from stream_anthropic_messages import parse_sse_chunk + + +# --------------------------------------------------------------------------- +# Helpers: construir chunks SSE sinteticos (igual a lo que envia Anthropic) +# --------------------------------------------------------------------------- + +def sse(event: str, data: dict) -> str: + import json + return f"event: {event}\ndata: {json.dumps(data)}\n\n" + + +# --------------------------------------------------------------------------- +# Tests de eventos individuales +# --------------------------------------------------------------------------- + +def test_parse_sse_chunk_text_delta_emite_evento_text(): + chunk = sse("content_block_delta", { + "type": "content_block_delta", + "index": 0, + "delta": {"type": "text_delta", "text": "P"}, + }) + events = list(parse_sse_chunk(chunk)) + text_events = [e for e in events if e["type"] == "text"] + assert len(text_events) == 1 + assert text_events[0]["text"] == "P" + + +def test_parse_sse_chunk_tool_use_start_emite_evento_tool_use_start(): + chunk = sse("content_block_start", { + "type": "content_block_start", + "index": 1, + "content_block": {"type": "tool_use", "id": "toolu_01", "name": "add"}, + }) + events = list(parse_sse_chunk(chunk)) + tu_events = [e for e in events if e["type"] == "tool_use_start"] + assert len(tu_events) == 1 + assert tu_events[0]["id"] == "toolu_01" + assert tu_events[0]["name"] == "add" + assert tu_events[0]["index"] == 1 + + +def test_parse_sse_chunk_tool_input_delta_emite_evento_tool_input_delta(): + chunk = sse("content_block_delta", { + "type": "content_block_delta", + "index": 1, + "delta": {"type": "input_json_delta", "partial_json": '{"a":'}, + }) + events = list(parse_sse_chunk(chunk)) + ti_events = [e for e in events if e["type"] == "tool_input_delta"] + assert len(ti_events) == 1 + assert ti_events[0]["index"] == 1 + assert ti_events[0]["partial_json"] == '{"a":' + + +def test_parse_sse_chunk_message_delta_con_stop_reason_emite_done(): + chunk = sse("message_delta", { + "type": "message_delta", + "delta": {"stop_reason": "end_turn", "stop_sequence": None}, + "usage": {"output_tokens": 5}, + }) + events = list(parse_sse_chunk(chunk)) + done_events = [e for e in events if e["type"] == "done"] + assert len(done_events) == 1 + assert done_events[0]["stop_reason"] == "end_turn" + + +def test_parse_sse_chunk_secuencia_completa_text_PONG_reconstruye_el_texto(): + """Simula la secuencia completa de eventos SSE para una respuesta de texto.""" + import json + + # Secuencia tipica: message_start, content_block_start text, deltas, stop + chunks = "".join([ + sse("message_start", { + "type": "message_start", + "message": {"id": "msg_01", "type": "message", "role": "assistant", + "content": [], "model": "claude-haiku-4-5-20251001", + "stop_reason": None, "usage": {"input_tokens": 5}}, + }), + sse("content_block_start", { + "type": "content_block_start", + "index": 0, + "content_block": {"type": "text", "text": ""}, + }), + sse("content_block_delta", { + "type": "content_block_delta", "index": 0, + "delta": {"type": "text_delta", "text": "P"}, + }), + sse("content_block_delta", { + "type": "content_block_delta", "index": 0, + "delta": {"type": "text_delta", "text": "ON"}, + }), + sse("content_block_delta", { + "type": "content_block_delta", "index": 0, + "delta": {"type": "text_delta", "text": "G"}, + }), + sse("content_block_stop", {"type": "content_block_stop", "index": 0}), + sse("message_delta", { + "type": "message_delta", + "delta": {"stop_reason": "end_turn"}, + "usage": {"output_tokens": 4}, + }), + sse("message_stop", {"type": "message_stop"}), + ]) + + events = list(parse_sse_chunk(chunks)) + text = "".join(e["text"] for e in events if e["type"] == "text") + done = [e for e in events if e["type"] == "done"] + + assert text == "PONG" + assert len(done) == 1 + assert done[0]["stop_reason"] == "end_turn" + + +def test_parse_sse_chunk_secuencia_completa_tool_use_reconstruye_el_input_JSON(): + """Simula la secuencia SSE para un tool_use con input_json_delta streaming.""" + chunks = "".join([ + sse("content_block_start", { + "type": "content_block_start", + "index": 0, + "content_block": {"type": "tool_use", "id": "toolu_42", "name": "add"}, + }), + sse("content_block_delta", { + "type": "content_block_delta", "index": 0, + "delta": {"type": "input_json_delta", "partial_json": '{"a":'}, + }), + sse("content_block_delta", { + "type": "content_block_delta", "index": 0, + "delta": {"type": "input_json_delta", "partial_json": ' 3, "b": 4}'}, + }), + sse("content_block_stop", {"type": "content_block_stop", "index": 0}), + sse("message_delta", { + "type": "message_delta", + "delta": {"stop_reason": "tool_use"}, + "usage": {"output_tokens": 10}, + }), + ]) + + events = list(parse_sse_chunk(chunks)) + + tu_start = [e for e in events if e["type"] == "tool_use_start"] + ti_deltas = [e for e in events if e["type"] == "tool_input_delta"] + done = [e for e in events if e["type"] == "done"] + + assert len(tu_start) == 1 + assert tu_start[0]["name"] == "add" + assert tu_start[0]["id"] == "toolu_42" + + # Reconstruir el input JSON concatenando los partial_json + import json + raw_input = "".join(d["partial_json"] for d in ti_deltas) + parsed = json.loads(raw_input) + assert parsed == {"a": 3, "b": 4} + + assert len(done) == 1 + assert done[0]["stop_reason"] == "tool_use"