I'm processing large CSV files (on the order of several GBs with 10M lines) using a Python script.
The files have rows of different lengths and cannot be loaded fully into memory for analysis.
Each line is handled separately by a function in my script. It takes about 20 minutes to analyze one file, and it appears disk access speed is not the issue; the processing and function calls are.
The code looks something like this (very straightforward). The actual code uses a class structure, but this is similar:

    csvReader = csv.reader(open("file", "r"))
    for row in csvReader:
        handleRow(row, dataStructure)
Given the calculation requires a shared data structure, what would be the best way to run the analysis in parallel in Python utilizing multiple cores?
In general, how do I read multiple lines at once from a .csv in Python to hand off to a thread/process? Looping over the rows with for doesn't sound very efficient.
Thanks!
Process-Based Parallelism

With this approach it is possible to start several processes at the same time (concurrently), so they can perform calculations in parallel on separate cores. The multiprocessing package is part of the Python standard library and gives us a convenient syntax for launching concurrent processes.
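For the per-row workload in the question, a minimal sketch could look like the following. It assumes handleRow can be refactored into a pure function that returns a partial result for one row, with the partial results merged back into the shared data structure in the parent process; handle_row and "file.csv" are placeholder names, not taken from the original code.

    import csv
    import multiprocessing as mp

    def handle_row(row):
        # placeholder: compute and return a partial result for one row
        return len(row)

    if __name__ == "__main__":
        total = 0  # the "shared" data structure lives only in the parent process
        with open("file.csv", newline="") as f, mp.Pool() as pool:
            reader = csv.reader(f)
            # chunksize batches many rows per task so IPC overhead stays small
            for partial in pool.imap_unordered(handle_row, reader, chunksize=1000):
                total += partial  # merge partial results in the parent
        print(total)

Because each worker sees only its rows and not the shared structure, the merge step stays in the parent; if the merging itself is the expensive part, this approach will help less.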
read_csv(chunksize)

One way to process large files is to read them in chunks of reasonable size: each chunk is read into memory and processed before the next one is read. In pandas, the chunksize parameter of read_csv specifies the size of each chunk as a number of rows.
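For example, a small sketch (assuming the rows are regular enough for pandas to parse; the file name and the process function are placeholders):

    import pandas as pd

    def process(chunk):
        # placeholder: analyze one chunk; replace with the real per-chunk logic
        return len(chunk)

    rows_seen = 0
    # read the CSV 100,000 rows at a time; each chunk is an ordinary DataFrame
    for chunk in pd.read_csv("file.csv", chunksize=100_000):
        rows_seen += process(chunk)
    print(rows_seen)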
This might be too late, but I'll post it anyway for future readers. Another poster mentioned using multiprocessing. I can vouch for it and can go into more detail. We deal with files in the hundreds of MB to several GB every day using Python, so it's definitely up to the task. Some of the files we deal with aren't CSVs, so the parsing can be fairly complex and take longer than the disk access. However, the methodology is the same no matter the file type.
You can process pieces of the large files concurrently. Here's pseudo code of how we do it:
    import os
    import multiprocessing as mp

    # process file function
    def processfile(filename, start=0, stop=0):
        if start == 0 and stop == 0:
            ...  # process the entire file
        else:
            with open(filename, 'r') as fh:
                fh.seek(start)
                lines = fh.readlines(stop - start)
                # ... process these lines ...
        return results  # whatever the placeholder processing above produces

    if __name__ == "__main__":
        # get file size and set chunk size
        filesize = os.path.getsize(filename)
        split_size = 100 * 1024 * 1024

        # determine if the file needs to be split
        if filesize > split_size:
            # create pool, initialize chunk start location (cursor)
            pool = mp.Pool(mp.cpu_count())
            cursor = 0
            results = []
            with open(filename, 'r') as fh:
                # for every chunk in the file...
                for chunk in range(filesize // split_size):
                    # determine where the chunk ends; is it the last one?
                    if cursor + split_size > filesize:
                        end = filesize
                    else:
                        end = cursor + split_size
                    # seek to the end of the chunk and read the next line to ensure
                    # you pass entire lines to the processfile function
                    fh.seek(end)
                    fh.readline()
                    # get current file location
                    end = fh.tell()
                    # add chunk to process pool, save reference to get results
                    proc = pool.apply_async(processfile, args=[filename, cursor, end])
                    results.append(proc)
                    # set up the next chunk
                    cursor = end
            # close and wait for pool to finish
            pool.close()
            pool.join()
            # iterate through results
            for proc in results:
                processfile_result = proc.get()
        else:
            ...  # process normally
Like I said, that's only pseudo code. It should get anyone who needs to do something similar started. I don't have the code in front of me; I'm just writing it from memory.
But we got more than a 2x speed-up from this on the first run, without fine-tuning it. You can fine-tune the number of processes in the pool and how large the chunks are to get an even higher speed-up, depending on your setup. If you have multiple files, as we do, create a pool to read several files in parallel (see the sketch after the note below). Just be careful not to overload the box with too many processes.
Note: you need to put the pool setup inside an if __name__ == "__main__": block, as in the pseudocode above, to ensure infinite processes aren't created.
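For the multi-file case mentioned above, a sketch might look like this; it reuses the processfile function from the pseudocode, and the filenames list is purely hypothetical:

    import multiprocessing as mp

    if __name__ == "__main__":
        filenames = ["data1.csv", "data2.csv", "data3.csv"]  # hypothetical list of files
        # one worker per file, capped at the CPU count so the box isn't overloaded
        with mp.Pool(processes=min(len(filenames), mp.cpu_count())) as pool:
            all_results = pool.map(processfile, filenames)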