How to concurrently run an infinite loop with asyncio?

How can I continue to the next loop iteration while awaiting? For example:

import asyncio


async def get_message():
    # async get message from queue (placeholder: simulated delay)
    await asyncio.sleep(1)
    message = 'message'
    return message

async def process_message(message):
    # make some changes on message
    return message

async def deal_with_message(message):
    # async update some network resource with given message
    # (placeholder: simulated network latency)
    await asyncio.sleep(1)

async def main():
    while True:
        message = await get_message()
        message = await process_message(message)
        await deal_with_message(message)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

How can I make the while True loop concurrent, so that while it is awaiting deal_with_message, it can move on to the next iteration and run get_message?

Edit:

I think I have found a solution:

async def main():
    asyncio.ensure_future(main())
    message = await get_message()
    message = await process_message(message)
    await deal_with_message(message)

loop = asyncio.get_event_loop()
asyncio.ensure_future(main())
loop.run_forever()
Sraw asked Dec 11 '17 03:12

1 Answer

Your solution will work; however, I see a problem with it.

async def main():
    asyncio.ensure_future(main())
    # task finishing

As soon as main starts, it creates a new task, and this happens immediately (ensure_future creates the task right away), whereas actually finishing the current task takes time. This can potentially lead to creating an enormous number of tasks, which can drain your RAM.

Besides that, it means that a potentially unlimited number of tasks can run concurrently. That can exhaust your network throughput or the number of sockets that can be open at the same time (just imagine trying to download 1,000,000 URLs in parallel: nothing good will happen).
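A minimal sketch of the problem described above, not taken from the original answer: a coroutine that, like the question's main(), schedules a fresh copy of itself before doing its own slow work. The slow work is simulated by waiting on an asyncio.Event, and the hypothetical LIMIT constant exists only so the demo terminates; in the asker's version nothing stops the chain. All names here are illustrative.

```python
import asyncio

LIMIT = 1_000  # hypothetical: stop the chain here so the demo ends
spawned = 0    # how many copies have started
running = 0    # copies currently "busy"
peak = 0       # highest value `running` reached
tasks = []     # strong references so tasks are not garbage-collected


async def self_spawning(release):
    """Mimics the question's main(): schedule the next copy, then do slow work."""
    global spawned, running, peak
    spawned += 1
    if spawned < LIMIT:
        tasks.append(asyncio.ensure_future(self_spawning(release)))
    running += 1
    peak = max(peak, running)
    await release.wait()  # stands in for a slow deal_with_message()
    running -= 1


async def demo():
    release = asyncio.Event()
    tasks.append(asyncio.ensure_future(self_spawning(release)))
    while spawned < LIMIT:  # let the chain run until the demo's cap
        await asyncio.sleep(0)
    release.set()  # now let every pending "deal_with_message" finish
    await asyncio.gather(*tasks)
    return peak


print(asyncio.run(demo()))  # 1000
```

Every copy is pending at the same moment, so with real network calls the number of open connections would grow just as fast.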

In the concurrent world, this problem is usually solved by limiting the number of things that can run at the same time to some sensible value, using something like a Semaphore. In your case, however, I think it's more convenient to track the number of running tasks manually and start new ones as old ones finish:

import asyncio
from random import randint


async def get_message():
    message = randint(0, 1_000)
    print(f'{message} got')
    return message


async def process_message(message):
    await asyncio.sleep(randint(1, 5))
    print(f'{message} processed')
    return message


async def deal_with_message(message):
    await asyncio.sleep(randint(1, 5))
    print(f'{message} dealt')


async def utilize_message():
    message = await get_message()
    message = await process_message(message)
    await deal_with_message(message)


parallel_max = 5  # handle at most 5 messages concurrently
parallel_now = 0


def populate_tasks():
    global parallel_now
    for _ in range(parallel_max - parallel_now):
        parallel_now += 1
        task = asyncio.ensure_future(utilize_message())
        task.add_done_callback(on_utilized)


def on_utilized(_):
    global parallel_now
    parallel_now -= 1
    populate_tasks()


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        populate_tasks()
        loop.run_forever()
    finally:
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()

The output will look something like this:

939 got
816 got
737 got
257 got
528 got
939 processed
816 processed
528 processed
816 dealt
589 got
939 dealt
528 dealt
712 got
263 got
737 processed
257 processed
263 processed
712 processed
263 dealt
712 dealt
386 got
708 got
589 processed
257 dealt
386 processed
708 processed
711 got
711 processed

The important part here is that the next message is taken for processing only after the number of running tasks drops below five.
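An alternative to the callback bookkeeping, not taken from the answer: start a fixed number of worker tasks, each running the question's original sequential loop. While one worker awaits deal_with_message, the others keep calling get_message, and concurrency can never exceed the number of workers. This is a sketch under assumptions: the three handler bodies are placeholders with random delays, and the loop is bounded by a hypothetical N_MESSAGES so the demo ends (a long-running service would use while True).

```python
import asyncio
import random

N_WORKERS = 5    # plays the role of the answer's parallel_max
N_MESSAGES = 20  # hypothetical bound so the demo terminates

counter = 0      # placeholder message source
active = 0       # messages currently being processed
peak_active = 0  # highest value `active` reached


async def get_message():
    # Placeholder for reading from a queue: hands out 1, 2, 3, ...
    global counter
    counter += 1
    return counter


async def process_message(message):
    await asyncio.sleep(random.uniform(0.01, 0.05))  # simulated work
    return message


async def deal_with_message(message):
    await asyncio.sleep(random.uniform(0.01, 0.05))  # simulated network call


async def worker(results):
    global active, peak_active
    # The question's loop; replace the condition with `while True` in a service.
    while counter < N_MESSAGES:
        message = await get_message()
        active += 1
        peak_active = max(peak_active, active)
        message = await process_message(message)
        await deal_with_message(message)
        active -= 1
        results.append(message)


async def main():
    results = []
    # Run N_WORKERS copies of the loop side by side.
    await asyncio.gather(*(worker(results) for _ in range(N_WORKERS)))
    return results


results = asyncio.run(main())
print(sorted(results) == list(range(1, N_MESSAGES + 1)))  # True
```

The placeholder get_message never suspends, so the check and the increment cannot interleave between workers; with a real asyncio.Queue the workers would simply block on queue.get() instead.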

Update:

Yes, a semaphore seems more convenient if you don't need to change the maximum number of running tasks dynamically.

sem = asyncio.Semaphore(5)


async def main():
    async with sem:
        asyncio.ensure_future(main())
        await utilize_message()


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        asyncio.ensure_future(main())
        loop.run_forever()
    finally:
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()
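On Python 3.7+, the same Semaphore idea can be written without managing the event loop by hand, using asyncio.run and asyncio.create_task. The sketch below is a variation rather than the answer's code: instead of each main() spawning the next one, a single loop acquires a semaphore slot before fetching each message, and each task releases its slot when it finishes. The handler bodies are placeholders, and N_MESSAGES is a hypothetical bound so the example ends. The semaphore is created inside main() so it belongs to the loop that asyncio.run creates (on older Python versions a module-level Semaphore binds to a different loop).

```python
import asyncio
import itertools
import random

MAX_CONCURRENT = 5  # same limit as the answer's Semaphore(5)
N_MESSAGES = 20     # hypothetical bound so the demo terminates

_ids = itertools.count()  # placeholder message source
in_flight = 0             # messages currently being handled
peak = 0                  # highest value `in_flight` reached


async def get_message():
    await asyncio.sleep(0)  # placeholder for reading from a real queue
    return next(_ids)


async def process_message(message):
    await asyncio.sleep(random.uniform(0.01, 0.05))  # simulated work
    return message


async def deal_with_message(message):
    await asyncio.sleep(random.uniform(0.01, 0.05))  # simulated network call


async def handle(message, sem, done):
    global in_flight, peak
    in_flight += 1
    peak = max(peak, in_flight)
    try:
        message = await process_message(message)
        await deal_with_message(message)
        done.append(message)
    finally:
        in_flight -= 1
        sem.release()  # give the slot back to the fetching loop


async def main():
    sem = asyncio.Semaphore(MAX_CONCURRENT)  # created inside the running loop
    done, tasks = [], set()
    for _ in range(N_MESSAGES):  # `while True` in a long-running service
        await sem.acquire()      # waits while MAX_CONCURRENT messages are in flight
        message = await get_message()
        task = asyncio.create_task(handle(message, sem, done))
        tasks.add(task)          # keep a strong reference until it finishes
        task.add_done_callback(tasks.discard)
    await asyncio.gather(*tasks)  # wait for the stragglers
    return done


done = asyncio.run(main())
print(sorted(done) == list(range(N_MESSAGES)), peak <= MAX_CONCURRENT)  # True True
```

Because the fetching loop blocks on sem.acquire(), it only reads a new message once a slot is free, and the number of pending tasks never exceeds the limit.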
Mikhail Gerasimov answered Oct 20 '22 14:10