
Is it possible to limit the number of coroutines running corcurrently in asyncio?

I already wrote my script using asyncio, but found that the number of coroutines running simultaneously is too large, and the script often ends up hanging.

So I would like to limit the number of coroutines running concurrently: once the limit is reached, I want to wait for any coroutine to finish before another one starts.

My current code is something like the following:

import asyncio

async def my_func(player):
    ...  # something done with `await`

loop = asyncio.get_event_loop()
p = map(my_func, players)
result = loop.run_until_complete(asyncio.gather(*p))

players is a list with many elements (say, 12000). Running all of them at once in asyncio.gather(*p) takes too much computational resource, so I would rather limit the number of players run simultaneously to 200. As soon as one of those finishes, I want the next coroutine to start.

Is this possible in asyncio?

asked May 12 '18 by Blaszard


People also ask

How many times should asyncio.run() be called?

It should be used as the main entry point for asyncio programs, and should ideally only be called once. New in version 3.7.
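For illustration, a minimal sketch of that single entry point (the coroutine name main is only an example):

import asyncio

async def main():
    await asyncio.sleep(1)  # the whole program runs inside this one coroutine

asyncio.run(main())  # called once, as the program's entry point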

Is asyncio a form of concurrency?

asyncio stands for asynchronous input/output and refers to a programming paradigm that achieves high concurrency using a single thread and an event loop.

How do I stop asyncio from running?

run_until_complete(<some Future object>) runs a given Future object, usually a coroutine defined with the async/await pattern, until it is complete. run_forever() runs the loop until stop() is called. stop() stops a running loop.
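A brief sketch of stopping a running loop (the one-second delay is just an arbitrary example):

import asyncio

loop = asyncio.new_event_loop()
loop.call_later(1, loop.stop)  # schedule stop() to run one second from now
loop.run_forever()             # returns as soon as stop() has been called
loop.close()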

Which function is used to run awaitables concurrently in asyncio?

asyncio.gather() runs awaitable objects (coroutines, Tasks, and Futures) concurrently.
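A small sketch of running two awaitables concurrently with gather() (the square coroutine is only an illustration):

import asyncio

async def square(x):
    await asyncio.sleep(0.1)  # stand-in for real asynchronous work
    return x * x

async def main():
    results = await asyncio.gather(square(2), square(3))
    print(results)  # [4, 9]

asyncio.run(main())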


2 Answers

I can suggest using asyncio.BoundedSemaphore.

import asyncio

async def my_func(player, asyncio_semaphore):
    async with asyncio_semaphore:
        ...  # do the actual work for this player here

async def main():
    asyncio_semaphore = asyncio.BoundedSemaphore(200)
    jobs = []
    for i in range(12000):
        jobs.append(asyncio.ensure_future(my_func(players[i], asyncio_semaphore)))
    await asyncio.gather(*jobs)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.set_debug(True)
    loop.run_until_complete(main())

This way, only 200 tasks at a time can acquire the semaphore and use system resources, even though all 12000 tasks are created up front.
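On Python 3.7+ the same pattern can be driven by asyncio.run() instead of managing the loop by hand; a minimal sketch, assuming players and my_func are defined as above:

import asyncio

async def main():
    # at most 200 coroutines get past the semaphore at any moment
    asyncio_semaphore = asyncio.BoundedSemaphore(200)
    await asyncio.gather(*(my_func(player, asyncio_semaphore) for player in players))

asyncio.run(main())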

answered Sep 20 '22 by Ali Yılmaz


You can wrap your gather and enforce a Semaphore:

import asyncio

async def semaphore_gather(num, coros, return_exceptions=False):
    semaphore = asyncio.Semaphore(num)

    async def _wrap_coro(coro):
        async with semaphore:
            return await coro

    return await asyncio.gather(
        *(_wrap_coro(coro) for coro in coros), return_exceptions=return_exceptions
    )

# async def a():
#     return 1

# print(asyncio.run(semaphore_gather(10, [a() for _ in range(100)])))
# [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
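Applied to the question's setup, usage could look like this (assuming my_func and players from the question, plus the semaphore_gather helper above):

# run at most 200 player coroutines at any one time
results = asyncio.run(semaphore_gather(200, [my_func(p) for p in players]))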
answered Sep 18 '22 by ddelange