Say I have a task like:
    for (Object object : objects) {
        Result result = compute(object);
        list.add(result);
    }
What is the easiest way to parallelize each compute() (assuming they are already parallelizable)?
I do not need an answer that strictly matches the code above, just a general answer. But if you need more info: my tasks are IO bound, this is for a Spring Web application, and the tasks are going to be executed in an HTTP request.
Parallel tasks are split into subtasks that are assigned to multiple workers and then executed simultaneously. A system of workers can handle work both concurrently and in parallel: it makes progress on several tasks at the same time while also breaking each task into subtasks that run simultaneously.
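The split-into-subtasks idea above is what Java's fork/join framework implements. The sketch below sums an array by recursively halving it; the class name `ForkJoinSum` and the `THRESHOLD` value are illustrative choices, not anything from the original answers.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Sums an array by recursively splitting it into halves; the worker threads
// of the ForkJoinPool pick up the subtasks and run them in parallel.
public class ForkJoinSum extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000; // illustrative cut-off for sequential work
    private final long[] data;
    private final int from;
    private final int to; // exclusive

    ForkJoinSum(long[] data, int from, int to) {
        this.data = data;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            // Small enough: sum this slice directly in the current thread
            long sum = 0;
            for (int i = from; i < to; i++) {
                sum += data[i];
            }
            return sum;
        }
        int mid = (from + to) >>> 1;
        ForkJoinSum left = new ForkJoinSum(data, from, mid);
        ForkJoinSum right = new ForkJoinSum(data, mid, to);
        left.fork();                     // schedule the left half for another worker
        long rightSum = right.compute(); // work on the right half in this thread
        return left.join() + rightSum;   // wait for the left half and combine
    }

    public static long sum(long[] data) {
        return ForkJoinPool.commonPool().invoke(new ForkJoinSum(data, 0, data.length));
    }

    public static void main(String[] args) {
        long[] data = new long[100_000];
        for (int i = 0; i < data.length; i++) {
            data[i] = i + 1;
        }
        System.out.println(sum(data)); // prints 5000050000
    }
}
```

Forking one half and computing the other in the current thread keeps that thread busy instead of idling while it waits.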
The Stream interface, introduced in Java 8, is used to manipulate data collections in a declarative fashion. It can also execute operations in parallel without making the code much more complicated.
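As a minimal illustration of that point, the same declarative pipeline below runs sequentially or in parallel depending only on whether `parallel()` is called. The class and method names are hypothetical.

```java
import java.util.stream.LongStream;

public class ParallelStreamSum {
    // Sequential pipeline: sums 1..n on the calling thread
    public static long sequentialSum(long n) {
        return LongStream.rangeClosed(1, n).sum();
    }

    // Identical pipeline; parallel() lets the stream split the range across worker threads
    public static long parallelSum(long n) {
        return LongStream.rangeClosed(1, n).parallel().sum();
    }

    public static void main(String[] args) {
        long n = 1_000_000;
        System.out.println(sequentialSum(n)); // prints 500000500000
        System.out.println(parallelSum(n));   // prints 500000500000
    }
}
```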
Within a Java application you use multiple threads to achieve parallel processing or asynchronous behavior. Concurrency can make certain tasks faster because they can be divided into subtasks that execute in parallel.
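The lowest-level way to do this is to create threads directly. The sketch below (hypothetical names, one thread per input purely for illustration) starts a thread per element and waits for all of them with `join()`; for real workloads a thread pool is usually preferable.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ManualThreads {
    // Starts one Thread per input, then waits for all of them with join().
    // Each thread writes only its own slot of the results array, so no locking is needed.
    public static int[] squareAll(int[] inputs) throws InterruptedException {
        int[] results = new int[inputs.length];
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < inputs.length; i++) {
            final int index = i;
            Thread t = new Thread(() -> results[index] = inputs[index] * inputs[index]);
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            // join() waits for the thread to finish and makes its writes visible here
            t.join();
        }
        return results;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(Arrays.toString(squareAll(new int[] {1, 2, 3, 4})));
        // prints [1, 4, 9, 16]
    }
}
```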
Does Java support multicore processors and parallel processing? Yes: on mainstream JVMs, Java threads can run simultaneously on different cores. The JVM has also served as a platform for other programming languages whose implementations advertise "true multithreading" or "real threading" as a selling point.
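A quick way to see what the JVM reports about the available cores is the sketch below. It only queries standard APIs; the default common-pool parallelism mentioned in the comment is typical behaviour and can be overridden by configuration.

```java
import java.util.concurrent.ForkJoinPool;

public class CoreCount {
    // Number of processors the JVM may use; a natural upper bound for CPU-bound parallelism
    public static int cores() {
        return Runtime.getRuntime().availableProcessors();
    }

    // Parallelism of the shared pool used by parallel streams
    // (by default typically one less than the processor count, but at least 1)
    public static int commonParallelism() {
        return ForkJoinPool.commonPool().getParallelism();
    }

    public static void main(String[] args) {
        System.out.println("Available processors: " + cores());
        System.out.println("Common pool parallelism: " + commonParallelism());
    }
}
```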
I would recommend taking a look at ExecutorService.
In particular, something like this:
    ExecutorService EXEC = Executors.newCachedThreadPool();
    List<Callable<Result>> tasks = new ArrayList<Callable<Result>>();
    for (final Object object: objects) {
        Callable<Result> c = new Callable<Result>() {
            @Override
            public Result call() throws Exception {
                return compute(object);
            }
        };
        tasks.add(c);
    }
    List<Future<Result>> results = EXEC.invokeAll(tasks);
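Note that using newCachedThreadPool could be bad if objects is a big list: a cached thread pool creates a new thread whenever no idle one is available, so it could end up with a thread per task! You may want to use newFixedThreadPool(n) instead, where n is something reasonable (like the number of cores you have, assuming compute() is CPU bound; for IO-bound tasks, whose threads spend most of their time waiting, a larger n is usually appropriate).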
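One commonly cited rule of thumb for choosing n is threads ≈ cores × (1 + wait time / compute time): CPU-bound work gets about one thread per core, while IO-bound work gets proportionally more. The sketch below applies it; the 90 ms / 10 ms timings and the helper name `poolSize` are assumptions for illustration, and real values should come from measuring your own compute().

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class PoolSizing {
    // Heuristic: threads ~ cores * (1 + waitTime / computeTime), never less than 1.
    // CPU-bound tasks (wait ~ 0) give roughly one thread per core.
    public static int poolSize(int cores, double waitMillis, double computeMillis) {
        return Math.max(1, (int) Math.round(cores * (1 + waitMillis / computeMillis)));
    }

    public static void main(String[] args) throws InterruptedException {
        int cores = Runtime.getRuntime().availableProcessors();
        // Assumed profile: each task waits ~90 ms on IO and computes ~10 ms
        int n = poolSize(cores, 90, 10);
        ExecutorService exec = Executors.newFixedThreadPool(n);
        try {
            for (int i = 0; i < 20; i++) {
                final int id = i;
                exec.submit(() -> System.out.println("task " + id + " on " + Thread.currentThread().getName()));
            }
        } finally {
            exec.shutdown(); // stop accepting new tasks; already-submitted ones still run
            exec.awaitTermination(1, TimeUnit.MINUTES);
        }
    }
}
```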
Here's full code that actually runs:
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class ExecutorServiceExample {
        private static final Random PRNG = new Random();

        private static class Result {
            private final int wait;

            public Result(int code) {
                this.wait = code;
            }
        }

        public static Result compute(Object obj) throws InterruptedException {
            int wait = PRNG.nextInt(3000);
            Thread.sleep(wait);
            return new Result(wait);
        }

        public static void main(String[] args) throws InterruptedException, ExecutionException {
            List<Object> objects = new ArrayList<Object>();
            for (int i = 0; i < 100; i++) {
                objects.add(new Object());
            }

            List<Callable<Result>> tasks = new ArrayList<Callable<Result>>();
            for (final Object object : objects) {
                Callable<Result> c = new Callable<Result>() {
                    @Override
                    public Result call() throws Exception {
                        return compute(object);
                    }
                };
                tasks.add(c);
            }

            ExecutorService exec = Executors.newCachedThreadPool();
            // some other executors you could try to see the different behaviours
            // ExecutorService exec = Executors.newFixedThreadPool(3);
            // ExecutorService exec = Executors.newSingleThreadExecutor();
            try {
                long start = System.currentTimeMillis();
                List<Future<Result>> results = exec.invokeAll(tasks);
                int sum = 0;
                for (Future<Result> fr : results) {
                    sum += fr.get().wait;
                    System.out.println(String.format("Task waited %d ms", fr.get().wait));
                }
                long elapsed = System.currentTimeMillis() - start;
                System.out.println(String.format("Elapsed time: %d ms", elapsed));
                System.out.println(String.format("... but compute tasks waited for total of %d ms; speed-up of %.2fx", sum, sum / (elapsed * 1d)));
            } finally {
                exec.shutdown();
            }
        }
    }
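On Java 8 and later, the anonymous Callable classes in the ExecutorService approach can be written as lambdas. The sketch below is a compact variant under stated assumptions: `compute` is a hypothetical stand-in that just builds a string, and the pool size of 4 is arbitrary.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class InvokeAllLambda {
    // Hypothetical stand-in for the question's compute()
    static String compute(Object object) {
        return "result of " + object;
    }

    public static List<String> computeAll(List<?> objects) throws InterruptedException, ExecutionException {
        ExecutorService exec = Executors.newFixedThreadPool(4); // arbitrary example size
        try {
            List<Callable<String>> tasks = new ArrayList<>();
            for (Object object : objects) {
                tasks.add(() -> compute(object)); // lambda replaces the anonymous Callable
            }
            // invokeAll blocks until every task is done; futures come back in task order
            List<Future<String>> futures = exec.invokeAll(tasks);
            List<String> results = new ArrayList<>();
            for (Future<String> f : futures) {
                results.add(f.get()); // a task's exception resurfaces as ExecutionException
            }
            return results;
        } finally {
            exec.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(computeAll(Arrays.asList("a", "b", "c")));
        // prints [result of a, result of b, result of c]
    }
}
```

In a web application you would typically create the executor once and reuse it across requests rather than creating and shutting down a pool per call as this self-contained sketch does.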
With Java 8 and later you can call parallelStream() on the collection to achieve this:
    // requires: import java.util.stream.Collectors;
    List<T> objects = ...;
    List<Result> result = objects.parallelStream()
            .map(object -> compute(object))
            .collect(Collectors.toList());
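Note: because a List is an ordered source, collect(Collectors.toList()) returns the results in the same order as objects, even though map() runs on several threads. What is unpredictable is the order in which compute() is invoked, and the order of side effects in operations such as forEach().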
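The ordering behaviour can be checked directly with a small sketch (hypothetical class name): map() runs on multiple threads, yet for an ordered source such as a List, collect(Collectors.toList()) assembles results in encounter order.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ParallelOrder {
    public static List<Integer> squaresParallel(int n) {
        // Build an ordered source list 0, 1, ..., n-1
        List<Integer> source = IntStream.range(0, n).boxed().collect(Collectors.toList());
        return source.parallelStream()
                .map(x -> x * x)               // may run on several threads, in any order
                .collect(Collectors.toList()); // yet the result follows the source order
    }

    public static void main(String[] args) {
        System.out.println(squaresParallel(10));
        // prints [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
    }
}
```

If you need side effects in order, forEachOrdered() preserves encounter order where forEach() does not.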
Details on how to set the right number of threads are available in the Stack Overflow question how-many-threads-are-spawned-in-parallelstream-in-java-8.
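One widely used way to cap the parallelism of a single parallel stream is to run it from inside a dedicated ForkJoinPool, as sketched below. Treat this as an assumption-laden sketch: it relies on the observed behaviour that a parallel stream started from a ForkJoinPool task executes in that pool, which is not a documented guarantee. The JVM-wide alternative is the system property `java.util.concurrent.ForkJoinPool.common.parallelism`, which affects every parallel stream using the common pool.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;

public class CustomPoolParallelStream {
    // Runs the parallel stream inside its own ForkJoinPool so its parallelism is
    // capped at `parallelism` instead of sharing the JVM-wide common pool.
    public static List<Integer> doubledWith(int parallelism, List<Integer> input)
            throws InterruptedException, ExecutionException {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            return pool.submit(() -> input.parallelStream()
                    .map(x -> x * 2)
                    .collect(Collectors.toList()))
                .get(); // blocks until the stream finishes
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(doubledWith(2, Arrays.asList(1, 2, 3, 4)));
        // prints [2, 4, 6, 8]
    }
}
```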