
Use FutureTask for concurrency

I have a service like this:

class DemoService {
    Result process(Input in) {
        filter1(in);
        if (filter2(in)) return...
        filter3(in);
        filter4(in);
        filter5(in);
        return ...

    }
}

Now I want to make it faster. I found that some filters can start at the same time, while others must wait for other filters to finish. For example:

filter1--
         |---filter3--
filter2--             |---filter5
          ---filter4--

which means:

1. filter1 and filter2 can start at the same time, and so can filter3 and filter4.

2. filter3 and filter4 must wait for filter2 to finish.

One more thing:

If filter2 returns true, the 'process' method returns immediately and skips the remaining filters.

My current solution uses FutureTask:

        // do each filter's work in a FutureTask
        for (Filter filter : filters) {
            FutureTask<RiskResult> futureTask = new FutureTask<RiskResult>(new CallableFilter(filter, context));
            // (not shown: futureTask must be registered in context under filter.getId()
            //  before any task that depends on it calls context.getTask())
            executorService.execute(futureTask);
        }

        // when all FutureTasks are submitted, wait for the results
        for (Filter filter : filters) {
            if (filter.isReturnNeeded()) {
                FutureTask<RiskResult> futureTask = context.getTask(filter.getId());
                riskResult = futureTask.get();
                if (canReturn(filter, riskResult)) {
                    returnOk = true;
                    return riskResult;
                }
            }
        }

My CallableFilter:

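import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

public class CallableFilter implements Callable<RiskResult> {

    private final Filter filter;
    private final Context context;

    public CallableFilter(Filter filter, Context context) {
        this.filter = filter;
        this.context = context;
    }

    @Override
    public RiskResult call() throws Exception {
        List<Filter> dependencies = filter.getDependentFilters();
        if (dependencies != null && !dependencies.isEmpty()) {
            // wait for its dependency filters to finish
            // (each get() blocks this pool thread until that dependency is done)
            for (Filter d : dependencies) {
                FutureTask<RiskResult> futureTask = context.getTask(d.getId());
                futureTask.get();
            }
        }

        // do its own work
        return filter.execute(context);
    }
}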
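For reference, here is a minimal, self-contained sketch of the same FutureTask approach. SimpleFilter, run() and the string results are hypothetical stand-ins for Filter, Context and RiskResult, and the graph follows the diagram as the second answer reads it (filter3 and filter4 after filter1 and filter2, filter5 after filter3 and filter4). The key detail is that every FutureTask is registered in a map before any of them is executed, so a dependent task never looks up a dependency that is not there yet.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

public class FutureTaskDagSketch {

    // Hypothetical stand-in for Filter: an id plus the ids it depends on.
    public static final class SimpleFilter {
        final String id;
        final List<String> dependsOn;

        public SimpleFilter(String id, String... dependsOn) {
            this.id = id;
            this.dependsOn = Arrays.asList(dependsOn);
        }
    }

    public static List<String> run(List<SimpleFilter> filters, ExecutorService pool)
            throws InterruptedException, ExecutionException {
        Map<String, FutureTask<String>> tasks = new HashMap<>();

        // 1. Create and register every task BEFORE any of them is executed.
        for (SimpleFilter f : filters) {
            tasks.put(f.id, new FutureTask<String>(() -> {
                StringBuilder sb = new StringBuilder(f.id).append('(');
                // Block until each dependency is done (this occupies a pool thread).
                for (int i = 0; i < f.dependsOn.size(); i++) {
                    if (i > 0) sb.append(',');
                    sb.append(tasks.get(f.dependsOn.get(i)).get());
                }
                return sb.append(')').toString();
            }));
        }

        // 2. Only now hand the tasks to the pool.
        for (SimpleFilter f : filters) {
            pool.execute(tasks.get(f.id));
        }

        // 3. Collect the results in declaration order.
        List<String> results = new ArrayList<>();
        for (SimpleFilter f : filters) {
            results.add(tasks.get(f.id).get());
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        List<SimpleFilter> filters = Arrays.asList(
                new SimpleFilter("filter1"),
                new SimpleFilter("filter2"),
                new SimpleFilter("filter3", "filter1", "filter2"),
                new SimpleFilter("filter4", "filter1", "filter2"),
                new SimpleFilter("filter5", "filter3", "filter4"));
        // One thread per filter, so blocking get() calls cannot starve the pool.
        ExecutorService pool = Executors.newFixedThreadPool(filters.size());
        try {
            System.out.println(run(filters, pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

Each result string records the results its filter waited for, so the last entry is filter5(filter3(filter1(),filter2()),filter4(filter1(),filter2())). The pool has one thread per filter because every waiting task holds a thread in get(); with fewer threads and a submission order in which a task can start before its dependencies, the waiting tasks can occupy every thread while their dependencies sit in the queue.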

I want to know:

1. Is it a good idea to use FutureTask in this case? Is there a better solution?

2. What is the overhead of the thread context switches?

Thanks!

asked Mar 02 '15 13:03 by bylijinnan



2 Answers

In Java 8 you can use CompletableFuture to chain your filters one after another. Use the thenApply and thenCompose families of methods to add new asynchronous steps to the CompletableFuture; they execute after the previous step has finished. thenCombine combines two independent CompletableFutures once both have finished. Use allOf to wait for the results of more than two CompletableFuture objects.
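A minimal sketch of this for the diagram in the question, assuming Java 8+. filter() is a hypothetical stand-in that just records the inputs it received; real filters would do their work there. filter3 and filter4 use thenCombineAsync on the results of filter1 and filter2, and filter5 uses allOf to wait for both of its inputs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletableFutureDagSketch {

    // Hypothetical stand-in for real filter work: encodes the inputs it received.
    static String filter(String name, String... inputs) {
        return name + "(" + String.join(",", inputs) + ")";
    }

    static String process(ExecutorService pool) {
        // filter1 and filter2 have no dependencies: start both right away.
        CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> filter("filter1"), pool);
        CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> filter("filter2"), pool);

        // filter3 and filter4 each run on the pool once BOTH inputs have completed.
        CompletableFuture<String> f3 = f1.thenCombineAsync(f2, (a, b) -> filter("filter3", a, b), pool);
        CompletableFuture<String> f4 = f1.thenCombineAsync(f2, (a, b) -> filter("filter4", a, b), pool);

        // filter5 waits for filter3 and filter4 via allOf.
        CompletableFuture<String> f5 = CompletableFuture.allOf(f3, f4)
                .thenApplyAsync(ignored -> filter("filter5", f3.join(), f4.join()), pool);

        // The only blocking call: wait for the end result.
        return f5.join();
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            System.out.println(process(pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

Only the final join() blocks a thread; every other step is scheduled on the pool when its inputs are ready. Because allOf returns a CompletableFuture<Void>, filter5 reads its inputs with join(), which returns immediately at that point since both futures are already complete.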

If you can't use Java 8, Guava's ListenableFuture can do the same; see Listenable Future Explained. With Guava you can wait for multiple independently running filters to finish with Futures.allAsList, which also returns a ListenableFuture.

With both approaches, the idea is that once you have declared your future actions, their dependencies on each other, and the threads (executors) they run on, you get back a single Future object that encapsulates your end result.

EDIT: The early return could be implemented by explicitly completing the CompletableFuture with the complete() method, or by using a Guava SettableFuture (which implements ListenableFuture).
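A sketch of the early return with complete(), assuming Java 8+. The boolean parameter filter2Stops is a hypothetical stand-in for whatever filter2 computes, and filter() is a placeholder for real filter work. The overall result is a CompletableFuture completed by hand: immediately with "stopped by filter2" if filter2 says so, otherwise with filter5's result. filter3, filter4 and filter5 are only scheduled after filter2 has finished, which also respects the rule that filter3 and filter4 wait for filter2.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class EarlyReturnSketch {

    // Hypothetical stand-in for real filter work: encodes the inputs it received.
    static String filter(String name, String... inputs) {
        return name + "(" + String.join(",", inputs) + ")";
    }

    // filter2Stops is a hypothetical stand-in for the boolean filter2 computes.
    static CompletableFuture<String> process(boolean filter2Stops, ExecutorService pool) {
        CompletableFuture<String> result = new CompletableFuture<>();

        // filter1 and filter2 start immediately and run in parallel.
        CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> filter("filter1"), pool);
        CompletableFuture<Boolean> f2 = CompletableFuture.supplyAsync(() -> filter2Stops, pool);

        f2.whenComplete((stop, ex) -> {
            if (ex != null) {
                result.completeExceptionally(ex);
            } else if (stop) {
                // Early return: complete the overall result now; filter3..5 are never scheduled.
                result.complete("stopped by filter2");
            } else {
                CompletableFuture<String> f3 = f1.thenApplyAsync(a -> filter("filter3", a), pool);
                CompletableFuture<String> f4 = f1.thenApplyAsync(a -> filter("filter4", a), pool);
                f3.thenCombineAsync(f4, (c, d) -> filter("filter5", c, d), pool)
                  .whenComplete((v, e) -> {
                      if (e != null) result.completeExceptionally(e);
                      else result.complete(v);
                  });
            }
        });
        return result;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            System.out.println(process(true, pool).join());
            System.out.println(process(false, pool).join());
        } finally {
            pool.shutdown();
        }
    }
}
```

In the early-return case filter1 may still be running in the background. CompletableFuture.cancel() does not interrupt a running task, so if that work is expensive the filter itself would have to check whether the overall result is already done.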

answered Oct 17 '22 10:10 by lbalazscs



You can use a ForkJoinPool for parallelization; it is explicitly designed for this kind of parallel computation:

(...) Method join() and its variants are appropriate for use only when completion dependencies are acyclic; that is, the parallel computation can be described as a directed acyclic graph (DAG) (...)

(see ForkJoinTask)

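The advantage of a ForkJoinPool is that every task can spawn new tasks and wait for them to complete without actually blocking the executing thread: while a task waits in join(), its worker thread can execute pending tasks (including the one it is waiting for). With an ordinary thread pool, blocking waits can cause a deadlock if more tasks are waiting for others to complete than there are threads available.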
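The deadlock mentioned above is easy to reproduce with a plain thread pool. This minimal sketch (class and method names are hypothetical) runs the same "outer task waits for an inner task" pattern on a single-thread fixed pool and on a ForkJoinPool with parallelism 1, using a 500 ms timeout to detect the hang.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class StarvationSketch {

    // Fixed pool with ONE thread: the outer task submits an inner task and blocks
    // on it, so the inner task never gets a thread to run on.
    static boolean fixedPoolCompletes() throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            Future<String> outer = pool.submit(() -> {
                Future<String> inner = pool.submit(() -> "inner");
                return inner.get(); // blocks the only worker thread
            });
            outer.get(500, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false; // starved: the inner task is still queued
        } finally {
            pool.shutdownNow(); // interrupts the blocked outer task
        }
    }

    // ForkJoinPool with parallelism 1: join() on a task this worker forked lets
    // the worker run that task itself instead of just blocking.
    static boolean forkJoinPoolCompletes()
            throws InterruptedException, ExecutionException, TimeoutException {
        ForkJoinPool pool = new ForkJoinPool(1);
        try {
            RecursiveTask<String> outer = new RecursiveTask<String>() {
                @Override
                protected String compute() {
                    RecursiveTask<String> inner = new RecursiveTask<String>() {
                        @Override
                        protected String compute() {
                            return "inner";
                        }
                    };
                    inner.fork();        // queue the subtask on this worker
                    return inner.join(); // here, executed by this same worker
                }
            };
            return "inner".equals(pool.submit(outer).get(500, TimeUnit.MILLISECONDS));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("fixed pool completes: " + fixedPoolCompletes());
        System.out.println("fork/join completes: " + forkJoinPoolCompletes());
    }
}
```

The first call returns false (the inner task never runs) and the second returns true. Note that this benefit relies on fork()/join(); a worker that blocks in an ordinary FutureTask.get(), as the question's CallableFilter does, is not expected to get this help.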

The following example should work, although it still has some limitations:

  1. It ignores filter results
  2. It does not prematurely finish execution if filter 2 returns true
  3. Exception handling is not implemented

The main idea behind this code: every filter is represented as a Node that may depend on other nodes (that is, filters that must complete before this filter can execute). Each node's dependencies are forked as parallel tasks, and each dependency is forked only once even if several nodes depend on it.

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class Node<V> extends RecursiveTask<V> {
    private static final short VISITED = 1;

    private final Callable<V> callable;
    private final Set<Node<V>> dependencies = new HashSet<>();

    @SafeVarargs
    public Node(Callable<V> callable, Node<V>... dependencies) {
        this.callable = callable;
        this.dependencies.addAll(Arrays.asList(dependencies));
    }

    public Set<Node<V>> getDependencies() {
        return this.dependencies;
    }

    @Override
    protected V compute() {
        try {
            // resolve dependencies first
            for (Node<V> node : dependencies) {
                if (node.tryMarkVisited()) {
                    node.fork(); // start node
                }
            }

            // wait for ALL nodes to complete
            for (Node<V> node : dependencies) {
                node.join();
            }

            return callable.call();
        } catch (Exception e) {
            // exception handling is not implemented yet (limitation 3 above)
            e.printStackTrace();
        }

        return null;
    }

    public boolean tryMarkVisited() {
        return compareAndSetForkJoinTaskTag((short) 0, VISITED);
    }
}

Usage example:

import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;

// NodeDemo is just a container class for the example
public class NodeDemo {

    public static void main(String[] args) {
        Node<Void> filter1 = new Node<>(filter("filter1"));
        Node<Void> filter2 = new Node<>(filter("filter2"));
        Node<Void> filter3 = new Node<>(filter("filter3"), filter1, filter2);
        Node<Void> filter4 = new Node<>(filter("filter4"), filter1, filter2);
        Node<Void> filter5 = new Node<>(filter("filter5"), filter3, filter4);
        // root does no work of its own; it only depends on filter5
        Node<Void> root = new Node<>(() -> null, filter5);

        ForkJoinPool.commonPool().invoke(root);
    }

    // simulated filter: logs start/end and sleeps for one second
    public static Callable<Void> filter(String name) {
        return () -> {
            System.out.println(Thread.currentThread().getName() + ": start " + name);
            Thread.sleep(1000);
            System.out.println(Thread.currentThread().getName() + ": end   " + name);
            return null;
        };
    }
}

answered Oct 17 '22 12:10 by isnot2bad
