I've extended FutureTask
from java.util.concurrent
to provide callbacks to track the execution of tasks submitted to an ExecutorService
.
public class StatusTask<V> extends FutureTask<V> {
private final ITaskStatusHandler<V> statusHandler;
public StatusTask(Callable<V> callable, ITaskStatusHandler<V> statusHandler){
super(callable);
if (statusHandler == null)
throw new NullPointerException("statusHandler cannot be null");
this.statusHandler = statusHandler;
statusHandler.TaskCreated(this);
}
@Override
public void run() {
statusHandler.TaskRunning(this);
super.run();
}
@Override
protected void done() {
super.done();
statusHandler.TaskCompleted(this);
}
}
Now, what I see is if the task is submitted, but ends up queued and i cancel(true);
the task - the run()
method still gets called - and the FutureTask.run()
(likely) checks that the task is cancelled and doesn't call the wrapped callable.
Should I do e.g.
@Override
public void run() {
if(!isCancelled()) {
statusHandler.TaskRunning(this);
super.run();
}
}
Or should I still call super.run()
? Both these approaches seems susceptible to race conditions in between checking for cancellation and doing something about it.. any thoughts appreciated.
A Callable task can be canceled.
Two different methods are provided for shutting down an ExecutorService. The shutdown() method will allow previously submitted tasks to execute before terminating, while the shutdownNow() method prevents waiting tasks from starting and attempts to stop currently executing tasks.
When using an Executor, we can shut it down by calling the shutdown() or shutdownNow() methods. Although, it won't wait until all threads stop executing. Waiting for existing threads to complete their execution can be achieved by using the awaitTermination() method.
The Future. cancel() API is very useful in cancelling a task that has either not started yet, or has started but not completed. A completed task can't be cancelled in any way.
You're right that there's a race there. FutureTask#done()
will be called at most once, so if the task has already been canceled before it was ever run via RunnableFuture#run()
, you'll have missed the call to FutureTask#done()
.
Have you considered a simpler approach that always issues a symmetric set of paired calls to ITaskStatusHandler#taskRunning()
and ITaskStatusHandler#taskCompleted()
, like so?
@Override
public void run() {
statusHandler.TaskRunning(this);
try {
super.run();
finally {
statusHandler.TaskCompleted(this);
}
}
Once RunnableFuture#run()
is called, it's true that your task in running, or at least trying to be run. Once FutureTask#run()
is done, your task is no longer running. It so happens that in the case of cancellation, the transition is (nearly) immediate.
Trying to avoid calling ITaskStatusHandler#taskRunning()
if the inner Callable
or Runnable
is never invoked by FutureTask#run()
will require you to establish some shared structure between the Callable
or Runnable
and the FutureTask
-derived type itself, so that when your inner function is first called you set some flag that the outer FutureTask
-derived type can observe as a latch, indicating that yes, the function did start running before having been canceled. However, by then, you had to have committed to calling ITaskStatusHandler#taskRunning()
, so the distinction isn't so useful.
I've been struggling with a similar design problem lately, and wound up settling on the symmetric before and after operations in my overridden FutureTask#run()
method.
Your problem is that your future tasks are still executed after you have called cancel on them, right?
After a task was submitted to the executor service it should be managed by the executor. (You can still cancel a single task if you like.) You should cancel the execution with the executor shutdownNow method. (This will call the cancel method of all submitted tasks.) An shutdown would still execute all submitted tasks.
The executor does otherwise not "know" that a task was cancelled. It will call the method independently of the internal state of the future task.
The easiest approach would be to use the Executor framework as it is and write an Callable decorator.
class CallableDecorator{
CallableDecorator(Decorated decorated){
...
}
setTask(FutureTask task){
statusHandler.taskCreated(task);
}
void call(){
try{
statusHandler.taskRunning(task);
decorated.call();
}finally{
statusHandler.taskCompleted(task);
}
}
}
The only problem is that task cannot be in the constructor of the decorator. (It's a parameter of the future task constructor.) To break this cycle you have to use a setter or some proxy work around with constructor injection. Maybe this is not needed for the callback at all and you can say: statusHandler.callableStarted(decorated)
.
Depending on your requirements you may have to signal exceptions and interrupts.
Basic implementation:
class CallableDecorator<T> implements Callable<T> {
private final Callable<T> decorated;
CallableDecorator(Callable<T> decorated){
this.decorated = decorated;
}
@Override public T call() throws Exception {
out.println("before " + currentThread());
try {
return decorated.call();
}catch(InterruptedException e){
out.println("interupted " + currentThread());
throw e;
}
finally {
out.println("after " + currentThread());
}
}
}
ExecutorService executor = newFixedThreadPool(1);
Future<Long> normal = executor.submit(new CallableDecorator<Long>(
new Callable<Long>() {
@Override
public Long call() throws Exception {
return System.currentTimeMillis();
}
}));
out.println(normal.get());
Future<Long> blocking = executor.submit(new CallableDecorator<Long>(
new Callable<Long>() {
@Override
public Long call() throws Exception {
sleep(MINUTES.toMillis(2)); // blocking call
return null;
}
}));
sleep(SECONDS.toMillis(1));
blocking.cancel(true); // or executor.shutdownNow();
Output:
before Thread[pool-1-thread-1,5,main]
after Thread[pool-1-thread-1,5,main]
1259347519104
before Thread[pool-1-thread-1,5,main]
interupted Thread[pool-1-thread-1,5,main]
after Thread[pool-1-thread-1,5,main]
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With