I'm trying to find the maximum weight of about 6.1 billion (custom) items and I would like to do this with parallel processing. For my particular application there are better algorithms that don't require my iterating over 6.1 billion items, but the textbook that explains them is over my head and my boss wants this done in 4 days. I figured I have a better shot with my company's fancy server and parallel processing. However, everything I know about parallel processing comes from reading the Python documentation. Which is to say I'm pretty lost...
My current theory is to set up a feeder process, an input queue, a whole bunch (say, 30) of worker processes, and an output queue (finding the maximum element in the output queue will be trivial). What I don't understand is how the feeder process can tell the worker processes when to stop waiting for items to come through the input queue.
I had thought about using multiprocessing.Pool.map_async on my iterable of 6.1E9 items, but it takes nearly 10 minutes just to iterate through the items without doing anything to them. Unless I'm misunderstanding something, having map_async iterate through them to assign them to processes could be done while the processes begin their work. (Pool also provides imap, but the documentation says it's similar to map, which doesn't appear to work asynchronously. I want asynchronous, right?)
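Something like this chunked version is roughly what I have in mind, if the pool can be fed while workers run: split the iterable into chunks and have each worker reduce its chunk to a local maximum, so the parent only compares the per-chunk winners. The names chunked, chunk_max and parallel_max_chunked and the chunk size are placeholders I made up, and it assumes the items are picklable and have the is_relevant()/weight interface from the skeleton below:

import multiprocessing as mp
from itertools import islice

def chunked(iterable, size):
    """Yield successive lists of up to `size` items from an iterable."""
    it = iter(iterable)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk

def chunk_max(chunk):
    """Reduce one chunk to (weight, item) for its heaviest relevant item,
    or (0, None) if nothing in the chunk is relevant."""
    best_weight, best_item = 0, None
    for item in chunk:
        if item.is_relevant() and item.weight > best_weight:
            best_weight, best_item = item.weight, item
    return best_weight, best_item

def parallel_max_chunked(items, nprocs=30, chunk_size=100000):
    """Compare only the per-chunk winners in the parent process."""
    with mp.Pool(processes=nprocs) as pool:
        # imap_unordered starts handing chunks to workers while the parent
        # is still producing later ones; each chunk gets pickled, so the
        # items themselves must be picklable.
        per_chunk = pool.imap_unordered(chunk_max, chunked(items, chunk_size))
        return max(per_chunk, key=lambda pair: pair[0])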
Related questions: Do I want to use concurrent.futures instead of multiprocessing? I couldn't be the first person to implement a two-queue system (that's exactly how the lines at every deli in America work...), so is there a more Pythonic/built-in way to do this?
Here's a skeleton of what I'm trying to do. See the comment block in the middle.
import multiprocessing as mp
import queue


def faucet(items, bathtub):
    """Fill bathtub, a process-safe queue, with 6.1e9 items."""
    for item in items:
        bathtub.put(item)
    bathtub.close()


def drain_filter(bathtub, drain):
    """Put the maximal item from bathtub into drain.

    Bathtub and drain are process-safe queues.
    """
    max_weight = 0
    max_item = None
    while True:
        try:
            current_item = bathtub.get()
        # The following three lines are the ones that I can't
        # quite figure out how to trigger without a race condition.
        # What I would love is to trigger them AFTER faucet calls
        # bathtub.close and the bathtub queue is empty.
        except queue.Empty:
            drain.put((max_weight, max_item))
            return
        else:
            bathtub.task_done()
            if not current_item.is_relevant():
                continue
            current_weight = current_item.weight
            if current_weight > max_weight:
                max_weight = current_weight
                max_item = current_item


def parallel_max(items, nprocs=30):
    """The elements of items should have a method `is_relevant`
    and an attribute `weight`. `items` itself is an immutable
    iterator object.
    """
    bathtub_q = mp.JoinableQueue()
    drain_q = mp.Queue()
    faucet_proc = mp.Process(target=faucet, args=(items, bathtub_q))
    worker_procs = mp.Pool(processes=nprocs)

    faucet_proc.start()
    for _ in range(nprocs):
        worker_procs.apply_async(drain_filter, (bathtub_q, drain_q))

    finalists = []
    for _ in range(nprocs):
        finalists.append(drain_q.get())
    return max(finalists)
I found a very thorough answer to my question, and a gentle introduction to multitasking, from Python Software Foundation communications director Doug Hellmann. What I wanted was the "poison pill" pattern. Check it out here: http://www.doughellmann.com/PyMOTW/multiprocessing/communication.html
Props to @MRAB for posting the kernel of that concept.
Yes, it is. From https://docs.python.org/3/library/multiprocessing.html#exchanging-objects-between-processes: Queues are thread and process safe.
Solution: just use the terminate() method of multiprocessing.Process.
Python multiprocessing Queue class: the multiprocessing module provides a Queue class that is a first-in, first-out data structure. Queues can store any picklable Python object (though simple ones are best) and are extremely useful for sharing data between processes.
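A minimal sketch of that sharing pattern, with a made-up echo function and payload just for illustration:

import multiprocessing as mp

def echo(q_in, q_out):
    """Read one picklable object from q_in and push a result onto q_out."""
    obj = q_in.get()
    q_out.put(("echoed", obj))

if __name__ == "__main__":
    q_in, q_out = mp.Queue(), mp.Queue()   # process-safe FIFOs
    p = mp.Process(target=echo, args=(q_in, q_out))
    p.start()
    q_in.put({"weight": 12.5})             # any picklable object can go in
    print(q_out.get())                     # -> ('echoed', {'weight': 12.5})
    p.join()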
You could put a special terminating item, such as None, into the queue. When a worker sees it, it can put it back for the other workers to see, and then terminate. Alternatively, you could put one special terminating item per worker into the queue.
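A rough sketch of the second variant (one terminating item per worker), reusing the faucet/drain_filter names from the question's skeleton; the SENTINEL constant and the parallel_max wiring are illustrative, and it assumes the items are picklable and expose is_relevant() and weight:

import multiprocessing as mp

SENTINEL = None   # the "poison pill": anything that cannot be a real item

def faucet(items, bathtub, nprocs):
    """Fill the queue, then send one sentinel per worker so every worker
    eventually learns that the stream is finished."""
    for item in items:
        bathtub.put(item)
    for _ in range(nprocs):
        bathtub.put(SENTINEL)

def drain_filter(bathtub, drain):
    """Track the heaviest relevant item seen; report it when the sentinel arrives."""
    max_weight, max_item = 0, None
    while True:
        current_item = bathtub.get()
        if current_item is SENTINEL:
            drain.put((max_weight, max_item))
            return
        if not current_item.is_relevant():
            continue
        if current_item.weight > max_weight:
            max_weight, max_item = current_item.weight, current_item

def parallel_max(items, nprocs=30):
    bathtub, drain = mp.Queue(), mp.Queue()
    workers = [mp.Process(target=drain_filter, args=(bathtub, drain))
               for _ in range(nprocs)]
    for w in workers:
        w.start()
    feeder = mp.Process(target=faucet, args=(items, bathtub, nprocs))
    feeder.start()
    # One finalist per worker; collecting them all before joining avoids
    # blocking on queues that still hold unread results.
    finalists = [drain.get() for _ in range(nprocs)]
    feeder.join()
    for w in workers:
        w.join()
    return max(finalists, key=lambda pair: pair[0])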