How to gather task results in Trio?

I wrote a script that uses a nursery and the asks module to loop through a list of network IDs and call an API for each one. I get responses, but I don't know how to return the data the way you would with asyncio.

I also have a question about limiting the API calls to 5 per second.

from datetime import datetime
import asks
import time
import trio

asks.init("trio")
s = asks.Session(connections=4)

async def main():
    start_time = time.time()

    api_key = 'API-KEY'
    org_id = 'ORG-ID'
    networkIds = ['id1','id2','idn']

    url = 'https://api.meraki.com/api/v0/networks/{0}/airMarshal?timespan=3600'
    headers = {'X-Cisco-Meraki-API-Key': api_key, 'Content-Type': 'application/json'}

    async with trio.open_nursery() as nursery:
        for i in networkIds:
            nursery.start_soon(fetch, url.format(i), headers)

    print("Total time:", time.time() - start_time)



async def fetch(url, headers):
    print("Start: ", url)
    response = await s.get(url, headers=headers)
    print("Finished: ", url, len(response.content), response.status_code)




if __name__ == "__main__":
    trio.run(main)

When I run nursery.start_soon(fetch, ...), I can print data inside fetch, but how do I return the data? I didn't see anything similar to the asyncio.gather(*tasks) function.

Also, I can limit the number of connections to 1-4, which helps stay below the 5-calls-per-second limit, but I was wondering whether there is a built-in way to ensure that no more than 5 API calls are made in any given second.

asked Oct 05 '18 by jleatham


3 Answers

Based on this answer, you can define the following function:

async def gather(*tasks):
    # Each task is a tuple: (async_function, arg1, arg2, ...).

    async def collect(index, task, results):
        task_func, *task_args = task
        results[index] = await task_func(*task_args)

    # Collect results by index so the output order matches the input order.
    results = {}
    async with trio.open_nursery() as nursery:
        for index, task in enumerate(tasks):
            nursery.start_soon(collect, index, task, results)
    return [results[i] for i in range(len(tasks))]

You can then use trio in the exact same way as asyncio by simply patching trio (adding the gather function):

import trio
trio.gather = gather

Here is a practical example:

async def child(x):
    print(f"Child sleeping {x}")
    await trio.sleep(x)
    return 2*x

async def parent():
    tasks = [(child, t) for t in range(3)]
    return await trio.gather(*tasks)

print("results:", trio.run(parent))
answered by cglacet

Technically, trio.Queue has been deprecated in trio 0.9. It has been replaced by trio.open_memory_channel.

Short example:

sender, receiver = trio.open_memory_channel(len(networkIds))
async with trio.open_nursery() as nursery:
    for i in networkIds:
        nursery.start_soon(fetch, sender, url.format(i), headers)

await sender.aclose()  # no more values will be sent, so the loop below can finish
async for value in receiver:
    # Do your job here
    pass

And in your fetch function you should call await sender.send(value) somewhere.
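
For example, a minimal sketch of the question's fetch adapted to send its result over the channel (the s session is assumed from the question's code):

async def fetch(sender, url, headers):
    print("Start: ", url)
    response = await s.get(url, headers=headers)
    print("Finished: ", url, len(response.content), response.status_code)
    await sender.send((url, response))  # hand the result back to the receiving loop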

answered by Adrien Clerc


Returning data: pass the networkID and a dict to the fetch tasks:

async def main():
    …
    results = {}
    async with trio.open_nursery() as nursery:
        for i in networkIds:
            nursery.start_soon(fetch, url.format(i), headers, results, i)
    ## results are available here

async def fetch(url, headers, results, i):
    print("Start: ", url)
    response = await s.get(url, headers=headers)
    print("Finished: ", url, len(response.content), response.status_code)
    results[i] = response

Alternatively, create a trio.Queue into which you put the results; your main task can then read the results from that queue.

API limit: create a trio.Queue(10) and start a task along these lines:

async def limiter(queue):
    while True:
        await trio.sleep(0.2)   # one token every 0.2 s, i.e. at most 5 per second
        await queue.put(None)

Pass that queue to fetch, as another argument, and call await limit_queue.get() before each API call.
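
Since trio.Queue has since been removed from trio, here is a minimal sketch of the same token-bucket idea using a memory channel instead; the names token_send and token_recv, the 0.2 s interval, and the reuse of the question's s, url, headers, and networkIds are illustrative assumptions, not part of the original answer:

async def limiter(token_send):
    # Emit one token every 0.2 s, i.e. allow at most 5 API calls per second.
    while True:
        await trio.sleep(0.2)
        await token_send.send(None)

async def fetch(url, headers, token_recv, results, i):
    await token_recv.receive()  # wait for a rate-limit token before calling the API
    response = await s.get(url, headers=headers)
    results[i] = response

async def main():
    results = {}
    token_send, token_recv = trio.open_memory_channel(0)
    async with trio.open_nursery() as nursery:
        nursery.start_soon(limiter, token_send)
        async with trio.open_nursery() as fetchers:
            for i in networkIds:
                fetchers.start_soon(fetch, url.format(i), headers, token_recv, results, i)
        nursery.cancel_scope.cancel()  # all fetches are done, so stop the limiter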

answered by Matthias Urlichs