I'm trying to use asyncio with both a sync block (which would be the rest of the Python program) and an async block, and have the sync block send data through an asyncio.Queue.
Without the queueing everything works fine, but when I send data through the queue it seems to block. I've tried different approaches with get_nowait, etc., but with no success so far.
import asyncio
import time

queue = asyncio.Queue()

async def processor() -> None:
    print("Started proc")
    while True:
        print("waiting for queue")
        msg = await queue.get()
        print(f"Got command from queue: {msg}")
        # do something
        await asyncio.sleep(5)

def run_sync(url: str) -> int:
    while 1:
        print("Sending HTTP request")
        input("enter to send message to queue\n")
        queue.put_nowait(url)
        # do other work
        time.sleep(10)

async def run_sync_threaded(url: str) -> int:
    return await asyncio.to_thread(run_sync, url)

async def main() -> None:
    await asyncio.gather(
        processor(),
        run_sync_threaded("https://www.example.com"),
    )

asyncio.run(main())
EDIT: I got this working, but it looks like a workaround rather than a proper solution. It doesn't feel very stable.
import asyncio
import time

queue = asyncio.Queue()

async def processor() -> None:
    print("Started proc")
    while True:
        print("waiting for queue")
        msg = await queue.get()
        print(f"Got command from queue: {msg}")
        # do something
        await asyncio.sleep(5)

async def async_send(url):
    print(f'Adding {url} to queue')
    queue.put_nowait(url)

def send(url, loop):
    asyncio.run_coroutine_threadsafe(async_send(url), loop)

def run_sync(url: str, loop) -> int:
    while 1:
        input("enter to send message to queue\n")
        send(url, loop)
        # do other work
        time.sleep(3)

async def run_sync_threaded(url: str, loop) -> int:
    return await asyncio.to_thread(run_sync, url, loop)

async def main() -> None:
    loop = asyncio.get_event_loop()
    t = asyncio.create_task(processor())
    t2 = asyncio.create_task(run_sync_threaded("https://www.example.com", loop))
    asyncio.gather(
        await t,
        await t2
    )
    # This does not work
    # asyncio.gather(
    #     await processor(),
    #     await run_sync_threaded("https://www.example.com", loop)
    # )

asyncio.run(main())
Your problem is that put_nowait() tries to wake up the next waiting task under the hood. asyncio doesn't handle the case where this method is called from a synchronous function outside the event loop, because the wake-up is scheduled with call_soon(), which only works from inside the event loop. As a result, your event loop gets no notification from the external thread, so it doesn't wake up.
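To make the wake-up issue concrete, here is a minimal standard-library sketch in which the worker thread never touches the queue directly. Instead it hands put_nowait() to the loop through call_soon_threadsafe(), which schedules the callback and also wakes the loop. The names producer, consume, and demo are illustrative and do not come from the question.

```python
import asyncio
import threading


def producer(loop: asyncio.AbstractEventLoop, queue: asyncio.Queue, items: list) -> None:
    """Runs in a worker thread (hypothetical helper for this sketch)."""
    for item in items:
        # Do not call queue.put_nowait(item) here: asyncio.Queue is not
        # thread-safe. Ask the loop to run it on the event loop thread instead.
        loop.call_soon_threadsafe(queue.put_nowait, item)


async def consume(items: list) -> list:
    queue: asyncio.Queue = asyncio.Queue()
    loop = asyncio.get_running_loop()
    thread = threading.Thread(target=producer, args=(loop, queue, items))
    thread.start()
    # Each get() suspends until the loop has executed one scheduled put.
    received = [await queue.get() for _ in items]
    thread.join()
    return received


def demo() -> list:
    return asyncio.run(consume(["a", "b", "c"]))


if __name__ == "__main__":
    print(demo())
```

This only illustrates the mechanism: each item still reaches the queue only once the event loop gets around to running the scheduled callback.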
I don't recommend using run_coroutine_threadsafe() because it makes your external thread dependent on the event loop: if you wait for the returned future, the thread is blocked until the event loop executes your coroutine. If you don't wait for the future, or if you use call_soon_threadsafe(), the queue is only updated once the event loop has handled your callback. That leaves one reasonable solution: a queue that is both async-aware and thread-aware.
You can use Janus as a stable solution with a compatible API. It has been around for many years and is still maintained. Just create an instance of janus.Queue and use its sync_q property in queue.Queue style and its async_q property in asyncio.Queue style.
import janus

queue = janus.Queue()  # create an instance, not a reference to the class
queue.sync_q.put_nowait(42)  # sync
item = queue.sync_q.get()  # sync
item = await queue.async_q.get()  # async
However, if your application is performance-sensitive, Janus may not suit you, as API-level compatibility incurs additional costs. In that case, I can suggest aiologic.SimpleQueue (I'm the creator of aiologic), which has a different API but is also very fast.
from aiologic import SimpleQueue
queue = SimpleQueue()
queue.put(42) # sync
item = queue.green_get() # sync
item = await queue.async_get() # async
Unlike janus.Queue, it never creates additional tasks to notify an external thread.