
Why does this python multiprocessing script slow down after a while?

Building on the script from this answer, I have the following scenario: a folder containing 2500 large text files (~55 MB each), all tab-delimited. Web logs, basically.

I need to md5 hash the second 'column' in each row of each file, saving the modified files elsewhere. The source files are on a mechanical disk and the destination files are on an SSD.

The script processes the first 25 (or so) files really quickly. It then slows WAY down. Based on the first 25 files, it should complete all of them in 2 minutes or so. However, based on the performance after that, it will take 15 minutes (or so) to complete them all.

It's running on a server with 32 GB of RAM, and Task Manager rarely shows over 6 GB in use. I have it set to launch 6 processes, but the CPU usage on the cores is low, rarely going over 15%.

Why is this slowing down? Read/write issues to the disk? Garbage collector? Bad code? Any ideas about how to speed it up?

Here's the script

import os

import multiprocessing
from multiprocessing import Process
import threading
import hashlib

class ThreadRunner(threading.Thread):
    """ This class represents a single instance of a running thread"""
    def __init__(self, fileset, filedirectory):
        threading.Thread.__init__(self)
        self.files_to_process = fileset
        self.filedir          = filedirectory

    def run(self):
        for current_file in self.files_to_process:

            # Open the current file as read only
            active_file_name = self.filedir + "/" + current_file
            output_file_name = "D:/hashed_data/" + "hashed_" + current_file

            active_file = open(active_file_name, "r")
            output_file = open(output_file_name, "ab+")

            for line in active_file:
                # Load the line, hash the username, save the line
                lineList = line.split("\t")

                if not lineList[1] == "-":
                    lineList[1] = hashlib.md5(lineList[1]).hexdigest()

                lineOut = '\t'.join(lineList)
                output_file.write(lineOut)

            # Always close files after you open them
            active_file.close()
            output_file.close()

            print "\nCompleted " + current_file

class ProcessRunner:
    """ This class represents a single instance of a running process """
    def runp(self, pid, numThreads, fileset, filedirectory):
        mythreads = []
        for tid in range(numThreads):
            th = ThreadRunner(fileset, filedirectory)
            mythreads.append(th) 
        for i in mythreads:
            i.start()
        for i in mythreads:
            i.join()

class ParallelExtractor:    
    def runInParallel(self, numProcesses, numThreads, filedirectory):
        myprocs = []
        prunner = ProcessRunner()

        # Store the file names from that directory in a list that we can iterate
        file_names = os.listdir(filedirectory)

        file_sets = []
        for i in range(numProcesses):
            file_sets.append([])

        for index, name in enumerate(file_names):
            num = index % numProcesses
            file_sets[num].append(name)


        for pid in range(numProcesses):
            pr = Process(target=prunner.runp, args=(pid, numThreads, file_sets[pid], filedirectory)) 
            myprocs.append(pr) 
        for i in myprocs:
            i.start()

        for i in myprocs:
            i.join()

if __name__ == '__main__':    

    file_directory = "E:/original_data"

    processes = 6
    threads   = 1

    extractor = ParallelExtractor()
    extractor.runInParallel(numProcesses=processes, numThreads=threads, filedirectory=file_directory)
Clay asked Dec 04 '13 18:12



2 Answers

Hashing is a relatively simple task, and modern CPUs are very fast compared to the speed of spinning disks. A quick-and-dirty benchmark on an i7 shows that it can hash about 450 MB/s using MD5, or 290 MB/s using SHA-1. By comparison, spinning disks have a typical (sequential raw read) speed of about 70-150 MB/s. This means that, even ignoring the overhead of the filesystem and eventual disk seeks, the CPU can hash a file about 3x faster than the disk can read it.
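A rough way to reproduce that kind of number on your own machine is sketched below. It is written for Python 3 (the script in the question is Python 2), and the function name `md5_throughput_mb_per_s` and its parameters are made up for this illustration. It hashes data that is already in memory, so it measures only the CPU side, not the disk.

```python
import hashlib
import time


def md5_throughput_mb_per_s(total_mb=64, chunk_mb=1):
    """Hash total_mb of in-memory data with MD5 and return the rate in MB/s."""
    chunk = b"x" * (chunk_mb * 1024 * 1024)
    digest = hashlib.md5()
    start = time.perf_counter()
    for _ in range(total_mb // chunk_mb):
        digest.update(chunk)
    elapsed = max(time.perf_counter() - start, 1e-9)  # avoid division by zero
    return total_mb / elapsed


if __name__ == "__main__":
    print("MD5: %.0f MB/s" % md5_throughput_mb_per_s())
```

The result depends heavily on the CPU and the Python build, so treat it as an order-of-magnitude check rather than a precise figure.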

The performance boost you get on processing the first files probably happens because the first files are cached in memory by the operating system, so no disk I/O happens. This can be confirmed by either:

  • rebooting the server, thus flushing the cache
  • filling the cache with something else, by reading enough large files from the disk
  • listening closely for the absence of disk seeks while processing the first files
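Another way to check the cache theory, sketched here under the assumption of Python 3, is to time a plain sequential read of one source file twice in a row. The helper name `read_throughput_mb_per_s` is hypothetical. If the second read reports a rate far above what a mechanical disk can deliver, the data most likely came from the operating system's cache.

```python
import time


def read_throughput_mb_per_s(path, chunk_size=1024 * 1024):
    """Read the file at `path` sequentially and return the rate in MB/s."""
    total_bytes = 0
    start = time.perf_counter()
    with open(path, "rb") as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            total_bytes += len(chunk)
    elapsed = max(time.perf_counter() - start, 1e-9)  # avoid division by zero
    return total_bytes / (1024 * 1024) / elapsed
```

Calling it twice on a file that has not been touched since boot, and comparing the two numbers, gives a quick read on whether caching explains the fast start.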

Now, since the performance bottleneck for hashing files is the disk, performing the hashing in multiple processes or threads is useless, because they'll all use the same disk. As @Max Noel mentioned, it can actually lower performance, because you'll be reading several files in parallel, so your disk will have to seek between the files. The performance will also vary depending on the I/O scheduler of the operating system you're using, as he mentioned.
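If the disk really is the bottleneck, a single process that streams one file at a time should do about as well as the multiprocess version. A minimal sketch, assuming Python 3, UTF-8 encoded logs, and rows with at least three columns (the function names are invented for this example):

```python
import hashlib


def hash_second_column(line):
    """Return `line` with its second tab-separated field replaced by its MD5 hex digest."""
    fields = line.split("\t")
    # Leave "-" placeholders (and rows with a single column) untouched.
    if len(fields) > 1 and fields[1] != "-":
        fields[1] = hashlib.md5(fields[1].encode("utf-8")).hexdigest()
    return "\t".join(fields)


def process_file(src_path, dst_path):
    """Stream src_path to dst_path line by line, hashing column two of every row."""
    # newline="" keeps the original line endings unchanged.
    with open(src_path, "r", encoding="utf-8", newline="") as src, \
         open(dst_path, "w", encoding="utf-8", newline="") as dst:
        for line in src:
            dst.write(hash_second_column(line))
```

Looping over the directory and calling `process_file` for each name keeps the reads sequential, which avoids the seeking caused by several processes reading different files at once.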

Now, if you're still generating data, you have some possible solutions:

  • Use a faster disk, or an SSD, as @Max Noel suggested.
  • Read from multiple disks - either in different filesystems or in a single filesystem over RAID
  • Split the task over multiple machines (with a single or multiple disks each)

These solutions, however, are useless if all you want to do is hash those 2500 files and you already have them on a single disk. Reading them from the disk to other disks and then performing the hashing is slower, since you'll be reading the files twice, and you can hash as fast as you can read them.

Finally, based on @yaccz's idea, I guess you could have avoided the trouble of writing a program to perform the hashing if you had installed the Cygwin binaries of find, xargs and md5sum.

loopbackbee answered Oct 13 '22 19:10



Why keep things simple when one can make them complicated?

Mount the drives via smbfs or whatnot on a Linux host and run:

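#! /bin/sh

SRC="" # FIXME
DST="" # FIXME

# Print the line passed as $1 with its second tab-separated field md5-hashed.
convert_line() {
    # cut splits on tabs by default, so no -d option is needed
    f1=`printf '%s\n' "$1" | cut -f 1`
    f2=`printf '%s\n' "$1" | cut -f 2`
    frest=`printf '%s\n' "$1" | cut -f 3-`

    if [ ! "x${f2}" = "x-" ] ; then
        # hash the field without a trailing newline; keep only the digest
        f2=`printf '%s' "${f2}" | md5sum | cut -d ' ' -f 1`
        # might wanna throw in some memoization
    fi

    printf '%s\t%s\t%s\n' "${f1}" "${f2}" "${frest}"
}

convert_file() {
    name=`basename "$1"`
    # read line by line; looping over `cat` would split on every whitespace
    while IFS= read -r line; do
        convert_line "${line}"
    done < "$1" >> "$DST/hashed-${name}"
}

for f in "$SRC"/*; do
    convert_file "$f"
done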

Not tested. Might need some rough edges polished.
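The memoization hinted at in the script's comment could look something like this in Python 3, the language of the question. This is only a sketch: the function name `hashed_username` is hypothetical, and it assumes the usernames are UTF-8 text that repeats often across log lines.

```python
import hashlib
from functools import lru_cache


@lru_cache(maxsize=None)
def hashed_username(username):
    """Return the MD5 hex digest of `username`, computing each distinct value once."""
    return hashlib.md5(username.encode("utf-8")).hexdigest()
```

With `maxsize=None` the cache grows with the number of distinct usernames, so a bounded size may be the safer choice for very large logs. Since the disk is likely the limiting factor here, the gain may be small.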

Jan Matějka answered Oct 13 '22 20:10
