diff --git a/python/functions/bigquery/__init__.py b/python/functions/bigquery/__init__.py new file mode 100644 index 00000000..65c2c11b --- /dev/null +++ b/python/functions/bigquery/__init__.py @@ -0,0 +1,28 @@ +from .client import BQClient, bq_auth +from .tables import ( + bq_create_table, + bq_get_table, + bq_list_tables, + bq_update_table, + bq_delete_table, + bq_preview_rows, +) +from .datasets import ( + bq_create_dataset, + bq_get_dataset, + bq_list_datasets, + bq_update_dataset, + bq_delete_dataset, +) +from .queries import bq_query, bq_insert_rows, bq_load_from_gcs, bq_load_from_file, bq_export_to_gcs, bq_copy_table +from .jobs import bq_get_job, bq_list_jobs, bq_cancel_job +from .routines import bq_create_routine, bq_list_routines, bq_delete_routine + +__all__ = [ + "BQClient", "bq_auth", + "bq_create_table", "bq_get_table", "bq_list_tables", "bq_update_table", "bq_delete_table", "bq_preview_rows", + "bq_create_dataset", "bq_get_dataset", "bq_list_datasets", "bq_update_dataset", "bq_delete_dataset", + "bq_query", "bq_insert_rows", "bq_load_from_gcs", "bq_load_from_file", "bq_export_to_gcs", "bq_copy_table", + "bq_get_job", "bq_list_jobs", "bq_cancel_job", + "bq_create_routine", "bq_list_routines", "bq_delete_routine", +] diff --git a/python/functions/bigquery/bq_auth.md b/python/functions/bigquery/bq_auth.md new file mode 100644 index 00000000..8531dce2 --- /dev/null +++ b/python/functions/bigquery/bq_auth.md @@ -0,0 +1,56 @@ +--- +name: bq_auth +kind: function +lang: py +domain: infra +version: "1.0.0" +purity: impure +signature: "def bq_auth(project_id: str = '', credentials_path: str = '') -> BQClient" +description: "Autentica contra Google BigQuery con ADC o service account JSON. Retorna un BQClient listo para usar con todas las funciones CRUD." 
+tags: [bigquery, gcp, auth, google-cloud, python] +uses_functions: [] +uses_types: [] +returns: [] +returns_optional: false +error_type: "error_go_core" +imports: [google-cloud-bigquery] +params: + - name: project_id + desc: "ID del proyecto GCP (vacio = detectar de credenciales/entorno)" + - name: credentials_path + desc: "ruta a archivo JSON de service account (vacio = Application Default Credentials)" +output: "BQClient: cliente autenticado con proyecto resuelto" +tested: false +tests: [] +test_file_path: "" +file_path: "python/functions/bigquery/client.py" +--- + +## Ejemplo + +```python +from bigquery import bq_auth + +# ADC (gcloud auth application-default login) +client = bq_auth() + +# Proyecto explicito +client = bq_auth("my-project-id") + +# Service account +client = bq_auth(credentials_path="/path/to/service-account.json") + +# Context manager +with bq_auth() as client: + # client se cierra automaticamente + pass +``` + +## Notas + +Tres modos de autenticacion: +- Sin argumentos: usa Application Default Credentials (ADC) — requiere `gcloud auth application-default login` +- Con project_id: usa ADC pero fuerza el proyecto +- Con credentials_path: lee el JSON de service account directamente + +El BQClient wrappea `google.cloud.bigquery.Client` y expone `_client` para que las funciones del modulo lo usen internamente. diff --git a/python/functions/bigquery/bq_cancel_job.md b/python/functions/bigquery/bq_cancel_job.md new file mode 100644 index 00000000..444ab06b --- /dev/null +++ b/python/functions/bigquery/bq_cancel_job.md @@ -0,0 +1,52 @@ +--- +name: bq_cancel_job +kind: function +lang: py +domain: infra +version: "1.0.0" +purity: impure +signature: "def bq_cancel_job(client: BQClient, job_id: str) -> dict" +description: "Cancela un job en ejecucion. Retorna el estado tras la cancelacion." 
+tags: [bigquery, gcp, job, cancel, google-cloud, python] +uses_functions: [] +uses_types: [] +returns: [] +returns_optional: false +error_type: "error_go_core" +imports: [google-cloud-bigquery] +params: + - name: client + desc: "cliente autenticado de BigQuery obtenido con bq_auth" + - name: job_id + desc: "ID del job a cancelar (formato: 'proyecto:region.job_id' o solo 'job_id')" +output: "dict con campos: job_id (string), state (string, tipicamente 'DONE' tras la cancelacion)" +tested: false +tests: [] +test_file_path: "" +file_path: "python/functions/bigquery/jobs.py" +--- + +## Ejemplo + +```python +from bigquery.client import bq_auth +from bigquery.jobs import bq_list_jobs, bq_cancel_job + +client = bq_auth("my-project") + +# Cancelar todos los jobs en ejecucion +running = bq_list_jobs(client, state_filter="running") +for job in running: + result = bq_cancel_job(client, job["job_id"]) + print(f"Cancelado {result['job_id']}: {result['state']}") +``` + +## Notas + +Lanza `google.api_core.exceptions.NotFound` si el job_id no existe en el proyecto. + +BigQuery no garantiza cancelacion inmediata: el job puede seguir procesando brevemente antes de detenerse. El estado retornado refleja el estado al momento de la respuesta de la API, que tipicamente es `"DONE"`. + +Cancelar un job ya completado o ya cancelado no genera error; la API lo acepta de forma idempotente y retorna el estado actual. + +Solo el usuario que creo el job o un administrador del proyecto puede cancelarlo. 
diff --git a/python/functions/bigquery/bq_copy_table.md b/python/functions/bigquery/bq_copy_table.md new file mode 100644 index 00000000..a434a762 --- /dev/null +++ b/python/functions/bigquery/bq_copy_table.md @@ -0,0 +1,72 @@ +--- +name: bq_copy_table +kind: function +lang: py +domain: infra +version: "1.0.0" +purity: impure +signature: "def bq_copy_table(client: BQClient, source_dataset: str, source_table: str, dest_dataset: str, dest_table: str, write_disposition: str = 'WRITE_EMPTY') -> dict" +description: "Copia una tabla BigQuery a otro dataset o tabla dentro del mismo proyecto usando copy_table del SDK. Espera la finalizacion del CopyJob." +tags: [bigquery, gcp, copy, table, google-cloud, python] +uses_functions: [] +uses_types: [] +returns: [] +returns_optional: false +error_type: "error_go_core" +imports: [google-cloud-bigquery] +params: + - name: client + desc: "cliente BQClient autenticado contra el proyecto GCP" + - name: source_dataset + desc: "ID del dataset de origen en BigQuery" + - name: source_table + desc: "ID de la tabla de origen en BigQuery" + - name: dest_dataset + desc: "ID del dataset de destino en BigQuery (puede ser el mismo que el origen)" + - name: dest_table + desc: "ID de la tabla de destino; si no existe, BigQuery la crea automaticamente" + - name: write_disposition + desc: "comportamiento si la tabla destino ya existe: WRITE_EMPTY falla, WRITE_APPEND agrega, WRITE_TRUNCATE sobreescribe" +output: "dict con {job_id: ID del CopyJob, status: DONE o FAILED}" +tested: false +tests: [] +test_file_path: "" +file_path: "python/functions/bigquery/queries.py" +--- + +## Ejemplo + +```python +from bigquery.client import bq_auth +from bigquery.queries import bq_copy_table + +client = bq_auth("my-project") + +# Copia de produccion a staging (falla si ya existe) +result = bq_copy_table( + client, + "production", "users", + "staging", "users_backup", +) +print(f"Copia completada: {result['status']} — job: {result['job_id']}") + +# Copia 
sobreescribiendo destino +result = bq_copy_table( + client, + "raw", "events_2024", + "processed", "events_latest", + write_disposition="WRITE_TRUNCATE", +) +``` + +## Notas + +Este wrapper solo copia dentro del proyecto del cliente (el `copy_table` del SDK si admite tablas de otros proyectos). Para copiar entre proyectos, +usar `bq_export_to_gcs` + `bq_load_from_gcs`. + +Si la tabla destino no existe, BigQuery la crea con el schema de la tabla origen +independientemente del `write_disposition`. + +El CopyJob es asincrono; `job.result()` bloquea hasta completar. La copia es +una tabla independiente que se factura como almacenamiento completo; las copias ligeras +por referencia corresponden a los table clones, no a los CopyJob. diff --git a/python/functions/bigquery/bq_create_dataset.md b/python/functions/bigquery/bq_create_dataset.md new file mode 100644 index 00000000..6041501f --- /dev/null +++ b/python/functions/bigquery/bq_create_dataset.md @@ -0,0 +1,59 @@ +--- +name: bq_create_dataset +kind: function +lang: py +domain: infra +version: "1.0.0" +purity: impure +signature: "def bq_create_dataset(client: BQClient, dataset_id: str, location: str = 'US', description: str = '', labels: dict[str, str] | None = None, default_table_expiration_ms: int = 0) -> dict" +description: "Crea un dataset en Google BigQuery con ubicacion, descripcion y labels. Usa client._client.create_dataset() del SDK oficial." +tags: [bigquery, gcp, dataset, create, google-cloud, python] +uses_functions: [] +uses_types: [] +returns: [] +returns_optional: false +error_type: "error_go_core" +imports: [google-cloud-bigquery] +params: + - name: client + desc: "instancia autenticada de BQClient" + - name: dataset_id + desc: "nombre del dataset dentro del proyecto (sin prefijo de proyecto)" + - name: location + desc: "ubicacion geografica del dataset: US, EU, us-central1, europe-west1, etc." 
+ - name: description + desc: "descripcion opcional del dataset" + - name: labels + desc: "dict de labels key-value para categorizar el dataset" + - name: default_table_expiration_ms + desc: "tiempo de expiracion por defecto para tablas en milisegundos; 0 = sin expiracion" +output: "dict con dataset_id, project, full_id, location, description, labels, created, modified, default_table_expiration_ms" +tested: false +tests: [] +test_file_path: "" +file_path: "python/functions/bigquery/datasets.py" +--- + +## Ejemplo + +```python +from bigquery.client import bq_auth +from bigquery.datasets import bq_create_dataset + +client = bq_auth("my-project") +ds = bq_create_dataset( + client, + "analytics", + location="EU", + description="Data warehouse principal", + labels={"env": "prod", "team": "data"}, +) +print(ds["full_id"], ds["location"]) +# my-project.analytics EU +``` + +## Notas + +Lanza `google.api_core.exceptions.Conflict` (409) si el dataset ya existe. +El `full_id` tiene formato `{project}.{dataset_id}` y puede usarse directamente como referencia en otras llamadas al SDK. +`default_table_expiration_ms` aplica a todas las tablas nuevas del dataset; las tablas existentes no se ven afectadas. diff --git a/python/functions/bigquery/bq_create_routine.md b/python/functions/bigquery/bq_create_routine.md new file mode 100644 index 00000000..8773f096 --- /dev/null +++ b/python/functions/bigquery/bq_create_routine.md @@ -0,0 +1,80 @@ +--- +name: bq_create_routine +kind: function +lang: py +domain: infra +version: "1.0.0" +purity: impure +signature: "def bq_create_routine(client: BQClient, dataset_id: str, routine_id: str, body: str, routine_type: str = 'SCALAR_FUNCTION', language: str = 'SQL', arguments: list[dict] | None = None, return_type: str = '', description: str = '') -> dict" +description: "Crea una routine (UDF scalar, tabla o stored procedure) en BigQuery. Soporta SQL, JavaScript y Python." 
+tags: [bigquery, gcp, routine, udf, create, google-cloud, python] +uses_functions: [] +uses_types: [] +returns: [] +returns_optional: false +error_type: "error_go_core" +imports: [google-cloud-bigquery] +params: + - name: client + desc: "instancia autenticada de BQClient" + - name: dataset_id + desc: "nombre del dataset donde se crea la routine" + - name: routine_id + desc: "nombre/identificador de la routine dentro del dataset" + - name: body + desc: "cuerpo de la routine: expresion SQL, bloque JavaScript o codigo Python segun el lenguaje" + - name: routine_type + desc: "tipo de routine: SCALAR_FUNCTION para UDFs escalares, TABLE_VALUED_FUNCTION para UDFs de tabla, PROCEDURE para stored procedures" + - name: language + desc: "lenguaje de implementacion: SQL, JAVASCRIPT o PYTHON" + - name: arguments + desc: "lista de argumentos, cada uno como dict con claves 'name' y 'data_type' (ej: INT64, STRING, FLOAT64)" + - name: return_type + desc: "tipo de dato que retorna la funcion (ej: INT64, STRING); ignorado para PROCEDURE" + - name: description + desc: "descripcion opcional de la routine" +output: "dict con routine_id, dataset_id, project, routine_type, language, body, description, created y modified (ISO 8601)" +tested: false +tests: [] +test_file_path: "" +file_path: "python/functions/bigquery/routines.py" +--- + +## Ejemplo + +```python +from bigquery.client import bq_auth +from bigquery.routines import bq_create_routine + +client = bq_auth("my-project") + +# UDF escalar SQL +fn = bq_create_routine( + client, + dataset_id="analytics", + routine_id="double_value", + body="x * 2", + arguments=[{"name": "x", "data_type": "INT64"}], + return_type="INT64", + description="Duplica un entero", +) +print(fn["routine_id"], fn["routine_type"], fn["language"]) +# double_value SCALAR_FUNCTION SQL + +# Stored procedure SQL +bq_create_routine( + client, + dataset_id="analytics", + routine_id="refresh_summary", + body="INSERT INTO summary SELECT * FROM raw WHERE date = 
CURRENT_DATE();", + routine_type="PROCEDURE", +) +``` + +## Notas + +Lanza `google.api_core.exceptions.Conflict` (409) si la routine ya existe. Para actualizar una routine existente, eliminarla primero con `bq_delete_routine` y recrearla, o usar `client._client.update_routine()` directamente. + +Los `data_type` de los argumentos deben ser constantes de `bigquery.StandardSqlTypeNames`: `INT64`, `FLOAT64`, `STRING`, `BOOL`, `BYTES`, `DATE`, `DATETIME`, `TIMESTAMP`, `TIME`, `NUMERIC`, `BIGNUMERIC`, `JSON`, `ARRAY`, `STRUCT`. + +Las routines JavaScript permiten librerias externas via `imported_libraries` (no expuesto en este wrapper). diff --git a/python/functions/bigquery/bq_create_table.md b/python/functions/bigquery/bq_create_table.md new file mode 100644 index 00000000..f4167a7f --- /dev/null +++ b/python/functions/bigquery/bq_create_table.md @@ -0,0 +1,68 @@ +--- +name: bq_create_table +kind: function +lang: py +domain: infra +version: "1.0.0" +purity: impure +signature: "def bq_create_table(client: BQClient, dataset_id: str, table_id: str, schema: list[dict], partitioning: dict | None = None, clustering: list[str] | None = None, description: str = '', labels: dict | None = None) -> dict" +description: "Crea una tabla en BigQuery con schema, particionamiento opcional y clustering. Usa client._client.create_table() del SDK oficial." +tags: [bigquery, gcp, table, create, google-cloud, python] +uses_functions: [] +uses_types: [] +returns: [] +returns_optional: false +error_type: "error_go_core" +imports: [google-cloud-bigquery] +params: + - name: client + desc: "cliente autenticado BQClient obtenido con bq_auth" + - name: dataset_id + desc: "ID del dataset de BigQuery donde crear la tabla" + - name: table_id + desc: "nombre (ID) de la tabla a crear" + - name: schema + desc: "lista de dicts con definicion de columnas: [{name, type, mode, description}]. Tipos: STRING, INTEGER, FLOAT, BOOLEAN, DATE, TIMESTAMP, RECORD, etc." 
+ - name: partitioning + desc: "dict opcional con tipo y campo de particion: {type: DAY|MONTH|YEAR|HOUR, field: nombre_col}. None = sin particion" + - name: clustering + desc: "lista de hasta 4 columnas para clustering (ordenacion fisica). None = sin clustering" + - name: description + desc: "descripcion legible de la tabla (vacio = sin descripcion)" + - name: labels + desc: "etiquetas clave-valor para la tabla, ej: {env: prod, team: data}" +output: "dict con metadata de la tabla creada: table_id, dataset_id, project, full_id, schema, num_rows, num_bytes, created, modified, type, partitioning, clustering, description, labels" +tested: false +tests: [] +test_file_path: "" +file_path: "python/functions/bigquery/tables.py" +--- + +## Ejemplo + +```python +from bigquery import bq_auth, bq_create_table + +client = bq_auth("mi-proyecto") + +tabla = bq_create_table( + client, + dataset_id="ventas_ds", + table_id="transacciones", + schema=[ + {"name": "id", "type": "INTEGER", "mode": "REQUIRED", "description": "ID unico"}, + {"name": "fecha", "type": "DATE", "mode": "NULLABLE", "description": "Fecha de la transaccion"}, + {"name": "monto", "type": "FLOAT", "mode": "NULLABLE", "description": "Monto en USD"}, + {"name": "pais", "type": "STRING", "mode": "NULLABLE", "description": "Codigo de pais"}, + ], + partitioning={"type": "MONTH", "field": "fecha"}, + clustering=["pais"], + description="Transacciones de ventas por mes", + labels={"env": "prod", "team": "data"}, +) +print(tabla["full_id"]) +``` + +## Notas + +El schema se convierte internamente de dicts a objetos `bigquery.SchemaField`. El particionamiento por tiempo soporta DAY, MONTH y YEAR sobre columnas DATE/DATETIME/TIMESTAMP; HOUR solo es valido sobre columnas DATETIME/TIMESTAMP (no sobre DATE). Si `field` se omite en `partitioning`, BigQuery usa la pseudo-columna `_PARTITIONTIME`. El clustering requiere que las columnas existan en el schema y mejora rendimiento en filtros frecuentes. 
diff --git a/python/functions/bigquery/bq_delete_dataset.md b/python/functions/bigquery/bq_delete_dataset.md new file mode 100644 index 00000000..f8374f1f --- /dev/null +++ b/python/functions/bigquery/bq_delete_dataset.md @@ -0,0 +1,51 @@ +--- +name: bq_delete_dataset +kind: function +lang: py +domain: infra +version: "1.0.0" +purity: impure +signature: "def bq_delete_dataset(client: BQClient, dataset_id: str, delete_contents: bool = False) -> None" +description: "Elimina un dataset de Google BigQuery. IRREVERSIBLE. Usa client._client.delete_dataset() del SDK oficial." +tags: [bigquery, gcp, dataset, delete, google-cloud, python] +uses_functions: [] +uses_types: [] +returns: [] +returns_optional: false +error_type: "error_go_core" +imports: [google-cloud-bigquery] +params: + - name: client + desc: "instancia autenticada de BQClient" + - name: dataset_id + desc: "nombre del dataset a eliminar (sin prefijo de proyecto)" + - name: delete_contents + desc: "si True elimina todas las tablas y vistas del dataset antes de borrarlo; si False falla si el dataset contiene objetos" +output: "None" +tested: false +tests: [] +test_file_path: "" +file_path: "python/functions/bigquery/datasets.py" +--- + +## Ejemplo + +```python +from bigquery.client import bq_auth +from bigquery.datasets import bq_delete_dataset + +client = bq_auth("my-project") + +# Eliminar dataset vacio +bq_delete_dataset(client, "temp_dataset") + +# Eliminar dataset con todas sus tablas +bq_delete_dataset(client, "temp_analytics", delete_contents=True) +``` + +## Notas + +IRREVERSIBLE. Todos los datos del dataset se pierden permanentemente. +Lanza `google.api_core.exceptions.NotFound` (404) si el dataset no existe. +Lanza `google.api_core.exceptions.BadRequest` (400) si el dataset contiene tablas y `delete_contents=False`. +Por seguridad el valor por defecto de `delete_contents` es `False` — requiere confirmacion explicita para borrar contenido. 
diff --git a/python/functions/bigquery/bq_delete_routine.md b/python/functions/bigquery/bq_delete_routine.md new file mode 100644 index 00000000..73430ce5 --- /dev/null +++ b/python/functions/bigquery/bq_delete_routine.md @@ -0,0 +1,51 @@ +--- +name: bq_delete_routine +kind: function +lang: py +domain: infra +version: "1.0.0" +purity: impure +signature: "def bq_delete_routine(client: BQClient, dataset_id: str, routine_id: str) -> None" +description: "Elimina una routine de un dataset." +tags: [bigquery, gcp, routine, udf, delete, google-cloud, python] +uses_functions: [] +uses_types: [] +returns: [] +returns_optional: true +error_type: "error_go_core" +imports: [google-cloud-bigquery] +params: + - name: client + desc: "instancia autenticada de BQClient" + - name: dataset_id + desc: "nombre del dataset que contiene la routine" + - name: routine_id + desc: "nombre/identificador de la routine a eliminar" +output: "None; lanza NotFound si la routine no existe" +tested: false +tests: [] +test_file_path: "" +file_path: "python/functions/bigquery/routines.py" +--- + +## Ejemplo + +```python +from bigquery.client import bq_auth +from bigquery.routines import bq_delete_routine + +client = bq_auth("my-project") +bq_delete_routine(client, "analytics", "double_value") +# La routine queda eliminada permanentemente +``` + +## Notas + +Lanza `google.api_core.exceptions.NotFound` (404) si la routine no existe. La operacion es permanente e irreversible. + +Para eliminar y recrear una routine actualizada, combinar con `bq_create_routine`: + +```python +bq_delete_routine(client, "analytics", "double_value") +bq_create_routine(client, "analytics", "double_value", body="x * 3", ...) 
+``` diff --git a/python/functions/bigquery/bq_delete_table.md b/python/functions/bigquery/bq_delete_table.md new file mode 100644 index 00000000..c8e7cb9f --- /dev/null +++ b/python/functions/bigquery/bq_delete_table.md @@ -0,0 +1,51 @@ +--- +name: bq_delete_table +kind: function +lang: py +domain: infra +version: "1.0.0" +purity: impure +signature: "def bq_delete_table(client: BQClient, dataset_id: str, table_id: str) -> None" +description: "Elimina permanentemente una tabla de BigQuery. IRREVERSIBLE. Usa client._client.delete_table() del SDK oficial." +tags: [bigquery, gcp, table, delete, google-cloud, python] +uses_functions: [] +uses_types: [] +returns: [] +returns_optional: true +error_type: "error_go_core" +imports: [google-cloud-bigquery] +params: + - name: client + desc: "cliente autenticado BQClient obtenido con bq_auth" + - name: dataset_id + desc: "ID del dataset que contiene la tabla" + - name: table_id + desc: "nombre (ID) de la tabla a eliminar" +output: "None. Lanza excepcion si la tabla no existe o no hay permisos" +tested: false +tests: [] +test_file_path: "" +file_path: "python/functions/bigquery/tables.py" +--- + +## Ejemplo + +```python +from bigquery import bq_auth, bq_delete_table + +client = bq_auth("mi-proyecto") + +# Eliminar tabla temporal +bq_delete_table(client, "mi_dataset", "tabla_temporal_2024") + +# Verificar que no existe (capturar excepcion) +from google.api_core.exceptions import NotFound +try: + bq_delete_table(client, "mi_dataset", "tabla_que_no_existe") +except NotFound as e: + print(f"No encontrada: {e}") +``` + +## Notas + +La eliminacion debe tratarse como PERMANENTE — BigQuery no tiene papelera de reciclaje para tablas; una tabla borrada solo puede restaurarse dentro de la ventana de time travel (7 dias por defecto), y este wrapper no expone esa restauracion. Considerar exportar los datos a GCS antes de eliminar si hay posibilidad de necesitarlos. Si el dataset tiene `defaultTableExpirationMs` configurado, las tablas se pueden dejar expirar automaticamente en vez de eliminar manualmente. Requiere permiso `bigquery.tables.delete` sobre el dataset. 
diff --git a/python/functions/bigquery/bq_export_to_gcs.md b/python/functions/bigquery/bq_export_to_gcs.md new file mode 100644 index 00000000..3d34e985 --- /dev/null +++ b/python/functions/bigquery/bq_export_to_gcs.md @@ -0,0 +1,73 @@ +--- +name: bq_export_to_gcs +kind: function +lang: py +domain: infra +version: "1.0.0" +purity: impure +signature: "def bq_export_to_gcs(client: BQClient, dataset_id: str, table_id: str, destination_uri: str, destination_format: str = 'CSV', compression: str = 'NONE') -> dict" +description: "Exporta una tabla BigQuery a Google Cloud Storage usando extract_table del SDK. Soporta CSV, JSON, Avro y Parquet con compresion opcional." +tags: [bigquery, gcp, export, gcs, google-cloud, python, etl] +uses_functions: [] +uses_types: [] +returns: [] +returns_optional: false +error_type: "error_go_core" +imports: [google-cloud-bigquery] +params: + - name: client + desc: "cliente BQClient autenticado contra el proyecto GCP" + - name: dataset_id + desc: "ID del dataset de origen en BigQuery" + - name: table_id + desc: "ID de la tabla de origen en BigQuery" + - name: destination_uri + desc: "URI de destino en GCS; para tablas grandes usar wildcard: gs://bucket/prefix_*.csv" + - name: destination_format + desc: "formato de exportacion: CSV, NEWLINE_DELIMITED_JSON, AVRO, PARQUET" + - name: compression + desc: "algoritmo de compresion: NONE, GZIP, SNAPPY (solo Parquet/Avro), DEFLATE" +output: "dict con {job_id: ID del ExtractJob, destination_uri: URI de destino, status: DONE o FAILED}" +tested: false +tests: [] +test_file_path: "" +file_path: "python/functions/bigquery/queries.py" +--- + +## Ejemplo + +```python +from bigquery.client import bq_auth +from bigquery.queries import bq_export_to_gcs + +client = bq_auth("my-project") + +# Exportar como CSV comprimido con GZIP +result = bq_export_to_gcs( + client, + "my_dataset", "users", + "gs://my-bucket/exports/users_*.csv.gz", + compression="GZIP", +) +print(f"Exportado: {result['destination_uri']} — 
job: {result['job_id']}") + +# Exportar como Parquet con compresion Snappy +result = bq_export_to_gcs( + client, + "my_dataset", "events", + "gs://my-bucket/exports/events_*.parquet", + destination_format="PARQUET", + compression="SNAPPY", +) +print(f"Status: {result['status']}") +``` + +## Notas + +Para tablas grandes BigQuery genera multiples archivos automaticamente cuando se usa +wildcard (`*`) en el URI. Sin wildcard falla si la tabla supera 1 GB. + +`SNAPPY` solo es valido para formatos binarios (PARQUET, AVRO). Usar `GZIP` para CSV +y NEWLINE_DELIMITED_JSON. + +El ExtractJob es asincrono en BigQuery; `job.result()` bloquea hasta completar. diff --git a/python/functions/bigquery/bq_get_dataset.md b/python/functions/bigquery/bq_get_dataset.md new file mode 100644 index 00000000..a8a2e000 --- /dev/null +++ b/python/functions/bigquery/bq_get_dataset.md @@ -0,0 +1,45 @@ +--- +name: bq_get_dataset +kind: function +lang: py +domain: infra +version: "1.0.0" +purity: impure +signature: "def bq_get_dataset(client: BQClient, dataset_id: str) -> dict" +description: "Obtiene los detalles completos de un dataset de Google BigQuery. Usa client._client.get_dataset() del SDK oficial." 
+tags: [bigquery, gcp, dataset, get, google-cloud, python] +uses_functions: [] +uses_types: [] +returns: [] +returns_optional: false +error_type: "error_go_core" +imports: [google-cloud-bigquery] +params: + - name: client + desc: "instancia autenticada de BQClient" + - name: dataset_id + desc: "nombre del dataset a consultar (sin prefijo de proyecto)" +output: "dict con dataset_id, project, full_id, location, description, labels, created, modified, default_table_expiration_ms" +tested: false +tests: [] +test_file_path: "" +file_path: "python/functions/bigquery/datasets.py" +--- + +## Ejemplo + +```python +from bigquery.client import bq_auth +from bigquery.datasets import bq_get_dataset + +client = bq_auth("my-project") +ds = bq_get_dataset(client, "analytics") +print(ds["location"], ds["description"]) +print(ds["created"]) # ISO 8601: "2024-01-15T10:30:00+00:00" +``` + +## Notas + +Lanza `google.api_core.exceptions.NotFound` (404) si el dataset no existe. +Los campos `created` y `modified` son strings ISO 8601 con timezone UTC, o `None` si el SDK no los retorna. +`labels` es un dict vacio `{}` si el dataset no tiene labels. diff --git a/python/functions/bigquery/bq_get_job.md b/python/functions/bigquery/bq_get_job.md new file mode 100644 index 00000000..1f77a16f --- /dev/null +++ b/python/functions/bigquery/bq_get_job.md @@ -0,0 +1,60 @@ +--- +name: bq_get_job +kind: function +lang: py +domain: infra +version: "1.0.0" +purity: impure +signature: "def bq_get_job(client: BQClient, job_id: str) -> dict" +description: "Obtiene detalles completos de un job por su ID incluyendo estado, bytes procesados y errores. Incluye campos adicionales respecto a bq_list_jobs: destination_table, query y lista de errores." 
+tags: [bigquery, gcp, job, get, google-cloud, python] +uses_functions: [] +uses_types: [] +returns: [] +returns_optional: false +error_type: "error_go_core" +imports: [google-cloud-bigquery] +params: + - name: client + desc: "cliente autenticado de BigQuery obtenido con bq_auth" + - name: job_id + desc: "ID del job a consultar (formato: 'proyecto:region.job_id' o solo 'job_id')" +output: "dict con campos: job_id, job_type, state, created, started, ended, user_email, bytes_processed, error, y opcionalmente destination_table, query, errors" +tested: false +tests: [] +test_file_path: "" +file_path: "python/functions/bigquery/jobs.py" +--- + +## Ejemplo + +```python +from bigquery.client import bq_auth +from bigquery.jobs import bq_get_job + +client = bq_auth("my-project") + +job = bq_get_job(client, "job_abc123") +print(job["state"], job["bytes_processed"]) + +# Inspeccionar query SQL del job +if job.get("query"): + print("SQL:", job["query"][:200]) + +# Ver errores detallados +if job["error"]: + print("Error principal:", job["error"]) + for err in job.get("errors", []): + print(" -", err) +``` + +## Notas + +Lanza `google.api_core.exceptions.NotFound` si el job_id no existe en el proyecto. + +A diferencia de `bq_list_jobs`, este metodo retorna campos adicionales cuando estan disponibles: +- `destination_table`: tabla destino (solo en query/load jobs con destino explicito) +- `query`: texto SQL del job (solo en query jobs) +- `errors`: lista completa de errores (cada uno como dict con `reason`, `message`, `location`) + +Los campos `created`, `started` y `ended` se serializan como strings ISO 8601. 
diff --git a/python/functions/bigquery/bq_get_table.md b/python/functions/bigquery/bq_get_table.md new file mode 100644 index 00000000..a9d61e7e --- /dev/null +++ b/python/functions/bigquery/bq_get_table.md @@ -0,0 +1,48 @@ +--- +name: bq_get_table +kind: function +lang: py +domain: infra +version: "1.0.0" +purity: impure +signature: "def bq_get_table(client: BQClient, dataset_id: str, table_id: str) -> dict" +description: "Obtiene los metadatos completos de una tabla BigQuery incluyendo schema, estadisticas y configuracion. Usa client._client.get_table() del SDK oficial." +tags: [bigquery, gcp, table, get, google-cloud, python] +uses_functions: [] +uses_types: [] +returns: [] +returns_optional: false +error_type: "error_go_core" +imports: [google-cloud-bigquery] +params: + - name: client + desc: "cliente autenticado BQClient obtenido con bq_auth" + - name: dataset_id + desc: "ID del dataset que contiene la tabla" + - name: table_id + desc: "nombre (ID) de la tabla a consultar" +output: "dict con metadata completa: table_id, dataset_id, project, full_id, schema (lista de dicts), num_rows, num_bytes, created (ISO 8601), modified (ISO 8601), type, partitioning, clustering, description, labels" +tested: false +tests: [] +test_file_path: "" +file_path: "python/functions/bigquery/tables.py" +--- + +## Ejemplo + +```python +from bigquery import bq_auth, bq_get_table + +client = bq_auth("mi-proyecto") + +tabla = bq_get_table(client, "ventas_ds", "transacciones") +print(tabla["full_id"]) # mi-proyecto.ventas_ds.transacciones +print(tabla["num_rows"]) # filas totales +print(tabla["num_bytes"]) # bytes almacenados +for col in tabla["schema"]: + print(col["name"], col["type"], col["mode"]) +``` + +## Notas + +`num_rows` y `num_bytes` son estadisticas actualizadas por BigQuery periodicamente (pueden tener un pequeno retraso). El campo `type` puede ser TABLE, VIEW, MATERIALIZED_VIEW o EXTERNAL. `partitioning` es None si la tabla no tiene particionamiento. 
`created` y `modified` estan en formato ISO 8601. diff --git a/python/functions/bigquery/bq_insert_rows.md b/python/functions/bigquery/bq_insert_rows.md new file mode 100644 index 00000000..0798e451 --- /dev/null +++ b/python/functions/bigquery/bq_insert_rows.md @@ -0,0 +1,60 @@ +--- +name: bq_insert_rows +kind: function +lang: py +domain: infra +version: "1.0.0" +purity: impure +signature: "def bq_insert_rows(client: BQClient, dataset_id: str, table_id: str, rows: list[dict]) -> dict" +description: "Inserta filas en una tabla BigQuery usando streaming insert (insert_rows_json). Retorna el conteo de filas insertadas y errores por fila." +tags: [bigquery, gcp, insert, streaming, google-cloud, python] +uses_functions: [] +uses_types: [] +returns: [] +returns_optional: false +error_type: "error_go_core" +imports: [google-cloud-bigquery] +params: + - name: client + desc: "cliente BQClient autenticado contra el proyecto GCP" + - name: dataset_id + desc: "ID del dataset de destino en BigQuery" + - name: table_id + desc: "ID de la tabla de destino en BigQuery" + - name: rows + desc: "lista de dicts con los datos a insertar; las claves deben coincidir con las columnas de la tabla" +output: "dict con {inserted: N filas insertadas sin error, errors: lista de errores por fila retornada por la API}" +tested: false +tests: [] +test_file_path: "" +file_path: "python/functions/bigquery/queries.py" +--- + +## Ejemplo + +```python +from bigquery.client import bq_auth +from bigquery.queries import bq_insert_rows + +client = bq_auth("my-project") + +result = bq_insert_rows(client, "my_dataset", "events", [ + {"event_id": "abc1", "user_id": 42, "ts": "2024-01-01T12:00:00Z", "action": "click"}, + {"event_id": "abc2", "user_id": 99, "ts": "2024-01-01T12:01:00Z", "action": "view"}, +]) + +print(f"Insertadas: {result['inserted']}") +if result["errors"]: + print("Errores:", result["errors"]) +``` + +## Notas + +El streaming insert tiene disponibilidad casi inmediata pero no es 
transaccional. +Las filas pueden aparecer duplicadas si el job se reintenta — BigQuery no garantiza +exactamente-una-vez con insert_rows_json. + +`errors` es la lista retornada directamente por la API. Cada elemento es un dict con +`index` (posicion de la fila fallida) y `errors` (lista de mensajes de error). + +Para cargas masivas o garantias ACID, preferir `bq_load_from_gcs` o `bq_load_from_file`. diff --git a/python/functions/bigquery/bq_list_datasets.md b/python/functions/bigquery/bq_list_datasets.md new file mode 100644 index 00000000..e9672250 --- /dev/null +++ b/python/functions/bigquery/bq_list_datasets.md @@ -0,0 +1,45 @@ +--- +name: bq_list_datasets +kind: function +lang: py +domain: infra +version: "1.0.0" +purity: impure +signature: "def bq_list_datasets(client: BQClient) -> list[dict]" +description: "Lista todos los datasets del proyecto de Google BigQuery. Usa client._client.list_datasets() del SDK oficial." +tags: [bigquery, gcp, dataset, list, google-cloud, python] +uses_functions: [] +uses_types: [] +returns: [] +returns_optional: false +error_type: "error_go_core" +imports: [google-cloud-bigquery] +params: + - name: client + desc: "instancia autenticada de BQClient" +output: "lista de dicts con dataset_id, project y full_id por cada dataset del proyecto" +tested: false +tests: [] +test_file_path: "" +file_path: "python/functions/bigquery/datasets.py" +--- + +## Ejemplo + +```python +from bigquery.client import bq_auth +from bigquery.datasets import bq_list_datasets + +client = bq_auth("my-project") +datasets = bq_list_datasets(client) +for ds in datasets: + print(ds["dataset_id"], ds["full_id"]) +# analytics my-project.analytics +# raw_data my-project.raw_data +``` + +## Notas + +Retorna solo campos basicos (dataset_id, project, full_id). Para obtener detalles completos (location, description, labels) usar `bq_get_dataset` sobre cada item. +El SDK retorna un iterador; esta funcion lo materializa en una lista completa. 
+Si el proyecto no tiene datasets, retorna lista vacia `[]`. diff --git a/python/functions/bigquery/bq_list_jobs.md b/python/functions/bigquery/bq_list_jobs.md new file mode 100644 index 00000000..a59bfa1a --- /dev/null +++ b/python/functions/bigquery/bq_list_jobs.md @@ -0,0 +1,61 @@ +--- +name: bq_list_jobs +kind: function +lang: py +domain: infra +version: "1.0.0" +purity: impure +signature: "def bq_list_jobs(client: BQClient, state_filter: str = '', max_results: int = 50, all_users: bool = False) -> list[dict]" +description: "Lista jobs del proyecto con filtro por estado (running, pending, done). Retorna una lista de dicts planos con metadatos de cada job." +tags: [bigquery, gcp, job, list, google-cloud, python] +uses_functions: [] +uses_types: [] +returns: [] +returns_optional: false +error_type: "error_go_core" +imports: [google-cloud-bigquery] +params: + - name: client + desc: "cliente autenticado de BigQuery obtenido con bq_auth" + - name: state_filter + desc: "filtro por estado del job: 'running', 'pending' o 'done'; vacio = todos los estados" + - name: max_results + desc: "numero maximo de jobs a retornar (por defecto 50)" + - name: all_users + desc: "si True lista jobs de todos los usuarios del proyecto; requiere permisos de administrador" +output: "lista de dicts con campos: job_id, job_type, state, created, started, ended, user_email, bytes_processed, error" +tested: false +tests: [] +test_file_path: "" +file_path: "python/functions/bigquery/jobs.py" +--- + +## Ejemplo + +```python +from bigquery.client import bq_auth +from bigquery.jobs import bq_list_jobs + +client = bq_auth("my-project") + +# Todos los jobs recientes +jobs = bq_list_jobs(client, max_results=20) + +# Solo jobs en ejecucion +running = bq_list_jobs(client, state_filter="running") +for j in running: + print(j["job_id"], j["state"], j["job_type"]) + +# Jobs completados de todos los usuarios +done = bq_list_jobs(client, state_filter="done", all_users=True, max_results=100) +``` + +## Notas 
+ +Los campos `created`, `started` y `ended` se serializan como strings ISO 8601 (o `None` si no estan disponibles). + +El campo `bytes_processed` solo esta disponible en query jobs; para otros tipos de job (load, export, copy) se retorna `None`. + +El campo `error` contiene el error_result como string si el job fallo, o `None` si no hay error. + +Para obtener el texto completo del SQL de un job concreto, usar `bq_get_job` que incluye el campo `query`. diff --git a/python/functions/bigquery/bq_list_routines.md b/python/functions/bigquery/bq_list_routines.md new file mode 100644 index 00000000..764e13a2 --- /dev/null +++ b/python/functions/bigquery/bq_list_routines.md @@ -0,0 +1,49 @@ +--- +name: bq_list_routines +kind: function +lang: py +domain: infra +version: "1.0.0" +purity: impure +signature: "def bq_list_routines(client: BQClient, dataset_id: str) -> list[dict]" +description: "Lista routines de un dataset incluyendo tipo y lenguaje." +tags: [bigquery, gcp, routine, udf, list, google-cloud, python] +uses_functions: [] +uses_types: [] +returns: [] +returns_optional: false +error_type: "error_go_core" +imports: [google-cloud-bigquery] +params: + - name: client + desc: "instancia autenticada de BQClient" + - name: dataset_id + desc: "nombre del dataset cuyas routines se quieren listar" +output: "lista de dicts con routine_id, dataset_id, project, routine_type y language para cada routine del dataset" +tested: false +tests: [] +test_file_path: "" +file_path: "python/functions/bigquery/routines.py" +--- + +## Ejemplo + +```python +from bigquery.client import bq_auth +from bigquery.routines import bq_list_routines + +client = bq_auth("my-project") +routines = bq_list_routines(client, "analytics") + +for r in routines: + print(r["routine_id"], r["routine_type"], r["language"]) +# double_value SCALAR_FUNCTION SQL +# refresh_summary PROCEDURE SQL +# parse_json SCALAR_FUNCTION JAVASCRIPT +``` + +## Notas + +Retorna lista vacia si el dataset no tiene routines. 
Lanza `google.api_core.exceptions.NotFound` si el dataset no existe. + +El listado solo incluye metadata basica (routine_id, tipo, lenguaje). Para obtener el cuerpo y argumentos completos de una routine especifica, usar `client._client.get_routine(f"{project}.{dataset}.{routine_id}")`. diff --git a/python/functions/bigquery/bq_list_tables.md b/python/functions/bigquery/bq_list_tables.md new file mode 100644 index 00000000..d83d5e9f --- /dev/null +++ b/python/functions/bigquery/bq_list_tables.md @@ -0,0 +1,48 @@ +--- +name: bq_list_tables +kind: function +lang: py +domain: infra +version: "1.0.0" +purity: impure +signature: "def bq_list_tables(client: BQClient, dataset_id: str) -> list[dict]" +description: "Lista todas las tablas (y vistas) de un dataset BigQuery con informacion resumida. Usa client._client.list_tables() del SDK oficial." +tags: [bigquery, gcp, table, list, google-cloud, python] +uses_functions: [] +uses_types: [] +returns: [] +returns_optional: false +error_type: "error_go_core" +imports: [google-cloud-bigquery] +params: + - name: client + desc: "cliente autenticado BQClient obtenido con bq_auth" + - name: dataset_id + desc: "ID del dataset a listar" +output: "lista de dicts, uno por objeto del dataset. Cada dict contiene: table_id (nombre), full_id (project.dataset.table), type (TABLE, VIEW, MATERIALIZED_VIEW, EXTERNAL)" +tested: false +tests: [] +test_file_path: "" +file_path: "python/functions/bigquery/tables.py" +--- + +## Ejemplo + +```python +from bigquery import bq_auth, bq_list_tables + +client = bq_auth("mi-proyecto") + +tablas = bq_list_tables(client, "ventas_ds") +for t in tablas: + print(t["table_id"], t["type"]) + # transacciones TABLE + # vista_mensual VIEW + +# Filtrar solo tablas fisicas +solo_tablas = [t for t in tablas if t["type"] == "TABLE"] +``` + +## Notas + +Retorna objetos resumidos (`ListTableItem`), no los metadatos completos. Para obtener schema, estadisticas y configuracion completa usar `bq_get_table`. 
El listado incluye tablas, vistas, vistas materializadas y tablas externas (EXTERNAL). El orden de retorno no esta garantizado. diff --git a/python/functions/bigquery/bq_load_from_file.md b/python/functions/bigquery/bq_load_from_file.md new file mode 100644 index 00000000..2a3ce627 --- /dev/null +++ b/python/functions/bigquery/bq_load_from_file.md @@ -0,0 +1,75 @@ +--- +name: bq_load_from_file +kind: function +lang: py +domain: infra +version: "1.0.0" +purity: impure +signature: "def bq_load_from_file(client: BQClient, file_path: str, dataset_id: str, table_id: str, source_format: str = 'CSV', write_disposition: str = 'WRITE_APPEND', autodetect: bool = True, skip_leading_rows: int = 0) -> dict" +description: "Carga datos desde un archivo local a una tabla BigQuery usando load_table_from_file del SDK. Equivalente a bq_load_from_gcs pero para disco local." +tags: [bigquery, gcp, load, file, google-cloud, python, etl] +uses_functions: [] +uses_types: [] +returns: [] +returns_optional: false +error_type: "error_go_core" +imports: [google-cloud-bigquery] +params: + - name: client + desc: "cliente BQClient autenticado contra el proyecto GCP" + - name: file_path + desc: "ruta absoluta o relativa al archivo local a cargar" + - name: dataset_id + desc: "ID del dataset de destino en BigQuery" + - name: table_id + desc: "ID de la tabla de destino en BigQuery" + - name: source_format + desc: "formato del archivo fuente: CSV, NEWLINE_DELIMITED_JSON, AVRO, PARQUET, ORC" + - name: write_disposition + desc: "comportamiento si la tabla ya existe: WRITE_APPEND agrega, WRITE_TRUNCATE reemplaza, WRITE_EMPTY falla si hay datos" + - name: autodetect + desc: "si True, BigQuery infiere el schema automaticamente desde los datos" + - name: skip_leading_rows + desc: "numero de filas a ignorar al inicio del archivo (tipicamente 1 para saltar cabeceras CSV)" +output: "dict con {job_id: ID del LoadJob, rows_loaded: filas cargadas, status: DONE o FAILED}" +tested: false +tests: [] 
+test_file_path: "" +file_path: "python/functions/bigquery/queries.py" +--- + +## Ejemplo + +```python +from bigquery.client import bq_auth +from bigquery.queries import bq_load_from_file + +client = bq_auth("my-project") + +# Cargar CSV local con cabecera +result = bq_load_from_file( + client, + "/tmp/export_users.csv", + "my_dataset", "users", + skip_leading_rows=1, + write_disposition="WRITE_TRUNCATE", +) +print(f"Cargadas {result['rows_loaded']} filas — job: {result['job_id']}") + +# Cargar JSONL local +result = bq_load_from_file( + client, + "/data/events.jsonl", + "my_dataset", "events", + source_format="NEWLINE_DELIMITED_JSON", +) +``` + +## Notas + +El archivo se abre en modo binario (`rb`) y se sube directamente al job de BigQuery. +Para archivos muy grandes, preferir `bq_load_from_gcs` — subir primero a GCS y luego +cargar desde ahi es mas eficiente y permite paralelismo. + +La funcion bloquea hasta que el job termina (`job.result()`). Los archivos Parquet y +Avro no admiten `skip_leading_rows` — ese parametro solo aplica para CSV. diff --git a/python/functions/bigquery/bq_load_from_gcs.md b/python/functions/bigquery/bq_load_from_gcs.md new file mode 100644 index 00000000..730d847e --- /dev/null +++ b/python/functions/bigquery/bq_load_from_gcs.md @@ -0,0 +1,77 @@ +--- +name: bq_load_from_gcs +kind: function +lang: py +domain: infra +version: "1.0.0" +purity: impure +signature: "def bq_load_from_gcs(client: BQClient, uri: str | list[str], dataset_id: str, table_id: str, source_format: str = 'CSV', write_disposition: str = 'WRITE_APPEND', autodetect: bool = True, skip_leading_rows: int = 0) -> dict" +description: "Carga datos desde uno o varios URIs de Google Cloud Storage a una tabla BigQuery configurando un LoadJob. Espera la finalizacion del job." 
+tags: [bigquery, gcp, load, gcs, google-cloud, python, etl] +uses_functions: [] +uses_types: [] +returns: [] +returns_optional: false +error_type: "error_go_core" +imports: [google-cloud-bigquery] +params: + - name: client + desc: "cliente BQClient autenticado contra el proyecto GCP" + - name: uri + desc: "URI de GCS de origen (gs://bucket/file.csv) o lista de URIs; soporta wildcards como gs://bucket/prefix_*.csv" + - name: dataset_id + desc: "ID del dataset de destino en BigQuery" + - name: table_id + desc: "ID de la tabla de destino en BigQuery" + - name: source_format + desc: "formato del archivo fuente: CSV, NEWLINE_DELIMITED_JSON, AVRO, PARQUET, ORC" + - name: write_disposition + desc: "comportamiento si la tabla ya existe: WRITE_APPEND agrega, WRITE_TRUNCATE reemplaza, WRITE_EMPTY falla si hay datos" + - name: autodetect + desc: "si True, BigQuery infiere el schema automaticamente desde los datos" + - name: skip_leading_rows + desc: "numero de filas a ignorar al inicio del archivo (tipicamente 1 para saltar cabeceras CSV)" +output: "dict con {job_id: ID del LoadJob, rows_loaded: filas cargadas, status: DONE o FAILED}" +tested: false +tests: [] +test_file_path: "" +file_path: "python/functions/bigquery/queries.py" +--- + +## Ejemplo + +```python +from bigquery.client import bq_auth +from bigquery.queries import bq_load_from_gcs + +client = bq_auth("my-project") + +# Cargar un archivo CSV con cabecera +result = bq_load_from_gcs( + client, + "gs://my-bucket/data/users_2024.csv", + "my_dataset", "users", + skip_leading_rows=1, +) +print(f"Cargadas {result['rows_loaded']} filas — job: {result['job_id']}") + +# Cargar multiples archivos Parquet reemplazando la tabla +result = bq_load_from_gcs( + client, + ["gs://my-bucket/export/part_001.parquet", "gs://my-bucket/export/part_002.parquet"], + "my_dataset", "events", + source_format="PARQUET", + write_disposition="WRITE_TRUNCATE", +) +``` + +## Notas + +El job se ejecuta de forma asincrona en BigQuery; 
`job.result()` bloquea hasta completar. + +Los wildcards en el URI (`gs://bucket/prefix_*.csv`) son resueltos por GCS — BigQuery +acepta la lista de archivos resultante como una sola carga atomica. + +`autodetect=True` es conveniente pero puede inferir tipos incorrectamente para columnas +con valores nulos o mixtos. Para produccion, definir el schema explicitamente via +`job_config.schema`. diff --git a/python/functions/bigquery/bq_preview_rows.md b/python/functions/bigquery/bq_preview_rows.md new file mode 100644 index 00000000..79587f7e --- /dev/null +++ b/python/functions/bigquery/bq_preview_rows.md @@ -0,0 +1,50 @@ +--- +name: bq_preview_rows +kind: function +lang: py +domain: infra +version: "1.0.0" +purity: impure +signature: "def bq_preview_rows(client: BQClient, dataset_id: str, table_id: str, max_results: int = 10) -> dict" +description: "Obtiene una muestra de filas de una tabla BigQuery sin ejecutar query SQL, sin coste de procesamiento. Usa client._client.list_rows() del SDK oficial." 
+tags: [bigquery, gcp, table, preview, google-cloud, python] +uses_functions: [] +uses_types: [] +returns: [] +returns_optional: false +error_type: "error_go_core" +imports: [google-cloud-bigquery] +params: + - name: client + desc: "cliente autenticado BQClient obtenido con bq_auth" + - name: dataset_id + desc: "ID del dataset que contiene la tabla" + - name: table_id + desc: "nombre (ID) de la tabla a previsualizar" + - name: max_results + desc: "numero maximo de filas a retornar (default: 10)" +output: "dict con: columns (lista de nombres de columnas), rows (lista de listas con valores), total_rows (int con total de filas en la tabla completa)" +tested: false +tests: [] +test_file_path: "" +file_path: "python/functions/bigquery/tables.py" +--- + +## Ejemplo + +```python +from bigquery import bq_auth, bq_preview_rows + +client = bq_auth("mi-proyecto") + +preview = bq_preview_rows(client, "ventas_ds", "transacciones", max_results=5) + +print(preview["columns"]) # ["id", "fecha", "monto", "pais"] +print(f"Total filas: {preview['total_rows']}") +for row in preview["rows"]: + print(row) # [1, datetime.date(2024, 1, 15), 99.5, "MX"] +``` + +## Notas + +`list_rows()` usa el endpoint REST `tabledata.list` (no la Storage Read API) y NO genera un job de query — por tanto no se contabiliza en el uso de bytes procesados. Ideal para inspeccionar rapidamente la estructura y contenido de una tabla. El orden de las filas retornadas no esta garantizado (depende del almacenamiento interno de BigQuery). Para muestras reproducibles o con filtros, usar una query SQL con `LIMIT`. `total_rows` refleja el conteo de la tabla en el momento de la llamada a `get_table()`, que puede tener un pequeno retraso respecto al dato real. 
diff --git a/python/functions/bigquery/bq_query.md b/python/functions/bigquery/bq_query.md new file mode 100644 index 00000000..4de7b043 --- /dev/null +++ b/python/functions/bigquery/bq_query.md @@ -0,0 +1,69 @@ +--- +name: bq_query +kind: function +lang: py +domain: infra +version: "1.0.0" +purity: impure +signature: "def bq_query(client: BQClient, sql: str, params: list[dict] | None = None, dry_run: bool = False) -> dict" +description: "Ejecuta una query SQL en BigQuery con soporte para parametros tipados y modo dry-run para estimacion de costos." +tags: [bigquery, gcp, query, sql, google-cloud, python] +uses_functions: [] +uses_types: [] +returns: [] +returns_optional: false +error_type: "error_go_core" +imports: [google-cloud-bigquery] +params: + - name: client + desc: "cliente BQClient autenticado contra el proyecto GCP" + - name: sql + desc: "query SQL a ejecutar; usar @nombre para referencias a parametros tipados" + - name: params + desc: "lista de parametros tipados, cada uno con {name, type, value}; tipos: STRING, INT64, FLOAT64, BOOL, DATE, TIMESTAMP" + - name: dry_run + desc: "si True, estima bytes procesados/facturados sin ejecutar la query" +output: "si dry_run=True: {total_bytes_processed, total_bytes_billed}; si False: {columns, rows, total_rows, bytes_processed, cache_hit}" +tested: false +tests: [] +test_file_path: "" +file_path: "python/functions/bigquery/queries.py" +--- + +## Ejemplo + +```python +from bigquery.client import bq_auth +from bigquery.queries import bq_query + +client = bq_auth("my-project") + +# Query simple +result = bq_query(client, "SELECT COUNT(*) as n FROM `my_project.dataset.table`") +print(result["columns"]) # ["n"] +print(result["rows"]) # [[12345]] + +# Con parametros +result = bq_query( + client, + "SELECT * FROM `dataset.users` WHERE status = @s AND age > @a", + params=[ + {"name": "s", "type": "STRING", "value": "active"}, + {"name": "a", "type": "INT64", "value": 18}, + ], +) + +# Dry-run para estimar costo +est = 
bq_query(client, "SELECT * FROM `dataset.big_table`", dry_run=True) +print(f"Procesaria {est['total_bytes_processed'] / 1e9:.2f} GB") +``` + +## Notas + +Usa `bigquery.ScalarQueryParameter` para parametros — solo soporta escalares. Para arrays +usar `bigquery.ArrayQueryParameter` directamente si se necesita. + +En dry_run=True el job se crea pero no se ejecuta; `job.result()` no se llama. BigQuery +retorna la estimacion de bytes sin cargo. + +`cache_hit=True` indica que el resultado provino de la cache de BigQuery (sin costo). diff --git a/python/functions/bigquery/bq_update_dataset.md b/python/functions/bigquery/bq_update_dataset.md new file mode 100644 index 00000000..21ae1b30 --- /dev/null +++ b/python/functions/bigquery/bq_update_dataset.md @@ -0,0 +1,59 @@ +--- +name: bq_update_dataset +kind: function +lang: py +domain: infra +version: "1.0.0" +purity: impure +signature: "def bq_update_dataset(client: BQClient, dataset_id: str, description: str | None = None, labels: dict[str, str] | None = None, default_table_expiration_ms: int | None = None) -> dict" +description: "Actualiza campos de un dataset de Google BigQuery. Solo modifica los campos pasados explicitamente (no-None). Usa client._client.update_dataset() del SDK oficial." 
+tags: [bigquery, gcp, dataset, update, google-cloud, python] +uses_functions: [] +uses_types: [] +returns: [] +returns_optional: false +error_type: "error_go_core" +imports: [google-cloud-bigquery] +params: + - name: client + desc: "instancia autenticada de BQClient" + - name: dataset_id + desc: "nombre del dataset a actualizar (sin prefijo de proyecto)" + - name: description + desc: "nueva descripcion del dataset; None = no modificar" + - name: labels + desc: "nuevos labels key-value; None = no modificar; dict vacio {} = eliminar todos los labels" + - name: default_table_expiration_ms + desc: "nueva expiracion por defecto de tablas en ms; None = no modificar; 0 = eliminar expiracion" +output: "dict con el dataset actualizado: dataset_id, project, full_id, location, description, labels, created, modified, default_table_expiration_ms" +tested: false +tests: [] +test_file_path: "" +file_path: "python/functions/bigquery/datasets.py" +--- + +## Ejemplo + +```python +from bigquery.client import bq_auth +from bigquery.datasets import bq_update_dataset + +client = bq_auth("my-project") + +# Actualizar solo la descripcion +ds = bq_update_dataset(client, "analytics", description="Data warehouse actualizado") + +# Actualizar labels +ds = bq_update_dataset(client, "analytics", labels={"env": "prod", "version": "2"}) + +# Eliminar expiracion de tablas +ds = bq_update_dataset(client, "analytics", default_table_expiration_ms=0) +print(ds["description"]) +``` + +## Notas + +Lanza `google.api_core.exceptions.NotFound` (404) si el dataset no existe. +Si no se pasa ningun campo (todos None), retorna el dataset sin modificar (no llama a update_dataset). +Para eliminar todos los labels pasar `labels={}`. Para eliminar la expiracion de tablas pasar `default_table_expiration_ms=0`. +`update_dataset()` del SDK no hace un GET interno: envia un PATCH solo con los campos listados en `fields`. Por eso esta funcion llama primero a `get_dataset()` para obtener el objeto actual antes de modificarlo. 
diff --git a/python/functions/bigquery/bq_update_table.md b/python/functions/bigquery/bq_update_table.md new file mode 100644 index 00000000..e175d143 --- /dev/null +++ b/python/functions/bigquery/bq_update_table.md @@ -0,0 +1,65 @@ +--- +name: bq_update_table +kind: function +lang: py +domain: infra +version: "1.0.0" +purity: impure +signature: "def bq_update_table(client: BQClient, dataset_id: str, table_id: str, schema: list[dict] | None = None, description: str | None = None, labels: dict | None = None) -> dict" +description: "Actualiza metadatos de una tabla BigQuery: schema (solo adicion de columnas), descripcion y etiquetas. Usa client._client.update_table() del SDK oficial." +tags: [bigquery, gcp, table, update, google-cloud, python] +uses_functions: [] +uses_types: [] +returns: [] +returns_optional: false +error_type: "error_go_core" +imports: [google-cloud-bigquery] +params: + - name: client + desc: "cliente autenticado BQClient obtenido con bq_auth" + - name: dataset_id + desc: "ID del dataset que contiene la tabla" + - name: table_id + desc: "nombre (ID) de la tabla a actualizar" + - name: schema + desc: "schema completo nuevo (columnas existentes + nuevas al final). BigQuery SOLO permite agregar columnas, no eliminar ni renombrar. None = no modificar" + - name: description + desc: "nueva descripcion de la tabla. None = no modificar" + - name: labels + desc: "nuevas etiquetas clave-valor (reemplaza las existentes). 
None = no modificar" +output: "dict con la metadata actualizada de la tabla (misma estructura que bq_get_table)" +tested: false +tests: [] +test_file_path: "" +file_path: "python/functions/bigquery/tables.py" +--- + +## Ejemplo + +```python +from bigquery import bq_auth, bq_update_table + +client = bq_auth("mi-proyecto") + +# Actualizar descripcion y etiquetas +tabla = bq_update_table( + client, "ventas_ds", "transacciones", + description="Transacciones de ventas — actualizado", + labels={"env": "prod", "team": "data", "version": "2"}, +) + +# Agregar columna nueva al schema existente +tabla = bq_update_table( + client, "ventas_ds", "transacciones", + schema=[ + {"name": "id", "type": "INTEGER", "mode": "REQUIRED"}, + {"name": "fecha", "type": "DATE", "mode": "NULLABLE"}, + {"name": "monto", "type": "FLOAT", "mode": "NULLABLE"}, + {"name": "nueva_col", "type": "STRING", "mode": "NULLABLE"}, # nueva + ], +) +``` + +## Notas + +BigQuery tiene restricciones estrictas sobre modificacion de schema: se pueden agregar columnas NULLABLE o REPEATED al final, pero NO se pueden eliminar columnas, renombrar columnas ni cambiar el tipo de una columna existente. Si se necesita ese tipo de cambio, la alternativa es crear una tabla nueva con `CREATE TABLE AS SELECT`. Los campos `None` no generan actualizacion — solo se envian al API los campos que cambian. diff --git a/python/functions/bigquery/client.py b/python/functions/bigquery/client.py new file mode 100644 index 00000000..06b65edb --- /dev/null +++ b/python/functions/bigquery/client.py @@ -0,0 +1,63 @@ +"""Cliente base para Google BigQuery.""" + +from dataclasses import dataclass, field +from google.cloud import bigquery +from google.oauth2 import service_account + + +@dataclass +class BQClient: + """Cliente para Google BigQuery. + + Attributes: + project_id: ID del proyecto GCP. + _client: Cliente oficial de BigQuery (interno). 
def bq_auth(project_id: str = "", credentials_path: str = "") -> BQClient:
    """Build an authenticated BQClient for Google BigQuery.

    Credential selection:
      * ``credentials_path`` given: the service-account JSON file is loaded
        and its credentials are used; the project is ``project_id`` when
        provided, otherwise the ``project_id`` stored in the credentials.
      * only ``project_id`` given: the SDK resolves the credentials and the
        project is pinned explicitly.
      * neither given: the SDK resolves both credentials and project.

    Args:
        project_id: GCP project ID. Empty means "do not force a project".
        credentials_path: Path to a service-account JSON file. Empty means
            the SDK's default credential lookup is used.

    Returns:
        A BQClient wrapping the SDK client; its ``project_id`` is the project
        the SDK client ended up with.

    Raises:
        google.auth.exceptions.DefaultCredentialsError: If no credentials are
            configured (raised by the SDK).
        FileNotFoundError: If ``credentials_path`` does not exist.

    Example:
        >>> client = bq_auth()                                      # default credentials
        >>> client = bq_auth("my-project")                          # explicit project
        >>> client = bq_auth(credentials_path="/path/to/sa.json")   # service account
    """
    # Collect only the keyword arguments the chosen mode needs, then
    # construct the SDK client once.
    sdk_kwargs = {}
    if credentials_path:
        sa_creds = service_account.Credentials.from_service_account_file(credentials_path)
        sdk_kwargs["credentials"] = sa_creds
        # An explicit project wins over the one embedded in the key file.
        sdk_kwargs["project"] = project_id or sa_creds.project_id
    elif project_id:
        sdk_kwargs["project"] = project_id

    sdk_client = bigquery.Client(**sdk_kwargs)
    return BQClient(project_id=sdk_client.project, _client=sdk_client)
