Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Extending FutureTask, how to handle cancel

I've extended FutureTask from java.util.concurrent to provide callbacks to track the execution of tasks submitted to an ExecutorService.

public class StatusTask<V> extends FutureTask<V> {

    private final ITaskStatusHandler<V> statusHandler;

    public StatusTask(Callable<V> callable, ITaskStatusHandler<V> statusHandler){
        super(callable);
        if (statusHandler == null)
            throw new NullPointerException("statusHandler cannot be null");
        this.statusHandler = statusHandler;
        statusHandler.TaskCreated(this);
    }

    @Override
    public void run() {
        statusHandler.TaskRunning(this);
        super.run();
    }

    @Override
    protected void done() {
        super.done();
        statusHandler.TaskCompleted(this);
    }

}

Now, what I see is if the task is submitted, but ends up queued and i cancel(true); the task - the run() method still gets called - and the FutureTask.run() (likely) checks that the task is cancelled and doesn't call the wrapped callable.

Should I do e.g.

@Override
public void run() {
  if(!isCancelled()) {  
    statusHandler.TaskRunning(this);
    super.run();
  }
}

Or should I still call super.run() ? Both these approaches seems susceptible to race conditions in between checking for cancellation and doing something about it.. any thoughts appreciated.

like image 821
Anonym Avatar asked Nov 27 '09 13:11

Anonym


People also ask

Can a Callable task can be cancelled?

A Callable task can be canceled.

How to stop executor java?

Two different methods are provided for shutting down an ExecutorService. The shutdown() method will allow previously submitted tasks to execute before terminating, while the shutdownNow() method prevents waiting tasks from starting and attempts to stop currently executing tasks.

How to stop thread in ExecutorService java?

When using an Executor, we can shut it down by calling the shutdown() or shutdownNow() methods. Although, it won't wait until all threads stop executing. Waiting for existing threads to complete their execution can be achieved by using the awaitTermination() method.

How to stop Future task in java?

The Future. cancel() API is very useful in cancelling a task that has either not started yet, or has started but not completed. A completed task can't be cancelled in any way.


2 Answers

You're right that there's a race there. FutureTask#done() will be called at most once, so if the task has already been canceled before it was ever run via RunnableFuture#run(), you'll have missed the call to FutureTask#done().

Have you considered a simpler approach that always issues a symmetric set of paired calls to ITaskStatusHandler#taskRunning() and ITaskStatusHandler#taskCompleted(), like so?

@Override
public void run() {
  statusHandler.TaskRunning(this);
  try {
    super.run();
  finally {
    statusHandler.TaskCompleted(this);
  }
}

Once RunnableFuture#run() is called, it's true that your task in running, or at least trying to be run. Once FutureTask#run() is done, your task is no longer running. It so happens that in the case of cancellation, the transition is (nearly) immediate.

Trying to avoid calling ITaskStatusHandler#taskRunning() if the inner Callable or Runnable is never invoked by FutureTask#run() will require you to establish some shared structure between the Callable or Runnable and the FutureTask-derived type itself, so that when your inner function is first called you set some flag that the outer FutureTask-derived type can observe as a latch, indicating that yes, the function did start running before having been canceled. However, by then, you had to have committed to calling ITaskStatusHandler#taskRunning(), so the distinction isn't so useful.

I've been struggling with a similar design problem lately, and wound up settling on the symmetric before and after operations in my overridden FutureTask#run() method.

like image 166
seh Avatar answered Nov 13 '22 21:11

seh


Your problem is that your future tasks are still executed after you have called cancel on them, right?

After a task was submitted to the executor service it should be managed by the executor. (You can still cancel a single task if you like.) You should cancel the execution with the executor shutdownNow method. (This will call the cancel method of all submitted tasks.) An shutdown would still execute all submitted tasks.

The executor does otherwise not "know" that a task was cancelled. It will call the method independently of the internal state of the future task.

The easiest approach would be to use the Executor framework as it is and write an Callable decorator.

class CallableDecorator{

  CallableDecorator(Decorated decorated){
     ...
  }

  setTask(FutureTask task){
    statusHandler.taskCreated(task);
  }

  void call(){
     try{
       statusHandler.taskRunning(task);
       decorated.call();
     }finally{
       statusHandler.taskCompleted(task);
     }
  }
}

The only problem is that task cannot be in the constructor of the decorator. (It's a parameter of the future task constructor.) To break this cycle you have to use a setter or some proxy work around with constructor injection. Maybe this is not needed for the callback at all and you can say: statusHandler.callableStarted(decorated).

Depending on your requirements you may have to signal exceptions and interrupts.

Basic implementation:

class CallableDecorator<T> implements Callable<T> {

    private final Callable<T> decorated;
    CallableDecorator(Callable<T> decorated){
        this.decorated = decorated;
    }

    @Override public T call() throws Exception {
        out.println("before " + currentThread());
        try {
            return decorated.call();
        }catch(InterruptedException e){
            out.println("interupted " + currentThread());
            throw e;
        }
        finally {
            out.println("after " + currentThread());
        }
    }
}

ExecutorService executor = newFixedThreadPool(1);
Future<Long> normal = executor.submit(new CallableDecorator<Long>(
        new Callable<Long>() {
            @Override
            public Long call() throws Exception {
                return System.currentTimeMillis();
            }
        }));
out.println(normal.get());

Future<Long> blocking = executor.submit(new CallableDecorator<Long>(
        new Callable<Long>() {
            @Override
            public Long call() throws Exception {
                sleep(MINUTES.toMillis(2)); // blocking call
                return null;
            }
        }));

sleep(SECONDS.toMillis(1));
blocking.cancel(true); // or executor.shutdownNow();

Output:

before Thread[pool-1-thread-1,5,main]
after Thread[pool-1-thread-1,5,main]
1259347519104
before Thread[pool-1-thread-1,5,main]
interupted Thread[pool-1-thread-1,5,main]
after Thread[pool-1-thread-1,5,main]
like image 35
Thomas Jung Avatar answered Nov 13 '22 20:11

Thomas Jung