
How to make ScheduledThreadPool report errors?

After a painful debugging experience, I tracked down this issue: ScheduledThreadPool does not report when a task fails, and it never runs a task again once that task has failed. This makes it hard to keep track of the liveness of periodic jobs, short of checking them with yet other periodic tasks (via a dead man's switch or the ScheduledFuture).

Now we can hand a ScheduledThreadPool an UncaughtExceptionHandler, but not even that seems to work:

import java.util.concurrent.*;

class Test {
  public static void main(String[] args) {
    final ThreadFactory tf = new ThreadFactory() {
      private final ThreadFactory delegate = Executors.defaultThreadFactory();

      @Override public Thread newThread(final Runnable r) {
        final Thread res = delegate.newThread(r);
        res.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
          @Override
          public void uncaughtException(final Thread t, final Throwable e) {
            e.printStackTrace();
          }
        });
        return res;
      }
    };
    final ScheduledThreadPoolExecutor exec = new ScheduledThreadPoolExecutor(1, tf);

    final Runnable task = new Runnable() {
      private int c = 0;

      @Override
      public void run() {
        if ( c++ == 5 ) {
          throw new ArrayIndexOutOfBoundsException("Runtime error!");
        }

        System.out.println("Reached " + c);
      }
    };

    exec.scheduleWithFixedDelay(task, 1, 1, TimeUnit.SECONDS);
  }
}

The output of this program (Oracle Java SE (64-Bit Server) 1.7.0_06-b24) is simply:

Reached 1
Reached 2
Reached 3
Reached 4
Reached 5

and then it hangs (by design).

I can always try-catch the whole task, but that feels ugly; the UncaughtExceptionHandler should do that already!

Is there an API-solution for this issue? Did I do something wrong, or is it a bug?

Raphael asked Aug 29 '12 10:08


2 Answers

The concurrency thread pools capture all exceptions and place them in the Future object for you to inspect. The UncaughtExceptionHandler is only invoked for exceptions that the thread does not catch and that kill the thread, which in this case would only be exceptions thrown by the thread pool code itself.
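
To make that concrete, here is a minimal sketch of inspecting the returned ScheduledFuture directly. The class name `FutureFailureDemo` and the helper `awaitFailure` are made up for illustration; the task mirrors the one in the question but uses millisecond delays and fails on its third run.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the question's code.
public class FutureFailureDemo {

    // Schedules a periodic task that fails on its third run and returns the
    // exception that the pool stored in the ScheduledFuture.
    static Throwable awaitFailure() throws InterruptedException {
        ScheduledExecutorService exec = Executors.newScheduledThreadPool(1);
        try {
            Runnable task = new Runnable() {
                private int c = 0;

                @Override
                public void run() {
                    if (c++ == 2) {
                        throw new ArrayIndexOutOfBoundsException("Runtime error!");
                    }
                }
            };
            ScheduledFuture<?> f =
                exec.scheduleWithFixedDelay(task, 10, 10, TimeUnit.MILLISECONDS);
            try {
                // A periodic task never completes normally, so get() blocks
                // until the task fails or is cancelled.
                f.get();
                return null;
            } catch (ExecutionException e) {
                // The task's own exception is the cause.
                return e.getCause();
            }
        } finally {
            exec.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Task failed with: " + awaitFailure());
    }
}
```

The catch is that something has to call `get()`, and that call blocks, which is why the approaches below move the reporting into the executor or the task.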

A simple way around this is to wrap your runnable.

public class ExceptionHandlingScheduledExecutor extends ScheduledThreadPoolExecutor {
    private final Thread.UncaughtExceptionHandler ueh;

    public ExceptionHandlingScheduledExecutor(int corePoolSize, Thread.UncaughtExceptionHandler ueh) {
        super(corePoolSize);
        this.ueh = ueh;
    }

    @Override
    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
        return super.schedule(wrap(command), delay, unit);
    }

    @Override
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
        return super.schedule(wrap(callable), delay, unit); 
    }

    @Override
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
        return super.scheduleAtFixedRate(wrap(command), initialDelay, period, unit);
    }

    @Override
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
        return super.scheduleWithFixedDelay(wrap(command), initialDelay, delay, unit);
    }

    // execute() and submit() are deliberately not overridden: in the OpenJDK
    // implementation of ScheduledThreadPoolExecutor they delegate to schedule(),
    // so overriding them as well would wrap each task twice and invoke the
    // handler twice per failure.

    private Runnable wrap(final Runnable runnable) {
        return new Runnable() {
            @Override
            public void run() {
                try {
                    runnable.run();
                } catch (final Throwable t) {
                    ueh.uncaughtException(Thread.currentThread(), t);
                    throw t;
                }
            }
        };
    }

    private <T> Callable<T> wrap(final Callable<T> callable) {
        return new Callable<T>() {
            @Override
            public T call() throws Exception {
                try {
                    return callable.call();
                } catch (Throwable t) {
                    ueh.uncaughtException(Thread.currentThread(), t);
                    throw t;
                }
            }
        };
    }
}

Because the wrapping lives in a sub-class of ScheduledThreadPoolExecutor, callers get it transparently.
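
Another way to sub-class, sketched here as an alternative rather than as what this answer's code does, is to override the protected `afterExecute` hook of ThreadPoolExecutor and pull the exception out of the Future that the pool passes in. The class name `AfterExecuteReportingExecutor` and the helper `reportedFailure` are hypothetical, and the sketch assumes the scheduled task reaches the hook as a Future, which holds for the stock ScheduledThreadPoolExecutor.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical class name; a sketch, not the code from the answer.
public class AfterExecuteReportingExecutor extends ScheduledThreadPoolExecutor {
    private final Thread.UncaughtExceptionHandler ueh;

    public AfterExecuteReportingExecutor(int corePoolSize, Thread.UncaughtExceptionHandler ueh) {
        super(corePoolSize);
        this.ueh = ueh;
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        // Scheduled tasks reach this hook wrapped in a Future. A periodic task
        // is only "done" once it has failed or been cancelled.
        if (r instanceof Future<?> && ((Future<?>) r).isDone()) {
            try {
                ((Future<?>) r).get();
            } catch (CancellationException e) {
                // Cancelled rather than failed: nothing to report.
            } catch (ExecutionException e) {
                ueh.uncaughtException(Thread.currentThread(), e.getCause());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    // Runs a task that fails on its third execution and returns what the handler saw.
    static Throwable reportedFailure() throws InterruptedException {
        final AtomicReference<Throwable> seen = new AtomicReference<Throwable>();
        final CountDownLatch reported = new CountDownLatch(1);
        ScheduledThreadPoolExecutor exec = new AfterExecuteReportingExecutor(1,
            new Thread.UncaughtExceptionHandler() {
                @Override
                public void uncaughtException(Thread t, Throwable e) {
                    seen.set(e);
                    reported.countDown();
                }
            });
        Runnable task = new Runnable() {
            private int c = 0;

            @Override
            public void run() {
                if (c++ == 2) {
                    throw new ArrayIndexOutOfBoundsException("Runtime error!");
                }
            }
        };
        try {
            exec.scheduleWithFixedDelay(task, 10, 10, TimeUnit.MILLISECONDS);
            reported.await(5, TimeUnit.SECONDS);
            return seen.get();
        } finally {
            exec.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Handler received: " + reportedFailure());
    }
}
```

With this variant no schedule or submit method needs overriding, but the failed task is still not rescheduled; it only gets reported.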


You can also use a cached thread pool to handle exceptions, but this is more complicated.

One way to use the returned Future in a transparent way is to sub-class ScheduledThreadPoolExecutor (or any Executor, for that matter):

class MyScheduledExecutor extends ScheduledThreadPoolExecutor {
  private final Thread.UncaughtExceptionHandler ueh;
  private final ExecutorService futureService = Executors.newCachedThreadPool();

  public MyScheduledExecutor(int corePoolSize, Thread.UncaughtExceptionHandler ueh) {
    super(corePoolSize);
    this.ueh = ueh;
  }

  // Copy other constructors

  @Override
  public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                   long initialDelay,
                                                   long delay,
                                                   TimeUnit unit) {
    final ScheduledFuture<?> f = super.scheduleWithFixedDelay(command, initialDelay, delay, unit);
    futureService.submit(new Runnable() {
      @Override
      public void run() {
        try {
          f.get();
        } catch (Throwable t ) {
          ueh.uncaughtException(null, t.getCause());
        }
      }
    });

    return f;
  }

  // Do similarly for other submit/schedule methods
}

And use it like this:

final ScheduledThreadPoolExecutor exec = new MyScheduledExecutor(1, new Thread.UncaughtExceptionHandler() {
      @Override
      public void uncaughtException(final Thread t, final Throwable e) {
        e.printStackTrace();
      }
    });

Now the output is as desired:

Reached 1
Reached 2
Reached 3
Reached 4
Reached 5
java.lang.ArrayIndexOutOfBoundsException: Runtime error!
   ...

Peter Lawrey answered Oct 19 '22 12:10


You can use the VerboseRunnable class from jcabi-log, which does the wrapping suggested above:

import com.jcabi.log.VerboseRunnable;

Runnable runnable = new VerboseRunnable(
  new Runnable() {
    @Override
    public void run() {
      // business logic that may throw an exception
    }
  },
  true // swallow all exceptions and log them
);

Now, when the executor calls runnable.run(), no exceptions are thrown. Instead, they are swallowed and logged (to SLF4J). Thus the executor won't stop rescheduling the task because of an exception, and you will see what's going on.
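
If you would rather not add a dependency, the same swallow-and-log idea can be sketched by hand. This is a hypothetical stand-in, not jcabi-log's implementation: the names `SwallowingRunnableDemo`, `swallowing` and `survivesFailure` are made up, and it prints to System.err instead of logging to SLF4J.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; not jcabi-log's code.
public class SwallowingRunnableDemo {

    // Wraps a task so that runtime exceptions are reported and swallowed.
    static Runnable swallowing(final Runnable delegate) {
        return new Runnable() {
            @Override
            public void run() {
                try {
                    delegate.run();
                } catch (RuntimeException e) {
                    // Report and swallow, so the executor keeps rescheduling the task.
                    System.err.println("Task failed: " + e);
                }
            }
        };
    }

    // Returns true if the periodic task kept running after its first run failed.
    static boolean survivesFailure() throws InterruptedException {
        final AtomicInteger runs = new AtomicInteger();
        final CountDownLatch threeRuns = new CountDownLatch(3);
        Runnable flaky = new Runnable() {
            @Override
            public void run() {
                threeRuns.countDown();
                if (runs.incrementAndGet() == 1) {
                    throw new IllegalStateException("first run fails");
                }
            }
        };
        ScheduledExecutorService exec = Executors.newScheduledThreadPool(1);
        try {
            exec.scheduleWithFixedDelay(swallowing(flaky), 10, 10, TimeUnit.MILLISECONDS);
            // Without the wrapper, the latch would stay at 2 after the first failure.
            return threeRuns.await(5, TimeUnit.SECONDS);
        } finally {
            exec.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Kept running after failure: " + survivesFailure());
    }
}
```

The sketch catches only RuntimeException on purpose, so that an Error such as OutOfMemoryError still stops the task; widen the catch if you want those swallowed too.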

yegor256 answered Oct 19 '22 13:10