def bq_get_dataset(client: BQClient, dataset_id: str) -> dict:
    """Fetch one dataset and return its details as a plain dict.

    Args:
        client: Authenticated client.
        dataset_id: Dataset name without the project prefix; the client's
            ``project_id`` is prepended here.

    Returns:
        Whatever ``_dataset_to_dict`` produces for the fetched dataset.

    Raises:
        google.api_core.exceptions.NotFound: If the dataset does not exist (404).

    Example:
        >>> ds = bq_get_dataset(client, "analytics")
        >>> print(ds["location"], ds["description"])
    """
    sdk_dataset = client._client.get_dataset(f"{client.project_id}.{dataset_id}")
    return _dataset_to_dict(sdk_dataset)
def bq_update_dataset(
    client: BQClient,
    dataset_id: str,
    description: str | None = None,
    labels: dict[str, str] | None = None,
    default_table_expiration_ms: int | None = None,
) -> dict:
    """Update selected fields of a dataset.

    Only arguments that are not ``None`` are applied. When every optional
    argument is ``None`` the dataset is fetched and returned unchanged,
    without calling ``update_dataset``.

    Args:
        client: Authenticated client.
        dataset_id: Dataset name without the project prefix.
        description: New description (None = leave as is).
        labels: New labels (None = leave as is).
        default_table_expiration_ms: New default table expiration in ms
            (None = leave as is).

    Returns:
        Dict describing the dataset after the update.

    Raises:
        google.api_core.exceptions.NotFound: If the dataset does not exist.

    Example:
        >>> bq_update_dataset(client, "analytics", description="Updated description")
    """
    dataset = client._client.get_dataset(f"{client.project_id}.{dataset_id}")

    # Candidate updates, in the order they are applied and reported to the SDK.
    requested = {
        "description": description,
        "labels": labels,
        "default_table_expiration_ms": default_table_expiration_ms,
    }
    changed = [name for name, value in requested.items() if value is not None]
    for name in changed:
        setattr(dataset, name, requested[name])

    if not changed:
        return _dataset_to_dict(dataset)
    return _dataset_to_dict(client._client.update_dataset(dataset, changed))
