Files
fn_registry/python/functions/bigquery/queries.py
egutierrez 690e68a542 feat: add BigQuery Python functions and BQClient type
Funciones CRUD completas para BigQuery: auth, datasets, tables, queries,
jobs, routines, load/export. Tipo BQClient como wrapper del SDK oficial.
2026-04-07 18:45:02 +02:00

398 lines
14 KiB
Python

"""Queries y operaciones de datos para Google BigQuery."""
from .client import BQClient
from google.cloud import bigquery
# Upper-case scalar type names accepted for query parameters. The legacy
# aliases (INTEGER, FLOAT, BOOLEAN, DECIMAL, BIGDECIMAL) are included so that
# callers already using them keep working.
_BQ_SCALAR_PARAM_TYPES = frozenset({
    "STRING", "BYTES",
    "INT64", "INTEGER",
    "FLOAT64", "FLOAT",
    "NUMERIC", "DECIMAL",
    "BIGNUMERIC", "BIGDECIMAL",
    "BOOL", "BOOLEAN",
    "DATE", "DATETIME", "TIME", "TIMESTAMP",
    "GEOGRAPHY", "JSON", "INTERVAL",
})


def bq_query(
    client: BQClient,
    sql: str,
    params: list[dict] | None = None,
    dry_run: bool = False,
) -> dict:
    """Run a SQL query on BigQuery, optionally parameterized or as a dry run.

    When ``params`` is non-empty, every entry is converted to a
    ``ScalarQueryParameter`` that the SQL references as ``@name``. With
    ``dry_run=True`` the job is submitted with ``dry_run`` enabled and the
    query cache disabled, and only byte statistics are returned.

    Args:
        client: Authenticated BigQuery client wrapper; its ``_client``
            attribute is used to submit the query.
        sql: SQL text to run. Use ``@name`` to reference parameters.
        params: List of parameter dicts, each shaped like
            ``{"name": "x", "type": "STRING", "value": "hello"}``.
            ``None`` or an empty list means no parameters.
        dry_run: If True, return the byte estimate without fetching rows.

    Returns:
        If ``dry_run`` is True:
        ``{"total_bytes_processed": N, "total_bytes_billed": N}``.
        Otherwise: ``{"columns": [...], "rows": [[...], ...],
        "total_rows": N, "bytes_processed": N, "cache_hit": bool}``.

    Raises:
        ValueError: If a parameter ``type`` is a string that is not a known
            scalar type name (compared case-insensitively). Raised before
            any request is sent.
        KeyError: If a parameter dict lacks "name", "type" or "value".
        Exceptions raised by the BigQuery SDK (for example
        ``google.api_core.exceptions.GoogleAPICallError``) propagate
        unchanged.

    Example:
        >>> result = bq_query(client, "SELECT COUNT(*) as n FROM `project.dataset.table`")
        >>> print(result["rows"])
        >>> # With parameters:
        >>> result = bq_query(client, "SELECT * FROM t WHERE status = @s",
        ...     params=[{"name": "s", "type": "STRING", "value": "active"}])
    """
    # Validate up front so a misspelled type fails fast with a clear message
    # instead of surfacing later as a server-side error. Non-string type
    # objects are handed to the SDK untouched.
    for p in params or []:
        type_name = p["type"]
        if isinstance(type_name, str) and type_name.upper() not in _BQ_SCALAR_PARAM_TYPES:
            raise ValueError(f"Unsupported BigQuery parameter type: {type_name!r}")

    job_config = bigquery.QueryJobConfig()
    if params:
        job_config.query_parameters = [
            bigquery.ScalarQueryParameter(p["name"], p["type"], p["value"])
            for p in params
        ]

    if dry_run:
        job_config.dry_run = True
        # Disable the cache so the estimate is not reported from a cached result.
        job_config.use_query_cache = False
        job = client._client.query(sql, job_config=job_config)
        return {
            "total_bytes_processed": job.total_bytes_processed,
            "total_bytes_billed": job.total_bytes_billed,
        }

    job = client._client.query(sql, job_config=job_config)
    result = job.result()  # blocks until the query job finishes
    columns = [field.name for field in result.schema]
    rows = [list(row.values()) for row in result]
    return {
        "columns": columns,
        "rows": rows,
        "total_rows": result.total_rows,
        "bytes_processed": job.total_bytes_processed,
        "cache_hit": job.cache_hit,
    }
