Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Add/extend behavior of Future created by ListeningExecutorService

Tags:

java

guava

The ultimate goal is to add extra behavior to ListenableFutures based on the type of the Callable/Runnable argument. I want to add extra behavior to each of the Future methods. (Example use cases can be found in AbstractExecutorService's javadoc and section 7.1.7 of Goetz's Java Concurrency in Practice)

I have an existing ExecutorService which overrides newTaskFor. It tests the argument's type and creates a subclass of FutureTask. This naturally supports submit as well as invokeAny and invokeAll.

How do I get the same effect for the ListenableFutures returned by a ListeningExecutorService?

Put another way, where can I put this code

if (callable instanceof SomeClass) {
   return new FutureTask<T>(callable) {
        public boolean cancel(boolean mayInterruptIfRunning) {
            System.out.println("Canceling Task");
            return super.cancel(mayInterruptIfRunning);
        }
    };
} else {
    return new FutureTask<T>(callable);
}

such that my client can execute the println statement with

ListeningExecutorService executor = ...;
Collection<Callable> callables = ImmutableSet.of(new SomeClass());
List<Future<?>> futures = executor.invokeAll(callables);
for (Future<?> future : futures) {
    future.cancel(true);
}

Failed Solutions

Here's a list of things I've already tried and why they don't work.

Solution A

Pass MyExecutorService to MoreExecutors.listeningDecorator.

Problem 1: Unfortunately the resulting ListeningExecutorService (an AbstractListeningExecutorService) doesn't delegate to the ExecutorService methods, it delegates to the execute(Runnable) method on Executor. As a result, the newTaskFor method on MyExecutorService is never called.

Problem 2: AbstractListeningExecutorService creates the Runnable (a ListenableFutureTask) via static factory method which I can't extend.

Solution B

Inside newTaskFor, create MyRunnableFuture normally and then wrap it with a ListenableFutureTask.

Problem 1: ListenableFutureTask's factory methods don't accept RunnableFutures, they accept Runnable and Callable. If I pass MyRunnableFuture as a Runnable, the resulting ListenableFutureTask just calls run() and not any of the Future methods (where my behavior is).

Problem 2: Even if it did call my Future methods, MyRunnableFuture is not a Callable, so I have to supply a return value when I create the ListenableFutureTask... which I don't have... hence the Callable.

Solution C

Let MyRunnableFuture extend ListenableFutureTask instead of FutureTask

Problem: ListenableFutureTask is now final (as of r10 / r11).

Solution D

Let MyRunnableFuture extend ForwardingListenableFuture and implement RunnableFuture. Then wrap the SomeClass argument in a ListenableFutureTask and return that from delegate()

Problem: It hangs. I don't understand the problem well enough to explain it, but this configuration causes a deadlock in FutureTask.Sync .

Source Code: As requested, here's the source for Solution D which hangs:

import java.util.*;
import java.util.concurrent.*;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.*;

/** See http://stackoverflow.com/q/8931215/290943 */
public final class MyListeningExecutorServiceD extends ThreadPoolExecutor implements ListeningExecutorService {

    // ===== Test Harness =====

    private static interface SomeInterface {
        public String getName();
    }
    
    private static class SomeClass implements SomeInterface, Callable<Void>, Runnable {
        private final String name;

        private SomeClass(String name) {
            this.name = name;
        }

        public Void call() throws Exception {
            System.out.println("SomeClass.call");
            return null;
        }

        public void run() {
            System.out.println("SomeClass.run");
        }

        public String getName() {
            return name;
        }
    }

    private static class MyListener implements FutureCallback<Void> {
        public void onSuccess(Void result) {
            System.out.println("MyListener.onSuccess");
        }

        public void onFailure(Throwable t) {
            System.out.println("MyListener.onFailure");
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Main.start");
        
        SomeClass someClass = new SomeClass("Main.someClass");
        
        ListeningExecutorService executor = new MyListeningExecutorServiceD();
        Collection<Callable<Void>> callables = ImmutableSet.<Callable<Void>>of(someClass);
        List<Future<Void>> futures = executor.invokeAll(callables);
        
        for (Future<Void> future : futures) {
            Futures.addCallback((ListenableFuture<Void>) future, new MyListener());
            future.cancel(true);
        }
        
        System.out.println("Main.done");
    }

    // ===== Implementation =====

    private static class MyRunnableFutureD<T> extends ForwardingListenableFuture<T> implements RunnableFuture<T> {

        private final ListenableFuture<T> delegate;
        private final SomeInterface someClass;

        private MyRunnableFutureD(SomeInterface someClass, Runnable runnable, T value) {
            assert someClass == runnable;
            this.delegate = ListenableFutureTask.create(runnable, value);
            this.someClass = someClass;
        }
        
        private MyRunnableFutureD(SomeClass someClass, Callable<T> callable) {
            assert someClass == callable;
            this.delegate = ListenableFutureTask.create(callable);
            this.someClass = someClass;
        }

