690e68a542
Funciones CRUD completas para BigQuery: auth, datasets, tables, queries, jobs, routines, load/export. Tipo BQClient como wrapper del SDK oficial.
398 lines
14 KiB
Python
398 lines
14 KiB
Python
"""Queries y operaciones de datos para Google BigQuery."""
|
|
|
|
from .client import BQClient
|
|
from google.cloud import bigquery
|
|
|
|
|
|
# Scalar type names a query parameter may use (standard SQL names plus the
# legacy aliases the SDK also understands). Deliberately broader than the
# "common" types listed in bq_query's docstring, so that valid types which
# previously passed straight through to the API keep working; the goal is
# only to catch typos early with a clear ValueError.
_SCALAR_PARAM_TYPES = frozenset(
    {
        "STRING",
        "BYTES",
        "INT64",
        "INTEGER",
        "FLOAT64",
        "FLOAT",
        "NUMERIC",
        "BIGNUMERIC",
        "DECIMAL",
        "BIGDECIMAL",
        "BOOL",
        "BOOLEAN",
        "DATE",
        "DATETIME",
        "TIME",
        "TIMESTAMP",
        "GEOGRAPHY",
        "JSON",
    }
)


def _to_query_parameters(params: list[dict]) -> list:
    """Convert ``{"name", "type", "value"}`` dicts into ScalarQueryParameter objects.

    Raises:
        KeyError: If an entry lacks "name", "type" or "value".
        ValueError: If an entry's type is not a known BigQuery scalar type.
    """
    query_params = []
    for p in params:
        # Type names are matched case-insensitively and sent upper-cased,
        # which is the canonical spelling BigQuery and the SDK use.
        type_name = str(p["type"]).upper()
        if type_name not in _SCALAR_PARAM_TYPES:
            raise ValueError(
                f"Unsupported BigQuery parameter type {p['type']!r} "
                f"for parameter {p['name']!r}"
            )
        query_params.append(
            bigquery.ScalarQueryParameter(p["name"], type_name, p["value"])
        )
    return query_params


def bq_query(
    client: BQClient,
    sql: str,
    params: list[dict] | None = None,
    dry_run: bool = False,
) -> dict:
    """Run a SQL query on BigQuery, with optional named parameters and dry-run.

    With ``dry_run=True`` the query is validated and its cost estimated
    without being executed. When ``params`` is given, each entry becomes a
    ``bigquery.ScalarQueryParameter`` referenced from SQL as ``@name``.

    Args:
        client: Authenticated BigQuery client wrapper.
        sql: SQL text to run. Use ``@name`` to reference parameters.
        params: List of parameters, each shaped like
            ``{"name": "x", "type": "STRING", "value": "hello"}``. Common
            types: STRING, INT64, FLOAT64, BOOL, DATE, TIMESTAMP; other
            BigQuery scalar type names (e.g. NUMERIC, DATETIME, BYTES) are
            accepted too. Type names are case-insensitive.
        dry_run: If True, only return the byte estimate; nothing is executed.

    Returns:
        If ``dry_run`` is True:
            ``{"total_bytes_processed": N, "total_bytes_billed": N}``.
        Otherwise:
            ``{"columns": [...], "rows": [[...], ...], "total_rows": N,
            "bytes_processed": N, "cache_hit": bool}`` where ``columns`` are
            the result field names and each row is a list of values in
            column order.

    Raises:
        KeyError: If a parameter dict lacks "name", "type" or "value".
        ValueError: If a parameter type is not a known BigQuery scalar type.
        google.api_core.exceptions.GoogleAPICallError: If the query fails.

    Example:
        >>> result = bq_query(client, "SELECT COUNT(*) as n FROM `project.dataset.table`")
        >>> print(result["rows"])
        >>> # With parameters:
        >>> result = bq_query(client, "SELECT * FROM t WHERE status = @s",
        ...                   params=[{"name": "s", "type": "STRING", "value": "active"}])
    """
    job_config = bigquery.QueryJobConfig()
    if params:
        job_config.query_parameters = _to_query_parameters(params)

    if dry_run:
        # Disable the cache (as Google's dry-run samples do) so the estimate
        # reflects a real scan rather than a cache hit.
        job_config.dry_run = True
        job_config.use_query_cache = False
        job = client._client.query(sql, job_config=job_config)
        return {
            "total_bytes_processed": job.total_bytes_processed,
            "total_bytes_billed": job.total_bytes_billed,
        }

    job = client._client.query(sql, job_config=job_config)
    result = job.result()  # blocks until the query finishes

    columns = [field.name for field in result.schema]
    rows = [list(row.values()) for row in result]

    return {
        "columns": columns,
        "rows": rows,
        "total_rows": result.total_rows,
        "bytes_processed": job.total_bytes_processed,
        "cache_hit": job.cache_hit,
    }
