"""BigQuery queries and data operations: query, streaming insert, load, export, copy."""

from google.cloud import bigquery

from .client import BQClient

# String -> SDK enum lookup tables shared by the load/export/copy helpers.
# Unknown keys are NOT rejected: each function falls back to a documented
# default (preserved from the original behavior), so a typo such as
# "PARQET" silently becomes the default format.
_SOURCE_FORMATS = {
    "CSV": bigquery.SourceFormat.CSV,
    "NEWLINE_DELIMITED_JSON": bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
    "AVRO": bigquery.SourceFormat.AVRO,
    "PARQUET": bigquery.SourceFormat.PARQUET,
    "ORC": bigquery.SourceFormat.ORC,
}

_WRITE_DISPOSITIONS = {
    "WRITE_APPEND": bigquery.WriteDisposition.WRITE_APPEND,
    "WRITE_TRUNCATE": bigquery.WriteDisposition.WRITE_TRUNCATE,
    "WRITE_EMPTY": bigquery.WriteDisposition.WRITE_EMPTY,
}

_DESTINATION_FORMATS = {
    "CSV": bigquery.DestinationFormat.CSV,
    "NEWLINE_DELIMITED_JSON": bigquery.DestinationFormat.NEWLINE_DELIMITED_JSON,
    "AVRO": bigquery.DestinationFormat.AVRO,
    "PARQUET": bigquery.DestinationFormat.PARQUET,
}

_COMPRESSIONS = {
    "NONE": bigquery.Compression.NONE,
    "GZIP": bigquery.Compression.GZIP,
    "SNAPPY": bigquery.Compression.SNAPPY,
    "DEFLATE": bigquery.Compression.DEFLATE,
}


def _table_ref(client: BQClient, dataset_id: str, table_id: str) -> bigquery.TableReference:
    """Build a reference to ``dataset_id.table_id`` in the client's default project.

    Equivalent to the deprecated ``Client.dataset(dataset_id).table(table_id)``
    (which resolves the dataset in ``client.project``), without the
    deprecation warning.
    """
    return bigquery.DatasetReference(client._client.project, dataset_id).table(table_id)


def _load_job_config(
    source_format: str,
    write_disposition: str,
    autodetect: bool,
    skip_leading_rows: int,
) -> bigquery.LoadJobConfig:
    """Build the LoadJobConfig shared by the GCS and local-file loaders.

    Unknown ``source_format`` falls back to CSV and unknown
    ``write_disposition`` falls back to WRITE_APPEND.
    """
    return bigquery.LoadJobConfig(
        source_format=_SOURCE_FORMATS.get(source_format, bigquery.SourceFormat.CSV),
        write_disposition=_WRITE_DISPOSITIONS.get(
            write_disposition, bigquery.WriteDisposition.WRITE_APPEND
        ),
        autodetect=autodetect,
        skip_leading_rows=skip_leading_rows,
    )


def _job_status(job) -> str:
    """Return "DONE" if the job finished with no entries in ``job.errors``, else "FAILED"."""
    return "DONE" if job.state == "DONE" and not job.errors else "FAILED"


