Reading CSV with multiprocessing pool is taking longer than CSV reader

For one of our client's requirements, I have to develop an application which should be able to process huge CSV files. File sizes could range from 10 MB to 2 GB.

Depending on the size, the module decides whether to read the file using a multiprocessing pool or a normal CSV reader. From my observations, though, multiprocessing takes longer than normal CSV reading when I test both modes on a 100 MB file.

Is this correct behaviour? Or am I doing something wrong?

Here is my code:

def set_file_processing_mode(self, fpath):
    """Decide whether to read the file in async mode (files over 200 MB)."""
    fsize = self.get_file_size(fpath)
    if fsize > FILE_SIZE_200MB:
        self.read_in_async_mode = True
    else:
        self.read_in_async_mode = False

def read_line_by_line(self, filepath):
    """Reads CSV line by line"""
    with open(filepath, 'rb') as csvin:
        csvin = csv.reader(csvin, delimiter=',')
        for row in csvin:
            yield row

def read_huge_file(self, filepath):
    """Read file in chunks"""
    pool = mp.Pool(1)
    for chunk_number in range(self.chunks): #self.chunks = 20
        proc = pool.apply_async(read_chunk_by_chunk, 
                        args=[filepath, self.chunks, chunk_number])
        reader = proc.get()
        yield reader
    pool.close()
    pool.join()

def iterate_chunks(self, filepath):
    """Read huge file rows"""
    for chunklist in self.read_huge_file(filepath):
        for row in chunklist:
            yield row

@timeit  # custom decorator
def read_csv_rows(self, filepath):
    """Read CSV rows and pass it to processing"""
    if self.read_in_async_mode:
        print("Reading in async mode")
        for row in self.iterate_chunks(filepath):
            self.process(row)
    else:
        print("Reading in sync mode")
        for row in self.read_line_by_line(filepath):
            self.process(row)

def process(self, formatted_row):
    """Just prints the line"""
    self.log(formatted_row)

def read_chunk_by_chunk(filename, number_of_blocks, block):
    '''
    Splits a file into blocks and returns the lines
    of one of the blocks as a list.
    '''
    results = []
    assert 0 < number_of_blocks
    assert 0 <= block < number_of_blocks
    with open(filename) as fp:
        fp.seek(0, 2)
        file_size = fp.tell()
        ini = file_size * block / number_of_blocks
        end = file_size * (1 + block) / number_of_blocks
        if ini <= 0:
            fp.seek(0)
        else:
            fp.seek(ini - 1)
            fp.readline()
        while fp.tell() < end:
            results.append(fp.readline())
    return results

if __name__ == '__main__':
    classobj.read_csv_rows(sys.argv[1])    

Here is a test:

$ python csv_utils.py "input.csv"
Reading in async mode
FINISHED  IN 3.75 sec
$ python csv_utils.py "input.csv"
Reading in sync mode
FINISHED  IN 0.96 sec

The question is:

Why is async mode taking longer?

NOTE: I removed unrelated functions/lines to keep the example simple.

Asked by Laxmikant


1 Answer

Is this correct behaviour?

Yes - it may not be what you expect, but it is consistent with the way you implemented it and how multiprocessing works.

Why is async mode taking longer?

The way your example works is perhaps best illustrated by a parable - bear with me please:

Let's say you ask your friend to take part in an experiment. You want him to go through a book and mark each page with a pen, as fast as he can. There are two rounds, each with a distinct setup, and you are going to time each round and then compare which one was faster:

  1. open the book on the first page, mark it, then flip the page and mark the following pages as they come up. Pure sequential processing.

  2. process the book in chunks. For this, he should first make a list of page numbers as starting points, say 1, 10, 20, 30, 40, etc. Then, for each chunk, he should open the book at the starting page, mark all pages up to the next starting point, close the book, and start all over again for the next chunk.

Which of these approaches will be faster?
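
This second round is essentially what the posted read_huge_file does: the pool has a single worker (mp.Pool(1)), and proc.get() is called immediately after each apply_async, so the parent process blocks until a chunk has been read, pickled and sent back before it even submits the next one. The chunks still run strictly one after another, only now each one also pays for re-opening and re-seeking the file plus the inter-process overhead. A stripped-down sketch of that pattern (work is just a stand-in for read_chunk_by_chunk):

import multiprocessing as mp
import time

def work(chunk_number):
    time.sleep(0.1)   # stand-in for seeking to and reading one chunk
    return chunk_number

if __name__ == '__main__':
    pool = mp.Pool(1)              # a single worker, as in read_huge_file
    for n in range(20):            # self.chunks = 20 in the question
        # .get() blocks right away, so the next chunk is only submitted
        # once this one has been read and shipped back to the parent
        result = pool.apply_async(work, (n,)).get()
    pool.close()
    pool.join()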

Am I doing something wrong?

You decide both approaches take too long. What you really want to do is ask multiple people (processes) to do the marking in parallel. Now with a book (as with a file) that's difficult because, well, only one person (process) can access the book (file) at any one point. Still it can be done if the order of processing doesn't matter and it is the marking itself - not the accessing - that should run in parallel. So the new approach is like this:

  1. cut the pages out of the book and sort them into say 10 stacks
  2. ask ten people to mark one stack each

This approach will most certainly speed up the whole process. Perhaps surprisingly, though, the speed-up will be less than a factor of 10, because step 1 takes some time and only one person can do it. That's called Amdahl's law [wikipedia]:

$$ S_\text{latency}(s) = \frac{1}{(1 - p) + \frac{p}{s}} $$

Essentially it means that the (theoretical) speed-up of the whole task is limited: only the fraction p of the work that runs in parallel gets faster (its time shrinks to p/s, where s is the speed-up of that part), while the sequential fraction (1 - p) takes just as long as before.

Intuitively, the speed-up can only come from the part of the task that is processed in parallel; all the sequential parts are unaffected and take the same amount of time whether p is processed in parallel or not.

That said, in our example, obviously the speed-up can only come from step 2 (marking pages in parallel by multiple people), as step 1 (tearing up the book) is clearly sequential.
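
To put a rough number on it, assume purely for illustration that tearing up the book (step 1) accounts for 10% of the total work, so p = 0.9 of the work can be shared, and that s = 10 people do the marking. The overall speed-up is then only about a factor of five:

$$ S_\text{latency}(10) = \frac{1}{(1 - 0.9) + \frac{0.9}{10}} = \frac{1}{0.19} \approx 5.3 $$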

develop an application which should be able to process huge CSV files

Here's how to approach this:

  1. determine what part of the processing can be done in parallel, i.e. each chunk can be processed separately and out of sequence
  2. read the file sequentially, splitting it up into chunks as you go
  3. use multiprocessing to run multiple processing steps in parallel

Something like this:

import multiprocessing as mp

def process(rows):
    # do all the processing
    ...
    return result

if __name__ == '__main__':
    pool = mp.Pool(N)  # N > 1
    chunks = get_chunks(...)
    # submit every chunk first so the N workers can run in parallel ...
    async_results = [pool.apply_async(process, (rows,)) for rows in chunks]
    pool.close()
    pool.join()
    # ... then collect the per-chunk results once the workers are done
    results = [r.get() for r in async_results]

I'm not defining get_chunks here because there are several documented approaches to doing this e.g. here or here.
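
Just to give an idea of the shape such a function could take, here is one minimal, purely illustrative variant (the 10,000-row chunk size is an arbitrary choice): read the CSV sequentially in the parent process and yield lists of rows.

import csv
import itertools

def get_chunks(filepath, chunk_size=10000):
    """Yield lists of up to chunk_size rows, read sequentially."""
    with open(filepath, newline='') as csvin:
        reader = csv.reader(csvin)
        while True:
            rows = list(itertools.islice(reader, chunk_size))
            if not rows:
                break
            yield rows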

Conclusion

Depending on the kind of processing required for each file, it may well be that the sequential approach to processing any one file is the fastest possible approach, simply because the processing parts don't gain much from being done in parallel. You may still end up processing it chunk by chunk due to e.g. memory constraints. If that is the case, you probably don't need multiprocessing.

If you have multiple files that can be processed in parallel, multiprocessing is a very good approach. It works the same way as shown above, where the chunks are not rows but filenames.
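
As a rough sketch of that variant (process_file, the worker count and the file list are all placeholders): each worker gets a whole file and reads it with the plain sequential csv.reader loop.

import multiprocessing as mp

def process_file(filepath):
    # read and process one whole file sequentially,
    # e.g. with the plain csv.reader loop from the question
    ...
    return filepath

if __name__ == '__main__':
    filenames = ["a.csv", "b.csv", "c.csv"]   # placeholder list of input files
    pool = mp.Pool(4)                         # worker count chosen arbitrarily
    results = pool.map(process_file, filenames)
    pool.close()
    pool.join()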

Answered by miraculixx