Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to reduce a stream of Futures in Java?

Assuming that I have given a Stream of Futures, which I want to reduce by invoking the Stream#reduce method. But I don't want to reduce the Futures itself, but the result of the Future (Future#get). The problem is, that the get method may throw an ExecutionException and does not provide a result in this case.

This is the reason why

Stream<Future<Integer>> stream = ...;
BinaryOperator<Integer> sum = (i1, i2) -> i1 + i2;  
stream.map(future -> future.get())
      .reduce(sum); // does not work, get needs to handle exceptions!

So, I have to catch the exceptions:

stream.map(future -> {
    Integer i = null;
    try {
        i = future.get();
    } catch (InterruptedException e) {
    } catch (ExecutionException e) {}
    return i;
}).reduce(sum); 

But in this approach, I may get troubles because null values may appear.

So, to get rid of these, I would have to filter those out, where a ExecutionException appeared:

stream.filter(future -> {
    Integer i = null;
    try {
        i = future.get();
    } catch (InterruptedException e) {
    } catch (ExecutionException e) {
    }
    return i != null;
})
.map(future -> {
    Integer i = null;
    try {
        i = future.get();
    } catch (InterruptedException e) {
    } catch (ExecutionException e) {
    }
    return i;
}).reduce(sum);

I think, this code would work.. But I don't want to believe, that this is the only and the smartest way to reduce Futures.

Any ideas or suggestions?

like image 513
mrbela Avatar asked Jan 17 '20 15:01

mrbela


People also ask

How does reduce work in streams Java?

Reducing is the repeated process of combining all elements. reduce operation applies a binary operator to each element in the stream where the first argument to the operator is the return value of the previous application and second argument is the current stream element.

What happens if you don't close a stream in Java?

IO Resources. Under the hood, this method opens a FileChannel instance and then closes it upon stream closure. Therefore, if we forget to close the stream, the underlying channel will remain open and then we would end up with a resource leak.

Is it good to use streams in Java?

Java streams enable functional-style operations on streams of elements. A stream is an abstraction of a non-mutable collection of functions applied in some order to the data. A stream is not a collection where you can store elements.

What does stream of () method in Java?

Stream of(T t) returns a sequential Stream containing a single element. Syntax : static Stream of(T t) Parameters: This method accepts a mandatory parameter t which is the single element in the Stream. Return Value: Stream of(T t) returns a sequential Stream containing the single specified element.


2 Answers

You could extract the value from the future first, and then filter out null:

Integer result = stream
    .map(future -> {
        try {
          return future.get();
        } catch (InterruptedException | ExecutionException e) {
        }
        return null; })
    .filter(Objects::nonNull)
    .reduce(sum)
    .orElse(0);
like image 57
Benoit Avatar answered Sep 27 '22 22:09

Benoit


One of the ways to simplify it could be:

void reduceImpl(Stream<Future<Integer>> stream) {
    Optional<Integer> integerOptional = stream
            .map(this::transform)
            .filter(Objects::nonNull)
            .reduce(Integer::sum);
}

private Integer transform(Future<Integer> future) {
    try {
        return future.get();
    } catch (InterruptedException | ExecutionException e) {
        return null; // should ideally be handled properly
    }
}
like image 34
Naman Avatar answered Sep 27 '22 22:09

Naman