asyncio.gather() on list of dict which has a field of coroutine?

I have the following two async functions:

import asyncio

from tornado.httpclient import AsyncHTTPClient

async def get_categories(): # return a list of str
    # ....
    http = AsyncHTTPClient()
    resp = await http.fetch(....)
    return [....]

async def get_details(category): # return a list of dict
    # ....
    http = AsyncHTTPClient()
    resp = await http.fetch(....)
    return [....]

Now I need a function that gets the details for all the categories (running the HTTP fetches concurrently) and combines them:

async def get_all_details():
    categories = await get_categories()
    tasks = list(map(lambda x: {'category': x, 'task':get_details(x)}, categories))
    r = await asyncio.gather(*tasks) # error

# need to return [
#   {'category':'aaa', 'detail':'aaa detail 1'}, 
#   {'category':'aaa', 'detail':'aaa detail 2'}, 
#   {'category':'bbb', 'detail':'bbb detail 1'}, 
#   {'category':'bbb', 'detail':'bbb detail 2'}, 
#   {'category':'bbb', 'detail':'bbb detail 3'}, 
#   {'category':'ccc', 'detail':'ccc detail 1'}, 
#   {'category':'ccc', 'detail':'ccc detail 2'},
# ]

However, the last line raises the error:

TypeError: unhashable type: 'dict'

The tasks list has the following values:

[{'category': 'aaa',
  'task': <coroutine object get_docker_list at 0x000001B12B8560C0>},
 {'category': 'bbb',
  'task': <coroutine object get_docker_list at 0x000001B12B856F40>},
 {'category': 'ccc',
  'task': <coroutine object get_docker_list at 0x000001B12B856740>}]

BTW, is there a way to throttle the HTTP fetch calls? For example, at most four fetches running at the same time.

Asked by ca9163d9 on Mar 01 '23 at 22:03



1 Answer

gather accepts coroutines (or other awaitables) as arguments and returns a list of their results in the same order. You are passing it a sequence of dicts, some of whose values are coroutines. gather doesn't know what to do with that: it tries to treat each dict as an awaitable object, which fails immediately. In CPython the failure surfaces as "unhashable type: 'dict'" because gather looks each argument up in an internal dict before scheduling it.
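
To make that behavior concrete, here is a minimal sketch. fake_fetch is a hypothetical stand-in for the real HTTP call (it only sleeps), so the example runs without Tornado or network access. It shows that results follow argument order rather than completion order, and that a dict argument is rejected with a TypeError.

```python
import asyncio


async def fake_fetch(name, delay):
    # Hypothetical placeholder for an HTTP request: just waits, then returns.
    await asyncio.sleep(delay)
    return f"{name} done"


async def demo():
    # "slow" finishes last but is listed first, so it comes back first.
    results = await asyncio.gather(
        fake_fetch("slow", 0.02),
        fake_fetch("fast", 0.0),
    )

    # A dict is not awaitable, so gather raises TypeError when called.
    coro = fake_fetch("wrapped", 0.0)
    try:
        await asyncio.gather({"category": "aaa", "task": coro})
        error = None
    except TypeError as exc:
        error = type(exc).__name__
    finally:
        coro.close()  # avoid a "coroutine was never awaited" warning
    return results, error


results, error = asyncio.run(demo())
```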

The correct way to generate the list of dicts is to pass just the coroutines to gather, await the results, and then combine them with the categories into a new list of dicts:

async def get_all_details():
    category_list = await get_categories()
    details_list = await asyncio.gather(
        *[get_details(category) for category in category_list]
    )
    return [
        {'category': category, 'details': details}
        for (category, details) in zip(category_list, details_list)
    ]
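
The function above yields one dict per category, with the whole details list under 'details'. If the flat shape from the question is wanted (one dict per detail), a nested comprehension is one way to get it. The sketch below is only an illustration: get_categories and get_details are hypothetical stubs returning made-up data, the name get_all_details_flat is invented here, and it assumes each item returned by get_details goes under the 'detail' key as-is.

```python
import asyncio


# Hypothetical stubs standing in for the HTTP-backed functions.
async def get_categories():
    return ["aaa", "bbb"]


async def get_details(category):
    await asyncio.sleep(0)  # placeholder for the awaited fetch
    count = 2 if category == "aaa" else 3
    return [f"{category} detail {i}" for i in range(1, count + 1)]


async def get_all_details_flat():
    category_list = await get_categories()
    # gather preserves argument order, so zip pairs each category
    # with its own details list.
    details_list = await asyncio.gather(
        *(get_details(category) for category in category_list)
    )
    # Emit one dict per individual detail instead of one per category.
    return [
        {"category": category, "detail": detail}
        for category, details in zip(category_list, details_list)
        for detail in details
    ]


flat = asyncio.run(get_all_details_flat())
```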

BTW, is there a way to throttle the HTTP fetch calls? For example, at most four fetches running at the same time.

A convenient and idiomatic way to limit the number of concurrent calls is to use a semaphore:

async def get_details(category, limit):
    # acquiring the semaphore passed as `limit` will allow at most a
    # fixed number of coroutines to proceed concurrently
    async with limit:
        ...  # the rest of the code

async def get_all_details():
    limit = asyncio.Semaphore(4)
    category_list = await get_categories()
    details_list = await asyncio.gather(
        *[get_details(category, limit) for category in category_list]
    )
    ...  # the rest of the code
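
A runnable sketch of the same idea, for checking that the limit holds. The fetch is replaced by a short sleep, the category names are made up, and the running/peak counters are instrumentation added only for this demonstration. The semaphore is created inside the coroutine so that it belongs to the running event loop.

```python
import asyncio

running = 0  # coroutines currently inside the semaphore
peak = 0     # highest value of `running` observed


async def get_details(category, limit):
    global running, peak
    # At most `limit`'s initial value of coroutines pass this point at once.
    async with limit:
        running += 1
        peak = max(peak, running)
        await asyncio.sleep(0.01)  # hypothetical stand-in for http.fetch
        running -= 1
        return [f"{category} detail 1"]


async def get_all_details():
    limit = asyncio.Semaphore(4)
    category_list = [f"cat{i}" for i in range(10)]  # made-up categories
    details_list = await asyncio.gather(
        *(get_details(category, limit) for category in category_list)
    )
    return list(zip(category_list, details_list))


pairs = asyncio.run(get_all_details())
```

All ten coroutines are started by gather right away; the semaphore only delays the guarded part of each one, so the results still come back in category order.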
Answered by user4815162342 on Mar 16 '23 at 22:03
