feat(core): auto-commit con 17 cambios

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-05 17:34:22 +02:00
parent 1f93e9d502
commit 736e019e19
17 changed files with 1539 additions and 1 deletions
+1
View File
@@ -39,3 +39,4 @@ Reglas operativas del proyecto. Cada archivo es una regla independiente.
| 32 | [../../dev/TAXONOMY.md](../../dev/TAXONOMY.md) | Allowlist canonica para dominios/tipos/scopes/estados/prioridades + flow patterns. Aplica a `dev/issues/` y `dev/flows/`. Issues 0100 + 0103 | | 32 | [../../dev/TAXONOMY.md](../../dev/TAXONOMY.md) | Allowlist canonica para dominios/tipos/scopes/estados/prioridades + flow patterns. Aplica a `dev/issues/` y `dev/flows/`. Issues 0100 + 0103 |
| 33 | [project_commands.md](project_commands.md) | Slash commands por project (`.claude/commands/<project>/`) expuestos via symlink. Desde fn_registry: `/<project>:foo`. Desde el project: `/foo`. Sin colision. | | 33 | [project_commands.md](project_commands.md) | Slash commands por project (`.claude/commands/<project>/`) expuestos via symlink. Desde fn_registry: `/<project>:foo`. Desde el project: `/foo`. Sin colision. |
| 34 | [dod_quality.md](dod_quality.md) | DoD Quality Triada: Mecanica + Cobertura (golden + edge + error path con evidencia ejecutable) + Vida util validada (>=7 dias uso real). Cierra anti-criterios contra checkbox vago. Aplica a `dev/flows/` y issues user-facing. | | 34 | [dod_quality.md](dod_quality.md) | DoD Quality Triada: Mecanica + Cobertura (golden + edge + error path con evidencia ejecutable) + Vida util validada (>=7 dias uso real). Cierra anti-criterios contra checkbox vago. Aplica a `dev/flows/` y issues user-facing. |
| 35 | [llm_invocation.md](llm_invocation.md) | Invocacion de LLM: SIEMPRE `ask_llm` (grupo `claude-direct`, API directa, arranque 0), NUNCA `claude -p` (lento, cold start). One-shot/streaming/tool-loop + legacy `claude_stream_go_core` deprecado. |
+50
View File
@@ -0,0 +1,50 @@
## Invocación de LLM: SIEMPRE `ask_llm`, NUNCA `claude -p`
**REGLA DURA.** Para ejecutar un modelo LLM desde cualquier código del ecosistema (scripts, heredocs, apps, pipelines, agentes), usa el grupo `claude-direct` — empezando por `ask_llm_py_core`. **NUNCA** uses `claude -p` ni lances el binario `claude` como subproceso para obtener una respuesta del modelo.
### Por qué
| | `claude -p` | `ask_llm` / `claude-direct` |
|---|---|---|
| Mecanismo | Lanza Claude Code entero (proceso `claude`) | Habla directo a `api.anthropic.com/v1/messages` |
| Arranque | ~7-15s (carga MCP + `CLAUDE.md` ~100k tokens) | **0 — request HTTP directa** |
| Latencia/msg | ~9-15s | **~2.5s** |
| Coste | Alto (re-carga contexto cada vez) | Mínimo (solo tu prompt) |
| Tools | Las de Claude Code (no controlables) | **Las que tú defines** (`run_claude_tool_loop`) |
| Streaming | indirecto | nativo (`stream_anthropic_messages`) |
`claude -p` es lento, caro y arranca todo Claude Code para una completion. `ask_llm` es la API directa: arranque 0, rápido, con tus propias tools. Usa el token OAuth que Claude Code ya guarda en `~/.claude/.credentials.json`.
### Cómo (según el caso)
| Caso | Usa |
|---|---|
| Pregunta/chat one-shot | `fn run ask_llm "..."` o `from core.ask_llm import ask_llm` |
| Streaming de eventos crudos (text/tool_use deltas) | `stream_anthropic_messages_py_core` |
| Agente con TUS tools (tool-use loop) | `run_claude_tool_loop_py_core` (defines `tools` + `dispatch`) |
| Token OAuth | `load_claude_oauth_token_py_core` (automático dentro de las anteriores) |
| Distribuir fuera del registry | `apps/llm_cli/llm.py` (versión standalone autocontenida) |
```python
import sys, os
sys.path.insert(0, os.path.join("python", "functions"))
from core.ask_llm import ask_llm
respuesta = ask_llm("resume esto en 3 lineas: ...", model="claude-haiku-4-5-20251001", echo=False)
```
### Legacy
`claude_stream_go_core` (lanza `claude -p --output-format stream-json`) es el **camino antiguo**. No usarlo en código nuevo — preferir las funciones `claude-direct`. Queda solo para compatibilidad de consumidores existentes.
### Excepción acotada
Si una tarea necesita **genuinamente las capacidades de Claude Code** (sus tools nativas, los MCP del repo, plan mode, el contexto del proyecto) y no basta con el modelo + tus propias tools via `run_claude_tool_loop`, entonces NO es una "invocación LLM" simple: documenta por qué en el código. El **default sin excepción es `ask_llm`**.
### Telemetría / auditoría
Un `claude -p` o un `subprocess(["claude", "-p", ...])` en código nuevo es un antipatrón auditable: sustituir por `ask_llm` / `claude-direct`. Buscar usos: `grep -rn 'claude -p' --include='*.py' --include='*.sh' --include='*.go'`.
### Relación con otras reglas
- [[registry_calls]] — patrones canónicos de invocación de funciones; esta regla fija el patrón para la sub-tarea "invocar un LLM".
- [[registry_first]] — reusar antes que reescribir; `ask_llm` es la función reutilizable para LLM.
+1
View File
@@ -46,6 +46,7 @@ Indice de grupos de capacidades del registry. Cada grupo agrupa >=3 funciones qu
| [matrix-mas](matrix-mas.md) | 5 | Migración Synapse→MAS: habilitar MSC3861, verificar login flows, parche .well-known, registro clientes OAuth2, syn2mas | | [matrix-mas](matrix-mas.md) | 5 | Migración Synapse→MAS: habilitar MSC3861, verificar login flows, parche .well-known, registro clientes OAuth2, syn2mas |
| [mesh-3d](mesh-3d.md) | 3 | Carga y upload a GPU de meshes 3D (OBJ, GLB/glTF 2.0): loaders CPU + mesh_gpu_upload OpenGL | | [mesh-3d](mesh-3d.md) | 3 | Carga y upload a GPU de meshes 3D (OBJ, GLB/glTF 2.0): loaders CPU + mesh_gpu_upload OpenGL |
| [terminal-capture](terminal-capture.md) | 6 | Automatizar y capturar el texto de una CLI/TUI interactiva via PTY headless: spawn+input scripteado (one-shot y streaming), render del layout 2D (emulador VT), strip ANSI, delta por prefijo, y parseo de la TUI de claude a datos | | [terminal-capture](terminal-capture.md) | 6 | Automatizar y capturar el texto de una CLI/TUI interactiva via PTY headless: spawn+input scripteado (one-shot y streaming), render del layout 2D (emulador VT), strip ANSI, delta por prefijo, y parseo de la TUI de claude a datos |
| [claude-direct](claude-direct.md) | 3 | Hablar directamente con la API de Anthropic Messages usando el token OAuth de Claude Code (Claude Max): leer token, stream SSE, bucle agentico de tool-use |
## Como anadir grupo ## Como anadir grupo
+91
View File
@@ -0,0 +1,91 @@
# Capability: claude-direct
Hablar directamente con `https://api.anthropic.com/v1/messages` usando el token OAuth de Claude Code (Claude Max), sin lanzar la CLI `claude` ni necesitar una API key de pago separada. 3 funciones Python en `domain: core`.
## Funciones
| ID | Firma | Que hace |
|---|---|---|
| `load_claude_oauth_token_py_core` | `def load_claude_oauth_token(credentials_path: str = "", refresh_if_expired: bool = True) -> str` | Lee el access token OAuth desde `~/.claude/.credentials.json`. Verifica expiry (ms-epoch). Intenta refresh best-effort si expirado. |
| `stream_anthropic_messages_py_core` | `def stream_anthropic_messages(messages: list, model: str = "claude-opus-4-8", ...) -> Iterator[dict]` | POST streaming a `/v1/messages`. Yield de eventos normalizados: `text`, `tool_use_start`, `tool_input_delta`, `done`, `error`. Parser SSE puro testeable por separado. |
| `run_claude_tool_loop_py_core` | `def run_claude_tool_loop(messages, tools, dispatch, ...) -> dict` | Bucle agentico tool-use. Llama `stream_anthropic_messages` en loop, despacha tools via `dispatch{name: callable}`, anade `tool_result`, repite hasta `end_turn` o `max_iters`. |
## Ejemplo canonico end-to-end
### Pregunta simple (sin tools)
```python
import sys
sys.path.insert(0, "python/functions/core")
from stream_anthropic_messages import stream_anthropic_messages
text = ""
for event in stream_anthropic_messages(
messages=[{"role": "user", "content": "di solo PONG"}],
model="claude-haiku-4-5-20251001",
max_tokens=32,
):
if event["type"] == "text":
text += event["text"]
print(event["text"], end="", flush=True)
elif event["type"] == "done":
print(f"\n[stop={event['stop_reason']}]")
# Output: PONG
# [stop=end_turn]
```
### Bucle agentico con tool propia
```python
import sys
sys.path.insert(0, "python/functions/core")
from run_claude_tool_loop import run_claude_tool_loop
from datetime import datetime
tools = [
{
"name": "get_time",
"description": "Devuelve la hora actual en formato HH:MM:SS.",
"input_schema": {"type": "object", "properties": {}, "required": []},
}
]
dispatch = {
"get_time": lambda _inp: datetime.now().strftime("%H:%M:%S"),
}
result = run_claude_tool_loop(
messages=[{"role": "user", "content": "que hora es exactamente ahora?"}],
tools=tools,
dispatch=dispatch,
model="claude-haiku-4-5-20251001",
on_text=lambda d: print(d, end="", flush=True),
)
print(f"\n[iters={result['iterations']} stop={result['stop_reason']}]")
# Claude llama a get_time() -> "14:32:07"
# Luego responde: "Ahora son las 14:32:07."
```
### Solo leer el token (para uso manual)
```python
import sys
sys.path.insert(0, "python/functions/core")
from load_claude_oauth_token import load_claude_oauth_token
token = load_claude_oauth_token(refresh_if_expired=False)
# Pasar como header: {"authorization": f"Bearer {token}"}
```
## Fronteras
- **NO cubre** el flujo de refresh OAuth (endpoint no documentado publicamente) — el refresh es best-effort y puede fallar silenciosamente.
- **NO es un cliente completo** de la API de Anthropic: solo `/v1/messages` con streaming. Files, embeddings, etc. quedan fuera.
- **NO reemplaza** el uso de API keys oficiales para produccion — este grupo es exclusivamente para uso local del token OAuth de Claude Max.
- **NO gestiona rate limits** — el caller debe manejar errores `{"type": "error"}` con `429` en el mensaje.
## Prerequisitos
- Claude Code instalado y usuario logueado (`~/.claude/.credentials.json` debe existir).
- `httpx` disponible en el venv: `python/.venv/bin/python3 -c "import httpx"`.
- Token fresco (Claude Code normalmente lo renueva en background mientras esta abierto).
+44
View File
@@ -0,0 +1,44 @@
---
title: <objetivo de la sesion en una frase>
artefacto: <app|analysis|project|registry|none> · <ruta relativa si aplica>
created: DD/MM/AAAA HH:mm
updated: DD/MM/AAAA HH:mm
status: in_progress
related_issues: []
related_flows: []
---
## Objetivo
<que se quiere conseguir, condiciones de done. Una o dos lineas. Esto fija el alcance.>
## Pendiente
- [ ] 1. <tarea> — <nota / dependencia>
- [ ] 2. <tarea>
- [ ] 3. <tarea>
## En curso
- [~] <tarea> — <progreso actual, donde se quedo, siguiente paso concreto>
## Hecho
- [x] <tarea>
- resultado: <que produjo / como se verifico>
- enlace: <url, path de archivo, id de funcion, hash de commit>
## Enlaces
- <descripcion> — <url o path>
## Issues / flows relacionados
- issue <NNNN> — <titulo> — <estado>
- flow <slug> — <titulo> — <estado>
## Notas
- <decision tomada y por que>
- <bloqueo / pregunta abierta>
- <contexto que no cabe en el codigo>
+1 -1
View File
@@ -2,7 +2,7 @@
name: fn_monitoring name: fn_monitoring
description: "Monitoreo y visualizacion del estado del fn_registry. API HTTP read-only sobre las bases de datos SQLite y dashboard ImGui que consume la API." description: "Monitoreo y visualizacion del estado del fn_registry. API HTTP read-only sobre las bases de datos SQLite y dashboard ImGui que consume la API."
tags: [monitoring, api, dashboard, sqlite, visualization] tags: [monitoring, api, dashboard, sqlite, visualization]
repo_url: "" repo_url: "https://gitea-dgg044oo04woo4ggcsws4gk0.organic-machine.com/dataforge/fn_monitoring"
--- ---
## Apps ## Apps
+69
View File
@@ -0,0 +1,69 @@
---
name: ask_llm
kind: function
lang: py
domain: core
version: "1.0.0"
purity: impure
signature: "def ask_llm(prompt: str, model: str = 'claude-haiku-4-5-20251001', system: str = '', max_tokens: int = 4096, echo: bool = True) -> str"
description: "Atajo de una linea para preguntar al modelo via la API directa de Anthropic con el token OAuth de Claude Max. Arranque 0 (sin proceso claude, sin daemon). Stream a stdout y devuelve el texto. Lanzable como CLI o importable."
error_type: error_go_core
tags: ["claude-direct", "llm", "anthropic", "cli", "oauth", "chat"]
uses_functions:
- stream_anthropic_messages_py_core
uses_types: []
params:
- name: prompt
desc: "El mensaje del usuario (string). Si vacio y stdin es un pipe, se lee de stdin."
- name: model
desc: "Id del modelo Anthropic. Default claude-haiku-4-5-20251001 (mas cuota, rapido). Otros: claude-opus-4-8, claude-sonnet-4-6."
- name: system
desc: "System prompt opcional (string vacio = ninguno)."
- name: max_tokens
desc: "Maximo de tokens de salida. Default 4096."
- name: echo
desc: "Si True, escribe la respuesta a stdout segun llega (streaming). Si False, solo la devuelve."
output: "El texto completo de la respuesta del modelo (string). Cadena vacia si hubo error (mensaje a stderr)."
file_path: python/functions/core/ask_llm.py
---
# ask_llm
Atajo CLI para ejecutar el modelo **rapido** desde la terminal o un script, usando la API directa
de Anthropic con el token OAuth de Claude Max. No lanza el proceso `claude` ni mantiene ningun
daemon — el arranque es 0. Es el wrapper conveniente del grupo `claude-direct`.
## Ejemplo
```bash
# Desde la terminal (fn run)
fn run ask_llm "que es un pseudo-terminal en una frase"
# Directo con el venv
python/.venv/bin/python3 python/functions/core/ask_llm.py "explica los punteros en Go" --model claude-opus-4-8
# Por pipe (lee el prompt de stdin)
echo "resume esto en 2 lineas: ..." | python/.venv/bin/python3 python/functions/core/ask_llm.py
# Importable en un script
import sys, os
sys.path.insert(0, "python/functions")
from core.ask_llm import ask_llm
texto = ask_llm("dame 3 ideas", model="claude-haiku-4-5-20251001", echo=False)
```
## Cuando usarla
- Cuando quieras preguntar al modelo **rapido** desde la terminal o un script, sin lanzar claude
ni montar un servidor. Respuesta en streaming, arranque 0.
- Para chat one-shot. Si necesitas **tools / loop agentico**, usa `run_claude_tool_loop_py_core`.
Si necesitas los **eventos crudos** (tool_use, deltas), usa `stream_anthropic_messages_py_core`.
## Gotchas
- **Rate limits**: el plan limita la frecuencia. En rafagas se reciben `HTTP 429`; espacia las
llamadas o usa `claude-haiku-4-5-20251001` (mas cuota) para tareas frecuentes.
- **Nombre del modelo**: `claude-opus-4-8` es valido; los ids con sufijo de fecha que no existen
dan `404 not_found_error`. Default haiku por cuota y velocidad.
- **No es Claude Code**: es el modelo crudo. Sin tools, MCP, contexto del repo ni plan mode, salvo
los que tu pases (via `run_claude_tool_loop`).
+92
View File
@@ -0,0 +1,92 @@
"""One-line CLI to ask the model fast via the direct Anthropic API (grupo claude-direct).
No claude process, no daemon — arranque 0. Reuses stream_anthropic_messages, which
authenticates with the Claude Max OAuth token from ~/.claude/.credentials.json.
Usage:
python ask_llm.py "que es Go en una frase"
python ask_llm.py --model claude-opus-4-8 --system "responde conciso" "explica los punteros"
echo "resume este texto: ..." | python ask_llm.py
fn run ask_llm "tu pregunta"
For tools / agentic loops, use run_claude_tool_loop_py_core directly in a script.
"""
import os
import sys
sys.path.insert(0, os.path.dirname(__file__))
from stream_anthropic_messages import stream_anthropic_messages # noqa: E402
DEFAULT_MODEL = "claude-haiku-4-5-20251001"
def ask_llm(
prompt: str,
model: str = DEFAULT_MODEL,
system: str = "",
max_tokens: int = 4096,
echo: bool = True,
) -> str:
"""Ask the model and return its full text answer.
Args:
prompt: The user message.
model: Anthropic model id (e.g. claude-haiku-4-5-20251001, claude-opus-4-8).
system: Optional system prompt.
max_tokens: Max output tokens.
echo: If True, stream the answer to stdout as it arrives.
Returns:
The complete answer text. Empty string on error (message goes to stderr).
"""
parts = []
for ev in stream_anthropic_messages(
messages=[{"role": "user", "content": prompt}],
model=model,
system=system,
max_tokens=max_tokens,
):
t = ev.get("type")
if t == "text":
parts.append(ev["text"])
if echo:
sys.stdout.write(ev["text"])
sys.stdout.flush()
elif t == "error":
sys.stderr.write("ask_llm error: " + str(ev.get("message", "")) + "\n")
return ""
if echo:
sys.stdout.write("\n")
sys.stdout.flush()
return "".join(parts)
def _main(argv):
model = DEFAULT_MODEL
system = ""
prompt_parts = []
i = 0
while i < len(argv):
a = argv[i]
if a in ("--model", "-m") and i + 1 < len(argv):
model = argv[i + 1]
i += 2
elif a in ("--system", "-s") and i + 1 < len(argv):
system = argv[i + 1]
i += 2
else:
prompt_parts.append(a)
i += 1
prompt = " ".join(prompt_parts).strip()
if not prompt and not sys.stdin.isatty():
prompt = sys.stdin.read().strip()
if not prompt:
sys.stderr.write('uso: ask_llm "prompt" [--model M] [--system S]\n')
return 2
ask_llm(prompt, model=model, system=system)
return 0
if __name__ == "__main__":
sys.exit(_main(sys.argv[1:]))
@@ -0,0 +1,70 @@
---
name: load_claude_oauth_token
kind: function
lang: py
domain: core
version: "1.0.0"
purity: impure
signature: "def load_claude_oauth_token(credentials_path: str = \"\", refresh_if_expired: bool = True) -> str"
description: "Lee el access token OAuth de Claude Code desde ~/.claude/.credentials.json. Extrae claudeAiOauth.accessToken y verifica expiry (expiresAt en milisegundos). Si el token esta expirado y refresh_if_expired=True intenta un refresh best-effort via POST al endpoint OAuth de Anthropic; si el refresh falla devuelve el token actual con warning a stderr."
tags: [claude-direct, anthropic, oauth, credentials, token, auth]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: [json, os, sys, time, pathlib.Path, httpx]
params:
- name: credentials_path
desc: "ruta absoluta o con ~ al archivo de credenciales. Default: ~/.claude/.credentials.json"
- name: refresh_if_expired
desc: "si True, intenta refrescar el token cuando expiresAt <= now. Default: True"
output: "el access token como string (sk-ant-oat-...). Nunca se imprime a stdout"
tested: true
tests:
- "extrae accessToken de un credentials fixture"
- "token no expirado no intenta refresh"
- "token expirado con refresh_if_expired=False devuelve token igual"
- "token expirado con refresh que falla devuelve token y escribe warning a stderr"
test_file_path: "python/functions/core/load_claude_oauth_token_test.py"
file_path: "python/functions/core/load_claude_oauth_token.py"
---
## Ejemplo
```python
import sys
sys.path.insert(0, "python/functions/core")
from load_claude_oauth_token import load_claude_oauth_token
# Lee desde ~/.claude/.credentials.json por defecto
token = load_claude_oauth_token()
print(token[:20] + "...") # sk-ant-oat01-...
# Sin intento de refresh (evita llamada de red)
token = load_claude_oauth_token(refresh_if_expired=False)
# Ruta custom (ej. para testing con fixture)
token = load_claude_oauth_token(credentials_path="/tmp/fake_creds.json")
```
## Cuando usarla
Cuando vayas a hacer requests directos a `https://api.anthropic.com/v1/messages`
usando la cuenta Claude Max del usuario local (sin API key de pago separada).
Llama esta funcion primero; su resultado va en el header
`Authorization: Bearer <token>` de `stream_anthropic_messages`.
## Gotchas
- **Refresh best-effort**: el endpoint de refresh OAuth de Anthropic no esta
documentado publicamente. El intento puede fallar silenciosamente — en ese
caso se devuelve el token actual con un warning a stderr. La CLI `claude`
mantiene el token fresco en background; si tienes Claude Code abierto,
normalmente el token ya estara vigente.
- **expiresAt en milisegundos**: la estructura JSON usa ms-epoch, no s-epoch.
`expiresAt / 1000 <= time.time()` es la comparacion correcta.
- **Nunca imprimir el token a stdout**: el token es una credencial OAuth.
Esta funcion solo lo retorna; el caller decide como usarlo.
- El archivo `.credentials.json` solo existe si Claude Code esta instalado
y el usuario ha hecho login. Si no existe lanza `FileNotFoundError`.
@@ -0,0 +1,99 @@
"""Load the Claude Code OAuth access token from ~/.claude/.credentials.json."""
import json
import os
import sys
import time
from pathlib import Path
_DEFAULT_CREDENTIALS_PATH = "~/.claude/.credentials.json"
def load_claude_oauth_token(
credentials_path: str = "",
refresh_if_expired: bool = True,
) -> str:
"""Return the Claude Code OAuth access token.
Reads ``~/.claude/.credentials.json`` (or the path you supply),
extracts ``claudeAiOauth.accessToken`` and checks expiry.
If ``refresh_if_expired`` is True and the token appears expired a
best-effort refresh via the Anthropic OAuth token endpoint is attempted
using the stored ``refreshToken``. The refresh may silently fail — in
that case a warning is written to *stderr* and the (possibly expired)
access token is returned anyway. The ``claude`` CLI normally refreshes
the token in the background; if you keep Claude Code open the token
will usually already be fresh.
Args:
credentials_path: Absolute or ``~``-relative path to the credentials
file. Defaults to ``~/.claude/.credentials.json``.
refresh_if_expired: Attempt a token refresh when the stored
``expiresAt`` (milliseconds epoch) is in the past. Set to
False to skip the network call entirely.
Returns:
The raw access token string (``sk-ant-oat-...``). Never printed to
stdout.
Raises:
FileNotFoundError: If the credentials file does not exist.
KeyError: If the expected keys are missing from the JSON.
json.JSONDecodeError: If the file is not valid JSON.
"""
path = Path(credentials_path or _DEFAULT_CREDENTIALS_PATH).expanduser()
raw = path.read_text(encoding="utf-8")
data = json.loads(raw)
oauth = data["claudeAiOauth"]
access_token: str = oauth["accessToken"]
expires_at_ms: int = oauth.get("expiresAt", 0)
refresh_token: str = oauth.get("refreshToken", "")
now_ms = int(time.time() * 1000)
is_expired = expires_at_ms > 0 and expires_at_ms <= now_ms
if is_expired and refresh_if_expired and refresh_token:
new_token = _try_refresh(refresh_token)
if new_token:
return new_token
print(
"warning: Claude OAuth token may be expired; refresh failed. "
"Returning current token anyway.",
file=sys.stderr,
)
return access_token
def _try_refresh(refresh_token: str) -> str:
"""Attempt to refresh the OAuth token. Returns empty string on failure.
The Anthropic OAuth refresh endpoint is not publicly documented.
This is a best-effort attempt using standard OAuth 2.0 conventions
(grant_type=refresh_token). It may not work if Anthropic changes
their token endpoint or requires additional client credentials.
"""
try:
import httpx # type: ignore
resp = httpx.post(
"https://auth.anthropic.com/oauth/token",
data={
"grant_type": "refresh_token",
"refresh_token": refresh_token,
},
headers={"content-type": "application/x-www-form-urlencoded"},
timeout=10.0,
)
if resp.status_code == 200:
body = resp.json()
new_token: str = body.get("access_token", "")
if new_token:
return new_token
except Exception as exc: # noqa: BLE001
print(f"warning: token refresh attempt failed: {exc}", file=sys.stderr)
return ""
@@ -0,0 +1,85 @@
"""Tests para load_claude_oauth_token."""
import json
import sys
import os
import time
sys.path.insert(0, os.path.dirname(__file__))
from load_claude_oauth_token import load_claude_oauth_token
def _write_creds(path, access_token, expires_at_ms, refresh_token="sk-ant-refresh-fake"):
data = {
"claudeAiOauth": {
"accessToken": access_token,
"refreshToken": refresh_token,
"expiresAt": expires_at_ms,
}
}
with open(path, "w") as f:
json.dump(data, f)
def test_extrae_accessToken_de_un_credentials_fixture(tmp_path):
creds = tmp_path / "credentials.json"
_write_creds(str(creds), "sk-ant-oat-TEST123", expires_at_ms=int(time.time() * 1000) + 3_600_000)
token = load_claude_oauth_token(credentials_path=str(creds), refresh_if_expired=False)
assert token == "sk-ant-oat-TEST123"
def test_token_no_expirado_no_intenta_refresh(tmp_path, monkeypatch):
creds = tmp_path / "credentials.json"
# expiresAt 1 hora en el futuro
future_ms = int(time.time() * 1000) + 3_600_000
_write_creds(str(creds), "sk-ant-oat-FRESH", expires_at_ms=future_ms)
refresh_called = {"n": 0}
import load_claude_oauth_token as mod
original_try_refresh = mod._try_refresh
def fake_refresh(rt):
refresh_called["n"] += 1
return ""
monkeypatch.setattr(mod, "_try_refresh", fake_refresh)
token = load_claude_oauth_token(credentials_path=str(creds), refresh_if_expired=True)
assert token == "sk-ant-oat-FRESH"
assert refresh_called["n"] == 0
def test_token_expirado_con_refresh_if_expired_False_devuelve_token_igual(tmp_path):
creds = tmp_path / "credentials.json"
# expiresAt en el pasado
past_ms = int(time.time() * 1000) - 3_600_000
_write_creds(str(creds), "sk-ant-oat-EXPIRED", expires_at_ms=past_ms)
token = load_claude_oauth_token(credentials_path=str(creds), refresh_if_expired=False)
assert token == "sk-ant-oat-EXPIRED"
def test_token_expirado_refresh_falla_devuelve_token_y_warning_a_stderr(tmp_path, monkeypatch, capsys):
creds = tmp_path / "credentials.json"
past_ms = int(time.time() * 1000) - 3_600_000
_write_creds(str(creds), "sk-ant-oat-OLD", expires_at_ms=past_ms, refresh_token="sk-ant-refresh-old")
import load_claude_oauth_token as mod
def fake_refresh_fails(rt):
return "" # simula fallo silencioso
monkeypatch.setattr(mod, "_try_refresh", fake_refresh_fails)
token = load_claude_oauth_token(credentials_path=str(creds), refresh_if_expired=True)
assert token == "sk-ant-oat-OLD"
captured = capsys.readouterr()
assert "warning" in captured.err.lower()
assert "expired" in captured.err.lower() or "refresh" in captured.err.lower()
@@ -0,0 +1,112 @@
---
name: run_claude_tool_loop
kind: function
lang: py
domain: core
version: "1.0.0"
purity: impure
signature: "def run_claude_tool_loop(messages: list, tools: list, dispatch: dict, model: str = \"claude-opus-4-8\", system: str = \"\", max_tokens: int = 4096, max_iters: int = 8, on_text=None) -> dict"
description: "Bucle agentico de tool-use sobre la API de Anthropic. Llama stream_anthropic_messages en loop: si el modelo responde con tool_use, despacha cada tool via dispatch{name: callable}, anade el tool_result, y continua. Termina cuando stop_reason != tool_use o se alcanza max_iters. Devuelve {messages, final_text, stop_reason, iterations}."
tags: [claude-direct, anthropic, tools, agent, llm, tool-loop, streaming]
uses_functions: [stream_anthropic_messages_py_core, load_claude_oauth_token_py_core]
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: [json, typing.Callable]
params:
- name: messages
desc: "conversacion en formato Anthropic [{role, content}]. Se modifica in-place anadiendo mensajes assistant y tool_result"
- name: tools
desc: "lista de tool definitions Anthropic [{name, description, input_schema}]"
- name: dispatch
desc: "mapa {tool_name: callable(input_dict) -> result}. El callable puede retornar cualquier valor serializable o lanzar excepcion"
- name: model
desc: "ID del modelo Anthropic. Default: claude-opus-4-8"
- name: system
desc: "system prompt opcional"
- name: max_tokens
desc: "maximo de tokens por llamada API. Default 4096"
- name: max_iters
desc: "maximo de iteraciones del loop (proteccion contra loops infinitos). Default 8"
- name: on_text
desc: "callback opcional llamado con cada text delta: on_text(delta: str). Util para streaming en tiempo real"
output: "dict con keys: messages (lista actualizada), final_text (texto del ultimo turno assistant), stop_reason (end_turn|tool_use|max_tokens|max_iters|error), iterations (int)"
tested: true
tests:
- "tool_use seguido de end_turn ejecuta dispatch y termina"
- "tool dispatch que falla devuelve is_error True en tool_result"
- "tool desconocido devuelve is_error True"
- "on_text callback recibe deltas de texto"
- "max_iters limita el numero de iteraciones"
test_file_path: "python/functions/core/run_claude_tool_loop_test.py"
file_path: "python/functions/core/run_claude_tool_loop.py"
---
## Ejemplo
```python
import sys
sys.path.insert(0, "python/functions/core")
from run_claude_tool_loop import run_claude_tool_loop
# Definir una tool propia
tools = [
{
"name": "add",
"description": "Suma dos numeros enteros y devuelve el resultado.",
"input_schema": {
"type": "object",
"properties": {
"a": {"type": "number", "description": "primer sumando"},
"b": {"type": "number", "description": "segundo sumando"},
},
"required": ["a", "b"],
},
}
]
# Implementacion de la tool (la provee el usuario)
dispatch = {
"add": lambda inp: inp["a"] + inp["b"],
}
messages = [{"role": "user", "content": "cuanto es 17 + 25?"}]
result = run_claude_tool_loop(
messages=messages,
tools=tools,
dispatch=dispatch,
model="claude-haiku-4-5-20251001",
on_text=lambda d: print(d, end="", flush=True),
)
print(f"\n[stop={result['stop_reason']} iters={result['iterations']}]")
# Claude llama a add(17, 25) -> 42, luego responde "El resultado es 42."
```
## Cuando usarla
Cuando quieras que Claude ejecute herramientas Python propias en un bucle
agentico: calculos, consultas a BD, llamadas a APIs externas, lectura de
archivos, etc. El loop gestiona automaticamente la negociacion
tool_use/tool_result con la API. Usa `on_text` para mostrar la respuesta
en tiempo real mientras el modelo piensa entre tool calls.
## Gotchas
- **messages se modifica in-place**: los mensajes assistant y tool_result se
anaden a la lista pasada. Pasa una copia si necesitas preservar el estado
original: `run_claude_tool_loop(list(messages), ...)`.
- **dispatch debe cubrir todos los tools declarados**: si el modelo invoca
una tool que no esta en dispatch, el tool_result contendra `is_error: True`.
Declara exactamente los tools que dispatch puede manejar.
- **Excepciones en dispatch son capturadas**: si tu callable lanza una
excepcion, se convierte en tool_result con `is_error: True` y el mensaje
del error. El loop continua — el modelo puede corregir y reintentar.
- **max_iters como seguro**: sin este limite, un modelo que siempre pide
tools podria iterar indefinidamente. Default 8 es conservador; subelo
para agentes complejos con muchos pasos.
- **final_text es solo el ultimo turno**: si el modelo genero texto en
iteraciones anteriores (raro pero posible), solo se devuelve el del
ultimo turno. El historial completo esta en `messages`.
@@ -0,0 +1,192 @@
"""Agentic tool-use loop over the Anthropic Messages API."""
import json
import sys
import os
from typing import Callable
sys.path.insert(0, os.path.dirname(__file__))
from stream_anthropic_messages import stream_anthropic_messages # noqa: E402
def run_claude_tool_loop(
messages: list,
tools: list,
dispatch: dict,
model: str = "claude-opus-4-8",
system: str = "",
max_tokens: int = 4096,
max_iters: int = 8,
on_text: Callable[[str], None] = None,
) -> dict:
"""Run an agentic tool-use loop against the Anthropic Messages API.
Repeatedly calls ``stream_anthropic_messages`` until the model stops
requesting tools (``stop_reason != "tool_use"``) or ``max_iters`` is
reached. At each iteration:
1. Stream the assistant response, accumulating text and tool_use blocks.
2. Append the assistant message (text + tool_use content blocks) to
``messages``.
3. If ``stop_reason == "tool_use"``: dispatch every requested tool via
the ``dispatch`` mapping, append a ``user`` message with all
``tool_result`` blocks, and continue.
4. Otherwise terminate and return the result dict.
Args:
messages: Conversation so far in Anthropic format. Modified in-place
(tool_result messages are appended). Pass a copy if you want to
preserve the original.
tools: List of tool definitions in Anthropic format::
[{"name": "add", "description": "...",
"input_schema": {"type": "object",
"properties": {"a": {"type": "number"},
"b": {"type": "number"}},
"required": ["a", "b"]}}]
dispatch: Mapping from tool name to callable ``(input_dict) -> result``.
The callable may return any JSON-serialisable value or raise an
exception (which is caught and returned as an error tool_result).
model: Anthropic model ID. Default ``claude-opus-4-8``.
system: Optional system prompt.
max_tokens: Max tokens per API call. Default 4096.
max_iters: Hard cap on tool-use iterations. Default 8.
on_text: Optional callback called with each text delta as it streams.
Useful for real-time display. E.g. ``on_text=lambda d: print(d, end="", flush=True)``.
Returns:
Dict with keys:
- ``messages``: updated conversation (same list, modified in-place).
- ``final_text``: concatenated text from the last assistant turn.
- ``stop_reason``: ``"end_turn"``, ``"tool_use"``, ``"max_tokens"``,
``"max_iters"``, or ``"error"``.
- ``iterations``: number of loop iterations executed.
"""
iterations = 0
final_text = ""
stop_reason = "max_iters"
for _ in range(max_iters):
iterations += 1
text_parts: list[str] = []
# tool_uses: list of {id, name, input_json_parts, index}
tool_uses: list[dict] = []
# index -> tool_use dict (for partial_json accumulation)
index_map: dict[int, dict] = {}
current_stop_reason = "end_turn"
for event in stream_anthropic_messages(
messages=messages,
model=model,
system=system,
tools=tools,
max_tokens=max_tokens,
):
etype = event.get("type", "")
if etype == "text":
delta = event["text"]
text_parts.append(delta)
if on_text is not None:
on_text(delta)
elif etype == "tool_use_start":
entry = {
"id": event["id"],
"name": event["name"],
"index": event["index"],
"partial_json_parts": [],
}
tool_uses.append(entry)
index_map[event["index"]] = entry
elif etype == "tool_input_delta":
idx = event["index"]
if idx in index_map:
index_map[idx]["partial_json_parts"].append(event["partial_json"])
elif etype == "done":
current_stop_reason = event.get("stop_reason", "end_turn")
elif etype == "error":
final_text = "".join(text_parts)
return {
"messages": messages,
"final_text": final_text,
"stop_reason": "error",
"iterations": iterations,
"error": event.get("message", "unknown error"),
}
final_text = "".join(text_parts)
stop_reason = current_stop_reason
# Build the assistant content blocks
assistant_content: list[dict] = []
if final_text:
assistant_content.append({"type": "text", "text": final_text})
for tu in tool_uses:
raw_input = "".join(tu["partial_json_parts"])
try:
parsed_input = json.loads(raw_input) if raw_input else {}
except json.JSONDecodeError:
parsed_input = {"_raw": raw_input}
assistant_content.append({
"type": "tool_use",
"id": tu["id"],
"name": tu["name"],
"input": parsed_input,
})
messages.append({"role": "assistant", "content": assistant_content})
if stop_reason != "tool_use" or not tool_uses:
break
# Dispatch tools and build tool_result message
tool_results: list[dict] = []
for tu in tool_uses:
tool_name = tu["name"]
raw_input = "".join(tu["partial_json_parts"])
try:
parsed_input = json.loads(raw_input) if raw_input else {}
except json.JSONDecodeError:
parsed_input = {"_raw": raw_input}
if tool_name not in dispatch:
result_content = f"Error: tool '{tool_name}' not found in dispatch"
is_error = True
else:
try:
result_value = dispatch[tool_name](parsed_input)
result_content = (
result_value
if isinstance(result_value, str)
else json.dumps(result_value)
)
is_error = False
except Exception as exc: # noqa: BLE001
result_content = f"Error executing {tool_name}: {exc}"
is_error = True
tool_result: dict = {
"type": "tool_result",
"tool_use_id": tu["id"],
"content": result_content,
}
if is_error:
tool_result["is_error"] = True
tool_results.append(tool_result)
messages.append({"role": "user", "content": tool_results})
else:
stop_reason = "max_iters"
return {
"messages": messages,
"final_text": final_text,
"stop_reason": stop_reason,
"iterations": iterations,
}
@@ -0,0 +1,188 @@
"""Tests para run_claude_tool_loop — usa fake de stream_anthropic_messages, sin HTTP."""
import sys
import os
import json
sys.path.insert(0, os.path.dirname(__file__))
# ---------------------------------------------------------------------------
# Fake de stream_anthropic_messages inyectado via monkeypatch
# ---------------------------------------------------------------------------
def _make_text_response(text: str, stop_reason: str = "end_turn"):
"""Genera la secuencia de eventos de una respuesta de texto pura."""
for ch in text:
yield {"type": "text", "text": ch}
yield {"type": "done", "stop_reason": stop_reason}
def _make_tool_use_response(tool_id: str, tool_name: str, tool_input: dict):
"""Genera la secuencia de eventos de un tool_use."""
yield {"type": "tool_use_start", "id": tool_id, "name": tool_name, "index": 0}
raw = json.dumps(tool_input)
# Simula streaming del JSON en dos partes
mid = len(raw) // 2
yield {"type": "tool_input_delta", "index": 0, "partial_json": raw[:mid]}
yield {"type": "tool_input_delta", "index": 0, "partial_json": raw[mid:]}
yield {"type": "done", "stop_reason": "tool_use"}
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
def test_tool_use_seguido_de_end_turn_ejecuta_dispatch_y_termina(monkeypatch):
import run_claude_tool_loop as mod
call_count = {"n": 0}
def fake_stream(messages, model, system, tools, max_tokens):
call_count["n"] += 1
if call_count["n"] == 1:
# Primera llamada: el modelo pide la tool "add"
yield from _make_tool_use_response("toolu_01", "add", {"a": 3, "b": 4})
else:
# Segunda llamada: el modelo responde con el resultado
yield from _make_text_response("El resultado es 7.")
monkeypatch.setattr(mod, "stream_anthropic_messages", fake_stream)
dispatch_called = {"input": None}
def add_tool(inp):
dispatch_called["input"] = inp
return inp["a"] + inp["b"]
messages = [{"role": "user", "content": "cuanto es 3 + 4?"}]
result = mod.run_claude_tool_loop(
messages=messages,
tools=[{"name": "add", "description": "suma", "input_schema": {}}],
dispatch={"add": add_tool},
model="claude-haiku-4-5-20251001",
)
assert result["stop_reason"] == "end_turn"
assert result["iterations"] == 2
assert result["final_text"] == "El resultado es 7."
assert dispatch_called["input"] == {"a": 3, "b": 4}
# Verifica que se anado el tool_result al historial
roles = [m["role"] for m in result["messages"]]
assert roles.count("assistant") == 2
assert roles.count("user") >= 2
# El mensaje user con tool_result debe tener is_error=False (ausente)
tool_result_msg = next(
m for m in result["messages"]
if m["role"] == "user" and isinstance(m["content"], list)
and m["content"] and m["content"][0].get("type") == "tool_result"
)
assert tool_result_msg["content"][0].get("is_error") is None or \
tool_result_msg["content"][0].get("is_error") is False
def test_tool_dispatch_que_falla_devuelve_is_error_True_en_tool_result(monkeypatch):
import run_claude_tool_loop as mod
call_count = {"n": 0}
def fake_stream(messages, model, system, tools, max_tokens):
call_count["n"] += 1
if call_count["n"] == 1:
yield from _make_tool_use_response("toolu_02", "divide", {"a": 10, "b": 0})
else:
yield from _make_text_response("No se puede dividir por cero.")
monkeypatch.setattr(mod, "stream_anthropic_messages", fake_stream)
def divide_tool(inp):
return inp["a"] / inp["b"] # ZeroDivisionError
messages = [{"role": "user", "content": "divide 10 entre 0"}]
result = mod.run_claude_tool_loop(
messages=messages,
tools=[{"name": "divide", "description": "division", "input_schema": {}}],
dispatch={"divide": divide_tool},
)
# Debe haber un tool_result con is_error=True
tool_result_msg = next(
m for m in result["messages"]
if m["role"] == "user" and isinstance(m["content"], list)
and m["content"] and m["content"][0].get("type") == "tool_result"
)
assert tool_result_msg["content"][0].get("is_error") is True
assert "Error" in tool_result_msg["content"][0]["content"]
def test_tool_desconocido_devuelve_is_error_True(monkeypatch):
import run_claude_tool_loop as mod
call_count = {"n": 0}
def fake_stream(messages, model, system, tools, max_tokens):
call_count["n"] += 1
if call_count["n"] == 1:
yield from _make_tool_use_response("toolu_03", "nonexistent_tool", {"x": 1})
else:
yield from _make_text_response("No pude usar esa tool.")
monkeypatch.setattr(mod, "stream_anthropic_messages", fake_stream)
messages = [{"role": "user", "content": "usa nonexistent_tool"}]
result = mod.run_claude_tool_loop(
messages=messages,
tools=[],
dispatch={}, # dispatch vacio — tool no existe
)
tool_result_msg = next(
m for m in result["messages"]
if m["role"] == "user" and isinstance(m["content"], list)
and m["content"] and m["content"][0].get("type") == "tool_result"
)
assert tool_result_msg["content"][0].get("is_error") is True
assert "not found" in tool_result_msg["content"][0]["content"]
def test_on_text_callback_recibe_deltas_de_texto(monkeypatch):
import run_claude_tool_loop as mod
def fake_stream(messages, model, system, tools, max_tokens):
yield from _make_text_response("hola mundo")
monkeypatch.setattr(mod, "stream_anthropic_messages", fake_stream)
received = []
messages = [{"role": "user", "content": "saluda"}]
result = mod.run_claude_tool_loop(
messages=messages,
tools=[],
dispatch={},
on_text=lambda d: received.append(d),
)
assert "".join(received) == "hola mundo"
assert result["final_text"] == "hola mundo"
def test_max_iters_limita_el_numero_de_iteraciones(monkeypatch):
import run_claude_tool_loop as mod
# Siempre responde con tool_use para forzar loop infinito
def fake_stream_infinite(messages, model, system, tools, max_tokens):
yield from _make_tool_use_response("toolu_inf", "noop", {})
monkeypatch.setattr(mod, "stream_anthropic_messages", fake_stream_infinite)
messages = [{"role": "user", "content": "loop"}]
result = mod.run_claude_tool_loop(
messages=messages,
tools=[{"name": "noop", "description": "no-op", "input_schema": {}}],
dispatch={"noop": lambda inp: "ok"},
max_iters=3,
)
assert result["iterations"] == 3
assert result["stop_reason"] == "max_iters"
@@ -0,0 +1,86 @@
---
name: stream_anthropic_messages
kind: function
lang: py
domain: core
version: "1.0.0"
purity: impure
signature: "def stream_anthropic_messages(messages: list, model: str = \"claude-opus-4-8\", system: str = \"\", tools: list = None, max_tokens: int = 4096, token: str = \"\") -> Iterator[dict]"
description: "Llama a https://api.anthropic.com/v1/messages con stream:true usando el token OAuth de Claude Code y hace yield de eventos normalizados: text, tool_use_start, tool_input_delta, done, error. Parsea el SSE en tiempo real. Si token vacio llama load_claude_oauth_token automaticamente."
tags: [claude-direct, anthropic, streaming, sse, llm, tools, http]
uses_functions: [load_claude_oauth_token_py_core]
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: [json, sys, typing.Iterator, httpx]
params:
- name: messages
desc: "lista de mensajes Anthropic [{role: user|assistant, content: ...}]"
- name: model
desc: "ID del modelo Anthropic. Default: claude-opus-4-8"
- name: system
desc: "system prompt opcional. Si vacio no se incluye en el body"
- name: tools
desc: "lista de tool definitions [{name, description, input_schema}]. None para no usar tools"
- name: max_tokens
desc: "maximo de tokens en la respuesta. Default 4096"
- name: token
desc: "access token OAuth. Si vacio se carga automaticamente desde ~/.claude/.credentials.json"
output: "Iterator[dict] — eventos normalizados: {type:text,text}, {type:tool_use_start,id,name,index}, {type:tool_input_delta,index,partial_json}, {type:done,stop_reason}, {type:error,message}"
tested: true
tests:
- "parse_sse_chunk text delta emite evento text"
- "parse_sse_chunk tool_use_start emite evento tool_use_start con id y name"
- "parse_sse_chunk tool_input_delta emite evento tool_input_delta con partial_json"
- "parse_sse_chunk message_delta con stop_reason emite done"
- "parse_sse_chunk secuencia completa text PONG reconstruye el texto"
- "parse_sse_chunk secuencia completa tool_use reconstruye el input JSON"
test_file_path: "python/functions/core/stream_anthropic_messages_test.py"
file_path: "python/functions/core/stream_anthropic_messages.py"
---
## Ejemplo
```python
import sys
sys.path.insert(0, "python/functions/core")
from stream_anthropic_messages import stream_anthropic_messages
text = ""
for event in stream_anthropic_messages(
messages=[{"role": "user", "content": "di solo PONG"}],
model="claude-haiku-4-5-20251001",
max_tokens=32,
):
if event["type"] == "text":
text += event["text"]
print(event["text"], end="", flush=True)
elif event["type"] == "done":
print(f"\n[stop_reason={event['stop_reason']}]")
elif event["type"] == "error":
print(f"ERROR: {event['message']}", file=sys.stderr)
print("respuesta completa:", text)
```
## Cuando usarla
Cuando necesites hablar directamente con la API de Anthropic desde Python
sin lanzar la CLI `claude`, especialmente cuando quieras streaming en tiempo
real del texto o necesites ejecutar tools propias via `run_claude_tool_loop`.
Tambien util para integrar Claude en scripts de analisis donde el bucle
de tools se gestiona en Python.
## Gotchas
- **El parser SSE es puro y testeable**: `parse_sse_chunk` no hace HTTP.
Puedes testear el parseo completo sin mocks de red.
- **Eventos `raw` son pass-through**: `message_start`, `content_block_stop`,
`ping` y otros eventos no mapeados se emiten como `{type: raw, event: ..., data: ...}`.
El consumidor los puede ignorar o inspeccionar.
- **Reconstruccion de tool input**: los `partial_json` de un mismo `index`
deben concatenarse hasta el evento `done`. `run_claude_tool_loop` hace esto
automaticamente.
- **Timeout 120s**: configurable en el codigo si necesitas respuestas muy largas.
- La funcion `parse_sse_chunk` es pure (importable y testeable por separado).
@@ -0,0 +1,195 @@
"""Stream responses from the Anthropic Messages API using a Claude Code OAuth token."""
import json
import sys
from typing import Iterator
_API_URL = "https://api.anthropic.com/v1/messages"
_ANTHROPIC_VERSION = "2023-06-01"
# ---------------------------------------------------------------------------
# Pure SSE parser — fully testable without HTTP
# ---------------------------------------------------------------------------
def parse_sse_chunk(chunk: str) -> Iterator[dict]:
"""Parse one SSE chunk (possibly containing multiple event/data pairs).
Yields normalised event dicts:
- ``{"type": "text", "text": "<delta>"}``
- ``{"type": "tool_use_start", "id": "...", "name": "...", "index": N}``
- ``{"type": "tool_input_delta","index": N, "partial_json": "<delta>"}``
- ``{"type": "done", "stop_reason": "<...>"}``
- ``{"type": "raw", "event": "...", "data": {...}}`` (pass-through)
Args:
chunk: Raw SSE text with ``event:`` / ``data:`` lines, separated by
blank lines between events.
"""
current_event = ""
current_data = ""
for line in chunk.splitlines():
if line.startswith("event:"):
current_event = line[len("event:"):].strip()
elif line.startswith("data:"):
current_data = line[len("data:"):].strip()
elif line == "":
if current_data and current_data != "[DONE]":
try:
data = json.loads(current_data)
except json.JSONDecodeError:
current_event = ""
current_data = ""
continue
yield from _normalise_event(current_event, data)
current_event = ""
current_data = ""
# flush final event without trailing blank line
if current_data and current_data != "[DONE]":
try:
data = json.loads(current_data)
yield from _normalise_event(current_event, data)
except json.JSONDecodeError:
pass
def _normalise_event(event_type: str, data: dict) -> Iterator[dict]:
"""Convert a raw SSE event dict into one or more normalised dicts."""
if event_type == "content_block_start":
block = data.get("content_block", {})
if block.get("type") == "tool_use":
yield {
"type": "tool_use_start",
"id": block.get("id", ""),
"name": block.get("name", ""),
"index": data.get("index", 0),
}
elif event_type == "content_block_delta":
delta = data.get("delta", {})
delta_type = delta.get("type", "")
if delta_type == "text_delta":
yield {"type": "text", "text": delta.get("text", "")}
elif delta_type == "input_json_delta":
yield {
"type": "tool_input_delta",
"index": data.get("index", 0),
"partial_json": delta.get("partial_json", ""),
}
elif event_type == "message_delta":
stop_reason = data.get("delta", {}).get("stop_reason", "")
if stop_reason:
yield {"type": "done", "stop_reason": stop_reason}
elif event_type == "message_stop":
# emitted after message_delta — only yield done if not already seen
pass
else:
# pass-through for message_start, content_block_stop, ping, etc.
yield {"type": "raw", "event": event_type, "data": data}
# ---------------------------------------------------------------------------
# Streaming HTTP function
# ---------------------------------------------------------------------------
def stream_anthropic_messages(
messages: list,
model: str = "claude-opus-4-8",
system: str = "",
tools: list = None,
max_tokens: int = 4096,
token: str = "",
) -> Iterator[dict]:
"""Stream an Anthropic Messages API call using a Claude Code OAuth token.
Posts to ``https://api.anthropic.com/v1/messages`` with ``stream: true``
and yields normalised event dicts as they arrive:
- ``{"type": "text", "text": "<delta>"}``
- ``{"type": "tool_use_start", "id": "...", "name": "...", "index": N}``
- ``{"type": "tool_input_delta","index": N, "partial_json": "<delta>"}``
- ``{"type": "done", "stop_reason": "end_turn|tool_use|..."}``
- ``{"type": "error", "message": "..."}`` on HTTP error / exception
Args:
messages: List of message dicts in Anthropic format
(``[{"role": "user", "content": "..."}]``).
model: Anthropic model ID. Default ``claude-opus-4-8``.
system: Optional system prompt string.
tools: Optional list of tool definition dicts.
max_tokens: Maximum tokens in the response. Default 4096.
token: OAuth access token. If empty, ``load_claude_oauth_token()``
is called automatically.
Yields:
Normalised event dicts (see above).
"""
if not token:
try:
try:
from core.load_claude_oauth_token import load_claude_oauth_token
except ImportError:
from load_claude_oauth_token import load_claude_oauth_token
token = load_claude_oauth_token()
except Exception as exc:
yield {"type": "error", "message": f"failed to load token: {exc}"}
return
body: dict = {
"model": model,
"max_tokens": max_tokens,
"messages": messages,
"stream": True,
}
if system:
body["system"] = system
if tools:
body["tools"] = tools
headers = {
"authorization": f"Bearer {token}",
"anthropic-version": _ANTHROPIC_VERSION,
"content-type": "application/json",
}
try:
import httpx
with httpx.stream(
"POST",
_API_URL,
json=body,
headers=headers,
timeout=120.0,
) as resp:
if resp.status_code != 200:
error_body = resp.read().decode("utf-8", errors="replace")
yield {
"type": "error",
"message": f"HTTP {resp.status_code}: {error_body[:500]}",
}
return
buffer = ""
for chunk in resp.iter_text():
buffer += chunk
# Process complete SSE blocks (separated by double newlines)
while "\n\n" in buffer:
block, buffer = buffer.split("\n\n", 1)
block += "\n\n"
yield from parse_sse_chunk(block)
# Flush any remaining content
if buffer.strip():
yield from parse_sse_chunk(buffer + "\n\n")
except Exception as exc:
yield {"type": "error", "message": str(exc)}
@@ -0,0 +1,163 @@
"""Tests para stream_anthropic_messages — solo el parser SSE puro, sin HTTP."""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from stream_anthropic_messages import parse_sse_chunk
# ---------------------------------------------------------------------------
# Helpers: construir chunks SSE sinteticos (igual a lo que envia Anthropic)
# ---------------------------------------------------------------------------
def sse(event: str, data: dict) -> str:
import json
return f"event: {event}\ndata: {json.dumps(data)}\n\n"
# ---------------------------------------------------------------------------
# Tests de eventos individuales
# ---------------------------------------------------------------------------
def test_parse_sse_chunk_text_delta_emite_evento_text():
chunk = sse("content_block_delta", {
"type": "content_block_delta",
"index": 0,
"delta": {"type": "text_delta", "text": "P"},
})
events = list(parse_sse_chunk(chunk))
text_events = [e for e in events if e["type"] == "text"]
assert len(text_events) == 1
assert text_events[0]["text"] == "P"
def test_parse_sse_chunk_tool_use_start_emite_evento_tool_use_start():
chunk = sse("content_block_start", {
"type": "content_block_start",
"index": 1,
"content_block": {"type": "tool_use", "id": "toolu_01", "name": "add"},
})
events = list(parse_sse_chunk(chunk))
tu_events = [e for e in events if e["type"] == "tool_use_start"]
assert len(tu_events) == 1
assert tu_events[0]["id"] == "toolu_01"
assert tu_events[0]["name"] == "add"
assert tu_events[0]["index"] == 1
def test_parse_sse_chunk_tool_input_delta_emite_evento_tool_input_delta():
chunk = sse("content_block_delta", {
"type": "content_block_delta",
"index": 1,
"delta": {"type": "input_json_delta", "partial_json": '{"a":'},
})
events = list(parse_sse_chunk(chunk))
ti_events = [e for e in events if e["type"] == "tool_input_delta"]
assert len(ti_events) == 1
assert ti_events[0]["index"] == 1
assert ti_events[0]["partial_json"] == '{"a":'
def test_parse_sse_chunk_message_delta_con_stop_reason_emite_done():
chunk = sse("message_delta", {
"type": "message_delta",
"delta": {"stop_reason": "end_turn", "stop_sequence": None},
"usage": {"output_tokens": 5},
})
events = list(parse_sse_chunk(chunk))
done_events = [e for e in events if e["type"] == "done"]
assert len(done_events) == 1
assert done_events[0]["stop_reason"] == "end_turn"
def test_parse_sse_chunk_secuencia_completa_text_PONG_reconstruye_el_texto():
"""Simula la secuencia completa de eventos SSE para una respuesta de texto."""
import json
# Secuencia tipica: message_start, content_block_start text, deltas, stop
chunks = "".join([
sse("message_start", {
"type": "message_start",
"message": {"id": "msg_01", "type": "message", "role": "assistant",
"content": [], "model": "claude-haiku-4-5-20251001",
"stop_reason": None, "usage": {"input_tokens": 5}},
}),
sse("content_block_start", {
"type": "content_block_start",
"index": 0,
"content_block": {"type": "text", "text": ""},
}),
sse("content_block_delta", {
"type": "content_block_delta", "index": 0,
"delta": {"type": "text_delta", "text": "P"},
}),
sse("content_block_delta", {
"type": "content_block_delta", "index": 0,
"delta": {"type": "text_delta", "text": "ON"},
}),
sse("content_block_delta", {
"type": "content_block_delta", "index": 0,
"delta": {"type": "text_delta", "text": "G"},
}),
sse("content_block_stop", {"type": "content_block_stop", "index": 0}),
sse("message_delta", {
"type": "message_delta",
"delta": {"stop_reason": "end_turn"},
"usage": {"output_tokens": 4},
}),
sse("message_stop", {"type": "message_stop"}),
])
events = list(parse_sse_chunk(chunks))
text = "".join(e["text"] for e in events if e["type"] == "text")
done = [e for e in events if e["type"] == "done"]
assert text == "PONG"
assert len(done) == 1
assert done[0]["stop_reason"] == "end_turn"
def test_parse_sse_chunk_secuencia_completa_tool_use_reconstruye_el_input_JSON():
"""Simula la secuencia SSE para un tool_use con input_json_delta streaming."""
chunks = "".join([
sse("content_block_start", {
"type": "content_block_start",
"index": 0,
"content_block": {"type": "tool_use", "id": "toolu_42", "name": "add"},
}),
sse("content_block_delta", {
"type": "content_block_delta", "index": 0,
"delta": {"type": "input_json_delta", "partial_json": '{"a":'},
}),
sse("content_block_delta", {
"type": "content_block_delta", "index": 0,
"delta": {"type": "input_json_delta", "partial_json": ' 3, "b": 4}'},
}),
sse("content_block_stop", {"type": "content_block_stop", "index": 0}),
sse("message_delta", {
"type": "message_delta",
"delta": {"stop_reason": "tool_use"},
"usage": {"output_tokens": 10},
}),
])
events = list(parse_sse_chunk(chunks))
tu_start = [e for e in events if e["type"] == "tool_use_start"]
ti_deltas = [e for e in events if e["type"] == "tool_input_delta"]
done = [e for e in events if e["type"] == "done"]
assert len(tu_start) == 1
assert tu_start[0]["name"] == "add"
assert tu_start[0]["id"] == "toolu_42"
# Reconstruir el input JSON concatenando los partial_json
import json
raw_input = "".join(d["partial_json"] for d in ti_deltas)
parsed = json.loads(raw_input)
assert parsed == {"a": 3, "b": 4}
assert len(done) == 1
assert done[0]["stop_reason"] == "tool_use"