
Pandas df.iterrows() parallelization

I would like to parallelize the following code:

    for row in df.iterrows():
        idx = row[0]
        k = row[1]['Chromosome']
        start, end = row[1]['Bin'].split('-')

        sequence = sequence_from_coordinates(k, 1, start, end)  # slow download from HTTP

        df.set_value(idx, 'GC%', gc_content(sequence, percent=False, verbose=False))
        df.set_value(idx, 'G4 repeats', sum([len(list(i)) for i in g4_scanner(sequence)]))
        df.set_value(idx, 'max flexibility', max([item[1] for item in dna_flex(sequence, verbose=False)]))

I have tried to use multiprocessing.Pool() since each row can be processed independently, but I can't figure out how to share the DataFrame. I am also not sure that this is the best approach to do parallelization with pandas. Any help?

asked Nov 01 '16 09:11 by alec_djinn



2 Answers

As @Khris said in his comment, you should split up your dataframe into a few large chunks and iterate over each chunk in parallel. You could arbitrarily split the dataframe into randomly sized chunks, but it makes more sense to divide the dataframe into equally sized chunks based on the number of processes you plan on using. Luckily someone else has already figured out how to do that part for us:

    # don't forget to import
    import pandas as pd
    import multiprocessing

    # create as many processes as there are CPUs on your machine
    num_processes = multiprocessing.cpu_count()

    # calculate the chunk size as an integer (at least 1, so range() below gets a valid step)
    chunk_size = max(1, df.shape[0] // num_processes)

    # this solution was reworked from the above link.
    # will work even if the length of the dataframe is not evenly divisible by num_processes
    # positional slices are used so this works for any index, not only the default 0..n-1
    chunks = [df.iloc[i:i + chunk_size] for i in range(0, df.shape[0], chunk_size)]
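As a quick check of the chunking step, the sketch below splits a small toy dataframe with positional slicing and confirms that no rows are lost when the length is not a multiple of the chunk size. The toy data and the fixed `num_processes = 3` are assumptions for illustration only; positional `iloc[i:i + chunk_size]` slices are used so that the split does not depend on the index labels.

```python
import pandas as pd

# Toy dataframe with a non-default index (illustrative data only).
df = pd.DataFrame({"x": range(10)}, index=list("abcdefghij"))

num_processes = 3  # fixed here so the example is reproducible
chunk_size = max(1, len(df) // num_processes)  # 10 // 3 == 3

# Positional slices: each chunk holds up to chunk_size consecutive rows.
chunks = [df.iloc[i:i + chunk_size] for i in range(0, len(df), chunk_size)]

# The last chunk is shorter because 10 is not a multiple of 3.
print([len(c) for c in chunks])

# Concatenating the chunks in order gives back the original dataframe.
print(pd.concat(chunks).equals(df))
```

Note that integer division can yield one more chunk than there are processes; `pool.map` simply hands the extra chunk to whichever worker becomes free first.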

This creates a list that contains our dataframe in chunks. Now we need to pass it into our pool along with a function that will manipulate the data.

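    def func(d):
        # let's create a function that squares every value in the dataframe
        return d * d

    # create our pool with `num_processes` processes
    # (on Windows and macOS, run this part under `if __name__ == '__main__':`)
    pool = multiprocessing.Pool(processes=num_processes)

    # apply our function to each chunk in the list
    result = pool.map(func, chunks)

    # release the worker processes once the results are in
    pool.close()
    pool.join()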

At this point, result will be a list holding each chunk after it has been manipulated. In this case, all values have been squared. The issue now is that the original dataframe has not been modified, so we have to replace all of its existing values with the results from our pool.

    for chunk in result:
        # since each chunk is just a dataframe that kept its original index labels,
        # we can reassign the original dataframe based on the index of each chunk
        df.loc[chunk.index] = chunk
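Putting the three steps together, a minimal end-to-end sketch might look like the following. The toy dataframe, the names `square` and `split_frame`, and the fixed pool size of 2 are assumptions for illustration. The `if __name__ == "__main__":` guard matters on platforms where `multiprocessing` starts workers by re-importing the main module (Windows, and macOS by default in recent Python versions). The chunks are rejoined here with `pd.concat` rather than by index assignment, which also works when the function adds new columns.

```python
import multiprocessing

import pandas as pd


def square(chunk):
    # Runs in a worker process on one chunk and returns a new dataframe.
    return chunk * chunk


def split_frame(df, n_chunks):
    # Split df into consecutive positional slices of (almost) equal size.
    chunk_size = max(1, -(-len(df) // n_chunks))  # ceiling division
    return [df.iloc[i:i + chunk_size] for i in range(0, len(df), chunk_size)]


if __name__ == "__main__":
    df = pd.DataFrame({"a": [1, 2, 3, 4, 5], "b": [10, 20, 30, 40, 50]})

    # The context manager terminates the pool when the block exits.
    with multiprocessing.Pool(processes=2) as pool:
        result = pool.map(square, split_frame(df, 2))

    # pool.map preserves input order, so concat restores the row order.
    squared = pd.concat(result)
    print(squared)
```

Both `square` and `split_frame` are defined at module level so that they can be pickled and sent to the worker processes.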

Now, my function to manipulate my dataframe is vectorized and would likely have been faster if I had simply applied it to the entirety of my dataframe instead of splitting into chunks. However, in your case, your function would iterate over each row of each chunk and then return the chunk. This allows you to process num_processes rows at a time.

    def func(d):
        for idx, row in d.iterrows():
            k = row['Chromosome']
            start, end = row['Bin'].split('-')

            sequence = sequence_from_coordinates(k, 1, start, end)  # slow download from HTTP

            # set_value was removed in pandas 1.0; .loc does the same job
            d.loc[idx, 'GC%'] = gc_content(sequence, percent=False, verbose=False)
            d.loc[idx, 'G4 repeats'] = sum([len(list(i)) for i in g4_scanner(sequence)])
            d.loc[idx, 'max flexibility'] = max([item[1] for item in dna_flex(sequence, verbose=False)])
        # return the chunk!
        return d

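Then you reassign the values in the original dataframe, and you have successfully parallelized this process. If func adds columns that the original dataframe does not have yet, rebuild it with df = pd.concat(result) instead.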

How Many Processes Should I Use?

Your optimal performance is going to depend on the answer to this question. While "ALL OF THE PROCESSES!!!!" is one answer, a better answer is much more nuanced. After a certain point, throwing more processes at a problem actually creates more overhead than it's worth. On top of that, Amdahl's Law says the achievable speedup is bounded by the part of the work that cannot be parallelized, so each additional process helps less than the one before. Again, we are fortunate that others have already tackled this question for us:

  1. Python multiprocessing's Pool process limit
  2. How many processes should I run in parallel?
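For a rough feel of why extra processes stop paying off, Amdahl's Law can be evaluated directly. The sketch below assumes a hypothetical workload in which 90% of the runtime can be parallelized; the function name `amdahl_speedup` and the 0.9 figure are illustrative, and the formula ignores process start-up and data-transfer overhead, which make real numbers worse.

```python
def amdahl_speedup(parallel_fraction, n_processes):
    # Amdahl's Law: the serial part (1 - p) is unaffected by adding workers,
    # while the parallel part p is divided among n_processes.
    serial = 1.0 - parallel_fraction
    return 1.0 / (serial + parallel_fraction / n_processes)


# Print the theoretical speedup for a growing number of processes.
for n in (1, 2, 4, 8, 16, 64):
    print(n, round(amdahl_speedup(0.9, n), 2))
```

With a parallel fraction of 0.9 the speedup can never exceed 1 / (1 - 0.9) = 10, no matter how many processes are added.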

A good default is to use multiprocessing.cpu_count(), which is the default behavior of multiprocessing.Pool. According to the documentation "If processes is None then the number returned by cpu_count() is used." That's why I set num_processes at the beginning to multiprocessing.cpu_count(). This way, if you move to a beefier machine, you get the benefits from it without having to change the num_processes variable directly.

answered Oct 16 '22 18:10 by TheF1rstPancake


A faster way (about 10% in my case):

Main differences from the accepted answer: use pd.concat and np.array_split to split and join the dataframe.

    import multiprocessing
    import numpy as np
    import pandas as pd


    def parallelize_dataframe(df, func):
        # leave one core free to not freeze the machine, but always use at least one
        num_cores = max(1, multiprocessing.cpu_count() - 1)
        num_partitions = num_cores  # number of partitions to split dataframe
        df_split = np.array_split(df, num_partitions)
        pool = multiprocessing.Pool(num_cores)
        df = pd.concat(pool.map(func, df_split))
        pool.close()
        pool.join()
        return df

where func is the function you want to apply to df. Use functools.partial(func, arg=arg_val) for more than one argument.
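To illustrate the `partial` suggestion, the sketch below binds extra keyword arguments with `functools.partial` so that the resulting callable takes a single dataframe chunk, which is the shape `pool.map` expects. The function `scale_column` and its `column` and `factor` parameters are hypothetical names used only for this example. The chunks are processed in a plain loop here to keep the example short; a `partial` of a module-level function can be pickled, so it can be passed to `pool.map` in the same way.

```python
from functools import partial

import pandas as pd


def scale_column(chunk, column, factor=1):
    # Return a copy of the chunk with one column multiplied by factor.
    out = chunk.copy()
    out[column] = out[column] * factor
    return out


df = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6]})

# Bind the extra arguments; the resulting callable takes only the chunk.
func = partial(scale_column, column="x", factor=10)

# Split into positional chunks of two rows and apply func to each one.
chunks = [df.iloc[i:i + 2] for i in range(0, len(df), 2)]
result = pd.concat([func(c) for c in chunks])
print(result["x"].tolist())
```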

answered Oct 16 '22 18:10 by ic_fl2