
Parallel processing of a large .csv file in Python


I'm processing large CSV files (on the order of several GBs with 10M lines) using a Python script.

The files have different row lengths, and cannot be loaded fully into memory for analysis.

Each line is handled separately by a function in my script. It takes about 20 minutes to analyze one file, and it appears that disk access speed is not the bottleneck, but rather the per-row processing and function calls.

The code looks something like this (very straightforward). The actual code uses a class structure, but this is similar:

csvReader = csv.reader(open("file", "r"))
for row in csvReader:
    handleRow(row, dataStructure)

Given that the calculation requires a shared data structure, what would be the best way to run the analysis in parallel in Python, using multiple cores?

In general, how do I read multiple lines at once from a .csv file in Python to hand off to a thread or process? Looping over the rows with for doesn't sound very efficient.

Thanks!

Ron asked Dec 08 '11



1 Answer

This might be too late, but I'll post it anyway for future users. Another poster mentioned using multiprocessing; I can vouch for it and can go into more detail. We deal with files in the hundreds of MB to several GB every day using Python, so it's definitely up to the task. Some of the files we deal with aren't CSVs, so the parsing can be fairly complex and take longer than the disk access. However, the methodology is the same no matter the file type.

You can process pieces of the large files concurrently. Here's pseudocode of how we do it:

import os
import multiprocessing as mp

# worker: process one byte range [start, stop) of the file
def processfile(filename, start=0, stop=0):
    if start == 0 and stop == 0:
        ...  # process the entire file
    else:
        with open(filename, 'r') as fh:
            fh.seek(start)
            lines = fh.readlines(stop - start)
            ...  # process these lines
    return results  # whatever your analysis produces for this chunk

if __name__ == "__main__":

    # get file size and set chunk size
    filesize = os.path.getsize(filename)
    split_size = 100 * 1024 * 1024

    # determine if it needs to be split
    if filesize > split_size:

        # create pool, initialize chunk start location (cursor)
        pool = mp.Pool(mp.cpu_count())
        cursor = 0
        results = []
        with open(filename, 'r') as fh:

            # for every chunk in the file (+1 catches a trailing partial chunk)
            for chunk in range(filesize // split_size + 1):

                # determine where the chunk ends; is it the last one?
                if cursor + split_size > filesize:
                    end = filesize
                else:
                    end = cursor + split_size

                # seek to the end of the chunk and read the next line so that
                # only whole lines are passed to the processfile function
                fh.seek(end)
                fh.readline()

                # get current file location
                end = fh.tell()

                # add chunk to the process pool; save the reference to get results later
                proc = pool.apply_async(processfile, args=[filename, cursor, end])
                results.append(proc)

                # set up the next chunk
                cursor = end

        # close the pool and wait for it to finish
        pool.close()
        pool.join()

        # iterate through the results
        for proc in results:
            processfile_result = proc.get()

    else:
        ...  # process normally (single process)

Like I said, that's only pseudocode, but it should get anyone who needs to do something similar started. I don't have the code in front of me; I'm just writing it from memory.

But we got more than a 2x speedup from this on the first run, without fine-tuning it. You can fine-tune the number of processes in the pool and how large the chunks are to get an even higher speedup, depending on your setup. If you have multiple files as we do, create a pool to read several files in parallel (see the sketch below). Just be careful not to overload the box with too many processes.
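For that multiple-files case, a minimal sketch could look like the following. It reuses the processfile function from the pseudocode above; the list of paths is hypothetical and just stands in for your own files:

import multiprocessing as mp

if __name__ == "__main__":
    # hypothetical list of input paths -- substitute your own files
    filenames = ["data1.csv", "data2.csv", "data3.csv"]

    with mp.Pool(mp.cpu_count()) as pool:
        # one task per file; processfile(f) with default start/stop handles the whole file
        async_results = [pool.apply_async(processfile, args=[f]) for f in filenames]
        file_results = [r.get() for r in async_results]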

Note: you need to put the pool setup inside an if __name__ == "__main__": block (as in the pseudocode above) to ensure that child processes don't re-run it and spawn processes endlessly.
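On the shared data structure from the question: rather than sharing state between processes, it is usually simpler to have each worker build and return its own partial result and merge them in the parent. A minimal sketch, assuming (hypothetically) that processfile returns a collections.Counter of per-chunk tallies and that results is the list of AsyncResult objects collected above:

from collections import Counter

# after pool.close() and pool.join(): fold each chunk's partial result into one structure
dataStructure = Counter()
for proc in results:                  # the AsyncResult objects collected above
    dataStructure.update(proc.get())  # .get() returns the Counter built for one chunk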

max answered Sep 19 '22