9f5e6791db
Funciones CRUD completas para BigQuery: auth, datasets, tables, queries, jobs, routines, load/export. Tipo BQClient como wrapper del SDK oficial.
346 lines
12 KiB
Python
346 lines
12 KiB
Python
"""CRUD de tablas en Google BigQuery."""
|
|
|
|
from .client import BQClient
|
|
from google.cloud import bigquery
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Serialization helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _schema_field_to_dict(field: bigquery.SchemaField) -> dict:
|
|
"""Convierte un SchemaField a dict serializable."""
|
|
result = {
|
|
"name": field.name,
|
|
"type": field.field_type,
|
|
"mode": field.mode,
|
|
"description": field.description or "",
|
|
}
|
|
if field.fields:
|
|
result["fields"] = [_schema_field_to_dict(f) for f in field.fields]
|
|
return result
|
|
|
|
|
|
def _schema_to_dicts(schema: list) -> list[dict]:
|
|
"""Convierte una lista de SchemaField a lista de dicts."""
|
|
return [_schema_field_to_dict(f) for f in schema]
|
|
|
|
|
|
def _dict_to_schema_field(d: dict) -> bigquery.SchemaField:
    """Build a ``bigquery.SchemaField`` from its dict description.

    Args:
        d: Column definition. ``name`` is required; ``type`` defaults to
            ``"STRING"``, ``mode`` to ``"NULLABLE"``, ``description`` to an
            empty string and ``fields`` (nested column definitions) to an
            empty list.

    Returns:
        The ``SchemaField`` built from ``d``, with nested definitions
        converted recursively.

    Raises:
        KeyError: If ``d`` (or any nested definition) has no ``name`` key.
    """
    # Nested definitions are converted first, depth-first.
    sub_fields = []
    for child in d.get("fields", []):
        sub_fields.append(_dict_to_schema_field(child))

    options = {
        "name": d["name"],
        "field_type": d.get("type", "STRING"),
        "mode": d.get("mode", "NULLABLE"),
        "description": d.get("description", ""),
        "fields": sub_fields,
    }
    return bigquery.SchemaField(**options)
