The ultimate goal is to add extra behavior to ListenableFutures based on the type of the Callable/Runnable argument. I want to add extra behavior to each of the Future methods. (Example use cases can be found in AbstractExecutorService's javadoc and section 7.1.7 of Goetz's Java Concurrency in Practice.)
I have an existing ExecutorService which overrides newTaskFor. It tests the argument's type and creates a subclass of FutureTask. This naturally supports submit as well as invokeAny and invokeAll.
How do I get the same effect for the ListenableFutures returned by a ListeningExecutorService?
Put another way, where can I put this code
    if (callable instanceof SomeClass) {
        return new FutureTask<T>(callable) {
            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                System.out.println("Canceling Task");
                return super.cancel(mayInterruptIfRunning);
            }
        };
    } else {
        return new FutureTask<T>(callable);
    }

such that my client can execute the println statement with

    ListeningExecutorService executor = ...;
    Collection<Callable<Void>> callables = ImmutableSet.<Callable<Void>>of(new SomeClass());
    List<Future<Void>> futures = executor.invokeAll(callables);
    for (Future<Void> future : futures) {
        future.cancel(true);
    }
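For comparison, here is a minimal sketch of the setup that already works with a plain (non-listening) ExecutorService, using only java.util.concurrent. The names MarkedTask, TypeAwareExecutor, customCancels, and demo are hypothetical stand-ins for SomeClass and MyExecutorService, and the counter exists only so the effect of the override can be observed.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical marker task type, standing in for SomeClass. */
class MarkedTask implements Callable<String> {
    public String call() {
        return "done";
    }
}

/** Plain executor that picks the Future implementation based on the task type. */
class TypeAwareExecutor extends ThreadPoolExecutor {
    /** Counts calls to the customized cancel so the behavior is observable. */
    final AtomicInteger customCancels = new AtomicInteger();

    TypeAwareExecutor() {
        super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        if (callable instanceof MarkedTask) {
            return new FutureTask<T>(callable) {
                @Override
                public boolean cancel(boolean mayInterruptIfRunning) {
                    customCancels.incrementAndGet();
                    System.out.println("Canceling Task");
                    return super.cancel(mayInterruptIfRunning);
                }
            };
        }
        return new FutureTask<T>(callable);
    }

    /** Submits one marked and one unmarked task, cancels both, returns the custom cancel count. */
    static int demo() {
        TypeAwareExecutor executor = new TypeAwareExecutor();
        try {
            List<Callable<String>> tasks =
                    Arrays.<Callable<String>>asList(new MarkedTask(), () -> "plain");
            for (Future<String> future : executor.invokeAll(tasks)) {
                // invokeAll returns completed futures; the override is still invoked.
                future.cancel(true);
            }
            return executor.customCancels.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }
}
```

Only the future created for MarkedTask carries the extra cancel behavior; the other task gets an ordinary FutureTask. The open question is how to keep this while returning ListenableFutures.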
Here's a list of things I've already tried and why they don't work.
A. Pass MyExecutorService to MoreExecutors.listeningDecorator.
   Problem 1: Unfortunately the resulting ListeningExecutorService (an AbstractListeningExecutorService) doesn't delegate to the ExecutorService methods; it delegates to the execute(Runnable) method on Executor. As a result, the newTaskFor method on MyExecutorService is never called.
   Problem 2: AbstractListeningExecutorService creates the Runnable (a ListenableFutureTask) via a static factory method, which I can't extend.
B. Inside newTaskFor, create MyRunnableFuture normally and then wrap it with a ListenableFutureTask.
   Problem 1: ListenableFutureTask's factory methods don't accept RunnableFutures; they accept Runnable and Callable. If I pass MyRunnableFuture as a Runnable, the resulting ListenableFutureTask just calls run() and not any of the Future methods (where my behavior is).
   Problem 2: Even if it did call my Future methods, MyRunnableFuture is not a Callable, so I have to supply a return value when I create the ListenableFutureTask... which I don't have... hence the Callable.
C. Let MyRunnableFuture extend ListenableFutureTask instead of FutureTask.
   Problem: ListenableFutureTask is now final (as of r10 / r11).
D. Let MyRunnableFuture extend ForwardingListenableFuture and implement RunnableFuture. Then wrap the SomeClass argument in a ListenableFutureTask and return that from delegate().
   Problem: It hangs. I don't understand the problem well enough to explain it, but this configuration causes a deadlock in FutureTask.Sync.
Source Code: As requested, here's the source for Solution D which hangs:
    import java.util.*;
    import java.util.concurrent.*;
    import com.google.common.collect.ImmutableSet;
    import com.google.common.util.concurrent.*;

    /** See http://stackoverflow.com/q/8931215/290943 */
    public final class MyListeningExecutorServiceD extends ThreadPoolExecutor implements ListeningExecutorService {

        // ===== Test Harness =====

        private static interface SomeInterface {
            public String getName();
        }

        private static class SomeClass implements SomeInterface, Callable<Void>, Runnable {
            private final String name;

            private SomeClass(String name) {
                this.name = name;
            }

            public Void call() throws Exception {
                System.out.println("SomeClass.call");
                return null;
            }

            public void run() {
                System.out.println("SomeClass.run");
            }

            public String getName() {
                return name;
            }
        }

        private static class MyListener implements FutureCallback<Void> {
            public void onSuccess(Void result) {
                System.out.println("MyListener.onSuccess");
            }

            public void onFailure(Throwable t) {
                System.out.println("MyListener.onFailure");
            }
        }

        public static void main(String[] args) throws InterruptedException {
            System.out.println("Main.start");

            SomeClass someClass = new SomeClass("Main.someClass");

            ListeningExecutorService executor = new MyListeningExecutorServiceD();
            Collection<Callable<Void>> callables = ImmutableSet.<Callable<Void>>of(someClass);
            List<Future<Void>> futures = executor.invokeAll(callables);

            for (Future<Void> future : futures) {
                Futures.addCallback((ListenableFuture<Void>) future, new MyListener());
                future.cancel(true);
            }

            System.out.println("Main.done");
        }

        // ===== Implementation =====

        private static class MyRunnableFutureD<T> extends ForwardingListenableFuture<T> implements RunnableFuture<T> {

            private final ListenableFuture<T> delegate;
            private final SomeInterface someClass;

            private MyRunnableFutureD(SomeInterface someClass, Runnable runnable, T value) {
                assert someClass == runnable;
                this.delegate = ListenableFutureTask.create(runnable, value);
                this.someClass = someClass;
            }

            private MyRunnableFutureD(SomeClass someClass, Callable<T> callable) {
                assert someClass == callable;
                this.delegate = ListenableFutureTask.create(callable);
                this.someClass = someClass;
            }

            @Override
            protected ListenableFuture<T> delegate() {
                return delegate;
            }

            public void run() {
                System.out.println("MyRunnableFuture.run");
                try {
                    delegate.get();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            }

            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                System.out.println("MyRunnableFuture.cancel " + someClass.getName());
                return super.cancel(mayInterruptIfRunning);
            }
        }

        public MyListeningExecutorServiceD() {
            // Same as Executors.newSingleThreadExecutor for now
            super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        }

        @Override
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            if (runnable instanceof SomeClass) {
                return new MyRunnableFutureD<T>((SomeClass) runnable, runnable, value);
            } else {
                return new FutureTask<T>(runnable, value);
            }
        }

        @Override
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            if (callable instanceof SomeClass) {
                return new MyRunnableFutureD<T>((SomeClass) callable, callable);
            } else {
                return new FutureTask<T>(callable);
            }
        }

        /** Must override to supply co-variant return type */
        @Override
        public ListenableFuture<?> submit(Runnable task) {
            return (ListenableFuture<?>) super.submit(task);
        }

        /** Must override to supply co-variant return type */
        @Override
        public <T> ListenableFuture<T> submit(Runnable task, T result) {
            return (ListenableFuture<T>) super.submit(task, result);
        }

        /** Must override to supply co-variant return type */
        @Override
        public <T> ListenableFuture<T> submit(Callable<T> task) {
            return (ListenableFuture<T>) super.submit(task);
        }
    }
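One possible reading of the hang, offered as an assumption rather than a confirmed diagnosis: MyRunnableFutureD.run() only waits on delegate.get(), and nothing ever calls run() on the wrapped ListenableFutureTask, so the wait can never finish. The sketch below reproduces that shape with a plain FutureTask (no Guava); DelegateRunDemo and its method names are hypothetical, and a timeout is used so the demonstration returns instead of blocking forever.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class DelegateRunDemo {

    /** Mirrors the shape of Solution D: wait on a task that nobody ever runs. */
    static boolean waitOnlyCompletes(long timeoutMillis) {
        FutureTask<String> delegate = new FutureTask<String>(() -> "result");
        try {
            delegate.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false; // the task was never started, so get() cannot return
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException e) {
            return false;
        }
    }

    /** Variant: execute the delegate first, then read its result. */
    static String runThenGet() {
        FutureTask<String> delegate = new FutureTask<String>(() -> "result");
        delegate.run(); // this is the step that actually performs the work
        try {
            return delegate.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

If this reading is right, keeping the delegate typed as a ListenableFutureTask and having run() call delegate.run() instead of delegate.get() would be the thing to try first.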
Based on this question and a couple of other discussions I've had recently, I'm coming to the conclusion that RunnableFuture/FutureTask is inherently misleading: Clearly you submit a Runnable, and clearly you get a Future back, and clearly the underlying Thread needs a Runnable. But why should a class implement both Runnable and Future? And if it does, which Runnable is it replacing? That's bad enough already, but then we introduce multiple levels of executors, and things really get out of hand.
If there's a solution here, I think it's going to require treating FutureTask as an implementation detail of AbstractExecutorService. I'd focus instead on splitting the problem into two pieces:
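- Modifying the returned Future.
- Modifying the Callable that is submitted. (This may also help establish the Runnable/Future distinction.)
(grumble Markdown grumble)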
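    // MyCallable and MyWrapperFuture are placeholders for your own classes.
    class MyWrapperExecutor extends ForwardingListeningExecutorService {
      private final ListeningExecutorService delegateExecutor;

      MyWrapperExecutor(ListeningExecutorService delegateExecutor) {
        this.delegateExecutor = delegateExecutor;
      }

      @Override protected ListeningExecutorService delegate() {
        return delegateExecutor;
      }

      @Override public <T> ListenableFuture<T> submit(Callable<T> callable) {
        if (callable instanceof SomeClass) {
          // Modify and submit Callable (or just submit the original Callable):
          ListenableFuture<T> delegateFuture =
              delegateExecutor.submit(new MyCallable<T>(callable));
          // Modify Future:
          return new MyWrapperFuture<T>(delegateFuture);
        } else {
          return delegateExecutor.submit(callable);
        }
      }
      // etc.
    }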
Could that work?
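To make the "Modify Future" half concrete, here is a minimal sketch of what a wrapper future could look like, written against java.util.concurrent.Future only so it runs without Guava. CancelLoggingFuture, cancelCalls, and demo are hypothetical names; with Guava, the same shape could extend ForwardingListenableFuture and override just cancel.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical wrapper: forwards every Future method and adds behavior to cancel. */
class CancelLoggingFuture<T> implements Future<T> {
    private final Future<T> delegate;
    /** Counts cancel calls so the added behavior is observable. */
    final AtomicInteger cancelCalls = new AtomicInteger();

    CancelLoggingFuture(Future<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        cancelCalls.incrementAndGet();
        System.out.println("Canceling Task");
        return delegate.cancel(mayInterruptIfRunning);
    }

    @Override
    public boolean isCancelled() {
        return delegate.isCancelled();
    }

    @Override
    public boolean isDone() {
        return delegate.isDone();
    }

    @Override
    public T get() throws InterruptedException, ExecutionException {
        return delegate.get();
    }

    @Override
    public T get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        return delegate.get(timeout, unit);
    }

    /** Wraps a not-yet-run task, cancels it through the wrapper, and checks both sides. */
    static boolean demo() {
        FutureTask<String> task = new FutureTask<String>(() -> "unused");
        CancelLoggingFuture<String> wrapper = new CancelLoggingFuture<String>(task);
        boolean cancelled = wrapper.cancel(true);
        return cancelled
                && wrapper.isCancelled()
                && task.isCancelled()
                && wrapper.cancelCalls.get() == 1;
    }
}
```

The wrapper never implements Runnable, so there is no question of which Runnable it replaces; the executor keeps using whatever task object it created internally.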
According to ListeningExecutorService Javadoc, you can use MoreExecutors.listeningDecorator to decorate your own ExecutorService.
So use your ExecutorService that overrides newTaskFor, and wrap it with the method above. Would that work for you?
UPDATE
OK, this is what I would do:
1) Download the Guava sources if you haven't already.
2) Don't use listeningDecorator; instead, make your custom ExecutorService implement ListeningExecutorService.
3) Have your subclass of FutureTask implement ListenableFuture by copying the code from ListenableFutureTask, which is quite simple, and then add your cancel method override.
4) Implement the methods of ListeningExecutorService on your custom ExecutorService by changing the return type of the existing methods to ListenableFuture.
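Step 3 can be sketched roughly as follows. As far as I recall, ListenableFutureTask keeps its listeners in an ExecutionList and fires them from FutureTask's done() hook; the version below imitates that idea with a hand-rolled listener list so that it compiles without Guava. ListenableTaskSketch, cancelCalls, and demo are hypothetical names, and a real implementation would declare "implements ListenableFuture<V>" and could reuse ExecutionList instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical FutureTask subclass with listener support and a customized cancel. */
class ListenableTaskSketch<V> extends FutureTask<V> {
    private final List<Runnable> pending = new ArrayList<Runnable>();
    private boolean fired; // guarded by the lock on 'pending'
    /** Counts cancel calls so the added behavior is observable. */
    final AtomicInteger cancelCalls = new AtomicInteger();

    ListenableTaskSketch(Callable<V> callable) {
        super(callable);
    }

    /** Same signature as ListenableFuture.addListener. */
    public void addListener(Runnable listener, Executor executor) {
        synchronized (pending) {
            if (!fired) {
                pending.add(() -> executor.execute(listener));
                return;
            }
        }
        executor.execute(listener); // task already finished: notify immediately
    }

    /** FutureTask calls this once, when the task completes or is cancelled. */
    @Override
    protected void done() {
        List<Runnable> toRun;
        synchronized (pending) {
            fired = true;
            toRun = new ArrayList<Runnable>(pending);
            pending.clear();
        }
        for (Runnable notification : toRun) {
            notification.run();
        }
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        cancelCalls.incrementAndGet();
        return super.cancel(mayInterruptIfRunning);
    }

    /** Returns {listener notifications, cancel calls} after a small scenario. */
    static int[] demo() {
        AtomicInteger notified = new AtomicInteger();
        Executor direct = Runnable::run; // runs listeners on the calling thread
        ListenableTaskSketch<String> task = new ListenableTaskSketch<String>(() -> "ok");
        task.addListener(() -> notified.incrementAndGet(), direct); // before completion
        task.run();
        task.addListener(() -> notified.incrementAndGet(), direct); // after completion
        task.cancel(true); // too late to cancel, but the override still runs
        return new int[] { notified.get(), task.cancelCalls.get() };
    }
}
```

Returning such a task from both newTaskFor overloads would then let the casts in the covariant submit overrides (step 4) succeed for every task type.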