feat: llm_cli — CLI autocontenida para chatear con Claude
Un solo archivo (llm.py) que habla directo con la API de Anthropic Messages usando el token OAuth que Claude Code guarda en ~/.claude/.credentials.json. Sin servidor, sin arranque: chat interactivo con memoria, one-shot, pipe, y bucle de tools propias (run_tool_loop). Empaqueta standalone la lógica del grupo claude-direct del registry para poder distribuirla (basta llm.py + README.md, el receptor solo necesita httpx + Claude Code logueado).
This commit is contained in:
@@ -0,0 +1,344 @@
|
||||
#!/usr/bin/env python3
|
||||
"""llm — a tiny, self-contained CLI to chat with Claude from the terminal.
|
||||
|
||||
It talks straight to the Anthropic Messages API using the OAuth token that Claude
|
||||
Code already stores on your machine (~/.claude/.credentials.json), so there is
|
||||
nothing to configure: install httpx, run it, type.
|
||||
|
||||
Usage:
|
||||
python3 llm.py # interactive chat (with memory)
|
||||
python3 llm.py "your question" # one-shot answer
|
||||
echo "summarize this" | python3 llm.py
|
||||
python3 llm.py --model claude-opus-4-8 "explain pointers"
|
||||
|
||||
Interactive commands:
|
||||
/model <id> switch model (e.g. claude-opus-4-8, claude-haiku-4-5-20251001)
|
||||
/system <text> set a system prompt
|
||||
/reset start a fresh conversation
|
||||
/exit quit
|
||||
|
||||
Requirements:
|
||||
- Python 3.9+
|
||||
- httpx (pip install httpx)
|
||||
- Claude Code installed and logged in (so ~/.claude/.credentials.json exists)
|
||||
"""
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Callable, Iterator
|
||||
|
||||
# Messages endpoint that stream_messages() POSTs to.
API_URL = "https://api.anthropic.com/v1/messages"
|
||||
# Value sent in the `anthropic-version` request header.
ANTHROPIC_VERSION = "2023-06-01"
|
||||
# Model used when the caller does not choose one (--model flag or /model command).
DEFAULT_MODEL = "claude-haiku-4-5-20251001"
|
||||
# Default credentials file read by load_oauth_token(); `~` is expanded when it is used.
CREDENTIALS_PATH = "~/.claude/.credentials.json"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Token: read (and best-effort refresh) the Claude Code OAuth access token
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def load_oauth_token(credentials_path: str = "", refresh_if_expired: bool = True) -> str:
    """Read the Claude Code OAuth access token from the credentials file.

    Args:
        credentials_path: Location of the credentials JSON. When empty, the
            module-level CREDENTIALS_PATH is used. A leading ``~`` is expanded.
        refresh_if_expired: When true and the stored expiry has already
            passed, try to obtain a fresh token with the stored refresh token.

    Returns:
        The refreshed token when a refresh was attempted and succeeded,
        otherwise the access token stored in the file (even if it looks
        expired, in which case a warning is printed to stderr).

    Raises:
        FileNotFoundError: The credentials file does not exist.
        json.JSONDecodeError: The file is not valid JSON.
        KeyError: The ``claudeAiOauth`` or ``accessToken`` key is missing.
    """
    creds_file = Path(credentials_path if credentials_path else CREDENTIALS_PATH).expanduser()
    if not creds_file.exists():
        raise FileNotFoundError(
            f"No se encontro {creds_file}. Instala Claude Code e inicia sesion "
            "(`claude`) para generar las credenciales."
        )

    section = json.loads(creds_file.read_text(encoding="utf-8"))["claudeAiOauth"]
    stored_token = section["accessToken"]
    expiry_ms = section.get("expiresAt", 0)
    refresh_token = section.get("refreshToken", "")

    # A missing or zero expiry is treated as "unknown", i.e. not expired.
    expired = bool(expiry_ms) and expiry_ms <= int(time.time() * 1000)
    if not (expired and refresh_if_expired and refresh_token):
        return stored_token

    refreshed = _try_refresh(refresh_token)
    if refreshed:
        return refreshed
    print("warning: token may be expired; refresh failed.", file=sys.stderr)
    return stored_token
