 

What is the easiest way to parallelize a task in Java?

Say I have a task like:

```java
for (Object object : objects) {
    Result result = compute(object);
    list.add(result);
}
```

What is the easiest way to parallelize the compute() calls (assuming they are independent and safe to run in parallel)?

I don't need an answer that strictly matches the code above, just a general answer. But in case more information helps: my tasks are IO bound, this is a Spring Web application, and the tasks will be executed within an HTTP request.

asked Jan 06 '10 at 20:01 by Eduardo




2 Answers

I would recommend taking a look at ExecutorService.

In particular, something like this:

```java
ExecutorService EXEC = Executors.newCachedThreadPool();
List<Callable<Result>> tasks = new ArrayList<Callable<Result>>();
for (final Object object : objects) {
    Callable<Result> c = new Callable<Result>() {
        @Override
        public Result call() throws Exception {
            return compute(object);
        }
    };
    tasks.add(c);
}
List<Future<Result>> results = EXEC.invokeAll(tasks);
```
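The snippet above stops at a List<Future<Result>>, while the question builds a plain list of results. invokeAll only returns once every task has finished, and it returns the futures in the same order as the task list, so unwrapping each one with get() yields results that line up with objects. A minimal sketch, assuming Java 8+ lambdas in place of the anonymous Callable; compute() here is a hypothetical stand-in that just doubles an Integer instead of doing real work:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class InvokeAllExample {

    // Hypothetical stand-in for the question's compute(); replace with the real work.
    static Integer compute(Integer input) {
        return input * 2;
    }

    static List<Integer> computeAll(List<Integer> objects, ExecutorService exec)
            throws InterruptedException, ExecutionException {
        // One Callable per input; a lambda replaces the anonymous inner class.
        List<Callable<Integer>> tasks = new ArrayList<>();
        for (Integer object : objects) {
            tasks.add(() -> compute(object));
        }
        // invokeAll blocks until every task is done and keeps the task order.
        List<Future<Integer>> futures = exec.invokeAll(tasks);
        List<Integer> results = new ArrayList<>(futures.size());
        for (Future<Integer> future : futures) {
            // get() rethrows any exception from compute(), wrapped in ExecutionException.
            results.add(future.get());
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService exec = Executors.newFixedThreadPool(4);
        try {
            System.out.println(computeAll(Arrays.asList(1, 2, 3, 4, 5), exec)); // prints [2, 4, 6, 8, 10]
        } finally {
            exec.shutdown();
        }
    }
}
```

Passing the ExecutorService in, rather than creating it inside computeAll, lets the caller decide the pool type and size and reuse one pool for many calls.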

Note that using newCachedThreadPool could be bad if objects is a big list. A cached thread pool could create a thread per task! You may want to use newFixedThreadPool(n) where n is something reasonable (like the number of cores you have, assuming compute() is CPU bound).
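The question says the work is IO bound, so threads spend most of their time waiting and a fixed pool somewhat larger than the core count is a common choice. In a web application it also helps to create the pool once and share it across requests, and to cap how long a request waits. A sketch under those assumptions: fetch() is a hypothetical IO-bound call simulated with Thread.sleep, and the pool size of 20 and the 1000 ms budget are arbitrary illustrative values. It uses the invokeAll(tasks, timeout, unit) overload, which cancels any task that has not completed when the timeout expires:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class BoundedIoPool {

    // Hypothetical IO-bound call, simulated by sleeping for the given number of ms.
    static String fetch(int delayMs) throws InterruptedException {
        Thread.sleep(delayMs);
        return "done in " + delayMs + " ms";
    }

    // Runs all fetches on the given pool, waiting at most timeoutMs in total.
    // A slot is null when its task timed out (and was cancelled) or threw.
    static List<String> fetchAll(ExecutorService pool, List<Integer> delays, long timeoutMs)
            throws InterruptedException {
        List<Callable<String>> tasks = new ArrayList<>();
        for (Integer delay : delays) {
            tasks.add(() -> fetch(delay));
        }
        // On return every future is done: completed, failed, or cancelled.
        List<Future<String>> futures = pool.invokeAll(tasks, timeoutMs, TimeUnit.MILLISECONDS);
        List<String> results = new ArrayList<>();
        for (Future<String> future : futures) {
            try {
                results.add(future.get());
            } catch (CancellationException | ExecutionException e) {
                results.add(null);
            }
        }
        return results;
    }

    public static void main(String[] args) throws InterruptedException {
        // In a web application, create this pool once (for example as a shared
        // singleton) and reuse it across requests instead of creating one per request.
        // 20 threads is an illustrative size for IO-bound work, not a recommendation.
        ExecutorService ioPool = Executors.newFixedThreadPool(20);
        try {
            // The 5000 ms call exceeds the 1000 ms budget, so its slot should be null.
            System.out.println(fetchAll(ioPool, Arrays.asList(100, 200, 5000), 1000));
        } finally {
            ioPool.shutdown();
        }
    }
}
```

Returning null for timed-out slots is only one possible policy; failing the whole request or substituting a default value are equally reasonable.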

Here's a complete example that actually runs:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ExecutorServiceExample {
    private static final Random PRNG = new Random();

    private static class Result {
        private final int wait;
        public Result(int code) {
            this.wait = code;
        }
    }

    public static Result compute(Object obj) throws InterruptedException {
        int wait = PRNG.nextInt(3000);
        Thread.sleep(wait);
        return new Result(wait);
    }

    public static void main(String[] args) throws InterruptedException,
        ExecutionException {
        List<Object> objects = new ArrayList<Object>();
        for (int i = 0; i < 100; i++) {
            objects.add(new Object());
        }

        List<Callable<Result>> tasks = new ArrayList<Callable<Result>>();
        for (final Object object : objects) {
            Callable<Result> c = new Callable<Result>() {
                @Override
                public Result call() throws Exception {
                    return compute(object);
                }
            };
            tasks.add(c);
        }

        ExecutorService exec = Executors.newCachedThreadPool();
        // some other executors you could try to see the different behaviours
        // ExecutorService exec = Executors.newFixedThreadPool(3);
        // ExecutorService exec = Executors.newSingleThreadExecutor();
        try {
            long start = System.currentTimeMillis();
            List<Future<Result>> results = exec.invokeAll(tasks);
            int sum = 0;
            for (Future<Result> fr : results) {
                sum += fr.get().wait;
                System.out.println(String.format("Task waited %d ms",
                    fr.get().wait));
            }
            long elapsed = System.currentTimeMillis() - start;
            System.out.println(String.format("Elapsed time: %d ms", elapsed));
            System.out.println(String.format("... but compute tasks waited for total of %d ms; speed-up of %.2fx", sum, sum / (elapsed * 1d)));
        } finally {
            exec.shutdown();
        }
    }
}
```
answered Oct 14 '22 at 06:10 by overthink


With Java 8 and later, you can call parallelStream() on the collection to achieve this:

```java
import java.util.stream.Collectors;

List<T> objects = ...;

List<Result> result = objects.parallelStream()
        .map(object -> compute(object))
        .collect(Collectors.toList());
```

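Note: the compute() calls may run in any order and on different threads, but because a List is an ordered source, collect(Collectors.toList()) still returns the results in the same order as the objects list.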
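A small runnable sketch of the parallel stream approach. The encounter order of the list is kept by collect(Collectors.toList()); what is unpredictable is which thread runs each element and in what order. Here compute() is a hypothetical stand-in that squares an int and prints the name of the thread that handled it:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class ParallelStreamExample {

    // Hypothetical stand-in for compute(); logs which thread handled each element.
    static int compute(int x) {
        System.out.println(Thread.currentThread().getName() + " -> " + x);
        return x * x;
    }

    static List<Integer> computeAll(List<Integer> objects) {
        return objects.parallelStream()
                .map(ParallelStreamExample::compute)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> objects = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
        // The "->" lines may appear in any order and come from several threads
        // (the calling thread plus ForkJoinPool common pool workers)...
        List<Integer> results = computeAll(objects);
        // ...but the collected list follows the encounter order of objects.
        System.out.println(results); // prints [1, 4, 9, 16, 25, 36, 49, 64]
    }
}
```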

Details on how to set up the right number of threads are available in the Stack Overflow question how-many-threads-are-spawned-in-parallelstream-in-java-8.
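One caveat for the question's IO-bound case: parallelStream() runs on ForkJoinPool.commonPool(), which is shared by the whole JVM and by default sized for CPU-bound work (roughly one thread per core). Blocking IO calls there, inside a web application, can tie it up for all other users of the pool. A different technique from the one this answer uses is CompletableFuture.supplyAsync with a dedicated executor. A minimal sketch, assuming Java 8+; load() is a hypothetical IO-bound call simulated with a sleep, and the pool size of 10 is an arbitrary illustrative value:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class CompletableFutureExample {

    // Hypothetical IO-bound call, simulated with a short sleep.
    static String load(int id) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return "item-" + id;
    }

    static List<String> loadAll(List<Integer> ids, ExecutorService ioPool) {
        // Start every call on the dedicated pool. Collecting into a list first
        // matters: it launches all tasks before any join() below blocks.
        List<CompletableFuture<String>> futures = ids.stream()
                .map(id -> CompletableFuture.supplyAsync(() -> load(id), ioPool))
                .collect(Collectors.toList());
        // Wait for the results in list order; join() throws CompletionException on failure.
        return futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        ExecutorService ioPool = Executors.newFixedThreadPool(10);
        try {
            System.out.println(loadAll(Arrays.asList(1, 2, 3), ioPool)); // prints [item-1, item-2, item-3]
        } finally {
            ioPool.shutdown();
        }
    }
}
```

If the two stream pipelines were fused into one, each join() would run before the next supplyAsync call, and the loads would execute one after another.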

answered Oct 14 '22 at 08:10 by i000174