def bq_insert_rows(
    client: BQClient,
    dataset_id: str,
    table_id: str,
    rows: list[dict],
) -> dict:
    """Insert rows into a BigQuery table through the streaming insert API.

    Calls ``insert_rows_json`` on the underlying SDK client. Per-row errors
    are returned in the ``"errors"`` field instead of being raised, so
    callers should always inspect that field.

    Args:
        client: Authenticated BigQuery client wrapper; its ``_client``
            attribute is used to perform the insert.
        dataset_id: ID of the destination dataset.
        table_id: ID of the destination table.
        rows: List of dicts to insert. Each dict's keys are expected to
            match the table's column names. An empty (or ``None``) value is
            treated as "nothing to insert" and no API call is made.

    Returns:
        ``{"inserted": N, "errors": [...]}`` where ``errors`` is the list
        returned by ``insert_rows_json`` (empty when every row succeeded) and
        ``inserted`` is ``len(rows) - len(errors)``.

    Raises:
        Exceptions raised by the BigQuery SDK (for example
        ``google.api_core.exceptions.NotFound`` when the table does not
        exist) propagate unchanged.

    Example:
        >>> result = bq_insert_rows(client, "my_dataset", "my_table", [
        ...     {"id": 1, "name": "Alice", "created_at": "2024-01-01"},
        ...     {"id": 2, "name": "Bob", "created_at": "2024-01-02"},
        ... ])
        >>> if result["errors"]:
        ...     print("Errores:", result["errors"])
    """
    # An empty batch is a no-op: skip the round-trip rather than sending a
    # request without rows, which the streaming endpoint rejects.
    if not rows:
        return {"inserted": 0, "errors": []}

    table_ref = client._client.dataset(dataset_id).table(table_id)
    errors = client._client.insert_rows_json(table_ref, rows)
    return {
        "inserted": len(rows) - len(errors),
        "errors": errors,
    }