|
|
|
|
|
|
def _table_to_dict(table: bigquery.Table) -> dict:
    """Convert an SDK ``Table`` object into a flat, serializable dict.

    Args:
        table: Table object exposing the attributes read below
            (``table_id``, ``dataset_id``, ``project``, ``schema``,
            ``num_rows``, ``num_bytes``, ``created``, ``modified``,
            ``table_type``, ``time_partitioning``, ``range_partitioning``,
            ``clustering_fields``, ``description``, ``labels``).

    Returns:
        Dict with the keys ``table_id``, ``dataset_id``, ``project``,
        ``full_id``, ``schema``, ``num_rows``, ``num_bytes``, ``created``,
        ``modified``, ``type``, ``partitioning``, ``clustering``,
        ``description`` and ``labels``.

        * ``full_id`` is always ``project.dataset.table`` (dot-separated).
        * ``created`` / ``modified`` are ``isoformat()`` strings or ``None``.
        * ``type`` falls back to ``"TABLE"`` when ``table_type`` is falsy.
        * ``partitioning`` is ``None`` or a ``{"type", "field"}`` dict.
        * ``clustering`` is ``None`` or a list of column names.
    """
    # Time partitioning wins; range partitioning is only reported when no
    # time partitioning is configured.
    partitioning = None
    if table.time_partitioning:
        partitioning = {
            "type": table.time_partitioning.type_,
            "field": table.time_partitioning.field or "",
        }
    elif table.range_partitioning:
        partitioning = {
            "type": "RANGE",
            "field": table.range_partitioning.field,
        }

    clustering = list(table.clustering_fields) if table.clustering_fields else None

    # Always build the dotted identifier. The SDK documents
    # ``Table.full_table_id`` in the legacy ``project:dataset.table`` form,
    # so preferring it made ``full_id`` disagree with the dotted form that
    # bq_list_tables and every table reference in this module use.
    full_id = f"{table.project}.{table.dataset_id}.{table.table_id}"

    return {
        "table_id": table.table_id,
        "dataset_id": table.dataset_id,
        "project": table.project,
        "full_id": full_id,
        "schema": _schema_to_dicts(table.schema or []),
        "num_rows": table.num_rows,
        "num_bytes": table.num_bytes,
        "created": table.created.isoformat() if table.created else None,
        "modified": table.modified.isoformat() if table.modified else None,
        "type": table.table_type or "TABLE",
        "partitioning": partitioning,
        "clustering": clustering,
        "description": table.description or "",
        "labels": dict(table.labels or {}),
    }
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# CRUD functions
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def bq_create_table(
    client: BQClient,
    dataset_id: str,
    table_id: str,
    schema: list[dict],
    partitioning: dict | None = None,
    clustering: list[str] | None = None,
    description: str = "",
    labels: dict | None = None,
) -> dict:
    """Create a BigQuery table with optional partitioning and clustering.

    Delegates to ``client._client.create_table()`` from the official SDK.

    Args:
        client: Authenticated BQClient.
        dataset_id: ID of the dataset in which the table is created.
        table_id: ID (name) of the table to create.
        schema: List of column definition dicts, each shaped like
            ``{"name": "col", "type": "STRING", "mode": "NULLABLE",
            "description": "..."}``.
            Valid types: STRING, INTEGER, FLOAT, BOOLEAN, BYTES, DATE,
            DATETIME, TIME, TIMESTAMP, RECORD, NUMERIC, BIGNUMERIC, JSON,
            GEOGRAPHY. Modes: NULLABLE, REQUIRED, REPEATED.
        partitioning: Time-partitioning configuration or None, e.g.
            ``{"type": "DAY", "field": "created_at"}``. Types: DAY, MONTH,
            YEAR, HOUR (``"DAY"`` when the key is missing). An empty or
            missing field is passed to the SDK as ``None``, which selects
            the ``_PARTITIONTIME`` pseudo-column.
        clustering: Column names to cluster by (max 4) or None,
            e.g. ``["country", "status"]``.
        description: Table description; skipped when empty.
        labels: Key-value labels for the table, e.g. ``{"env": "prod"}``;
            skipped when empty or None.

    Returns:
        Dict with the metadata of the created table: table_id, dataset_id,
        project, full_id, schema, num_rows, num_bytes, created, modified,
        type, partitioning, clustering, description, labels.

    Raises:
        google.api_core.exceptions.Conflict: If the table already exists.
        google.api_core.exceptions.NotFound: If the dataset does not exist.
        google.api_core.exceptions.BadRequest: If the schema is invalid.

    Example:
        >>> table = bq_create_table(client, "mi_dataset", "ventas", [
        ...     {"name": "id", "type": "INTEGER", "mode": "REQUIRED"},
        ...     {"name": "fecha", "type": "DATE", "mode": "NULLABLE"},
        ...     {"name": "monto", "type": "FLOAT", "mode": "NULLABLE"},
        ... ], partitioning={"type": "MONTH", "field": "fecha"},
        ...    clustering=["id"])
        >>> print(table["full_id"])
    """
    qualified_name = f"{client.project_id}.{dataset_id}.{table_id}"
    columns = [_dict_to_schema_field(column) for column in schema]
    new_table = bigquery.Table(qualified_name, schema=columns)

    if partitioning:
        partition_type = partitioning.get("type", "DAY")
        # Empty string means "no column": the SDK expects None in that case.
        partition_field = partitioning.get("field") or None
        new_table.time_partitioning = bigquery.TimePartitioning(
            type_=partition_type,
            field=partition_field,
        )

    # Remaining optional settings are applied only when truthy, in this order.
    optional_settings = (
        ("clustering_fields", clustering),
        ("description", description),
        ("labels", labels),
    )
    for attribute, value in optional_settings:
        if value:
            setattr(new_table, attribute, value)

    return _table_to_dict(client._client.create_table(new_table))
|
|
|
|
|
|
def bq_get_table(client: BQClient, dataset_id: str, table_id: str) -> dict:
    """Fetch the full metadata of a BigQuery table.

    Delegates to ``client._client.get_table()`` from the official SDK.

    Args:
        client: Authenticated BQClient.
        dataset_id: ID of the dataset that contains the table.
        table_id: ID (name) of the table.

    Returns:
        Dict with: table_id, dataset_id, project, full_id, schema (list of
        dicts), num_rows, num_bytes, created (ISO 8601), modified
        (ISO 8601), type (TABLE, VIEW, MATERIALIZED_VIEW, EXTERNAL),
        partitioning (dict or None), clustering (list of strings or None),
        description, labels.

    Raises:
        google.api_core.exceptions.NotFound: If the table does not exist.

    Example:
        >>> tabla = bq_get_table(client, "mi_dataset", "ventas")
        >>> print(tabla["num_rows"], tabla["schema"])
    """
    qualified_name = f"{client.project_id}.{dataset_id}.{table_id}"
    fetched = client._client.get_table(qualified_name)
    return _table_to_dict(fetched)
|
|
|
|
|
|
def bq_list_tables(client: BQClient, dataset_id: str) -> list[dict]:
    """List the tables of a BigQuery dataset.

    Delegates to ``client._client.list_tables()`` from the official SDK.

    Args:
        client: Authenticated BQClient.
        dataset_id: ID of the dataset to list.

    Returns:
        List of summary dicts, one per table, each containing: table_id,
        full_id (``project.dataset.table``) and type (TABLE, VIEW,
        MATERIALIZED_VIEW, EXTERNAL; ``"TABLE"`` when the listed item
        reports no type).

    Raises:
        google.api_core.exceptions.NotFound: If the dataset does not exist.

    Example:
        >>> tablas = bq_list_tables(client, "mi_dataset")
        >>> for t in tablas:
        ...     print(t["table_id"], t["type"])
    """

    def _summarize(item) -> dict:
        # Reduce one listed table item to the three summary keys.
        return dict(
            table_id=item.table_id,
            full_id=f"{item.project}.{item.dataset_id}.{item.table_id}",
            type=item.table_type or "TABLE",
        )

    listing = client._client.list_tables(f"{client.project_id}.{dataset_id}")
    return [_summarize(item) for item in listing]
