Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Turning a ListenableFuture<Iterable<A>> to Iterable<ListenableFuture<B>> through split and run

I'm looking to find the best way to convert a ListenableFuture<Iterable<A>> into a sequence of individual ListenableFutures. This is the kind of method signature I'm looking for:

public <A, B> Iterable<ListenableFuture<B>> splitAndRun(
    final ListenableFuture<Iterable<A>> elements, 
    final Function<A, B> func, 
    final ListeningExecutorService executor
);

Clearly I could do with if I returned ListenableFuture<Iterable<ListenableFuture<B>>>, but I feel like I should be able to split and run this and maintain its asynchronicity.

Here's the code I have so far, but you will notice the nasty .get() at the end, which rather ruins things. Please excuse me if I've overcomplicated things.

public class CallableFunction<I, O> implements Callable<O>{
  private final I input;
  private final Function<I, O> func;

  public CallableFunction(I input, Function<I, O> func) {
    this.input = input;
    this.func = func;
  }

  @Override public O call() throws Exception {
    return func.apply(input);
  }
}

public <A, B> Iterable<ListenableFuture<B>> splitAndRun(
    final ListenableFuture<Iterable<A>> elements, 
    final Function<A, B> func, 
    final ListeningExecutorService executor
) throws InterruptedException, ExecutionException {
  return Futures.transform(elements, 
      new Function<Iterable<A>, Iterable<ListenableFuture<B>>>() {
    @Override
    public Iterable<ListenableFuture<B>> apply(Iterable<A> input) {
      return Iterables.transform(input, new Function<A, ListenableFuture<B>>() {
        @Override
        public ListenableFuture<B> apply(A a) {
          return executor.submit(new CallableFunction<A, B>(a, func));
        }
      });
    }
  }, executor).get();
}
like image 989
Ben Smith Avatar asked Feb 21 '13 10:02

Ben Smith


2 Answers

If you think of ListenableFuture as a box, in order to get something from that box you have to block the thread (call the get method).

Not sure that implementing splitAndRun method could benefit you in some way. You would still have one single asynchronous operation that returns ListenableFuture<Iterable<A>>.

Reverse operation of joining Iterable<ListenableFuture<A>> as a single ListenableFuture<Iterable<A>> could be useful, though. You might want to use it in case if you want to collect several asynchronous computation into a single one.

like image 154
Mairbek Khadikov Avatar answered Nov 15 '22 13:11

Mairbek Khadikov


(alternative to my original answer)

But what if the transformation is slow or if it might fail for some inputs but succeed for others? In that case, we want to transform each output individually. Plus, we want to ensure that the transformation happens only once. Our collection transformation methods do not make this guarantee. As a result, in your example code, every iteration over the output will submit new tasks to the executor, even though the previously submitted tasks may have already completed. For this reason, we recommend Iterables.transform and friends only when the transformation is lightweight. (Typically, if you're doing something heavyweight, your transformation function will throw a checked exception, which Function doesn't allow. Consider this a hint :) Your example, of course, happens not to trigger the hint.)

What does that all mean in code? Basically, we'll reverse the order of operations that I gave in my other answer. We'll convert from Future<Iterable<A>> to Iterable<Future<A>> first. Then we'll submit a separate task for each A to convert it to a B. For this latter step, we'll supply an Executor so that the transformation doesn't block some innocent thread. (We'll now need only Futures.transform, so I've static imported it.)

List<ListenableFuture<A>> individuals = newArrayList();
for (int i = 0; i < knownSize; i++) {
  final int index = i;
  individuals.add(transform(input, new Function<List<A>, A>() {
    @Override
    public A apply(List<A> values) {
      return values.get(index);
    }
  }));
}

List<ListenableFuture<B>> result = newArrayList();
for (ListenableFuture<A> original : individuals) {
  result.add(transform(original, function, executor));
}
return result;

That's the idea, anyway. But my implementation is dumb. We can easily do both steps at the same time:

List<ListenableFuture<B>> result = newArrayList();
for (int i = 0; i < knownSize; i++) {
  final int index = i;
  result.add(transform(input, new Function<List<A>, B>() {
    @Override
    public B apply(List<A> values) {
      return function.apply(values.get(index));
    }
  }, executor));
}
return result;

Because this makes n Futures.transform calls instead of 1 and because it uses a separate Executor, it's better than my other solution if the transformation is heavyweight and worse if it's lightweight. The other caveat remains: This will work only if you know how many outputs you'll have.

like image 37
Chris Povirk Avatar answered Nov 15 '22 11:11

Chris Povirk