
The correct implementation of queue.Queue when multithreading in Python

I am learning Python and wrote some simple scripts to give myself practical examples of various topics. One such is this script to demonstrate how queue.Queue() can be used with threading.Thread() to create background workers. It acts quite strangely though. I ran some time trials. With just one thread it does as you would expect... it takes roughly 2 seconds per task, taking (actually just under??) 40 seconds to complete the 20 tasks. With four threads it does again as you would expect: it does the tasks 4 at a time and so takes around 10 secs. So how on earth, when I run 20 threads, does it take 0.01 seconds (1 s.f.) --- surely it must take at least 2 secs???

Here is the code:

import threading
from queue import Queue
import time

q = Queue()
tLock = threading.Lock()

def run() :

    while True :
        task = q.get()  
        print('Just finished task number',task)
        q.task_done()   
        time.sleep(2)

def main() :
    # worker threads are activated  
    for x in range(20) :
        t = threading.Thread(target=run)
        t.daemon = True
        t.start()
    #20 jobs are put in the queue   
    for x in range(1,21) :
        q.put(x)
    #waits until queue is empty and then continues
    q.join()

if __name__ == '__main__' :
    startTime = time.time()
    main()
    print('Time taken was', time.time() - startTime)
asked Apr 10 '26 16:04 by Sam Redway

1 Answer

You're not actually blocking the main thread until the work is done. q.join() returns as soon as every task has been reported with q.task_done(), and your workers call q.task_done() *before* they sleep -- so with 20 threads, all 20 tasks are marked done almost immediately and main() returns while the workers are still sleeping.

The "proper"(*) way is to make sure all threads are done, by joining all threads:

def main() :
    # keep reference to threads
    threads = [threading.Thread(target=run) for _ in range(20)]
    # start all threads
    for t in threads:
        t.start()

    #20 jobs are put in the queue   
    for x in range(1,21) :
        q.put(x)
    #waits until queue is empty and then continues
    q.join()
    # join all threads
    for t in threads:
        t.join()

*But, this won't work as your threads are in an infinite loop and never exit, even after all tasks are done.

So another way is to make sure you wait before you report back the task:

def run() :
    while True :
        task = q.get()  
        # simulate processing time *before* actual reporting
        time.sleep(2)
        print('Just finished task number',task)  
        q.task_done()
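To see the effect, here is a minimal self-contained sketch (my own, with shorter sleeps and fewer tasks than the original, just to keep the timing quick): because the sleep now happens before q.task_done(), q.join() only returns once the work has genuinely finished.

```python
import threading
import time
from queue import Queue

q = Queue()

def run():
    while True:
        task = q.get()
        time.sleep(0.1)   # simulate work *before* reporting completion
        q.task_done()     # now q.join() reflects real completion

# 4 daemon workers, 8 tasks -> two "waves" of work, roughly 0.2s total
for _ in range(4):
    threading.Thread(target=run, daemon=True).start()

start = time.time()
for x in range(8):
    q.put(x)
q.join()
elapsed = time.time() - start
print('elapsed:', round(elapsed, 2))
```

With task_done() called before the sleep instead (as in the question), elapsed would be near zero.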

Still, the threads remain blocked on q.get() forever. What you could have is a message telling the threads to quit. Something like:

def run() :
    while True :
        task = q.get()
        if task == 'stop':
            q.task_done()  # account for the sentinel too, or q.join() blocks forever
            break
        # simulate processing time *before* actual reporting
        time.sleep(2)
        print('Just finished task number',task)
        q.task_done()

and now simply have the main thread put enough stop messages for all the threads to finally quit their infinite loop:

def main() :
    # keep reference to threads
    threads = [threading.Thread(target=run) for _ in range(20)]
    # start all threads
    for t in threads:
        t.start()

    #20 jobs are put in the queue   
    for x in range(1,21):
        q.put(x)

    for x in range(20):
        # stop all threads after tasks are done
        q.put('stop')

    # waits until queue is empty and then continues
    q.join()

    # join all threads
    for t in threads:
        t.join()
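A common variation (my own sketch, not part of the answer above) uses None as the sentinel instead of the string 'stop', so the shutdown signal can never collide with a real task value:

```python
import threading
from queue import Queue

THREADS = 4
q = Queue()
results = []
lock = threading.Lock()

def run():
    while True:
        task = q.get()
        if task is None:    # sentinel: no real task is ever None
            q.task_done()   # still account for it, or q.join() hangs
            break
        with lock:
            results.append(task * 2)   # stand-in for real work
        q.task_done()

threads = [threading.Thread(target=run) for _ in range(THREADS)]
for t in threads:
    t.start()

for x in range(10):
    q.put(x)
for _ in range(THREADS):
    q.put(None)   # one sentinel per worker

q.join()
for t in threads:
    t.join()

print(sorted(results))   # [0, 2, 4, ..., 18]
```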

Tip: You shouldn't use "magic numbers" such as 20. Have a module-level global variable named THREADS_COUNT so you only have to change one place when you want to test different configurations.
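For example, the final version with the counts pulled out into constants might look like this (a sketch; THREADS_COUNT and TASKS_COUNT are names I'm assuming, and the per-task work is stubbed out to keep it fast):

```python
import threading
from queue import Queue

THREADS_COUNT = 20   # change once to try different configurations
TASKS_COUNT = 20

q = Queue()
processed = []
lock = threading.Lock()

def run():
    while True:
        task = q.get()
        if task == 'stop':
            q.task_done()   # account for the sentinel as well
            break
        with lock:
            processed.append(task)   # stand-in for the real work
        q.task_done()

def main():
    threads = [threading.Thread(target=run) for _ in range(THREADS_COUNT)]
    for t in threads:
        t.start()
    for x in range(1, TASKS_COUNT + 1):
        q.put(x)
    for _ in range(THREADS_COUNT):
        q.put('stop')   # one stop message per worker
    q.join()
    for t in threads:
        t.join()

main()
print('processed', len(processed), 'tasks')
```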

answered Apr 13 '26 06:04 by Reut Sharabani


