Files
fn_registry/python/functions/bigquery/jobs.py
egutierrez 690e68a542 feat: add BigQuery Python functions and BQClient type
Funciones CRUD completas para BigQuery: auth, datasets, tables, queries,
jobs, routines, load/export. Tipo BQClient como wrapper del SDK oficial.
2026-04-07 18:45:02 +02:00

106 lines
3.3 KiB
Python

"""Gestion de jobs en Google BigQuery."""
from .client import BQClient
def bq_list_jobs(
    client: BQClient,
    state_filter: str = "",
    max_results: int = 50,
    all_users: bool = False,
) -> list[dict]:
    """List the project's jobs, optionally filtered by state.

    Args:
        client: Authenticated client.
        state_filter: One of "running", "pending", "done". Matching is
            case-insensitive and surrounding whitespace is ignored, so a
            state value taken from a returned job (e.g. "RUNNING") also
            works. Empty string means no filter.
        max_results: Maximum number of jobs to return.
        all_users: If True, list jobs from all users (requires permissions).

    Returns:
        List of dicts with keys: job_id, job_type, state, created, started,
        ended, user_email, bytes_processed, error.

    Example:
        >>> jobs = bq_list_jobs(client, state_filter="running")
        >>> for j in jobs:
        ...     print(j["job_id"], j["state"], j["job_type"])
    """
    kwargs = {"max_results": max_results, "all_users": all_users}
    # The BigQuery API expects lowercase filter values, while job states
    # reported back to callers are uppercase (e.g. "DONE"); normalize so
    # either spelling can be passed in.
    normalized_state = state_filter.strip().lower() if state_filter else ""
    if normalized_state:
        kwargs["state_filter"] = normalized_state
    return [_job_to_dict(job) for job in client._client.list_jobs(**kwargs)]
def bq_get_job(client: BQClient, job_id: str, location: str = "") -> dict:
    """Fetch the details of a single job by its ID.

    Args:
        client: Authenticated client.
        job_id: Job ID.
        location: Location the job ran in (e.g. "europe-west1"). Empty
            string passes no location, letting the SDK fall back to the
            client's default location. Supply it for jobs that ran
            outside that default location.

    Returns:
        Dict that always contains: job_id, job_type, state, created,
        started, ended, user_email, bytes_processed, error. The following
        keys are added only when the job exposes a non-empty value:
        destination_table (str form of the job's destination),
        query (SQL text), errors (list of error dicts).

    Raises:
        google.api_core.exceptions.NotFound: If the job does not exist.
            BigQuery typically also reports NotFound when the job lives in
            a different location than the one requested.

    Example:
        >>> job = bq_get_job(client, "job_abc123")
        >>> print(job["state"], job["bytes_processed"])
        >>> if job["error"]:
        ...     print("Error:", job["error"])
        >>> print(job.get("query"))  # optional key
    """
    job = client._client.get_job(job_id, location=location or None)
    result = _job_to_dict(job)
    # Extra details not produced by _job_to_dict; only added when present,
    # since not every job type has a destination or SQL text.
    destination = getattr(job, "destination", None)
    if destination:
        result["destination_table"] = str(destination)
    query = getattr(job, "query", None)
    if query:
        result["query"] = query
    if job.errors:
        result["errors"] = [dict(e) for e in job.errors]
    return result
def bq_cancel_job(client: BQClient, job_id: str, location: str = "") -> dict:
    """Request cancellation of a job.

    Cancellation in BigQuery is asynchronous: the request returns
    immediately, so the reported state may still be "RUNNING" or
    "PENDING". Poll the job (e.g. with bq_get_job) to confirm it finished.

    Args:
        client: Authenticated client.
        job_id: ID of the job to cancel.
        location: Location the job ran in. Empty string passes no location,
            letting the SDK fall back to the client's default location.

    Returns:
        Dict with: job_id, state (as reported right after the cancel
        request).

    Raises:
        google.api_core.exceptions.NotFound: If the job does not exist.

    Example:
        >>> result = bq_cancel_job(client, "job_abc123")
        >>> print(result["state"])  # may not be "DONE" yet
    """
    job = client._client.cancel_job(job_id, location=location or None)
    return {"job_id": job.job_id, "state": job.state}
def _job_to_dict(job) -> dict:
    """Flatten an SDK job object into a plain dict.

    Timestamps (created/started/ended) become ISO-8601 strings, or None when
    unset; error_result is stringified when present; bytes_processed is
    None for job objects lacking total_bytes_processed.
    """

    def _iso(moment):
        # Falsy (unset) timestamps map to None.
        if not moment:
            return None
        return moment.isoformat()

    flat = dict(
        job_id=job.job_id,
        job_type=job.job_type,
        state=job.state,
        created=_iso(job.created),
        started=_iso(job.started),
        ended=_iso(job.ended),
        user_email=job.user_email,
        error=str(job.error_result) if job.error_result else None,
    )
    # total_bytes_processed only exists on query jobs; other job types
    # lack the attribute entirely.
    flat["bytes_processed"] = getattr(job, "total_bytes_processed", None)
    return flat