Why are parallel tasks always slow the first time?

I have several classifiers that I want to evaluate on a single sample. Since the classifiers are independent of each other, this task can run in parallel, so I want to parallelize it.

I tried it in Python and also as a bash script. The problem is that the first run of the program takes 30-40 s to finish, while subsequent consecutive runs take only 1-3 s. Feeding the classifiers different input still produces different results, so the results themselves do not seem to be cached. If I run some other program in between and then rerun mine, it again takes about 40 s to finish.

I also observed in htop that the CPUs are barely utilized on the first run, but are fully utilized when I rerun the program again and again.

Can someone please explain this strange behaviour to me? How can I avoid it, so that even the first run of the program is fast?

Here is the Python code:

import time
import os
import json
from fastText import load_model
from joblib import delayed, Parallel, cpu_count

# reset CPU affinity so that child processes can use all cores
os.system("taskset -p 0xff %d" % os.getpid())

def format_duration(start_time, end_time):
    m, s = divmod(end_time - start_time, 60)
    h, m = divmod(m, 60)
    return "%d:%02d:%02d" % (h, m, s)

def classify(x, classifier_name, path):
    # load the fastText model from disk, then predict the top 2 labels
    f = load_model(path + os.path.sep + classifier_name)
    labels, probabilities = f.predict(x, 2)
    if labels[0] == '__label__True':
        return classifier_name
    else:
        return None

if __name__ == '__main__':
    with open('classifier_names.json') as json_data:
        classifiers = json.load(json_data)
    x = "input_text"

    start_time = time.time()
    Parallel(n_jobs=cpu_count(), verbose=100, backend='multiprocessing', pre_dispatch='all') \
        (delayed(classify)(x, classifier, 'clfs/') for classifier in classifiers)

    end_time = time.time()
    print(format_duration(start_time, end_time))

Here is the bash code:

#!/usr/bin/env bash
N=4
START_TIME=$SECONDS

# fifo-backed counting semaphore with $1 slots, exposed on fd 3
open_sem(){
    mkfifo pipe-$$
    exec 3<>pipe-$$
    rm pipe-$$
    local i=$1
    for((;i>0;i--)); do
        printf %s 000 >&3
    done
}

# acquire a slot, run "$@" in the background, release the slot on completion
run_with_lock(){
    local x
    # block until a slot (a 3-digit exit code) is free; abort if a job failed
    read -u 3 -n 3 x && ((0==x)) || exit $x
    (
    "$@"
    printf '%.3d' $? >&3
    )&
}

open_sem $N
for d in classifiers/* ; do
    run_with_lock ~/fastText/fasttext predict "$d" test.txt
done
# wait for the background jobs before measuring elapsed time
wait

ELAPSED_TIME=$(($SECONDS - $START_TIME))
echo "time taken: $ELAPSED_TIME seconds"

EDIT:

The bigger picture is that I am running a Flask app with two API methods. Each of them calls the function that parallelizes the classification. When I make requests, it behaves the same way as the standalone program above: the first request to method A takes a long time, and subsequent requests take about 1 s. Switching to method B shows the same behaviour as method A. If I alternate between method A and method B several times (A, B, A, B), each request takes about 40 s to finish.

asked Nov 08 '22 by mark

1 Answer

One approach is to modify your python code to use an event loop: keep the script running all the time and execute new jobs in parallel whenever they are detected. One way to do this is to have a job directory and place a file in that directory whenever there is a new job to do. The python script should also move completed jobs out of that directory to prevent running them more than once. See: How to run a function when anything changes in a dir with Python Watchdog?
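For illustration, a minimal sketch of that directory-watching event loop using the third-party watchdog package; the jobs/ and done/ directory names and the process_job helper are placeholders for your own layout, not part of the original code:

import shutil
import time
from pathlib import Path

from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

JOBS_DIR = Path("jobs")   # hypothetical: new jobspec files are dropped here
DONE_DIR = Path("done")   # hypothetical: finished jobspecs are moved here

def process_job(job_file):
    # placeholder for the real work, e.g. running the classifiers
    print("processing", job_file)

class JobHandler(FileSystemEventHandler):
    def on_created(self, event):
        # fires once for every new file created in JOBS_DIR
        if not event.is_directory:
            job_file = Path(event.src_path)
            process_job(job_file)
            # move the finished job out of the directory so it cannot run twice
            shutil.move(str(job_file), str(DONE_DIR / job_file.name))

if __name__ == "__main__":
    JOBS_DIR.mkdir(exist_ok=True)
    DONE_DIR.mkdir(exist_ok=True)
    observer = Observer()
    observer.schedule(JobHandler(), str(JOBS_DIR))
    observer.start()              # watches in a background thread
    try:
        while True:               # the event loop: the script stays running
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()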

Another option is to use a fifo file which is piped to the python script, appending a new line to that file for each new job: https://www.linuxjournal.com/content/using-named-pipes-fifos-bash
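A rough sketch of the reading side of that, assuming the fifo is named jobs and handle_job stands in for the real classification work (both names are placeholders):

import os

FIFO_PATH = "jobs"  # assumed name; create it beforehand with: mkfifo jobs

def handle_job(jobspec):
    # placeholder for the real classification work
    print("got job:", jobspec)

if __name__ == "__main__":
    if not os.path.exists(FIFO_PATH):
        os.mkfifo(FIFO_PATH)
    while True:
        # opening a fifo for reading blocks until a writer connects;
        # when the writer closes its end, the loop reopens and waits again
        with open(FIFO_PATH) as fifo:
            for line in fifo:
                handle_job(line.strip())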

I personally dislike parallelizing in python and prefer to parallelize in bash using GNU parallel. To do it this way, I would:

  • implement the event loop and the jobs directory (or the fifo job queue) in bash with GNU parallel
  • modify the python script to remove all the parallel code, read each jobspec from stdin, and process each one serially in a loop
  • pipe jobs to parallel, which pipes them to ncpu python processes, each of which runs forever waiting for its next job on stdin

e.g., something like:

run_jobs.sh:
mkfifo jobs
cat jobs | parallel --pipe --round-robin -N1 ~/fastText/fasttext

queue_jobs.sh:
echo jobspec >> jobs

.py:
import sys

for jobspec in sys.stdin:
    ...

This has the disadvantage that all ncpu python processes may have the slow startup problem, but since they stay running indefinitely, that cost becomes insignificant, and the code is much simpler and easier to debug and maintain. A sketch of such a long-lived worker follows.
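To make that concrete, here is a sketch of what the long-lived worker might look like, reusing the load_model/predict calls from the question; treating each stdin line as the text to classify is an assumption about the jobspec format:

import sys
import json
from fastText import load_model

PATH = 'clfs/'  # model directory, as in the question

# pay the slow model-loading cost exactly once, at startup
with open('classifier_names.json') as f:
    names = json.load(f)
models = {name: load_model(PATH + name) for name in names}

# then run forever, handling one jobspec per stdin line
for jobspec in sys.stdin:
    x = jobspec.strip()  # assumption: the jobspec is just the input text
    matches = [name for name, model in models.items()
               if model.predict(x, 2)[0][0] == '__label__True']
    print(json.dumps({'input': x, 'matches': matches}), flush=True)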

Using a jobs directory with one file per jobspec instead of a fifo job queue requires slightly more code, but it also makes it more straightforward to see which jobs are queued and which jobs are done.

answered Nov 14 '22 by webb