This is an extension of my recent question Avoiding race conditions in Python 3's multiprocessing Queues. Hopefully this version of the question is more specific.
TL;DR: In a multiprocessing model where worker processes are fed from a queue using multiprocessing.Queue, why are my worker processes so idle? Each process has its own input queue so they're not fighting each other for a shared queue's lock, but the queues spend a lot of time actually just empty. The main process is running an I/O-bound thread -- is that slowing the CPU-bound filling of the input queues?
I'm trying to find the maximal element of the Cartesian product of N sets each with M_i elements (for 0 <= i < N) under a certain constraint. Recall that the elements of the Cartesian product are length-N tuples whose elements are elements of the N sets. I'll call these tuples 'combinations' to emphasize the fact that I'm looping over every combination of the original sets. A combination meets the constraint when my function is_feasible returns True. In my problem, I'm trying to find the combination whose elements have the greatest weight: sum(element.weight for element in combination).
My problem size is large, but so is my company's server. I'm trying to rewrite the following serial algorithm as a parallel algorithm.
from operator import itemgetter
from itertools import product # Cartesian product function from the std lib

def optimize(sets):
    """Return the largest (total-weight, combination) tuple from all
    possible combinations of the elements in the several sets, subject
    to the constraint that is_feasible(combo) returns True."""
    return max(
        map(
            lambda combination: (
                sum(element.weight for element in combination),
                combination
            ),
            filter(
                is_feasible, # Returns True if combo meets constraint
                product(*sets)
            )
        ),
        key=itemgetter(0) # Only maximize based on sum of weight
    )
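As a usage sketch, the function above can be exercised on toy data. The `Element` type, its `cost` field, and the budget-based `is_feasible` below are hypothetical stand-ins for the real element type and constraint, which aren't shown here; `optimize` is restated compactly so the block runs on its own.

```python
from collections import namedtuple
from itertools import product
from operator import itemgetter

# Hypothetical element type: the only known attribute is .weight.
Element = namedtuple("Element", ["name", "weight", "cost"])

BUDGET = 5  # Hypothetical constraint parameter.


def is_feasible(combination):
    """Hypothetical constraint: total cost must fit within BUDGET."""
    return sum(element.cost for element in combination) <= BUDGET


def optimize(sets):
    """Same logic as the serial version, written as a generator expression."""
    scored = (
        (sum(element.weight for element in combo), combo)
        for combo in product(*sets)
        if is_feasible(combo)
    )
    return max(scored, key=itemgetter(0))


sets = [
    [Element("a1", 3, 2), Element("a2", 5, 4)],
    [Element("b1", 1, 1), Element("b2", 4, 3)],
]
best_weight, best_combo = optimize(sets)
print(best_weight, [element.name for element in best_combo])
# prints: 7 ['a1', 'b2']
```

Of the four combinations, (a2, b2) exceeds the budget, and (a1, b2) has the largest total weight among the remaining three.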
My current multiprocessing approach is to create worker processes and feed them combinations with an input queue. When the workers receive a poison pill they place the best combination they've seen on an output queue and exit. I fill the input queue from the main thread of the main process. One advantage of this technique is that I can spawn a secondary thread from the main process to run a monitoring tool (just a REPL I can use to see how many combinations have been processed so far and how full the queues are).
                        +-----------+
                  in_q0 |  worker0  |----\
             /----------+-----------+     \
+-----------+   in_q1   +-----------+      \ out_q    +-----------+
|   main    |-----------|  worker1  |-----------------|   main    |
+-----------+           +-----------+      /          +-----------+
             \----------+-----------+     /
                  in_q2 |  worker2  |----/
                        +-----------+
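The layout in the diagram can be sketched roughly as follows. This is a minimal illustration, not my actual code: plain numbers stand in for elements (each number is its own weight), `is_feasible` is a hypothetical placeholder constraint, `None` is assumed as the poison pill, and `optimize_parallel` is a name invented for this sketch.

```python
import multiprocessing as mp
from itertools import cycle, product

POISON_PILL = None  # Sentinel telling a worker to report and exit.


def is_feasible(combo):
    """Hypothetical constraint: the combination's weights sum to at most 10."""
    return sum(combo) <= 10


def worker(in_q, out_q):
    """Consume combinations until the poison pill, then report the best one."""
    best = None  # (total_weight, combination), or None if nothing was feasible
    while True:
        combo = in_q.get()
        if combo is POISON_PILL:
            break
        if is_feasible(combo):
            candidate = (sum(combo), combo)
            if best is None or candidate[0] > best[0]:
                best = candidate
    out_q.put(best)


def optimize_parallel(sets, n_workers=3):
    """Feed one input queue per worker round-robin, then merge the results."""
    out_q = mp.Queue()
    in_qs = [mp.Queue() for _ in range(n_workers)]
    procs = [mp.Process(target=worker, args=(q, out_q)) for q in in_qs]
    for proc in procs:
        proc.start()
    for combo, in_q in zip(product(*sets), cycle(in_qs)):
        in_q.put(combo)  # Each put pickles the whole combination.
    for in_q in in_qs:
        in_q.put(POISON_PILL)
    results = [out_q.get() for _ in procs]  # Drain results before joining.
    for proc in procs:
        proc.join()
    return max((r for r in results if r is not None),
               key=lambda r: r[0], default=None)


# Usage (call from under an `if __name__ == "__main__":` guard):
#     print(optimize_parallel([[1, 4], [2, 5], [3, 6]]))
```

Because `worker` only calls `get` and `put`, it can also be driven in-process with a plain `queue.Queue` when debugging.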
I originally had all the workers reading from one input queue but found that none of them were hitting the CPU. Figuring that they were spending all their time waiting for queue.get() to unblock, I gave them their own queues. That increased pressure on the CPU, so I figured the workers were active more often. However, the queues spend most of their time empty! (I know this from the monitoring REPL I mentioned). This suggests to me that the main loop filling up the queues is slow. Here is that loop:
from itertools import cycle, product

def main():
    # (Create workers, each with its own input queue)
    # Cycle through each worker's queue and add a combination to that queue
    for combo, worker in zip(product(*sets), cycle(workers)):
        worker.in_q.put(combo)
    # (Collect results and return)
I'm guessing the bottleneck is worker.in_q.put(). How do I make that faster? My first instinct was to make the workers slower, but that just doesn't make sense... Is the problem that the monitor thread is stopping the loop too often? How would I be able to tell?
Alternatively, is there another way to implement this that doesn't involve so much waiting on locks?
What do your elements look like? It could be that pickling them to put them in the queue is slow, which would obviously be a bottleneck. Note that each element is being independently pickled over and over and over again.
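One rough way to check is to time generating the combinations with and without pickling them. This is a minimal sketch, assuming a hypothetical `Element` type with a few fields, since the real elements aren't shown. Objects put on a `multiprocessing.Queue` are pickled before being sent, so `pickle.dumps` is a reasonable proxy for the serialization part of the cost, though not for the pipe I/O.

```python
import pickle
import time
from collections import namedtuple
from itertools import islice, product

# Hypothetical element type standing in for the real one.
Element = namedtuple("Element", ["name", "weight", "payload"])

sets = [
    [Element(f"s{i}e{j}", j, "x" * 100) for j in range(20)]
    for i in range(4)
]
SAMPLE = 10_000  # Number of combinations to time.


def time_it(func):
    """Return the wall-clock seconds taken by func()."""
    start = time.perf_counter()
    func()
    return time.perf_counter() - start


def generate_only():
    """Iterate over the first SAMPLE combinations without using them."""
    for _ in islice(product(*sets), SAMPLE):
        pass


def generate_and_pickle():
    """Iterate over the same combinations, pickling each one."""
    for combo in islice(product(*sets), SAMPLE):
        pickle.dumps(combo)


t_generate = time_it(generate_only)
t_pickle = time_it(generate_and_pickle)
print(f"generate only:       {t_generate:.4f} s")
print(f"generate and pickle: {t_pickle:.4f} s")
```

If the second figure is much larger than the first, serialization is a plausible bottleneck.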
If this is the case, this approach might help:
Send the sets to each worker once, up front (for example by calling pickle.dumps once and then transmitting the same string to each worker, or possibly through shared memory or whatever else).
Have each worker iterate over product(my_A_subset, *other_sets) (possibly ordered differently), where my_A_subset is that worker's share of one of the sets, polling for some kind of stop signal between each job (or every three jobs or whatever). This doesn't need to be through a queue; a one-bit shared-memory value works fine.
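The idea above can be sketched roughly as follows. This is an illustration under stated assumptions rather than a definitive implementation: plain numbers stand in for elements (each number is its own weight), `is_feasible` is a hypothetical placeholder constraint, the sets are assumed to be lists so they can be sliced, splitting the first set is an arbitrary choice, and the names `best_in_subset`, `optimize_partitioned`, and `poll_every` are invented for the sketch. The stop signal is a `multiprocessing.Value` holding a single byte.

```python
import multiprocessing as mp
from itertools import product


def is_feasible(combo):
    """Hypothetical constraint: the combination's weights sum to at most 10."""
    return sum(combo) <= 10


def best_in_subset(my_a_subset, other_sets, stop_flag=None, poll_every=1000):
    """Search product(my_a_subset, *other_sets) and return the best
    (total_weight, combination), or None if nothing feasible was seen."""
    best = None
    for i, combo in enumerate(product(my_a_subset, *other_sets)):
        # Poll the shared flag occasionally rather than on every job.
        if stop_flag is not None and i % poll_every == 0 and stop_flag.value:
            break
        if is_feasible(combo):
            weight = sum(combo)
            if best is None or weight > best[0]:
                best = (weight, combo)
    return best


def _worker(my_a_subset, other_sets, stop_flag, out_q):
    """Process entry point: search this worker's share and report once."""
    out_q.put(best_in_subset(my_a_subset, other_sets, stop_flag))


def optimize_partitioned(sets, n_workers=3):
    """Split the first set across workers; each gets the other sets in full."""
    set_a, other_sets = sets[0], sets[1:]
    subsets = [set_a[i::n_workers] for i in range(n_workers)]
    subsets = [s for s in subsets if s]  # Drop empty shares.
    stop_flag = mp.Value("b", 0)  # Shared byte; set to 1 to ask workers to stop.
    out_q = mp.Queue()
    procs = [
        mp.Process(target=_worker, args=(subset, other_sets, stop_flag, out_q))
        for subset in subsets
    ]
    for proc in procs:
        proc.start()
    results = [out_q.get() for _ in procs]  # One small result per worker.
    for proc in procs:
        proc.join()
    return max((r for r in results if r is not None),
               key=lambda r: r[0], default=None)


# Usage (call from under an `if __name__ == "__main__":` guard):
#     print(optimize_partitioned([[1, 4], [2, 5], [3, 6]]))
```

With this layout the sets cross the process boundary once per worker, and only one small result per worker comes back, instead of one pickled combination per queue item.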