
How does Queue work in Python?

The test code is very simple:

import threading, Queue
import time, random

class Worker(threading.Thread):
    def __init__(self, index, queue):
        threading.Thread.__init__(self)
        self.index = index
        self.queue = queue
    def run(self):
        while 1:
            time.sleep(random.random())
            item = self.queue.get()
            if item is None:
                break
            print "index:", self.index, "task", item, "finished"
            self.queue.task_done()

queue = Queue.Queue(0)
for i in range(2):
    Worker(i, queue).start()
for i in range(10):
    queue.put(i)
for i in range(2):
    queue.put(None)
print "Main OK"

The result is a little different every time I run it; here is one run:

Main OK
index: 1 task 0 finished
index: 0 task 1 finished
index: 0 task 2 finished
index: 1 task 3 finished
index: 1 task 4 finished
index: 0 task 5 finished
index: 1 task 6 finished
index: 0 task 7 finished
index: 1 task 8 finished
index: 1 task 9 finished

My understanding was this: when the main thread has finished, "Main OK" will be printed; then the first thread runs until it reaches time.sleep(random.random()), at which point it sleeps and the second thread continues. Likewise, the second thread sleeps when it reaches time.sleep(random.random()), and then the first thread continues again. So index: 0 task 0 finished should be printed right after Main OK, but in reality what follows Main OK is index: 1..., not index: 0...! Why? It also seems that the Queue doesn't behave like normal multithreading: sometimes the same thread index executes three or more times in a row! How in the world does the Queue mechanism work? Any help appreciated!

asked Apr 03 '12 08:04 by Searene


3 Answers

There are no guarantees about the order in which the threads execute. If the order were guaranteed, that would require heavy synchronization, and the result would be no faster than running the tasks serially.
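To see why a guaranteed order costs the parallelism, here is a minimal Python 3 sketch (not from the original answer) that forces workers to take strict turns with a `threading.Condition`. The function name `run_in_lockstep` and the turn-passing scheme are illustrative assumptions. The order becomes deterministic, but only one worker can do anything at a time, which is exactly serialized execution.

```python
import queue
import threading


def run_in_lockstep(num_workers=2, num_items=10):
    """Make workers take strict turns; return (worker index, item) pairs in processing order."""
    tasks = queue.Queue()
    for item in range(num_items):
        tasks.put(item)

    log = []                      # only appended to while holding `cond`
    cond = threading.Condition()
    turn = 0                      # index of the worker allowed to run next

    def worker(index):
        nonlocal turn
        while True:
            with cond:
                # Wait for our turn; every other worker is blocked here meanwhile.
                cond.wait_for(lambda: turn == index)
                try:
                    item = tasks.get_nowait()
                except queue.Empty:
                    item = None   # nothing left: still pass the turn on, then exit
                else:
                    log.append((index, item))  # the "work" happens while holding the lock
                turn = (index + 1) % num_workers
                cond.notify_all()
            if item is None:
                return

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(num_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return log


print(run_in_lockstep(2, 4))  # [(0, 0), (1, 1), (0, 2), (1, 3)]
```

Every item is handled in a fixed order, but the lock ensures the two threads never overlap, so nothing is gained over a plain loop.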

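Also, you should use queue.join() to block until all queued items have been processed. Note that join() only returns after task_done() has been called once for every item put into the queue, so in your code you would need to call it before putting the None sentinels (or call task_done() for the sentinels as well). To wait for the threads themselves to exit, call join() on each Worker thread.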
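A minimal Python 3 sketch of that pattern, assuming one None sentinel per worker as in the question. The name `process_all` and the squaring step are hypothetical stand-ins for real work. The key detail is that `task_done()` is called once per `get()`, sentinel included; otherwise `tasks.join()` would never return.

```python
import queue
import threading


def process_all(items, num_workers=2):
    """Run every item through worker threads; return (worker index, result) pairs."""
    tasks = queue.Queue()
    results = queue.Queue()               # thread-safe channel back to the main thread

    def worker(index):
        while True:
            item = tasks.get()
            try:
                if item is None:          # sentinel: no more work for this thread
                    return
                results.put((index, item * item))  # hypothetical stand-in for real work
            finally:
                tasks.task_done()         # matches the get() above, sentinel included

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(num_workers)]
    for t in threads:
        t.start()
    for item in items:
        tasks.put(item)
    tasks.join()      # returns once task_done() has been called for every item put so far
    for _ in threads:
        tasks.put(None)                   # one sentinel per worker
    for t in threads:
        t.join()                          # now wait for the threads themselves to exit
    return [results.get() for _ in range(results.qsize())]
```

The pairs come back in completion order, which can differ between runs; only the set of results is fixed.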

answered Oct 06 '22 17:10 by Preet Kukreti


You have three threads: two Worker threads and the main thread. All three run at the same time, at least virtually. What you suggest (a clean switch at points you can know beforehand) is not how it works. You start the Worker threads before the Queue is filled, so they go into their sleep() at once. Then you fill the Queue. Very likely one of the threads will leave its sleep() before the other does, get the first item from the Queue, process it (print) and go into the next sleep() again. It is even possible (due to random) that the first Worker sleeps 0.01s each time while the other sleeps for 0.4s from the start; then all items will be processed by the first thread.
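The effect of uneven sleeps can be reproduced with fixed delays instead of `random.random()`. In this Python 3 sketch (the name `run_with_delays` and the delay values are assumptions chosen for illustration), worker 0 sleeps 0.01 s before each `get()` and worker 1 sleeps 0.5 s, so worker 0 typically drains the whole queue before worker 1 wakes up for the first time. Exact counts depend on scheduling, so treat them as likely rather than guaranteed.

```python
import queue
import threading
import time
from collections import Counter


def run_with_delays(delays, num_items=10):
    """Worker i sleeps delays[i] seconds before every get(); count items handled per worker."""
    tasks = queue.Queue()
    handled = queue.Queue()

    def worker(index):
        while True:
            time.sleep(delays[index])     # like time.sleep(random.random()), but fixed per worker
            item = tasks.get()
            if item is None:
                break
            handled.put(index)

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(len(delays))]
    for t in threads:
        t.start()
    for i in range(num_items):
        tasks.put(i)
    for _ in threads:
        tasks.put(None)                   # one sentinel per worker
    for t in threads:
        t.join()
    return Counter(handled.get() for _ in range(handled.qsize()))
```

On an idle machine, `run_with_delays([0.01, 0.5])` usually reports all ten items for worker 0.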

If more than one Worker thread is blocked in Queue.get() (which can only happen if both left their sleep() before the Queue was filled), there is no guarantee which Worker thread is woken up to process the item.
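To watch several workers compete inside `get()`, drop the sleeps so that both workers block on the empty queue before it is filled. This Python 3 sketch (the name `who_gets_what` and the 0.1 s head start are illustrative assumptions) records which worker received each item. Across runs the assignment may differ; what the queue does guarantee is that each item is delivered to exactly one worker.

```python
import queue
import threading
import time


def who_gets_what(num_workers=2, num_items=10):
    """Return (worker index, item) pairs when the workers are already waiting in get()."""
    tasks = queue.Queue()
    log = queue.Queue()

    def worker(index):
        while True:
            item = tasks.get()        # no sleep: every worker blocks here right away
            if item is None:
                break
            log.put((index, item))

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(num_workers)]
    for t in threads:
        t.start()
    time.sleep(0.1)                   # very likely (not guaranteed) all workers are in get() now
    for item in range(num_items):
        tasks.put(item)
    for _ in threads:
        tasks.put(None)               # one sentinel per worker
    for t in threads:
        t.join()
    return [log.get() for _ in range(log.qsize())]
```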

Your random sleep does not give a clean back-and-forth switch between the two workers: one might sleep so long that the other thread processes two or more items in the meantime. Try this for a regular alternation between the two threads (it relies on timing, so it is very likely but not strictly guaranteed):

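def run(self):
    # Offset worker 0 by half a period so the two workers wake up alternately
    if self.index == 0:
        time.sleep(0.1)
    while 1:
        # Both workers use the same fixed period, so they roughly keep the 0.1s offset
        time.sleep(0.2)
        item = self.queue.get()
        if item is None:
            break
        print "index:", self.index, "task", item, "finished"
        self.queue.task_done()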

answered Oct 06 '22 17:10 by Alfe


While it's true that the order in which threads run is not guaranteed, you also have a time.sleep(random.random()) in there, and the sleep durations vary a lot:

>>> random.random()
0.044693605707810558
>>> random.random()
0.16270424255105465
>>> random.random()
0.74068552817650446
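Because `random.random()` returns a float in [0.0, 1.0), one worker's sleep can easily be many times longer than the other's. The following Python 3 model is a simplification, not real threading (assumptions: the queue is already full, and `get()` plus `print` take no time; `simulate` and `longest_run` are hypothetical helpers): whichever worker wakes first takes the next item. With a seeded generator it shows how streaks by one worker, like the repeated indexes the question describes, arise from the sleep lengths alone.

```python
import heapq
import random


def simulate(num_workers=2, num_items=10, seed=None):
    """Model the question's loop: sleep random.random() seconds, then take the next item.

    Assumes the queue is already full and that get() and print take no time.
    """
    rng = random.Random(seed)
    wakeups = [(rng.random(), index) for index in range(num_workers)]  # (wake time, worker)
    heapq.heapify(wakeups)
    order = []
    for _ in range(num_items):
        t, index = heapq.heappop(wakeups)                   # the earliest riser gets the item
        order.append(index)
        heapq.heappush(wakeups, (t + rng.random(), index))  # ...then goes back to sleep
    return order


def longest_run(order):
    """Length of the longest streak of consecutive items handled by the same worker."""
    best = current = 0
    previous = None
    for index in order:
        current = current + 1 if index == previous else 1
        previous = index
        best = max(best, current)
    return best


order = simulate(seed=42)
print(order, longest_run(order))
```

Trying a few different seeds gives a feel for how often one worker takes three or more items in a row.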
answered Oct 06 '22 16:10 by Mihai