def _dataset_to_dict(ds) -> dict:
    """Flatten an SDK dataset object into a plain dict.

    Falsy ``description`` becomes ``""``, falsy ``labels`` becomes ``{}``,
    and ``created`` / ``modified`` are rendered with ``isoformat()`` or left
    as ``None`` when unset.
    """

    def _iso(moment):
        # Timestamps may be unset on the object.
        return moment.isoformat() if moment else None

    name = ds.dataset_id
    project = ds.project
    flat = {
        "dataset_id": name,
        "project": project,
        "full_id": f"{project}.{name}",
    }
    flat["location"] = ds.location
    flat["description"] = ds.description or ""
    # Copy the labels so the returned dict does not alias the SDK object.
    flat["labels"] = dict(ds.labels) if ds.labels else {}
    flat["created"] = _iso(ds.created)
    flat["modified"] = _iso(ds.modified)
    flat["default_table_expiration_ms"] = ds.default_table_expiration_ms
    return flat
def bq_get_job(client: BQClient, job_id: str) -> dict:
    """Fetch one job by ID and describe it as a plain dict.

    Args:
        client: Authenticated client.
        job_id: ID of the job.

    Returns:
        The dict produced by ``_job_to_dict`` for the job, extended with
        ``destination_table``, ``query`` and ``errors`` — each of those keys
        is added only when the job exposes a non-empty value for it.

    Raises:
        google.api_core.exceptions.NotFound: If the job does not exist.

    Example:
        >>> job = bq_get_job(client, "job_abc123")
        >>> print(job["state"], job["bytes_processed"])
        >>> if job["error"]:
        ...     print("Error:", job["error"])
    """
    job = client._client.get_job(job_id)
    details = _job_to_dict(job)

    # The job object is not assumed to expose these attributes, hence the
    # getattr default instead of direct access.
    destination = getattr(job, "destination", None)
    if destination:
        details["destination_table"] = str(destination)

    query_text = getattr(job, "query", None)
    if query_text:
        details["query"] = query_text

    job_errors = job.errors
    if job_errors:
        details["errors"] = [dict(err) for err in job_errors]
    return details
