Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

asyncio.Queue Stuck With 1 Coroutine Adding to Queue, 1 Coroutine Getting from Queue

In my simple asyncio code below, the app has one task self.add_item_loop_task continuously adding an integer to the asyncio.Queue named self.queue, while a second task self.get_item_loop_task continuously waits for something to be added to the queue and print it out.

However, this app only prints out 0 once when I run it, and gets stuck there. I believe the loop in self.get_item_loop_task is not proceeding. Why is this happening?

import asyncio

class App:
    def __init__(self):
        self.queue = asyncio.Queue()

    async def start(self):
        self.add_item_loop_task = asyncio.create_task(self.add_item_loop())
        self.get_item_loop_task = asyncio.create_task(self.get_item_loop())
        await asyncio.wait(
            [
                self.add_item_loop_task,
                self.get_item_loop_task,
            ]
        )

    async def stop(self):
        self.add_item_loop_task.cancel()
        self.get_item_loop_task.cancel()

    async def add_item_loop(self):
        i = 0
        while True:
            await self.queue.put(i)
            i += 1
            await asyncio.sleep(1)

    async def get_item_loop(self):
        while True:
            item = await self.queue.get()
            print(item)


app = App()
try:
    asyncio.run(app.start())
except KeyboardInterrupt:
    asyncio.run(app.stop())
like image 470
Nyxynyx Avatar asked Feb 04 '26 12:02

Nyxynyx


2 Answers

This is caused by some dubious implementation details of asyncio. When you say self.queue = asyncio.Queue() this will actually create an event loop if one does not already exist. Meanwhile, when you call asyncio.run() it will always create a new event loop. This means if you create a queue before you call asyncio.run() you can get some strange behavior because there are two event loops, the one that your queue uses and the one that asyncio.run is using.

You can fix this issue by moving the creation of App into a coroutine function that you pass in to asyncio.run() like below. Doing this your application works as intended.

async def main():
    app = App()
    await app.start()

asyncio.run(main())
like image 188
Matt Fowler Avatar answered Feb 07 '26 03:02

Matt Fowler


As @Matt Fowler explained the code is creating two simultaneous event loops. One created when initializing the class and the second one when running asyncio.run(main()).

You can simply move self.queue = asyncio.Queue() from the def __init__(self) method to the beginning async def start(self) method. The newly created asyncio.Queue() object will be available to both tasks.

The following code works:

import asyncio

class App:
# def __init__(self):
#     self.queue = asyncio.Queue()

async def start(self):
    # self.queue in the following line will be available to both tasks
    self.queue = asyncio.Queue()
    self.add_item_loop_task = asyncio.create_task(self.add_item_loop())
    self.get_item_loop_task = asyncio.create_task(self.get_item_loop())
    await asyncio.wait(
        [
            self.add_item_loop_task,
            self.get_item_loop_task,
        ]
    )

async def stop(self):
    self.add_item_loop_task.cancel()
    self.get_item_loop_task.cancel()

async def add_item_loop(self):
    i = 0
    while True:
        await self.queue.put(i)
        i += 1
        await asyncio.sleep(1)

async def get_item_loop(self):
    while True:
        item = await self.queue.get()
        print(item)

app = App()
try:
  asyncio.run(app.start())
except KeyboardInterrupt:
  asyncio.run(app.stop())
like image 30
Arjuna Deva Avatar answered Feb 07 '26 03:02

Arjuna Deva