Files
fn_registry/python/functions/bigquery/datasets.py
T
egutierrez 690e68a542 feat: add BigQuery Python functions and BQClient type
Funciones CRUD completas para BigQuery: auth, datasets, tables, queries,
jobs, routines, load/export. Tipo BQClient como wrapper del SDK oficial.
2026-04-07 18:45:02 +02:00

177 lines
5.4 KiB
Python

"""CRUD de datasets en Google BigQuery."""
from .client import BQClient
from google.cloud import bigquery
def bq_create_dataset(
    client: BQClient,
    dataset_id: str,
    location: str = "US",
    description: str = "",
    labels: dict[str, str] | None = None,
    default_table_expiration_ms: int = 0,
) -> dict:
    """Create a dataset in BigQuery.

    Args:
        client: Authenticated client.
        dataset_id: Dataset ID (name only, without the project).
        location: Geographic location (US, EU, us-central1, etc.).
        description: Optional description; an empty string leaves it unset.
        labels: Optional key-value labels; None or an empty dict leaves them unset.
        default_table_expiration_ms: Default table expiration in milliseconds.
            0 (or any non-positive value) means no expiration is set.

    Returns:
        Dict with: dataset_id, project, full_id, location, description, labels,
        created, modified, default_table_expiration_ms.

    Raises:
        google.api_core.exceptions.Conflict: If the dataset already exists (409).

    Example:
        >>> ds = bq_create_dataset(client, "analytics", location="EU", description="Data warehouse")
    """
    # The SDK expects a fully-qualified "project.dataset" reference.
    dataset = bigquery.Dataset(f"{client.project_id}.{dataset_id}")
    dataset.location = location

    # Optional attributes are assigned only when the caller supplied a
    # meaningful value, so nothing is sent for fields left at their defaults.
    if description:
        dataset.description = description
    if labels:
        dataset.labels = labels
    if default_table_expiration_ms > 0:
        dataset.default_table_expiration_ms = default_table_expiration_ms

    return _dataset_to_dict(client._client.create_dataset(dataset))
def bq_get_dataset(client: BQClient, dataset_id: str) -> dict:
    """Fetch the details of a dataset.

    Args:
        client: Authenticated client.
        dataset_id: Dataset ID.

    Returns:
        Dict with: dataset_id, project, full_id, location, description, labels,
        created, modified, default_table_expiration_ms.

    Raises:
        google.api_core.exceptions.NotFound: If the dataset does not exist (404).

    Example:
        >>> ds = bq_get_dataset(client, "analytics")
        >>> print(ds["location"], ds["description"])
    """
    # Qualify the dataset with the client's project before the lookup.
    dataset_ref = f"{client.project_id}.{dataset_id}"
    return _dataset_to_dict(client._client.get_dataset(dataset_ref))
def bq_list_datasets(client: BQClient) -> list[dict]:
    """List every dataset returned by the client's ``list_datasets`` call.

    Args:
        client: Authenticated client.

    Returns:
        List of dicts with: dataset_id, project, full_id.

    Example:
        >>> datasets = bq_list_datasets(client)
        >>> for ds in datasets:
        ...     print(ds["dataset_id"], ds["full_id"])
    """
    summaries: list[dict] = []
    for item in client._client.list_datasets():
        # Only the identifying fields are read from each listed item.
        summaries.append(
            {
                "dataset_id": item.dataset_id,
                "project": item.project,
                "full_id": f"{item.project}.{item.dataset_id}",
            }
        )
    return summaries
def bq_update_dataset(
    client: BQClient,
    dataset_id: str,
    description: str | None = None,
    labels: dict[str, str] | None = None,
    default_table_expiration_ms: int | None = None,
) -> dict:
    """Update selected fields of a dataset.

    Only the fields passed as non-None are modified. When every optional
    argument is None, no update request is issued and the dataset is returned
    as fetched.

    Args:
        client: Authenticated client.
        dataset_id: Dataset ID.
        description: New description (None = leave unchanged).
        labels: New labels (None = leave unchanged).
        default_table_expiration_ms: New table expiration in milliseconds
            (None = leave unchanged).

    Returns:
        Dict describing the updated dataset.

    Raises:
        google.api_core.exceptions.NotFound: If the dataset does not exist.

    Example:
        >>> bq_update_dataset(client, "analytics", description="Updated description")
    """
    dataset_ref = f"{client.project_id}.{dataset_id}"
    dataset = client._client.get_dataset(dataset_ref)

    # Keys double as the Dataset attribute names and as the field-mask entries
    # handed to update_dataset; insertion order fixes the order of assignment.
    requested = {
        "description": description,
        "labels": labels,
        "default_table_expiration_ms": default_table_expiration_ms,
    }

    changed_fields = []
    for field_name, new_value in requested.items():
        if new_value is None:
            continue
        setattr(dataset, field_name, new_value)
        changed_fields.append(field_name)

    # Skip the update call entirely when nothing was requested.
    if changed_fields:
        dataset = client._client.update_dataset(dataset, changed_fields)
    return _dataset_to_dict(dataset)
def bq_delete_dataset(
    client: BQClient,
    dataset_id: str,
    delete_contents: bool = False,
) -> None:
    """Delete a dataset.

    Args:
        client: Authenticated client.
        dataset_id: Dataset ID.
        delete_contents: If True, also delete every table in the dataset.
            If False and the dataset contains tables, the call fails.

    Raises:
        google.api_core.exceptions.NotFound: If the dataset does not exist.
        google.api_core.exceptions.BadRequest: If it has tables and
            delete_contents=False.

    Example:
        >>> bq_delete_dataset(client, "temp_analytics", delete_contents=True)
    """
    # Qualify the dataset with the client's project before deleting.
    target = f"{client.project_id}.{dataset_id}"
    client._client.delete_dataset(target, delete_contents=delete_contents)
def _dataset_to_dict(ds) -> dict:
"""Convierte un objeto Dataset del SDK a dict plano."""
return {
"dataset_id": ds.dataset_id,
"project": ds.project,
"full_id": f"{ds.project}.{ds.dataset_id}",
"location": ds.location,
"description": ds.description or "",
"labels": dict(ds.labels) if ds.labels else {},
"created": ds.created.isoformat() if ds.created else None,
"modified": ds.modified.isoformat() if ds.modified else None,
"default_table_expiration_ms": ds.default_table_expiration_ms,
}