def bq_query(
    client: BQClient,
    sql: str,
    params: list[dict] | None = None,
    dry_run: bool = False,
) -> dict:
    """Run a SQL query in BigQuery, optionally parameterised or as a dry run.

    Args:
        client: Authenticated BigQuery client.
        sql: SQL text. Reference parameters as ``@name``.
        params: Optional list of scalar parameters, each shaped like
            ``{"name": "x", "type": "STRING", "value": "hello"}``. The type
            string is handed to ``ScalarQueryParameter`` unchanged; no
            validation happens in this function.
        dry_run: If True, submit the query as a dry run (query cache
            disabled) and return only the byte estimates.

    Returns:
        If ``dry_run`` is True:
        ``{"total_bytes_processed": N, "total_bytes_billed": N}``.
        Otherwise: ``{"columns": [...], "rows": [[...], ...],
        "total_rows": N, "bytes_processed": N, "cache_hit": bool}``.

    Raises:
        KeyError: If a parameter dict lacks ``name``, ``type`` or ``value``.
        google.api_core.exceptions.GoogleAPICallError: If the query fails.

    Example:
        >>> result = bq_query(client, "SELECT COUNT(*) as n FROM `project.dataset.table`")
        >>> print(result["rows"])
        >>> # With parameters:
        >>> result = bq_query(client, "SELECT * FROM t WHERE status = @s",
        ...                   params=[{"name": "s", "type": "STRING", "value": "active"}])
    """
    config = bigquery.QueryJobConfig()

    if params:
        config.query_parameters = [
            bigquery.ScalarQueryParameter(spec["name"], spec["type"], spec["value"])
            for spec in params
        ]

    if dry_run:
        config.dry_run = True
        config.use_query_cache = False

    job = client._client.query(sql, job_config=config)

    if dry_run:
        return {
            "total_bytes_processed": job.total_bytes_processed,
            "total_bytes_billed": job.total_bytes_billed,
        }

    row_iter = job.result()
    column_names = [col.name for col in row_iter.schema]
    row_values = [list(record.values()) for record in row_iter]

    return {
        "columns": column_names,
        "rows": row_values,
        "total_rows": row_iter.total_rows,
        "bytes_processed": job.total_bytes_processed,
        "cache_hit": job.cache_hit,
    }
