Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Executing Dependent tasks in parallel in Java

I need to find a way to execute tasks (dependent and independent) in parallel in java.

  1. Task A and Task C can run independently.
  2. Task B is dependent on the output of Task A.

I checked java.util.concurrent Future and Fork/Join, but looks like we cannot add dependency to a Task.

Can anyone point me to correct Java API.

like image 581
rocky Avatar asked Jun 01 '12 17:06

rocky


People also ask

How do you perform parallel execution of Java?

The 'Stream' interface in Java, which was introduced in Java 8, is used to manipulate data collections in a declarative fashion. Stream interface can also be used to execute processes in parallel, without making the process too complicated.

Are Java threads executed in parallel?

Within a Java application you work with several threads to achieve parallel processing or asynchronous behavior. Concurrency promises to perform certain task faster as these tasks can be divided into subtasks and these subtasks can be executed in parallel.

What is parallel processing in Java?

Parallel computing involves dividing a problem into subproblems, solving those problems simultaneously (in parallel, with each subproblem running in a separate thread), and then combining the results of the solutions to the subproblems.

Is Java multithreading concurrent or parallel?

Some notes when we use concurrency and parallelism in JavaAn application can be concurrent, but not parallel. It means that it can process more than one task at the same time, but no two tasks are executing at the exact same time. A thread is only executing one task at a time.


3 Answers

In Scala this is very easy to do, and I think you are better off using Scala. Here is an example I pulled from here http://danielwestheide.com/ (The Neophyte’s Guide to Scala Part 16: Where to Go From Here) this guy has a great blog (I am not that guy)

Lets take a barrista making coffee. The tasks to do are:

  1. Grind the required coffee beans (no preceding tasks)
  2. Heat some water (no preceding tasks)
  3. Brew an espresso using the ground coffee and the heated water (depends on 1 & 2)
  4. Froth some milk (no preceding tasks)
  5. Combine the froth milk and the espresso (depends on 3,4)

or as a tree:

Grind   _
Coffe    \
          \   
Heat    ___\_Brew____ 
Water                \_____Combine
                     /
Foam    ____________/
Milk

In java using the concurrency api this would be:

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class Barrista {

    static class HeatWater implements Callable<String> {
        @Override
        public String call() throws Exception {
            System.out.println("Heating Water");
            Thread.sleep(1000);
            return "hot water";
        }
    }

    static class GrindBeans implements Callable<String> {
        @Override
        public String call() throws Exception {
            System.out.println("Grinding Beans");
            Thread.sleep(2000);
            return "grinded beans";
        }
    }

    static class Brew implements Callable<String> {

        final Future<String> grindedBeans;
        final Future<String> hotWater;

        public Brew(Future<String> grindedBeans, Future<String> hotWater) {
            this.grindedBeans = grindedBeans;
            this.hotWater = hotWater;
        }

        @Override
        public String call() throws Exception
        {
            System.out.println("brewing coffee with " + grindedBeans.get()
                    + " and " + hotWater.get());
            Thread.sleep(1000);
            return "brewed coffee";
        }
    }

    static class FrothMilk implements Callable<String> {

        @Override
        public String call() throws Exception {
            Thread.sleep(1000);
            return "some milk";
        }
    }

    static class Combine implements Callable<String> {

        public Combine(Future<String> frothedMilk, Future<String> brewedCoffee) {
            super();
            this.frothedMilk = frothedMilk;
            this.brewedCoffee = brewedCoffee;
        }

        final Future<String> frothedMilk;
        final Future<String> brewedCoffee;

        @Override
        public String call() throws Exception {
            Thread.sleep(1000);
            System.out.println("Combining " + frothedMilk.get() + " "
                    + brewedCoffee.get());
            return "Final Coffee";
        }

    }

    public static void main(String[] args) {

        ExecutorService executor = Executors.newFixedThreadPool(2);

        FutureTask<String> heatWaterFuture = new FutureTask<String>(new HeatWater());
        FutureTask<String> grindBeans = new FutureTask<String>(new GrindBeans());
        FutureTask<String> brewCoffee = new FutureTask<String>(new Brew(grindBeans, heatWaterFuture));
        FutureTask<String> frothMilk = new FutureTask<String>(new FrothMilk());
        FutureTask<String> combineCoffee = new FutureTask<String>(new Combine(frothMilk, brewCoffee));

        executor.execute(heatWaterFuture);
        executor.execute(grindBeans);
        executor.execute(brewCoffee);
        executor.execute(frothMilk);
        executor.execute(combineCoffee);


        try {

            /**
             *  Warning this code is blocking !!!!!!!
             */         
            System.out.println(combineCoffee.get(20, TimeUnit.SECONDS));
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            System.out.println("20 SECONDS FOR A COFFEE !!!! I am !@#! leaving!!");
            e.printStackTrace();
        } finally{
                executor.shutdown();
            }
        }
    }

Make sure that you add time outs though to ensure that your code will not wait forever on something to complete, that is done by using the Future.get(long, TimeUnit) and then handle failure accordingly.

It is much nicer in scala however, here it is like it's on the blog: The code to prepare some coffee would look something like this:

def prepareCappuccino(): Try[Cappuccino] = for {
  ground <- Try(grind("arabica beans"))
  water <- Try(heatWater(Water(25)))
  espresso <- Try(brew(ground, water))
  foam <- Try(frothMilk("milk"))
} yield combine(espresso, foam)

where all the methods return a future (typed future), for instance grind would be something like this:

def grind(beans: CoffeeBeans): Future[GroundCoffee] = Future {
   // grinding function contents
}

For all the implementations check out the blog but that's all there is to it. You can integrate Scala and Java easily as well. I really recommend doing this sort of thing in Scala instead of Java. Scala requires much less code, much cleaner and event driven.

like image 190
Derrops Avatar answered Oct 11 '22 17:10

Derrops


General programming model for tasks with dependencies is Dataflow. Simplified model where each task has only one, though repeating, dependency is Actor model. There are many actor libraries for Java, but very few for dataflow. See also: which-actor-model-library-framework-for-java, java-pattern-for-nested-callbacks

like image 26
Alexei Kaigorodov Avatar answered Oct 11 '22 18:10

Alexei Kaigorodov


Use a BlockingQueue. Put the output of task A into the queue, and task B blocks until something is available in the queue.

The docs contain example code to achieve this: http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/BlockingQueue.html

like image 30
Steve McLeod Avatar answered Oct 11 '22 17:10

Steve McLeod