Trying to parallelize a Python algorithm using multithreading and avoiding GIL restrictions


I am implementing an algorithm in Python using Biopython. I have several alignments (sets of sequences of equal length) stored in FASTA files. Each alignment contains between 500 and 30000 sequences, and each sequence is about 17000 elements long. Each sequence is stored as a Bio.SeqRecord.SeqRecord object (see the SeqRecord object's API documentation for more information), which holds not only the sequence but also some information about it. I read each alignment from disk using Bio.AlignIO.read() (see the AlignIO module's API documentation for more information), which returns a MultipleSeqAlignment object:

from Bio import AlignIO

seqs = AlignIO.read(seqs_filename, 'fasta')
len_seqs = seqs.get_alignment_length()
stats = {'-': [0.0] * len_seqs, 'A': [0.0] * len_seqs,
         'G': [0.0] * len_seqs, 'C': [0.0] * len_seqs,
         'T': [0.0] * len_seqs}
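
For context, once the alignment is loaded, individual records and whole columns can be accessed directly on the MultipleSeqAlignment object (a small illustrative sketch, not part of the original code):

first_record = seqs[0]           # a Bio.SeqRecord.SeqRecord
print(first_record.id)           # identifier parsed from the FASTA header
print(first_record.seq[:50])     # first 50 elements of that sequence
print(seqs[:, 0])                # all residues in column 0, as a string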


I include this sketch for clarity: [image omitted]


Because I want to parallelize the analysis of the alignment, I assign a fragment of it to each available CPU using the threading module (more details about why I made this decision later):

from math import ceil
from multiprocessing import cpu_count

num_cpus = cpu_count()
num_columns = int(ceil(len_seqs / float(num_cpus)))  # int() so it can be used as a slice index
start_column = 0
threads = []
for cpu in range(num_cpus):
    section = (start_column, start_column + num_columns)
    threads.append(CI_Thread(seqs_type, seqs, section, stats))
    threads[cpu].start()

    start_column += num_columns

The stats variable is shared between threads and stores some information about the analysis (as you can see in the first code snippet). Because each CPU is going to modify different positions of this shared variable, I think there is no need for access control or any synchronization primitive. The main reason I am using threads instead of processes is that I want to avoid copying the whole MultipleSeqAlignment object for each CPU. I've done some research and found some Stack Overflow posts about this.
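
The CI_Thread class itself is not shown here. A minimal sketch of what such a threading.Thread subclass might look like, assuming the constructor signature used above (attribute names and the empty run() body are placeholders, not the real implementation):

from threading import Thread

class CI_Thread(Thread):
    # Hypothetical sketch: the real CI_Thread is not shown in the question.
    def __init__(self, seqs_type, seqs, section, stats):
        Thread.__init__(self)
        self.seqs_type = seqs_type
        self.seqs = seqs
        self.start_column, self.end_column = section
        self.stats = stats  # shared dict; this thread writes only its own column range

    def run(self):
        # Runs the per-column analysis loop shown later in the question over
        # the columns in [start_column, end_column) and stores the results in
        # self.stats, so no two threads touch the same list positions.
        pass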

I've also read some information about the "dreaded" Python Global Interpreter Lock (GIL) (I found great info both at Stack Overflow and at Programmers Stack Exchange), but I'm still not 100% sure whether my algorithm is affected by it. As far as I know, I'm loading sequences into memory one at a time, thus doing I/O on every iteration. This is why I think it's a good idea to use threads, as stated in the Stack Overflow post I mentioned before. The basic structure of the analysis (which each thread is executing) looks something like this:

for seq in seqs:
    num_column = start_column
    for column in seq.seq[start_column:end_column].upper():
        # [...]
            try:
                stats[elem][num_column] = get_some_info(column)
            except TypeError:
                raise TypeError(('"stats" argument should be a '
                                 'dict of lists of int'))

I have done some performance tests using the timeit module and the time command with the arguments -f "%e %M" to check not only the elapsed real time (in seconds) but also the maximum resident set size of the process during its lifetime (in kilobytes). It seems that the execution time using threads is the execution time of the sequential implementation divided by the number of threads. For the maximum resident set size I'm still unable to find a pattern.
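
As a quick sanity check on the GIL question, timing the same pure-Python, CPU-bound function sequentially and in two threads shows whether the work is being serialized by the interpreter; this is a generic sketch, unrelated to the alignment code:

import timeit

setup = '''
from threading import Thread

def work(n):
    # pure-Python, CPU-bound stand-in for the per-column analysis
    total = 0
    for i in range(n):
        total += i * i
    return total
'''

sequential = timeit.timeit("work(10**6); work(10**6)", setup=setup, number=5)
threaded = timeit.timeit(
    "ts = [Thread(target=work, args=(10**6,)) for _ in range(2)]; "
    "[t.start() for t in ts]; [t.join() for t in ts]",
    setup=setup, number=5)
print(sequential, threaded)  # similar times mean the threads are GIL-bound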


If you have any other suggestions about performance or clarity I would much appreciate them.


Thanks in advance.

asked Dec 29 '14 by Francisco Merino

1 Answer

Threading won't help you when you want to run a presumably CPU-intensive algorithm over some data. Try looking into the multiprocessing module, which I had great success with when working on a project that did special OCR on a 100 MB scanned image.

Consider this for solving the shared memory issue:

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    pool = Pool(processes=4)               # start 4 worker processes
    result = pool.apply_async(f, [10])     # evaluate "f(10)" asynchronously
    print(result.get(timeout=1))           # prints "100" unless your computer is *very* slow
    print(pool.map(f, range(10)))          # prints "[0, 1, 4,..., 81]"

Take a look at pool.imap() and pool.apply_async().
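
Applied to the alignment problem from the question, that Pool-based approach might look roughly like the sketch below. The worker function, the counting logic and the filename are illustrative assumptions, not the poster's actual analysis; the point is that column-slicing the alignment lets each process receive only its own chunk and return partial results instead of writing into a shared dict:

from math import ceil
from multiprocessing import Pool, cpu_count

from Bio import AlignIO

def analyse_columns(alignment_chunk):
    # Hypothetical worker: compute per-column stats for a column slice of the
    # alignment and return them instead of updating a shared structure.
    length = alignment_chunk.get_alignment_length()
    stats = {key: [0.0] * length for key in '-AGCT'}
    for record in alignment_chunk:
        for i, column in enumerate(str(record.seq).upper()):
            if column in stats:
                stats[column][i] += 1   # e.g. count each residue per column
    return stats

if __name__ == '__main__':
    seqs = AlignIO.read('alignment.fasta', 'fasta')   # placeholder filename
    len_seqs = seqs.get_alignment_length()
    num_cpus = cpu_count()
    num_columns = int(ceil(len_seqs / float(num_cpus)))
    # Column-slice the alignment so each worker process is sent only its chunk.
    chunks = [seqs[:, start:start + num_columns]
              for start in range(0, len_seqs, num_columns)]
    pool = Pool(processes=num_cpus)
    partial_stats = pool.map(analyse_columns, chunks)  # one stats dict per chunk
    pool.close()
    pool.join()

Merging the per-chunk dictionaries afterwards sidesteps the shared-memory question entirely, since worker processes do not share state by default.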

Hope that helped.

answered Nov 04 '22 by whiterock