
Should I bother locking the queue when I put to or get from it?

I've been going through tutorials about multithreading and queues in Python 3. According to the official documentation, "The Queue class in this module implements all the required locking semantics". But in another tutorial, I've seen the following example:

import queue
import threading
import time

exitFlag = 0

class myThread (threading.Thread):
   def __init__(self, threadID, name, q):
      threading.Thread.__init__(self)
      self.threadID = threadID
      self.name = name
      self.q = q
   def run(self):
      print ("Starting " + self.name)
      process_data(self.name, self.q)
      print ("Exiting " + self.name)

def process_data(threadName, q):
   while not exitFlag:
      queueLock.acquire()
      if not workQueue.empty():
         data = q.get()
         queueLock.release()
         print ("%s processing %s" % (threadName, data))
      else:
         queueLock.release()
         time.sleep(1)

threadList = ["Thread-1", "Thread-2", "Thread-3"]
nameList = ["One", "Two", "Three", "Four", "Five"]
queueLock = threading.Lock()
workQueue = queue.Queue(10)
threads = []
threadID = 1

# Create new threads
for tName in threadList:
   thread = myThread(threadID, tName, workQueue)
   thread.start()
   threads.append(thread)
   threadID += 1

# Fill the queue
queueLock.acquire()
for word in nameList:
   workQueue.put(word)
queueLock.release()

# Wait for queue to empty
while not workQueue.empty():
   pass

# Notify threads it's time to exit
exitFlag = 1

# Wait for all threads to complete
for t in threads:
   t.join()
print ("Exiting Main Thread")
asked May 27 '17 16:05 by x.w zhang



1 Answer

I believe the tutorial you're following is a bad example of how to use Python's threadsafe queue. In particular, the tutorial is using the threadsafe queue in a way that unfortunately requires an extra lock. Indeed, this extra lock means that the threadsafe queue in the tutorial could be replaced with an old-fashioned non-threadsafe queue based on a simple list.

The reason that locking is needed is hinted at by the documentation for Queue.empty():

If empty() returns False it doesn't guarantee that a subsequent call to get() will not block.

The issue is that another thread could run in-between the call to empty() and the call to get(), stealing the item that empty() otherwise reported to exist. The tutorial probably uses the lock to ensure that the thread has exclusive access to the queue from the call to empty() until the call to get(). Without this lock, two threads could enter into the if-statement and both issue a call to get(), meaning that one of them could block, waiting for an item that will never be pushed.
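If a non-blocking check is really what you want, one lock-free option is to skip empty() altogether and call get_nowait(), which raises queue.Empty when nothing is available. The following is only a minimal sketch of that idea; try_take is a hypothetical helper name, not something from the tutorial or the standard library.

```python
import queue


def try_take(q):
    """Return (True, item) if an item was available, else (False, None)."""
    try:
        # The check and the removal happen in one call, under the queue's
        # own internal lock, so no other thread can slip in between.
        return True, q.get_nowait()
    except queue.Empty:
        return False, None


q = queue.Queue()
q.put("One")
print(try_take(q))  # (True, 'One')
print(try_take(q))  # (False, None)
```

Because the test and the removal are a single operation here, the extra queueLock has nothing left to protect.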


Let me show you how to use the threadsafe queue properly. Instead of checking empty() first, just rely directly on the blocking behavior of get():

def process_data(threadName, q):
    while True:
        data = q.get()
        if exitFlag:
            break
        print("%s processing %s" % (threadName, data))

The queue's internal locking ensures that two threads do not interfere during the call to get(), so no queueLock is needed. Note that the tutorial's original code checks exitFlag once per second, whereas this modified version requires you to push a dummy object into the queue after setting exitFlag; otherwise the worker stays blocked in get() and never checks the flag.

The last part of the controller code would need to be modified as follows:

# Notify threads it's time to exit
exitFlag = 1
for _ in range(len(threadList)):
    # Push a dummy element causing a single thread to wake-up and stop.
    workQueue.put(None)
# Wait for all threads to exit
for t in threads:
    t.join()

There is another issue with the tutorial's use of the threadsafe queue, namely that a busy-loop is used in the main thread when waiting for the queue to empty:

# Wait for queue to empty
while not workQueue.empty():
    pass

To wait for the queue to empty, it is better to use Queue.task_done() in the worker threads and Queue.join() in the main thread. At the end of the loop body in process_data(), call q.task_done(). In the main controller code, replace the while-loop above with a call to workQueue.join().

See also the example at the bottom of Python's documentation page for the queue module.
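Putting these pieces together, a complete version might look roughly like the sketch below. It is an illustration under a few assumptions, not the tutorial's code: SENTINEL, run() and the results list are names invented here, and the worker stops when it receives the sentinel itself instead of testing exitFlag, so an item fetched just as the flag flips cannot be dropped.

```python
import queue
import threading

SENTINEL = None  # hypothetical marker meaning "no more work"


def process_data(thread_name, q, results):
    while True:
        data = q.get()  # blocks until an item is available
        try:
            if data is SENTINEL:
                break
            # Stand-in for real work; list.append is safe to call from
            # several threads in CPython.
            results.append((thread_name, data))
        finally:
            q.task_done()  # one task_done() per get(), sentinel included


def run(names, n_threads=3):
    work_queue = queue.Queue(10)
    results = []
    threads = [
        threading.Thread(
            target=process_data,
            args=("Thread-%d" % (i + 1), work_queue, results),
        )
        for i in range(n_threads)
    ]
    for t in threads:
        t.start()
    for word in names:
        work_queue.put(word)  # no extra lock needed
    work_queue.join()  # returns once every item has been marked done
    for _ in threads:
        work_queue.put(SENTINEL)  # one sentinel per worker
    for t in threads:
        t.join()
    return results


if __name__ == "__main__":
    for thread_name, data in run(["One", "Two", "Three", "Four", "Five"]):
        print("%s processing %s" % (thread_name, data))
```

Which thread handles which item varies from run to run, but every item is processed exactly once and the main thread never spins.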


Alternatively, you can keep the queueLock and replace the threadsafe queue with a plain old list as follows:

  • Replace workQueue = queue.Queue(10) with workQueue = []
  • Replace if not workQueue.empty() with if len(workQueue) > 0
  • Replace q.get() with q.pop(0)
  • Replace workQueue.put(word) with workQueue.append(word)

Note that this doesn't preserve the blocking behavior of put() present in the original version.
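Applied to the consumer side, the replacements above come out roughly as follows. This is a sketch only: take_item is a hypothetical helper, and it returns None for "nothing available", which assumes None is never a real work item.

```python
import threading

queueLock = threading.Lock()
workQueue = []  # plain list; every access must hold queueLock


def take_item():
    """Pop the oldest item, or return None if the list is empty."""
    with queueLock:
        # The emptiness check and the pop happen under the same lock,
        # so no other thread can take the item in between.
        if len(workQueue) > 0:
            return workQueue.pop(0)
    return None


with queueLock:
    for word in ["One", "Two", "Three"]:
        workQueue.append(word)

print(take_item())  # One
```

Here the lock does all the work that queue.Queue would otherwise do internally, which is why the thread-safe queue adds nothing in the tutorial's design.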

answered Oct 16 '22 12:10 by Mathias Rav
