Errors in my producer/consumer coroutines may leave items and unfinished tasks in the asyncio queue. If an error is encountered, I want to simply stop the loop, cancel the pending tasks and clear the queue. While I can finish the first two things, I cannot find an easy way to clear the queue. After reading this answer, I come up with three methods:
import asyncio
q=asyncio.Queue()
for i in range(5):
q.put_nowait(i)
q.get_nowait()
loop=asyncio.get_event_loop()
#this will raise an error if q cannot join
loop.run_until_complete(asyncio.wait_for(q.join(),1))
#method 1
q._queue.clear()
q._finished.set()
q._unfinished_tasks = 0
#method 2
for _ in range(q.qsize()):
q.get_nowait()
for _ in range(q._unfinished_tasks):
q.task_done()
#method 3
del q
q=asyncio.Queue()
So which one is better?
To clear all items from a queue in Python: queue. clear() . The clear method will remove all elements from the queue.
Although asyncio queues are not thread-safe, they are designed to be used specifically in async/await code.
A Task is created and scheduled for its execution through the asyncio. create_task() function. Once scheduled, a Task can be requested for cancellation through task. cancel() method.
Getting the Size of Queue in Python If you are using Queue from the queue module, you should use the qsize() function to get the number of items in your queue.
Avoid using the "private" methods. From the documentation of Queue.task_done
:
For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.
def empty_queue(q: asyncio.Queue):
for _ in range(q.qsize()):
# Depending on your program, you may want to
# catch QueueEmpty
q.get_nowait()
q.task_done()
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With