
Reduce List<CompletableFuture<T>>

Given a list of ints:

List<Integer> ints = IntStream.range(0, 1000).boxed().collect(Collectors.toList());

With the Java Stream API, we can map and reduce them:

MyValue myvalue = ints
        .parallelStream()
        .map(x -> toMyValue(x))
        .reduce((t, t2) -> t.combine(t2))
        .get();

In this example, what matters to me is that:

  • items are reduced in multiple threads
  • items that are mapped early are reduced early
  • not all results of toMyValue() are loaded at the same time

Now I want to do the same processing with the CompletableFuture API.

For the map step, I did:

List<CompletableFuture<MyValue>> myValueFutures = ints
        .stream()
        .map(x -> CompletableFuture.supplyAsync(() -> toMyValue(x), MY_THREAD_POOL))
        .collect(Collectors.toList());

And now I have no idea how to reduce List<CompletableFuture<MyValue>> myValueFutures to get a single MyValue.

Parallel streams provide a convenient API, but I would rather not use the Stream API because of these problems:

  • a parallel stream is hard to stop partway through processing.
  • a parallel stream's active worker count can exceed the configured parallelism when some workers are blocked on IO. This helps maximize CPU utilization, but it can cause memory overhead (even OOM).

Is there any way to reduce the CompletableFutures one by one, without the stream reduce API?

Jihun No asked Jan 10 '21 12:01


1 Answer

It's interesting that your initial example already uses a combine method of your own, because CompletableFuture has a dedicated method for exactly that: thenCombine (and its two siblings, the thenCombineAsync overloads).
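As a minimal sketch of what thenCombine does on its own, the snippet below combines two independently computed futures into one. The class name ThenCombineSketch and the helper sum are hypothetical names used only for illustration.

```java
import java.util.concurrent.CompletableFuture;

public class ThenCombineSketch {
    // Hypothetical helper: the returned future completes once both inputs
    // have completed, with the result of applying Integer::sum to them.
    static CompletableFuture<Integer> sum(CompletableFuture<Integer> a,
                                          CompletableFuture<Integer> b) {
        return a.thenCombine(b, Integer::sum);
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> left = CompletableFuture.supplyAsync(() -> 2);
        CompletableFuture<Integer> right = CompletableFuture.supplyAsync(() -> 3);
        System.out.println(sum(left, right).join()); // prints 5
    }
}
```

Neither input is blocked on here; only the final join() waits for the combined result.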

So considering that you have something like:

static class MyValue {
    final int x;

    MyValue(int x) {
        this.x = x;
    }

    MyValue combine(MyValue v) {
        return new MyValue(this.x + v.x);
    }
}

static MyValue toMyValue(int x) {
    return new MyValue(x);
}

And:

List<CompletableFuture<Integer>> list =
    IntStream.range(0, 4)
             .mapToObj(x -> CompletableFuture.supplyAsync(() -> x))
             .collect(Collectors.toList());

You could use one of the thenCombine methods and achieve what you want via:

MyValue value =
    list.stream()
        .map(x -> x.thenApply(YourClass::toMyValue))
        .reduce((left, right) -> left.thenCombine(right, MyValue::combine))
        .orElse(CompletableFuture.completedFuture(new MyValue(0)))
        .join();

If you want the combine action to run on a predictable thread, pass your own pool to the thenCombineAsync overload that accepts an Executor, like:

.reduce((left, right) -> left.thenCombineAsync(right, MyValue::combine, MY_THREAD_POOL))
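
Putting the pieces together, here is a self-contained sketch of the whole map-and-reduce using an explicit pool. The class name FutureReduceSketch, the helper reduceRange, and the pool size of 4 are assumptions made for illustration; MyValue and toMyValue mirror the definitions above.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class FutureReduceSketch {
    static final class MyValue {
        final int x;
        MyValue(int x) { this.x = x; }
        MyValue combine(MyValue v) { return new MyValue(this.x + v.x); }
    }

    static MyValue toMyValue(int x) { return new MyValue(x); }

    // Hypothetical helper: maps 0..n-1 to MyValue on the given pool,
    // then folds the futures pairwise, running each combine on the same pool.
    static MyValue reduceRange(int n, ExecutorService pool) {
        List<CompletableFuture<MyValue>> futures = IntStream.range(0, n)
                .mapToObj(i -> CompletableFuture.supplyAsync(() -> toMyValue(i), pool))
                .collect(Collectors.toList());
        return futures.stream()
                .reduce((left, right) -> left.thenCombineAsync(right, MyValue::combine, pool))
                .orElse(CompletableFuture.completedFuture(new MyValue(0)))
                .join(); // blocks the calling thread only, not a pool thread
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4); // assumed size
        try {
            System.out.println(reduceRange(1000, pool).x); // 0 + 1 + ... + 999 = 499500
        } finally {
            pool.shutdown();
        }
    }
}
```

Note that the list keeps a reference to every mapped future, so each completed result stays reachable until the list itself is released.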
Eugene answered Sep 25 '22 17:09