I have a service like:
class DemoService {
    Result process(Input in) {
        filter1(in);
        if (filter2(in)) return...
        filter3(in);
        filter4(in);
        filter5(in);
        return ...
    }
}
Now I want to make it faster, and I found that some filters can start at the same time, while others must wait for earlier filters to finish. For example:
filter1--
         |---filter3--
filter2--             |---filter5
          ---filter4--
which means:
1. filter1 and filter2 can start at the same time, and so can filter3 and filter4.
2. filter3 and filter4 must wait for filter2 to finish.
One more thing:
If filter2 returns true, the 'process' method returns immediately and skips the remaining filters.
My current solution uses FutureTask:
// do filter's work at FutureTask
for (Filter filter : filters) {
    FutureTask<RiskResult> futureTask = new FutureTask<RiskResult>(new CallableFilter(filter, context));
    executorService.execute(futureTask);
}
// when all FutureTasks are submitted, wait for the results
for (Filter filter : filters) {
    if (filter.isReturnNeeded()) {
        FutureTask<RiskResult> futureTask = context.getTask(filter.getId());
        riskResult = futureTask.get();
        if (canReturn(filter, riskResult)) {
            returnOk = true;
            return riskResult;
        }
    }
}
My CallableFilter:
public class CallableFilter implements Callable<RiskResult> {
    private final Filter filter;
    private final Context context;

    public CallableFilter(Filter filter, Context context) {
        this.filter = filter;
        this.context = context;
    }

    @Override
    public RiskResult call() throws Exception {
        List<Filter> dependencies = filter.getDependentFilters();
        if (dependencies != null && !dependencies.isEmpty()) {
            // wait for its dependency filters to finish
            for (Filter d : dependencies) {
                FutureTask<RiskResult> futureTask = context.getTask(d.getId());
                futureTask.get();
            }
        }
        // do its own work
        return filter.execute(context);
    }
}
I want to know:
1. Is it a good idea to use FutureTask in this case? Is there a better solution?
2. How significant is the overhead of thread context switching?
Thanks!
In Java 8 you can use CompletableFuture to chain your filters after each other. Use the thenApply and thenCompose family of methods in order to add new asynchronous filters to the CompletableFuture - they will execute after the previous step is finished. thenCombine combines two independent CompletableFutures when both are finished. Use allOf to wait for the result of more than two CompletableFuture objects.
If you can't use Java 8, then the Guava ListenableFuture can do the same, see Listenable Future Explained. With Guava you can wait for multiple independently running filters to finish with Futures.allAsList - this also returns a ListenableFuture.
With both approaches the idea is that after you declare your future actions, their dependencies on each other, and their threads, you get back a single Future object, which encapsulates your end result.
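As a rough sketch of the CompletableFuture approach for the filter graph in the question: the class name FilterGraph, the runFilter stand-in, and the fixed pool of four threads are made up for illustration, and I am assuming filter3 and filter4 wait for both filter1 and filter2, as the diagram suggests. Error handling is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FilterGraph {
    // Records the order in which the (hypothetical) filters ran.
    static final List<String> LOG = new CopyOnWriteArrayList<>();

    // Stand-in for a real filter: it only records its name.
    static void runFilter(String name) {
        LOG.add(name);
    }

    static List<String> runAll() {
        LOG.clear();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            // filter1 and filter2 have no dependencies, so both start right away
            CompletableFuture<Void> f1 = CompletableFuture.runAsync(() -> runFilter("filter1"), pool);
            CompletableFuture<Void> f2 = CompletableFuture.runAsync(() -> runFilter("filter2"), pool);

            // filter3 and filter4 start once filter1 and filter2 are both done
            CompletableFuture<Void> firstStage = CompletableFuture.allOf(f1, f2);
            CompletableFuture<Void> f3 = firstStage.thenRunAsync(() -> runFilter("filter3"), pool);
            CompletableFuture<Void> f4 = firstStage.thenRunAsync(() -> runFilter("filter4"), pool);

            // filter5 waits for filter3 and filter4
            CompletableFuture<Void> f5 = CompletableFuture.allOf(f3, f4)
                    .thenRunAsync(() -> runFilter("filter5"), pool);

            // a single future represents the end of the whole graph
            f5.join();
            return new ArrayList<>(LOG);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runAll());
    }
}
```

The order of filter1/filter2 and of filter3/filter4 in the log can differ between runs; only the dependency order is guaranteed.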
EDIT: The early return could be implemented by explicitly completing the CompletableFuture with the complete() method or using a Guava SettableFuture (which implements ListenableFuture)
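A minimal sketch of the early return with complete(), under some assumptions: the class name EarlyReturn, the filter2Hit parameter (standing in for what filter2 returns), and the result strings are all invented for the example, and the tasks run on the default common pool. Exceptions thrown by a filter are not handled here.

```java
import java.util.concurrent.CompletableFuture;

public class EarlyReturn {
    // filter2Hit is a hypothetical stand-in for the boolean that filter2 returns.
    static String process(boolean filter2Hit) {
        // The future handed back to the caller; the first complete() call wins.
        CompletableFuture<String> result = new CompletableFuture<>();

        CompletableFuture<Void> f1 = CompletableFuture.runAsync(() -> { /* filter1 work */ });
        CompletableFuture<Boolean> f2 = CompletableFuture.supplyAsync(() -> filter2Hit);

        // Early return: complete the result as soon as filter2 reports a hit.
        f2.thenAccept(hit -> {
            if (hit) {
                result.complete("stopped by filter2");
            }
        });

        // Normal path: the remaining filters run only if filter2 did not hit.
        f1.thenCombine(f2, (ignored, hit) -> hit)
          .thenAcceptAsync(hit -> {
              if (!hit) {
                  /* filter3, filter4 and filter5 work would go here */
                  result.complete("all filters passed");
              }
          });

        // Blocks only until one of the two paths has completed the result.
        return result.join();
    }

    public static void main(String[] args) {
        System.out.println(process(true));
        System.out.println(process(false));
    }
}
```

Note that complete() does not cancel work that is already running; it only decides what the caller sees. That is why the normal path checks the filter2 result itself before doing anything.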
You can use a ForkJoinPool for parallelization, which is explicitly designed for this kind of parallel computation:
(...) Method join() and its variants are appropriate for use only when completion dependencies are acyclic; that is, the parallel computation can be described as a directed acyclic graph (DAG) (...)
(see ForkJoinTask)
The advantage of a ForkJoinPool is that every task can spawn new tasks and also wait for them to complete without actually blocking the executing thread (which otherwise might cause a deadlock if more tasks are waiting for others to complete than threads are available).
This is an example that should work so far, although it still has some limitations.
The main idea behind this code: every filter is represented as a Node that may depend on other nodes (= filters that must complete before this filter can execute). A node's dependencies are forked as parallel tasks, and the node joins them before running its own work.
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class Node<V> extends RecursiveTask<V> {
    private static final short VISITED = 1;

    private final Callable<V> callable;
    private final Set<Node<V>> dependencies = new HashSet<>();

    @SafeVarargs
    public Node(Callable<V> callable, Node<V>... dependencies) {
        this.callable = callable;
        this.dependencies.addAll(Arrays.asList(dependencies));
    }

    public Set<Node<V>> getDependencies() {
        return this.dependencies;
    }

    @Override
    protected V compute() {
        try {
            // resolve dependencies first; each node is forked at most once
            for (Node<V> node : dependencies) {
                if (node.tryMarkVisited()) {
                    node.fork(); // start node
                }
            }
            // wait for ALL dependencies to complete
            for (Node<V> node : dependencies) {
                node.join();
            }
            return callable.call();
        } catch (Exception e) {
            // propagate the failure to the joining task instead of swallowing it
            throw new RuntimeException(e);
        }
    }

    // atomically flips the task tag from 0 to VISITED; returns true only for the first caller
    public boolean tryMarkVisited() {
        return compareAndSetForkJoinTaskTag((short) 0, VISITED);
    }
}
Usage example:
public static void main(String[] args) {
    Node<Void> filter1 = new Node<>(filter("filter1"));
    Node<Void> filter2 = new Node<>(filter("filter2"));
    Node<Void> filter3 = new Node<>(filter("filter3"), filter1, filter2);
    Node<Void> filter4 = new Node<>(filter("filter4"), filter1, filter2);
    Node<Void> filter5 = new Node<>(filter("filter5"), filter3, filter4);
    Node<Void> root = new Node<>(() -> null, filter5);
    ForkJoinPool.commonPool().invoke(root);
}

public static Callable<Void> filter(String name) {
    return () -> {
        System.out.println(Thread.currentThread().getName() + ": start " + name);
        Thread.sleep(1000);
        System.out.println(Thread.currentThread().getName() + ": end " + name);
        return null;
    };
}