
Future and ExecutorService, how to know when a cancelled task has terminated?

Suppose I have some code that starts a task using an ExecutorService, and the invoker then cancels it by means of the Future returned by the submit() method:

execService = Executors.newSingleThreadExecutor ();
Future<String> result = execService.submit ( () -> {
  for ( ... ) {
    if ( Thread.interrupted() ) break;
    // Stuff that takes a while
  }
  return "result";
});
...
result.cancel ( true );
// At this point I can't be sure that the corresponding thread/task has finished
// and that the call() method above has returned:
// result.isDone() is immediately true and result.get() throws CancellationException

So I can cancel the background task and make the executor ready to start again. However, I must not do the latter until the interrupted task has finished handling the interruption and the corresponding method has returned.

Note that in the code above the main flow returns straight after result.cancel(), and result.isCancelled() is true straight after that. Meanwhile, the parallel task might still take a while before it checks Thread.interrupted() again and terminates. I need to be sure that the side task has completely finished before continuing in the main thread.
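To make the behaviour described above concrete, here is a minimal sketch that reproduces it. The class name CancelReturnsEarlyDemo, the latches, and the helper awaitUninterruptibly are all hypothetical names introduced for illustration; the task deliberately ignores the interrupt until it is released, which stands in for work that reacts to interruption slowly.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class CancelReturnsEarlyDemo {

    /** Returns true if the task body was still running right after cancel(true) returned. */
    static boolean taskStillRunningAfterCancel() throws InterruptedException {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch mayFinish = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        try {
            Future<String> result = exec.submit(() -> {
                started.countDown();
                try {
                    // Stands in for work that does not react to the interrupt promptly.
                    awaitUninterruptibly(mayFinish);
                    return "result";
                } finally {
                    finished.countDown();
                }
            });
            started.await();          // make sure the body is really running
            result.cancel(true);      // returns at once
            // The future already reports done and cancelled, yet the body has not ended.
            boolean stillRunning = result.isDone() && result.isCancelled()
                    && finished.getCount() == 1;
            mayFinish.countDown();    // now let the body end
            finished.await();
            return stillRunning;
        } finally {
            exec.shutdownNow();
        }
    }

    /** Waits on the latch, remembering (but not acting on) any interrupt. */
    private static void awaitUninterruptibly(CountDownLatch latch) {
        boolean interrupted = false;
        while (true) {
            try {
                latch.await();
                break;
            } catch (InterruptedException e) {
                interrupted = true;
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("still running after cancel: " + taskStillRunningAfterCancel());
    }
}
```

In this sketch the body cannot end before the check is made, so the method should report that the task was still running even though the Future already claimed to be done.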

Moreover, note that the use of a single-thread executor is incidental to this simple example. I'd like to solve the problem both for the case of just one parallel thread and for the case where more than one thread is running.

What's the best way to be sure of that?

So far I've thought of either introducing a flag that is visible to both the task and the invoker, or shutting down the executor and waiting for that to complete. The latter solution might be inefficient if one wants to reuse the executor many more times. The former is better, but I'd like to know if there is a simpler or more canonical way.
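For reference, the "flag visible to both the task and the invoker" idea could look roughly like the sketch below. It is only one possible shape, not an established idiom: AwaitableTask, cancelAndAwait and the demo class are hypothetical names, and a CountDownLatch plays the role of the flag so that the invoker can block instead of polling.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical helper: wraps one task so the invoker can wait until its body has really ended. */
final class AwaitableTask<V> implements Callable<V> {
    private final Callable<V> delegate;
    // Set by whoever gets there first: the worker starting the body, or the invoker abandoning it.
    private final AtomicBoolean claimed = new AtomicBoolean(false);
    private final CountDownLatch finished = new CountDownLatch(1);

    AwaitableTask(Callable<V> delegate) {
        this.delegate = delegate;
    }

    @Override
    public V call() throws Exception {
        if (!claimed.compareAndSet(false, true)) {
            throw new CancellationException("abandoned before start");
        }
        try {
            return delegate.call();
        } finally {
            finished.countDown();
        }
    }

    /** Cancels the future, then blocks until the task body is no longer running. */
    void cancelAndAwait(Future<V> future) throws InterruptedException {
        future.cancel(true);
        if (claimed.compareAndSet(false, true)) {
            return; // the body never started, and call() will now refuse to start it
        }
        finished.await();
    }
}

final class AwaitableTaskDemo {
    /** Returns true if the body had fully ended by the time cancelAndAwait() returned. */
    static boolean bodyFinishedBeforeReturn() throws InterruptedException {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        AtomicBoolean bodyFinished = new AtomicBoolean(false);
        CountDownLatch started = new CountDownLatch(1);
        try {
            AwaitableTask<String> task = new AwaitableTask<String>(() -> {
                started.countDown();
                try {
                    Thread.sleep(10_000); // stands in for long work
                } catch (InterruptedException e) {
                    Thread.sleep(50);     // stands in for slow clean-up after the interrupt
                }
                bodyFinished.set(true);
                return "result";
            });
            Future<String> future = exec.submit(task);
            started.await();
            task.cancelAndAwait(future);
            return bodyFinished.get();
        } finally {
            exec.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("body finished before return: " + bodyFinishedBeforeReturn());
    }
}
```

The executor stays usable afterwards, which is the main advantage over shutting it down and calling awaitTermination(). The cost is that every task has to be wrapped before it is submitted.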

zakmck asked Nov 26 '22 23:11


1 Answer

I also struggled with a similar problem and ended up writing this utility:

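    // Imports needed by this snippet:
    // java.util.ArrayList, java.util.List, java.util.concurrent.*,
    // java.util.concurrent.atomic.AtomicReference

    private enum TaskState {
        READY_TO_START,
        RUNNING,
        DONE,
        MUST_NOT_START
    }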
    
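    /**
     * Runs the given tasks and waits for all of them to finish.
     * 
     * If one of them fails, this method does not return while any task is still
     * running, and once it has returned no further task will start.
     * 
     * @param <V> the result type of the tasks
     * @param pool the executor the tasks are submitted to
     * @param tasksToRun the tasks to run
     * @throws RuntimeException wrapping the first failure that was observed
     */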
    public <V> void runAndWait(ExecutorService pool, List<Callable<V>> tasksToRun) {
        
        // We use this to work around the fact that the future doesn't tell us if the task is actually terminated or not.
        List<AtomicReference<TaskState>> taskStates = new ArrayList<>();
        List<Future<V>> futures = new ArrayList<>();
        
        for(Callable<V> c : tasksToRun) {
            AtomicReference<TaskState> state = new AtomicReference<>(TaskState.READY_TO_START);
            futures.add(pool.submit(new Callable<V>() {

                @Override
                public V call() throws Exception {
                    if(state.compareAndSet(TaskState.READY_TO_START, TaskState.RUNNING)) {
                        try {
                            return c.call();
                        } finally {
                            state.set(TaskState.DONE);
                        }
                    } else {
                        throw new CancellationException();
                    }
                }
                
            }));
            taskStates.add(state);
        }
        int i = 0;
        try {
            
            // Wait for all tasks.
            for(; i < futures.size(); i++) {
                futures.get(i).get(7, TimeUnit.DAYS);
            }
        } catch(Throwable t) { 
            // Task i failed or timed out, or we were interrupted while waiting for it.
            boolean interrupted = t instanceof InterruptedException;
            List<Throwable> exs = new ArrayList<>();
            final int startOfRemaining = i;
            
            // Try to stop task i and every task after it.
            for(i = startOfRemaining; i < futures.size(); i++) {
                try {
                    futures.get(i).cancel(true);
                } catch (Throwable e) {
                    // Prevent exceptions from stopping us from
                    // canceling all tasks. Consider logging this.
                }
            }
            
            // Cover the case where a task has been started but has not yet reached our compareAndSet.
            taskStates.forEach(s -> s.compareAndSet(TaskState.READY_TO_START, TaskState.MUST_NOT_START));
            
            for(i = startOfRemaining; i < futures.size(); i++) {
                // A cancelled future reports done immediately, so poll our own state
                // until the task body has really returned.
                while(taskStates.get(i).get() == TaskState.RUNNING) {
                    try {
                        Thread.sleep(1);
                    } catch (InterruptedException e) {
                        interrupted = true; // Keep waiting; the flag is restored below.
                    }
                }
                try {
                    futures.get(i).get();
                } catch (CancellationException e) {
                    // Expected for the tasks we cancelled.
                } catch (InterruptedException e) {
                    interrupted = true;
                } catch (Throwable t1) {
                    // Record the exception if it is interesting, so users can see why it failed.
                    exs.add(t1);
                }
            }
            
            exs.forEach(t::addSuppressed);
            if(interrupted) {
                Thread.currentThread().interrupt(); // Restore the interrupt flag for the caller.
            }
            // Thrown after the clean-up rather than from a finally block, which
            // would silently discard any exception raised during the clean-up.
            throw new RuntimeException(t);
        }
    }

I ended up wrapping each task in something that tracks its state, so that I could know when all tasks were done.
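The same wrapping idea can also be written without the polling loop, for the case where several tasks run in parallel. What follows is a sketch under my own assumptions rather than part of the utility above: RunningTaskTracker, track and closeAndAwaitIdle are hypothetical names, and a plain monitor (synchronized plus wait/notifyAll) counts the task bodies that are currently running.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper: counts the wrapped task bodies that are currently running. */
final class RunningTaskTracker {
    private int running = 0;        // guarded by "this"
    private boolean closed = false; // guarded by "this"

    /** Wraps a task so that the start and end of its body are recorded. */
    <V> Callable<V> track(Callable<V> task) {
        return () -> {
            synchronized (this) {
                if (closed) {
                    throw new CancellationException("tracker is closed");
                }
                running++;
            }
            try {
                return task.call();
            } finally {
                synchronized (this) {
                    running--;
                    notifyAll();
                }
            }
        };
    }

    /** Refuses new starts, then blocks until no wrapped body is running. */
    synchronized void closeAndAwaitIdle() throws InterruptedException {
        closed = true;
        while (running > 0) {
            wait();
        }
    }
}

final class RunningTaskTrackerDemo {
    /** Returns how many task bodies are still active after closeAndAwaitIdle(). */
    static int activeBodiesAfterAwait() throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        RunningTaskTracker tracker = new RunningTaskTracker();
        AtomicInteger active = new AtomicInteger();
        CountDownLatch firstStarted = new CountDownLatch(1);
        Callable<String> body = () -> {
            active.incrementAndGet();
            firstStarted.countDown();
            try {
                Thread.sleep(10_000); // stands in for long work
            } catch (InterruptedException e) {
                Thread.sleep(20);     // stands in for slow clean-up after the interrupt
            } finally {
                active.decrementAndGet();
            }
            return "result";
        };
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (int n = 0; n < 6; n++) {
                futures.add(pool.submit(tracker.track(body)));
            }
            firstStarted.await();
            for (Future<String> f : futures) {
                f.cancel(true);
            }
            tracker.closeAndAwaitIdle();
            return active.get();
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("active bodies: " + activeBodiesAfterAwait());
    }
}
```

A tracker like this is single-use: once closed, every wrapped task that has not started yet fails with CancellationException, so a fresh tracker is needed for the next batch. The executor itself can be reused.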

Luke answered Nov 28 '22 11:11