I'm looking to find the best way to convert a ListenableFuture<Iterable<A>>
into a sequence of individual ListenableFutures
. This is the kind of method signature I'm looking for:
public <A, B> Iterable<ListenableFuture<B>> splitAndRun(
final ListenableFuture<Iterable<A>> elements,
final Function<A, B> func,
final ListeningExecutorService executor
);
Clearly I could do with if I returned ListenableFuture<Iterable<ListenableFuture<B>>>
, but I feel like I should be able to split and run this and maintain its asynchronicity.
Here's the code I have so far, but you will notice the nasty .get()
at the end, which rather ruins things. Please excuse me if I've overcomplicated things.
public class CallableFunction<I, O> implements Callable<O>{
private final I input;
private final Function<I, O> func;
public CallableFunction(I input, Function<I, O> func) {
this.input = input;
this.func = func;
}
@Override public O call() throws Exception {
return func.apply(input);
}
}
public <A, B> Iterable<ListenableFuture<B>> splitAndRun(
final ListenableFuture<Iterable<A>> elements,
final Function<A, B> func,
final ListeningExecutorService executor
) throws InterruptedException, ExecutionException {
return Futures.transform(elements,
new Function<Iterable<A>, Iterable<ListenableFuture<B>>>() {
@Override
public Iterable<ListenableFuture<B>> apply(Iterable<A> input) {
return Iterables.transform(input, new Function<A, ListenableFuture<B>>() {
@Override
public ListenableFuture<B> apply(A a) {
return executor.submit(new CallableFunction<A, B>(a, func));
}
});
}
}, executor).get();
}
If you think of ListenableFuture
as a box, in order to get something from that box you have to block the thread (call the get
method).
Not sure that implementing splitAndRun
method could benefit you in some way. You would still have one single asynchronous operation that returns ListenableFuture<Iterable<A>>
.
Reverse operation of joining Iterable<ListenableFuture<A>>
as a single ListenableFuture<Iterable<A>>
could be useful, though. You might want to use it in case if you want to collect several asynchronous computation into a single one.
(alternative to my original answer)
But what if the transformation is slow or if it might fail for some inputs but succeed for others? In that case, we want to transform each output individually. Plus, we want to ensure that the transformation happens only once. Our collection transformation methods do not make this guarantee. As a result, in your example code, every iteration over the output will submit new tasks to the executor, even though the previously submitted tasks may have already completed. For this reason, we recommend Iterables.transform
and friends only when the transformation is lightweight. (Typically, if you're doing something heavyweight, your transformation function will throw a checked exception, which Function
doesn't allow. Consider this a hint :) Your example, of course, happens not to trigger the hint.)
What does that all mean in code? Basically, we'll reverse the order of operations that I gave in my other answer. We'll convert from Future<Iterable<A>>
to Iterable<Future<A>>
first. Then we'll submit a separate task for each A
to convert it to a B
. For this latter step, we'll supply an Executor
so that the transformation doesn't block some innocent thread. (We'll now need only Futures.transform
, so I've static imported it.)
List<ListenableFuture<A>> individuals = newArrayList();
for (int i = 0; i < knownSize; i++) {
final int index = i;
individuals.add(transform(input, new Function<List<A>, A>() {
@Override
public A apply(List<A> values) {
return values.get(index);
}
}));
}
List<ListenableFuture<B>> result = newArrayList();
for (ListenableFuture<A> original : individuals) {
result.add(transform(original, function, executor));
}
return result;
That's the idea, anyway. But my implementation is dumb. We can easily do both steps at the same time:
List<ListenableFuture<B>> result = newArrayList();
for (int i = 0; i < knownSize; i++) {
final int index = i;
result.add(transform(input, new Function<List<A>, B>() {
@Override
public B apply(List<A> values) {
return function.apply(values.get(index));
}
}, executor));
}
return result;
Because this makes n Futures.transform
calls instead of 1 and because it uses a separate Executor
, it's better than my other solution if the transformation is heavyweight and worse if it's lightweight. The other caveat remains: This will work only if you know how many outputs you'll have.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With