I need to find a way to execute tasks (dependent and independent) in parallel in Java.
I checked java.util.concurrent Future and Fork/Join, but it looks like we cannot add a dependency to a task.
Can anyone point me to the correct Java API?
The Stream interface, introduced in Java 8, is used to manipulate collections of data in a declarative fashion. A stream can also process its elements in parallel without making the code much more complicated.
Within a Java application you work with several threads to achieve parallel processing or asynchronous behavior. Concurrency promises to perform certain tasks faster, because these tasks can be divided into subtasks and the subtasks can be executed in parallel.
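As a rough illustration of the parallel stream idea, here is a minimal sketch. The class and method names are made up for this example. Note that a parallel stream splits one data-parallel computation across threads; it does not let you declare that one task depends on another.

```java
import java.util.stream.IntStream;

// Hypothetical example class, not part of any library.
class ParallelStreamSketch {

    // Sums the squares of 1..n. Calling parallel() lets the stream split
    // the range across the threads of the common ForkJoinPool.
    static long sumOfSquares(int n) {
        return IntStream.rangeClosed(1, n)
                .parallel()
                .mapToLong(i -> (long) i * i)
                .sum();
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares(1000));
    }
}
```

The result is the same with or without parallel(), because summation does not depend on the order in which the elements are processed.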
Parallel computing involves dividing a problem into subproblems, solving those problems simultaneously (in parallel, with each subproblem running in a separate thread), and then combining the results of the solutions to the subproblems.
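The divide, solve and combine steps described above map fairly directly onto the Fork/Join framework mentioned in the question. This is only a sketch: the class names and the THRESHOLD cutoff are assumptions chosen for the example, not tuned values.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical example class, not part of any library.
class ForkJoinSumSketch {

    static class SumTask extends RecursiveTask<Long> {
        // Assumed cutoff below which a range is summed directly.
        private static final int THRESHOLD = 1_000;
        private final int[] data;
        private final int from;
        private final int to;

        SumTask(int[] data, int from, int to) {
            this.data = data;
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            if (to - from <= THRESHOLD) {
                // Small enough: solve the subproblem sequentially.
                long sum = 0;
                for (int i = from; i < to; i++) {
                    sum += data[i];
                }
                return sum;
            }
            // Divide the range into two halves.
            int mid = (from + to) >>> 1;
            SumTask left = new SumTask(data, from, mid);
            SumTask right = new SumTask(data, mid, to);
            left.fork();                     // run the left half asynchronously
            long rightSum = right.compute(); // solve the right half in this thread
            return left.join() + rightSum;   // combine the two partial results
        }
    }

    static long parallelSum(int[] data) {
        return ForkJoinPool.commonPool().invoke(new SumTask(data, 0, data.length));
    }
}
```

This handles subtasks that come from splitting a single problem, but it still gives no way to say that an arbitrary task B must wait for an unrelated task A.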
Some notes on using concurrency and parallelism in Java: an application can be concurrent but not parallel. That means it makes progress on more than one task over the same period, but no two tasks execute at the exact same instant. A thread executes only one task at a time.
In Scala this is very easy to do, and I think you are better off using Scala. Here is an example I pulled from http://danielwestheide.com/ (The Neophyte’s Guide to Scala Part 16: Where to Go From Here). The author has a great blog (I am not that guy).
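Let's take a barista making coffee. The tasks to do are:
1. Grind the coffee beans (no preceding task)
2. Heat some water (no preceding task)
3. Brew the coffee from the ground beans and the hot water (depends on 1 and 2)
4. Foam some milk (no preceding task)
5. Combine the foamed milk and the brewed coffee (depends on 3 and 4)
or as a tree:
    Grind   _
    Coffee   \
              \
    Heat    ___\_Brew____
    Water                \_____Combine
                         /
    Foam    ____________/
    Milk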
In Java, using the concurrency API, this would be:
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.FutureTask;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    public class Barrista {

        static class HeatWater implements Callable<String> {
            @Override
            public String call() throws Exception {
                System.out.println("Heating Water");
                Thread.sleep(1000);
                return "hot water";
            }
        }

        static class GrindBeans implements Callable<String> {
            @Override
            public String call() throws Exception {
                System.out.println("Grinding Beans");
                Thread.sleep(2000);
                return "grinded beans";
            }
        }

        // Depends on GrindBeans and HeatWater: get() blocks until both are done.
        static class Brew implements Callable<String> {
            final Future<String> grindedBeans;
            final Future<String> hotWater;

            public Brew(Future<String> grindedBeans, Future<String> hotWater) {
                this.grindedBeans = grindedBeans;
                this.hotWater = hotWater;
            }

            @Override
            public String call() throws Exception {
                System.out.println("brewing coffee with " + grindedBeans.get()
                        + " and " + hotWater.get());
                Thread.sleep(1000);
                return "brewed coffee";
            }
        }

        static class FrothMilk implements Callable<String> {
            @Override
            public String call() throws Exception {
                Thread.sleep(1000);
                return "some milk";
            }
        }

        // Depends on FrothMilk and Brew.
        static class Combine implements Callable<String> {
            final Future<String> frothedMilk;
            final Future<String> brewedCoffee;

            public Combine(Future<String> frothedMilk, Future<String> brewedCoffee) {
                this.frothedMilk = frothedMilk;
                this.brewedCoffee = brewedCoffee;
            }

            @Override
            public String call() throws Exception {
                Thread.sleep(1000);
                System.out.println("Combining " + frothedMilk.get() + " "
                        + brewedCoffee.get());
                return "Final Coffee";
            }
        }

        public static void main(String[] args) {
            ExecutorService executor = Executors.newFixedThreadPool(2);

            FutureTask<String> heatWaterFuture = new FutureTask<>(new HeatWater());
            FutureTask<String> grindBeans = new FutureTask<>(new GrindBeans());
            FutureTask<String> brewCoffee = new FutureTask<>(new Brew(grindBeans, heatWaterFuture));
            FutureTask<String> frothMilk = new FutureTask<>(new FrothMilk());
            FutureTask<String> combineCoffee = new FutureTask<>(new Combine(frothMilk, brewCoffee));

            // Submit in dependency order. With a bounded pool, a task that blocks
            // in get() must not be queued ahead of the task it waits for,
            // otherwise the pool can deadlock.
            executor.execute(heatWaterFuture);
            executor.execute(grindBeans);
            executor.execute(brewCoffee);
            executor.execute(frothMilk);
            executor.execute(combineCoffee);

            try {
                // Warning: this call blocks until the result is ready or the timeout expires.
                System.out.println(combineCoffee.get(20, TimeUnit.SECONDS));
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                System.out.println("20 SECONDS FOR A COFFEE !!!! I am !@#! leaving!!");
                e.printStackTrace();
            } finally {
                executor.shutdown();
            }
        }
    }
Make sure that you add timeouts, though, so that your code does not wait forever for something to complete. Do that with Future.get(long, TimeUnit), and then handle the failure accordingly.
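If you are on Java 8 or later, another option that the answer above does not use is CompletableFuture, which lets you declare the dependencies instead of blocking inside each task. The following is a minimal sketch of the same coffee tree under that assumption. The class name, method name and returned strings are invented for the example, and the sleeps are left out.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical example class, not part of any library.
class CoffeePipelineSketch {

    static String prepareCappuccino() {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        try {
            // Independent tasks: all three start right away.
            CompletableFuture<String> beans =
                    CompletableFuture.supplyAsync(() -> "ground beans", executor);
            CompletableFuture<String> water =
                    CompletableFuture.supplyAsync(() -> "hot water", executor);
            CompletableFuture<String> milk =
                    CompletableFuture.supplyAsync(() -> "frothed milk", executor);

            // Dependent tasks: each runs only after both of its inputs complete.
            CompletableFuture<String> espresso =
                    beans.thenCombine(water, (b, w) -> "espresso(" + b + ", " + w + ")");
            CompletableFuture<String> cappuccino =
                    espresso.thenCombine(milk, (e, m) -> e + " + " + m);

            // Only the caller blocks, and only with a timeout.
            return cappuccino.get(20, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("coffee was not ready", e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(prepareCappuccino());
    }
}
```

Because no worker thread sits in get() waiting for another task, the order of submission and the size of the pool no longer matter for correctness here.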
It is much nicer in Scala, however. Here it is as it appears on the blog. The code to prepare some coffee would look something like this:
    def prepareCappuccino(): Try[Cappuccino] = for {
      ground <- Try(grind("arabica beans"))
      water <- Try(heatWater(Water(25)))
      espresso <- Try(brew(ground, water))
      foam <- Try(frothMilk("milk"))
    } yield combine(espresso, foam)
As written, this for comprehension over Try runs the steps one after another. In the concurrent version, all the methods return a typed Future instead; for instance, grind would be something like this:
    def grind(beans: CoffeeBeans): Future[GroundCoffee] = Future {
      // grinding function contents
    }
For all the implementations, check out the blog, but that's all there is to it. You can integrate Scala and Java easily as well. I really recommend doing this sort of thing in Scala instead of Java: it requires much less code and is much cleaner and event driven.
The general programming model for tasks with dependencies is dataflow. A simplified model, in which each task has only one (though repeating) dependency, is the actor model. There are many actor libraries for Java, but very few for dataflow. See also: which-actor-model-library-framework-for-java, java-pattern-for-nested-callbacks
Use a BlockingQueue. Put the output of task A into the queue, and task B blocks until something is available in the queue.
The docs contain example code to achieve this: http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/BlockingQueue.html
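Here is a minimal sketch of that hand-off, separate from the example in the docs. The class name, method name and strings are made up for illustration. Task B calls take(), which blocks until task A has put its output into the queue.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical example class, not part of any library.
class QueueHandoffSketch {

    static String runHandoff() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        String[] result = new String[1];

        // Task A: produces its output and puts it into the queue.
        Thread taskA = new Thread(() -> {
            try {
                queue.put("hot water");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Task B: blocks in take() until task A's output is available.
        Thread taskB = new Thread(() -> {
            try {
                result[0] = "brewed with " + queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Start B first to show that the start order does not matter.
        taskB.start();
        taskA.start();
        try {
            taskA.join();
            taskB.join(); // join() also makes result[0] visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return result[0];
    }

    public static void main(String[] args) {
        System.out.println(runHandoff());
    }
}
```

In real code you would probably prefer poll(long, TimeUnit) over take(), so that task B does not wait forever if task A fails.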