Parallelize a sequence of generators

Assume I have Python stream-processing code that looks like this:

def F1(stream):
    for x in stream:
        yield f1(x)

def F2(stream):
    for x in stream:
        yield f2(x)

def F3(stream):
    for x in stream:
        yield f3(x)

def F4(stream):
    for x in stream:
        yield f4(x)


for x in F4(F3(F2(F1(range(1000000))))):
    print(x)

This is roughly equivalent to range 1000000 | F1 | F2 | F3 | F4 in Unix (assuming a range command), but in Unix each process in the pipe runs in parallel.
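
To make the difference concrete, here is a minimal sketch showing that the chained generators take turns in a single thread: each item passes through every stage before the next one is read, so no two stages ever run at the same time. The `traced_stage` helper and the `trace` list are made up for illustration and are not part of the real code.

```python
def traced_stage(name, trace):
    # Build a generator stage that records when it handles each item.
    def run(stream):
        for x in stream:
            trace.append((name, x))
            yield x
    return run


trace = []
F1 = traced_stage("F1", trace)
F2 = traced_stage("F2", trace)

# Consuming the chain pulls one item at a time through every stage.
result = list(F2(F1(range(3))))
print(trace)
# [('F1', 0), ('F2', 0), ('F1', 1), ('F2', 1), ('F1', 2), ('F2', 2)]
```

In a Unix pipeline, by contrast, F1 could already be working on item 2 while F2 is still busy with item 0.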

Is there a simple way to parallelize the Python code?

Asked by Clément on Dec 20 '13 at 20:12



1 Answer

You need pipes and black magic; Python has both. The idea is to run each generator in its own process and connect consecutive stages with a pipe:

from multiprocessing import Process, Pipe


def F1(stream):
    for x in stream:
        yield str(x)+'a'

def F2(stream):
    for x in stream:
        yield x+'b'

def F3(stream):
    for x in stream:
        yield x+'c'

def F4(stream):
    for x in stream:
        yield x+'d'



class PIPE_EOF:
    """Sentinel sent through a pipe to mark the end of a stream."""

class IterableConnection(object):
    def __init__(self, pipe):
        self.pipe = pipe

    def __iter__(self):
        return self

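    def __next__(self):
        try:
            ret = self.pipe.recv()
        except EOFError:
            # The writing end was closed without sending the sentinel.
            raise StopIteration
        if ret is PIPE_EOF:
            # The producer signalled a clean end of stream.
            raise StopIteration
        return ret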

    def next(self):
        return self.__next__()


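def run_stage(func, _input, _output):
    # Runs in a child process: send every result of the stage through the
    # pipe, then the sentinel so the consumer knows the stream has ended.
    for item in func(_input):
        _output.send(item)
    _output.send(PIPE_EOF)
    _output.close()


def parallel_generator_chain(*args, **kwargs):
    # args are the stages, in order. The first stage is called with
    # kwargs['data']; every later stage reads the previous stage's pipe.
    if 'data' in kwargs:
        data = kwargs['data']
    else:
        raise RuntimeError('Missing "data" argument.')

    processes = []
    for func in args:
        in_end, out_end = Pipe(duplex=False)
        # A module-level target (rather than a nested closure) can be
        # pickled, which the "spawn" start method (Windows, macOS) requires.
        p = Process(target=run_stage, args=(func, data, out_end))
        p.start()
        out_end.close()  # the child holds its own copy of the write end
        processes.append(p)
        data = IterableConnection(in_end)

    for output in data:
        yield output

    # Reap the worker processes once the last stage has been drained.
    for p in processes:
        p.join()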



if 'xrange' not in globals():
    xrange = range


if __name__ == '__main__':
    for x in parallel_generator_chain(xrange, F1, F2, F3, F4, data=100000000):
        print(x)

#for x in F4(F3(F2(F1(range(1000000))))):
#    print(x)
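
If the stages mostly wait on I/O, the same hand-off pattern can be sketched with threads and `queue.Queue` instead of processes and pipes. This is a swapped-in variant, not the code above: the names `threaded_generator_chain`, `add_suffix`, `_EOF` and the `maxsize` default are assumptions made up for illustration. In standard CPython builds the GIL keeps CPU-bound stages from running simultaneously, so this only helps when stages block on I/O.

```python
import queue
import threading

_EOF = object()  # sentinel marking the end of a stream (hypothetical name)


def _run_stage(func, source, out_q):
    # Worker thread: push every result of the stage, then the sentinel.
    for item in func(source):
        out_q.put(item)
    out_q.put(_EOF)


def _drain(q):
    # Turn a queue back into an iterator that stops at the sentinel.
    while True:
        item = q.get()
        if item is _EOF:
            return
        yield item


def threaded_generator_chain(data, *stages, maxsize=64):
    # Each stage runs in its own thread and feeds the next one through a
    # bounded queue, so a fast producer cannot run arbitrarily far ahead.
    for func in stages:
        q = queue.Queue(maxsize=maxsize)
        worker = threading.Thread(
            target=_run_stage, args=(func, data, q), daemon=True
        )
        worker.start()
        data = _drain(q)
    return data


def add_suffix(suffix):
    # Stand-in for F1..F4: a stage that appends a suffix to every item.
    def stage(stream):
        for x in stream:
            yield str(x) + suffix
    return stage


if __name__ == "__main__":
    chain = threaded_generator_chain(range(3), add_suffix("a"), add_suffix("b"))
    print(list(chain))
```

Each queue has a single producer and a single consumer, so items come out in the same order as in the sequential chain.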
Answered by smeso on Oct 13 '22 at 02:10