 

In Python, how can I wait until all items in multiple Queues are processed?

In the following code, I have two queues for running different kinds of threads. These threads add to each other's queues recursively (Queue 1 grabs some info, Queue 2 processes it and adds more to Queue 1).

I want to wait until all items in both queues are fully processed. Currently I'm using this code

queue.join()
out_queue.join()

The problem is that when the first queue temporarily runs out of work, its join() returns, so the program never sees what queue 2 (the out_queue) adds back to it after that point.

I added a time.sleep() call, which is a very hacky fix: within 30 seconds both queues have filled up enough not to run dry.

What is the standard Python way of fixing this? Do I have to use just one queue and tag each item with which kind of thread should handle it?
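(For reference, here is a rough sketch of that single tagged-queue idea, in Python 3 syntax; the tag names and dispatch function are purely illustrative, not part of my actual code, which follows below in Python 2.)

```python
import queue

# Illustrative only: one queue whose items carry a tag naming the
# kind of worker that should handle them.
q = queue.Queue()
q.put(('url', 'http://example.com'))
q.put(('datamine', {'page': '<html>...</html>'}))

def dispatch(item):
    # Route each item by its tag instead of by which queue it sits in.
    tag, payload = item
    if tag == 'url':
        return 'fetched ' + payload
    elif tag == 'datamine':
        return 'mined page'

handled = []
while not q.empty():
    handled.append(dispatch(q.get()))
    q.task_done()
```

My current two-queue code is below: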

import threading, time, urllib2, Queue

queue = Queue.Queue()
out_queue = Queue.Queue()

class ThreadUrl(threading.Thread):
    """Threaded Url Grab"""
    def __init__(self, queue, out_queue):
        threading.Thread.__init__(self)
        self.queue = queue
        self.out_queue = out_queue

    def run(self):
        while True:
            row = self.queue.get()
            
            request = urllib2.Request(row[0], None, req_headers)
            
            # ... some processing ...
            
            self.out_queue.put([row, http_status, page])
            
            self.queue.task_done()

class DatamineThread(threading.Thread):
    def __init__(self, out_queue, mysql):
        threading.Thread.__init__(self)
        self.out_queue = out_queue
        self.mysql = mysql

    def run(self):
        while True:
            row = self.out_queue.get()
            
            # ... some processing ...

            queue.put(newrow)  # refers to the module-level "queue" defined above
                        
            self.out_queue.task_done()


for i in range(URL_THREAD_COUNT):
    t = ThreadUrl(queue, out_queue)
    t.setDaemon(True)
    t.start()

#populate queue with data
for row in rows:
    queue.put(row)

#MySQL Connector
mysql = MySQLConn(host='localhost', user='root', passwd = None, db='db')

#spawn DatamineThread; if you have multiple, make sure each one has its own mysql connector
dt = DatamineThread(out_queue, mysql)
dt.setDaemon(True)
dt.start()

time.sleep(30)

#wait on the queue until everything has been processed
queue.join()
out_queue.join()
asked Sep 16 '14 by Josh.F

1 Answer

Change the workers so that they exit when they receive a sentinel value, instead of relying on the queue running dry. In the following code the howdy worker reads items from the input queue; if the value is the sentinel (None here, but it could be anything), the worker exits.

As a consequence, you don't need to mess with timeouts, which as you've found can be rather dodgy. Another consequence is that if you have N threads, you have to append N sentinels to the input queue to kill off your workers. Otherwise you'll wind up with a worker that waits forever: a zombie worker, if you will.

source

import threading, Queue

def howdy(q):
    # iter(q.get, None) calls q.get() repeatedly until it returns the
    # sentinel value None, at which point the loop (and worker) ends.
    for msg in iter(q.get, None):
        print 'howdy,', msg

inq = Queue.Queue()
for word in 'whiskey syrup bitters'.split():
    inq.put(word)
inq.put(None)          # sentinel: tell worker to exit

thread = threading.Thread(target=howdy, args=[inq])
thread.start()
thread.join()

output

howdy, whiskey
howdy, syrup
howdy, bitters
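The same pattern scales to a pool: push one sentinel per thread, then join the threads. A minimal sketch in Python 3 syntax (the module is `queue` rather than `Queue` there; the `worker` function and `N` are illustrative names, not from the code above):

```python
import threading
import queue

def worker(q, results):
    # Consume items until the sentinel (None) is read, then exit.
    for msg in iter(q.get, None):
        results.append(msg)

N = 3
inq = queue.Queue()
results = []  # list.append is thread-safe under CPython's GIL

for word in 'whiskey syrup bitters'.split():
    inq.put(word)
for _ in range(N):
    inq.put(None)  # one sentinel per worker, or some workers block forever

threads = [threading.Thread(target=worker, args=(inq, results)) for _ in range(N)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(sorted(results))  # ['bitters', 'syrup', 'whiskey']
```

Processing order across workers is nondeterministic, which is why the result is sorted before printing.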
answered Sep 26 '22 by johntellsall