def bq_load_from_gcs(
    client: BQClient,
    uri: str | list[str],
    dataset_id: str,
    table_id: str,
    source_format: str = "CSV",
    write_disposition: str = "WRITE_APPEND",
    autodetect: bool = True,
    skip_leading_rows: int = 0,
) -> dict:
    """Load data from Google Cloud Storage into a BigQuery table.

    Builds a ``LoadJobConfig``, starts a load job for one or more GCS URIs
    with ``load_table_from_uri`` and waits for it with ``job.result()``.

    Args:
        client: Authenticated BigQuery client wrapper; its ``_client``
            attribute is used to start the job.
        uri: A single GCS URI (``"gs://bucket/file.csv"``) or a list of URIs.
        dataset_id: ID of the destination dataset.
        table_id: ID of the destination table.
        source_format: One of "CSV", "NEWLINE_DELIMITED_JSON", "AVRO",
            "PARQUET", "ORC". Any other value falls back to CSV.
        write_disposition: One of "WRITE_APPEND", "WRITE_TRUNCATE",
            "WRITE_EMPTY". Any other value falls back to WRITE_APPEND.
        autodetect: Forwarded to ``LoadJobConfig(autodetect=...)``.
        skip_leading_rows: Forwarded to
            ``LoadJobConfig(skip_leading_rows=...)`` (e.g. CSV header rows).

    Returns:
        ``{"job_id": str, "rows_loaded": N, "status": "DONE"|"FAILED"}``.
        ``status`` is "DONE" only when the job state is "DONE" and the job
        reports no errors.

    Raises:
        Exceptions raised by the BigQuery SDK while starting or waiting for
        the job (for example ``google.api_core.exceptions.GoogleAPICallError``)
        propagate unchanged.

    Example:
        >>> result = bq_load_from_gcs(
        ...     client, "gs://my-bucket/data/*.csv",
        ...     "my_dataset", "my_table",
        ...     skip_leading_rows=1,
        ... )
        >>> print(f"Cargadas {result['rows_loaded']} filas, job: {result['job_id']}")
    """
    format_map = {
        "CSV": bigquery.SourceFormat.CSV,
        "NEWLINE_DELIMITED_JSON": bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
        "AVRO": bigquery.SourceFormat.AVRO,
        "PARQUET": bigquery.SourceFormat.PARQUET,
        "ORC": bigquery.SourceFormat.ORC,
    }
    disposition_map = {
        "WRITE_APPEND": bigquery.WriteDisposition.WRITE_APPEND,
        "WRITE_TRUNCATE": bigquery.WriteDisposition.WRITE_TRUNCATE,
        "WRITE_EMPTY": bigquery.WriteDisposition.WRITE_EMPTY,
    }
    job_config = bigquery.LoadJobConfig(
        source_format=format_map.get(source_format, bigquery.SourceFormat.CSV),
        # The disposition is keyed by ``write_disposition``; set it once here
        # so the config never holds a value derived from the wrong argument.
        write_disposition=disposition_map.get(
            write_disposition, bigquery.WriteDisposition.WRITE_APPEND
        ),
        autodetect=autodetect,
        skip_leading_rows=skip_leading_rows,
    )
    table_ref = client._client.dataset(dataset_id).table(table_id)
    # The SDK call is always given a list, so wrap a lone string.
    uris = uri if isinstance(uri, list) else [uri]
    job = client._client.load_table_from_uri(uris, table_ref, job_config=job_config)
    result = job.result()  # blocks until the load job finishes
    return {
        "job_id": job.job_id,
        "rows_loaded": result.output_rows,
        "status": "DONE" if job.state == "DONE" and not job.errors else "FAILED",
    }
