Files
fn_registry/python/functions/infra/clickhouse_insert_rows.py
T
egutierrez fce88032ca chore: auto-commit (43 archivos)
- .mcp.json
- bash/functions/infra/write_mcp_jupyter_config.md
- bash/functions/infra/write_mcp_jupyter_config.sh
- cpp/CMakeLists.txt
- cpp/apps/chart_demo
- cpp/apps/shaders_lab
- cpp/functions/gfx/gl_framebuffer.cpp
- cpp/functions/gfx/gl_framebuffer.h
- cpp/functions/gfx/gl_framebuffer.md
- cpp/functions/gfx/mesh_gpu.md
- ...

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-30 17:28:47 +02:00

75 lines
2.5 KiB
Python

"""Insert rows into ClickHouse via the HTTP interface (port 8123)."""
import json
import urllib.error
import urllib.parse
import urllib.request
def _insert_failure(status: int, payload: bytes) -> ValueError:
    """Build the ValueError raised when ClickHouse rejects an insert."""
    # The payload is truncated by the caller, so it may end in the middle of
    # a multi-byte character; errors="replace" keeps the decode from raising.
    detail = payload.decode("utf-8", errors="replace").strip()
    message = f"ClickHouse insert failed: HTTP {status}"
    if detail:
        message = f"{message} - {detail}"
    return ValueError(message)


def clickhouse_insert_rows(
    base_url: str,
    table: str,
    rows: list[dict],
    *,
    user: str = "default",
    password: str = "",
    database: str = "analytics",
    timeout: float = 30.0,
) -> int:
    """Insert a list of dicts into a ClickHouse table using JSONEachRow format.

    Args:
        base_url: ClickHouse HTTP base URL, e.g. "http://127.0.0.1:18123".
            Trailing slashes are stripped before the request URL is built.
        table: Fully-qualified or bare table name, e.g.
            "analytics.gnula_movies". It is placed into the INSERT statement
            verbatim, so it must come from trusted code.
        rows: List of dicts to insert. Each dict becomes one JSON line.
        user: ClickHouse username, sent in the X-ClickHouse-User header.
        password: ClickHouse password, sent in the X-ClickHouse-Key header.
        database: Target database sent as the "database" query parameter.
        timeout: Timeout in seconds passed to urllib.request.urlopen.

    Returns:
        Number of rows inserted (len(rows)). Returns 0 if rows is empty
        without contacting the server.

    Raises:
        ValueError: On a non-200 HTTP response, with the status code and up
            to the first 500 bytes of the response body (decoded as UTF-8).
        urllib.error.URLError: On network-level errors (connection refused,
            DNS failure, timeout).
    """
    if not rows:
        return 0

    # NOTE(security): `table` is interpolated into the SQL text as-is. Do not
    # pass untrusted input here; the HTTP interface offers no placeholder for
    # identifiers in this statement.
    query = f"INSERT INTO {table} FORMAT JSONEachRow"
    params = urllib.parse.urlencode({"database": database, "query": query})
    # Normalise the base so "http://host:8123/" does not produce "//?...".
    url = f"{base_url.rstrip('/')}/?{params}"

    # JSONEachRow expects one JSON object per line.
    body = "\n".join(json.dumps(row, ensure_ascii=False) for row in rows)
    req = urllib.request.Request(
        url,
        data=body.encode("utf-8"),
        method="POST",
        headers={
            "Content-Type": "text/plain",
            "X-ClickHouse-User": user,
            "X-ClickHouse-Key": password,
        },
    )

    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            # urlopen raises HTTPError for non-2xx responses, so this branch
            # only catches 2xx codes other than 200.
            if resp.status != 200:
                raise _insert_failure(resp.status, resp.read(500))
    except urllib.error.HTTPError as exc:
        # HTTPError wraps the response stream; close it once the preview has
        # been read so the underlying connection is released.
        try:
            payload = exc.read(500)
        finally:
            exc.close()
        raise _insert_failure(exc.code, payload) from exc
    return len(rows)