|
||||
|
||||
|
||||
def _try_refresh(refresh_token: str) -> str:
    """Make one best-effort attempt to exchange a refresh token for an access token.

    Args:
        refresh_token: The OAuth refresh token to submit.

    Returns:
        The ``access_token`` field of the response on HTTP 200, otherwise ''.
        Any exception (including httpx not being installed) is reported on
        stderr and also results in ''.
    """
    try:
        import httpx

        form = {"grant_type": "refresh_token", "refresh_token": refresh_token}
        reply = httpx.post(
            "https://auth.anthropic.com/oauth/token",
            data=form,
            headers={"content-type": "application/x-www-form-urlencoded"},
            timeout=10.0,
        )
        if reply.status_code != 200:
            return ""
        return reply.json().get("access_token", "")
    except Exception as exc:  # noqa: BLE001
        print(f"warning: refresh failed: {exc}", file=sys.stderr)
        return ""
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# SSE parsing (pure)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _parse_sse_chunk(chunk: str) -> Iterator[dict]:
    """Parse a piece of server-sent-events text into normalised events.

    Events are delimited by blank lines. Within one event the ``event:`` line
    names the event and the ``data:`` lines carry a JSON payload; several
    ``data:`` lines belonging to the same event are joined with newlines
    rather than overwriting each other. An event still pending at the end of
    the chunk is dispatched even without a closing blank line.

    Args:
        chunk: Raw SSE text containing zero or more events.

    Yields:
        Whatever `_normalise` produces for each event whose payload decodes
        to a JSON object. Empty payloads, the ``[DONE]`` sentinel, invalid
        JSON and non-object JSON values are skipped silently.
    """
    event_name = ""
    data_lines = []
    # The trailing "" acts as a final blank line, so the last event is
    # dispatched by the same code path as every other one.
    for line in [*chunk.splitlines(), ""]:
        if line.startswith("event:"):
            event_name = line[len("event:"):].strip()
        elif line.startswith("data:"):
            data_lines.append(line[len("data:"):].strip())
        elif line == "":
            payload_text = "\n".join(data_lines)
            current_event, event_name, data_lines = event_name, "", []
            if not payload_text or payload_text == "[DONE]":
                continue
            try:
                payload = json.loads(payload_text)
            except json.JSONDecodeError:
                continue
            # _normalise relies on dict access; ignore scalars and arrays.
            if isinstance(payload, dict):
                yield from _normalise(current_event, payload)
