I have the following code:
import asyncio
import threading
import time
q = asyncio.Queue()
async def ping():
while True:
await asyncio.sleep(10)
print("ping")
async def rcv():
while True:
item = await q.get()
print("got item")
async def run():
tasks = [asyncio.ensure_future(ping()), asyncio.ensure_future(rcv())]
await asyncio.wait(tasks, return_when="FIRST_EXCEPTION")
loop = asyncio.get_event_loop()
def run_loop():
asyncio.set_event_loop(loop)
loop.run_until_complete(run())
threading.Thread(target=run_loop).start()
while True:
time.sleep(2)
q.put_nowait("item")
print("item added")
I expected that every 2 seconds (each time item is added to the queue), I'll see the output:
item added
sleeping 2 seconds
got item
and every 10 seconds I'll also see ping.
However, this is the output I get (repeating):
sleeping 2 seconds
item added
sleeping 2 seconds
item added
sleeping 2 seconds
item added
sleeping 2 seconds
item added
sleeping 2 seconds
got item
got item
got item
got item
ping
item added
sleeping 2 seconds
...
It seems as if the part item = await q.get() wait also for asyncio.sleep(10) from the ping function.
What did I miss? and how can I fix the code so I'll get the expected output?
Thanks!
What did I miss? and how can I fix the code so I'll get the expected output?
Since you are running the event loop in a separate thread, you need to change q.put_nowait("item") to:
loop.call_soon_threadsafe(q.put_nowait, "item")
The reason is that asyncio code is (intentionally) not thread-safe, so using put_nowait doesn't notify the event loop that a new item was enqueued.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With