def bq_load_from_file(
    client: BQClient,
    file_path: str,
    dataset_id: str,
    table_id: str,
    source_format: str = "CSV",
    write_disposition: str = "WRITE_APPEND",
    autodetect: bool = True,
    skip_leading_rows: int = 0,
) -> dict:
    """Load data from a local file into a BigQuery table.

    Opens ``file_path`` in binary mode, hands the file object to the SDK's
    ``load_table_from_file`` and waits for the resulting job with
    ``job.result()``.

    Args:
        client: Authenticated BigQuery client wrapper; its ``_client``
            attribute is used to start the job.
        file_path: Absolute or relative path of the local file to upload.
        dataset_id: ID of the destination dataset.
        table_id: ID of the destination table.
        source_format: One of "CSV", "NEWLINE_DELIMITED_JSON", "AVRO",
            "PARQUET", "ORC". Any other value falls back to CSV.
        write_disposition: One of "WRITE_APPEND", "WRITE_TRUNCATE",
            "WRITE_EMPTY". Any other value falls back to WRITE_APPEND.
        autodetect: Forwarded to ``LoadJobConfig(autodetect=...)``.
        skip_leading_rows: Forwarded to
            ``LoadJobConfig(skip_leading_rows=...)`` (e.g. CSV header rows).

    Returns:
        ``{"job_id": str, "rows_loaded": N, "status": "DONE"|"FAILED"}``.
        ``status`` is "DONE" only when the job state is "DONE" and the job
        reports no errors.

    Raises:
        FileNotFoundError: If ``file_path`` does not exist (raised by
            ``open``).
        Exceptions raised by the BigQuery SDK (for example
        ``google.api_core.exceptions.GoogleAPICallError``) propagate
        unchanged.

    Example:
        >>> result = bq_load_from_file(
        ...     client, "/tmp/data.csv",
        ...     "my_dataset", "my_table",
        ...     skip_leading_rows=1,
        ... )
        >>> print(f"Cargadas {result['rows_loaded']} filas")
    """
    formats_by_name = {
        "CSV": bigquery.SourceFormat.CSV,
        "NEWLINE_DELIMITED_JSON": bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
        "AVRO": bigquery.SourceFormat.AVRO,
        "PARQUET": bigquery.SourceFormat.PARQUET,
        "ORC": bigquery.SourceFormat.ORC,
    }
    dispositions_by_name = {
        "WRITE_APPEND": bigquery.WriteDisposition.WRITE_APPEND,
        "WRITE_TRUNCATE": bigquery.WriteDisposition.WRITE_TRUNCATE,
        "WRITE_EMPTY": bigquery.WriteDisposition.WRITE_EMPTY,
    }

    # Unrecognised names silently fall back to CSV / WRITE_APPEND.
    chosen_format = formats_by_name.get(source_format, bigquery.SourceFormat.CSV)
    chosen_disposition = dispositions_by_name.get(
        write_disposition, bigquery.WriteDisposition.WRITE_APPEND
    )
    load_config = bigquery.LoadJobConfig(
        source_format=chosen_format,
        write_disposition=chosen_disposition,
        autodetect=autodetect,
        skip_leading_rows=skip_leading_rows,
    )

    destination = client._client.dataset(dataset_id).table(table_id)
    with open(file_path, "rb") as source_file:
        load_job = client._client.load_table_from_file(
            source_file, destination, job_config=load_config
        )
    finished = load_job.result()  # blocks until the load job finishes

    summary = {
        "job_id": load_job.job_id,
        "rows_loaded": finished.output_rows,
    }
    if load_job.state != "DONE" or load_job.errors:
        summary["status"] = "FAILED"
    else:
        summary["status"] = "DONE"
    return summary
def bq_export_to_gcs(
    client: BQClient,
    dataset_id: str,
    table_id: str,
    destination_uri: str,
    destination_format: str = "CSV",
    compression: str = "NONE",
) -> dict:
    """Export a BigQuery table to Google Cloud Storage.

    Starts an extract job with the SDK's ``extract_table`` and waits for it
    with ``job.result()``.

    Args:
        client: Authenticated BigQuery client wrapper; its ``_client``
            attribute is used to start the job.
        dataset_id: ID of the source dataset.
        table_id: ID of the source table.
        destination_uri: Destination GCS URI. For multi-file exports use a
            wildcard such as ``"gs://bucket/prefix_*.csv"``.
        destination_format: One of "CSV", "NEWLINE_DELIMITED_JSON", "AVRO",
            "PARQUET". Any other value falls back to CSV.
        compression: One of "NONE", "GZIP", "SNAPPY", "DEFLATE". Any other
            value falls back to NONE.

    Returns:
        ``{"job_id": str, "destination_uri": str, "status": "DONE"|"FAILED"}``.
        ``destination_uri`` is echoed back from the argument; ``status`` is
        "DONE" only when the job state is "DONE" and the job reports no
        errors.

    Raises:
        Exceptions raised by the BigQuery SDK (for example
        ``google.api_core.exceptions.NotFound`` when the table does not
        exist) propagate unchanged.

    Example:
        >>> result = bq_export_to_gcs(
        ...     client, "my_dataset", "my_table",
        ...     "gs://my-bucket/export/data_*.csv",
        ...     compression="GZIP",
        ... )
        >>> print(f"Exportado a {result['destination_uri']}, job: {result['job_id']}")
    """
    formats_by_name = {
        "CSV": bigquery.DestinationFormat.CSV,
        "NEWLINE_DELIMITED_JSON": bigquery.DestinationFormat.NEWLINE_DELIMITED_JSON,
        "AVRO": bigquery.DestinationFormat.AVRO,
        "PARQUET": bigquery.DestinationFormat.PARQUET,
    }
    codecs_by_name = {
        "NONE": bigquery.Compression.NONE,
        "GZIP": bigquery.Compression.GZIP,
        "SNAPPY": bigquery.Compression.SNAPPY,
        "DEFLATE": bigquery.Compression.DEFLATE,
    }

    # Unrecognised names silently fall back to CSV / no compression.
    chosen_format = formats_by_name.get(
        destination_format, bigquery.DestinationFormat.CSV
    )
    chosen_codec = codecs_by_name.get(compression, bigquery.Compression.NONE)
    extract_config = bigquery.ExtractJobConfig(
        destination_format=chosen_format,
        compression=chosen_codec,
    )

    source = client._client.dataset(dataset_id).table(table_id)
    extract_job = client._client.extract_table(
        source, destination_uri, job_config=extract_config
    )
    extract_job.result()  # blocks until the extract job finishes

    summary = {
        "job_id": extract_job.job_id,
        "destination_uri": destination_uri,
    }
    if extract_job.state != "DONE" or extract_job.errors:
        summary["status"] = "FAILED"
    else:
        summary["status"] = "DONE"
    return summary
