I have a function that finds a number bigger than N in a large unsorted array of numbers, shown below.
import java.util.*;
public class program {
// Linear-search function to find the index of an element
public static int findIndex(int arr[], int t)
{
// if array is Null
if (arr == null) {
return -1;
}
// find length of array
int len = arr.length;
int i = 0;
// traverse in the array
while (i < len) {
// if the i-th element is greater than t
// then return the index
if (arr[i] > t) {
return i;
}
else {
i = i + 1;
}
}
return -1;
}
// Driver Code
public static void main(String[] args)
{
int[] my_array = { 5, 4, 6, 1, 3, 2, 7, 8, 9 };
int i = findIndex(my_array, 7);
// print the index of the first element greater than 7
System.out.println("Index of first element greater than 7 is: "
+ i);
}
}
But I have to find a way to implement this in parallel. I'm not sure how to start or what exactly to do, as I'm fairly new to parallel programming.
Any help will be appreciated.
The most straightforward way is to use a parallel stream, as nicely illustrated by @Govinda Sakhare.
However, if you want to use this example to learn how to work with threads, you can parallelize your code in three steps: create the threads, assign the work to them, and notify the other threads once one of them finds the value.
To create the threads we can do the following:
Thread[] threads = new Thread[total_threads];
for(int t = 0; t < threads.length; t++) {
threads[t] = new Thread(/** the parallel work **/);
threads[t].start();
}
To assign the work to the threads, we need to split the array among them. The easiest way is to split the iterations rather than the array itself: every thread receives the entire array but only works on some of its positions, for instance:
private final static int NO_FOUND = -1;
// Linear-search function to find the index of an element
public static int findIndex(int[] arr, int t, int threadId, int total_threads){
for (int i = threadId; i < arr.length; i += total_threads)
if ( arr[i] > t)
return i;
return NO_FOUND;
}
Each thread is assigned an ID ranging from 0 to N-1, with N being the total number of threads.
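To make this round-robin split concrete, here is a small sketch that only lists which positions each thread ID would visit. The names CyclicDistributionDemo and indicesFor are made up for illustration and are not part of the answer's code; the loop itself mirrors the one in findIndex above.

```java
import java.util.ArrayList;
import java.util.List;

class CyclicDistributionDemo {
    // Hypothetical helper: collects the indices that the thread with the given
    // ID visits when iterations are dealt out round-robin
    // (threadId, threadId + totalThreads, threadId + 2 * totalThreads, ...).
    static List<Integer> indicesFor(int threadId, int totalThreads, int arrayLength) {
        List<Integer> indices = new ArrayList<>();
        for (int i = threadId; i < arrayLength; i += totalThreads) {
            indices.add(i);
        }
        return indices;
    }

    public static void main(String[] args) {
        int totalThreads = 4; // N in the text
        int arrayLength = 9;  // length of my_array in the question
        for (int id = 0; id < totalThreads; id++) {
            System.out.println("thread " + id + " -> " + indicesFor(id, totalThreads, arrayLength));
        }
    }
}
```

With 4 threads and 9 elements this prints [0, 4, 8] for thread 0, [1, 5] for thread 1, [2, 6] for thread 2 and [3, 7] for thread 3, so every position is covered exactly once.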
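To notify the threads, we can share an AtomicInteger among them and use it to publish the index of the value found. Each thread also checks it in its loop condition, so it stops searching once another thread has published a match. So the final code would look like the following: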
import java.util.concurrent.atomic.AtomicInteger;
public class program {
private final static int NO_FOUND = -1;
// Linear-search function to find the index of an element
public static int findIndex(int[] arr, int t, int threadId, int total_threads, AtomicInteger shared_index){
for (int i = threadId; i < arr.length && shared_index.get() == -1; i += total_threads)
if ( arr[i] > t)
return i;
return NO_FOUND;
}
public static void main(String[] args) throws InterruptedException {
final int N = 8;
int[] my_array = { 5, 4, 6, 1, 3, 2, 7, 8, 9 };
int total_threads = 4;
AtomicInteger shared_index = new AtomicInteger(-1);
Thread[] threads = new Thread[total_threads];
for(int t = 0; t < threads.length; t++) {
final int thread_id = t;
threads[t] = new Thread(() ->parallel_work(N, my_array, total_threads, shared_index, thread_id));
threads[t].start();
}
for (Thread thread : threads)
thread.join();
System.out.println("Index of value bigger than " + N + " : " + shared_index.get());
}
private static void parallel_work(int n, int[] my_array, int total_threads, AtomicInteger shared_index, int thread_id) {
int index_found = findIndex(my_array, n, thread_id, total_threads, shared_index);
shared_index.compareAndExchange(NO_FOUND, index_found);
}
}
OUTPUT:
Index of value bigger than 8 : 8
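One thing to keep in mind: the code above reports whichever match gets published first, which is not necessarily the lowest index (the sequential version always returns the first one). If you need the first match, a possible variation is to keep the minimum index seen so far in the AtomicInteger. This is only a sketch under that assumption; the class name FirstIndexSearch, the method findFirstIndex and the NOT_FOUND sentinel are my own choices, not part of the original code.

```java
import java.util.concurrent.atomic.AtomicInteger;

class FirstIndexSearch {
    // Sentinel larger than any valid index, so Math.min always prefers a real match.
    private static final int NOT_FOUND = Integer.MAX_VALUE;

    // Returns the smallest index i with arr[i] > t, or -1 if there is none.
    static int findFirstIndex(int[] arr, int t, int totalThreads) {
        AtomicInteger best = new AtomicInteger(NOT_FOUND);
        Thread[] threads = new Thread[totalThreads];
        for (int id = 0; id < totalThreads; id++) {
            final int threadId = id;
            threads[id] = new Thread(() -> {
                // A thread can stop once its position can no longer beat the best index so far.
                for (int i = threadId; i < arr.length && i < best.get(); i += totalThreads) {
                    if (arr[i] > t) {
                        // Atomically keep the smaller of the current best and this index.
                        best.accumulateAndGet(i, Math::min);
                        return;
                    }
                }
            });
            threads[id].start();
        }
        try {
            for (Thread thread : threads) {
                thread.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the search", e);
        }
        int result = best.get();
        return result == NOT_FOUND ? -1 : result;
    }

    public static void main(String[] args) {
        int[] myArray = { 5, 4, 6, 1, 3, 2, 7, 8, 9 };
        System.out.println(findFirstIndex(myArray, 5, 4)); // first element > 5 is 6, at index 2
    }
}
```

Because each thread walks its own positions in ascending order, the first match it sees is its lowest one, and taking the minimum across threads gives the overall first match.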
You can use Parallel Stream to run the operation in parallel.
// needs: import java.util.stream.IntStream;
public static int findIndex(int arr[], int t)
{
// if array is Null
if (arr == null) {
return -1;
}
return IntStream.range(0, arr.length)
.parallel()
.filter(index -> arr[index] > t)
.findFirst()
.orElse(-1);
}
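For reference, here is a self-contained sketch of the stream approach that you can run as-is. It uses IntStream.range, whose upper bound is exclusive, so the last index visited is arr.length - 1. The class name ParallelStreamSearch and the second method findAnyIndex are additions of mine to show the difference between findFirst and findAny; treat them as an illustration rather than the definitive way to do it.

```java
import java.util.stream.IntStream;

class ParallelStreamSearch {
    // Smallest index whose element is greater than t, or -1 if there is none.
    static int findFirstIndex(int[] arr, int t) {
        if (arr == null) {
            return -1;
        }
        return IntStream.range(0, arr.length) // indices 0 .. arr.length - 1
                .parallel()
                .filter(index -> arr[index] > t)
                .findFirst()                  // respects encounter order, even in parallel
                .orElse(-1);
    }

    // Index of some element greater than t, or -1; which match is returned is unspecified.
    static int findAnyIndex(int[] arr, int t) {
        if (arr == null) {
            return -1;
        }
        return IntStream.range(0, arr.length)
                .parallel()
                .filter(index -> arr[index] > t)
                .findAny()                    // may return any matching index
                .orElse(-1);
    }

    public static void main(String[] args) {
        int[] myArray = { 5, 4, 6, 1, 3, 2, 7, 8, 9 };
        System.out.println(findFirstIndex(myArray, 7)); // first element > 7 is 8, at index 7
        System.out.println(findAnyIndex(myArray, 8));   // only 9 (index 8) is > 8
    }
}
```

If you only need some element bigger than t, findAny gives the stream more freedom to stop early; findFirst is the one that matches the behaviour of the sequential loop.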
If you are doing this for learning purposes, you can divide the array into subarrays, submit each one to a separate thread, and then evaluate and return the result.
First, implement a Callable that takes a subarray and returns the index.
import java.util.concurrent.Callable;
class RangeFinder implements Callable<Integer> {
private final int[] arr;
private final int startIndex;
private final int t;
RangeFinder(int[] arr, int startIndex, int t) {
this.arr = arr;
this.startIndex = startIndex;
this.t = t;
}
@Override
public Integer call() throws Exception {
for (int i = 0; i < this.arr.length; i++) {
if (arr[i] > t) {
return startIndex + i;
}
}
return -1;
}
}
Now you can divide the array into chunks and submit it for parallel processing.
public static int findIndex(int arr[], int t) throws ExecutionException, InterruptedException {
if (arr == null) {
return -1;
}
int numberOfThreads = 4;
return findIndexInParallel(arr, t, numberOfThreads);
}
private static int findIndexInParallel(int[] arr, int t, int threadCount) throws ExecutionException, InterruptedException {
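// nothing to search; also avoids creating a thread pool of size 0
if (arr.length == 0) {
return -1;
}
if(threadCount > arr.length) {
threadCount = arr.length;
}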
int startIndex = 0;
int range = (int) Math.ceil(arr.length / (double) threadCount);
int endIndex = range;
// step 1: create threads using Executor FrameWork
ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
List<Future<Integer>> futures = new ArrayList<>();
// stop early once every element has been handed out, so no invalid range is created
for (int i = 0; i < threadCount && startIndex < arr.length; i++) {
// step 2: create object of callable
RangeFinder rangeFinder = new RangeFinder(Arrays.copyOfRange(arr, startIndex, endIndex), startIndex, t);
// Step 3: submit the task to thread pool
Future<Integer> submit = executorService.submit(rangeFinder);
futures.add(submit);
// Step 4: recalculate array indexes for the next iteration
startIndex = endIndex;
endIndex = Math.min(endIndex + range, arr.length);
}
// step 5: evaluate and return the results
try {
for (Future<Integer> future : futures) {
int index = future.get();
if (index != -1) {
return index;
}
}
return -1;
} finally {
// always stop the pool; otherwise its idle worker threads keep the JVM alive
executorService.shutdownNow();
}
}
The above code can be refactored further, and you also need to handle exceptions properly.
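As one possible direction for that refactoring, the sketch below passes index ranges to the tasks instead of copying subarrays, shuts the pool down in a finally block, and wraps the checked exceptions. The class name ChunkedSearch and the choice to rethrow as IllegalStateException are assumptions of mine; adapt the error handling to whatever fits your program.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ChunkedSearch {
    // Returns the smallest index i with arr[i] > t, or -1 if there is none.
    static int findIndex(int[] arr, int t, int threadCount) {
        if (arr == null || arr.length == 0) {
            return -1;
        }
        int workers = Math.max(1, Math.min(threadCount, arr.length));
        int chunk = (arr.length + workers - 1) / workers; // ceiling division
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (int start = 0; start < arr.length; start += chunk) {
                final int from = start;
                final int to = Math.min(start + chunk, arr.length);
                // Each task scans the half-open range [from, to) of the shared array.
                Callable<Integer> task = () -> {
                    for (int i = from; i < to && !Thread.currentThread().isInterrupted(); i++) {
                        if (arr[i] > t) {
                            return i;
                        }
                    }
                    return -1;
                };
                futures.add(pool.submit(task));
            }
            // Futures are checked in chunk order, so the first hit is the lowest index.
            for (Future<Integer> future : futures) {
                int index = future.get();
                if (index != -1) {
                    return index;
                }
            }
            return -1;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            throw new IllegalStateException("search interrupted", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("search task failed", e.getCause());
        } finally {
            pool.shutdownNow(); // interrupts tasks that are still scanning
        }
    }

    public static void main(String[] args) {
        int[] myArray = { 5, 4, 6, 1, 3, 2, 7, 8, 9 };
        System.out.println(findIndex(myArray, 7, 4)); // first element > 7 is 8, at index 7
    }
}
```

Working on index ranges avoids the extra copies made by Arrays.copyOfRange, and the finally block makes sure the pool is stopped whether or not a match is found.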