
Avoiding race conditions in Python 3's multiprocessing Queues

I'm trying to find the maximum weight of about 6.1 billion (custom) items and I would like to do this with parallel processing. For my particular application there are better algorithms that don't require my iterating over 6.1 billion items, but the textbook that explains them is over my head and my boss wants this done in 4 days. I figured I have a better shot with my company's fancy server and parallel processing. However, everything I know about parallel processing comes from reading the Python documentation. Which is to say I'm pretty lost...

My current theory is to set up a feeder process, an input queue, a whole bunch (say, 30) of worker processes, and an output queue (finding the maximum element in the output queue will be trivial). What I don't understand is how the feeder process can tell the worker processes when to stop waiting for items to come through the input queue.

I had thought about using multiprocessing.Pool.map_async on my iterable of 6.1E9 items, but it takes nearly 10 minutes just to iterate through the items without doing anything to them. Unless I'm misunderstanding something, having map_async iterate through them to assign them to processes could be done while the processes begin their work. (Pool also provides imap, but the documentation says it's similar to map, which doesn't appear to work asynchronously. I want asynchronous, right?)
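(For concreteness, here's the kind of Pool-based approach I'm wondering about. It's just a sketch that assumes my items are picklable; weight_of and chunked_max are names I made up.)

import multiprocessing as mp

def weight_of(item):
    # Return a (weight, item) pair for relevant items, None otherwise.
    return (item.weight, item) if item.is_relevant() else None

def chunked_max(items, nprocs=30):
    pool = mp.Pool(processes=nprocs)
    try:
        # A large chunksize ships items to the workers in batches, and
        # results stream back as each chunk completes, so we never hold
        # 6.1e9 results in memory at once.
        results = pool.imap_unordered(weight_of, items, chunksize=10000)
        return max((r for r in results if r is not None),
                   key=lambda pair: pair[0])
    finally:
        pool.close()
        pool.join()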

Related questions: Do I want to use concurrent.futures instead of multiprocessing? I couldn't be the first person to implement a two-queue system (that's exactly how the lines at every deli in America work...) so is there a more Pythonic/built-in way to do this?

Here's a skeleton of what I'm trying to do. See the comment block in the middle.

import multiprocessing as mp
import queue

def faucet(items, bathtub):
    """Fill bathtub, a process-safe queue, with 6.1e9 items"""
    for item in items:
        bathtub.put(item)
    bathtub.close()

def drain_filter(bathtub, drain):
    """Put maximal item from bathtub into drain.
    Bathtub and drain are process-safe queues.
    """
    max_weight = 0
    max_item = None
    while True:
        try:
            current_item = bathtub.get()
        # The following three lines are the ones that I can't
        # quite figure out how to trigger without a race condition.
        # What I would love is to trigger them AFTER faucet calls
        # bathtub.close and the bathtub queue is empty.
        except queue.Empty:
            drain.put((max_weight, max_item))
            return
        else:
            bathtub.task_done()
        if not current_item.is_relevant():
            continue
        current_weight = current_item.weight
        if current_weight > max_weight:
            max_weight = current_weight
            max_item = current_item

def parallel_max(items, nprocs=30):
    """The elements of items should have a method `is_relevant`
    and an attribute `weight`. `items` itself is an immutable
    iterator object.
    """
    bathtub_q = mp.JoinableQueue()
    drain_q = mp.Queue()

    faucet_proc = mp.Process(target=faucet, args=(items, bathtub_q))
    worker_procs = mp.Pool(processes=nprocs)

    faucet_proc.start()
    for _ in range(nprocs):
        worker_procs.apply_async(drain_filter, (bathtub_q, drain_q))

    finalists = []
    for i in range(nprocs):
        finalists.append(drain_q.get())

    return max(finalists)


HERE'S THE ANSWER

I found a very thorough answer to my question, and a gentle introduction to multitasking, from Python Software Foundation communications director Doug Hellmann. What I wanted was the "poison pill" pattern. Check it out here: http://www.doughellmann.com/PyMOTW/multiprocessing/communication.html

Props to @MRAB for posting the kernel of that concept.
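Applied to my skeleton above, the pattern comes out roughly like this. It's a sketch, not tested against my real items: one pill (None) per worker, and plain mp.Process workers instead of a Pool, since multiprocessing queues can't be passed to Pool workers as arguments.

import multiprocessing as mp

PILL = None  # the poison pill

def faucet(items, bathtub, nprocs):
    """Fill bathtub with items, then one pill per worker."""
    for item in items:
        bathtub.put(item)
    for _ in range(nprocs):
        bathtub.put(PILL)

def drain_filter(bathtub, drain):
    """Put this worker's maximal (weight, item) pair into drain."""
    max_weight = 0
    max_item = None
    while True:
        current_item = bathtub.get()
        if current_item is PILL:
            # Nothing more is coming: report the local max and exit.
            drain.put((max_weight, max_item))
            return
        if current_item.is_relevant() and current_item.weight > max_weight:
            max_weight = current_item.weight
            max_item = current_item

def parallel_max(items, nprocs=30):
    bathtub_q = mp.Queue(maxsize=100000)  # keep the feeder from running away
    drain_q = mp.Queue()
    workers = [mp.Process(target=drain_filter, args=(bathtub_q, drain_q))
               for _ in range(nprocs)]
    for w in workers:
        w.start()
    faucet(items, bathtub_q, nprocs)  # feed from this process
    finalists = [drain_q.get() for _ in range(nprocs)]
    for w in workers:
        w.join()
    return max(finalists, key=lambda pair: pair[0])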

asked May 15 '12 by wkschwartz


People also ask

Is multiprocessing queue process safe?

Yes, it is. From https://docs.python.org/3/library/multiprocessing.html#exchanging-objects-between-processes: Queues are thread and process safe.

How do you stop a multiprocess in Python?

The solution is simple: use the terminate() method of multiprocessing.Process.
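For example, a minimal sketch:

import multiprocessing as mp
import time

def worker():
    while True:
        time.sleep(1)  # pretend to work forever

if __name__ == "__main__":
    p = mp.Process(target=worker)
    p.start()
    time.sleep(2)
    p.terminate()  # forcibly stop the process; no cleanup code runs
    p.join()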

How does multiprocessing queue work in Python?

Python's multiprocessing module provides a Queue class that is a first-in-first-out data structure. Queues can store any picklable Python object (though simple ones are best) and are extremely useful for sharing data between processes.
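For example, a minimal producer sketch:

import multiprocessing as mp

def producer(q):
    q.put({"weight": 42})  # any picklable object can go on the queue

if __name__ == "__main__":
    q = mp.Queue()
    p = mp.Process(target=producer, args=(q,))
    p.start()
    print(q.get())  # prints {'weight': 42}, received from the child
    p.join()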


1 Answer

You could put a special terminating item, such as None, into the queue. When a worker sees it, it can put it back for the other workers to see, and then terminate. Alternatively, you could put one special terminating item per worker into the queue.
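A sketch of the put-it-back variant, reusing the question's names (one None pill total):

def drain_filter(bathtub, drain):
    max_weight, max_item = 0, None
    while True:
        item = bathtub.get()
        if item is None:       # the terminating item
            bathtub.put(None)  # put it back for the other workers to see
            drain.put((max_weight, max_item))
            return
        if item.is_relevant() and item.weight > max_weight:
            max_weight, max_item = item.weight, item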

answered by MRAB