I am new to the futures module and have a task that could benefit from parallelization, but I can't figure out exactly how to set up the function for a thread and the function for a process. I'd appreciate any light anyone can shed on the matter.
I'm running a particle swarm optimization (PSO). Without getting into too much detail about PSO itself, here's the basic layout of my code:
There is a Particle class, with a getFitness(self) method (which computes some metric and stores it in self.fitness). A PSO simulation has multiple particle instances (easily over 10; 100s or even 1000s for some simulations).
Every so often, I have to compute the fitness of the particles. Currently, I do this in a for-loop:
for p in listOfParticles:
    p.getFitness(args)
However, I notice that the fitness of each particle can be computed independently of each other. This makes this fitness computation a prime candidate for parallelization. Indeed, I could do map(lambda p: p.getFitness(args), listOfParticles).
Now, I can easily do this with futures.ProcessPoolExecutor:
with futures.ProcessPoolExecutor() as e:
    e.map(lambda p: p.getFitness(args), listOfParticles)
Since the side-effects of calling p.getFitness are stored in each particle itself, I don't have to worry about getting a return from futures.ProcessPoolExecutor().
So far, so good. But now I notice that ProcessPoolExecutor creates new processes, which means that it copies memory, which is slow. I'd like to be able to share memory - so I should be using threads. That's well and good, until I realize that running several processes with several threads inside each process will likely be faster, since multiple threads still run only on one processor of my sweet, 8-core machine.
Here's where I run into trouble:
Based on the examples I've seen, ThreadPoolExecutor operates on a list. So does ProcessPoolExecutor. So I can't do anything iterative in ProcessPoolExecutor to farm out to ThreadPoolExecutor because then ThreadPoolExecutor is going to get a single object to work on (see my attempt, posted below).
On the other hand, I can't slice listOfParticles myself, because I want ThreadPoolExecutor to do its own magic to figure out how many threads are required.
So, the big question (at long last):
How should I structure my code so that I can effectively parallelize the following using both processes AND threads:
for p in listOfParticles:
    p.getFitness()
This is what I've been trying, but I wouldn't dare try to run it, for I know it won't work:
>>> def threadize(func, L, mw):
...     with futures.ThreadPoolExecutor(max_workers=mw) as executor:
...         for i in L:
...             executor.submit(func, i)
...
>>> def processize(func, L, mw):
...     with futures.ProcessPoolExecutor() as executor:
...         executor.map(lambda i: threadize(func, i, mw), L)
...
I'd appreciate any thoughts on how to fix this, or even on how to improve my approach.
In case it matters, I'm on Python 3.3.2.
Perhaps the most important difference is the type of workers used by each class. As their names suggest, the ThreadPoolExecutor uses threads internally, whereas the ProcessPoolExecutor uses processes. A process has a main thread and may have additional threads. A thread belongs to a process.
For comparison, Java's ThreadPoolExecutor is an ExecutorService that executes each submitted task using one of possibly several pooled threads, normally configured using the Executors factory methods. It also provides utility methods to inspect and control the pool's threads.
The Python ThreadPoolExecutor allows you to create and manage thread pools in Python. Although the ThreadPoolExecutor has been available since Python 3.2, it is not widely used, perhaps because of misunderstandings of the capabilities and limitations of Threads in Python.
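Since Python 3.8, the default max_workers for ThreadPoolExecutor is min(32, os.cpu_count() + 4). This default uses at most 32 CPU cores for CPU-bound tasks that release the GIL, and it avoids implicitly consuming very large resources on many-core machines.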
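A minimal sketch of where the 32 comes from, assuming the Python 3.8+ default worker count of min(32, os.cpu_count() + 4). The helper name default_thread_workers is hypothetical, and newer interpreters may count available CPUs slightly differently:

```python
import os
from concurrent.futures import ThreadPoolExecutor


def default_thread_workers():
    """Approximate the documented Python 3.8+ default for max_workers."""
    cpus = os.cpu_count() or 1  # os.cpu_count() may return None
    return min(32, cpus + 4)


# Passing the value explicitly should behave like leaving max_workers unset.
with ThreadPoolExecutor(max_workers=default_thread_workers()) as e:
    doubled = list(e.map(lambda n: 2 * n, range(5)))

print(default_thread_workers(), doubled)
```

On an 8-core machine this gives 12 threads, which is far more than a CPU-bound, GIL-holding fitness function can use.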
I'll give you working code that mixes processes with threads for solving the problem, but it's not what you're expecting ;-) First thing is to make a mock program that doesn't endanger your real data. Experiment with something harmless. So here's the start:
class Particle:
    def __init__(self, i):
        self.i = i
        self.fitness = None
    def getfitness(self):
        self.fitness = 2 * self.i
Now we have something to play with. Next some constants:
MAX_PROCESSES = 3
MAX_THREADS = 2 # per process
CHUNKSIZE = 100
Fiddle those to taste. CHUNKSIZE will be explained later.
The first surprise for you is what my lowest-level worker function does. That's because you're overly optimistic here:
Since the side-effects of calling p.getFitness are stored in each particle itself, I don't have to worry about getting a return from futures.ProcessPoolExecutor().
Alas, nothing done in a worker process can have any effect on the Particle instances in your main program. A worker process works on copies of Particle instances, whether via a copy-on-write implementation of fork() or because it's working on a copy made from unpickling a Particle pickle passed across processes.
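The pickling half of that can be seen without starting any processes at all. This is only a sketch: it round-trips the mock Particle through pickle by hand, which is roughly what happens when an instance is shipped to a worker, and shows that mutating the copy leaves the original untouched:

```python
import pickle


class Particle:
    def __init__(self, i):
        self.i = i
        self.fitness = None

    def getfitness(self):
        self.fitness = 2 * self.i


original = Particle(21)

# Stand-in for "send the particle to a worker": serialize, then rebuild a copy.
worker_copy = pickle.loads(pickle.dumps(original))
worker_copy.getfitness()

print(worker_copy.fitness)  # 42
print(original.fitness)     # None
```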
So if you want your main program to see the fitness results, you need to arrange to send information back to the main program. Because I don't know enough about your actual program, here I'm assuming that Particle().i is a unique integer, and that the main program can easily map integers back to Particle instances. With that in mind, the lowest-level worker function here needs to return a pair: the unique integer and the fitness result:
def thread_worker(p):
    p.getfitness()
    return (p.i, p.fitness)
Given that, it's easy to spread a list of Particles across threads, and return a list of (particle_id, fitness) results:
def proc_worker(ps):
    import concurrent.futures as cf
    with cf.ThreadPoolExecutor(max_workers=MAX_THREADS) as e:
        result = list(e.map(thread_worker, ps))
    return result
Notes:
- list() is used to force e.map() to materialize all the results in a list.
It only remains to write code to spread a list of Particles across processes, and retrieve the results. This is dead easy to do with multiprocessing, so that's what I'm going to use. I have no idea whether concurrent.futures can do it (given that we're also mixing in threads), but don't care. But because I'm giving you working code, you can play with that and report back ;-)
if __name__ == "__main__":
    import multiprocessing
    particles = [Particle(i) for i in range(100000)]
    # Note the code below relies on that particles[i].i == i
    assert all(particles[i].i == i for i in range(len(particles)))
    pool = multiprocessing.Pool(MAX_PROCESSES)
    for result_list in pool.imap_unordered(proc_worker,
                  (particles[i: i+CHUNKSIZE]
                   for i in range(0, len(particles), CHUNKSIZE))):
        for i, fitness in result_list:
            particles[i].fitness = fitness
    pool.close()
    pool.join()
    assert all(p.fitness == 2*p.i for p in particles)
Notes:
- The list of Particles is broken into chunks "by hand". That's what CHUNKSIZE is for. That's because a worker process wants a list of Particles to work on, and in turn that's because that's what the futures map() function wants. It's a Good Idea to chunk up work regardless, so you get some real bang for the buck in return for the per-invocation interprocess overheads.
- imap_unordered() makes no guarantees about the order in which results are returned. That gives the implementation more freedom to arrange work as efficiently as possible. And we don't care about the order here, so that's fine.
- The loop retrieves the (particle_id, fitness) results, and modifies the Particle instances accordingly. Perhaps your real .getfitness makes other mutations to Particle instances - can't guess. Regardless, the main program will never see any mutations made in workers "by magic" - you have to explicitly arrange for that. In the limit, you could return (particle_id, particle_instance) pairs instead, and replace the Particle instances in the main program. Then they'd reflect all mutations made in worker processes.
Have fun :-)
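The "by hand" chunking is nothing more than slicing at a fixed stride. As a small sketch (chunked is a hypothetical helper name, not part of multiprocessing or concurrent.futures), the generator expression passed to imap_unordered() could be factored out like this:

```python
def chunked(items, size):
    """Yield consecutive slices of `items`, each at most `size` long."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


chunks = list(chunked(list(range(10)), 4))
print(chunks)  # [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
```

The last chunk is simply shorter when the list length is not a multiple of the chunk size, so no particle is dropped.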
Turns out it was very easy to replace multiprocessing. Here are the changes. This also (as mentioned earlier) replaces the original Particle instances, so as to capture all mutations. There's a tradeoff here, though: pickling an instance requires "a lot more" bytes than pickling a single "fitness" result. More network traffic. Pick your poison ;-)
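To get a rough feel for that tradeoff, one can compare the pickled size of the two kinds of return value. This sketch uses the mock Particle only; a real particle with positions, velocities and so on would presumably widen the gap further:

```python
import pickle


class Particle:
    def __init__(self, i):
        self.i = i
        self.fitness = None

    def getfitness(self):
        self.fitness = 2 * self.i


p = Particle(7)
p.getfitness()

# What crosses the process boundary per particle in each variant.
result_bytes = len(pickle.dumps((p.i, p.fitness)))  # id plus fitness only
instance_bytes = len(pickle.dumps((p.i, p)))        # id plus whole instance

print(result_bytes, instance_bytes)
```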
Returning the mutated instance just requires replacing the last line of thread_worker(), like so:
    return (p.i, p)
Then replace all of the "main" block with this:
def update_fitness():
    import concurrent.futures as cf
    with cf.ProcessPoolExecutor(max_workers=MAX_PROCESSES) as e:
        for result_list in e.map(proc_worker,
                      (particles[i: i+CHUNKSIZE]
                       for i in range(0, len(particles), CHUNKSIZE))):
            for i, p in result_list:
                particles[i] = p
if __name__ == "__main__":
    particles = [Particle(i) for i in range(500000)]
    assert all(particles[i].i == i for i in range(len(particles)))
    update_fitness()
    assert all(particles[i].i == i for i in range(len(particles)))
    assert all(p.fitness == 2*p.i for p in particles)
The code is very similar to the multiprocessing dance. Personally, I'd use the multiprocessing version, because imap_unordered is valuable. That's a problem with simplified interfaces: they often buy simplicity at the cost of hiding useful possibilities.
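For what it's worth, concurrent.futures can approximate imap_unordered() by combining submit() with as_completed(), at the cost of managing the futures yourself. A minimal sketch follows; it uses threads so it runs anywhere, on the assumption that ProcessPoolExecutor can be swapped in since it shares the same submit() interface. The names square and unordered_results are hypothetical:

```python
import concurrent.futures as cf


def square(n):
    # Return the input alongside the result so the caller can match them up.
    return n, n * n


def unordered_results(func, items, max_workers=4):
    """Yield results in completion order rather than input order."""
    with cf.ThreadPoolExecutor(max_workers=max_workers) as e:
        futures = [e.submit(func, item) for item in items]
        for future in cf.as_completed(futures):
            yield future.result()


results = dict(unordered_results(square, range(10)))
print(results[3])  # 9
```

Because arrival order is arbitrary, each result carries its own key, just as the (particle_id, fitness) pairs do above.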
First, are you sure you would benefit from running multiple threads while all your cores are already loaded with processes? If the work is CPU-bound, hardly. At least some tests have to be made.
If adding threads does improve performance, the next question is whether you can achieve better performance with hand-made load balancing or with automatic load balancing. By hand-made I mean carefully partitioning the workload into chunks of similar computational complexity and instantiating a new task processor per chunk: your original, but doubted, solution. By automatic I mean creating a pool of processes/threads that communicate over a work queue for new tasks, which is what you are striving for. In my view, the first approach is the Apache Hadoop paradigm, and the second is implemented by work-queue processors such as Celery. The first approach may suffer from some chunks being slower and still running while the others have completed; the second adds communication and waiting-on-task overheads. This is the second point on which performance tests have to be made.
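The two balancing styles can be sketched side by side. This is only an illustration under simplifying assumptions: it uses threads, a trivial work function, and hypothetical names (work, static_partition, work_queue). It shows the structure of each approach, not their relative speed:

```python
import concurrent.futures as cf


def work(n):
    return n * n


def static_partition(items, workers):
    """Hand-made balancing: one pre-cut slice per worker, one task per slice."""
    slices = [items[k::workers] for k in range(workers)]
    with cf.ThreadPoolExecutor(max_workers=workers) as e:
        parts = e.map(lambda s: [work(n) for n in s], slices)
        return sorted(r for part in parts for r in part)


def work_queue(items, workers):
    """Automatic balancing: each item is its own task pulled from the pool's queue."""
    with cf.ThreadPoolExecutor(max_workers=workers) as e:
        return sorted(e.map(work, items))


items = list(range(20))
print(static_partition(items, 4) == work_queue(items, 4))
```

In the first function a slow slice holds up the whole result while other workers sit idle; in the second, every item pays the queueing overhead individually.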
Last, if you wish to have a static collection of processes with multiple threads inside each, AFAIK you can't achieve it with concurrent.futures as is, and have to modify it a bit. I don't know whether there are existing solutions for this task, but as concurrent is a pure Python solution (with no C code), it can easily be done. The work processor is defined in the _adjust_process_count routine of the ProcessPoolExecutor class, and subclassing and overriding it with a multi-threaded approach is rather straightforward; you just have to supply your custom _process_worker, based on concurrent.futures.thread.
Original ProcessPoolExecutor._adjust_process_count for reference:
def _adjust_process_count(self):
    for _ in range(len(self._processes), self._max_workers):
        p = multiprocessing.Process(
                target=_process_worker,
                args=(self._call_queue,
                      self._result_queue))
        p.start()
        self._processes[p.pid] = p