Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to write a function to find a value bigger than N in parallel

So I have a function in which finds a number bigger than N in a large unsorted array of numbers shown below.

import java.util.*;

public class program {

    // Linear-search function to find the index of an element
    public static int findIndex(int arr[], int t)
    {
        // if array is Null
        if (arr == null) {
            return -1;
        }

        // find length of array
        int len = arr.length;
        int i = 0;

        // traverse in the array
        while (i < len) {

            // if the i-th element is t
            // then return the index
            if (arr[i] > t) {
                return i;
            }
            else {
                i = i + 1;
            }
        }
        return -1;
        }

   // Driver Code
   public static void main(String[] args)
   {
      int[] my_array = { 5, 4, 6, 1, 3, 2, 7, 8, 9 };

      int i = findIndex(my_array, 7);
       // find the index of 5
       System.out.println("Index position of 5 is: "
                    + my_array[i]);
   }
}

But I have to find a way to implement this in parallel. I'm not sure how to start or what to do exactly as I'm fairly new in the field of parallel programming.

Any help will be appreciated.

like image 267
tamerjar Avatar asked Jan 23 '21 11:01

tamerjar


2 Answers

The most straight forward way is to use the Parallel Stream as nicely illustrated by @Govinda Sakhare.

However, if you want to use this example as a way of learning about how to work with Threads then to parallelize your code follow the steps below:

  1. Create the threads;
  2. Assign the work to the threads, i.e., each thread will try to find a value bigger than the one passed as parameter but only for a part of the array;
  3. The first Thread that founds the value notifies the others about it so that every single one exits.

To create the threads we can do the following:

Thread[] threads = new Thread[total_threads];
for(int t = 0; t < threads.length; t++) {
    threads[t] = new Thread(/** the parallel work **/);
    threads[t].start();
}

To assign the work to the threads, we need to split the array among the threads. The easiest way is actually split the iterations instead of the array itself. The threads receive the entire array, but only work with some of its positions, for instance:

private final static int NO_FOUND = -1;

// Linear-search function to find the index of an element
public static int findIndex(int[] arr, int t, int threadId, int total_threads){
    for (int i = threadId; i < arr.length; i += total_threads)
        if ( arr[i] > t)
            return i;
    return NO_FOUND;
}

To each thread we assign an ID range from 0 to N-1, with N being the total number of threads.

To notify the threads we can use a shared Atomic Integer among threads that will be used to update the index of the value found. So the final code would look like the following:

public class program {
    private final static int NO_FOUND = -1;

    // Linear-search function to find the index of an element
    public static int findIndex(int[] arr, int t, int threadId, int total_threads, AtomicInteger shared_index){
        for (int i = threadId; i < arr.length && shared_index.get() == -1; i += total_threads)
            if ( arr[i] > t)
                return i;
        return NO_FOUND;
    }

    public static void main(String[] args) throws InterruptedException {
        final int N = 8;
        int[] my_array = { 5, 4, 6, 1, 3, 2, 7, 8, 9 };
        int total_threads = 4;

        AtomicInteger shared_index = new AtomicInteger(-1);

        Thread[] threads = new Thread[total_threads];
        for(int t = 0; t < threads.length; t++) {
            final int thread_id = t;
            threads[t] = new Thread(() ->parallel_work(N, my_array, total_threads, shared_index, thread_id));
            threads[t].start();
        }

        for (Thread thread : threads) 
            thread.join();
        
        System.out.println("Index of value bigger than " + N + " : " + shared_index.get());
    }

    private static void parallel_work(int n, int[] my_array, int total_threads, AtomicInteger shared_index, int thread_id) {
        int index_found = findIndex(my_array, n, thread_id, total_threads, shared_index);
        shared_index.compareAndExchange(NO_FOUND, index_found);
    }
}

OUTPUT:

Index of value bigger than 8 : 8
like image 101
dreamcrash Avatar answered Oct 23 '22 20:10

dreamcrash


You can use Parallel Stream to run the operation in parallel.

public static int findIndex(int arr[], int t)
{
    // if array is Null
    if (arr == null) {
        return -1;
    }

    return IntStream.rangeClosed(0, arr.length)
                    .parallel()
                    .filter(index -> arr[index] > t)
                    .findFirst()
                    .orElse(-1);
}

If you are doing it for learning purpose, you can divide the array into subarray, then submit each part to a separate thread, and then evaluate and return the result.

First, implement a callable that can take the subarray and return the index.

class RangeFinder implements Callable<Integer> {
    private final int[] arr;
    private final int startIndex;
    private final int t;

    RangeFinder(int[] arr, int startIndex, int t) {
        this.arr = arr;
        this.startIndex = startIndex;
        this.t = t;
    }


    @Override
    public Integer call() throws Exception {
        for (int i = 0; i < this.arr.length; i++) {
            if (arr[i] > t) {
                return startIndex + i;
            }
        }
        return -1;
    }
}

Now you can divide the array into chunks and submit it for parallel processing.

public static int findIndex(int arr[], int t) throws ExecutionException, InterruptedException {
    if (arr == null) {
        return -1;
    }

    int numberOfThreads = 4;
    return findIndexInParallel(arr, t, numberOfThreads);
}

private static int findIndexInParallel(int[] arr, int t, int threadCount) throws ExecutionException, InterruptedException {
    if(threadCount > arr.length) {
        threadCount = arr.length;
    }

    int startIndex = 0;
    int range = (int) Math.ceil(arr.length / (double) threadCount);
    int endIndex = range;

    // step 1: create threads using Executor FrameWork
    ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
    List<Future> futures = new ArrayList<>();
    for (int i = 0; i < threadCount; i++) {

        // step 2: create object of callable
        RangeFinder rangeFinder = new RangeFinder(Arrays.copyOfRange(arr, startIndex, endIndex), startIndex, t);

        // Step 3: submit the task to thread pool
        Future<Integer> submit = executorService.submit(rangeFinder);

        // Step 4: recalculate array indexes for the next iteration
        startIndex = startIndex + range;
        int newEndIndex = endIndex + range;
        endIndex = newEndIndex < arr.length ? newEndIndex : arr.length;
        futures.add(submit);
    }

    // step 5: evaluate and return the results
    for (Future future : futures) {
        int index = (int) future.get();
        if(index != -1) {
            executorService.shutdownNow();
            return index;
        }
    }
    return -1;
}

The above code can be further refactored, of course, you need to handle exceptions also.

like image 25
Govinda Sakhare Avatar answered Oct 23 '22 18:10

Govinda Sakhare