|
||||
|
||||
|
||||
def _normalise(event_type: str, data: dict) -> Iterator[dict]:
    """Translate one decoded Messages-API stream event into this module's events.

    Args:
        event_type: The SSE event name (``content_block_start``,
            ``content_block_delta``, ``message_delta``, ``error``, ...).
        data: The decoded JSON payload of that event.

    Yields:
        At most one dict per input event:
          * ``{"type": "tool_use_start", "id", "name", "index"}`` when a
            tool_use content block opens;
          * ``{"type": "text", "text"}`` for a text delta;
          * ``{"type": "tool_input_delta", "index", "partial_json"}`` for a
            fragment of tool-call arguments;
          * ``{"type": "done", "stop_reason"}`` when a message_delta carries
            a stop reason;
          * ``{"type": "error", "message"}`` for an in-stream error event,
            so a failure mid-stream is not mistaken for an empty reply.
        Every other event yields nothing.
    """
    if event_type == "content_block_start":
        block = data.get("content_block") or {}
        if block.get("type") == "tool_use":
            yield {"type": "tool_use_start", "id": block.get("id", ""),
                   "name": block.get("name", ""), "index": data.get("index", 0)}
    elif event_type == "content_block_delta":
        delta = data.get("delta") or {}
        delta_type = delta.get("type")
        if delta_type == "text_delta":
            yield {"type": "text", "text": delta.get("text", "")}
        elif delta_type == "input_json_delta":
            yield {"type": "tool_input_delta", "index": data.get("index", 0),
                   "partial_json": delta.get("partial_json", "")}
    elif event_type == "message_delta":
        stop = (data.get("delta") or {}).get("stop_reason", "")
        if stop:
            yield {"type": "done", "stop_reason": stop}
    elif event_type == "error":
        detail = data.get("error") or {}
        if not isinstance(detail, dict):
            detail = {"message": str(detail)}
        pieces = [str(p) for p in (detail.get("type"), detail.get("message")) if p]
        yield {"type": "error", "message": ": ".join(pieces) or "unknown stream error"}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Streaming call
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def stream_messages(messages: list, model: str = DEFAULT_MODEL, system: str = "",
                    tools: list = None, max_tokens: int = 4096, token: str = "") -> Iterator[dict]:
    """Send one streaming Messages API request and yield its events as they arrive.

    Args:
        messages: Conversation in Messages-API format; sent as-is.
        model: Model identifier to request.
        system: Optional system prompt; omitted from the request when empty.
        tools: Optional tool definitions; omitted from the request when empty.
        max_tokens: Value of the request's ``max_tokens`` field.
        token: OAuth bearer token; when empty it is read via load_oauth_token().

    Yields:
        The event dicts produced by `_parse_sse_chunk` for the response body.
        Failures never raise: a single ``{"type": "error", "message": ...}``
        is yielded instead when the token cannot be loaded, httpx is not
        installed, the server answers with a non-200 status, or any exception
        occurs during the request.
    """
    if not token:
        try:
            token = load_oauth_token()
        except Exception as exc:  # noqa: BLE001
            yield {"type": "error", "message": f"failed to load token: {exc}"}
            return

    payload = {"model": model, "max_tokens": max_tokens, "messages": messages, "stream": True}
    # Optional fields are only sent when they actually carry a value.
    for field, value in (("system", system), ("tools", tools)):
        if value:
            payload[field] = value
    request_headers = {
        "authorization": f"Bearer {token}",
        "anthropic-version": ANTHROPIC_VERSION,
        "content-type": "application/json",
    }

    try:
        import httpx
    except ImportError:
        yield {"type": "error", "message": "falta httpx. Instala con: pip install httpx"}
        return

    try:
        with httpx.stream("POST", API_URL, json=payload, headers=request_headers,
                          timeout=120.0) as response:
            if response.status_code != 200:
                detail = response.read().decode("utf-8", errors="replace")
                yield {"type": "error", "message": f"HTTP {response.status_code}: {detail[:400]}"}
                return

            pending = ""
            for piece in response.iter_text():
                pending += piece
                # Hand over every complete event (terminated by a blank line)
                # and keep the unfinished tail for the next network chunk.
                while True:
                    frame, separator, remainder = pending.partition("\n\n")
                    if not separator:
                        break
                    pending = remainder
                    yield from _parse_sse_chunk(frame + "\n\n")
            if pending.strip():
                yield from _parse_sse_chunk(pending + "\n\n")
    except Exception as exc:  # noqa: BLE001
        yield {"type": "error", "message": str(exc)}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Agentic tool loop (define your own tools)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _decode_tool_input(raw: str) -> dict:
    """Decode the JSON arguments streamed for one tool call.

    An empty string means "no arguments" and gives {}; text that is not valid
    JSON is preserved under the "_raw" key instead of being discarded.
    """
    if not raw:
        return {}
    try:
        return json.loads(raw)
    except json.JSONDecodeError:
        return {"_raw": raw}


def _run_tool(dispatch: dict, name: str, tool_use_id: str, tool_input: dict) -> dict:
    """Call one tool through `dispatch` and wrap the outcome as a tool_result block.

    Unknown tools and exceptions raised by the tool (or while serialising its
    return value) become a result flagged with ``is_error``.
    """
    if name not in dispatch:
        outcome, failed = f"Error: tool '{name}' not found", True
    else:
        try:
            value = dispatch[name](tool_input)
            outcome, failed = (value if isinstance(value, str) else json.dumps(value)), False
        except Exception as exc:  # noqa: BLE001
            outcome, failed = f"Error executing {name}: {exc}", True
    result = {"type": "tool_result", "tool_use_id": tool_use_id, "content": outcome}
    if failed:
        result["is_error"] = True
    return result


