Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How can I use PriorityBlockingQueue with ListeningExecutorService?

Tags:

guava

Since Guava's ListeningExecutorService is implemented by wrapping an existing ExecutorService, it 'decorates' the task by intercepting the execute() method. That means that if I want to use a custom PriorityQueue on the underlying ExecutorService, my comparator "sees" the decorated task as a ListenableFutureTask object instead of the original.

Is there a way to get a hold of the task that it wraps? So that the queue's comparator can use the tasks weight to determine ordering?

like image 519
aim4min Avatar asked Sep 24 '13 18:09

aim4min


1 Answers

I assume that you're concerned with submit() rather than execute()? (See the bottom of my response.)

With a ListeningExecutorService from MoreExecutors.listeningDecorator (the kind of wrapper you refer to), you're out of luck. listeningDecorator, like most ExecutorService implementations, wraps any input to submit in a FutureTask. The normal solution to this problem is to implement AbstractExecutorService and override newTaskFor to return a custom object. That should work here, too. You'll basically be reimplementing listeningDecorator, which is a fairly trivial wrapper around AbstractListeningExecutorService, which is itself a fairly trivial wrapper around AbstractExecutorService.

There are two couple complications. (OK, there might be more. I admit that I haven't tested the approach I'm suggesting.)

  1. AbstractListeningExecutorService doesn't allow you to override newTaskFor. (Why? I can explain if you'd like to file a feature request.) As a result, you'll have to extend AbstractExecutorService directly, largely duplicating the (short) AbstractListeningExecutorService implementation.
  2. newTaskFor has to return a ListenableFuture that's also Comparable. The obvious choice for a ListenableFuture is ListenableFutureTask, but that class is final, so you can't make instances Comparable. The solution is to create a ListenableFutureTask and wrap it in a SimpleForwardingListenableFuture that implements Comparable.

Why do I assume you're dealing with submit() rather than execute()?

listeningDecorator(...).execute() doesn't wrap the input task, as shown by this test I just wrote:

public void testListeningDecorator_noWrapExecuteTask() {
  ExecutorService delegate = mock(ExecutorService.class);
  ListeningExecutorService service = listeningDecorator(delegate);
  Runnable task = new Runnable() {
    @Override
    public void run() {}
  };
  service.execute(task);
  verify(delegate).execute(task);
}
like image 169
Chris Povirk Avatar answered Oct 18 '22 09:10

Chris Povirk