
Python - How to - Big Query asynchronous tasks

This may be a dumb question, but I cannot seem to run the Python google-cloud-bigquery client asynchronously.

My goal is to run multiple queries concurrently and wait for all of them to finish with asyncio.wait(). I'm using asyncio.create_task() to launch the queries. The problem is that each query waits for the previous one to complete before starting.

Here is my query function (quite simple):

async def exec_query(self, query, **kwargs) -> bigquery.table.RowIterator:
  job = self.api.query(query, **kwargs)
  return job.result()

Since I cannot await job.result(), should I await something else?

Antoine Dussarps asked Oct 30 '18 13:10




1 Answer

If you are working inside a coroutine and want to run several queries without blocking the event loop, you can use the loop's run_in_executor() method, which runs the blocking calls in background threads so the loop stays free.
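As a minimal sketch of the run_in_executor() approach (not taken from the original answer): the helper names exec_query and run_all are hypothetical, and client is assumed to be a google.cloud.bigquery.Client, or any object with a compatible query() method whose jobs expose result().

```python
import asyncio
from functools import partial


async def exec_query(client, query, **kwargs):
    """Run one query without blocking the event loop.

    Assumption: `client` behaves like google.cloud.bigquery.Client.
    """
    loop = asyncio.get_running_loop()
    # client.query() makes a blocking request to start the job,
    # so it is pushed to the default thread pool as well.
    job = await loop.run_in_executor(None, partial(client.query, query, **kwargs))
    # job.result() blocks until the job finishes; run it in a thread too.
    rows = await loop.run_in_executor(None, job.result)
    return list(rows)


async def run_all(client, queries):
    # gather() schedules all coroutines at once and returns the
    # results in the same order as `queries`.
    return await asyncio.gather(*(exec_query(client, q) for q in queries))
```

From synchronous code this could be called as asyncio.run(run_all(client, [query1, query2])). Passing None as the executor uses the loop's default thread pool; a dedicated ThreadPoolExecutor can be passed instead if you want to cap the number of worker threads.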

Make sure, though, that this is really what you need: query jobs created through the Python API already run asynchronously, and your code only blocks when you call job.result(). This means that you don't need asyncio unless you are already inside a coroutine.
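To illustrate that point without asyncio or threads, here is a rough sketch. The function name run_concurrently is hypothetical, and client is again assumed to be a google.cloud.bigquery.Client or a compatible object.

```python
def run_concurrently(client, queries):
    # Start every job first. query() returns once the job has been
    # submitted; it does not wait for the query to finish.
    jobs = [client.query(q) for q in queries]
    # The jobs are now running side by side. result() blocks only
    # here, one job at a time, while the remaining jobs keep running.
    return [list(job.result()) for job in jobs]
```

The total wait should then be roughly that of the slowest query rather than the sum of all of them, since every job is started before the first result() call.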

Here's a quick possible example of retrieving results as soon as the jobs are finished:

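from concurrent.futures import ThreadPoolExecutor, as_completed
import google.cloud.bigquery as bq


client = bq.Client.from_service_account_json('path/to/key.json')
query1 = 'SELECT 1'
query2 = 'SELECT 2'

futures = []
results = []

# Pool of worker threads that will wait on the blocking job.result() calls
executor = ThreadPoolExecutor(max_workers=5)

# client.query() starts each job; job.result is submitted uncalled so that
# the blocking wait happens in a worker thread
for job in [client.query(query1), client.query(query2)]:
    futures.append(executor.submit(job.result))

# Here you can run any code you like. The interpreter is free

# as_completed() yields each future as soon as its job has finished
for future in as_completed(futures):
    results.append(list(future.result()))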

results will be something like this (the order depends on which job finishes first):

[[Row((2,), {'f0_': 0})], [Row((1,), {'f0_': 0})]]
Willian Fuks answered Nov 01 '22 08:11