
Fastest way to process a large file?

I have multiple 3 GB tab delimited files. There are 20 million rows in each file. All the rows have to be independently processed, no relation between any two rows. My question is, what will be faster?

  1. Reading line-by-line?

         with open(path) as infile:
             for line in infile:
                 process(line)

  2. Reading the file into memory in chunks and processing it, say 250 MB at a time?

The processing is not very complicated, I am just grabbing value in column1 to List1, column2 to List2 etc. Might need to add some column values together.

I am using python 2.7 on a linux box that has 30GB of memory. ASCII Text.

Any way to speed things up in parallel? Right now I am using the former method and the process is very slow. Is using any CSVReader module going to help? I don't have to do it in python, any other language or database use ideas are welcome.

Reise45 asked May 18 '15 02:05



2 Answers

It sounds like your code is I/O bound. This means that multiprocessing isn't going to help—if you spend 90% of your time reading from disk, having an extra 7 processes waiting on the next read isn't going to help anything.

And, while using a CSV reading module (whether the stdlib's csv or something like NumPy or Pandas) may be a good idea for simplicity, it's unlikely to make much difference in performance.
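For reference, a minimal sketch of the stdlib csv approach for tab-delimited input. It is written for Python 3 (on Python 2.7 you would open the file in "rb" mode and drop the newline argument); the helper name read_columns and the two-column default are assumptions for illustration, not something from the question.

```python
import csv


def read_columns(path, ncols=2):
    """Collect the first `ncols` tab-separated fields of every row into lists."""
    columns = [[] for _ in range(ncols)]
    # newline="" lets the csv module handle line endings itself.
    with open(path, newline="") as infile:
        # QUOTE_NONE: treat quote characters as ordinary data.
        reader = csv.reader(infile, delimiter="\t", quoting=csv.QUOTE_NONE)
        for row in reader:
            # zip stops at the shorter sequence, so extra fields are ignored.
            for col, value in zip(columns, row):
                col.append(value)
    return columns
```

Calling read_columns(path) returns one list per column, which matches the "column1 to List1, column2 to List2" processing described in the question.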

Still, it's worth checking that you really are I/O bound, instead of just guessing. Run your program and see whether your CPU usage is close to 0% or close to 100% of a core. Do what Amadan suggested in a comment, and run your program with just pass for the processing and see whether that cuts off 5% of the time or 70%. You may even want to try comparing with a loop over os.open and os.read(fd, 1024*1024) or something and see if that's any faster.
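A rough sketch of those measurements, assuming Python 3 (for time.perf_counter); the function names are hypothetical, and process stands for whatever per-line work you do.

```python
import os
import time


def time_pass(path, process=None):
    """Time one pass over the file; with process=None the loop body does nothing."""
    start = time.perf_counter()
    with open(path) as infile:
        for line in infile:
            if process is not None:
                process(line)
    return time.perf_counter() - start


def compare_passes(path, process):
    """Return (seconds reading only, seconds reading plus processing)."""
    read_only = time_pass(path)
    full = time_pass(path, process)
    return read_only, full


def time_raw_read(path, blocksize=1024 * 1024):
    """Time reading the whole file with bare os.read calls, no line splitting."""
    start = time.perf_counter()
    fd = os.open(path, os.O_RDONLY)
    try:
        while os.read(fd, blocksize):  # os.read returns b"" at end of file
            pass
    finally:
        os.close(fd)
    return time.perf_counter() - start
```

Keep in mind that a second pass over the same file may be served from the OS page cache, so compare runs under the same cache conditions (for example, each in a fresh run against a file that has not just been read).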


Since you're using Python 2.x, Python is relying on the C stdio library to guess how much to buffer at a time, so it might be worth forcing it to buffer more. The simplest way to do that is to use readlines(bufsize) for some large bufsize. (You can try different numbers and measure them to see where the peak is. In my experience, usually anything from 64 KB to 8 MB is about the same, but depending on your system that may be different, especially if you're, e.g., reading off a network filesystem with great throughput but horrible latency that swamps the throughput-vs.-latency of the actual physical drive and the caching the OS does.)

So, for example:

    bufsize = 65536
    with open(path) as infile:
        while True:
            lines = infile.readlines(bufsize)
            if not lines:
                break
            for line in lines:
                process(line)
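To find where the peak is on a particular system, something like the following sketch could be used. It assumes Python 3; time_readlines and sweep are hypothetical names, and the candidate sizes are only a starting point.

```python
import time


def time_readlines(path, bufsize):
    """Time one full pass using batched readlines(bufsize).

    Returns (seconds, number of lines read).
    """
    count = 0
    start = time.perf_counter()
    with open(path) as infile:
        while True:
            lines = infile.readlines(bufsize)
            if not lines:
                break
            count += len(lines)
    return time.perf_counter() - start, count


def sweep(path, sizes=(64 * 1024, 1024 * 1024, 8 * 1024 * 1024)):
    """Map each candidate buffer size to the seconds one pass took."""
    return {size: time_readlines(path, size)[0] for size in sizes}
```

Timings from repeated passes over one file are affected by OS caching, so it is worth repeating the sweep a few times and looking at the trend rather than at a single number.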

Meanwhile, assuming you're on a 64-bit system, you may want to try using mmap instead of reading the file in the first place. This certainly isn't guaranteed to be better, but it may be better, depending on your system. For example:

    import mmap

    with open(path, "rb") as infile:
        # mmap.mmap takes a file descriptor, not a file object
        m = mmap.mmap(infile.fileno(), 0, access=mmap.ACCESS_READ)

A Python mmap is sort of a weird object—it acts like a str and like a file at the same time, so you can, e.g., manually iterate scanning for newlines, or you can call readline on it as if it were a file. Both of those will take more processing from Python than iterating the file as lines or doing batch readlines (because a loop that would be in C is now in pure Python… although maybe you can get around that with re, or with a simple Cython extension?)… but the I/O advantage of the OS knowing what you're doing with the mapping may swamp the CPU disadvantage.
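The two iteration styles mentioned above could look roughly like this. It is a sketch only; the generator names are hypothetical, and both functions yield lines as bytes with the trailing newline kept.

```python
import mmap


def iter_mmap_readline(path):
    """Yield lines by calling readline() on the mapping, as if it were a file."""
    with open(path, "rb") as infile:
        # Note: mapping an empty file raises ValueError.
        m = mmap.mmap(infile.fileno(), 0, access=mmap.ACCESS_READ)
        try:
            # readline() returns an empty bytes object at end of file.
            for line in iter(m.readline, b""):
                yield line
        finally:
            m.close()


def iter_mmap_scan(path):
    """Yield lines by scanning the mapping for newlines manually."""
    with open(path, "rb") as infile:
        m = mmap.mmap(infile.fileno(), 0, access=mmap.ACCESS_READ)
        try:
            pos, size = 0, len(m)
            while pos < size:
                nl = m.find(b"\n", pos)
                # A final line without a newline runs to the end of the map.
                end = size if nl == -1 else nl + 1
                yield m[pos:end]
                pos = end
        finally:
            m.close()
```

Either generator can replace the `for line in infile` loop; whether it is actually faster is something to measure on your own system.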

Unfortunately, Python 2.x doesn't expose the madvise call that you'd use to tweak things in an attempt to optimize this in C (e.g., explicitly setting MADV_SEQUENTIAL instead of making the kernel guess, or forcing transparent huge pages), but you can actually ctypes the function out of libc.
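On Python 3.8 and later the mmap object has a madvise method of its own, so the ctypes route is only needed on older versions. This is a different technique from the ctypes one described above. A minimal sketch, assuming Python 3.8+ on a platform that provides madvise; map_sequential is a hypothetical helper name.

```python
import mmap


def map_sequential(path):
    """Map a file read-only and, where supported, hint sequential access."""
    with open(path, "rb") as infile:
        # The mapping stays valid after the file object is closed.
        m = mmap.mmap(infile.fileno(), 0, access=mmap.ACCESS_READ)
    # madvise and the MADV_* constants only exist on Python 3.8+ and on
    # platforms with the madvise system call, hence the guard.
    if hasattr(m, "madvise") and hasattr(mmap, "MADV_SEQUENTIAL"):
        m.madvise(mmap.MADV_SEQUENTIAL)
    return m
```

The hint is advisory: the kernel may ignore it, so any gain has to be measured rather than assumed.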

abarnert answered Oct 18 '22 20:10


I know this question is old, but I wanted to do something similar, so I wrote a simple framework that reads and processes a large file in parallel. I'm leaving what I tried as an answer.

This is the code; an example follows at the end.

    from __future__ import print_function  # lets print(x, file=fout) work on Python 2 as well

    import gc
    import multiprocessing as mp
    import os
    import time

    import psutil  # third-party package


    def chunkify_file(fname, size=1024*1024*1000, skiplines=-1):
        """
        function to divide a large text file into chunks each having size ~= size so that the chunks are line aligned

        Params :
            fname : path to the file to be chunked
            size : size of each chunk is ~> this
            skiplines : number of lines in the beginning to skip, -1 means don't skip any lines
        Returns :
            (start position, size in bytes, file name) for each chunk
        """
        chunks = []
        fileEnd = os.path.getsize(fname)
        with open(fname, "rb") as f:
            if skiplines > 0:
                for i in range(skiplines):
                    f.readline()

            chunkEnd = f.tell()
            while True:
                chunkStart = chunkEnd
                f.seek(chunkStart + size, os.SEEK_SET)
                f.readline()  # make this chunk line aligned
                chunkEnd = f.tell()
                chunks.append((chunkStart, chunkEnd - chunkStart, fname))

                # >= rather than >, so a chunk ending exactly at EOF is the last one
                if chunkEnd >= fileEnd:
                    break
        return chunks


    def parallel_apply_line_by_line_chunk(chunk_data):
        """
        function to apply a function to each line in a chunk

        Params :
            chunk_data : the data for this chunk
        Returns :
            list of the non-None results for this chunk
        """
        chunk_start, chunk_size, file_path, func_apply = chunk_data[:4]
        func_args = chunk_data[4:]

        chunk_res = []
        with open(file_path, "rb") as f:
            f.seek(chunk_start)
            cont = f.read(chunk_size).decode(encoding='utf-8')
            lines = cont.splitlines()

            for line in lines:
                ret = func_apply(line, *func_args)
                if ret is not None:
                    chunk_res.append(ret)
        return chunk_res


    def parallel_apply_line_by_line(input_file_path, chunk_size_factor, num_procs, skiplines, func_apply, func_args, fout=None):
        """
        function to apply a supplied function line by line in parallel

        Params :
            input_file_path : path to input file
            chunk_size_factor : size of 1 chunk in MB
            num_procs : number of parallel processes to spawn, max used is num of available cores - 1
            skiplines : number of top lines to skip while processing
            func_apply : a function which expects a line and outputs None for lines we don't want processed
            func_args : list of extra arguments to function func_apply
            fout : open file object to write results to; if None, results are returned in a list
        Returns :
            list of the non-None results obtained by processing each line
        """
        # at least 1 worker, at most (available cores - 1)
        num_parallel = max(1, min(num_procs, psutil.cpu_count() - 1))

        jobs = chunkify_file(input_file_path, 1024 * 1024 * chunk_size_factor, skiplines)
        jobs = [list(x) + [func_apply] + func_args for x in jobs]

        print("Starting the parallel pool for {} jobs ".format(len(jobs)))

        lines_counter = 0

        # maxtasksperchild: if not supplied, memory blows up as the worker processes keep lingering
        pool = mp.Pool(num_parallel, maxtasksperchild=1000)

        outputs = []
        for i in range(0, len(jobs), num_parallel):
            print("Chunk start = ", i)
            t1 = time.time()
            chunk_outputs = pool.map(parallel_apply_line_by_line_chunk, jobs[i : i + num_parallel])

            for subl in chunk_outputs:
                for x in subl:
                    if fout is not None:
                        print(x, file=fout)
                    else:
                        outputs.append(x)
                    lines_counter += 1
            del chunk_outputs
            gc.collect()
            print("Batch done in time ", time.time() - t1)

        print("Total lines we have = {}".format(lines_counter))

        pool.close()
        pool.join()
        return outputs

Say, for example, I have a file and want to count the number of words in each line. The processing of each line would then look like:

    def count_words_line(line):
        return len(line.strip().split())

and then call the function like:

    parallel_apply_line_by_line(input_file_path, 100, 8, 0, count_words_line, [], fout=None)

Using this, I get a speed up of ~8 times as compared to vanilla line by line reading on a sample file of size ~20GB in which I do some moderately complicated processing on each line.
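Before trusting the parallel results, it is worth checking that the byte ranges cover every line exactly once. The following is a scaled-down, single-process sketch of the line-alignment idea, not the framework above: line_aligned_chunks and read_chunk_lines are hypothetical helpers written for this check, and Python 3 is assumed.

```python
import os


def line_aligned_chunks(path, size):
    """Return (start, length) byte ranges of at least `size` bytes
    (except possibly the last) that always end on a line boundary."""
    chunks = []
    file_end = os.path.getsize(path)
    with open(path, "rb") as f:
        start = 0
        while start < file_end:
            f.seek(min(start + size, file_end))
            f.readline()  # advance to the end of the current line
            end = f.tell()
            chunks.append((start, end - start))
            start = end
    return chunks


def read_chunk_lines(path, start, length):
    """Read one byte range and split it into lines (as bytes)."""
    with open(path, "rb") as f:
        f.seek(start)
        return f.read(length).splitlines()
```

Concatenating read_chunk_lines over all ranges from line_aligned_chunks should reproduce the file's lines in order, with none split or duplicated; running that comparison on a small sample file is a cheap sanity check.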

Deepak Saini answered Oct 18 '22 20:10