I'm using the multiprocessing module to split up a very large task. It works for the most part, but I must be missing something obvious with my design, because this way it's very hard for me to effectively tell when all of the data has been processed.
I have two separate tasks that run; one that feeds the other. I guess this is a producer/consumer problem. I use a shared Queue between all processes, where the producers fill up the queue, and the consumers read from the queue and do the processing. The problem is that there is a finite amount of data, so at some point everyone needs to know that all of the data has been processed so the system can shut down gracefully.
It would seem to make sense to use the map_async() function, but since the producers are filling up the queue, I don't know all of the items up front, so I have to go into a while loop and use apply_async() and try to detect when everything is done with some sort of timeout...ugly.
I feel like I'm missing something obvious. How can this be better designed?
PRODCUER
class ProducerProcess(multiprocessing.Process):
def __init__(self, item, consumer_queue):
self.item = item
self.consumer_queue = consumer_queue
multiprocessing.Process.__init__(self)
def run(self):
for record in get_records_for_item(self.item): # this takes time
self.consumer_queue.put(record)
def start_producer_processes(producer_queue, consumer_queue, max_running):
running = []
while not producer_queue.empty():
running = [r for r in running if r.is_alive()]
if len(running) < max_running:
producer_item = producer_queue.get()
p = ProducerProcess(producer_item, consumer_queue)
p.start()
running.append(p)
time.sleep(1)
CONSUMER
def process_consumer_chunk(queue, chunksize=10000):
for i in xrange(0, chunksize):
try:
# don't wait too long for an item
# if new records don't arrive in 10 seconds, process what you have
# and let the next process pick up more items.
record = queue.get(True, 10)
except Queue.Empty:
break
do_stuff_with_record(record)
MAIN
if __name__ == "__main__":
manager = multiprocessing.Manager()
consumer_queue = manager.Queue(1024*1024)
producer_queue = manager.Queue()
producer_items = xrange(0,10)
for item in producer_items:
producer_queue.put(item)
p = multiprocessing.Process(target=start_producer_processes, args=(producer_queue, consumer_queue, 8))
p.start()
consumer_pool = multiprocessing.Pool(processes=16, maxtasksperchild=1)
Here is where it gets cheesy. I can't use map, because the list to consume is being filled up at the same time. So I have to go into a while loop and try to detect a timeout. The consumer_queue can become empty while the producers are still trying to fill it up, so I can't just detect an empty queue an quit on that.
timed_out = False
timeout= 1800
while 1:
try:
result = consumer_pool.apply_async(process_consumer_chunk, (consumer_queue, ), dict(chunksize=chunksize,))
if timed_out:
timed_out = False
except Queue.Empty:
if timed_out:
break
timed_out = True
time.sleep(timeout)
time.sleep(1)
consumer_queue.join()
consumer_pool.close()
consumer_pool.join()
I thought that maybe I could get() the records in the main thread and pass those into the consumer instead of passing the queue in, but I think I end up with the same problem that way. I still have to run a while loop and use apply_async() Thank you in advance for any advice!
You could use a manager.Event
to signal the end of the work. This event can be shared between all of your processes and then when you signal it from your main process the other workers can then gracefully shutdown.
while not event.is_set():
...rest of code...
So, your consumers would wait for the event to be set and handle the cleanup once it is set.
To determine when to set this flag you can do a join
on the producer threads and when those are all complete you can then join on the consumer threads.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With