690e68a542
Funciones CRUD completas para BigQuery: auth, datasets, tables, queries, jobs, routines, load/export. Tipo BQClient como wrapper del SDK oficial.
106 lines
3.3 KiB
Python
106 lines
3.3 KiB
Python
"""Gestion de jobs en Google BigQuery."""
|
|
|
|
from .client import BQClient
|
|
|
|
|
|
def bq_list_jobs(
    client: BQClient,
    state_filter: str = "",
    max_results: int = 50,
    all_users: bool = False,
) -> list[dict]:
    """List the project's jobs, optionally filtered by state.

    Args:
        client: Authenticated client.
        state_filter: One of "running", "pending", "done". Matching is
            case-insensitive and surrounding whitespace is ignored, so the
            uppercase ``state`` values returned by this module (e.g.
            "RUNNING") can be passed back directly. Empty = all states.
        max_results: Maximum number of jobs to return.
        all_users: If True, list jobs from every user (requires extra
            permissions).

    Returns:
        List of dicts with: job_id, job_type, state, created, started, ended,
        user_email, bytes_processed, error.

    Example:
        >>> jobs = bq_list_jobs(client, state_filter="running")
        >>> for j in jobs:
        ...     print(j["job_id"], j["state"], j["job_type"])
    """
    kwargs = {"max_results": max_results, "all_users": all_users}
    # The SDK documents lowercase filter values ("done", "pending", "running"),
    # while job.state is reported in uppercase; normalize so both spellings work.
    normalized = state_filter.strip().lower() if state_filter else ""
    if normalized:
        kwargs["state_filter"] = normalized
    return [_job_to_dict(job) for job in client._client.list_jobs(**kwargs)]
|
|
|
|
|
|
def bq_get_job(client: BQClient, job_id: str, location: str = "") -> dict:
    """Fetch the details of a single job by its ID.

    Args:
        client: Authenticated client.
        job_id: ID of the job.
        location: Location where the job ran (e.g. "EU", "us-central1").
            Empty string = let the SDK fall back to the client's default
            location. Jobs that ran in a region other than that default
            can only be found when their location is given.

    Returns:
        Dict with: job_id, job_type, state, created, started, ended,
        user_email, bytes_processed, error, plus destination_table, query
        and errors. These last three keys are always present and are None
        when the job does not provide them (e.g. ``query`` for a load job).

    Raises:
        google.api_core.exceptions.NotFound: If the job does not exist.

    Example:
        >>> job = bq_get_job(client, "job_abc123")
        >>> print(job["state"], job["bytes_processed"])
        >>> if job["error"]:
        ...     print("Error:", job["error"])
    """
    job = client._client.get_job(job_id, location=location or None)
    result = _job_to_dict(job)

    # Extra details returned by get but not by list. Always set the keys
    # (None when absent) so the documented dict shape holds for every job type.
    destination = getattr(job, "destination", None)
    result["destination_table"] = str(destination) if destination else None

    query = getattr(job, "query", None)
    result["query"] = query if query else None

    errors = job.errors
    result["errors"] = [dict(e) for e in errors] if errors else None
    return result
|
|
|
|
|
|
def bq_cancel_job(client: BQClient, job_id: str, location: str = "") -> dict:
    """Request cancellation of a running job.

    BigQuery processes cancellation asynchronously: the request returns
    immediately and the job may still report "RUNNING" or "PENDING". Poll
    the job (e.g. with bq_get_job) until its state is "DONE" to confirm it
    actually stopped.

    Args:
        client: Authenticated client.
        job_id: ID of the job to cancel.
        location: Location where the job runs (e.g. "EU", "us-central1").
            Empty string = let the SDK fall back to the client's default
            location.

    Returns:
        Dict with: job_id, state. ``state`` is the job state reported right
        after the cancel request, not necessarily "DONE".

    Raises:
        google.api_core.exceptions.NotFound: If the job does not exist.

    Example:
        >>> result = bq_cancel_job(client, "job_abc123")
        >>> print(result["state"])  # may still be "RUNNING" at this point
    """
    job = client._client.cancel_job(job_id, location=location or None)
    return {"job_id": job.job_id, "state": job.state}
|
|
|
|
|
|
def _job_to_dict(job) -> dict:
    """Flatten an SDK job object into a plain, serializable dict.

    Timestamps (created/started/ended) become ISO-8601 strings, or None when
    unset. ``error`` is the string form of ``job.error_result`` (None when
    there is no error). ``bytes_processed`` comes from
    ``total_bytes_processed``, an attribute only some job types expose; it
    is None when the attribute is missing.
    """
    def iso_or_none(moment):
        # A job that has not started/finished yet leaves these fields unset.
        return moment.isoformat() if moment else None

    return {
        "job_id": job.job_id,
        "job_type": job.job_type,
        "state": job.state,
        "created": iso_or_none(job.created),
        "started": iso_or_none(job.started),
        "ended": iso_or_none(job.ended),
        "user_email": job.user_email,
        "error": str(job.error_result) if job.error_result else None,
        # Non-query jobs have no total_bytes_processed attribute.
        "bytes_processed": getattr(job, "total_bytes_processed", None),
    }
|