Assume I have Python stream-processing code that looks like this:
def F1(stream):
    for x in stream:
        yield f1(x)

def F2(stream):
    for x in stream:
        yield f2(x)

def F3(stream):
    for x in stream:
        yield f3(x)

def F4(stream):
    for x in stream:
        yield f4(x)

for x in F4(F3(F2(F1(range(1000000))))):
    print(x)
This is roughly equivalent to `range 1000000 | F1 | F2 | F3 | F4` in Unix (assuming a hypothetical range command), but in a Unix pipeline each process runs in parallel.
Is there a simple way to parallelize the Python code?
You need pipes and black magic; Python has both.
from multiprocessing import Process, Pipe

def F1(stream):
    for x in stream:
        yield str(x) + 'a'

def F2(stream):
    for x in stream:
        yield x + 'b'

def F3(stream):
    for x in stream:
        yield x + 'c'

def F4(stream):
    for x in stream:
        yield x + 'd'

class PIPE_EOF:
    """Sentinel sent through a pipe to mark the end of the stream."""
    pass

class IterableConnection(object):
    """Wraps the read end of a Pipe so it can be consumed as an iterator."""
    def __init__(self, pipe):
        self.pipe = pipe

    def __iter__(self):
        return self

    def __next__(self):
        try:
            ret = self.pipe.recv()
            if ret == PIPE_EOF:
                raise StopIteration
            return ret
        except EOFError:
            # The write end was closed without a sentinel; treat as end of stream.
            raise StopIteration

    next = __next__  # Python 2 compatibility

def parallel_generator_chain(*args, **kwargs):
    if 'data' in kwargs:
        data = kwargs['data']
    else:
        raise RuntimeError('Missing "data" argument.')

    def decorator(func, _input, _output):
        # Drain func(_input) into the pipe, then send the end-of-stream sentinel.
        def wrapper():
            for item in func(_input):
                _output.send(item)
            _output.send(PIPE_EOF)
        return wrapper

    # Spawn one process per stage; each stage reads from the previous pipe
    # and writes into its own. Note: passing a closure as target relies on a
    # fork-based start method (the default on Linux); closures are not
    # picklable under the "spawn" start method used on Windows.
    for func in args:
        in_end, out_end = Pipe(duplex=False)
        in_end = IterableConnection(in_end)
        p = Process(target=decorator(func, data, out_end))
        p.start()
        data = in_end

    # The parent consumes the read end of the last stage's pipe.
    for output in data:
        yield output

if 'xrange' not in globals():
    xrange = range  # Python 3: xrange was removed

if __name__ == '__main__':
    for x in parallel_generator_chain(xrange, F1, F2, F3, F4, data=100000000):
        print(x)

    # Serial equivalent:
    # for x in F4(F3(F2(F1(range(1000000))))):
    #     print(x)
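If per-stage processes are more machinery than you need, a simpler sketch (my own variant, not part of the answer above) is to fuse the stages into one function and parallelize across *items* with `multiprocessing.Pool.imap`. This gives a different granularity of parallelism than the pipe chain above, but for cheap, independent per-item work it is often enough. The `f1`..`f4` bodies here just mirror the toy stages from the answer.

```python
from multiprocessing import Pool

# Stand-ins for the per-item stage functions.
def f1(x):
    return str(x) + 'a'

def f2(x):
    return x + 'b'

def f3(x):
    return x + 'c'

def f4(x):
    return x + 'd'

def fused(x):
    # One item flows through all four stages inside a single worker process.
    return f4(f3(f2(f1(x))))

def run(n, workers=4):
    with Pool(workers) as pool:
        # imap streams results lazily and preserves input order;
        # chunksize batches items to cut inter-process overhead.
        return list(pool.imap(fused, range(n), chunksize=64))

if __name__ == '__main__':
    for item in run(10):
        print(item)
```

Unlike the pipe chain, this only helps when the per-item functions are independent; it cannot express stages that keep state across items.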