 

Do multiprocessing pools give every process the same number of tasks, or are they assigned as available?


When you map an iterable to a multiprocessing.Pool, are the iterations divided into a queue for each process in the pool at the start, or is there a common queue from which a task is taken when a process comes free?

    import multiprocessing

    def generate_stuff():
        for foo in range(100):
            yield foo

    def process(moo):
        print(moo)

    if __name__ == '__main__':
        pool = multiprocessing.Pool()
        pool.map(func=process, iterable=generate_stuff())
        pool.close()
        pool.join()

So, given this untested example code, if there are 4 processes in the pool, does each process get allocated 25 stuffs to do, or do the 100 stuffs get picked off one by one by processes looking for stuff to do, so that each process might do a different number of stuffs, e.g. 30, 26, 24, 20?

John Mee asked Nov 07 '12 06:11



1 Answer


Well, the obvious answer is to test it.

As is, the test may not tell you much: the jobs finish almost instantly, so things could end up evenly distributed even if pooled processes grab jobs as they become ready. But there's an easy way to fix that: make each job sleep for a random amount of time and report which process ran it.

    import collections
    import multiprocessing
    import os
    import random
    import time

    def generate_stuff():
        for foo in range(100):
            yield foo

    def process(moo):
        # print(moo)
        # Sleep 0.0-5.0 s so that jobs take uneven amounts of time.
        time.sleep(random.randint(0, 50) / 10.)
        # Report which worker process handled this job.
        return os.getpid()

    if __name__ == '__main__':
        pool = multiprocessing.Pool()
        pids = pool.map(func=process, iterable=generate_stuff(), chunksize=1)
        pool.close()
        pool.join()
        print(collections.Counter(pids))

If the numbers are "jagged", you know that the pooled processes must be grabbing new jobs as they become free. (I explicitly set chunksize to 1 to make sure the chunks aren't so big that each process only gets one chunk in the first place.)

When I run it on an 8-core machine:

Counter({98935: 16, 98936: 16, 98939: 13, 98937: 12, 98942: 12, 98938: 11, 98940: 11, 98941: 9}) 

So, it looks like the processes are getting new jobs on the fly.
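To see why a common queue produces uneven counts, here is a small deterministic model. It is a sketch, not Pool's actual code: it assumes chunksize=1, that every worker is idle at time 0, and that the idle worker with the lowest id wins a tie. The task durations come from a seeded random generator instead of real sleeps, and the names dynamic_assignment and static_assignment are hypothetical.

```python
import heapq
import random
from collections import Counter


def dynamic_assignment(durations, n_workers):
    """Common queue: whichever worker becomes free first takes the next task."""
    # Heap of (time the worker becomes free, worker id); everyone starts idle.
    free_at = [(0.0, w) for w in range(n_workers)]
    heapq.heapify(free_at)
    counts = Counter()
    for d in durations:
        t, w = heapq.heappop(free_at)  # earliest-free worker (ties -> lowest id)
        counts[w] += 1
        heapq.heappush(free_at, (t + d, w))
    return counts


def static_assignment(durations, n_workers):
    """Up-front split: tasks are dealt round-robin before anything runs."""
    counts = Counter()
    for i, _ in enumerate(durations):
        counts[i % n_workers] += 1
    return counts


if __name__ == "__main__":
    rng = random.Random(0)  # seeded so the run is repeatable
    durations = [rng.randint(0, 50) / 10.0 for _ in range(100)]
    print("static: ", sorted(static_assignment(durations, 4).values()))
    print("dynamic:", sorted(dynamic_assignment(durations, 4).values()))
```

With the up-front split every worker gets exactly 25 tasks no matter how long they take; with the common queue, a worker that happens to draw several short tasks ends up doing more of them, which is the "jagged" pattern in the Counter output above.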

Since you specifically asked about 4 workers, I changed Pool() to Pool(4) and got this:

Counter({98965: 31, 98962: 24, 98964: 23, 98963: 22}) 

However, there's an even better way to find out than by testing: read the source.

In the source, map just calls map_async, which splits the iterable into batches and puts them on a self._taskqueue object (a Queue.Queue instance in 2.7). This queue isn't shared with the worker processes directly. Instead, a task-handler thread in the parent moves the batches onto a single inbound queue (self._inqueue) that every worker reads from, and each worker pulls the next batch whenever it finishes the previous one. In other words, there is one common queue, not a pre-divided queue per process.
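The batching step can be sketched like this. It is modeled on the idea of splitting the iterable into tuples of chunksize items each; make_batches is a hypothetical name, not Pool's private API.

```python
import itertools


def make_batches(iterable, chunksize):
    """Lazily split an iterable into tuples of at most `chunksize` items."""
    it = iter(iterable)
    while True:
        batch = tuple(itertools.islice(it, chunksize))
        if not batch:  # iterator exhausted
            return
        yield batch


if __name__ == "__main__":
    # 100 items in batches of 7: fourteen full batches plus one batch of 2.
    sizes = [len(b) for b in make_batches(range(100), 7)]
    print(len(sizes), sizes[-1])
```

Each tuple is one unit of work on the common queue, so a worker that pulls a batch runs all of its items before asking for more. That is why chunksize=1 in the test above gives the finest-grained view of which process takes what.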

This is also how you can find out what the default chunksize is for map. The Python 2.7 implementation shows that it's just len(iterable) / (len(self._pool) * 4), rounded up (the code is slightly more verbose than that to avoid fractional arithmetic); put another way, it's just big enough for about 4 chunks per process. (If the iterable has no len(), as with the generator in the question, map converts it to a list first.) But you really shouldn't rely on this: the documentation vaguely and indirectly implies that some kind of heuristic will be used, but gives no guarantees about what it will be. So, if you really need "about 4 chunks per process", calculate it explicitly. More realistically, if you ever need anything besides the default, you probably need a domain-specific value that you'll work out by calculation, guessing, or profiling.
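If you do want "about 4 chunks per process", a minimal sketch of computing it explicitly might look like this. default_chunksize is a hypothetical helper that reproduces the rounded-up division described above using divmod, and square is just a stand-in job. pool.map accepts an explicit chunksize argument, and Pool can be used as a context manager in Python 3.3+.

```python
import multiprocessing


def default_chunksize(n_items, n_workers, chunks_per_worker=4):
    """Ceiling of n_items / (n_workers * chunks_per_worker), without float math."""
    chunksize, extra = divmod(n_items, n_workers * chunks_per_worker)
    if extra:
        chunksize += 1
    return chunksize


def square(x):
    return x * x


if __name__ == "__main__":
    items = list(range(100))
    n_workers = 4
    size = default_chunksize(len(items), n_workers)  # 100 / 16 rounded up -> 7
    with multiprocessing.Pool(n_workers) as pool:
        results = pool.map(square, items, chunksize=size)
    print(size, results[:5])
```

For the question's 100 items and 4 workers this gives a chunksize of 7, i.e. 15 batches (fourteen of 7 items and one of 2), so even with the default each worker comes back to the queue for more work several times.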

abarnert answered Oct 03 '22 09:10