googleapis / python-bigquery


BigQuery: make jobs awaitable #18

Open dkapitan opened 5 years ago

dkapitan commented 5 years ago

I know BigQuery jobs are asynchronous by default. However, I am struggling to make my data pipeline async end-to-end.

Looking at this JS example, I thought it would be most Pythonic to make a BigQuery job awaitable. However, I can't get that to work in Python, i.e. await client.query(query) raises errors. Looking at the source code, I don't see which method returns an awaitable object.

I have little experience writing async Python code, and found this example that wraps jobs in an async def coroutine.

import asyncio

from google.cloud import bigquery


class BQApi(object):
    def __init__(self):
        self.api = bigquery.Client.from_service_account_json(BQ_CONFIG["credentials"])

    async def exec_query(self, query, **kwargs) -> bigquery.table.RowIterator:
        job = self.api.query(query, **kwargs)
        task = asyncio.create_task(self.coroutine_job(job))
        return await task

    @staticmethod
    async def coroutine_job(job):
        # Note: result() blocks, so this coroutine still ties up the
        # event loop while the job runs.
        return job.result()

The google.api_core.operation.Operation docs show how to use add_done_callback to wait asynchronously for long-running operations. I have tried that, but the following yields AttributeError: 'QueryJob' object has no attribute '_condition':

from concurrent.futures import ThreadPoolExecutor, as_completed
query1 = 'SELECT 1'
query2 = 'SELECT 2'

def my_callback(future):
    result = future.result()

operations = [bq.query(query1), bq.query(query2)]
[operation.add_done_callback(my_callback) for operation in operations]
results2 = []
for future in as_completed(operations):
    results2.append(list(future.result()))

Given that jobs are already asynchronous, would it make sense to add a method that returns an awaitable?

Or am I missing something, and is there a Pythonic way to use the BigQuery client with the async/await pattern?
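
For illustration, here is the kind of bridge I have in mind: adapting the job's done callback into an asyncio future. A sketch, not battle-tested; it assumes add_done_callback fires from a helper thread, and that fetching rows may still block:

import asyncio

async def await_job(job):
    # Bridge the job's done callback into an asyncio future.
    loop = asyncio.get_running_loop()
    done = loop.create_future()

    def on_done(finished_job):
        # add_done_callback fires from a helper thread, so hop back
        # onto the event loop before touching the asyncio future.
        loop.call_soon_threadsafe(done.set_result, finished_job)

    job.add_done_callback(on_done)
    await done
    # The job has finished, so result() returns without long polling
    # (fetching rows still issues blocking HTTP requests, though).
    return job.result()

# usage (inside a coroutine): rows = await await_job(client.query(query1))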

tseaver commented 5 years ago

@dkapitan Re: the async / await and async def keywords: our codebase straddles both Python 2 and Python 3 until at least the end of this year, and thus cannot adopt that syntax directly until we drop Python 2 support.

We might be able to tweak the google.api_core.operation.Operation class to support that usage at a low level. Can you please post the traceback for the AttributeError?

Also, for your example above: your callback isn't doing anything with the result. It is being called from a helper thread, and would need to do something to signal the main thread.
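
For example, with a threading.Event (a sketch, reusing bq and query1 from your snippet):

import threading

done = threading.Event()
rows = []

def my_callback(future):
    # Runs in the job's helper thread: stash the rows, then signal.
    rows.extend(future.result())
    done.set()

job = bq.query(query1)  # bq: an existing bigquery.Client
job.add_done_callback(my_callback)
done.wait()  # the main thread blocks here until the callback fires
print(f'{len(rows)} rows fetched')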

dkapitan commented 5 years ago

@tseaver thanks for clarifying. I have read up more on concurrent.futures and asyncio; I understand the latter is indeed quite new and Python 3 only. I will investigate the sample code a bit more and get back to you with the results and/or the traceback.

dkapitan commented 5 years ago

@tseaver here's the traceback from the AttributeError. (It appears as_completed only accepts concurrent.futures.Future instances, which carry a private _condition lock; QueryJob implements google.api_core's polling Future interface instead.)

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-19-e9025f68a561> in <module>
      9 [operation.add_done_callback(my_callback) for operation in operations]
     10 results2 = []
---> 11 for future in as_completed(operations):
     12   results2.append(list(future.result()))

/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/concurrent/futures/_base.py in as_completed(fs, timeout)
    217     fs = set(fs)
    218     total_futures = len(fs)
--> 219     with _AcquireFutures(fs):
    220         finished = set(
    221                 f for f in fs

/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/concurrent/futures/_base.py in __enter__(self)
    144     def __enter__(self):
    145         for future in self.futures:
--> 146             future._condition.acquire()
    147 
    148     def __exit__(self, *args):

AttributeError: 'QueryJob' object has no attribute '_condition'

dkapitan commented 5 years ago

@tseaver For now, I have decided to do the following:

from time import sleep

query1 = """
SELECT
  language.name,
  AVG(language.bytes)
FROM `bigquery-public-data.github_repos.languages` 
, UNNEST(language) AS language
GROUP BY language.name"""
query2 = 'SELECT 2'

def dummy_callback(future):
    global jobs_done
    jobs_done[future.job_id] = True

jobs = [bq.query(query1), bq.query(query2)]
jobs_done = {job.job_id: False for job in jobs}
[job.add_done_callback(dummy_callback) for job in jobs]

# blocking loop to wait for jobs to finish
while not all(jobs_done.values()):
    print('waiting for jobs to finish ... sleeping for 1s')
    sleep(1)

print('all jobs done, do your stuff')

Rather than using as_completed, I prefer to use the built-in async functionality of the BigQuery jobs themselves. This also makes it possible to decompose the data pipeline into separate Cloud Functions, without having to keep the main ThreadPoolExecutor alive for the duration of the whole pipeline. Incidentally, this was the reason I was looking into this in the first place: my pipelines run longer than the maximum timeout of 9 minutes for Cloud Functions (or even 15 minutes for Cloud Run).

The downside is that I need to keep track of all the job_ids across the various functions, but that is relatively easy to solve when configuring the pipeline by specifying inputs and outputs such that they form a directed acyclic graph.
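
For reference, the status check in each downstream function then boils down to polling get_job with the stored IDs. A minimal sketch (all_jobs_done is just an illustrative helper name):

from google.cloud import bigquery

client = bigquery.Client()

def all_jobs_done(job_ids):
    # One get_job call per ID; pass location=... for jobs outside the
    # client's default location.
    return all(client.get_job(job_id).state == 'DONE' for job_id in job_ids)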

Any suggestions are welcome. It would be nice if, at some point in the future, Google's API future played nicely with Python's future (no pun intended).

tseaver commented 5 years ago

@dkapitan Thanks for the follow-up. I agree that it would be ideal if jobs could be awaited.

You could simplify your example code using a set, e.g.:

from time import sleep

query1 = """
SELECT
  language.name,
  AVG(language.bytes)
FROM `bigquery-public-data.github_repos.languages` 
, UNNEST(language) AS language
GROUP BY language.name"""

query2 = 'SELECT 2'

queries = [query1, query2]

awaiting_jobs = set()

def callback(future):
    awaiting_jobs.discard(future.job_id)

for query in queries:
    job = bq.query(query)
    awaiting_jobs.add(job.job_id)
    job.add_done_callback(callback)

while awaiting_jobs:
    print('waiting for jobs to finish ... sleeping for 1s')
    sleep(1)

print('all jobs done, do your stuff')

dkapitan commented 5 years ago

@tseaver Nice, thanks. Never thought of using a set to do a kind of countdown ... :smile:

northtree commented 5 years ago

@tseaver Thanks for the simplified example. Could you fix two typos?

awaiting_jobs.discard(future.job_id)

job = bq.query(query)

tseaver commented 5 years ago

@northtree Thanks for catching those: updated.

sambcom commented 5 years ago

How do you get the rows from your different queries? Won't this block on job.result()? What am I missing?

dkapitan commented 5 years ago

@sambcom apologies for the late response; I forgot to subscribe to this issue. In my use case, the results of the query are explicitly written to new tables as part of an ELT pipeline, so that's no issue.

Generally speaking, I understand you could write query results to (temporary) tables in BigQuery, so this should not be a blocking issue.

zbarnett commented 5 years ago

I just ran into this problem myself and found out that even though the job itself is "asynchronous", there are actually two places where synchronous I/O happens.

From the example usage on https://googleapis.dev/python/bigquery/latest/index.html

from google.cloud import bigquery

client = bigquery.Client()

# Perform a query.
QUERY = (
    'SELECT name FROM `bigquery-public-data.usa_names.usa_1910_2013` '
    'WHERE state = "TX" '
    'LIMIT 100')
query_job = client.query(QUERY)  # API request
rows = query_job.result()  # Waits for query to finish

for row in rows:
    print(row.name)

client.query(QUERY) makes a synchronous network request, as does query_job.result().

Usually, blocking on result() takes the majority of the time, but I've seen cases where query() can take over a second to complete by itself.

As a workaround for now, I'm running all the BigQuery data-fetching code in a separate thread to free up the event loop, but it would be fantastic if the BigQuery API supported async I/O.

Example workaround (code fragment):

def get_data(client, query):
    # Blocking: both query() and result() issue synchronous HTTP requests.
    return client.query(query).result()

# Inside a coroutine:
loop = asyncio.get_running_loop()
data = await loop.run_in_executor(None, get_data, client, query)
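
Put together, a self-contained version of the workaround might look like this (assuming application default credentials; 'SELECT 1' is just a placeholder query):

import asyncio
from google.cloud import bigquery

def get_data(client, query):
    # Both query() and result() issue blocking HTTP requests.
    return list(client.query(query).result())

async def main():
    client = bigquery.Client()
    loop = asyncio.get_running_loop()
    # Keep the event loop free by pushing the blocking calls to a thread.
    rows = await loop.run_in_executor(None, get_data, client, 'SELECT 1')
    print(rows)

asyncio.run(main())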

yan-hic commented 5 years ago

We too have hundreds of load and dedupe queries running async. We do use futures, but only to submit the jobs at once and get the IDs.

From there we regularly poll Stackdriver Logging (via Airflow) for the status of all of them in one API call. Waiting on a blocking result() is inefficient if you only want to know when jobs are done.

I wish get_job() could support multiple job IDs, but the API doesn't, and an enhancement request has stalled.
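
One partial workaround is to list recent jobs in a single call and filter locally; a sketch (unfinished is just an illustrative helper, and min_creation_time keeps the listing cheap):

from google.cloud import bigquery

client = bigquery.Client()

def unfinished(job_ids, min_creation_time):
    # One list call instead of N get_job calls; filter to our IDs locally.
    not_done = {
        job.job_id
        for job in client.list_jobs(min_creation_time=min_creation_time)
        if job.state != 'DONE'
    }
    return not_done & set(job_ids)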

tswast commented 5 years ago

We've identified January 1, 2020 as the EOL date for Python 2 support in new versions of the BigQuery client library in https://github.com/googleapis/google-cloud-python/pull/9036. We can revisit this request after that date.

jsigee87 commented 4 years ago

Wanted to bump this thread: I am currently trying to write async BigQuery jobs myself, and it would be great if the job were awaitable.

vgrabovets commented 3 years ago

Any news here? You've already dropped support for Python 2.

yanhong-zhao-ef commented 3 years ago

still waiting here... do you have an approximate timeline for this?

tswast commented 3 years ago

@yanhong-zhao-ef I'd love to have this feature, but it's going to take some design work to get right. In other Google Cloud libraries, asynchronous support is provided by a completely separate "AsyncClient". Since this BigQuery library is handwritten, I'd like to avoid having to maintain two copies of the same methods.

tswast commented 3 years ago

I talked with some folks on Firestore who've implemented this. They suggest a trio of classes: async_batch, base_batch, and batch (https://github.com/googleapis/python-firestore/tree/master/google/cloud/firestore_v1), in order to avoid too much duplicated effort.
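
Roughly, the idea is to keep the shared, I/O-free logic in a base class and layer thin sync and async subclasses on top. A sketch of the pattern (not the actual Firestore code; transport here is a hypothetical injected dependency):

class _BaseBatch:
    # I/O-free request construction shared by both clients.
    def __init__(self, operations):
        self.operations = operations

    def _build_request(self):
        return {'writes': list(self.operations)}

class Batch(_BaseBatch):
    def commit(self, transport):
        # Blocking transport call.
        return transport.post(self._build_request())

class AsyncBatch(_BaseBatch):
    async def commit(self, transport):
        # Same request, awaitable transport.
        return await transport.post(self._build_request())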

jarednash0 commented 3 years ago

(Quoting @tseaver's set-based example from above.)

I tried implementing this with copy_table, but I added "assert future.done()" as the first line in callback() and found that the assertion was failing. This must mean that the callback is getting executed before the job is actually done. Can you confirm this isn't the intended functionality?

tswast commented 3 years ago

This must mean that the callback is getting executed before the job is actually done. Can you confirm this isn't the intended functionality?

@jarednash0 Definitely not intended. It'd be worth filing a separate issue for this so that we can address it, ideally with the code example that reproduces it.

theophile-dh commented 3 years ago

Any news on this issue? It would be a very useful feature

eboddington commented 3 years ago

Would definitely appreciate this as well

adamserafini commented 2 years ago

Any update from Google on asyncio compatibility for BigQuery lib?

jasonwillschiu commented 2 years ago

This is probably not the best approach, but right now a simple way to get queries running synchronously is to chain them together in one mega(big)query, haha.

My clunky example:

Q1 = f"""
  CREATE OR REPLACE TABLE `{project}.{dataset}.{table}`
  AS
  SELECT * EXCEPT({currency_pair}) 
  FROM `{project}.{dataset}.{table}`;
  """ 

Q2 = f"""
  CREATE OR REPLACE TABLE `{project}.{dataset}.{table}`
  AS
  (SELECT *
  FROM `{project}.{dataset}.{table}` tab1
  JOIN (
  SELECT {date_col}, {currency_pair}
  FROM `{project}.currency_conversion.{base_currency}_conversion_table` 
  ) currency
  ON tab1.date=currency.{date_col}
  );
  """

Chained_Query1 = Q1 + Q2
query_job = client.query(Chained_Query1)

adamserafini commented 2 years ago

AFAICT the above proposal isn't actually making the query awaitable though, is it? We want to run the query asynchronously, not synchronously.

jasonwillschiu commented 2 years ago

My bad, I thought my data was getting stuffed around because these BigQuery processes were already asynchronous.

chalmerlowe commented 2 years ago

Gonna close this out as "Will not work" due to conflicting priorities.

dkapitan commented 11 months ago

+1 for reopening!

kiraksi commented 8 months ago

For awareness: this is an internship project and my internship ends in 2 weeks. I plan on implementing an AsyncClient for the async_query_and_wait RPC; other RPCs have also been requested to be made asynchronous, which I may not have time for. For future maintainers I will document my progress here, but please refer to the internal design doc for this feature, which can be found at my internship documentation website: go/kirnendra-internship

RPCs to make async:

adamserafini commented 8 months ago

Amazing @kiraksi, that is awesome.

But, @Google leadership: I find it sad that a 5-year-old issue to make the library properly support asyncio Python has taken this long and has been treated as a side project by the organisation.

The message I take from it is that the Google leadership simply doesn't care about supporting Python as a first-class citizen of the Google Cloud Developer Experience.

chalmerlowe commented 8 months ago

@adamserafini Thank you for your input. We will take it into consideration.

kiraksi commented 8 months ago

New updates: there are some blocking issues. In order to make a completely asynchronous RowIterator object for the query and query_and_wait methods (RowIterator is a child of HTTPIterator in the google-api-core library), I would need an AsyncHTTPIterator. I have opened an issue for this here: https://github.com/googleapis/python-api-core/issues/627 and I may make that PR myself. Until then I will focus on the get methods, which don't have this blocking issue. Additionally, there is current work related to this, but I would like this method exposed: https://github.com/googleapis/google-auth-library-python/issues/613

Tunneller commented 7 months ago

Any news, @kiraksi? I guess your internship has ended. Will the PR still go forward? Can you elaborate on what we will see? You mentioned that your solution will not be completely asynchronous?

tommyhutcheson commented 6 months ago

Hey @tswast @chalmerlowe, is there an update on this issue and @kiraksi's PR https://github.com/googleapis/python-bigquery/pull/1853? The Python / BigQuery communities would love this feature to be available.

chalmerlowe commented 6 months ago

At this moment, this task is on hold. We will revisit it when we have the manpower. I am not gonna close it, so that it remains on the radar.

pingyeh commented 6 months ago

Before the async libraries are ready, it seems the done callback is the best way to run multiple queries in parallel, unless the developer is willing to do multi-threading.

Can you please improve the documentation on add_done_callback? It is unclear what the argument to the callback is and what the callback can do with it. It's a Future, but which Future class? Is it asyncio.Future, google.api_core.future.base.Future, or something else?
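
From a quick experiment, the callback appears to receive the job itself, which implements google.api_core's polling Future interface (concurrent.futures-style), not asyncio.Future:

from google.cloud import bigquery

client = bigquery.Client()
job = client.query('SELECT 1')

def on_done(future):
    # Prints something like:
    # <class 'google.cloud.bigquery.job.query.QueryJob'> <job id> DONE
    print(type(future), future.job_id, future.state)

job.add_done_callback(on_done)
job.result()  # keep the process alive until the callback has fired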

alexandernst commented 1 week ago

I have just read the entire thread, and I'm not sure whether I'm reading comments written by paid devs working for one of the wealthiest organizations in the world (certainly one of the top 3 cloud providers) or by a couple of people working in their spare time on their least important side project.

I just refuse to believe that Google is capable of neglecting the official library for BigQuery in such a way. What a sorry state. 😞