|
|
|
|
|
|
def bq_insert_rows(
    client: BQClient,
    dataset_id: str,
    table_id: str,
    rows: list[dict],
) -> dict:
    """Stream rows into a BigQuery table via ``insert_rows_json``.

    Uses the streaming insert API (``tabledata.insertAll``). Per-row failures
    are returned in the ``"errors"`` field rather than raised, so callers
    should always inspect it.

    ``skip_invalid_rows`` is not passed to the SDK, so the API default
    (``skipInvalidRows=false``) applies: if any row is invalid the whole
    request is rejected and no row is written. ``"inserted"`` is therefore
    ``len(rows)`` on success and ``0`` whenever ``"errors"`` is non-empty.

    Args:
        client: Authenticated BigQuery client wrapper.
        dataset_id: ID of the destination dataset.
        table_id: ID of the destination table.
        rows: Row dicts to insert; keys should match the table's column
            names. An empty list is a no-op and makes no API call.

    Returns:
        ``{"inserted": N, "errors": [...]}`` where ``errors`` is the list of
        per-row error mappings returned by the API (empty if all rows were
        accepted).

    Raises:
        google.api_core.exceptions.NotFound: If the table does not exist.
        google.api_core.exceptions.GoogleAPICallError: If the API call fails.

    Example:
        >>> result = bq_insert_rows(client, "my_dataset", "my_table", [
        ...     {"id": 1, "name": "Alice", "created_at": "2024-01-01"},
        ...     {"id": 2, "name": "Bob", "created_at": "2024-01-02"},
        ... ])
        >>> if result["errors"]:
        ...     print("Errors:", result["errors"])
    """
    if not rows:
        # Nothing to insert: skip the network round-trip entirely.
        return {"inserted": 0, "errors": []}

    table_ref = client._client.dataset(dataset_id).table(table_id)
    errors = client._client.insert_rows_json(table_ref, rows)

    # With skipInvalidRows left at its default (false), a request that reports
    # any row error is rejected as a whole, so nothing was written. The old
    # `len(rows) - len(errors)` over-counted rows that were never stored.
    inserted = 0 if errors else len(rows)

    return {
        "inserted": inserted,
        "errors": errors,
    }
