"""CRUD de tablas en Google BigQuery.""" from .client import BQClient from google.cloud import bigquery # --------------------------------------------------------------------------- # Helpers de serializacion # --------------------------------------------------------------------------- def _schema_field_to_dict(field: bigquery.SchemaField) -> dict: """Convierte un SchemaField a dict serializable.""" result = { "name": field.name, "type": field.field_type, "mode": field.mode, "description": field.description or "", } if field.fields: result["fields"] = [_schema_field_to_dict(f) for f in field.fields] return result def _schema_to_dicts(schema: list) -> list[dict]: """Convierte una lista de SchemaField a lista de dicts.""" return [_schema_field_to_dict(f) for f in schema] def _dict_to_schema_field(d: dict) -> bigquery.SchemaField: """Convierte un dict a SchemaField.""" nested = [_dict_to_schema_field(f) for f in d.get("fields", [])] return bigquery.SchemaField( name=d["name"], field_type=d.get("type", "STRING"), mode=d.get("mode", "NULLABLE"), description=d.get("description", ""), fields=nested, ) def _table_to_dict(table: bigquery.Table) -> dict: """Convierte un objeto Table del SDK a dict plano serializable.""" partitioning = None if table.time_partitioning: partitioning = { "type": table.time_partitioning.type_, "field": table.time_partitioning.field or "", } elif table.range_partitioning: partitioning = { "type": "RANGE", "field": table.range_partitioning.field, } clustering = None if table.clustering_fields: clustering = list(table.clustering_fields) return { "table_id": table.table_id, "dataset_id": table.dataset_id, "project": table.project, "full_id": table.full_table_id or f"{table.project}.{table.dataset_id}.{table.table_id}", "schema": _schema_to_dicts(table.schema or []), "num_rows": table.num_rows, "num_bytes": table.num_bytes, "created": table.created.isoformat() if table.created else None, "modified": table.modified.isoformat() if table.modified else None, "type": table.table_type or "TABLE", "partitioning": partitioning, "clustering": clustering, "description": table.description or "", "labels": dict(table.labels or {}), } # --------------------------------------------------------------------------- # Funciones CRUD # --------------------------------------------------------------------------- def bq_create_table( client: BQClient, dataset_id: str, table_id: str, schema: list[dict], partitioning: dict | None = None, clustering: list[str] | None = None, description: str = "", labels: dict | None = None, ) -> dict: """Crea una tabla en BigQuery con schema, particionamiento y clustering opcionales. Usa `client._client.create_table()` del SDK oficial. Args: client: Cliente autenticado BQClient. dataset_id: ID del dataset donde crear la tabla. table_id: ID (nombre) de la tabla a crear. schema: Lista de dicts con definicion de columnas. Cada dict: {"name": "col", "type": "STRING", "mode": "NULLABLE", "description": "..."} Tipos validos: STRING, INTEGER, FLOAT, BOOLEAN, BYTES, DATE, DATETIME, TIME, TIMESTAMP, RECORD, NUMERIC, BIGNUMERIC, JSON, GEOGRAPHY. Modos: NULLABLE, REQUIRED, REPEATED. partitioning: Dict de configuracion de particion o None. Ejemplo: {"type": "DAY", "field": "created_at"} Tipos: DAY, MONTH, YEAR, HOUR. Field vacio usa pseudo-columna _PARTITIONTIME. clustering: Lista de columnas para clustering (max 4) o None. Ejemplo: ["country", "status"] description: Descripcion de la tabla. labels: Etiquetas clave-valor para la tabla. Ejemplo: {"env": "prod"}. Returns: Dict con metadata de la tabla creada: table_id, dataset_id, project, full_id, schema, num_rows, num_bytes, created, modified, type, partitioning, clustering, description, labels. Raises: google.api_core.exceptions.Conflict: Si la tabla ya existe. google.api_core.exceptions.NotFound: Si el dataset no existe. google.api_core.exceptions.BadRequest: Si el schema es invalido. Example: >>> table = bq_create_table(client, "mi_dataset", "ventas", [ ... {"name": "id", "type": "INTEGER", "mode": "REQUIRED"}, ... {"name": "fecha", "type": "DATE", "mode": "NULLABLE"}, ... {"name": "monto", "type": "FLOAT", "mode": "NULLABLE"}, ... ], partitioning={"type": "MONTH", "field": "fecha"}, ... clustering=["id"]) >>> print(table["full_id"]) """ table_ref = f"{client.project_id}.{dataset_id}.{table_id}" bq_schema = [_dict_to_schema_field(f) for f in schema] table = bigquery.Table(table_ref, schema=bq_schema) if partitioning: table.time_partitioning = bigquery.TimePartitioning( type_=partitioning.get("type", "DAY"), field=partitioning.get("field") or None, ) if clustering: table.clustering_fields = clustering if description: table.description = description if labels: table.labels = labels created = client._client.create_table(table) return _table_to_dict(created) def bq_get_table(client: BQClient, dataset_id: str, table_id: str) -> dict: """Obtiene los metadatos completos de una tabla BigQuery. Usa `client._client.get_table()` del SDK oficial. Args: client: Cliente autenticado BQClient. dataset_id: ID del dataset que contiene la tabla. table_id: ID (nombre) de la tabla. Returns: Dict con: table_id, dataset_id, project, full_id, schema (lista de dicts), num_rows, num_bytes, created (ISO 8601), modified (ISO 8601), type (TABLE, VIEW, MATERIALIZED_VIEW, EXTERNAL), partitioning (dict o None), clustering (lista de strings o None), description, labels. Raises: google.api_core.exceptions.NotFound: Si la tabla no existe. Example: >>> tabla = bq_get_table(client, "mi_dataset", "ventas") >>> print(tabla["num_rows"], tabla["schema"]) """ table_ref = f"{client.project_id}.{dataset_id}.{table_id}" table = client._client.get_table(table_ref) return _table_to_dict(table) def bq_list_tables(client: BQClient, dataset_id: str) -> list[dict]: """Lista todas las tablas de un dataset BigQuery. Usa `client._client.list_tables()` del SDK oficial. Args: client: Cliente autenticado BQClient. dataset_id: ID del dataset a listar. Returns: Lista de dicts resumidos, uno por tabla. Cada dict contiene: table_id, full_id, type (TABLE, VIEW, MATERIALIZED_VIEW, EXTERNAL). Raises: google.api_core.exceptions.NotFound: Si el dataset no existe. Example: >>> tablas = bq_list_tables(client, "mi_dataset") >>> for t in tablas: ... print(t["table_id"], t["type"]) """ dataset_ref = f"{client.project_id}.{dataset_id}" tables = client._client.list_tables(dataset_ref) return [ { "table_id": t.table_id, "full_id": f"{t.project}.{t.dataset_id}.{t.table_id}", "type": t.table_type or "TABLE", } for t in tables ] def bq_update_table( client: BQClient, dataset_id: str, table_id: str, schema: list[dict] | None = None, description: str | None = None, labels: dict | None = None, ) -> dict: """Actualiza metadatos de una tabla BigQuery. Usa `client._client.update_table()` del SDK oficial. Solo modifica los campos no-None. Para schema, BigQuery SOLO permite agregar columnas nuevas al final — no se pueden eliminar ni modificar columnas existentes. Args: client: Cliente autenticado BQClient. dataset_id: ID del dataset que contiene la tabla. table_id: ID (nombre) de la tabla a actualizar. schema: Lista de dicts con el schema completo nuevo (incluye columnas existentes + nuevas). Solo se permiten adiciones. None = sin cambios. description: Nueva descripcion de la tabla. None = sin cambios. labels: Nuevas etiquetas clave-valor. None = sin cambios. Returns: Dict con la metadata actualizada de la tabla (misma estructura que bq_get_table). Raises: google.api_core.exceptions.NotFound: Si la tabla no existe. google.api_core.exceptions.BadRequest: Si se intenta eliminar columnas. Example: >>> tabla = bq_update_table(client, "mi_dataset", "ventas", ... description="Tabla de ventas actualizada", ... labels={"env": "prod", "team": "data"}) >>> tabla = bq_update_table(client, "mi_dataset", "ventas", ... schema=[ ... {"name": "id", "type": "INTEGER", "mode": "REQUIRED"}, ... {"name": "nueva_col", "type": "STRING", "mode": "NULLABLE"}, ... ]) """ table_ref = f"{client.project_id}.{dataset_id}.{table_id}" table = client._client.get_table(table_ref) fields_to_update = [] if schema is not None: table.schema = [_dict_to_schema_field(f) for f in schema] fields_to_update.append("schema") if description is not None: table.description = description fields_to_update.append("description") if labels is not None: table.labels = labels fields_to_update.append("labels") if not fields_to_update: return _table_to_dict(table) updated = client._client.update_table(table, fields_to_update) return _table_to_dict(updated) def bq_delete_table(client: BQClient, dataset_id: str, table_id: str) -> None: """Elimina permanentemente una tabla de BigQuery. Usa `client._client.delete_table()` del SDK oficial. IRREVERSIBLE — no hay papelera de reciclaje. Considerar exportar datos antes de eliminar. Args: client: Cliente autenticado BQClient. dataset_id: ID del dataset que contiene la tabla. table_id: ID (nombre) de la tabla a eliminar. Raises: google.api_core.exceptions.NotFound: Si la tabla no existe. Example: >>> bq_delete_table(client, "mi_dataset", "tabla_temporal") """ table_ref = f"{client.project_id}.{dataset_id}.{table_id}" client._client.delete_table(table_ref) def bq_preview_rows( client: BQClient, dataset_id: str, table_id: str, max_results: int = 10, ) -> dict: """Obtiene una muestra de filas de una tabla BigQuery sin ejecutar query. Usa `client._client.list_rows()` del SDK oficial — no genera costes de procesamiento de bytes (no es una query SQL). Ideal para vista previa rapida. Args: client: Cliente autenticado BQClient. dataset_id: ID del dataset que contiene la tabla. table_id: ID (nombre) de la tabla a previsualizar. max_results: Numero maximo de filas a retornar. Default: 10. Returns: Dict con: - columns: lista de strings con nombres de columnas - rows: lista de listas con valores de cada fila - total_rows: numero total de filas en la tabla (no en el preview) Raises: google.api_core.exceptions.NotFound: Si la tabla no existe. Example: >>> preview = bq_preview_rows(client, "mi_dataset", "ventas", max_results=5) >>> print(preview["columns"]) >>> for row in preview["rows"]: ... print(row) >>> print(f"Total en tabla: {preview['total_rows']}") """ table_ref = f"{client.project_id}.{dataset_id}.{table_id}" table = client._client.get_table(table_ref) rows_iter = client._client.list_rows(table, max_results=max_results) columns = [f.name for f in rows_iter.schema] rows = [list(row.values()) for row in rows_iter] return { "columns": columns, "rows": rows, "total_rows": table.num_rows, }