From one of our client's requirement, I have to develop an application which should be able to process huge CSV files. File size could be in the range of 10 MB - 2GB in size.
Depending on size, module decides whether to read the file using Multiprocessing pool
or by using normal CSV reader
.
But from observation, multi processing
taking longer time than normal CSV reading
when tested both the modes for a file with size of 100 MB.
Is this correct behaviour? OR Am I doing something wrong?
Here is my code:
def set_file_processing_mode(self, fpath):
""" """
fsize = self.get_file_size(fpath)
if fsize > FILE_SIZE_200MB:
self.read_in_async_mode = True
else:
self.read_in_async_mode = False
def read_line_by_line(self, filepath):
"""Reads CSV line by line"""
with open(filepath, 'rb') as csvin:
csvin = csv.reader(csvin, delimiter=',')
for row in iter(csvin):
yield row
def read_huge_file(self, filepath):
"""Read file in chunks"""
pool = mp.Pool(1)
for chunk_number in range(self.chunks): #self.chunks = 20
proc = pool.apply_async(read_chunk_by_chunk,
args=[filepath, self.chunks, chunk_number])
reader = proc.get()
yield reader
pool.close()
pool.join()
def iterate_chunks(self, filepath):
"""Read huge file rows"""
for chunklist in self.read_huge_file(filepath):
for row in chunklist:
yield row
@timeit #-- custom decorator
def read_csv_rows(self, filepath):
"""Read CSV rows and pass it to processing"""
if self.read_in_async_mode:
print("Reading in async mode")
for row in self.iterate_chunks(filepath):
self.process(row)
else:
print("Reading in sync mode")
for row in self.read_line_by_line(filepath):
self.process(row)
def process(self, formatted_row):
"""Just prints the line"""
self.log(formatted_row)
def read_chunk_by_chunk(filename, number_of_blocks, block):
'''
A generator that splits a file into blocks and iterates
over the lines of one of the blocks.
'''
results = []
assert 0 <= block and block < number_of_blocks
assert 0 < number_of_blocks
with open(filename) as fp :
fp.seek(0,2)
file_size = fp.tell()
ini = file_size * block / number_of_blocks
end = file_size * (1 + block) / number_of_blocks
if ini <= 0:
fp.seek(0)
else:
fp.seek(ini-1)
fp.readline()
while fp.tell() < end:
results.append(fp.readline())
return results
if __name__ == '__main__':
classobj.read_csv_rows(sys.argv[1])
Here is a test:
$ python csv_utils.py "input.csv"
Reading in async mode
FINISHED IN 3.75 sec
$ python csv_utils.py "input.csv"
Reading in sync mode
FINISHED IN 0.96 sec
Question is :
Why Async mode is taking longer?
NOTE: Removed unnecessary functions/lines to avoid complexity in the code
Is this correct behaviour?
Yes - it may not be what you expect, but it is consistent with the way you implemented it and how multiprocessing
works.
Why Async mode is taking longer?
The way your example works is perhaps best illustrated by a parable - bear with me please:
Let's say you ask your friend to engage in an experiment. You want him to go through a book and mark each page with a pen, as fast as he can. There are two rounds with a distinct setup, and you are going to time each round and then compare which one was faster:
open the book on the first page, mark it, then flip the page and mark the following pages as they come up. Pure sequential processing.
process the book in chunks. For this he should run through the book's pages chunk by chunk. That is he should first make a list of page numbers as starting points, say 1, 10, 20, 30, 40, etc. Then for each chunk, he should close the book, open it on the page for the starting point, process all pages before the next starting point comes up, close the book, then start all over again for the next chunk.
Which of these approaches will be faster?
Am I doing something wrong?
You decide both approaches take too long. What you really want to do is ask multiple people (processes) to do the marking in parallel. Now with a book (as with a file) that's difficult because, well, only one person (process) can access the book (file) at any one point. Still it can be done if the order of processing doesn't matter and it is the marking itself - not the accessing - that should run in parallel. So the new approach is like this:
This approach will most certainly speed up the whole process. Perhaps surprisingly though the speed up will be less than a factor of 10 because step 1 takes some time, and only one person can do it. That's called Amdahl's law [wikipedia]:
Essentially what it means is that the (theoretical) speed-up of any process can only be as fast as the parallel processing part p is reduced in speed in relation to the part's sequential processing time (p/s).
Intuitively, the speed-up can only come from the part of the task that is processed in parallel, all the sequential parts are not affected and take the same amount of time, whether p is processed in parallel or not.
That said, in our example, obviously the speed-up can only come from step 2 (marking pages in parallel by multiple people), as step 1 (tearing up the book) is clearly sequential.
develop an application which should be able to process huge CSV files
Here's how to approach this:
Something like this:
def process(rows):
# do all the processing
...
return result
if __name__ == '__main__':
pool = mp.Pool(N) # N > 1
chunks = get_chunks(...)
for rows in chunks:
result += pool.apply_async(process, rows)
pool.close()
pool.join()
I'm not defining get_chunks
here because there are several documented approaches to doing this e.g. here or here.
Conclusion
Depending on the kind of processing required for each file, it may well be that the sequential approach to processing any one file is the fastest possible approach, simply because the processing parts don't gain much from being done in parallel. You may still end up processing it chunk by chunk due to e.g. memory constraints. If that is the case, you probably don't need multiprocessing.
If you have multiple files that can be processed in parallel, multiprocessing is a very good approach. It works the same way as shown above, where the chunks are not rows but filenames.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With