        @Override
        protected ListenableFuture<T> delegate() {
            return delegate;
        }

        public void run() {
            System.out.println("MyRunnableFuture.run");
            try {
                delegate.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            System.out.println("MyRunnableFuture.cancel " + someClass.getName());
            return super.cancel(mayInterruptIfRunning);
        }
    }

    public MyListeningExecutorServiceD() {
        // Same as Executors.newSingleThreadExecutor for now
        super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        if (runnable instanceof SomeClass) {
            return new MyRunnableFutureD<T>((SomeClass) runnable, runnable, value);
        } else {
            return new FutureTask<T>(runnable, value);
        }
    }

    @Override
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        if (callable instanceof SomeClass) {
            return new MyRunnableFutureD<T>((SomeClass) callable, callable);
        } else {
            return new FutureTask<T>(callable);
        }
    }

    /** Must override to supply co-variant return type */
    @Override
    public ListenableFuture<?> submit(Runnable task) {
        return (ListenableFuture<?>) super.submit(task);
    }

    /** Must override to supply co-variant return type */
    @Override
    public <T> ListenableFuture<T> submit(Runnable task, T result) {
        return (ListenableFuture<T>) super.submit(task, result);
    }

    /** Must override to supply co-variant return type */
    @Override
    public <T> ListenableFuture<T> submit(Callable<T> task) {
        return (ListenableFuture<T>) super.submit(task);
    }
}
like image 851
Drew Avatar asked Jan 19 '12 18:01

Drew


People also ask

How do I get a listeningexecutorservice listenablefuture?

The simplest way we can obtain a ListenableFuture is by submitting a task to a ListeningExecutorService (much like how we would use a normal ExecutorService to obtain a normal Future ): Notice how we use the MoreExecutors class to decorate our ExecutorService as a ListeningExecutorService.

What is the difference between lexecservice and listeningexecutor?

Notice that the lExecService here is the executor that is running our asynchronous task, while the listeningExecutor is the executor on which our listener is invoked. As seen above, we should always consider separating these two executors to avoid scenarios where our listeners and workers are competing for the same thread pool resources.

What is the Keepalive time of ExecutorService in Java?

The keepalive time is five milliseconds. And, there is a blocking queue to watch for tasks in the future. The ExecutorService in Java is a subinterface of the executor framework. It provides certain functions to manage the thread life cycle of an application. There is also a submit () method that can accept both runnable and callable objects.

How do I add a listener to a listenablefuture?

One way we can add a listener to a ListenableFuture is by registering a callback with Futures.addCallback (), providing us access to the result or exception when success or failure occurs: We can also add a listener by adding it directly to the ListenableFuture.


2 Answers

Based on this question and a couple others discussions I've had recently, I'm coming to the conclusion that RunnableFuture/FutureTask is inherently misleading: Clearly you submit a Runnable, and clearly you get a Future back, and clearly the underlying Thread needs a Runnable. But why should a class implement both Runnable and Future? And if it does, which Runnable is it replacing? That's bad enough already, but then we introduce multiple levels of executors, and things really get out of hand.

If there's a solution here, I think it's going to require treating FutureTask as an implementation detail of AbstractExecutorService. I'd focus instead on splitting the problem into two pieces:

  • I want to conditionally modify the returned Future.
  • I want to conditionally modify the code run by the executor service. (I'm actually not sure whether this is a requirement here, but I'll cover it in case it is. Even if not, it may help establish the Runnable/Future distinction.)

(grumble Markdown grumble)

class MyWrapperExecutor extends ForwardingListeningExecutorService {
  private final ExecutorService delegateExecutor;

  @Override public <T> ListenableFuture<T> submit(Callable<T> task) {
    if (callable instanceof SomeClass) {
      // Modify and submit Callable (or just submit the original Callable):
      ListenableFuture<T> delegateFuture =
          delegateExecutor.submit(new MyCallable(callable));
      // Modify Future:
      return new MyWrapperFuture<T>(delegateFuture);
    } else {
      return delegateExecutor.submit(callable);
    }
  }

  // etc.
}

Could that work?

like image 58
Chris Povirk Avatar answered Nov 13 '22 05:11

Chris Povirk


According to ListeningExecutorService Javadoc, you can use MoreExecutors.listeningDecorator to decorate your own ExecutorService.

So use your ExecutorService that overrides newTaskFor, and wrap it with the method above. Would that work for you?

UPDATE

Ok, this is what I would do:

1) Download Guava sources if you haven't already.

2) Don't use listeningDecorator, instead, make your custom ExecutorService implement ListeningExecutorService.

3) Your subclass of FutureTask should implement ListenableFuture, and copy the code from ListenableFutureTask, which is quite simple, and then add your cancel method override.

4) Implement the methods of ListeningExecutorService on your custom ExecutorService by changing the returning method of the existing methods to ListenableFuture.

like image 40
Luciano Avatar answered Nov 13 '22 06:11

Luciano