|
|
|
|
|
|
def bq_load_from_gcs(
    client: BQClient,
    uri: str | list[str],
    dataset_id: str,
    table_id: str,
    source_format: str = "CSV",
    write_disposition: str = "WRITE_APPEND",
    autodetect: bool = True,
    skip_leading_rows: int = 0,
) -> dict:
    """Load data from Google Cloud Storage into a BigQuery table.

    Configures and runs a LoadJob from one or more GCS URIs and waits for it
    to finish with ``job.result()``.

    Args:
        client: Authenticated BigQuery client wrapper.
        uri: A single GCS URI string (e.g. ``"gs://bucket/file.csv"``,
            wildcards allowed) or a list (any iterable) of such strings.
        dataset_id: ID of the destination dataset.
        table_id: ID of the destination table.
        source_format: File format: "CSV", "NEWLINE_DELIMITED_JSON", "AVRO",
            "PARQUET" or "ORC". Unknown values fall back to CSV.
            Default: "CSV".
        write_disposition: Behaviour when the table exists: "WRITE_APPEND",
            "WRITE_TRUNCATE" or "WRITE_EMPTY". Unknown values fall back to
            WRITE_APPEND. Default: "WRITE_APPEND".
        autodetect: If True, let BigQuery infer the schema. Default: True.
        skip_leading_rows: Number of leading rows to skip (CSV headers).

    Returns:
        ``{"job_id": str, "rows_loaded": N, "status": "DONE"|"FAILED"}``.
        ``job.result()`` raises if the job fails, so "FAILED" is only
        reported when the finished job still lists errors.

    Raises:
        google.api_core.exceptions.GoogleAPICallError: If the job fails.
        google.cloud.exceptions.NotFound: If the bucket or dataset does not exist.

    Example:
        >>> result = bq_load_from_gcs(
        ...     client, "gs://my-bucket/data/*.csv",
        ...     "my_dataset", "my_table",
        ...     skip_leading_rows=1,
        ... )
        >>> print(f"Loaded {result['rows_loaded']} rows, job: {result['job_id']}")
    """
    format_map = {
        "CSV": bigquery.SourceFormat.CSV,
        "NEWLINE_DELIMITED_JSON": bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
        "AVRO": bigquery.SourceFormat.AVRO,
        "PARQUET": bigquery.SourceFormat.PARQUET,
        "ORC": bigquery.SourceFormat.ORC,
    }
    disposition_map = {
        "WRITE_APPEND": bigquery.WriteDisposition.WRITE_APPEND,
        "WRITE_TRUNCATE": bigquery.WriteDisposition.WRITE_TRUNCATE,
        "WRITE_EMPTY": bigquery.WriteDisposition.WRITE_EMPTY,
    }

    # Look up the disposition by `write_disposition` (the previous version
    # keyed it by `source_format` and then patched it with a second assignment).
    job_config = bigquery.LoadJobConfig(
        source_format=format_map.get(source_format, bigquery.SourceFormat.CSV),
        write_disposition=disposition_map.get(
            write_disposition, bigquery.WriteDisposition.WRITE_APPEND
        ),
        autodetect=autodetect,
        skip_leading_rows=skip_leading_rows,
    )

    table_ref = client._client.dataset(dataset_id).table(table_id)
    # A bare string is one URI; any other iterable (list, tuple, ...) is a
    # collection of URIs. Wrapping only `list` would turn a tuple into a
    # single bogus element.
    uris = [uri] if isinstance(uri, str) else list(uri)

    job = client._client.load_table_from_uri(uris, table_ref, job_config=job_config)
    result = job.result()  # blocks until the load finishes

    return {
        "job_id": job.job_id,
        "rows_loaded": result.output_rows,
        "status": "DONE" if job.state == "DONE" and not job.errors else "FAILED",
    }
