Suppose I have N generators gen_1, ..., gen_N, where each of them will yield the same number of values. I would like a generator gen that runs gen_1, ..., gen_N in N parallel processes and yields (next(gen_1), next(gen_2), ..., next(gen_N)).
That is, I would like to have:

def gen():
    yield (next(gen_1), next(gen_2), ..., next(gen_N))

in such a way that each gen_i is running in its own process. Is it possible to do this? I have tried it in the following dummy example, with no success:
from multiprocessing import Process

A = range(4)

def gen(a):
    B = ['a', 'b', 'c']
    for b in B:
        yield b + str(a)

def target(g):
    return next(g)

processes = [Process(target=target, args=(gen(a),)) for a in A]

for p in processes:
    p.start()
for p in processes:
    p.join()
However, I get the error TypeError: cannot pickle 'generator' object.
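The error can be reproduced without any multiprocessing at all; a minimal sketch showing that a live generator object simply isn't picklable:

import pickle

def count():
    yield from range(3)

g = count()
pickle.dumps(g)  # raises TypeError: cannot pickle 'generator' object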
EDIT:
I have modified @darkonaut's answer a bit to fit my needs. I am posting it in case some of you find it useful. We first define a couple of utility functions:
from itertools import zip_longest
from typing import List, Generator

def grouper(iterable, n, fillvalue=iter([])):
    "Collect data into fixed-length chunks or blocks."
    args = [iter(iterable)] * n
    return zip_longest(*args, fillvalue=fillvalue)

def split_generators_into_batches(generators: List[Generator], n_splits):
    chunks = grouper(generators, len(generators) // n_splits + 1)
    return [zip_longest(*chunk) for chunk in chunks]
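For intuition, here is a quick (hypothetical) sanity check of what the splitting produces; note that zip_longest pads the uneven last batch with None:

gens = [iter(['a', 'b']), iter(['c', 'd']), iter(['e', 'f']),
        iter(['g', 'h']), iter(['i', 'j'])]
batches = split_generators_into_batches(gens, 2)  # chunk size 5 // 2 + 1 = 3
for batch in batches:
    print(list(batch))
# [('a', 'c', 'e'), ('b', 'd', 'f')]
# [('g', 'i', None), ('h', 'j', None)]  <- None is zip_longest padding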
The following class is responsible for splitting any number of generators into n_processes batches and processing them, yielding the desired result:
import itertools
import multiprocessing as mp

class GeneratorParallelProcessor:
    SENTINEL = 'S'

    def __init__(self, generators, n_processes=2 * mp.cpu_count()):
        self.n_processes = n_processes
        self.generators = split_generators_into_batches(list(generators), n_processes)
        self.queue = mp.SimpleQueue()
        self.barrier = mp.Barrier(n_processes + 1)
        self.sentinels = [self.SENTINEL] * n_processes
        self.processes = [
            mp.Process(target=self._worker, args=(self.barrier, self.queue, gen))
            for gen in self.generators
        ]
    def process(self):
        for p in self.processes:
            p.start()
        while True:
            results = list(itertools.chain(*(self.queue.get() for _ in self.generators)))
            if results != self.sentinels:
                yield results
                self.barrier.wait()
            else:
                break
        for p in self.processes:
            p.join()
    def _worker(self, barrier, queue, generator):
        for x in generator:
            queue.put(x)
            barrier.wait()
        queue.put(self.SENTINEL)
To use it, just do the following:
parallel_processor = GeneratorParallelProcessor(generators)

for grouped_generator in parallel_processor.process():
    output_handler(grouped_generator)
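For example, a minimal end-to-end run might look like the sketch below (gen and the plain print handler are stand-ins; note that the batches wrap live generators, so this relies on a fork start method, the default on Linux, since live generators can't be pickled for spawn):

def gen(a):
    for ch in ['a', 'b', 'c']:
        yield ch + str(a)

if __name__ == '__main__':
    generators = [gen(i) for i in range(4)]
    parallel_processor = GeneratorParallelProcessor(generators, n_processes=2)
    for grouped_generator in parallel_processor.process():
        # e.g. ['a0', 'a1', 'a2', 'a3', None, None] per round; the Nones are
        # zip_longest padding from the uneven last batch, and the order
        # across batches is not guaranteed
        print(grouped_generator)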
It's possible to get such a "Unified Parallel Generator" (UPG, an attempt to coin a name) with some effort, but as @jasonharper already mentioned, you definitely need to assemble the sub-generators within the child processes, since a running generator can't be pickled.
The pattern below is reusable, with only the generator function gen() being custom to this example. The design uses multiprocessing.SimpleQueue for returning generator results to the parent and multiprocessing.Barrier for synchronization.
Calling Barrier.wait() will block the caller (a thread in any process) until the specified number of parties has called .wait(), whereupon all threads currently waiting on the Barrier get released simultaneously. The usage of Barrier here ensures that further generator results only start being computed after the parent has received all results from an iteration, which can be desirable to keep overall memory consumption in check.
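As a quick illustration of the Barrier semantics in isolation, a standalone toy (not part of the pattern below):

import multiprocessing as mp
import time

def worker(barrier, i):
    time.sleep(i)    # workers arrive at the barrier at different times
    barrier.wait()   # blocks until all parties have called .wait()
    print(f"worker {i} released")

if __name__ == '__main__':
    barrier = mp.Barrier(3 + 1)  # three workers + the parent
    processes = [mp.Process(target=worker, args=(barrier, i)) for i in range(3)]
    for p in processes:
        p.start()
    barrier.wait()               # parent joins the rendezvous; all release together
    for p in processes:
        p.join()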
The number of parallel workers used equals the number of argument tuples you provide within the gen_args_tuples iterable, so gen_args_tuples=zip(range(4)) will use four workers, for example. See the comments in the code for further details.
import multiprocessing as mp

SENTINEL = 'SENTINEL'

def gen(a):
    """Your individual generator function."""
    lst = ['a', 'b', 'c']
    for ch in lst:
        for _ in range(int(10e6)):  # some dummy computation
            pass
        yield ch + str(a)

def _worker(i, barrier, queue, gen_func, gen_args):
    for x in gen_func(*gen_args):
        print(f"WORKER-{i} sending item.")
        queue.put((i, x))
        barrier.wait()
    queue.put(SENTINEL)

def parallel_gen(gen_func, gen_args_tuples):
    """Construct and yield from parallel generators
    built from `gen_func(gen_args)`.
    """
    gen_args_tuples = list(gen_args_tuples)  # ensure list
    n_gens = len(gen_args_tuples)
    sentinels = [SENTINEL] * n_gens
    queue = mp.SimpleQueue()
    barrier = mp.Barrier(n_gens + 1)  # `parties`: + 1 for parent
    processes = [
        mp.Process(target=_worker, args=(i, barrier, queue, gen_func, args))
        for i, args in enumerate(gen_args_tuples)
    ]
    for p in processes:
        p.start()
    while True:
        results = [queue.get() for _ in range(n_gens)]
        if results != sentinels:
            results.sort()
            yield tuple(r[1] for r in results)  # sort and drop ids
            barrier.wait()  # all workers are waiting already,
                            # so this will unblock immediately
        else:
            break
    for p in processes:
        p.join()

if __name__ == '__main__':
    for res in parallel_gen(gen_func=gen, gen_args_tuples=zip(range(4))):
        print(res)
Output:
WORKER-1 sending item.
WORKER-0 sending item.
WORKER-3 sending item.
WORKER-2 sending item.
('a0', 'a1', 'a2', 'a3')
WORKER-1 sending item.
WORKER-2 sending item.
WORKER-3 sending item.
WORKER-0 sending item.
('b0', 'b1', 'b2', 'b3')
WORKER-2 sending item.
WORKER-3 sending item.
WORKER-1 sending item.
WORKER-0 sending item.
('c0', 'c1', 'c2', 'c3')
Process finished with exit code 0
I went for a little different approach; you can modify the example below accordingly. Somewhere in the main script, initialize the pool according to your needs; you need just these two lines:
from multiprocessing import Pool
pool = Pool(processes=4)
Then you can define a generator function like this (note that the generators input is assumed to be any iterable containing all the generators):
def parallel_generators(generators, pool):
    results = ['placeholder']
    while len(results) != 0:
        batch = pool.map_async(next, generators)  # schedules the next round of values
        results = list(batch.get())  # actual computation happens here
        yield results
    return
We define the results condition in the while loop like this because mapping next over the generators returns an empty list once the generators stop producing values, and at that point we just terminate the parallel generator.
EDIT
So apparently multiprocessing's Pool and map don't play well with generators, making the above code not work as intended, so do not use it until a later update.
As for the pickle error: it seems some bound functions do not support pickle, which is needed by the multiprocessing library in order to transfer objects and functions. As a workaround, the pathos multiprocessing library uses dill, which removes the need for pickle and is an option you might want to try. Searching Stack Overflow for your error, you can also find some more complicated solutions with custom code for pickling the functions needed.
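A hedged sketch of the pathos route (assuming pathos is installed via pip install pathos; gen is the toy generator from the question, and note that even dill generally cannot serialize a live generator object, so each worker still builds and drains its own sub-generator):

from pathos.multiprocessing import ProcessingPool

def gen(a):
    for ch in ['a', 'b', 'c']:
        yield ch + str(a)

if __name__ == '__main__':
    pool = ProcessingPool(4)
    # dill can serialize the lambda, which plain pickle would reject;
    # each worker materializes one sub-generator completely
    columns = pool.map(lambda a: list(gen(a)), range(4))
    for grouped in zip(*columns):
        print(grouped)  # ('a0', 'a1', 'a2', 'a3'), ...

Note that this trades the laziness away: every sub-generator is fully materialized in its worker before the results are zipped together in the parent.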