
Given N generators, is it possible to create a generator that runs them in parallel processes and yields the zip of those generators?

Suppose I have N generators gen_1, ..., gen_N, each of which yields the same number of values. I would like a generator gen that runs gen_1, ..., gen_N in N parallel processes and yields (next(gen_1), next(gen_2), ..., next(gen_N)).

That is, I would like to have:

def gen():
   yield (next(gen_1), next(gen_2), ... next(gen_N))

in such a way that each gen_i is running on its own process. Is it possible to do this? I have tried doing this in the following dummy example with no success:

from multiprocessing import Process

A = range(4)

def gen(a):
    B = ['a', 'b', 'c']
    for b in B:
        yield b + str(a)

def target(g):
    return next(g)

processes = [Process(target=target, args=(gen(a),)) for a in A]

for p in processes:
    p.start()

for p in processes:
    p.join()

However I get the error TypeError: cannot pickle 'generator' object.
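The error comes from multiprocessing having to pickle the arguments it sends to a new process (with the spawn or forkserver start methods), and a running generator object cannot be pickled. A minimal check, reusing the gen function from the question (is_picklable is a hypothetical helper), shows the difference between the generator object and the plain argument it was built from:

```python
import pickle


def gen(a):
    B = ['a', 'b', 'c']
    for b in B:
        yield b + str(a)


def is_picklable(obj):
    """Return True if pickle can serialize obj (hypothetical helper)."""
    try:
        pickle.dumps(obj)
    except (TypeError, pickle.PicklingError, AttributeError):
        return False
    return True


print(is_picklable(gen(0)))  # False: a generator object cannot be pickled
print(is_picklable((0,)))    # True: the argument tuple used to build it can
```

This is why the working approach is to send only the generator function's arguments to each process and create the generator inside that process.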

EDIT:

I have modified @darkonaut's answer a bit to fit my needs. I am posting it in case some of you find it useful. We first define a couple of utility functions:

from itertools import zip_longest
from typing import List, Generator


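def grouper(iterable, n, fillvalue=None):
    "Collect data into fixed-length chunks or blocks"
    args = [iter(iterable)] * n
    return zip_longest(*args, fillvalue=fillvalue)

def split_generators_into_batches(generators: List[Generator], n_splits):
    # A chunk size of len // n_splits + 1 gives at most n_splits chunks.
    chunks = grouper(generators, len(generators) // n_splits + 1)

    # Drop the None padding of the last chunk (otherwise zip_longest would emit
    # spurious None values for it); each batch then yields one tuple per step,
    # holding the next value of every generator in that chunk.
    return [zip_longest(*(g for g in chunk if g is not None)) for chunk in chunks]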
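The chunk size len(generators) // n_splits + 1 guarantees at most n_splits batches, but not exactly n_splits. A small sketch that reuses grouper and counts how many real items land in each batch (batch_sizes is a hypothetical helper, and plain integers stand in for the generators):

```python
from itertools import zip_longest


def grouper(iterable, n, fillvalue=None):
    "Collect data into fixed-length chunks or blocks"
    args = [iter(iterable)] * n
    return zip_longest(*args, fillvalue=fillvalue)


def batch_sizes(n_generators, n_splits):
    """Number of real (non-padding) items per batch when n_generators items
    are chunked with a chunk size of n_generators // n_splits + 1."""
    size = n_generators // n_splits + 1
    items = range(n_generators)  # integers stand in for generators
    return [sum(x is not None for x in chunk) for chunk in grouper(items, size)]


print(batch_sizes(8, 4))   # [3, 3, 2]
print(batch_sizes(10, 4))  # [3, 3, 3, 1]
print(batch_sizes(12, 4))  # [4, 4, 4]
```

So with 8 generators and n_splits=4 only three batches come out; anything sized by n_processes rather than by the number of batches (a Barrier's party count, for instance) would then not match the number of worker processes.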

The following class is responsible for splitting any number of generators into at most n_processes batches, processing each batch in its own process, and yielding the desired result:

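import itertools
import multiprocessing as mp


class GeneratorParallelProcessor:
    SENTINEL = 'S'

    def __init__(self, generators, n_processes=2 * mp.cpu_count()):
        self.generators = split_generators_into_batches(list(generators), n_processes)
        # Chunking can produce fewer batches than n_processes, so size the
        # barrier and the sentinel list by the actual number of batches.
        self.n_processes = len(self.generators)
        self.queue = mp.SimpleQueue()
        self.barrier = mp.Barrier(self.n_processes + 1)  # + 1 for the parent
        self.sentinels = [self.SENTINEL] * self.n_processes

        # The generators already exist in the parent and cannot be pickled,
        # so this relies on the 'fork' start method (no pickling of `target`).
        self.processes = [
            mp.Process(target=self._worker, args=(i, self.barrier, self.queue, gen))
            for i, gen in enumerate(self.generators)
        ]

    def process(self):
        for p in self.processes:
            p.start()

        while True:
            # One message per worker per round, in arbitrary arrival order.
            messages = [self.queue.get() for _ in range(self.n_processes)]
            if messages == self.sentinels:
                break
            messages.sort(key=lambda m: m[0])  # restore batch order by worker index
            yield list(itertools.chain.from_iterable(values for _, values in messages))
            self.barrier.wait()  # let the workers compute the next round

        for p in self.processes:
            p.join()

    def _worker(self, i, barrier, queue, generator):
        for values in generator:
            queue.put((i, values))  # tag the batch with the worker index
            barrier.wait()
        queue.put(self.SENTINEL)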

To use it just do the following:

parallel_processor = GeneratorParallelProcessor(generators)

for grouped_generator in parallel_processor.process():
    output_handler(grouped_generator)
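All workers share a single queue, so within one round the batches can arrive in any order (the worker log in @Darkonaut's answer shows this). A sketch of restoring a deterministic order by tagging each batch with its worker index, as that answer does with (i, x) tuples, and then flattening the round; merge_round and the message values are hypothetical:

```python
from itertools import chain


def merge_round(messages):
    """Order one round of (worker_index, batch_values) messages by worker
    index and flatten the batches into a single list."""
    ordered = sorted(messages, key=lambda m: m[0])
    return list(chain.from_iterable(values for _, values in ordered))


# Messages as they might come off a shared queue, out of order:
round_msgs = [(2, ('x4',)), (0, ('x0', 'x1')), (1, ('x2', 'x3'))]
print(merge_round(round_msgs))  # ['x0', 'x1', 'x2', 'x3', 'x4']
```

Sorting only on the index (key=lambda m: m[0]) avoids ever comparing the batch values themselves.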
creyesk asked Oct 09 '20 23:10




2 Answers

It's possible to get such a "Unified Parallel Generator (UPG)" (attempting to coin a name) with some effort, but as @jasonharper already mentioned, you definitely need to assemble the sub-generators within the child processes, since a running generator can't be pickled.

The pattern below is re-usable with only the generator function gen() being custom to this example. The design uses multiprocessing.SimpleQueue for returning generator results to the parent and multiprocessing.Barrier for synchronization.

Calling Barrier.wait() blocks the caller (a thread in any process) until the specified number of parties have called .wait(), at which point all threads currently waiting on the Barrier are released simultaneously. Using a Barrier here ensures that the workers only start computing the next generator results after the parent has received all results of the current iteration, which can be desirable to keep overall memory consumption in check.
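multiprocessing.Barrier is documented as a clone of threading.Barrier, so the round-by-round behaviour can be sketched with threads alone, without spawning processes (run_rounds is a hypothetical helper): every worker logs the round it is in and then waits, so no worker can log round r + 1 before all of them have logged round r.

```python
import threading


def run_rounds(n_workers=3, n_rounds=2):
    """Run n_workers threads for n_rounds rounds, synchronized by a Barrier,
    and return the order in which rounds were logged."""
    barrier = threading.Barrier(n_workers)
    lock = threading.Lock()
    log = []

    def worker():
        for r in range(n_rounds):
            with lock:
                log.append(r)
            barrier.wait()  # nobody starts round r + 1 until all finished round r

    threads = [threading.Thread(target=worker) for _ in range(n_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return log


print(run_rounds())  # [0, 0, 0, 1, 1, 1]
```

In parallel_gen the parent is one extra party (hence n_gens + 1), which is what lets it hold the workers back until it has collected a full round.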

The number of parallel workers used equals the number of argument-tuples you provide within the gen_args_tuples-iterable, so gen_args_tuples=zip(range(4)) will use four workers for example. See comments in code for further details.

import multiprocessing as mp

SENTINEL = 'SENTINEL'


def gen(a):
    """Your individual generator function."""
    lst = ['a', 'b', 'c']
    for ch in lst:
        for _ in range(int(10e6)):  # some dummy computation
            pass
        yield ch + str(a)


def _worker(i, barrier, queue, gen_func, gen_args):
    for x in gen_func(*gen_args):
        print(f"WORKER-{i} sending item.")
        queue.put((i, x))
        barrier.wait()
    queue.put(SENTINEL)


def parallel_gen(gen_func, gen_args_tuples):
    """Construct and yield from parallel generators
     built from `gen_func(*gen_args)`.
     """
    gen_args_tuples = list(gen_args_tuples)  # ensure list
    n_gens = len(gen_args_tuples)
    sentinels = [SENTINEL] * n_gens
    queue = mp.SimpleQueue()
    barrier = mp.Barrier(n_gens + 1)  # `parties`: + 1 for parent

    processes = [
        mp.Process(target=_worker, args=(i, barrier, queue, gen_func, args))
        for i, args in enumerate(gen_args_tuples)
    ]

    for p in processes:
        p.start()

    while True:
        results = [queue.get() for _ in range(n_gens)]
        if results != sentinels:
            results.sort()
            yield tuple(r[1] for r in results)  # sort and drop ids
            barrier.wait()  # all workers are waiting
            # already, so this will unblock immediately
        else:
            break

    for p in processes:
        p.join()


if __name__ == '__main__':

    for res in parallel_gen(gen_func=gen, gen_args_tuples=zip(range(4))):
        print(res)

Output:

WORKER-1 sending item.
WORKER-0 sending item.
WORKER-3 sending item.
WORKER-2 sending item.
('a0', 'a1', 'a2', 'a3')
WORKER-1 sending item.
WORKER-2 sending item.
WORKER-3 sending item.
WORKER-0 sending item.
('b0', 'b1', 'b2', 'b3')
WORKER-2 sending item.
WORKER-3 sending item.
WORKER-1 sending item.
WORKER-0 sending item.
('c0', 'c1', 'c2', 'c3')

Process finished with exit code 0
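For checking results, a single-process reference is just zip over the same generators: the parallel_gen function in this answer should yield exactly the same tuples, only computed in parallel. A sketch (sequential_gen is a hypothetical name, and gen here omits the dummy computation):

```python
def gen(a):
    """The example generator without the dummy computation."""
    for ch in ['a', 'b', 'c']:
        yield ch + str(a)


def sequential_gen(gen_func, gen_args_tuples):
    """Single-process reference: build every generator here and zip them."""
    return zip(*(gen_func(*args) for args in gen_args_tuples))


for res in sequential_gen(gen_func=gen, gen_args_tuples=zip(range(4))):
    print(res)
```

This prints the same three tuples as the output above. Note that zip stops at the shortest generator; both the question and parallel_gen assume all generators yield the same number of values.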
Darkonaut answered Nov 13 '22 08:11


I went for a slightly different approach; you can modify the example below accordingly. Somewhere in the main script, initialize the pool according to your needs; you need just these two lines:

from multiprocessing import Pool

pool = Pool(processes=4)

Then you can define a generator function like this (note that the generators argument is assumed to be any iterable containing all the generators):

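def parallel_generators(generators, pool):
    # Does not work as intended (see EDIT below): map_async has to pickle each
    # generator to send it to a worker process, and generators cannot be pickled.
    results = ['placeholder']
    while len(results) != 0:
        batch = pool.map_async(next, generators)  # defines the next round of values
        results = batch.get()  # actual calculation done here; get() returns a list
        yield results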

We define the results condition in the while loop like this so that the parallel generator terminates once the generators stop producing values. Note, however, that next() raises StopIteration on an exhausted generator instead of returning an empty result, so this condition does not trigger as written.
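How next() behaves on an exhausted generator matters for the loop condition above: it raises StopIteration instead of returning an empty value, unless a default is passed as its second argument. A single-process sketch of a round-based loop that stops cleanly by using such a default (rounds and _DONE are hypothetical names):

```python
_DONE = object()  # unique marker that no generator will ever yield


def gen(a):
    for ch in ['a', 'b']:
        yield ch + str(a)


g = gen(0)
print(next(g), next(g))  # a0 b0
try:
    next(g)
except StopIteration:
    print("exhausted")    # next() raises on an exhausted generator
print(next(g, None))     # None: the default suppresses StopIteration


def rounds(generators):
    """Yield one list per round, advancing every generator once per round;
    stop when all of them are exhausted (equal lengths assumed)."""
    generators = list(generators)
    while True:
        values = [next(it, _DONE) for it in generators]
        if all(v is _DONE for v in values):
            return
        yield values


print(list(rounds([gen(0), gen(1)])))  # [['a0', 'a1'], ['b0', 'b1']]
```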

EDIT

Apparently multiprocessing's Pool and its map methods don't play well with generators, so the code above does not work as intended; do not use it until a later update.

As for the pickle error: multiprocessing pickles the objects and functions it transfers to worker processes, and some objects (generators, as the error message says, and some bound functions) do not support pickling. As a workaround, the pathos multiprocessing library uses dill instead of pickle and is an option you might want to try. Searching Stack Overflow for your error also turns up some more complicated solutions with custom code for pickling the functions needed.

jimakr answered Nov 13 '22 08:11