Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Java 8 Generics: Reducing a Stream of Consumers to a single Consumer

How can I write a method for combining a Stream of Consumers into a single Consumer using Consumer.andThen(Consumer)?

My first version was:

<T> Consumer<T> combine(Stream<Consumer<T>> consumers) {
    return consumers
            .filter(Objects::nonNull)
            .reduce(Consumer::andThen)
            .orElse(noOpConsumer());
}

<T> Consumer<T> noOpConsumer() {
    return value -> { /* do nothing */ };
}

This version compiles with JavaC and Eclipse. But it is too specific: The Stream cannot be a Stream<SpecialConsumer>, and if the Consumers are not exactly of type T but a super type of it, it cannot be used:

Stream<? extends Consumer<? super Foo>> consumers = ... ;
combine(consumers);

That won't compile, rightfully. The improved version would be:

<T> Consumer<T> combine(Stream<? extends Consumer<? super T>> consumers) {
    return consumers
            .filter(Objects::nonNull)
            .reduce(Consumer::andThen)
            .orElse(noOpConsumer());
}

But neither Eclipse nor JavaC compile that:
Eclipse (4.7.3a):

The type Consumer does not define andThen(capture#7-of ? extends Consumer<? super T>, capture#7-of ? extends Consumer<? super T>) that is applicable here

JavaC (1.8.0172):

error: incompatible types: invalid method reference
.reduce(Consumer::andThen)
incompatible types: Consumer<CAP#1> cannot be converted to Consumer<? super CAP#2>
where T is a type-variable:
T extends Object declared in method <T>combine(Stream<? extends Consumer<? super T>>)
where CAP#1,CAP#2 are fresh type-variables:
CAP#1 extends Object super: T from capture of ? super T
CAP#2 extends Object super: T from capture of ? super T

But it should work: Every subclass of Consumer can be used as a Consumer, too. And every Consumer of a super-type of X can consume Xs, too. I tried to add type parameters to each line of the stream version, but that won't help. But if I write it down with a traditional loop, it compiles:

<T> Consumer<T> combine(Collection<? extends Consumer<? super T>> consumers) {
    Consumer<T> result = noOpConsumer()
    for (Consumer<? super T> consumer : consumers) {
        result = result.andThen(consumer);
    }
    return result;
}

(Filtering out the null values is left out for conciseness.)

Therefore, my question is: How can I convince JavaC and Eclipse that my Code is correct? Or, if it is not correct: Why is the loop-version correct but not the Stream Version?

like image 545
user194860 Avatar asked Jun 18 '18 11:06

user194860


People also ask

What method can you use to apply a given instance of Consumer to all the elements of a collection?

The forEach method accepts a Consumer as a parameter. The consumer can be simplified with a lambda expression or a method reference. In the example, we go over the elements of a list with forEach . The consumer simply prints each of the elements.

What is Consumer <> in Java?

Consumer<T> is an in-built functional interface introduced in Java 8 in the java. util. function package. Consumer can be used in all contexts where an object needs to be consumed,i.e. taken as input, and some operation is to be performed on the object without returning any result.

How many types of streams are available in Java 8?

Java 8 offers the possibility to create streams out of three primitive types: int, long and double. As Stream<T> is a generic interface, and there is no way to use primitives as a type parameter with generics, three new special interfaces were created: IntStream, LongStream, DoubleStream.

What would be a good way of describing streams in Java 8?

Introduced in Java 8, the Stream API is used to process collections of objects. A stream is a sequence of objects that supports various methods which can be pipelined to produce the desired result. A stream is not a data structure instead it takes input from the Collections, Arrays or I/O channels.


2 Answers

You use a one-argument Stream.reduce(accumulator) version that has the following signature:

Optional<T> reduce(BinaryOperator<T> accumulator);

The BinaryOperator<T> accumulator can only accept elements of type T, but you have:

<? extends Consumer<? super T>>

I propose you to use a three-argument version of the Stream.reduce(...) method instead:

<U> U reduce(U identity,
             BiFunction<U, ? super T, U> accumulator
             BinaryOperator<U> combiner);

The BiFunction<U, ? super T, U> accumulator can accept parameters of two different types, has a less restrictive bound and is more suitable for your situation. A possible solution could be:

<T> Consumer<T> combine(Stream<? extends Consumer<? super T>> consumers) {
    return consumers.filter(Objects::nonNull)
                    .reduce(t -> {}, Consumer::andThen, Consumer::andThen);
}

The third argument BinaryOperator<U> combiner is called only in the parallel streams, but anyway it would be wise to provide a correct implementation of it.

In addition, for a better understanding one could represent the above code as follows:

<T> Consumer<T> combine(Stream<? extends Consumer<? super T>> consumers) {

    Consumer<T> identity = t -> {};
    BiFunction<Consumer<T>, Consumer<? super T>, Consumer<T>> acc = Consumer::andThen;
    BinaryOperator<Consumer<T>> combiner = Consumer::andThen;

    return consumers.filter(Objects::nonNull)
                    .reduce(identity, acc, combiner);
}

Now you can write:

Stream<? extends Consumer<? super Foo>> consumers = Stream.of();
combine(consumers);
like image 101
Oleksandr Pyrohov Avatar answered Oct 20 '22 03:10

Oleksandr Pyrohov


You have forgotten a small thing in your method definition. It currently is:

<T> Consumer<T> combine(Stream<? extends Consumer<? super T>> consumers) {}

But you're returing Consumer<? super T>. So by changing the return type it almost works. Now you accept an argument consumers of type Stream<? extends Consumer<? super T>>. Currently it doesn't work, because you're working with possibly different subclasses and implementations of Consumer<? super T> (because of the upperbounded wildcard extends). You can overcome this, by casting every ? extends Consumer<? super T> in your Stream to a simple Consumer<? super T>. Like the following:

<T> Consumer<? super T> combine(Stream<? extends Consumer<? super T>> consumers) {
    return consumers
        .filter(Objects::nonNull)
        .map(c -> (Consumer<? super T>) c)
        .reduce(Consumer::andThen)
        .orElse(noOpConsumer());
}

This should now work

like image 1
Lino Avatar answered Oct 20 '22 04:10

Lino