def bq_query(
    client: BQClient,
    sql: str,
    params: list[dict] | None = None,
    dry_run: bool = False,
) -> dict:
    """Run a SQL query in BigQuery, with optional query parameters and dry-run.

    If ``dry_run`` is True, the query is not executed; only the byte estimates
    reported by the dry-run job are returned. If ``params`` is non-empty, each
    entry is converted to a ``bigquery.ScalarQueryParameter``. Reference
    parameters in the SQL with the ``@name`` syntax.

    Args:
        client: Authenticated BigQuery client wrapper.
        sql: SQL text to run. Use ``@name`` to reference parameters.
        params: Parameter list. Each item looks like
            ``{"name": "x", "type": "STRING", "value": "hello"}``.
            Typical types: STRING, INT64, FLOAT64, BOOL, DATE, TIMESTAMP.
            The type string is passed unchanged to ``ScalarQueryParameter``
            and is not validated locally.
        dry_run: If True, return byte estimates without running the query.

    Returns:
        If dry_run=True: ``{"total_bytes_processed": N, "total_bytes_billed": N}``.
        If dry_run=False: ``{"columns": [...], "rows": [[...], ...],
        "total_rows": N, "bytes_processed": N, "cache_hit": bool}``.

    Raises:
        google.api_core.exceptions.GoogleAPICallError: If the query fails.
        KeyError: If a parameter dict lacks "name", "type" or "value".

    Example:
        >>> result = bq_query(client, "SELECT COUNT(*) as n FROM `project.dataset.table`")
        >>> print(result["rows"])
        >>> # With parameters:
        >>> result = bq_query(client, "SELECT * FROM t WHERE status = @s",
        ...     params=[{"name": "s", "type": "STRING", "value": "active"}])
    """
    job_config = bigquery.QueryJobConfig()
    if params:
        job_config.query_parameters = [
            bigquery.ScalarQueryParameter(p["name"], p["type"], p["value"])
            for p in params
        ]
    if dry_run:
        job_config.dry_run = True
        # Cache disabled for the estimate; presumably so a cached result does
        # not hide the real scan cost.
        job_config.use_query_cache = False

    job = client._client.query(sql, job_config=job_config)

    if dry_run:
        return {
            "total_bytes_processed": job.total_bytes_processed,
            "total_bytes_billed": job.total_bytes_billed,
        }

    result = job.result()
    return {
        "columns": [field.name for field in result.schema],
        "rows": [list(row.values()) for row in result],
        "total_rows": result.total_rows,
        "bytes_processed": job.total_bytes_processed,
        "cache_hit": job.cache_hit,
    }


def bq_insert_rows(
    client: BQClient,
    dataset_id: str,
    table_id: str,
    rows: list[dict],
) -> dict:
    """Insert rows into a BigQuery table via the streaming insert API.

    Uses ``insert_rows_json`` for real-time insertion. Per-row failures are
    returned in the "errors" field instead of being raised, so callers must
    always inspect that field. An empty ``rows`` list is a no-op that makes
    no API call.

    Args:
        client: Authenticated BigQuery client wrapper.
        dataset_id: Destination dataset ID (in the client's default project).
        table_id: Destination table ID.
        rows: Dicts to insert; keys should match the table's column names.

    Returns:
        ``{"inserted": N, "errors": [...]}`` where ``errors`` is the list
        returned by ``insert_rows_json`` (empty when every row succeeded) and
        ``inserted`` is ``len(rows) - len(errors)``.

    Raises:
        google.api_core.exceptions.NotFound: If the table does not exist.
        google.api_core.exceptions.GoogleAPICallError: If the API call fails.

    Example:
        >>> result = bq_insert_rows(client, "my_dataset", "my_table", [
        ...     {"id": 1, "name": "Alice", "created_at": "2024-01-01"},
        ...     {"id": 2, "name": "Bob", "created_at": "2024-01-02"},
        ... ])
        >>> if result["errors"]:
        ...     print("Errores:", result["errors"])
    """
    if not rows:
        # Nothing to send; avoid a request with an empty row list.
        return {"inserted": 0, "errors": []}

    table_ref = _table_ref(client, dataset_id, table_id)
    errors = client._client.insert_rows_json(table_ref, rows)
    return {
        "inserted": len(rows) - len(errors),
        "errors": errors,
    }