|
|
|
|
|
|
def bq_update_table(
    client: BQClient,
    dataset_id: str,
    table_id: str,
    schema: list[dict] | None = None,
    description: str | None = None,
    labels: dict | None = None,
) -> dict:
    """Update the metadata of a BigQuery table.

    Fetches the table with ``client._client.get_table()`` and sends the
    changes with ``client._client.update_table()`` from the official SDK.
    Only the arguments that are not None are modified. For the schema,
    BigQuery ONLY allows appending new columns at the end; existing columns
    cannot be removed or modified.

    Args:
        client: Authenticated BQClient.
        dataset_id: ID of the dataset that contains the table.
        table_id: ID (name) of the table to update.
        schema: List of dicts with the complete new schema (existing
            columns plus new ones). Only additions are allowed.
            None = unchanged.
        description: New table description. None = unchanged.
        labels: New key-value labels. None = unchanged.

    Returns:
        Dict with the updated table metadata (same structure as
        bq_get_table). When every optional argument is None, no update
        request is sent and the freshly fetched metadata is returned.

    Raises:
        google.api_core.exceptions.NotFound: If the table does not exist.
        google.api_core.exceptions.BadRequest: If columns would be removed.

    Example:
        >>> tabla = bq_update_table(client, "mi_dataset", "ventas",
        ...     description="Tabla de ventas actualizada",
        ...     labels={"env": "prod", "team": "data"})
        >>> tabla = bq_update_table(client, "mi_dataset", "ventas",
        ...     schema=[
        ...         {"name": "id", "type": "INTEGER", "mode": "REQUIRED"},
        ...         {"name": "nueva_col", "type": "STRING", "mode": "NULLABLE"},
        ...     ])
    """
    current = client._client.get_table(f"{client.project_id}.{dataset_id}.{table_id}")

    # Attribute name -> new value, in the order the fields are sent to the SDK.
    changes = {}
    if schema is not None:
        changes["schema"] = [_dict_to_schema_field(column) for column in schema]
    if description is not None:
        changes["description"] = description
    if labels is not None:
        changes["labels"] = labels

    for attribute, value in changes.items():
        setattr(current, attribute, value)

    # Nothing requested: skip the update call entirely.
    if changes:
        outcome = client._client.update_table(current, list(changes))
    else:
        outcome = current
    return _table_to_dict(outcome)
|
|
|
|
|
|
def bq_delete_table(client: BQClient, dataset_id: str, table_id: str) -> None:
    """Delete a table from BigQuery.

    Delegates to ``client._client.delete_table()`` from the official SDK.
    Treat the operation as irreversible: the original author notes there is
    no recycle bin, so consider exporting the data before deleting.

    Args:
        client: Authenticated BQClient.
        dataset_id: ID of the dataset that contains the table.
        table_id: ID (name) of the table to delete.

    Raises:
        google.api_core.exceptions.NotFound: If the table does not exist.

    Example:
        >>> bq_delete_table(client, "mi_dataset", "tabla_temporal")
    """
    doomed = f"{client.project_id}.{dataset_id}.{table_id}"
    client._client.delete_table(doomed)
|
|
|
|
|
|
def bq_preview_rows(
    client: BQClient,
    dataset_id: str,
    table_id: str,
    max_results: int = 10,
) -> dict:
    """Return a sample of rows from a BigQuery table without running a query.

    Fetches the table with ``client._client.get_table()`` and reads rows
    with ``client._client.list_rows()`` from the official SDK. Per the
    original author's note, this is not a SQL query and so incurs no
    bytes-processed cost, which makes it suitable for quick previews.

    Args:
        client: Authenticated BQClient.
        dataset_id: ID of the dataset that contains the table.
        table_id: ID (name) of the table to preview.
        max_results: Maximum number of rows to return. Default: 10.

    Returns:
        Dict with:
            - columns: list of column names
            - rows: list of lists with the values of each row
            - total_rows: total number of rows in the table (not in the
              preview)

    Raises:
        google.api_core.exceptions.NotFound: If the table does not exist.

    Example:
        >>> preview = bq_preview_rows(client, "mi_dataset", "ventas", max_results=5)
        >>> print(preview["columns"])
        >>> for row in preview["rows"]:
        ...     print(row)
        >>> print(f"Total en tabla: {preview['total_rows']}")
    """
    source_table = client._client.get_table(f"{client.project_id}.{dataset_id}.{table_id}")
    page = client._client.list_rows(source_table, max_results=max_results)

    # Column names are read from the iterator's schema before consuming rows.
    header = [column.name for column in page.schema]
    body = [list(record.values()) for record in page]

    return dict(columns=header, rows=body, total_rows=source_table.num_rows)
|