
Multiprocessing writing to pandas dataframe

What I am trying to do with the following code is to read a list of lists, pass each one through a function called checker, and then have log_result deal with the result of checker. I am trying to do this with multiprocessing because the variable rows_to_parse in reality has millions of rows, so using multiple cores should speed up this process considerably.

At the moment the code doesn't work and crashes Python.

Concerns and Issues I have:

  1. I want the existing DataFrame, held in the variable df, to keep its index throughout the process, because otherwise log_result will get confused about which row needs updating.
  2. I am fairly certain that apply_async is not the appropriate multiprocessing function for this, because I believe the order in which the computer reads and writes the df could corrupt it.
  3. I think a queue may need to be set up to write to and read from df, but I am unsure how I would go about doing that.

Thank you for any assistance.

import pandas as pd
import multiprocessing
from functools import partial

def checker(a,b,c,d,e):
    match = df[(df['a'] == a) & (df['b'] == b) & (df['c'] == c) & (df['d'] == d) & (df['e'] == e)]
    index_of_match = match.index.tolist()
    if len(index_of_match) == 1: #one match in df
        return index_of_match
    elif len(index_of_match) > 1: #unlikely, because duplicates are removed before the if __name__ == '__main__': block runs
        return [index_of_match[0]]
    else: #no match, returns a result which then gets processed by the else statement in log_result. this means that [a,b,c,d,e] get written to the df
        return [a,b,c,d,e]



def log_result(result, dataf):
    if len(result) == 1: #
        dataf.loc[result[0]]['e'] += 1 
    else: #append new row to existing df
        new_row = pd.DataFrame([result],columns=cols)
        dataf = dataf.append(new_row,ignore_index=True)


def apply_async_with_callback(parsing_material, dfr):
    pool = multiprocessing.Pool()
    for var_a, var_b, var_c, var_d, var_e in parsing_material:
        pool.apply_async(checker, args = (var_a, var_b, var_c, var_d, var_e), callback = partial(log_result,dataf=dfr))
    pool.close()
    pool.join()



if __name__ == '__main__':
    #setting up main dataframe
    cols = ['a','b','c','d','e']
    existing_data = [["YES","A","16052011","13031999",3],
                    ["NO","Q","11022003","15081999",3],
                    ["YES","A","22082010","03012001",9]]

    #main dataframe
    df = pd.DataFrame(existing_data,columns=cols)

    #new data
    rows_to_parse = [['NO', 'A', '09061997', '06122003', 5],
                    ['YES', 'W', '17061992', '26032012', 6],
                    ['YES', 'G', '01122006', '07082014', 2],
                    ['YES', 'N', '06081992', '21052008', 9],
                    ['YES', 'Y', '18051995', '24011996', 6],
                    ['NO', 'Q', '11022003', '15081999', 3],
                    ['NO', 'O', '20112004', '28062008', 0],
                    ['YES', 'R', '10071994', '03091996', 8],
                    ['NO', 'C', '09091998', '22051992', 1],
                    ['YES', 'Q', '01051995', '02012000', 3],
                    ['YES', 'Q', '26022015', '26092007', 5],
                    ['NO', 'F', '15072002', '17062001', 8],
                    ['YES', 'I', '24092006', '03112003', 2],
                    ['YES', 'A', '22082010', '03012001', 9],
                    ['YES', 'I', '15072016', '30092005', 7],
                    ['YES', 'Y', '08111999', '02022006', 3],
                    ['NO', 'V', '04012016', '10061996', 1],
                    ['NO', 'I', '21012003', '11022001', 6],
                    ['NO', 'P', '06041992', '30111993', 6],
                    ['NO', 'W', '30081992', '02012016', 6]]


    apply_async_with_callback(rows_to_parse, df)
asked Nov 05 '15 by user3374113


1 Answer

Updating DataFrames like this under multiprocessing isn't going to work:

dataf = dataf.append(new_row,ignore_index=True)

For one thing, this is very inefficient (O(n) for each append, so O(n^2) in total). The preferred way is to collect the objects and concat them together in one pass.

For another, and more importantly, dataf is not locked for each update, so there's no guarantee that two operations won't conflict (I'm guessing this is what crashes Python).

Finally, append doesn't act in place, so the variable dataf is discarded once the callback finishes, and no changes are made to the parent dataf.
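
To make the "concat in one pass" point concrete, here is a minimal single-process sketch of that pattern (collected_rows and new_rows are placeholder names, not from the original code):

collected_rows = []                 # a plain Python list is cheap to append to
for row in rows_to_parse:
    collected_rows.append(row)      # O(1) per row, no DataFrame work yet

# build all of the new rows in one go and concatenate exactly once
new_rows = pd.DataFrame(collected_rows, columns=cols)
result = pd.concat([df, new_rows], ignore_index=True)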


We could use a multiprocessing Manager list or dict: a list if you don't care about order, or a dict (e.g. keyed by enumerate) if you do, since the results do not come back from the async calls in any well-defined order.
(Or we could create an object which implements a Lock ourselves, see Eli Bendersky.)
So the following changes are made:

df = pd.DataFrame(existing_data, columns=cols)
# becomes
df = pd.DataFrame(existing_data, columns=cols)
d = multiprocessing.Manager().list([df])

dataf = dataf.append(new_row,ignore_index=True)
# becomes
d.append(new_row)

Now, once all of the async calls have finished, you have a manager list of DataFrames. You can concat these (with ignore_index=True) to get the desired result:

pd.concat(d, ignore_index=True)

Should do the trick.
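
Putting those pieces together, a rough sketch of a revised driver could look like the following. This is only an illustration of the approach, not the definitive fix: the names shared_rows and matched_indices are mine, and handling the "+= 1" case by collecting the matched indices and applying them in the parent after the pool has finished is my assumption about how the callback would be used.

def log_result(result, shared_rows, matched_indices):
    # callbacks run in the parent process, one result at a time
    if len(result) == 1:              # checker found an existing row: remember its index
        matched_indices.append(result[0])
    else:                             # no match: remember the whole new row
        shared_rows.append(result)

def apply_async_with_callback(parsing_material, dfr):
    manager = multiprocessing.Manager()
    shared_rows = manager.list()      # rows to append to the DataFrame later
    matched_indices = manager.list()  # index labels whose 'e' count should be bumped
    pool = multiprocessing.Pool()
    for var_a, var_b, var_c, var_d, var_e in parsing_material:
        pool.apply_async(checker,
                         args=(var_a, var_b, var_c, var_d, var_e),
                         callback=partial(log_result,
                                          shared_rows=shared_rows,
                                          matched_indices=matched_indices))
    pool.close()
    pool.join()

    # all updates happen here, in the parent, after every task has finished
    for idx in matched_indices:
        dfr.loc[idx, 'e'] += 1
    if len(shared_rows) > 0:
        new_rows = pd.DataFrame(list(shared_rows), columns=cols)
        dfr = pd.concat([dfr, new_rows], ignore_index=True)
    return dfr

Note that the caller now has to pick up the return value (df = apply_async_with_callback(rows_to_parse, df)), since concat returns a new DataFrame rather than modifying dfr in place.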


Note: creating the new_row DataFrame one row at a time is also less efficient than letting pandas parse the whole list of lists into a DataFrame in one go. Hopefully this is a toy example; to get wins from multiprocessing you really want your chunks to be quite large (I've heard 50kB as a rule of thumb...), and a row at a time is never going to be a win here.
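
As a hedged illustration of that chunking idea (chunk_size is an arbitrary value, and checker_chunk / log_chunk_results are names I've made up, building on the shared lists and pool from the sketch above):

def checker_chunk(rows):
    # run checker over a whole slice of rows and return all of the results at once
    return [checker(a, b, c, d, e) for a, b, c, d, e in rows]

def log_chunk_results(results, shared_rows, matched_indices):
    # same bookkeeping as log_result above, just over a list of results per task
    for result in results:
        log_result(result, shared_rows, matched_indices)

chunk_size = 10000  # assumed value; tune so each task does a meaningful amount of work
chunks = [rows_to_parse[i:i + chunk_size]
          for i in range(0, len(rows_to_parse), chunk_size)]
for chunk in chunks:
    pool.apply_async(checker_chunk, args=(chunk,),
                     callback=partial(log_chunk_results,
                                      shared_rows=shared_rows,
                                      matched_indices=matched_indices))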


Aside: you should avoid using globals (like df) in your code; it's much cleaner to pass them around to your functions (in this case, as an argument to checker).
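
For example (again just a sketch), checker could take the DataFrame as an explicit first parameter and be bound once with partial before submitting the tasks:

def checker(dfr, a, b, c, d, e):
    # dfr is passed in explicitly instead of being read from a module-level global
    match = dfr[(dfr['a'] == a) & (dfr['b'] == b) & (dfr['c'] == c) &
                (dfr['d'] == d) & (dfr['e'] == e)]
    index_of_match = match.index.tolist()
    if index_of_match:
        return [index_of_match[0]]
    return [a, b, c, d, e]

# inside apply_async_with_callback:
checker_with_df = partial(checker, dfr)
pool.apply_async(checker_with_df, args=(var_a, var_b, var_c, var_d, var_e),
                 callback=partial(log_result,
                                  shared_rows=shared_rows,
                                  matched_indices=matched_indices))

Keep in mind that the bound DataFrame is pickled and sent to the workers for each task, so for a large df you would normally combine this with the chunking above so that cost is paid once per chunk rather than once per row.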

answered by Andy Hayden