def bq_copy_table(
    client: BQClient,
    source_dataset: str,
    source_table: str,
    dest_dataset: str,
    dest_table: str,
    write_disposition: str = "WRITE_EMPTY",
) -> dict:
    """Copy a BigQuery table to another dataset or table.

    Starts a copy job with the SDK's ``copy_table`` and waits for it with
    ``job.result()``. Both table references are built from the same client,
    so source and destination resolve against that client's project.

    Args:
        client: Authenticated BigQuery client wrapper; its ``_client``
            attribute is used to start the job.
        source_dataset: ID of the source dataset.
        source_table: ID of the source table.
        dest_dataset: ID of the destination dataset.
        dest_table: ID of the destination table.
        write_disposition: One of "WRITE_EMPTY", "WRITE_APPEND",
            "WRITE_TRUNCATE". Any other value falls back to WRITE_EMPTY.

    Returns:
        ``{"job_id": str, "status": "DONE"|"FAILED"}``. ``status`` is "DONE"
        only when the job state is "DONE" and the job reports no errors.

    Raises:
        Exceptions raised by the BigQuery SDK (for example
        ``google.api_core.exceptions.NotFound`` for a missing source table,
        or ``google.api_core.exceptions.Conflict`` when the destination
        exists and the disposition is WRITE_EMPTY) propagate unchanged.

    Example:
        >>> result = bq_copy_table(
        ...     client,
        ...     "production", "users",
        ...     "staging", "users_backup",
        ...     write_disposition="WRITE_TRUNCATE",
        ... )
        >>> print(f"Copia completada: {result['status']}, job: {result['job_id']}")
    """
    dispositions_by_name = {
        "WRITE_APPEND": bigquery.WriteDisposition.WRITE_APPEND,
        "WRITE_TRUNCATE": bigquery.WriteDisposition.WRITE_TRUNCATE,
        "WRITE_EMPTY": bigquery.WriteDisposition.WRITE_EMPTY,
    }

    # Unrecognised names silently fall back to WRITE_EMPTY.
    chosen_disposition = dispositions_by_name.get(
        write_disposition, bigquery.WriteDisposition.WRITE_EMPTY
    )
    copy_config = bigquery.CopyJobConfig(write_disposition=chosen_disposition)

    origin = client._client.dataset(source_dataset).table(source_table)
    target = client._client.dataset(dest_dataset).table(dest_table)
    copy_job = client._client.copy_table(origin, target, job_config=copy_config)
    copy_job.result()  # blocks until the copy job finishes

    summary = {"job_id": copy_job.job_id}
    if copy_job.state != "DONE" or copy_job.errors:
        summary["status"] = "FAILED"
    else:
        summary["status"] = "DONE"
    return summary