Files
fn_registry/python/functions/bigquery/tables.py
T
egutierrez 9f5e6791db feat: add BigQuery Python functions and BQClient type
Funciones CRUD completas para BigQuery: auth, datasets, tables, queries,
jobs, routines, load/export. Tipo BQClient como wrapper del SDK oficial.
2026-04-07 18:45:02 +02:00

346 lines
12 KiB
Python

"""CRUD de tablas en Google BigQuery."""
from .client import BQClient
from google.cloud import bigquery
# ---------------------------------------------------------------------------
# Helpers de serializacion
# ---------------------------------------------------------------------------
def _schema_field_to_dict(field: bigquery.SchemaField) -> dict:
    """Serialize a SchemaField into a plain dict.

    Args:
        field: Schema field to serialize. Its ``name``, ``field_type``,
            ``mode``, ``description`` and ``fields`` attributes are read.

    Returns:
        Dict with the keys ``name``, ``type``, ``mode`` and ``description``
        (an empty string when the field has no description). A ``fields``
        key holding the recursively serialized sub-fields is added only
        when the field has nested fields.
    """
    serialized = dict(
        name=field.name,
        type=field.field_type,
        mode=field.mode,
        description=field.description or "",
    )
    children = field.fields
    if not children:
        # Leaf column: the "fields" key is intentionally left out.
        return serialized
    serialized["fields"] = [_schema_field_to_dict(child) for child in children]
    return serialized
def _schema_to_dicts(schema: list) -> list[dict]:
    """Serialize every field of a schema into a dict, preserving order.

    Args:
        schema: Iterable of SchemaField objects.

    Returns:
        A new list with one dict per field, each produced by
        ``_schema_field_to_dict``.
    """
    serialized = []
    for schema_field in schema:
        serialized.append(_schema_field_to_dict(schema_field))
    return serialized
def _dict_to_schema_field(d: dict) -> bigquery.SchemaField:
    """Build a SchemaField from its dict representation.

    Args:
        d: Column definition. ``name`` is required. ``type`` defaults to
            ``"STRING"``, ``mode`` to ``"NULLABLE"`` and ``description`` to
            an empty string. An optional ``fields`` list holds nested
            column definitions of the same shape.

    Returns:
        The SchemaField, with nested fields converted recursively.

    Raises:
        KeyError: If ``name`` is missing from ``d`` or any nested dict.
    """
    # Children are converted first, so a malformed nested definition fails
    # before the parent's own keys are read.
    children = []
    for child in d.get("fields", []):
        children.append(_dict_to_schema_field(child))
    column_name = d["name"]
    column_type = d.get("type", "STRING")
    column_mode = d.get("mode", "NULLABLE")
    column_description = d.get("description", "")
    return bigquery.SchemaField(
        name=column_name,
        field_type=column_type,
        mode=column_mode,
        description=column_description,
        fields=children,
    )
