feat: add BigQuery Python functions and BQClient type
Funciones CRUD completas para BigQuery: auth, datasets, tables, queries, jobs, routines, load/export. Tipo BQClient como wrapper del SDK oficial.
This commit is contained in:
@@ -0,0 +1,345 @@
|
||||
"""CRUD de tablas en Google BigQuery."""
|
||||
|
||||
from .client import BQClient
|
||||
from google.cloud import bigquery
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers de serializacion
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _schema_field_to_dict(field: bigquery.SchemaField) -> dict:
|
||||
"""Convierte un SchemaField a dict serializable."""
|
||||
result = {
|
||||
"name": field.name,
|
||||
"type": field.field_type,
|
||||
"mode": field.mode,
|
||||
"description": field.description or "",
|
||||
}
|
||||
if field.fields:
|
||||
result["fields"] = [_schema_field_to_dict(f) for f in field.fields]
|
||||
return result
|
||||
|
||||
|
||||
def _schema_to_dicts(schema: list) -> list[dict]:
|
||||
"""Convierte una lista de SchemaField a lista de dicts."""
|
||||
return [_schema_field_to_dict(f) for f in schema]
|
||||
|
||||
|
||||
def _dict_to_schema_field(d: dict) -> bigquery.SchemaField:
|
||||
"""Convierte un dict a SchemaField."""
|
||||
nested = [_dict_to_schema_field(f) for f in d.get("fields", [])]
|
||||
return bigquery.SchemaField(
|
||||
name=d["name"],
|
||||
field_type=d.get("type", "STRING"),
|
||||
mode=d.get("mode", "NULLABLE"),
|
||||
description=d.get("description", ""),
|
||||
fields=nested,
|
||||
)
|
||||
|
||||
|
||||
def _table_to_dict(table: bigquery.Table) -> dict:
|
||||
"""Convierte un objeto Table del SDK a dict plano serializable."""
|
||||
partitioning = None
|
||||
if table.time_partitioning:
|
||||
partitioning = {
|
||||
"type": table.time_partitioning.type_,
|
||||
"field": table.time_partitioning.field or "",
|
||||
}
|
||||
elif table.range_partitioning:
|
||||
partitioning = {
|
||||
"type": "RANGE",
|
||||
"field": table.range_partitioning.field,
|
||||
}
|
||||
|
||||
clustering = None
|
||||
if table.clustering_fields:
|
||||
clustering = list(table.clustering_fields)
|
||||
|
||||
return {
|
||||
"table_id": table.table_id,
|
||||
"dataset_id": table.dataset_id,
|
||||
"project": table.project,
|
||||
"full_id": table.full_table_id or f"{table.project}.{table.dataset_id}.{table.table_id}",
|
||||
"schema": _schema_to_dicts(table.schema or []),
|
||||
"num_rows": table.num_rows,
|
||||
"num_bytes": table.num_bytes,
|
||||
"created": table.created.isoformat() if table.created else None,
|
||||
"modified": table.modified.isoformat() if table.modified else None,
|
||||
"type": table.table_type or "TABLE",
|
||||
"partitioning": partitioning,
|
||||
"clustering": clustering,
|
||||
"description": table.description or "",
|
||||
"labels": dict(table.labels or {}),
|
||||
}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Funciones CRUD
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def bq_create_table(
|
||||
client: BQClient,
|
||||
dataset_id: str,
|
||||
table_id: str,
|
||||
schema: list[dict],
|
||||
partitioning: dict | None = None,
|
||||
clustering: list[str] | None = None,
|
||||
description: str = "",
|
||||
labels: dict | None = None,
|
||||
) -> dict:
|
||||
"""Crea una tabla en BigQuery con schema, particionamiento y clustering opcionales.
|
||||
|
||||
Usa `client._client.create_table()` del SDK oficial.
|
||||
|
||||
Args:
|
||||
client: Cliente autenticado BQClient.
|
||||
dataset_id: ID del dataset donde crear la tabla.
|
||||
table_id: ID (nombre) de la tabla a crear.
|
||||
schema: Lista de dicts con definicion de columnas. Cada dict:
|
||||
{"name": "col", "type": "STRING", "mode": "NULLABLE", "description": "..."}
|
||||
Tipos validos: STRING, INTEGER, FLOAT, BOOLEAN, BYTES, DATE, DATETIME,
|
||||
TIME, TIMESTAMP, RECORD, NUMERIC, BIGNUMERIC, JSON, GEOGRAPHY.
|
||||
Modos: NULLABLE, REQUIRED, REPEATED.
|
||||
partitioning: Dict de configuracion de particion o None. Ejemplo:
|
||||
{"type": "DAY", "field": "created_at"}
|
||||
Tipos: DAY, MONTH, YEAR, HOUR. Field vacio usa pseudo-columna _PARTITIONTIME.
|
||||
clustering: Lista de columnas para clustering (max 4) o None.
|
||||
Ejemplo: ["country", "status"]
|
||||
description: Descripcion de la tabla.
|
||||
labels: Etiquetas clave-valor para la tabla. Ejemplo: {"env": "prod"}.
|
||||
|
||||
Returns:
|
||||
Dict con metadata de la tabla creada: table_id, dataset_id, project,
|
||||
full_id, schema, num_rows, num_bytes, created, modified, type,
|
||||
partitioning, clustering, description, labels.
|
||||
|
||||
Raises:
|
||||
google.api_core.exceptions.Conflict: Si la tabla ya existe.
|
||||
google.api_core.exceptions.NotFound: Si el dataset no existe.
|
||||
google.api_core.exceptions.BadRequest: Si el schema es invalido.
|
||||
|
||||
Example:
|
||||
>>> table = bq_create_table(client, "mi_dataset", "ventas", [
|
||||
... {"name": "id", "type": "INTEGER", "mode": "REQUIRED"},
|
||||
... {"name": "fecha", "type": "DATE", "mode": "NULLABLE"},
|
||||
... {"name": "monto", "type": "FLOAT", "mode": "NULLABLE"},
|
||||
... ], partitioning={"type": "MONTH", "field": "fecha"},
|
||||
... clustering=["id"])
|
||||
>>> print(table["full_id"])
|
||||
"""
|
||||
table_ref = f"{client.project_id}.{dataset_id}.{table_id}"
|
||||
bq_schema = [_dict_to_schema_field(f) for f in schema]
|
||||
|
||||
table = bigquery.Table(table_ref, schema=bq_schema)
|
||||
|
||||
if partitioning:
|
||||
table.time_partitioning = bigquery.TimePartitioning(
|
||||
type_=partitioning.get("type", "DAY"),
|
||||
field=partitioning.get("field") or None,
|
||||
)
|
||||
|
||||
if clustering:
|
||||
table.clustering_fields = clustering
|
||||
|
||||
if description:
|
||||
table.description = description
|
||||
|
||||
if labels:
|
||||
table.labels = labels
|
||||
|
||||
created = client._client.create_table(table)
|
||||
return _table_to_dict(created)
|
||||
|
||||
|
||||
def bq_get_table(client: BQClient, dataset_id: str, table_id: str) -> dict:
|
||||
"""Obtiene los metadatos completos de una tabla BigQuery.
|
||||
|
||||
Usa `client._client.get_table()` del SDK oficial.
|
||||
|
||||
Args:
|
||||
client: Cliente autenticado BQClient.
|
||||
dataset_id: ID del dataset que contiene la tabla.
|
||||
table_id: ID (nombre) de la tabla.
|
||||
|
||||
Returns:
|
||||
Dict con: table_id, dataset_id, project, full_id, schema (lista de dicts),
|
||||
num_rows, num_bytes, created (ISO 8601), modified (ISO 8601), type
|
||||
(TABLE, VIEW, MATERIALIZED_VIEW, EXTERNAL), partitioning (dict o None),
|
||||
clustering (lista de strings o None), description, labels.
|
||||
|
||||
Raises:
|
||||
google.api_core.exceptions.NotFound: Si la tabla no existe.
|
||||
|
||||
Example:
|
||||
>>> tabla = bq_get_table(client, "mi_dataset", "ventas")
|
||||
>>> print(tabla["num_rows"], tabla["schema"])
|
||||
"""
|
||||
table_ref = f"{client.project_id}.{dataset_id}.{table_id}"
|
||||
table = client._client.get_table(table_ref)
|
||||
return _table_to_dict(table)
|
||||
|
||||
|
||||
def bq_list_tables(client: BQClient, dataset_id: str) -> list[dict]:
|
||||
"""Lista todas las tablas de un dataset BigQuery.
|
||||
|
||||
Usa `client._client.list_tables()` del SDK oficial.
|
||||
|
||||
Args:
|
||||
client: Cliente autenticado BQClient.
|
||||
dataset_id: ID del dataset a listar.
|
||||
|
||||
Returns:
|
||||
Lista de dicts resumidos, uno por tabla. Cada dict contiene:
|
||||
table_id, full_id, type (TABLE, VIEW, MATERIALIZED_VIEW, EXTERNAL).
|
||||
|
||||
Raises:
|
||||
google.api_core.exceptions.NotFound: Si el dataset no existe.
|
||||
|
||||
Example:
|
||||
>>> tablas = bq_list_tables(client, "mi_dataset")
|
||||
>>> for t in tablas:
|
||||
... print(t["table_id"], t["type"])
|
||||
"""
|
||||
dataset_ref = f"{client.project_id}.{dataset_id}"
|
||||
tables = client._client.list_tables(dataset_ref)
|
||||
return [
|
||||
{
|
||||
"table_id": t.table_id,
|
||||
"full_id": f"{t.project}.{t.dataset_id}.{t.table_id}",
|
||||
"type": t.table_type or "TABLE",
|
||||
}
|
||||
for t in tables
|
||||
]
|
||||
|
||||
|
||||
def bq_update_table(
|
||||
client: BQClient,
|
||||
dataset_id: str,
|
||||
table_id: str,
|
||||
schema: list[dict] | None = None,
|
||||
description: str | None = None,
|
||||
labels: dict | None = None,
|
||||
) -> dict:
|
||||
"""Actualiza metadatos de una tabla BigQuery.
|
||||
|
||||
Usa `client._client.update_table()` del SDK oficial. Solo modifica los
|
||||
campos no-None. Para schema, BigQuery SOLO permite agregar columnas nuevas
|
||||
al final — no se pueden eliminar ni modificar columnas existentes.
|
||||
|
||||
Args:
|
||||
client: Cliente autenticado BQClient.
|
||||
dataset_id: ID del dataset que contiene la tabla.
|
||||
table_id: ID (nombre) de la tabla a actualizar.
|
||||
schema: Lista de dicts con el schema completo nuevo (incluye columnas
|
||||
existentes + nuevas). Solo se permiten adiciones. None = sin cambios.
|
||||
description: Nueva descripcion de la tabla. None = sin cambios.
|
||||
labels: Nuevas etiquetas clave-valor. None = sin cambios.
|
||||
|
||||
Returns:
|
||||
Dict con la metadata actualizada de la tabla (misma estructura que bq_get_table).
|
||||
|
||||
Raises:
|
||||
google.api_core.exceptions.NotFound: Si la tabla no existe.
|
||||
google.api_core.exceptions.BadRequest: Si se intenta eliminar columnas.
|
||||
|
||||
Example:
|
||||
>>> tabla = bq_update_table(client, "mi_dataset", "ventas",
|
||||
... description="Tabla de ventas actualizada",
|
||||
... labels={"env": "prod", "team": "data"})
|
||||
>>> tabla = bq_update_table(client, "mi_dataset", "ventas",
|
||||
... schema=[
|
||||
... {"name": "id", "type": "INTEGER", "mode": "REQUIRED"},
|
||||
... {"name": "nueva_col", "type": "STRING", "mode": "NULLABLE"},
|
||||
... ])
|
||||
"""
|
||||
table_ref = f"{client.project_id}.{dataset_id}.{table_id}"
|
||||
table = client._client.get_table(table_ref)
|
||||
|
||||
fields_to_update = []
|
||||
|
||||
if schema is not None:
|
||||
table.schema = [_dict_to_schema_field(f) for f in schema]
|
||||
fields_to_update.append("schema")
|
||||
|
||||
if description is not None:
|
||||
table.description = description
|
||||
fields_to_update.append("description")
|
||||
|
||||
if labels is not None:
|
||||
table.labels = labels
|
||||
fields_to_update.append("labels")
|
||||
|
||||
if not fields_to_update:
|
||||
return _table_to_dict(table)
|
||||
|
||||
updated = client._client.update_table(table, fields_to_update)
|
||||
return _table_to_dict(updated)
|
||||
|
||||
|
||||
def bq_delete_table(client: BQClient, dataset_id: str, table_id: str) -> None:
|
||||
"""Elimina permanentemente una tabla de BigQuery.
|
||||
|
||||
Usa `client._client.delete_table()` del SDK oficial. IRREVERSIBLE — no hay
|
||||
papelera de reciclaje. Considerar exportar datos antes de eliminar.
|
||||
|
||||
Args:
|
||||
client: Cliente autenticado BQClient.
|
||||
dataset_id: ID del dataset que contiene la tabla.
|
||||
table_id: ID (nombre) de la tabla a eliminar.
|
||||
|
||||
Raises:
|
||||
google.api_core.exceptions.NotFound: Si la tabla no existe.
|
||||
|
||||
Example:
|
||||
>>> bq_delete_table(client, "mi_dataset", "tabla_temporal")
|
||||
"""
|
||||
table_ref = f"{client.project_id}.{dataset_id}.{table_id}"
|
||||
client._client.delete_table(table_ref)
|
||||
|
||||
|
||||
def bq_preview_rows(
|
||||
client: BQClient,
|
||||
dataset_id: str,
|
||||
table_id: str,
|
||||
max_results: int = 10,
|
||||
) -> dict:
|
||||
"""Obtiene una muestra de filas de una tabla BigQuery sin ejecutar query.
|
||||
|
||||
Usa `client._client.list_rows()` del SDK oficial — no genera costes de
|
||||
procesamiento de bytes (no es una query SQL). Ideal para vista previa rapida.
|
||||
|
||||
Args:
|
||||
client: Cliente autenticado BQClient.
|
||||
dataset_id: ID del dataset que contiene la tabla.
|
||||
table_id: ID (nombre) de la tabla a previsualizar.
|
||||
max_results: Numero maximo de filas a retornar. Default: 10.
|
||||
|
||||
Returns:
|
||||
Dict con:
|
||||
- columns: lista de strings con nombres de columnas
|
||||
- rows: lista de listas con valores de cada fila
|
||||
- total_rows: numero total de filas en la tabla (no en el preview)
|
||||
|
||||
Raises:
|
||||
google.api_core.exceptions.NotFound: Si la tabla no existe.
|
||||
|
||||
Example:
|
||||
>>> preview = bq_preview_rows(client, "mi_dataset", "ventas", max_results=5)
|
||||
>>> print(preview["columns"])
|
||||
>>> for row in preview["rows"]:
|
||||
... print(row)
|
||||
>>> print(f"Total en tabla: {preview['total_rows']}")
|
||||
"""
|
||||
table_ref = f"{client.project_id}.{dataset_id}.{table_id}"
|
||||
table = client._client.get_table(table_ref)
|
||||
rows_iter = client._client.list_rows(table, max_results=max_results)
|
||||
|
||||
columns = [f.name for f in rows_iter.schema]
|
||||
rows = [list(row.values()) for row in rows_iter]
|
||||
|
||||
return {
|
||||
"columns": columns,
|
||||
"rows": rows,
|
||||
"total_rows": table.num_rows,
|
||||
}
|
||||
Reference in New Issue
Block a user