def bq_load_from_gcs(
    client: BQClient,
    uri: str | list[str],
    dataset_id: str,
    table_id: str,
    source_format: str = "CSV",
    write_disposition: str = "WRITE_APPEND",
    autodetect: bool = True,
    skip_leading_rows: int = 0,
) -> dict:
    """Load data from Google Cloud Storage into a BigQuery table.

    Builds a ``LoadJobConfig``, starts a load job for one or more GCS URIs and
    blocks on ``job.result()`` until it finishes.

    Args:
        client: Authenticated BigQuery client.
        uri: GCS URI ("gs://bucket/file.csv") or a list of such URIs.
        dataset_id: ID of the destination dataset.
        table_id: ID of the destination table.
        source_format: One of "CSV", "NEWLINE_DELIMITED_JSON", "AVRO",
            "PARQUET", "ORC". Any other value falls back to CSV.
        write_disposition: One of "WRITE_APPEND", "WRITE_TRUNCATE",
            "WRITE_EMPTY". Any other value falls back to WRITE_APPEND.
        autodetect: If True, let BigQuery detect the schema.
        skip_leading_rows: Number of leading rows to skip (e.g. a CSV header).

    Returns:
        ``{"job_id": str, "rows_loaded": N, "status": "DONE"|"FAILED"}``.

    Raises:
        google.api_core.exceptions.GoogleAPICallError: If the job fails.
        google.cloud.exceptions.NotFound: If the bucket or dataset does not exist.

    Example:
        >>> result = bq_load_from_gcs(
        ...     client, "gs://my-bucket/data/*.csv",
        ...     "my_dataset", "my_table",
        ...     skip_leading_rows=1,
        ... )
        >>> print(f"Cargadas {result['rows_loaded']} filas, job: {result['job_id']}")
    """
    format_map = {
        "CSV": bigquery.SourceFormat.CSV,
        "NEWLINE_DELIMITED_JSON": bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
        "AVRO": bigquery.SourceFormat.AVRO,
        "PARQUET": bigquery.SourceFormat.PARQUET,
        "ORC": bigquery.SourceFormat.ORC,
    }
    disposition_map = {
        "WRITE_APPEND": bigquery.WriteDisposition.WRITE_APPEND,
        "WRITE_TRUNCATE": bigquery.WriteDisposition.WRITE_TRUNCATE,
        "WRITE_EMPTY": bigquery.WriteDisposition.WRITE_EMPTY,
    }

    job_config = bigquery.LoadJobConfig(
        source_format=format_map.get(source_format, bigquery.SourceFormat.CSV),
        # Keyed by write_disposition (not source_format), so the config is
        # correct as constructed and needs no follow-up assignment.
        write_disposition=disposition_map.get(
            write_disposition, bigquery.WriteDisposition.WRITE_APPEND
        ),
        autodetect=autodetect,
        skip_leading_rows=skip_leading_rows,
    )

    table_ref = client._client.dataset(dataset_id).table(table_id)
    # The load call is always given a list, so wrap a single URI string.
    uris = uri if isinstance(uri, list) else [uri]

    job = client._client.load_table_from_uri(uris, table_ref, job_config=job_config)
    result = job.result()

    return {
        "job_id": job.job_id,
        "rows_loaded": result.output_rows,
        "status": "DONE" if job.state == "DONE" and not job.errors else "FAILED",
    }
