"""Gestion de jobs en Google BigQuery.""" from .client import BQClient def bq_list_jobs( client: BQClient, state_filter: str = "", max_results: int = 50, all_users: bool = False, ) -> list[dict]: """Lista jobs del proyecto con filtro opcional por estado. Args: client: Cliente autenticado. state_filter: Filtro: "running", "pending", "done". Vacio = todos. max_results: Numero maximo de jobs a retornar. all_users: Si True, lista jobs de todos los usuarios (requiere permisos). Returns: Lista de dicts con: job_id, job_type, state, created, started, ended, user_email, bytes_processed, error. Example: >>> jobs = bq_list_jobs(client, state_filter="running") >>> for j in jobs: ... print(j["job_id"], j["state"], j["job_type"]) """ kwargs = {"max_results": max_results, "all_users": all_users} if state_filter: kwargs["state_filter"] = state_filter return [_job_to_dict(job) for job in client._client.list_jobs(**kwargs)] def bq_get_job(client: BQClient, job_id: str) -> dict: """Obtiene detalles de un job por su ID. Args: client: Cliente autenticado. job_id: ID del job. Returns: Dict con: job_id, job_type, state, created, started, ended, user_email, bytes_processed, destination_table, query, error, errors. Raises: google.api_core.exceptions.NotFound: Si el job no existe. Example: >>> job = bq_get_job(client, "job_abc123") >>> print(job["state"], job["bytes_processed"]) >>> if job["error"]: ... print("Error:", job["error"]) """ job = client._client.get_job(job_id) result = _job_to_dict(job) # Agregar detalles extra disponibles en get pero no en list if hasattr(job, 'destination') and job.destination: result["destination_table"] = str(job.destination) if hasattr(job, 'query') and job.query: result["query"] = job.query if job.errors: result["errors"] = [dict(e) for e in job.errors] return result def bq_cancel_job(client: BQClient, job_id: str) -> dict: """Cancela un job en ejecucion. Args: client: Cliente autenticado. job_id: ID del job a cancelar. Returns: Dict con: job_id, state (tras cancelacion). Raises: google.api_core.exceptions.NotFound: Si el job no existe. Example: >>> result = bq_cancel_job(client, "job_abc123") >>> print(result["state"]) # "DONE" (cancelled) """ job = client._client.cancel_job(job_id) return {"job_id": job.job_id, "state": job.state} def _job_to_dict(job) -> dict: """Convierte un objeto Job del SDK a dict plano.""" result = { "job_id": job.job_id, "job_type": job.job_type, "state": job.state, "created": job.created.isoformat() if job.created else None, "started": job.started.isoformat() if job.started else None, "ended": job.ended.isoformat() if job.ended else None, "user_email": job.user_email, "error": str(job.error_result) if job.error_result else None, } # bytes_processed solo disponible en query jobs try: result["bytes_processed"] = job.total_bytes_processed except AttributeError: result["bytes_processed"] = None return result