def _table_to_dict(table: bigquery.Table) -> dict:
    """Flatten an SDK Table object into a plain, serializable dict.

    Args:
        table: Table object from the BigQuery SDK.

    Returns:
        Dict with the keys table_id, dataset_id, project, full_id, schema,
        num_rows, num_bytes, created, modified, type, partitioning,
        clustering, description and labels.

        - ``full_id`` is always ``project.dataset.table`` (dot separated).
        - ``schema`` is a list of dicts built by ``_schema_to_dicts``.
        - ``created`` / ``modified`` are ``.isoformat()`` strings, or None
          when the attribute is unset.
        - ``type`` falls back to ``"TABLE"`` when the SDK reports none.
        - ``partitioning`` is ``{"type", "field"}`` for time partitioning
          (``field`` is ``""`` when unset), ``{"type": "RANGE", "field"}``
          for range partitioning, or None.
        - ``clustering`` is a list of column names, or None.
        - ``description`` defaults to ``""`` and ``labels`` to ``{}``.
    """
    partitioning = None
    if table.time_partitioning:
        partitioning = {
            "type": table.time_partitioning.type_,
            "field": table.time_partitioning.field or "",
        }
    elif table.range_partitioning:
        partitioning = {
            "type": "RANGE",
            "field": table.range_partitioning.field,
        }

    clustering = list(table.clustering_fields) if table.clustering_fields else None

    return {
        "table_id": table.table_id,
        "dataset_id": table.dataset_id,
        "project": table.project,
        # Always build the dotted form. ``table.full_table_id`` is only set
        # on server responses and uses the legacy "project:dataset.table"
        # notation, so relying on it made this key switch formats depending
        # on where the Table came from and disagree with bq_list_tables.
        "full_id": f"{table.project}.{table.dataset_id}.{table.table_id}",
        "schema": _schema_to_dicts(table.schema or []),
        "num_rows": table.num_rows,
        "num_bytes": table.num_bytes,
        "created": table.created.isoformat() if table.created else None,
        "modified": table.modified.isoformat() if table.modified else None,
        "type": table.table_type or "TABLE",
        "partitioning": partitioning,
        "clustering": clustering,
        "description": table.description or "",
        "labels": dict(table.labels or {}),
    }
# ---------------------------------------------------------------------------
# Funciones CRUD
# ---------------------------------------------------------------------------
def bq_create_table(
    client: BQClient,
    dataset_id: str,
    table_id: str,
    schema: list[dict],
    partitioning: dict | None = None,
    clustering: list[str] | None = None,
    description: str = "",
    labels: dict | None = None,
) -> dict:
    """Create a BigQuery table with optional partitioning and clustering.

    Builds a ``bigquery.Table`` from the arguments and submits it through
    ``client._client.create_table()`` of the official SDK.

    Args:
        client: Authenticated BQClient.
        dataset_id: ID of the dataset in which the table is created.
        table_id: ID (name) of the table to create.
        schema: List of column definition dicts, each shaped like
            {"name": "col", "type": "STRING", "mode": "NULLABLE", "description": "..."}
            Valid types: STRING, INTEGER, FLOAT, BOOLEAN, BYTES, DATE, DATETIME,
            TIME, TIMESTAMP, RECORD, NUMERIC, BIGNUMERIC, JSON, GEOGRAPHY.
            Modes: NULLABLE, REQUIRED, REPEATED.
        partitioning: Time-partitioning settings or None. Example:
            {"type": "DAY", "field": "created_at"}
            Types: DAY, MONTH, YEAR, HOUR ("type" defaults to "DAY"). An
            empty or missing "field" is passed to the SDK as None, which
            selects the _PARTITIONTIME pseudo-column.
        clustering: List of clustering columns (max 4) or None.
            Example: ["country", "status"]
        description: Table description. Left unset when empty.
        labels: Key-value labels for the table. Example: {"env": "prod"}.
            Left unset when empty or None.

    Returns:
        Dict with the created table's metadata: table_id, dataset_id, project,
        full_id, schema, num_rows, num_bytes, created, modified, type,
        partitioning, clustering, description, labels.

    Raises:
        google.api_core.exceptions.Conflict: If the table already exists.
        google.api_core.exceptions.NotFound: If the dataset does not exist.
        google.api_core.exceptions.BadRequest: If the schema is invalid.

    Example:
        >>> table = bq_create_table(client, "mi_dataset", "ventas", [
        ...     {"name": "id", "type": "INTEGER", "mode": "REQUIRED"},
        ...     {"name": "fecha", "type": "DATE", "mode": "NULLABLE"},
        ...     {"name": "monto", "type": "FLOAT", "mode": "NULLABLE"},
        ... ], partitioning={"type": "MONTH", "field": "fecha"},
        ...    clustering=["id"])
        >>> print(table["full_id"])
    """
    qualified_name = f"{client.project_id}.{dataset_id}.{table_id}"
    columns = []
    for column in schema:
        columns.append(_dict_to_schema_field(column))
    new_table = bigquery.Table(qualified_name, schema=columns)

    if partitioning:
        partition_type = partitioning.get("type", "DAY")
        # Empty string and missing key both collapse to None.
        partition_field = partitioning.get("field") or None
        new_table.time_partitioning = bigquery.TimePartitioning(
            type_=partition_type,
            field=partition_field,
        )

    # Remaining optional attributes are only assigned when truthy, so empty
    # lists, strings and dicts leave the SDK defaults untouched.
    optional_attributes = (
        ("clustering_fields", clustering),
        ("description", description),
        ("labels", labels),
    )
    for attribute, value in optional_attributes:
        if value:
            setattr(new_table, attribute, value)

    return _table_to_dict(client._client.create_table(new_table))