def run_tool_loop(messages: list, tools: list, dispatch: dict, model: str = DEFAULT_MODEL,
                  system: str = "", max_tokens: int = 4096, max_iters: int = 8,
                  on_text: Callable[[str], None] = None) -> dict:
    """Drive a tool-use conversation until the model stops asking for tools.

    Each iteration streams one assistant turn, records it in `messages`, and,
    if the turn ended with ``stop_reason == "tool_use"``, runs the requested
    tools and appends their results as the next user message.

    Args:
        messages: Conversation so far; extended in place with assistant turns
            and tool results.
        tools: Anthropic tool definitions offered to the model.
        dispatch: Maps a tool name to a callable taking the decoded input dict.
            A str return value is sent as-is, anything else via json.dumps.
        model: Model identifier.
        system: Optional system prompt.
        max_tokens: Per-request ``max_tokens``.
        max_iters: Maximum number of assistant turns to request.
        on_text: Optional callback invoked with each streamed text fragment.

    Returns:
        A dict with ``messages`` (the same list object), ``final_text`` (text
        of the last assistant turn), ``stop_reason`` (the API stop reason,
        ``"max_iters"`` when the iteration budget ran out, or ``"error"``),
        and ``iterations``. On a stream error an ``error`` key holds the
        message and ``final_text`` holds the text received so far in that turn.
    """
    iterations = 0
    final_text = ""
    stop_reason = "max_iters"

    for iterations in range(1, max_iters + 1):
        text_parts = []
        calls = []            # tool calls in arrival order
        calls_by_index = {}   # content-block index -> entry in `calls`
        turn_stop = "end_turn"

        for ev in stream_messages(messages, model=model, system=system,
                                  tools=tools, max_tokens=max_tokens):
            kind = ev.get("type")
            if kind == "text":
                text_parts.append(ev["text"])
                if on_text:
                    on_text(ev["text"])
            elif kind == "tool_use_start":
                call = {"id": ev["id"], "name": ev["name"], "parts": []}
                calls.append(call)
                calls_by_index[ev["index"]] = call
            elif kind == "tool_input_delta":
                call = calls_by_index.get(ev["index"])
                if call is not None:
                    call["parts"].append(ev["partial_json"])
            elif kind == "done":
                turn_stop = ev.get("stop_reason", "end_turn")
            elif kind == "error":
                return {"messages": messages, "final_text": "".join(text_parts),
                        "stop_reason": "error", "iterations": iterations,
                        "error": ev.get("message", "")}

        final_text = "".join(text_parts)
        stop_reason = turn_stop

        content = []
        if final_text:
            content.append({"type": "text", "text": final_text})
        for call in calls:
            call["raw"] = "".join(call["parts"])
            content.append({"type": "tool_use", "id": call["id"], "name": call["name"],
                            "input": _decode_tool_input(call["raw"])})
        # Like _ask_once, never record an assistant turn with empty content:
        # it would make the history invalid for any follow-up request.
        if content:
            messages.append({"role": "assistant", "content": content})

        if stop_reason != "tool_use" or not calls:
            break

        # The input is decoded again so the tool gets its own object and
        # cannot mutate the arguments stored in the conversation history.
        results = [
            _run_tool(dispatch, call["name"], call["id"], _decode_tool_input(call["raw"]))
            for call in calls
        ]
        messages.append({"role": "user", "content": results})
    else:
        stop_reason = "max_iters"

    return {"messages": messages, "final_text": final_text,
            "stop_reason": stop_reason, "iterations": iterations}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# CLI
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _ask_once(messages: list, model: str, system: str) -> str:
    """Stream a single assistant reply to stdout and record it in `messages`.

    Text fragments are written to stdout as they arrive, followed by a final
    newline. A non-empty reply is appended to `messages` as an assistant
    turn. On a stream error the message is written to stderr, nothing is
    appended, and '' is returned.

    Returns:
        The complete reply text, or '' on error or when no text was produced.
    """
    chunks = []
    for event in stream_messages(messages, model=model, system=system):
        kind = event["type"]
        if kind == "error":
            sys.stderr.write("\n[error] " + str(event.get("message", "")) + "\n")
            return ""
        if kind != "text":
            continue
        fragment = event["text"]
        chunks.append(fragment)
        sys.stdout.write(fragment)
        sys.stdout.flush()

    sys.stdout.write("\n")
    reply = "".join(chunks)
    if not reply:
        return reply
    messages.append({"role": "assistant", "content": reply})
    return reply
