How can I continue to the next loop iteration while something is still awaiting? For example:
import asyncio

async def get_message():
    # async get message from queue
    return message

async def process_message(message):
    # make some changes on message
    return message

async def deal_with_message(message):
    # async update some network resource with given message
    ...

async def main():
    while True:
        message = await get_message()
        message = await process_message(message)
        await deal_with_message(message)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
How can I make the while True loop concurrent? While it is awaiting deal_with_message, can it go on to the next iteration and run get_message for the next message?
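What I have in mind is something like scheduling the slow step as a background task so the loop can move straight on to the next message (just a rough, untested sketch of the idea):

async def main():
    while True:
        message = await get_message()
        message = await process_message(message)
        # fire off the network update in the background and immediately
        # go back to fetch the next message
        asyncio.ensure_future(deal_with_message(message))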
I think I have found a solution:
async def main():
    asyncio.ensure_future(main())
    message = await get_message()
    message = await process_message(message)
    await deal_with_message(message)

loop = asyncio.get_event_loop()
asyncio.ensure_future(main())
loop.run_forever()
Your solution will work; however, I see a problem with it.
async def main():
    asyncio.ensure_future(main())
    # ... the actual work, which takes time to finish
As soon as main starts, it creates a new task, and that happens immediately (ensure_future schedules the task right away), whereas actually finishing the task takes time. This can lead to an enormous number of tasks being created, which can drain your RAM.
Besides that, it means that a potentially enormous number of tasks can run concurrently. That can exhaust your network throughput or the number of sockets that can be open at the same time (just imagine trying to download 1,000,000 URLs in parallel: nothing good will happen).
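To see how quickly tasks pile up with that pattern, here is a small self-contained sketch (my own illustration; asyncio.sleep(1) stands in for the real message handling). After only 0.1 s it can easily report tens of thousands of pending tasks, none of which has finished:

import asyncio

async def recursive_main():
    # same pattern as above: schedule the next iteration immediately,
    # then do the (slow) actual work
    asyncio.ensure_future(recursive_main())
    await asyncio.sleep(1)  # stands in for the real message handling

async def watcher():
    # look at the loop after a tenth of a second, then clean up
    await asyncio.sleep(0.1)
    tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
    print(f'{len(tasks)} tasks already pending after 0.1 s')
    for t in tasks:
        t.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)

loop = asyncio.get_event_loop()
asyncio.ensure_future(recursive_main())
loop.run_until_complete(watcher())
loop.close()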
In the concurrent world this problem is usually solved by limiting the number of things that can run concurrently to some sensible value, using something like a Semaphore. In your case, however, I think it's more convenient to track the number of running tasks manually and to repopulate them manually:
import asyncio
from random import randint

async def get_message():
    message = randint(0, 1_000)
    print(f'{message} got')
    return message

async def process_message(message):
    await asyncio.sleep(randint(1, 5))
    print(f'{message} processed')
    return message

async def deal_with_message(message):
    await asyncio.sleep(randint(1, 5))
    print(f'{message} dealt')

async def utilize_message():
    message = await get_message()
    message = await process_message(message)
    await deal_with_message(message)

parallel_max = 5  # don't utilize more than 5 msgs in parallel
parallel_now = 0

def populate_tasks():
    # top up running tasks until parallel_max of them are in flight
    global parallel_now
    for _ in range(parallel_max - parallel_now):
        parallel_now += 1
        task = asyncio.ensure_future(utilize_message())
        task.add_done_callback(on_utilized)

def on_utilized(_):
    # a task finished: decrease the counter and start new tasks if needed
    global parallel_now
    parallel_now -= 1
    populate_tasks()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        populate_tasks()
        loop.run_forever()
    finally:
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()
The output will look something like:
939 got
816 got
737 got
257 got
528 got
939 processed
816 processed
528 processed
816 dealt
589 got
939 dealt
528 dealt
712 got
263 got
737 processed
257 processed
263 processed
712 processed
263 dealt
712 dealt
386 got
708 got
589 processed
257 dealt
386 processed
708 processed
711 got
711 processed
The important part here is that the next message is picked up for utilization only after the number of running tasks has dropped below five.
Upd:
Yes, a semaphore seems to be more convenient if you don't need to change the maximum number of running tasks dynamically.
sem = asyncio.Semaphore(5)

async def main():
    async with sem:
        asyncio.ensure_future(main())
        await utilize_message()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        asyncio.ensure_future(main())
        loop.run_forever()
    finally:
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()
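As a side note (not part of the original answer, and assuming the utilize_message coroutine defined in the snippet above): on Python 3.7+ roughly the same fixed cap can be expressed as a small pool of worker coroutines driven by asyncio.run, so the event loop doesn't have to be managed by hand:

import asyncio

async def worker():
    # each worker handles one message at a time, so N workers mean
    # at most N messages are in flight at any moment
    while True:
        await utilize_message()

async def main(parallel_max=5):
    # runs until interrupted, just like run_forever above
    await asyncio.gather(*(worker() for _ in range(parallel_max)))

if __name__ == '__main__':
    asyncio.run(main())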