
Python, process a large text file in parallel

Sample records in the data file (a SAM file):

M01383  0  chr4  66439384  255  31M  *  0  0  AAGAGGA GFAFHGD  MD:Z:31 NM:i:0
M01382  0  chr1  241995435  255 31M  *  0  0  ATCCAAG AFHTTAG  MD:Z:31 NM:i:0
......
  • The data files are line-based (one record per line).
  • The size of each data file varies from 1 GB to 5 GB.

I need to go through the records in the data file line by line, extract a particular value (e.g. the 4th value, 66439384) from each line, and pass this value to another function for processing. Then some result counters are updated.

The basic workflow is like this:

# Global variables; the counters are updated in search_function according to the value passed.
counter_a = 0
counter_b = 0
counter_c = 0

def search_function(value):
    # some condition checks on value ...
    # ... update counter_a, counter_b, or counter_c accordingly
    ...

with open(data_file) as textfile:    # data_file: path to one SAM file
    for line in textfile:
        value = line.split()[3]
        search_function(value)       # this function takes quite a long time per value

With single-process code and a ~1.5 GB data file, it took about 20 hours to run through all the records in one data file. I need much faster code because there are more than 30 data files of this kind.

I was thinking of processing the data file in N chunks in parallel, with each chunk performing the above workflow and updating the global variables (counter_a, counter_b, counter_c) simultaneously. But I don't know how to achieve this in code, or whether this will work.

I have access to a server machine with 24 processors and around 40 GB of RAM.

Could anyone help with this? Thanks very much.

asked Feb 21 '15 by Xiangwu


2 Answers

The simplest approach would probably be to do all 30 files at once with your existing code -- it would still take all day, but you'd have all the files done at the same time (i.e., "9 babies in 9 months" is easy, "1 baby in 1 month" is hard).
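
A minimal sketch of that idea, assuming the existing per-file loop is wrapped in a function process_file(path) that returns its three counts (process_file and the '*.sam' glob pattern are illustrative, not part of the question's code):

import glob
import multiprocessing

def process_file(path):
    # run the existing single-process workflow on one file, with local counters
    counter_a = counter_b = counter_c = 0
    with open(path) as textfile:
        for line in textfile:
            value = line.split()[3]
            # ... existing condition checks, updating the local counters ...
    return counter_a, counter_b, counter_c

if __name__ == '__main__':
    paths = glob.glob('*.sam')                  # the ~30 data files
    with multiprocessing.Pool(len(paths)) as pool:   # one worker per file
        for path, counts in zip(paths, pool.map(process_file, paths)):
            print(path, counts)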

If you really want to get a single file done faster, it will depend on how your counters actually update. If almost all the work is just in analysing value, you can offload that using the multiprocessing module:

import time
import multiprocessing

def slowfunc(value):
    time.sleep(0.01)
    return value**2 + 0.3*value + 1

counter_a = counter_b = counter_c = 0
def add_to_counter(res):
    global counter_a, counter_b, counter_c
    counter_a += res
    counter_b -= (res - 10)**2
    counter_c += (int(res) % 2)

pool = multiprocessing.Pool(50)
results = []

for value in range(100000):
    r = pool.apply_async(slowfunc, [value])
    results.append(r)

    # don't let the queue grow too long
    if len(results) == 1000:
        results[0].wait()

    while results and results[0].ready():
        r = results.pop(0)
        add_to_counter(r.get())

for r in results:
    r.wait()
    add_to_counter(r.get())

print(counter_a, counter_b, counter_c)

That will allow 50 slowfuncs to run in parallel, so instead of taking 1000s (= 100k * 0.01s), it takes 20s (= 100k/50 * 0.01s) to complete. If you can restructure your function into "slowfunc" and "add_to_counter" like the above, you should be able to get a factor-of-24 speedup.
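
Applied to the question's workflow, the restructuring might look roughly like this (check_value, update_counters, and the file path are illustrative placeholders for the expensive analysis, the cheap counter updates, and one of the SAM files):

import multiprocessing

def check_value(value):
    # placeholder for the expensive analysis (the "slowfunc" part); runs in a worker
    return int(value)

def update_counters(res):
    # placeholder for the cheap counter updates (the "add_to_counter" part); runs in the parent
    pass

pool = multiprocessing.Pool(24)            # one worker per processor
results = []

with open('reads.sam') as textfile:        # hypothetical path to one data file
    for line in textfile:
        value = line.split()[3]
        results.append(pool.apply_async(check_value, [value]))
        if len(results) >= 1000:           # keep the pending queue bounded
            results[0].wait()
        while results and results[0].ready():
            update_counters(results.pop(0).get())

for r in results:                          # drain the remaining results
    update_counters(r.get())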

answered Oct 09 '22 by Anthony Towns


Read one file at a time, use all CPUs to run search_function():

#!/usr/bin/env python
from multiprocessing import Array, Pool

def init(counters_): # called once for each child process
    global counters
    counters = counters_

def search_function(value): # assume it is a CPU-intensive task
    result, error = None, None
    # some condition checks on value go here and decide which counter to update, e.g.:
    with counters.get_lock():    # the read-modify-write below must be atomic
        counters[0] += 1 # counter 'a'
        counters[1] += 1 # counter 'b'
    return value, result, error

if __name__ == '__main__':
    counters = Array('i', [0]*3) # shared counters a, b, c
    pool = Pool(initializer=init, initargs=[counters])
    with open(filename) as textfile: # filename: path to one of the SAM data files
        values = (line.split()[3] for line in textfile)
        for value, result, error in pool.imap_unordered(search_function, values,
                                                        chunksize=1000):
            if error is not None:
                print('value: {value}, error: {error}'.format(**vars()))
    pool.close()
    pool.join()
    print(list(counters))

Make sure (for example, by writing wrappers) that exceptions do not escape next(values) or search_function().
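
A minimal sketch of such wrappers around the code above (safe_search and safe_values are illustrative names):

def safe_search(value):
    # never let an exception escape the worker; report it via the error slot instead
    try:
        return search_function(value)
    except Exception as e:
        return value, None, e

def safe_values(textfile):
    # skip lines that lack a 4th field instead of raising inside next(values)
    for line in textfile:
        fields = line.split()
        if len(fields) > 3:
            yield fields[3]

Then pass safe_search and safe_values(textfile) to pool.imap_unordered() in place of search_function and values.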

answered Oct 09 '22 by jfs