asyncio Queue.get delay

I have the following code:

import asyncio
import threading
import time

q = asyncio.Queue()

async def ping():
    while True:
        await asyncio.sleep(10)
        print("ping")

async def rcv():
    while True:
        item = await q.get()
        print("got item")

async def run():
    tasks = [asyncio.ensure_future(ping()), asyncio.ensure_future(rcv())]
    await asyncio.wait(tasks, return_when="FIRST_EXCEPTION")

loop = asyncio.get_event_loop()

def run_loop():
    asyncio.set_event_loop(loop)
    loop.run_until_complete(run())

threading.Thread(target=run_loop).start()

while True:
    print("sleeping 2 seconds")
    time.sleep(2)
    q.put_nowait("item")
    print("item added")

I expected that every 2 seconds (each time an item is added to the queue) I would see the output:

item added
sleeping 2 seconds
got item

and that every 10 seconds I would also see ping.

However, this is the output I get (repeating):

sleeping 2 seconds
item added
sleeping 2 seconds
item added
sleeping 2 seconds
item added
sleeping 2 seconds
item added
sleeping 2 seconds
got item
got item
got item
got item
ping
item added
sleeping 2 seconds
...

It seems as if item = await q.get() also waits for the asyncio.sleep(10) in the ping function.

What did I miss, and how can I fix the code so that I get the expected output?

Thanks!

Asked by Moshe perez, Dec 21 '25 08:12


1 Answer

What did I miss, and how can I fix the code so that I get the expected output?

Since you are running the event loop in a separate thread, you need to change q.put_nowait("item") to:

loop.call_soon_threadsafe(q.put_nowait, "item")

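The reason is that asyncio code is (intentionally) not thread-safe. Calling q.put_nowait directly from another thread does add the item to the queue, but it doesn't wake up the event loop, which stays blocked until its next scheduled event. In your code that event is the asyncio.sleep(10) timer in ping(), which is why the items pile up and are all received at about the same time as "ping". call_soon_threadsafe schedules the call on the loop's own thread and wakes the loop up, so the waiting q.get() is resumed right away.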
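
For reference, here is a minimal self-contained sketch of the fix. It is not your exact program: the names demo, make_queue, count and interval are made up for illustration, the loops are finite so the script terminates, and the queue is created on the loop's thread (an extra precaution for older Python versions, where a Queue is bound to a loop when it is constructed). The important line is the loop.call_soon_threadsafe call.

```python
import asyncio
import threading
import time


async def ping():
    # Long sleep, as in the question; it should not delay queue delivery.
    while True:
        await asyncio.sleep(10)
        print("ping")


async def make_queue():
    # Hypothetical helper: build the queue on the event loop's own thread.
    return asyncio.Queue()


async def rcv(q, count, received):
    ping_task = asyncio.ensure_future(ping())
    try:
        for _ in range(count):
            item = await q.get()
            received.append((item, time.monotonic()))
            print("got item")
    finally:
        # Stop ping() cleanly so the loop can be closed afterwards.
        ping_task.cancel()
        await asyncio.gather(ping_task, return_exceptions=True)


def demo(count=3, interval=0.2):
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()

    q = asyncio.run_coroutine_threadsafe(make_queue(), loop).result(timeout=5)
    received = []
    consumer = asyncio.run_coroutine_threadsafe(rcv(q, count, received), loop)

    sent = []
    for i in range(count):
        time.sleep(interval)
        sent.append(time.monotonic())
        # The fix: hand the put over to the loop's thread and wake the loop.
        loop.call_soon_threadsafe(q.put_nowait, i)
        print("item added")

    consumer.result(timeout=5)           # wait until every item was received
    loop.call_soon_threadsafe(loop.stop)
    thread.join(timeout=5)
    loop.close()

    items = [item for item, _ in received]
    delays = [got - put for (_, got), put in zip(received, sent)]
    return items, delays


if __name__ == "__main__":
    items, delays = demo()
    print(items, ["%.3fs" % d for d in delays])
```

With the thread-safe call, each "got item" should follow its "item added" almost immediately instead of waiting for the 10-second timer. If you replace the call_soon_threadsafe line with a plain q.put_nowait(i), you should see the delayed behaviour from the question again (and the demo would likely hit its 5-second timeout).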

Answered by user4815162342, Dec 23 '25 23:12