Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

python read csv files in parallel and concatenate the dataframe

I have an application which would read say 50 large size csvs file around 400MB each. Now i am reading these to create a dataframe and ultimately concatenate all these into 1 single dataframe. I want to do that in parallel to speed up the overall process. So my code below looks something like this:

import numpy as np
import pandas as pd
from multiprocessing.pool import ThreadPool
from time import time 

Class dataProvider:
    def __init__(self):
        self.df=pd.DataFrame()
        self.pool = ThreadPool(processes=40)
        self.df_abc=pd.DataFrame()
        self.df_xyz=pd.DataFrame()
        self.start=time()

     def get_csv_data(self,filename):
        return pd.read_csv(filename)

     def get_all_csv_data(self,filename):
         self.start=time()
         df_1 = self.pool.apply_sync(self.get_csv_data,('1.csv',), callback=concatDf)
         df_2 = self.pool.apply_sync(self.get_csv_data,('2.csv',), callback=concatDf)
         total_time=time()-self.start

     def concatDf(self):
         self.df_abc=pd.concat([df_1,df_2])
         self.df_xyz=self.df_abc.iloc[:,1:]
         return self.df_xyz

I see below problem with the code:

  1. If the same callback is invoked by my apply_sync call then how do i know present callback has been invoked by exactly which call in above df_1 line or df_2 ? 2)I want to concatenate the output of the different apply_sync, how can i do it in concatDf callback function?
  2. How do i know that callbacks of all apply_sync call has completed so that i can return back concatenated dataframe all 50 csvs ?
  3. Is there a better and efficient way to do this ?

Thanks

like image 995
Invictus Avatar asked Jun 28 '26 15:06

Invictus


1 Answers

Edit: Use this solution only if you have enough RAM available.

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import pandas as pd
from glob import glob 

files = glob("*.csv")

def read_file(file):
    return pd.read_csv(file)

# I would recommend to try out whether ThreadPoolExecutor or 
# ProcessPoolExecutor is faster on your system:
with ThreadPoolExecutor(4) as pool:
    df = pd.concat(pool.map(read_file, files))
print(df)
like image 119
mrzo Avatar answered Jul 01 '26 05:07

mrzo