Equivalente a + bq_load_from_gcs pero para archivos en disco local. + + Args: + client: Cliente autenticado de BigQuery. + file_path: Ruta absoluta o relativa al archivo local a cargar. + dataset_id: ID del dataset de destino. + table_id: ID de la tabla de destino. + source_format: Formato del archivo: "CSV", "NEWLINE_DELIMITED_JSON", + "AVRO", "PARQUET", "ORC". Default: "CSV". + write_disposition: Comportamiento si la tabla existe: "WRITE_APPEND", + "WRITE_TRUNCATE", "WRITE_EMPTY". Default: "WRITE_APPEND". + autodetect: Si True, detecta el schema automaticamente. Default: True. + skip_leading_rows: Numero de filas a saltar al inicio (cabeceras CSV). + + Returns: + {"job_id": str, "rows_loaded": N, "status": "DONE"|"FAILED"}. + + Raises: + FileNotFoundError: Si el archivo local no existe. + google.api_core.exceptions.GoogleAPICallError: Si el job falla. + + Example: + >>> result = bq_load_from_file( + ... client, "/tmp/data.csv", + ... "my_dataset", "my_table", + ... skip_leading_rows=1, + ... 
def bq_export_to_gcs(
    client: BQClient,
    dataset_id: str,
    table_id: str,
    destination_uri: str,
    destination_format: str = "CSV",
    compression: str = "NONE",
) -> dict:
    """Export a BigQuery table to Google Cloud Storage.

    Starts an extract job through the SDK's ``extract_table`` and blocks on
    ``job.result()`` until it finishes.

    Args:
        client: Authenticated BigQuery client.
        dataset_id: ID of the source dataset.
        table_id: ID of the source table.
        destination_uri: Destination GCS URI; a wildcard such as
            "gs://bucket/prefix_*.csv" can be used for multi-file exports.
        destination_format: One of "CSV", "NEWLINE_DELIMITED_JSON", "AVRO",
            "PARQUET". Any other value falls back to CSV.
        compression: One of "NONE", "GZIP", "SNAPPY", "DEFLATE". Any other
            value falls back to NONE.

    Returns:
        ``{"job_id": str, "destination_uri": str, "status": "DONE"|"FAILED"}``.

    Raises:
        google.api_core.exceptions.NotFound: If the table does not exist.
        google.api_core.exceptions.GoogleAPICallError: If the job fails.

    Example:
        >>> result = bq_export_to_gcs(
        ...     client, "my_dataset", "my_table",
        ...     "gs://my-bucket/export/data_*.csv",
        ...     compression="GZIP",
        ... )
        >>> print(f"Exportado a {result['destination_uri']}, job: {result['job_id']}")
    """
    formats = bigquery.DestinationFormat
    codecs = bigquery.Compression

    # Accepted option names map to the SDK constant of the same name.
    format_by_name = {
        name: getattr(formats, name)
        for name in ("CSV", "NEWLINE_DELIMITED_JSON", "AVRO", "PARQUET")
    }
    codec_by_name = {
        name: getattr(codecs, name)
        for name in ("NONE", "GZIP", "SNAPPY", "DEFLATE")
    }

    chosen_format = format_by_name.get(destination_format, formats.CSV)
    chosen_codec = codec_by_name.get(compression, codecs.NONE)
    extract_config = bigquery.ExtractJobConfig(
        destination_format=chosen_format,
        compression=chosen_codec,
    )

    source = client._client.dataset(dataset_id).table(table_id)
    job = client._client.extract_table(source, destination_uri, job_config=extract_config)
    job.result()

    succeeded = job.state == "DONE" and not job.errors
    return {
        "job_id": job.job_id,
        "destination_uri": destination_uri,
        "status": "DONE" if succeeded else "FAILED",
    }
def bq_create_routine(
    client: BQClient,
    dataset_id: str,
    routine_id: str,
    body: str,
    routine_type: str = "SCALAR_FUNCTION",
    language: str = "SQL",
    arguments: list[dict] | None = None,
    return_type: str = "",
    description: str = "",
) -> dict:
    """Create a routine (UDF or stored procedure) in BigQuery.

    Args:
        client: Authenticated client.
        dataset_id: ID of the dataset that will hold the routine.
        routine_id: ID/name of the routine.
        body: Routine body (SQL, JavaScript or Python).
        routine_type: "SCALAR_FUNCTION", "TABLE_VALUED_FUNCTION" or "PROCEDURE".
        language: "SQL", "JAVASCRIPT" or "PYTHON".
        arguments: List of arguments, each ``{"name": "x", "data_type": "STRING"}``.
            ``data_type`` must be the name of a ``StandardSqlTypeNames`` member.
        return_type: Return type for functions (e.g. "STRING", "INT64"),
            also a ``StandardSqlTypeNames`` member name. Empty = not set.
        description: Optional description. Empty = not set.

    Returns:
        Dict describing the created routine, as built by ``_routine_to_dict``.

    Raises:
        AttributeError: If a ``data_type`` or ``return_type`` name is not a
            member of ``StandardSqlTypeNames``.
        google.api_core.exceptions.Conflict: If the routine already exists.

    Example:
        >>> bq_create_routine(client, "analytics", "double_value",
        ...                   body="x * 2",
        ...                   arguments=[{"name": "x", "data_type": "INT64"}],
        ...                   return_type="INT64")
    """

    def _sql_type(type_name):
        # Resolve a type name such as "INT64" to the SDK's data-type wrapper.
        kind = getattr(bigquery.StandardSqlTypeNames, type_name)
        return bigquery.StandardSqlDataType(type_kind=kind)

    full_id = f"{client.project_id}.{dataset_id}.{routine_id}"
    routine = bigquery.Routine(bigquery.RoutineReference.from_string(full_id))
    routine.type_ = routine_type
    routine.language = language
    routine.body = body

    if description:
        routine.description = description
    if arguments:
        routine.arguments = [
            bigquery.RoutineArgument(
                name=spec["name"],
                data_type=_sql_type(spec["data_type"]),
            )
            for spec in arguments
        ]
    if return_type:
        routine.return_type = _sql_type(return_type)

    return _routine_to_dict(client._client.create_routine(routine))
+ dataset_id: ID del dataset. + + Returns: + Lista de dicts con: routine_id, dataset_id, project, routine_type, language. + + Example: + >>> routines = bq_list_routines(client, "analytics") + >>> for r in routines: + ... print(r["routine_id"], r["routine_type"], r["language"]) + """ + ref = f"{client.project_id}.{dataset_id}" + return [ + { + "routine_id": r.routine_id, + "dataset_id": dataset_id, + "project": client.project_id, + "routine_type": r.type_, + "language": r.language, + } + for r in client._client.list_routines(ref) + ] + + +def bq_delete_routine( + client: BQClient, + dataset_id: str, + routine_id: str, +) -> None: + """Elimina una routine. + + Args: + client: Cliente autenticado. + dataset_id: ID del dataset. + routine_id: ID de la routine a eliminar. + + Raises: + google.api_core.exceptions.NotFound: Si la routine no existe. + + Example: + >>> bq_delete_routine(client, "analytics", "double_value") + """ + ref = f"{client.project_id}.{dataset_id}.{routine_id}" + client._client.delete_routine(ref) + + +def _routine_to_dict(routine) -> dict: + """Convierte un objeto Routine del SDK a dict plano.""" + return { + "routine_id": routine.routine_id, + "dataset_id": routine.dataset_id, + "project": routine.project, + "routine_type": routine.type_, + "language": routine.language, + "body": routine.body, + "description": routine.description or "", + "created": routine.created.isoformat() if routine.created else None, + "modified": routine.modified.isoformat() if routine.modified else None, + } diff --git a/python/functions/bigquery/tables.py b/python/functions/bigquery/tables.py new file mode 100644 index 00000000..e9feac41 --- /dev/null +++ b/python/functions/bigquery/tables.py @@ -0,0 +1,345 @@ +"""CRUD de tablas en Google BigQuery.""" + +from .client import BQClient +from google.cloud import bigquery + + +# --------------------------------------------------------------------------- +# Helpers de serializacion +# 
def _schema_field_to_dict(field: bigquery.SchemaField) -> dict:
    """Convert a SchemaField into a serializable dict.

    The result always has ``name``, ``type``, ``mode`` and ``description``
    (``""`` when unset). A ``fields`` list is added, recursively converted,
    only when the field has nested sub-fields.
    """
    as_dict = {
        "name": field.name,
        "type": field.field_type,
        "mode": field.mode,
        "description": field.description or "",
    }
    children = field.fields
    if children:
        as_dict["fields"] = list(map(_schema_field_to_dict, children))
    return as_dict
def bq_create_table(
    client: BQClient,
    dataset_id: str,
    table_id: str,
    schema: list[dict],
    partitioning: dict | None = None,
    clustering: list[str] | None = None,
    description: str = "",
    labels: dict | None = None,
) -> dict:
    """Create a BigQuery table with optional partitioning and clustering.

    Args:
        client: Authenticated BQClient.
        dataset_id: ID of the dataset that will hold the table.
        table_id: ID (name) of the table to create.
        schema: Column definitions, one dict per column, e.g.
            ``{"name": "col", "type": "STRING", "mode": "NULLABLE",
            "description": "..."}``. Each dict is converted with
            ``_dict_to_schema_field``.
        partitioning: Time-partitioning settings or None, e.g.
            ``{"type": "DAY", "field": "created_at"}``. ``type`` defaults to
            "DAY"; an empty or missing ``field`` is passed to the SDK as None.
        clustering: Column names to cluster by, e.g. ``["country", "status"]``,
            or None.
        description: Table description. Empty = not set.
        labels: Key/value labels, e.g. ``{"env": "prod"}``. Empty/None = not set.

    Returns:
        Dict describing the created table, as built by ``_table_to_dict``.

    Raises:
        google.api_core.exceptions.Conflict: If the table already exists.
        google.api_core.exceptions.NotFound: If the dataset does not exist.
        google.api_core.exceptions.BadRequest: If the schema is invalid.

    Example:
        >>> table = bq_create_table(client, "mi_dataset", "ventas", [
        ...     {"name": "id", "type": "INTEGER", "mode": "REQUIRED"},
        ...     {"name": "fecha", "type": "DATE", "mode": "NULLABLE"},
        ...     {"name": "monto", "type": "FLOAT", "mode": "NULLABLE"},
        ... ], partitioning={"type": "MONTH", "field": "fecha"},
        ...    clustering=["id"])
        >>> print(table["full_id"])
    """
    table = bigquery.Table(
        f"{client.project_id}.{dataset_id}.{table_id}",
        schema=[_dict_to_schema_field(column) for column in schema],
    )

    if partitioning:
        table.time_partitioning = bigquery.TimePartitioning(
            type_=partitioning.get("type", "DAY"),
            field=partitioning.get("field") or None,
        )

    # Optional metadata: each attribute is assigned only for a truthy value.
    optional_settings = (
        ("clustering_fields", clustering),
        ("description", description),
        ("labels", labels),
    )
    for attribute, value in optional_settings:
        if value:
            setattr(table, attribute, value)

    return _table_to_dict(client._client.create_table(table))
def bq_list_tables(client: BQClient, dataset_id: str) -> list[dict]:
    """List the tables of a BigQuery dataset.

    Args:
        client: Authenticated BQClient.
        dataset_id: ID of the dataset to list, inside ``client.project_id``.

    Returns:
        One summary dict per table with ``table_id``, ``full_id``
        (``project.dataset.table``) and ``type``; ``type`` is "TABLE" when
        the listing reports no table type.

    Raises:
        google.api_core.exceptions.NotFound: If the dataset does not exist.

    Example:
        >>> tablas = bq_list_tables(client, "mi_dataset")
        >>> for t in tablas:
        ...     print(t["table_id"], t["type"])
    """
    summaries = []
    for entry in client._client.list_tables(f"{client.project_id}.{dataset_id}"):
        summaries.append(
            {
                "table_id": entry.table_id,
                "full_id": f"{entry.project}.{entry.dataset_id}.{entry.table_id}",
                "type": entry.table_type or "TABLE",
            }
        )
    return summaries
def bq_delete_table(client: BQClient, dataset_id: str, table_id: str) -> None:
    """Delete a table through the SDK's ``delete_table``.

    Nothing in this function can undo the deletion; export the data first if
    it may still be needed.

    Args:
        client: Authenticated BQClient.
        dataset_id: ID of the dataset that holds the table.
        table_id: ID (name) of the table to delete.

    Raises:
        google.api_core.exceptions.NotFound: If the table does not exist.

    Example:
        >>> bq_delete_table(client, "mi_dataset", "tabla_temporal")
    """
    client._client.delete_table(f"{client.project_id}.{dataset_id}.{table_id}")
@dataclass
class BQClient:
    """Reference definition of the Google BigQuery client type.

    Attributes:
        project_id: GCP project ID.
        _client: Official BigQuery SDK client (internal, not serializable).
            Defaults to None and is left out of the generated repr.
    """

    project_id: str
    _client: object = field(default=None, repr=False)