def bq_load_from_gcs(
    client: BQClient,
    uri: str | list[str],
    dataset_id: str,
    table_id: str,
    source_format: str = "CSV",
    write_disposition: str = "WRITE_APPEND",
    autodetect: bool = True,
    skip_leading_rows: int = 0,
) -> dict:
    """Load data from Google Cloud Storage into a BigQuery table.

    Configures and runs a LoadJob from one or more GCS URIs and blocks until
    it finishes (``job.result()``).

    Args:
        client: Authenticated BigQuery client wrapper.
        uri: A GCS URI string (``"gs://bucket/file.csv"``) or a list of URIs.
        dataset_id: Destination dataset ID (in the client's default project).
        table_id: Destination table ID.
        source_format: "CSV", "NEWLINE_DELIMITED_JSON", "AVRO", "PARQUET" or
            "ORC". Unknown values fall back to CSV. Default: "CSV".
        write_disposition: Behavior if the table exists: "WRITE_APPEND",
            "WRITE_TRUNCATE" or "WRITE_EMPTY". Unknown values fall back to
            WRITE_APPEND. Default: "WRITE_APPEND".
        autodetect: If True, let BigQuery detect the schema. Default: True.
        skip_leading_rows: Rows to skip at the start (CSV headers).

    Returns:
        ``{"job_id": str, "rows_loaded": N, "status": "DONE"|"FAILED"}``.

    Raises:
        google.api_core.exceptions.GoogleAPICallError: If the job fails.
        google.cloud.exceptions.NotFound: If the bucket or dataset does not exist.

    Example:
        >>> result = bq_load_from_gcs(
        ...     client, "gs://my-bucket/data/*.csv",
        ...     "my_dataset", "my_table",
        ...     skip_leading_rows=1,
        ... )
        >>> print(f"Cargadas {result['rows_loaded']} filas, job: {result['job_id']}")
    """
    job_config = _load_job_config(
        source_format, write_disposition, autodetect, skip_leading_rows
    )
    table_ref = _table_ref(client, dataset_id, table_id)
    uris = uri if isinstance(uri, list) else [uri]

    job = client._client.load_table_from_uri(uris, table_ref, job_config=job_config)
    result = job.result()
    return {
        "job_id": job.job_id,
        "rows_loaded": result.output_rows,
        "status": _job_status(job),
    }


def bq_load_from_file(
    client: BQClient,
    file_path: str,
    dataset_id: str,
    table_id: str,
    source_format: str = "CSV",
    write_disposition: str = "WRITE_APPEND",
    autodetect: bool = True,
    skip_leading_rows: int = 0,
) -> dict:
    """Load data from a local file into a BigQuery table.

    Opens the file in binary mode and uploads it with ``load_table_from_file``.
    This is the local-disk counterpart of ``bq_load_from_gcs``.

    Args:
        client: Authenticated BigQuery client wrapper.
        file_path: Absolute or relative path of the local file to load.
        dataset_id: Destination dataset ID (in the client's default project).
        table_id: Destination table ID.
        source_format: "CSV", "NEWLINE_DELIMITED_JSON", "AVRO", "PARQUET" or
            "ORC". Unknown values fall back to CSV. Default: "CSV".
        write_disposition: Behavior if the table exists: "WRITE_APPEND",
            "WRITE_TRUNCATE" or "WRITE_EMPTY". Unknown values fall back to
            WRITE_APPEND. Default: "WRITE_APPEND".
        autodetect: If True, let BigQuery detect the schema. Default: True.
        skip_leading_rows: Rows to skip at the start (CSV headers).

    Returns:
        ``{"job_id": str, "rows_loaded": N, "status": "DONE"|"FAILED"}``.

    Raises:
        FileNotFoundError: If the local file does not exist.
        google.api_core.exceptions.GoogleAPICallError: If the job fails.

    Example:
        >>> result = bq_load_from_file(
        ...     client, "/tmp/data.csv",
        ...     "my_dataset", "my_table",
        ...     skip_leading_rows=1,
        ... )
        >>> print(f"Cargadas {result['rows_loaded']} filas")
    """
    job_config = _load_job_config(
        source_format, write_disposition, autodetect, skip_leading_rows
    )
    table_ref = _table_ref(client, dataset_id, table_id)

    with open(file_path, "rb") as f:
        job = client._client.load_table_from_file(f, table_ref, job_config=job_config)
    # The upload happens inside load_table_from_file, so the file can be
    # closed before blocking on job completion.
    result = job.result()
    return {
        "job_id": job.job_id,
        "rows_loaded": result.output_rows,
        "status": _job_status(job),
    }


