9f5e6791db
Funciones CRUD completas para BigQuery: auth, datasets, tables, queries, jobs, routines, load/export. Tipo BQClient como wrapper del SDK oficial.
177 lines
5.4 KiB
Python
177 lines
5.4 KiB
Python
"""CRUD de datasets en Google BigQuery."""
|
|
|
|
from .client import BQClient
|
|
from google.cloud import bigquery
|
|
|
|
|
|
def bq_create_dataset(
    client: BQClient,
    dataset_id: str,
    location: str = "US",
    description: str = "",
    labels: dict[str, str] | None = None,
    default_table_expiration_ms: int = 0,
) -> dict:
    """Create a dataset in BigQuery.

    Args:
        client: Authenticated client wrapper.
        dataset_id: Dataset ID (name only, without the project prefix).
        location: Geographic location (US, EU, us-central1, etc.).
        description: Optional description; an empty string leaves it unset.
        labels: Optional key-value labels; ``None`` or an empty dict leaves
            them unset.
        default_table_expiration_ms: Default table expiration in
            milliseconds. ``0`` (or any non-positive value) means no
            expiration is configured.

    Returns:
        Dict with: dataset_id, project, full_id, location, description,
        labels, created, modified, default_table_expiration_ms.

    Raises:
        google.api_core.exceptions.Conflict: If the dataset already
            exists (409).

    Example:
        >>> ds = bq_create_dataset(client, "analytics", location="EU", description="Data warehouse")
    """
    # Fully-qualified "<project>.<dataset>" reference expected by the SDK.
    pending = bigquery.Dataset(f"{client.project_id}.{dataset_id}")
    pending.location = location

    # Optional attributes are only assigned when the caller supplied a
    # meaningful (truthy / positive) value.
    if description:
        pending.description = description
    if labels:
        pending.labels = labels
    if default_table_expiration_ms > 0:
        pending.default_table_expiration_ms = default_table_expiration_ms

    return _dataset_to_dict(client._client.create_dataset(pending))
|
|
|
|
|
|
def bq_get_dataset(client: BQClient, dataset_id: str) -> dict:
    """Fetch the details of a single dataset.

    Args:
        client: Authenticated client wrapper.
        dataset_id: Dataset ID (without the project prefix).

    Returns:
        Dict with: dataset_id, project, full_id, location, description,
        labels, created, modified, default_table_expiration_ms.

    Raises:
        google.api_core.exceptions.NotFound: If the dataset does not
            exist (404).

    Example:
        >>> ds = bq_get_dataset(client, "analytics")
        >>> print(ds["location"], ds["description"])
    """
    # The dataset is looked up under the client's own project.
    qualified_name = f"{client.project_id}.{dataset_id}"
    return _dataset_to_dict(client._client.get_dataset(qualified_name))
|
|
|
|
|
|
def bq_list_datasets(client: BQClient) -> list[dict]:
    """List the datasets returned by the underlying client.

    Args:
        client: Authenticated client wrapper.

    Returns:
        List of dicts with: dataset_id, project, full_id.

    Example:
        >>> datasets = bq_list_datasets(client)
        >>> for ds in datasets:
        ...     print(ds["dataset_id"], ds["full_id"])
    """
    summaries: list[dict] = []
    for item in client._client.list_datasets():
        # Only identifying fields are exposed here; full details are
        # available through bq_get_dataset.
        summaries.append(
            {
                "dataset_id": item.dataset_id,
                "project": item.project,
                "full_id": f"{item.project}.{item.dataset_id}",
            }
        )
    return summaries
|
|
|
|
|
|
def bq_update_dataset(
    client: BQClient,
    dataset_id: str,
    description: str | None = None,
    labels: dict[str, str] | None = None,
    default_table_expiration_ms: int | None = None,
) -> dict:
    """Update selected fields of a dataset.

    Only the fields that are passed (non-None) are modified. If no field
    is passed, no update request is issued and the current dataset is
    returned as-is.

    Args:
        client: Authenticated client wrapper.
        dataset_id: Dataset ID (without the project prefix).
        description: New description (None = leave unchanged).
        labels: New labels (None = leave unchanged).
        default_table_expiration_ms: New default table expiration in
            milliseconds (None = leave unchanged). ``0`` or any
            non-positive value removes the default expiration, matching
            the "0 = no expiration" convention of ``bq_create_dataset``.

    Returns:
        Dict describing the updated dataset (same keys as
        ``bq_get_dataset``).

    Raises:
        google.api_core.exceptions.NotFound: If the dataset does not exist.

    Example:
        >>> bq_update_dataset(client, "analytics", description="Updated description")
    """
    ref = f"{client.project_id}.{dataset_id}"
    ds = client._client.get_dataset(ref)

    # Names of the properties to send in the update request; only
    # explicitly supplied fields are included.
    fields = []
    if description is not None:
        ds.description = description
        fields.append("description")
    if labels is not None:
        ds.labels = labels
        fields.append("labels")
    if default_table_expiration_ms is not None:
        # A non-positive value is not a valid expiration for the API
        # (documented minimum is one hour); "no expiration" is expressed
        # by clearing the property, i.e. sending None.
        ds.default_table_expiration_ms = (
            default_table_expiration_ms if default_table_expiration_ms > 0 else None
        )
        fields.append("default_table_expiration_ms")

    if not fields:
        # Nothing to change: skip the round-trip to the update endpoint.
        return _dataset_to_dict(ds)

    updated = client._client.update_dataset(ds, fields)
    return _dataset_to_dict(updated)
|
|
|
|
|
|
def bq_delete_dataset(
    client: BQClient,
    dataset_id: str,
    delete_contents: bool = False,
) -> None:
    """Delete a dataset.

    Args:
        client: Authenticated client wrapper.
        dataset_id: Dataset ID (without the project prefix).
        delete_contents: If True, all tables in the dataset are deleted
            as well. If False and the dataset still has tables, the
            deletion fails.

    Raises:
        google.api_core.exceptions.NotFound: If the dataset does not exist.
        google.api_core.exceptions.BadRequest: If the dataset has tables
            and delete_contents=False.

    Example:
        >>> bq_delete_dataset(client, "temp_analytics", delete_contents=True)
    """
    # The flag is forwarded unchanged to the SDK call.
    target = f"{client.project_id}.{dataset_id}"
    client._client.delete_dataset(target, delete_contents=delete_contents)
|
|
|
|
|
|
def _dataset_to_dict(ds) -> dict:
    """Convert an SDK Dataset object into a flat, plain dict.

    Missing values are normalised: a falsy description becomes ``""``,
    falsy labels become ``{}``, and falsy timestamps become ``None``;
    present timestamps are rendered with ``isoformat()``.
    """

    def _iso_or_none(stamp):
        # Timestamps may be absent; only format when a value is present.
        return stamp.isoformat() if stamp else None

    project = ds.project
    name = ds.dataset_id
    raw_labels = ds.labels

    # Copy the labels so the returned dict does not alias the SDK object.
    if raw_labels:
        label_copy = dict(raw_labels)
    else:
        label_copy = {}

    return {
        "dataset_id": name,
        "project": project,
        "full_id": f"{project}.{name}",
        "location": ds.location,
        "description": ds.description or "",
        "labels": label_copy,
        "created": _iso_or_none(ds.created),
        "modified": _iso_or_none(ds.modified),
        "default_table_expiration_ms": ds.default_table_expiration_ms,
    }
|