How can I parallelize a pipeline of generators/iterators in Python?

Suppose I have some Python code like the following:

input = open("input.txt")
x = (process_line(line) for line in input)
y = (process_item(item) for item in x)
z = (generate_output_line(item) + "\n" for item in y)
output = open("output.txt", "w")
output.writelines(z)

This code reads each line from the input file, runs it through several functions, and writes the output to the output file. Now I know that the functions process_line, process_item, and generate_output_line will never interfere with each other, and let's assume that the input and output files are on separate disks, so that reading and writing will not interfere with each other.

But Python probably doesn't know any of this. My understanding is that Python will read one line, apply each function in turn, and write the result to the output, and then it will read the second line only after sending the first line to the output, so that the second line does not enter the pipeline until the first one has exited. Do I understand correctly how this program will flow? If this is how it works, is there any easy way to make it so that multiple lines can be in the pipeline at once, so that the program is reading, writing, and processing each step in parallel?
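
For instance, a toy sketch with made-up stand-in functions and print statements seems to confirm this one-line-at-a-time flow (the function bodies here are hypothetical, just to make the ordering visible):

# Toy sketch with hypothetical stand-in functions, just to observe the order
# in which items move through a lazy generator pipeline.
def process_line(line):
    print("process_line:", line.strip())
    return line

def process_item(item):
    print("process_item:", item.strip())
    return item

def generate_output_line(item):
    print("generate_output_line:", item.strip())
    return item.strip()

lines = ["first\n", "second\n"]            # stands in for the input file
x = (process_line(line) for line in lines)
y = (process_item(item) for item in x)
z = (generate_output_line(item) + "\n" for item in y)
for out in z:                              # stands in for output.writelines(z)
    print("wrote:", out.strip())

All three stages print for "first" before any of them print for "second", which matches the serial, one-item-at-a-time behaviour described above.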

asked Apr 16 '11 by Ryan C. Thompson



2 Answers

You can't really parallelize reading from or writing to files; these will be your bottleneck, ultimately. Are you sure your bottleneck here is CPU, and not I/O?

Since your processing contains no dependencies (according to you), it's trivially simple to use Python's multiprocessing.Pool class.

There are a couple of ways to write this, but the easiest to debug is to find the independent critical path (the slowest part of the code) and run just that part in parallel. Let's presume it's process_item.

…And that's it, actually. Code:

import multiprocessing

p = multiprocessing.Pool() # use all available CPUs

input = open("input.txt")
x = (process_line(line) for line in input)
y = p.imap(process_item, x)
z = (generate_output_line(item) + "\n" for item in y)
output = open("output.txt", "w")
output.writelines(z)

I haven't tested it, but this is the basic idea. Pool's imap method makes sure results are returned in the right order.
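
For completeness, here is a self-contained sketch of the same idea, with hypothetical stand-in functions (the real process_line, process_item and generate_output_line would replace them). Two practical caveats: the function handed to imap must be picklable, i.e. defined at module level rather than as a lambda, and on platforms that spawn worker processes the pool has to be created under an if __name__ == "__main__": guard.

import multiprocessing

# Hypothetical stand-ins for the real processing functions.
# imap needs a module-level (picklable) function, so no lambdas or closures.
def process_line(line):
    return line.strip()

def process_item(item):
    return item.upper()

def generate_output_line(item):
    return item

if __name__ == "__main__":
    with multiprocessing.Pool() as p:  # one worker process per CPU by default
        with open("input.txt") as input_file, open("output.txt", "w") as output_file:
            x = (process_line(line) for line in input_file)
            y = p.imap(process_item, x)  # only this stage runs in the worker processes
            z = (generate_output_line(item) + "\n" for item in y)
            output_file.writelines(z)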

answered Nov 03 '22 by Samat Jain


is there any easy way to make it so that multiple lines can be in the pipeline at once

I wrote a library to do just this: https://github.com/michalc/threaded-buffered-pipeline, which iterates over each iterable in a separate thread.

So what was

input = open("input.txt")

x = (process_line(line) for line in input)
y = (process_item(item) for item in x)
z = (generate_output_line(item) + "\n" for item in y)

output = open("output.txt", "w")
output.writelines(z)

becomes

from threaded_buffered_pipeline import buffered_pipeline

input = open("input.txt")

buffer_iterable = buffered_pipeline()
x = buffer_iterable((process_line(line) for line in input))
y = buffer_iterable((process_item(item) for item in x))
z = buffer_iterable((generate_output_line(item) + "\n" for item in y))

output = open("output.txt", "w")
output.writelines(z)

How much actual parallelism this adds depends on what's actually happening in each iterable, and how many CPU cores you have/how busy they are.

The classic example is the Python GIL: if each step is fairly CPU heavy and uses only pure Python, then not much parallelism will be added, and this might not be faster than the serial version. On the other hand, if each step is network-I/O heavy, then I think it's likely to be faster.
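
As a very rough illustration of the I/O-bound case (not a benchmark), here is a toy sketch with sleep-based stand-ins for network or disk latency; the stand-in functions are hypothetical, and time.sleep releases the GIL much as real I/O waits do, so the three stages can overlap across threads:

import time
from threaded_buffered_pipeline import buffered_pipeline

# Hypothetical stand-ins: each "stage" just waits, simulating I/O latency.
def process_line(line):
    time.sleep(0.1)
    return line

def process_item(item):
    time.sleep(0.1)
    return item

def generate_output_line(item):
    time.sleep(0.1)
    return item.strip()

lines = ["a\n", "b\n", "c\n"]              # stands in for the input file

start = time.monotonic()
buffer_iterable = buffered_pipeline()
x = buffer_iterable((process_line(line) for line in lines))
y = buffer_iterable((process_item(item) for item in x))
z = buffer_iterable((generate_output_line(item) + "\n" for item in y))
results = list(z)                          # stands in for output.writelines(z)
print(f"{time.monotonic() - start:.2f}s")

With the stages overlapping, the elapsed time should come out noticeably under the ~0.9s that running all nine sleeps serially would take.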

answered Nov 03 '22 by Michal Charemza