|
|
|
|
|
|
def bq_load_from_file(
    client: BQClient,
    file_path: str,
    dataset_id: str,
    table_id: str,
    source_format: str = "CSV",
    write_disposition: str = "WRITE_APPEND",
    autodetect: bool = True,
    skip_leading_rows: int = 0,
) -> dict:
    """Load a local file into a BigQuery table.

    Opens the file in binary mode and hands it to the SDK's
    ``load_table_from_file``; the local-disk counterpart of
    ``bq_load_from_gcs``. Waits for the resulting LoadJob to finish.

    Args:
        client: Authenticated BigQuery client wrapper.
        file_path: Absolute or relative path of the local file to load.
        dataset_id: ID of the destination dataset.
        table_id: ID of the destination table.
        source_format: File format: "CSV", "NEWLINE_DELIMITED_JSON", "AVRO",
            "PARQUET" or "ORC". Unknown values fall back to CSV.
            Default: "CSV".
        write_disposition: Behaviour when the table exists: "WRITE_APPEND",
            "WRITE_TRUNCATE" or "WRITE_EMPTY". Unknown values fall back to
            WRITE_APPEND. Default: "WRITE_APPEND".
        autodetect: If True, let BigQuery infer the schema. Default: True.
        skip_leading_rows: Number of leading rows to skip (CSV headers).

    Returns:
        ``{"job_id": str, "rows_loaded": N, "status": "DONE"|"FAILED"}``.

    Raises:
        FileNotFoundError: If the local file does not exist.
        google.api_core.exceptions.GoogleAPICallError: If the job fails.

    Example:
        >>> result = bq_load_from_file(
        ...     client, "/tmp/data.csv",
        ...     "my_dataset", "my_table",
        ...     skip_leading_rows=1,
        ... )
        >>> print(f"Loaded {result['rows_loaded']} rows")
    """
    # The accepted names are exactly the enum attribute names, so build the
    # lookup tables straight from the SDK enums.
    formats = {
        name: getattr(bigquery.SourceFormat, name)
        for name in ("CSV", "NEWLINE_DELIMITED_JSON", "AVRO", "PARQUET", "ORC")
    }
    dispositions = {
        name: getattr(bigquery.WriteDisposition, name)
        for name in ("WRITE_APPEND", "WRITE_TRUNCATE", "WRITE_EMPTY")
    }

    config = bigquery.LoadJobConfig()
    config.source_format = formats.get(source_format, bigquery.SourceFormat.CSV)
    config.write_disposition = dispositions.get(
        write_disposition, bigquery.WriteDisposition.WRITE_APPEND
    )
    config.autodetect = autodetect
    config.skip_leading_rows = skip_leading_rows

    destination = client._client.dataset(dataset_id).table(table_id)

    # The upload happens inside load_table_from_file, so the file only needs
    # to stay open for that call; waiting on the job can happen afterwards.
    with open(file_path, "rb") as source_file:
        load_job = client._client.load_table_from_file(
            source_file, destination, job_config=config
        )

    finished = load_job.result()
    succeeded = load_job.state == "DONE" and not load_job.errors

    return {
        "job_id": load_job.job_id,
        "rows_loaded": finished.output_rows,
        "status": "DONE" if succeeded else "FAILED",
    }
|
|
|
|
|
|
def bq_export_to_gcs(
    client: BQClient,
    dataset_id: str,
    table_id: str,
    destination_uri: str,
    destination_format: str = "CSV",
    compression: str = "NONE",
) -> dict:
    """Export a BigQuery table to Google Cloud Storage.

    Starts an ExtractJob through the SDK's ``extract_table`` and waits for it
    to finish with ``result()``.

    Args:
        client: Authenticated BigQuery client wrapper.
        dataset_id: ID of the source dataset.
        table_id: ID of the source table.
        destination_uri: Target GCS URI. Use a wildcard such as
            ``"gs://bucket/prefix_*.csv"`` to allow multiple output files.
        destination_format: Export format: "CSV", "NEWLINE_DELIMITED_JSON",
            "AVRO" or "PARQUET". Unknown values fall back to CSV.
            Default: "CSV".
        compression: Compression codec: "NONE", "GZIP", "SNAPPY" or
            "DEFLATE". Unknown values fall back to NONE. Default: "NONE".

    Returns:
        ``{"job_id": str, "destination_uri": str, "status": "DONE"|"FAILED"}``.

    Raises:
        google.api_core.exceptions.NotFound: If the table does not exist.
        google.api_core.exceptions.GoogleAPICallError: If the job fails.

    Example:
        >>> result = bq_export_to_gcs(
        ...     client, "my_dataset", "my_table",
        ...     "gs://my-bucket/export/data_*.csv",
        ...     compression="GZIP",
        ... )
        >>> print(f"Exported to {result['destination_uri']}, job: {result['job_id']}")
    """
    destination_formats = {
        "CSV": bigquery.DestinationFormat.CSV,
        "NEWLINE_DELIMITED_JSON": bigquery.DestinationFormat.NEWLINE_DELIMITED_JSON,
        "AVRO": bigquery.DestinationFormat.AVRO,
        "PARQUET": bigquery.DestinationFormat.PARQUET,
    }
    codecs = {
        "NONE": bigquery.Compression.NONE,
        "GZIP": bigquery.Compression.GZIP,
        "SNAPPY": bigquery.Compression.SNAPPY,
        "DEFLATE": bigquery.Compression.DEFLATE,
    }

    extract_config = bigquery.ExtractJobConfig()
    extract_config.destination_format = destination_formats.get(
        destination_format, bigquery.DestinationFormat.CSV
    )
    extract_config.compression = codecs.get(compression, bigquery.Compression.NONE)

    source = client._client.dataset(dataset_id).table(table_id)
    extract_job = client._client.extract_table(
        source, destination_uri, job_config=extract_config
    )
    extract_job.result()  # block until the export finishes

    finished_cleanly = extract_job.state == "DONE" and not extract_job.errors
    return {
        "job_id": extract_job.job_id,
        "destination_uri": destination_uri,
        "status": "DONE" if finished_cleanly else "FAILED",
    }
