Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to put an item back to a queue.Queue

How do you return an item to a queue.Queue? This would be useful in threading or multiprocessing if the task fails, so that the task can not be lost.

The docs for queue.Queue.get() say that the function can "Remove and return an item from the queue," but I believe the use of the word "return" here refers to the function returning the item to the calling thread, not placing it back into the item queue. This is demonstrated by the below sample code just blocks infinitely on the main thread's second queue.Queue.get() call, instead of making it to the print() call in the thread.

import time
import threading
import queue


def threaded_func():
    thread_task = myqueue.get()
    print('thread_task: ' + thread_task)

myqueue = queue.Queue()
myqueue.put('some kind of task')
main_task = myqueue.get()
print('main_task: ' + main_task)

t = threading.Thread(target=threaded_func)
t.daemon = True
t.start()

time.sleep(5)
myqueue.get()   # This blocks indefinitely

I have to believe that there is a simple way to put the task back, so what is it? Calling task_done() and then put() with the task to put it back into the queue in two operations is not atomic and so could result in a lost item.

One possible, but clunky, solution would be to just try to execute the task again, but then you'd have to add a few extra lines to handle that complexity and I'm not even sure that all failed tasks could necessarily recover in that way.

like image 210
Shaun Avatar asked Apr 15 '17 04:04

Shaun


People also ask

How do I add items to the end of a queue in Python?

enqueue() uses . append() to add something to the end of the queue. And . dequeue() uses .

What is queue queue () in Python?

Queue is built-in module of Python which is used to implement a queue. queue. Queue(maxsize) initializes a variable to a maximum size of maxsize. A maxsize of zero '0' means a infinite queue. This Queue follows FIFO rule.

How do I remove an item from a queue in Python?

Removing an item from a queue in Python: To remove items from a queue in Python, we will be using the “get function”. See the Python Queue example given below. To make sure that our queue is empty, we can use the empty function which will return Boolean values.

How do you traverse a queue in Python?

Use a while loop to iterate through a queue in Python, e.g. while not q. empty(): . The loop checks if the queue is not empty and iterates as long as there are items in the queue.


1 Answers

Not all failed tasks can recover. You shouldn't retry them unless there is some reason to think they will pass at a later date. For instance, if your work item is a URL and connection failed count, you could implement some sort of a max-retries thing.

Your biggest problem is that you haven't implemented a viable worker model yet. You need 2 queues to have a bidirectional conversation with a worker. One to post work items and one to receive status. Once you have that, the receiver can always decide to cram that message back on the work queue. Here is an example with a lazy worker that just passes what its told.

import threading
import queue

def worker(in_q, out_q):
    while True:
        try:
            task, data = in_q.get()
            print('worker', task, data)
            if task == "done":
                return
            elif task == "pass this":
                out_q.put(("pass", data))
            else:
                out_q.put(("fail", data))
        except Exception as e:
            print('worker exception', e)
            out_q.put("exception", data)

in_que = queue.Queue()
out_que = queue.Queue()

work_thread = threading.Thread(target=worker, args=(in_que, out_que))
work_thread.start()

# lets make every other task a fail
in_que.put(('pass this', 0))
in_que.put(('fail this', 1))
in_que.put(('pass this', 2))
in_que.put(('fail this', 3))
in_que.put(('pass this', 4))
in_que.put(('fail this', 5))

pending_tasks = 6

while pending_tasks:
    status, data = out_que.get()
    if status == "pass":
        pending_tasks -= 1
    else:
        # make failing tast pass
        in_que.put(('pass this', data))

in_que.put(("done", None))
work_thread.join()
print('done')
like image 81
tdelaney Avatar answered Sep 28 '22 09:09

tdelaney