Why is asyncio queue await get() blocking?

Why is await queue.get() blocking?

import asyncio

async def producer(queue, item):
    await queue.put(item)

async def consumer(queue):
    val = await queue.get()
    print("val = %d" % val)

async def main():
    queue = asyncio.Queue()
    await consumer(queue)
    await producer(queue, 1)


asyncio.run(main())

If I call producer() before consumer(), it works fine. That is to say, the following works:

async def main():
    queue = asyncio.Queue()
    await producer(queue, 1)
    await consumer(queue)

Why isn't await queue.get() yielding control back to the event loop, so that the producer coroutine can run and populate the queue, allowing queue.get() to return?

Akshay Takkar asked May 30 '19 11:05



2 Answers

You need to run the consumer and the producer concurrently, e.g. by defining main like this:

async def main():
    queue = asyncio.Queue()
    await asyncio.gather(consumer(queue), producer(queue, 1))

If for some reason you can't use gather, then you can do (the equivalent of) this:

async def main():
    queue = asyncio.Queue()
    asyncio.create_task(consumer(queue))
    asyncio.create_task(producer(queue, 1))
    await asyncio.sleep(100)  # what your program actually does
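
A caveat on the create_task variant: the asyncio documentation recommends keeping a reference to each task so it cannot be garbage-collected mid-execution. Below is a minimal sketch of that, assuming main has nothing else to do and can simply await the tasks. The names consumer_task and producer_task are illustrative, and consumer is changed to return the value instead of printing it.

```python
import asyncio


async def producer(queue, item):
    await queue.put(item)


async def consumer(queue):
    # Return the value instead of printing it so the caller can use it.
    return await queue.get()


async def main():
    queue = asyncio.Queue()
    # Keep references so the tasks stay alive until they finish.
    consumer_task = asyncio.create_task(consumer(queue))
    producer_task = asyncio.create_task(producer(queue, 1))
    # Stand-in for whatever else main does; both tasks run while main is suspended.
    await asyncio.sleep(0)
    await producer_task
    return await consumer_task


if __name__ == "__main__":
    print(asyncio.run(main()))  # prints 1
```

Awaiting the tasks at the end also surfaces any exception they raised, which a bare sleep would not.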

Why isn't await queue.get() yielding control back to the event loop, so that the producer coroutine can run and populate the queue, allowing queue.get() to return?

await queue.get() is yielding control back to the event loop. But await means wait, so when your main coroutine says await consumer(queue), that means "resume me once consumer(queue) has completed." Since consumer(queue) is itself waiting for someone to produce something, you have a classic case of deadlock.
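
To make the hang observable instead of waiting forever, here is a rough sketch that wraps the sequential version in asyncio.wait_for. The 0.1 second timeout and the function names sequential and concurrent are arbitrary choices for the demo, not part of the original code.

```python
import asyncio


async def producer(queue, item):
    await queue.put(item)


async def consumer(queue):
    return await queue.get()


async def sequential():
    """Await the consumer before the producer has had any chance to run."""
    queue = asyncio.Queue()
    try:
        # Nothing else is scheduled, so get() can never be satisfied.
        await asyncio.wait_for(consumer(queue), timeout=0.1)
    except asyncio.TimeoutError:
        return "timed out"
    return "completed"


async def concurrent():
    """Schedule both coroutines so the producer can feed the waiting consumer."""
    queue = asyncio.Queue()
    value, _ = await asyncio.gather(consumer(queue), producer(queue, 1))
    return value


if __name__ == "__main__":
    print(asyncio.run(sequential()))  # prints: timed out
    print(asyncio.run(concurrent()))  # prints: 1
```

In sequential(), the event loop does get control back while get() is suspended. There is simply no other task for it to run.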

Reversing the order works only because your producer is one-shot and the queue is unbounded, so put() never has to wait and the producer immediately returns to the caller. If your producer happened to await an external source (such as a socket), you would have a deadlock there as well. Running them concurrently avoids the deadlock regardless of how producer and consumer are written.
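
One way to see that producer-first ordering is fragile, without involving a socket, is to assume a bounded queue (the question uses an unbounded one). In this sketch, maxsize=1, the two-item list, and the 0.1 second timeout are all illustrative assumptions.

```python
import asyncio


async def producer(queue, items):
    for item in items:
        # put() suspends while a bounded queue is full.
        await queue.put(item)


async def consumer(queue, count):
    return [await queue.get() for _ in range(count)]


async def producer_first():
    """Await the whole producer before the consumer starts."""
    queue = asyncio.Queue(maxsize=1)
    try:
        # The second put() waits for free space that nobody will create.
        await asyncio.wait_for(producer(queue, [1, 2]), timeout=0.1)
    except asyncio.TimeoutError:
        return "timed out"
    return await consumer(queue, 2)


async def concurrent():
    """Run both sides together so each can unblock the other."""
    queue = asyncio.Queue(maxsize=1)
    received, _ = await asyncio.gather(consumer(queue, 2), producer(queue, [1, 2]))
    return received


if __name__ == "__main__":
    print(asyncio.run(producer_first()))  # prints: timed out
    print(asyncio.run(concurrent()))      # prints: [1, 2]
```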

user4815162342 answered Nov 08 '22 20:11


It's because you call await consumer(queue), which means the next line (the producer) will not be called until consumer returns, which of course it never does, because nobody has produced anything yet.

Check out the example in the docs and see how they use it there: https://docs.python.org/3/library/asyncio-queue.html#examples

Another simple example:

import asyncio
import random


async def produce(queue, n):
    for x in range(1, n + 1):
        # produce an item
        print('producing {}/{}'.format(x, n))
        # simulate i/o operation using sleep
        await asyncio.sleep(random.random())
        item = str(x)
        # put the item in the queue
        await queue.put(item)

    # indicate the producer is done
    await queue.put(None)


async def consume(queue):
    while True:
        # wait for an item from the producer
        item = await queue.get()
        if item is None:
            # the producer emits None to indicate that it is done
            break

        # process the item
        print('consuming item {}...'.format(item))
        # simulate i/o operation using sleep
        await asyncio.sleep(random.random())


async def main():
    # create the queue inside the running event loop;
    # the loop= argument of asyncio.Queue was removed in Python 3.10
    queue = asyncio.Queue()
    # run the producer and the consumer concurrently
    await asyncio.gather(produce(queue, 10), consume(queue))


asyncio.run(main())

Adam.Er8 answered Nov 08 '22 20:11