In the following code, I have two queues for running different kinds of threads. These threads add to each other's queues recursively (Queue 1 grabs some info, Queue 2 processes it and adds more to Queue 1).
I want to wait until all items in both queues are fully processed. Currently I'm using this code
queue.join()
out_queue.join()
The problem is that when the first queue temporarily runs out of work, its join() returns, so it never sees what queue 2 (the out_queue) adds to it after that point.
I added a time.sleep() call, which is a very hacky fix; after 30 seconds both queues have filled up enough not to run out.
What is the standard Python way of fixing this? Do I have to have just one queue, and tag items in it as to which thread they should be handled by?
class ThreadUrl(threading.Thread):
    """Threaded URL grab."""
    def __init__(self, queue, out_queue):
        threading.Thread.__init__(self)
        self.queue = queue
        self.out_queue = out_queue

    def run(self):
        while True:
            row = self.queue.get()
            request = urllib2.Request(row[0], None, req_headers)
            # ... some processing ...
            self.out_queue.put([row, http_status, page])
            self.queue.task_done()
class DatamineThread(threading.Thread):
    def __init__(self, out_queue, mysql):
        threading.Thread.__init__(self)
        self.out_queue = out_queue
        self.mysql = mysql

    def run(self):
        while True:
            row = self.out_queue.get()
            # ... some processing ...
            queue.put(newrow)
            self.out_queue.task_done()
queue = Queue.Queue()
out_queue = Queue.Queue()
for i in range(URL_THREAD_COUNT):
    t = ThreadUrl(queue, out_queue)
    t.setDaemon(True)
    t.start()

# populate queue with data
for row in rows:
    queue.put(row)
# MySQL connector
mysql = MySQLConn(host='localhost', user='root', passwd=None, db='db')

# spawn DatamineThread; if you have multiple, make sure each one has its own MySQL connector
dt = DatamineThread(out_queue, mysql)
dt.setDaemon(True)
dt.start()
time.sleep(30)

# wait on the queues until everything has been processed
queue.join()
out_queue.join()
Change the workers so that they need a sentinel value to exit, instead of exiting when they don't have any more work in the queue. In the following code, the howdy worker reads items from the input queue. If the value is the sentinel (None here, but it could be anything), the worker exits.
As a consequence, you don't need to mess with timeouts, which as you've found can be rather dodgy. Another consequence is that if you have N threads, you have to append N sentinels to the input queue to kill off your workers. Otherwise you'll wind up with a worker who will wait forever. A zombie worker, if you will.
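To make the N-sentinels point concrete, here is a minimal sketch in Python 3 syntax (the queue module replaced Python 2's Queue): several workers drain one queue, and each is shut down by its own sentinel. The worker function and the doubling step are illustrative, not from the code above.

```python
import queue
import threading

def worker(q, results):
    # Each worker exits when it pulls the sentinel (None) from the queue.
    for item in iter(q.get, None):
        results.append(item * 2)   # stand-in for real processing

q = queue.Queue()
results = []
THREAD_COUNT = 3

threads = [threading.Thread(target=worker, args=(q, results))
           for _ in range(THREAD_COUNT)]
for t in threads:
    t.start()

for n in range(5):
    q.put(n)

# One sentinel per worker; with fewer, some workers block forever.
for _ in range(THREAD_COUNT):
    q.put(None)

for t in threads:
    t.join()

print(sorted(results))  # doubled inputs, order depends on scheduling
```

Since the three workers append concurrently, only the sorted result is deterministic.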
import threading, Queue

def howdy(q):
    for msg in iter(q.get, None):
        print 'howdy,', msg

inq = Queue.Queue()
for word in 'whiskey syrup bitters'.split():
    inq.put(word)
inq.put(None)  # tell worker to exit

thread = threading.Thread(target=howdy, args=[inq])
thread.start()
thread.join()
howdy, whiskey
howdy, syrup
howdy, bitters
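Applied to your two-queue setup, the same idea looks roughly like this sketch in Python 3 syntax: stage 1 forwards its results downstream and then propagates the sentinel, so the main thread only has to seed one sentinel and join both threads. The stage functions, the hard-coded 200 status, and the sample URLs are all hypothetical stand-ins; with multiple workers per stage, or with stage 2 feeding work back into stage 1 as in your code, you would need N sentinels per stage and a way to decide when the cycle is truly drained (for example, a pending-work counter).

```python
import queue
import threading

def grab(inq, outq):
    # Stage 1: simulate the URL grab; forward results to the next stage.
    for url in iter(inq.get, None):
        outq.put((url, 200))        # 200 is a made-up status for illustration
    outq.put(None)                  # propagate the sentinel downstream

def datamine(outq, results):
    # Stage 2: consume stage-1 output until the sentinel arrives.
    for row in iter(outq.get, None):
        results.append(row)

inq, outq, results = queue.Queue(), queue.Queue(), []
t1 = threading.Thread(target=grab, args=(inq, outq))
t2 = threading.Thread(target=datamine, args=(outq, results))
t1.start()
t2.start()

for url in ['a', 'b', 'c']:
    inq.put(url)
inq.put(None)                       # start the shutdown cascade

t1.join()
t2.join()
print(results)
```

With one worker per stage, order is preserved, and no sleep() or timeout is needed: joining the threads is enough.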