|
||||
|
||||
|
||||
def _repl(model: str, system: str):
    """Run the interactive chat loop until the user quits.

    Keeps the conversation in memory and understands these commands:
    ``/exit`` (also ``/quit``, ``/q``), ``/reset``, ``/model [id]`` and
    ``/system [text]``. EOF or Ctrl-C at the prompt ends the loop.

    Args:
        model: Model used for the first turn; replaced by ``/model <id>``.
        system: Initial system prompt; replaced (or cleared) by ``/system``.
    """
    print(f"llm · {model} · escribe tu mensaje (/model, /system, /reset, /exit)")
    messages = []
    while True:
        try:
            line = input("\n> ").strip()
        except (EOFError, KeyboardInterrupt):
            print()
            return
        if not line:
            continue

        if line in ("/exit", "/quit", "/q"):
            return
        if line == "/reset":
            messages = []
            print("— conversacion nueva —")
            continue

        # Compare the whole first word so only the exact commands match; a
        # message such as "/systemd is failing" must be sent to the model,
        # not treated as "/system".
        words = line.split(maxsplit=1)
        command = words[0]
        argument = words[1].strip() if len(words) == 2 else ""

        if command == "/model":
            if argument:
                model = argument
                print(f"modelo: {model}")
            else:
                print(f"modelo actual: {model}")
            continue
        if command == "/system":
            system = argument
            print(f"system: {system or '(ninguno)'}")
            continue

        messages.append({"role": "user", "content": line})
        _ask_once(messages, model, system)
|
||||
|
||||
|
||||
def main(argv):
    """Command-line entry point.

    Recognised options are ``--model/-m <id>``, ``--system/-s <text>`` and
    ``--help/-h``; every other argument becomes part of the prompt. With no
    prompt on the command line, piped stdin is used as the prompt. With a
    prompt a single answer is printed; otherwise the interactive chat starts.

    Args:
        argv: Command-line arguments without the program name.

    Returns:
        Process exit status: 0 on success, 1 when a one-shot request produced
        no answer (request error or empty reply), 2 when an option is missing
        its value.
    """
    model = None  # resolved to DEFAULT_MODEL below unless --model is given
    system = ""
    words = []

    args = iter(argv)
    for arg in args:
        if arg in ("--help", "-h"):
            print(__doc__)
            return 0
        if arg in ("--model", "-m", "--system", "-s"):
            value = next(args, None)
            if value is None:
                # Previously a trailing flag was silently sent as prompt text.
                print(f"error: {arg} requires a value", file=sys.stderr)
                return 2
            if arg in ("--model", "-m"):
                model = value
            else:
                system = value
            continue
        words.append(arg)

    if model is None:
        model = DEFAULT_MODEL

    prompt = " ".join(words).strip()
    if not prompt and not sys.stdin.isatty():
        prompt = sys.stdin.read().strip()

    if prompt:  # one-shot
        reply = _ask_once([{"role": "user", "content": prompt}], model, system)
        # _ask_once returns '' on failure; surface that to the calling shell.
        return 0 if reply else 1

    _repl(model, system)  # interactive
    return 0
|
||||
|
||||
|
||||
# Script entry point: hand the CLI arguments to main() and exit with its status.
if __name__ == "__main__":
    raise SystemExit(main(sys.argv[1:]))
|
||||
Reference in New Issue
Block a user