|
|
|
|
|
|
def bq_copy_table(
    client: BQClient,
    source_dataset: str,
    source_table: str,
    dest_dataset: str,
    dest_table: str,
    write_disposition: str = "WRITE_EMPTY",
) -> dict:
    """Copy a BigQuery table to another dataset or table in the same project.

    Starts a CopyJob through the SDK's ``copy_table`` and waits for it to
    finish with ``result()``. Both table references are resolved against the
    client's default project, so cross-project copies are not supported.

    Args:
        client: Authenticated BigQuery client wrapper.
        source_dataset: ID of the source dataset.
        source_table: ID of the source table.
        dest_dataset: ID of the destination dataset.
        dest_table: ID of the destination table.
        write_disposition: Behaviour when the destination exists:
            "WRITE_EMPTY" (fail if it exists), "WRITE_APPEND" or
            "WRITE_TRUNCATE". Unknown values fall back to WRITE_EMPTY.
            Default: "WRITE_EMPTY".

    Returns:
        ``{"job_id": str, "status": "DONE"|"FAILED"}``.

    Raises:
        google.api_core.exceptions.NotFound: If the source table does not exist.
        google.api_core.exceptions.Conflict: If the destination exists and
            write_disposition is WRITE_EMPTY.
        google.api_core.exceptions.GoogleAPICallError: If the job fails.

    Example:
        >>> result = bq_copy_table(
        ...     client,
        ...     "production", "users",
        ...     "staging", "users_backup",
        ...     write_disposition="WRITE_TRUNCATE",
        ... )
        >>> print(f"Copy finished: {result['status']}, job: {result['job_id']}")
    """
    write_modes = {
        "WRITE_APPEND": bigquery.WriteDisposition.WRITE_APPEND,
        "WRITE_TRUNCATE": bigquery.WriteDisposition.WRITE_TRUNCATE,
        "WRITE_EMPTY": bigquery.WriteDisposition.WRITE_EMPTY,
    }

    copy_config = bigquery.CopyJobConfig()
    copy_config.write_disposition = write_modes.get(
        write_disposition, bigquery.WriteDisposition.WRITE_EMPTY
    )

    origin = client._client.dataset(source_dataset).table(source_table)
    target = client._client.dataset(dest_dataset).table(dest_table)

    copy_job = client._client.copy_table(origin, target, job_config=copy_config)
    copy_job.result()  # block until the copy finishes

    ok = copy_job.state == "DONE" and not copy_job.errors
    return {
        "job_id": copy_job.job_id,
        "status": "DONE" if ok else "FAILED",
    }
|