Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Proper way to clear an asyncio queue in python3?

Errors in my producer/consumer coroutines may leave items and unfinished tasks in the asyncio queue. If an error is encountered, I want to simply stop the loop, cancel the pending tasks and clear the queue. While I can finish the first two things, I cannot find an easy way to clear the queue. After reading this answer, I come up with three methods:

import asyncio

q=asyncio.Queue()
for i in range(5):
    q.put_nowait(i)
q.get_nowait()

loop=asyncio.get_event_loop()

#this will raise an error if q cannot join
loop.run_until_complete(asyncio.wait_for(q.join(),1))

#method 1
q._queue.clear()
q._finished.set()
q._unfinished_tasks = 0

#method 2
for _ in range(q.qsize()):
    q.get_nowait()
for _ in range(q._unfinished_tasks):
    q.task_done()

#method 3
del q
q=asyncio.Queue()

So which one is better?

like image 406
lovetl2002 Avatar asked Sep 10 '17 12:09

lovetl2002


People also ask

How do I empty a queue in Python?

To clear all items from a queue in Python: queue. clear() . The clear method will remove all elements from the queue.

Is Asyncio queue thread safe?

Although asyncio queues are not thread-safe, they are designed to be used specifically in async/await code.

How do I stop Asyncio tasks?

A Task is created and scheduled for its execution through the asyncio. create_task() function. Once scheduled, a Task can be requested for cancellation through task. cancel() method.

How do I find the size of a python queue?

Getting the Size of Queue in Python If you are using Queue from the queue module, you should use the qsize() function to get the number of items in your queue.


1 Answers

Avoid using the "private" methods. From the documentation of Queue.task_done:

For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.


def empty_queue(q: asyncio.Queue):
  for _ in range(q.qsize()):
    # Depending on your program, you may want to
    # catch QueueEmpty
    q.get_nowait()
    q.task_done()

like image 157
coxley Avatar answered Oct 13 '22 14:10

coxley