Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Java: Parallelizing quick sort via multi-threading

I am experimenting with parallelizing algorithms in Java. I began with merge sort, and posted my attempt in this question. My revised attempt is in the code below, where I now try to parallelize quick sort.

Are there any rookie mistakes in my multi-threaded implementation or approach to this problem? If not, shouldn't I expect more than a 32% speed increase between a sequential and a parallelized algorithm on a duel-core (see timings at bottom)?

Here is the multithreading algorithm:

    public class ThreadedQuick extends Thread
    {
        final int MAX_THREADS = Runtime.getRuntime().availableProcessors();

        CountDownLatch doneSignal;
        static int num_threads = 1;

        int[] my_array;
        int start, end;

        public ThreadedQuick(CountDownLatch doneSignal, int[] array, int start, int end) {
            this.my_array = array;
            this.start = start;
            this.end = end;
            this.doneSignal = doneSignal;
        }

        public static void reset() {
            num_threads = 1;
        }

        public void run() {
            quicksort(my_array, start, end);
            doneSignal.countDown();
            num_threads--;
        }

        public void quicksort(int[] array, int start, int end) {
            int len = end-start+1;

            if (len <= 1)
                return;

            int pivot_index = medianOfThree(array, start, end);
            int pivotValue = array[pivot_index];

            swap(array, pivot_index, end);

            int storeIndex = start;
            for (int i = start; i < end; i++) {
               if (array[i] <= pivotValue) {
                   swap(array, i, storeIndex);
                   storeIndex++;
               }
            }

            swap(array, storeIndex, end);

            if (num_threads < MAX_THREADS) {
                num_threads++;

                CountDownLatch completionSignal = new CountDownLatch(1);

                new ThreadedQuick(completionSignal, array, start, storeIndex - 1).start();
                quicksort(array, storeIndex + 1, end);

                try {
                    completionSignal.await(1000, TimeUnit.SECONDS);
                } catch(Exception ex) {
                    ex.printStackTrace();
                }
            } else {
                quicksort(array, start, storeIndex - 1);
                quicksort(array, storeIndex + 1, end);
            }
        }
    }

Here is how I start it off:

ThreadedQuick.reset();
CountDownLatch completionSignal = new CountDownLatch(1);
new ThreadedQuick(completionSignal, array, 0, array.length-1).start();
try {
    completionSignal.await(1000, TimeUnit.SECONDS);
} catch(Exception ex){
    ex.printStackTrace();
}

I tested this against Arrays.sort and a similar sequential quick sort algorithm. Here are the timing results on an intel duel-core dell laptop, in seconds:

Elements: 500,000, sequential: 0.068592, threaded: 0.046871, Arrays.sort: 0.079677

Elements: 1,000,000, sequential: 0.14416, threaded: 0.095492, Arrays.sort: 0.167155

Elements: 2,000,000, sequential: 0.301666, threaded: 0.205719, Arrays.sort: 0.350982

Elements: 4,000,000, sequential: 0.623291, threaded: 0.424119, Arrays.sort: 0.712698

Elements: 8,000,000, sequential: 1.279374, threaded: 0.859363, Arrays.sort: 1.487671

Each number above is the average time of 100 tests, throwing out the 3 lowest and 3 highest cases. I used Random.nextInt(Integer.MAX_VALUE) to generate an array for each test, which was initialized once every 10 tests with the same seed. Each test consisted of timing the given algorithm with System.nanoTime. I rounded to six decimal places after averaging. And obviously, I did check to see if each sort worked.

As you can see, there is about a 32% increase in speed between the sequential and threaded cases in every set of tests. As I asked above, shouldn't I expect more than that?

like image 253
Robz Avatar asked Aug 06 '10 15:08

Robz


2 Answers

Making numThreads static can cause problems, it is highly likely that you will end up with more than MAX_THREADS running at some point.

Probably the reason why you don't get a full double up in performance is that your quick sort can not be fully parallelised. Note that the first call to quicksort will do a pass through the whole array in the initial thread before it starts to really run in parallel. There is also an overhead in parallelising an algorithm in the form of context switching and mode transitions when farming off to separate threads.

Have a look at the Fork/Join framework, this problem would probably fit quite neatly there.

