How do you return an item to a queue.Queue? This would be useful in threading or multiprocessing when a task fails, so that the task is not lost.
The docs for queue.Queue.get() say that the function can "Remove and return an item from the queue," but I believe the word "return" here means that the function returns the item to the calling thread, not that it places the item back into the queue. The sample code below demonstrates this: it blocks indefinitely on the main thread's second queue.Queue.get() call, and the thread never reaches its print() call.
import time
import threading
import queue

def threaded_func():
    thread_task = myqueue.get()
    print('thread_task: ' + thread_task)

myqueue = queue.Queue()
myqueue.put('some kind of task')
main_task = myqueue.get()
print('main_task: ' + main_task)

t = threading.Thread(target=threaded_func)
t.daemon = True
t.start()
time.sleep(5)
myqueue.get()  # This blocks indefinitely
I have to believe that there is a simple way to put the task back, so what is it? Calling task_done() and then put() with the task takes two separate operations, which is not atomic and so could result in a lost item.
One possible, but clunky, solution would be to just try to execute the task again, but then you'd have to add a few extra lines to handle that complexity and I'm not even sure that all failed tasks could necessarily recover in that way.
queue is a built-in Python module that implements queues. queue.Queue(maxsize) creates a FIFO queue that holds at most maxsize items. A maxsize of zero or less means the queue size is unlimited.
To remove an item from a queue, call its get() method, which removes the item and returns it to the caller. To check whether the queue is empty, call empty(), which returns a Boolean.
To iterate through a queue, use a while loop, e.g. while not q.empty():. The loop runs as long as items remain in the queue.
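The basic calls described above can be sketched as follows. This is a minimal single-threaded illustration; the variable names q and drained are made up for the example. Note that with several threads, empty() returning False does not guarantee that the next get() will not block, so the loop below is only safe when one thread owns the queue.

```python
import queue

q = queue.Queue(maxsize=0)  # zero (or less) means no size limit
for n in (1, 2, 3):
    q.put(n)

# Drain the queue in FIFO order. Safe here because no other thread
# is touching q between the empty() check and the get() call.
drained = []
while not q.empty():
    drained.append(q.get())
print(drained)

# get_nowait() raises queue.Empty instead of blocking on an empty queue.
try:
    q.get_nowait()
except queue.Empty:
    print("queue is empty")
```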
Not all failed tasks can recover. You shouldn't retry them unless there is some reason to think they will succeed on a later attempt. For instance, if your work item is a URL plus a count of failed connections, you could implement some sort of max-retries limit.
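One way the max-retries idea could look is sketched below. Everything here is an assumption for illustration: fetch() is a hypothetical stand-in for a real network call, MAX_RETRIES is an arbitrary limit, and the loop runs in a single thread to keep the bookkeeping easy to follow.

```python
import queue

MAX_RETRIES = 3  # hypothetical limit, pick whatever suits the task


def fetch(url):
    """Hypothetical stand-in for a network call; fails for 'bad' URLs."""
    if "bad" in url:
        raise ConnectionError(f"cannot reach {url}")
    return f"contents of {url}"


def process_all(urls):
    work = queue.Queue()
    for url in urls:
        work.put((url, 0))  # work item: (url, failed_count)

    results, gave_up = {}, []
    # Single-threaded, so checking empty() before get() is reliable here.
    while not work.empty():
        url, failures = work.get()
        try:
            results[url] = fetch(url)
        except ConnectionError:
            failures += 1
            if failures < MAX_RETRIES:
                work.put((url, failures))  # put it back for another try
            else:
                gave_up.append(url)  # stop retrying a hopeless task
    return results, gave_up


results, gave_up = process_all(["http://good.example", "http://bad.example"])
print(results)
print(gave_up)
```

Carrying the failure count inside the work item means the queue itself remembers how often each task has been tried, so no separate lookup table is needed.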
Your biggest problem is that you haven't implemented a viable worker model yet. You need two queues to have a bidirectional conversation with a worker: one to post work items and one to receive status. Once you have that, the receiver can always decide to put a message back on the work queue. Here is an example with a lazy worker that just passes what it's told to pass.
import threading
import queue

def worker(in_q, out_q):
    while True:
        data = None  # defined up front so the except branch can report it
        try:
            task, data = in_q.get()
            print('worker', task, data)
            if task == "done":
                return
            elif task == "pass this":
                out_q.put(("pass", data))
            else:
                out_q.put(("fail", data))
        except Exception as e:
            print('worker exception', e)
            # put() takes one item, so status and data go in a single tuple
            out_q.put(("exception", data))

in_que = queue.Queue()
out_que = queue.Queue()
work_thread = threading.Thread(target=worker, args=(in_que, out_que))
work_thread.start()

# let's make every other task a fail
in_que.put(('pass this', 0))
in_que.put(('fail this', 1))
in_que.put(('pass this', 2))
in_que.put(('fail this', 3))
in_que.put(('pass this', 4))
in_que.put(('fail this', 5))

pending_tasks = 6
while pending_tasks:
    status, data = out_que.get()
    if status == "pass":
        pending_tasks -= 1
    else:
        # make the failing task pass by resubmitting it
        in_que.put(('pass this', data))

# tell the worker to exit, then wait for it
in_que.put(("done", None))
work_thread.join()
print('done')
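If the worker itself should decide to retry, a single-queue variant may also work. The sketch below is an assumption-laden illustration rather than part of the example above: the "fails on the first attempt" rule and the attempts dictionary are invented for the demo. It relies on the documented behaviour that put() raises the queue's unfinished-task count and task_done() lowers it, so calling put() before task_done() keeps join() from returning while a retry is still pending.

```python
import queue
import threading


def worker(q, attempts):
    while True:
        task = q.get()
        if task is None:  # sentinel: stop the worker
            q.task_done()
            return
        attempts[task] = attempts.get(task, 0) + 1
        failed = attempts[task] < 2  # demo rule: first attempt always fails
        if failed:
            # Re-queue BEFORE task_done(), so the unfinished-task count
            # never drops to zero while this task still needs a retry.
            q.put(task)
        q.task_done()


q = queue.Queue()
attempts = {}
t = threading.Thread(target=worker, args=(q, attempts))
t.start()

for name in ("a", "b", "c"):
    q.put(name)

q.join()     # returns only after every task, retries included, is done
q.put(None)  # now ask the worker to exit
t.join()
print(attempts)
```

The two calls are still not one atomic operation, but in this order the item is always either in the queue or in the worker's hands, and join() cannot observe a moment where it is counted as finished.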