
How to identify if cancelled ScheduledFuture is actually not cancelled?

I'm using a ScheduledExecutorService and submitting a task like this:

future = scheduledExecutorService.schedule(myRunnableTask, delay, timeunit);

However, an event may occur at some unpredictable later time that signals the task is no longer needed. When that happens I need to cancel the task, which I do with:

boolean cancelled = future.cancel(false);

After cancelling, I have to take different actions depending on whether the submitted runnable actually ran. First, here is what the Oracle documentation says about the returned flag:

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Future.html#cancel(boolean)

Returns: false if the task could not be cancelled, typically because it has already completed normally; true otherwise

That's all it says about the return value. The wording ("typically") leaves it unclear exactly when false is returned, but I can live with that.

Let's now focus on the case where it returns true. There are two possibilities:

  1. The task was actually cancelled and runnable never ran.
  2. The runnable is already running and thus cannot be cancelled (unless I add thread-interruption logic, which I don't really want to do).
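
The second case can be reproduced with a small sketch. It assumes a single-threaded scheduler and uses latches to hold the task "in progress" while cancel(false) is called; the class and method names (CancelWhileRunningDemo, cancelWhileRunning) are made up for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class CancelWhileRunningDemo {

    /** Returns {result of cancel(false), whether the task body ran to completion}. */
    static boolean[] cancelWhileRunning() {
        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        try {
            ScheduledFuture<?> future = pool.schedule(() -> {
                started.countDown();
                try {
                    release.await();            // hold the task "in progress"
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                finished.countDown();
            }, 0, TimeUnit.MILLISECONDS);

            started.await();                    // the task body is now running
            boolean cancelled = future.cancel(false);
            release.countDown();                // let the task finish its work
            boolean ran = finished.await(5, TimeUnit.SECONDS);
            return new boolean[] {cancelled, ran};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        boolean[] r = cancelWhileRunning();
        System.out.println("cancel(false) returned " + r[0]
                + ", task body completed: " + r[1]);
    }
}
```

Both flags come back true here, so the return value of cancel(false) alone cannot tell the two cases apart.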

I am okay with both cases occurring, but I want to KNOW which one actually occurred and act accordingly. If the runnable is in progress, I am okay with it finishing its job; I want to wait for its completion and then do one thing. But if it was cancelled and will never run at all, I want to do another thing.

Can you please recommend an approach to this? Am I missing something?

Mikhail Valiev asked Apr 30 '19 14:04


2 Answers

You can accomplish your goal with the following:

  • A Set<Runnable> that keeps track of the Runnables that the thread pool has begun executing.
  • A Map<ScheduledFuture<?>, Runnable> that maps a ScheduledFuture<?> to its respective Runnable.
    • After scheduling the task, you should immediately add the ScheduledFuture and its respective Runnable to the Map.
    • If this insertion into the Map is performed atomically with scheduling the task itself, you avoid the edge case where a ScheduledFuture is cancelled before it has been added to the Map.

I recommend changing your ScheduledExecutorService to a ScheduledThreadPoolExecutor, which allows you to override its beforeExecute(Thread, Runnable) method. The pool invokes this method immediately before it runs a task, after the task has already been assigned to the thread that will execute it.

When overriding this method, you can add the Runnable to your Set<Runnable>.

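Then, when a ScheduledFuture is cancelled, you can call set.contains(map.get(future)), which tells you whether the Runnable (that the ScheduledFuture maps to) was picked up for execution. One caveat: a ScheduledThreadPoolExecutor passes beforeExecute its own wrapper task (by default the same object that schedule() returned) rather than the Runnable you submitted, so in practice the Set ends up holding the futures and the check reduces to set.contains(future).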


Note that your Set<Runnable> and Map<ScheduledFuture<?>, Runnable> implementations need to be thread-safe, because worker threads write to them while the cancelling thread reads them.
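
A minimal sketch of this idea follows. It rests on one assumption that goes beyond the text above: ScheduledThreadPoolExecutor hands beforeExecute its own wrapper task, which by default is the same object returned by schedule(), so the sketch tracks that object directly and does not need the Map. TrackingScheduledExecutor, wasPickedUp and demo are hypothetical names.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical pool that remembers which tasks a worker thread has picked up. */
class TrackingScheduledExecutor extends ScheduledThreadPoolExecutor {

    // Thread-safe set: written by worker threads, read by the cancelling thread.
    private final Set<Runnable> pickedUp = ConcurrentHashMap.newKeySet();

    TrackingScheduledExecutor(int corePoolSize) {
        super(corePoolSize);
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        pickedUp.add(r); // assumption: r is the pool's wrapper task, i.e. the future
    }

    /** True if a worker had already picked up the task behind this future. */
    boolean wasPickedUp(ScheduledFuture<?> future) {
        return pickedUp.contains(future);
    }

    /** Returns {running task picked up, pending task picked up}. */
    static boolean[] demo() {
        TrackingScheduledExecutor pool = new TrackingScheduledExecutor(1);
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        try {
            ScheduledFuture<?> running = pool.schedule(() -> {
                started.countDown();
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, 0, TimeUnit.MILLISECONDS);
            ScheduledFuture<?> pending = pool.schedule(() -> { }, 1, TimeUnit.HOURS);

            started.await();          // the first task is now in progress
            running.cancel(false);
            pending.cancel(false);
            boolean[] result = {pool.wasPickedUp(running), pool.wasPickedUp(pending)};
            release.countDown();
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println("running picked up: " + r[0] + ", pending picked up: " + r[1]);
    }
}
```

Two limitations of this sketch: the set is never pruned, and a cancel that lands between beforeExecute and the start of the task body would be reported as "picked up" even though the body never runs, so the answer is best-effort in that narrow window.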

Jacob G. answered Sep 28 '22 01:09


I ended up writing something like this for this issue. Source code and some unit tests can be found at https://github.com/nuzayats/cancellabletaskexecutor

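import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;

public class CancellableTaskExecutor {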

    private final ScheduledExecutorService es;
    private final Logger log;

    /**
     * For a unit test to replicate a particular timing
     */
    private final Runnable hookBetweenCancels;

    public CancellableTaskExecutor(ScheduledExecutorService es, Logger log) {
        this(es, log, () -> {
            // nop
        });
    }

    // For unit tests
    CancellableTaskExecutor(ScheduledExecutorService es, Logger log, Runnable hookBetweenCancels) {
        this.es = es;
        this.log = log;
        this.hookBetweenCancels = hookBetweenCancels;
    }

    public Execution schedule(Runnable task, long delay, TimeUnit unit) {
        CancellableRunnable runnable = new CancellableRunnable(task);
        ScheduledFuture<?> future = es.schedule(runnable, delay, unit);
        return new Execution(future, runnable);
    }

    public class Execution {

        private final ScheduledFuture<?> future;
        private final CancellableRunnable runnable;

        private Execution(ScheduledFuture<?> future, CancellableRunnable runnable) {
            this.future = future;
            this.runnable = runnable;
        }

        /**
         * @return true when the task has been successfully cancelled and it's guaranteed that
         * the task won't get executed. otherwise false
         */
        public boolean cancel() {
            boolean cancelled = runnable.cancel();
            hookBetweenCancels.run();

            // the return value of this call is unreliable; see https://stackoverflow.com/q/55922874/3591946
            future.cancel(false);

            return cancelled;
        }
    }

    private class CancellableRunnable implements Runnable {

        private final AtomicBoolean cancelledOrStarted = new AtomicBoolean();
        private final Runnable task;

        private CancellableRunnable(Runnable task) {
            this.task = task;
        }

        @Override
        public void run() {
            if (!cancelledOrStarted.compareAndSet(false, true)) {
                return; // cancelled, forget about the task
            }
            try {
                task.run();
            } catch (Throwable e) {
                log.log(Level.WARNING, "Uncaught Exception", e);
            }
        }

        boolean cancel() {
            return cancelledOrStarted.compareAndSet(false, true);
        }
    }
}
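
The question also asks to wait for a task that is already in progress. One way to get that is to pair the same compare-and-set gate with a completion latch. The following is a minimal self-contained sketch, not part of the linked repository; GatedTask, awaitCompletion and GatedTaskDemo are hypothetical names introduced here.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical wrapper: a single CAS decides whether the task starts or is cancelled. */
class GatedTask implements Runnable {

    private final AtomicBoolean cancelledOrStarted = new AtomicBoolean();
    private final CountDownLatch finished = new CountDownLatch(1);
    private final Runnable task;

    GatedTask(Runnable task) {
        this.task = task;
    }

    @Override
    public void run() {
        if (!cancelledOrStarted.compareAndSet(false, true)) {
            return; // cancel() won the race; never run the task
        }
        try {
            task.run();
        } finally {
            finished.countDown();
        }
    }

    /** True only if the task had not started and now never will. */
    boolean cancel() {
        return cancelledOrStarted.compareAndSet(false, true);
    }

    /** Blocks until a started task has finished; call only after cancel() returned false. */
    void awaitCompletion() throws InterruptedException {
        finished.await();
    }
}

class GatedTaskDemo {

    /** Returns {pending cancel() result, running cancel() result, running task completed}. */
    static boolean[] demo() {
        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        AtomicBoolean completed = new AtomicBoolean();
        try {
            GatedTask running = new GatedTask(() -> {
                started.countDown();
                try {
                    release.await();
                    completed.set(true);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            GatedTask pending = new GatedTask(() -> { });
            pool.schedule(running, 0, TimeUnit.MILLISECONDS);
            pool.schedule(pending, 1, TimeUnit.HOURS);

            started.await();
            boolean pendingCancelled = pending.cancel(); // not started yet, will never run
            boolean runningCancelled = running.cancel(); // already started, cannot be cancelled
            release.countDown();
            running.awaitCompletion();                   // wait for the in-progress task
            return new boolean[] {pendingCancelled, runningCancelled, completed.get()};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println("pending cancelled: " + r[0]
                + ", running cancelled: " + r[1] + ", running completed: " + r[2]);
    }
}
```

In real use the caller would probably still call future.cancel(false) afterwards, as the class above does, so the executor can drop the pending task.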

Kohei Nozaki answered Sep 28 '22 03:09