def bq_export_to_gcs(
    client: BQClient,
    dataset_id: str,
    table_id: str,
    destination_uri: str,
    destination_format: str = "CSV",
    compression: str = "NONE",
) -> dict:
    """Export a BigQuery table to Google Cloud Storage.

    Creates an ExtractJob with ``extract_table`` and blocks until it
    finishes (``job.result()``).

    Args:
        client: Authenticated BigQuery client wrapper.
        dataset_id: Source dataset ID (in the client's default project).
        table_id: Source table ID.
        destination_uri: Destination GCS URI. For multiple output files use
            a wildcard, e.g. ``"gs://bucket/prefix_*.csv"``.
        destination_format: "CSV", "NEWLINE_DELIMITED_JSON", "AVRO" or
            "PARQUET". Unknown values fall back to CSV. Default: "CSV".
        compression: "NONE", "GZIP", "SNAPPY" or "DEFLATE". Unknown values
            fall back to NONE. Default: "NONE". Not every codec is valid for
            every format; BigQuery decides at job time.

    Returns:
        ``{"job_id": str, "destination_uri": str, "status": "DONE"|"FAILED"}``.

    Raises:
        google.api_core.exceptions.NotFound: If the table does not exist.
        google.api_core.exceptions.GoogleAPICallError: If the job fails.

    Example:
        >>> result = bq_export_to_gcs(
        ...     client, "my_dataset", "my_table",
        ...     "gs://my-bucket/export/data_*.csv",
        ...     compression="GZIP",
        ... )
        >>> print(f"Exportado a {result['destination_uri']}, job: {result['job_id']}")
    """
    job_config = bigquery.ExtractJobConfig(
        destination_format=_DESTINATION_FORMATS.get(
            destination_format, bigquery.DestinationFormat.CSV
        ),
        compression=_COMPRESSIONS.get(compression, bigquery.Compression.NONE),
    )
    table_ref = _table_ref(client, dataset_id, table_id)

    job = client._client.extract_table(table_ref, destination_uri, job_config=job_config)
    job.result()
    return {
        "job_id": job.job_id,
        "destination_uri": destination_uri,
        "status": _job_status(job),
    }


def bq_copy_table(
    client: BQClient,
    source_dataset: str,
    source_table: str,
    dest_dataset: str,
    dest_table: str,
    write_disposition: str = "WRITE_EMPTY",
) -> dict:
    """Copy a BigQuery table to another dataset or table within the same project.

    Creates a CopyJob with ``copy_table`` and blocks until it finishes
    (``job.result()``). Both source and destination are resolved in the
    client's default project, so this cannot copy between projects.

    Args:
        client: Authenticated BigQuery client wrapper.
        source_dataset: Source dataset ID.
        source_table: Source table ID.
        dest_dataset: Destination dataset ID.
        dest_table: Destination table ID.
        write_disposition: Behavior if the destination exists:
            "WRITE_EMPTY" (fail if it has data), "WRITE_APPEND" or
            "WRITE_TRUNCATE". Unknown values fall back to WRITE_EMPTY.
            Default: "WRITE_EMPTY".

    Returns:
        ``{"job_id": str, "status": "DONE"|"FAILED"}``.

    Raises:
        google.api_core.exceptions.NotFound: If the source table does not exist.
        google.api_core.exceptions.Conflict: If the destination exists and
            write_disposition is WRITE_EMPTY.
        google.api_core.exceptions.GoogleAPICallError: If the job fails.

    Example:
        >>> result = bq_copy_table(
        ...     client,
        ...     "production", "users",
        ...     "staging", "users_backup",
        ...     write_disposition="WRITE_TRUNCATE",
        ... )
        >>> print(f"Copia completada: {result['status']}, job: {result['job_id']}")
    """
    job_config = bigquery.CopyJobConfig(
        write_disposition=_WRITE_DISPOSITIONS.get(
            write_disposition, bigquery.WriteDisposition.WRITE_EMPTY
        ),
    )
    source_ref = _table_ref(client, source_dataset, source_table)
    dest_ref = _table_ref(client, dest_dataset, dest_table)

    job = client._client.copy_table(source_ref, dest_ref, job_config=job_config)
    job.result()
    return {
        "job_id": job.job_id,
        "status": _job_status(job),
    }