def bq_get_table(client: BQClient, dataset_id: str, table_id: str) -> dict:
    """Fetch the full metadata of a BigQuery table.

    Uses ``client._client.get_table()`` of the official SDK.

    Args:
        client: Authenticated BQClient.
        dataset_id: ID of the dataset that contains the table.
        table_id: ID (name) of the table.

    Returns:
        Dict with: table_id, dataset_id, project, full_id, schema (list of
        dicts), num_rows, num_bytes, created (ISO 8601), modified (ISO 8601),
        type (TABLE, VIEW, MATERIALIZED_VIEW, EXTERNAL), partitioning (dict or
        None), clustering (list of strings or None), description, labels.

    Raises:
        google.api_core.exceptions.NotFound: If the table does not exist.

    Example:
        >>> tabla = bq_get_table(client, "mi_dataset", "ventas")
        >>> print(tabla["num_rows"], tabla["schema"])
    """
    qualified_name = f"{client.project_id}.{dataset_id}.{table_id}"
    fetched = client._client.get_table(qualified_name)
    return _table_to_dict(fetched)
def bq_list_tables(client: BQClient, dataset_id: str) -> list[dict]:
    """List every table of a BigQuery dataset.

    Uses ``client._client.list_tables()`` of the official SDK.

    Args:
        client: Authenticated BQClient.
        dataset_id: ID of the dataset to list.

    Returns:
        List of summary dicts, one per table. Each dict contains:
        table_id, full_id (``project.dataset.table``) and type (TABLE, VIEW,
        MATERIALIZED_VIEW, EXTERNAL; "TABLE" when the SDK reports none).

    Raises:
        google.api_core.exceptions.NotFound: If the dataset does not exist.

    Example:
        >>> tablas = bq_list_tables(client, "mi_dataset")
        >>> for t in tablas:
        ...     print(t["table_id"], t["type"])
    """
    dataset_ref = f"{client.project_id}.{dataset_id}"
    summaries = []
    for entry in client._client.list_tables(dataset_ref):
        qualified_name = f"{entry.project}.{entry.dataset_id}.{entry.table_id}"
        summaries.append(
            {
                "table_id": entry.table_id,
                "full_id": qualified_name,
                "type": entry.table_type or "TABLE",
            }
        )
    return summaries