A couple of points on the implementation. Implement Runnable rather than extending Thread. Extending a Thread should be used only when you create some new version of Thread class. When you just want to do some job to be run in parallel you are better off with Runnable. While iplementing a Runnable you can also still extend another class which gives you more flexibility in OO design. Use a thread pool that is restricted to the number of threads you have available in the system. Also don't use numThreads to make the decision on whether to fork off a new thread or not. You can calculate this up front. Use a minimum partition size which is the size of the total array divided by the number of processors available. Something like:

public class ThreadedQuick implements Runnable {

    public static final int MAX_THREADS = Runtime.getRuntime().availableProcessors();
    static final ExecutorService executor = Executors.newFixedThreadPool(MAX_THREADS);

    final int[] my_array;
    final int start, end;

    private final int minParitionSize;

    public ThreadedQuick(int minParitionSize, int[] array, int start, int end) {
        this.minParitionSize = minParitionSize;
        this.my_array = array;
        this.start = start;
        this.end = end;
    }

    public void run() {
        quicksort(my_array, start, end);
    }

    public void quicksort(int[] array, int start, int end) {
        int len = end - start + 1;

        if (len <= 1)
            return;

        int pivot_index = medianOfThree(array, start, end);
        int pivotValue = array[pivot_index];

        swap(array, pivot_index, end);

        int storeIndex = start;
        for (int i = start; i < end; i++) {
            if (array[i] <= pivotValue) {
                swap(array, i, storeIndex);
                storeIndex++;
            }
        }

        swap(array, storeIndex, end);

        if (len > minParitionSize) {

            ThreadedQuick quick = new ThreadedQuick(minParitionSize, array, start, storeIndex - 1);
            Future<?> future = executor.submit(quick);
            quicksort(array, storeIndex + 1, end);

            try {
                future.get(1000, TimeUnit.SECONDS);
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        } else {
            quicksort(array, start, storeIndex - 1);
            quicksort(array, storeIndex + 1, end);
        }
    }    
}

You can kick it off by doing:

ThreadedQuick quick = new ThreadedQuick(array / ThreadedQuick.MAX_THREADS, array, 0, array.length - 1);
quick.run();

This will start the sort in the same thread, which avoids an unnecessary thread hop at start up.

Caveat: Not sure the above implementation will actually be faster as I haven't benchmarked it.

like image 165
Michael Barker Avatar answered Sep 28 '22 07:09

Michael Barker


This uses a combination of quick sort and merge sort.

import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ParallelSortMain {
    public static void main(String... args) throws InterruptedException {
        Random rand = new Random();
        final int[] values = new int[100*1024*1024];
        for (int i = 0; i < values.length; i++)
            values[i] = rand.nextInt();

        int threads = Runtime.getRuntime().availableProcessors();
        ExecutorService es = Executors.newFixedThreadPool(threads);
        int blockSize = (values.length + threads - 1) / threads;
        for (int i = 0; i < values.length; i += blockSize) {
            final int min = i;
            final int max = Math.min(min + blockSize, values.length);
            es.submit(new Runnable() {
                @Override
                public void run() {
                    Arrays.sort(values, min, max);
                }
            });
        }
        es.shutdown();
        es.awaitTermination(10, TimeUnit.MINUTES);
        for (int blockSize2 = blockSize; blockSize2 < values.length / 2; blockSize2 *= 2) {
            for (int i = 0; i < values.length; i += blockSize2) {
                final int min = i;
                final int mid = Math.min(min + blockSize2, values.length);
                final int max = Math.min(min + blockSize2 * 2, values.length);
                mergeSort(values, min, mid, max);
            }
        }
    }

    private static boolean mergeSort(int[] values, int left, int mid, int end) {
        int[] results = new int[end - left];
        int l = left, r = mid, m = 0;
        for (; l < left && r < mid; m++) {
            int lv = values[l];
            int rv = values[r];
            if (lv < rv) {
                results[m] = lv;
                l++;
            } else {
                results[m] = rv;
                r++;
            }
        }
        while (l < mid)
            results[m++] = values[l++];
        while (r < end)
            results[m++] = values[r++];
        System.arraycopy(results, 0, values, left, results.length);
        return false;
    }
}
like image 44
Peter Lawrey Avatar answered Sep 28 '22 07:09

Peter Lawrey