def bq_update_table(
    client: BQClient,
    dataset_id: str,
    table_id: str,
    schema: list[dict] | None = None,
    description: str | None = None,
    labels: dict | None = None,
) -> dict:
    """Update the metadata of a BigQuery table.

    Uses ``client._client.update_table()`` of the official SDK. Only the
    arguments that are not None are modified. For the schema, BigQuery ONLY
    allows appending new columns at the end; existing columns cannot be
    removed or modified.

    Args:
        client: Authenticated BQClient.
        dataset_id: ID of the dataset that contains the table.
        table_id: ID (name) of the table to update.
        schema: List of dicts with the complete new schema (existing columns
            plus new ones). Only additions are allowed. None = unchanged.
        description: New table description. None = unchanged.
        labels: New key-value labels. None = unchanged.

    Returns:
        Dict with the updated table metadata (same structure as
        bq_get_table). When every optional argument is None, no update call
        is made and the metadata just fetched is returned.

    Raises:
        google.api_core.exceptions.NotFound: If the table does not exist.
        google.api_core.exceptions.BadRequest: If columns would be removed.

    Example:
        >>> tabla = bq_update_table(client, "mi_dataset", "ventas",
        ...     description="Tabla de ventas actualizada",
        ...     labels={"env": "prod", "team": "data"})
        >>> tabla = bq_update_table(client, "mi_dataset", "ventas",
        ...     schema=[
        ...         {"name": "id", "type": "INTEGER", "mode": "REQUIRED"},
        ...         {"name": "nueva_col", "type": "STRING", "mode": "NULLABLE"},
        ...     ])
    """
    qualified_name = f"{client.project_id}.{dataset_id}.{table_id}"
    current = client._client.get_table(qualified_name)

    # Insertion order (schema, description, labels) doubles as the order of
    # the field list handed to update_table().
    changes = {}
    if schema is not None:
        changes["schema"] = [_dict_to_schema_field(column) for column in schema]
    if description is not None:
        changes["description"] = description
    if labels is not None:
        changes["labels"] = labels

    if not changes:
        return _table_to_dict(current)

    for attribute, value in changes.items():
        setattr(current, attribute, value)
    refreshed = client._client.update_table(current, list(changes))
    return _table_to_dict(refreshed)
def bq_delete_table(client: BQClient, dataset_id: str, table_id: str) -> None:
    """Delete a table from BigQuery.

    Uses ``client._client.delete_table()`` of the official SDK. This is a
    destructive operation; consider exporting the data before deleting.

    Args:
        client: Authenticated BQClient.
        dataset_id: ID of the dataset that contains the table.
        table_id: ID (name) of the table to delete.

    Raises:
        google.api_core.exceptions.NotFound: If the table does not exist.

    Example:
        >>> bq_delete_table(client, "mi_dataset", "tabla_temporal")
    """
    target = f"{client.project_id}.{dataset_id}.{table_id}"
    sdk_client = client._client
    sdk_client.delete_table(target)
    return None
def bq_preview_rows(
    client: BQClient,
    dataset_id: str,
    table_id: str,
    max_results: int = 10,
) -> dict:
    """Fetch a sample of rows from a BigQuery table without running a query.

    Uses ``client._client.list_rows()`` of the official SDK. Because this is
    not a SQL query it incurs no bytes-processed cost, which makes it suited
    to quick previews.

    Args:
        client: Authenticated BQClient.
        dataset_id: ID of the dataset that contains the table.
        table_id: ID (name) of the table to preview.
        max_results: Maximum number of rows to return. Default: 10.

    Returns:
        Dict with:
            - columns: list of column-name strings
            - rows: list of lists holding the values of each row
            - total_rows: total number of rows in the table (not in the preview)

    Raises:
        google.api_core.exceptions.NotFound: If the table does not exist.

    Example:
        >>> preview = bq_preview_rows(client, "mi_dataset", "ventas", max_results=5)
        >>> print(preview["columns"])
        >>> for row in preview["rows"]:
        ...     print(row)
        >>> print(f"Total en tabla: {preview['total_rows']}")
    """
    qualified_name = f"{client.project_id}.{dataset_id}.{table_id}"
    source_table = client._client.get_table(qualified_name)
    page = client._client.list_rows(source_table, max_results=max_results)

    # Column names are read from the iterator's schema before it is consumed.
    column_names = []
    for column in page.schema:
        column_names.append(column.name)

    sampled = []
    for record in page:
        sampled.append(list(record.values()))

    return {
        "columns": column_names,
        "rows": sampled,
        "total_rows": source_table.num_rows,
    }