How can I write a method for combining a Stream of Consumers into a single Consumer using Consumer.andThen(Consumer)?
My first version was:
<T> Consumer<T> combine(Stream<Consumer<T>> consumers) {
    return consumers
            .filter(Objects::nonNull)
            .reduce(Consumer::andThen)
            .orElse(noOpConsumer());
}
<T> Consumer<T> noOpConsumer() {
    return value -> { /* do nothing */ };
}
This version compiles with JavaC and Eclipse. But it is too specific: the Stream cannot be a Stream<SpecialConsumer>, and if the Consumers are not exactly of type T but of a supertype of it, it cannot be used:
Stream<? extends Consumer<? super Foo>> consumers = ... ;
combine(consumers);
That won't compile, rightfully. The improved version would be:
<T> Consumer<T> combine(Stream<? extends Consumer<? super T>> consumers) {
    return consumers
            .filter(Objects::nonNull)
            .reduce(Consumer::andThen)
            .orElse(noOpConsumer());
}
But neither Eclipse nor JavaC compiles that:
Eclipse (4.7.3a):
The type Consumer does not define andThen(capture#7-of ? extends Consumer<? super T>, capture#7-of ? extends Consumer<? super T>) that is applicable here
JavaC (1.8.0_172):
error: incompatible types: invalid method reference
    .reduce(Consumer::andThen)
    incompatible types: Consumer<CAP#1> cannot be converted to Consumer<? super CAP#2>
  where T is a type-variable:
    T extends Object declared in method <T>combine(Stream<? extends Consumer<? super T>>)
  where CAP#1,CAP#2 are fresh type-variables:
    CAP#1 extends Object super: T from capture of ? super T
    CAP#2 extends Object super: T from capture of ? super T
But it should work: every subclass of Consumer can be used as a Consumer, too. And every Consumer of a supertype of X can consume Xs, too. I tried adding explicit type parameters to each line of the stream version, but that didn't help. If I write it as a traditional loop, however, it compiles:
<T> Consumer<T> combine(Collection<? extends Consumer<? super T>> consumers) {
    Consumer<T> result = noOpConsumer();
    for (Consumer<? super T> consumer : consumers) {
        result = result.andThen(consumer);
    }
    return result;
}
(Filtering out the null values is left out for conciseness.)
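For reference, here is a self-contained sketch of the loop version with the null check put back in. The class name `LoopCombine` and the two sample consumers are made up for illustration only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.function.Consumer;

public class LoopCombine {

    // Loop-based variant, with the null check included.
    public static <T> Consumer<T> combine(Collection<? extends Consumer<? super T>> consumers) {
        Consumer<T> result = value -> { /* do nothing */ };
        for (Consumer<? super T> consumer : consumers) {
            if (consumer != null) {
                // andThen expects a Consumer<? super T>, so the wildcard type fits directly
                result = result.andThen(consumer);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        // Hypothetical sample consumers of two different supertypes of String
        Consumer<Object> objectLogger = o -> log.add("object: " + o);
        Consumer<CharSequence> lengthLogger = cs -> log.add("length: " + cs.length());

        // Arrays.asList tolerates null elements
        List<Consumer<? super String>> consumers =
                Arrays.<Consumer<? super String>>asList(objectLogger, null, lengthLogger);

        Consumer<String> combined = LoopCombine.<String>combine(consumers);
        combined.accept("abc");
        System.out.println(log); // prints [object: abc, length: 3]
    }
}
```

The loop works because each step has an explicitly typed left-hand side (`Consumer<T> result`), so `andThen` is always called on a `Consumer<T>` and only its argument carries the wildcard.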
Therefore, my question is: How can I convince JavaC and Eclipse that my code is correct? Or, if it is not correct: Why is the loop version correct but not the Stream version?
You use the one-argument version of Stream.reduce(accumulator), which has the following signature:
Optional<T> reduce(BinaryOperator<T> accumulator);
The BinaryOperator<T> accumulator can only accept elements of type T, but the elements of your stream have the type:
<? extends Consumer<? super T>>
I suggest using the three-argument version of the Stream.reduce(...) method instead:
<U> U reduce(U identity,
             BiFunction<U, ? super T, U> accumulator,
             BinaryOperator<U> combiner);
The BiFunction<U, ? super T, U> accumulator can accept parameters of two different types, has a less restrictive bound, and is more suitable for your situation. A possible solution could be:
<T> Consumer<T> combine(Stream<? extends Consumer<? super T>> consumers) {
    return consumers.filter(Objects::nonNull)
                    .reduce(t -> {}, Consumer::andThen, Consumer::andThen);
}
The third argument, BinaryOperator<U> combiner, is in practice only invoked for parallel streams, but it is still wise to provide a correct implementation of it.
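To illustrate the remark about the combiner, here is a small sketch that runs the three-argument reduce on a parallel stream. The class name `ParallelCombine` and the numbered sample consumers are assumptions made for this example. Because `andThen` is associative and the identity is a no-op, the combined consumer should invoke its parts in encounter order even when the reduction itself runs in parallel.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.stream.Stream;

public class ParallelCombine {

    public static <T> Consumer<T> combine(Stream<? extends Consumer<? super T>> consumers) {
        return consumers.filter(Objects::nonNull)
                // identity: no-op; accumulator and combiner: chain with andThen
                .reduce(t -> {}, Consumer::andThen, Consumer::andThen);
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();

        // Hypothetical sample consumers; the stream is switched to parallel mode
        Stream<Consumer<String>> consumers = Stream.<Consumer<String>>of(
                s -> log.add("1:" + s),
                s -> log.add("2:" + s),
                s -> log.add("3:" + s),
                s -> log.add("4:" + s))
                .parallel();

        Consumer<String> combined = ParallelCombine.<String>combine(consumers);

        // Only the reduction may run in parallel; accept() runs on the calling thread
        combined.accept("x");
        System.out.println(log); // prints [1:x, 2:x, 3:x, 4:x]
    }
}
```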
For a better understanding, the above code can also be written with explicit types:
<T> Consumer<T> combine(Stream<? extends Consumer<? super T>> consumers) {
    Consumer<T> identity = t -> {};
    BiFunction<Consumer<T>, Consumer<? super T>, Consumer<T>> acc = Consumer::andThen;
    BinaryOperator<Consumer<T>> combiner = Consumer::andThen;
    return consumers.filter(Objects::nonNull)
                    .reduce(identity, acc, combiner);
}
Now you can write:
Stream<? extends Consumer<? super Foo>> consumers = Stream.of();
combine(consumers);
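As a slightly fuller usage sketch, the example below feeds the explicitly typed variant both a stream of a Consumer subtype and a stream that mixes consumers of different supertypes. `CombineUsage`, the nested `SpecialConsumer` class (standing in for the type named in the question), and `lengthLogger` are hypothetical names introduced only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.stream.Stream;

public class CombineUsage {

    // Hypothetical Consumer subtype, standing in for SpecialConsumer from the question
    public static class SpecialConsumer implements Consumer<Object> {
        private final List<String> log;

        public SpecialConsumer(List<String> log) {
            this.log = log;
        }

        @Override
        public void accept(Object o) {
            log.add("special: " + o);
        }
    }

    public static <T> Consumer<T> combine(Stream<? extends Consumer<? super T>> consumers) {
        Consumer<T> identity = t -> {};
        BiFunction<Consumer<T>, Consumer<? super T>, Consumer<T>> acc = Consumer::andThen;
        BinaryOperator<Consumer<T>> combiner = Consumer::andThen;
        return consumers.filter(Objects::nonNull)
                        .reduce(identity, acc, combiner);
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();

        // A stream of a Consumer subtype
        Stream<SpecialConsumer> specials = Stream.of(new SpecialConsumer(log));
        Consumer<String> fromSpecials = CombineUsage.<String>combine(specials);
        fromSpecials.accept("foo");

        // Consumers of different supertypes of String in one stream
        Consumer<CharSequence> lengthLogger = cs -> log.add("length: " + cs.length());
        Stream<Consumer<? super String>> mixed =
                Stream.<Consumer<? super String>>of(new SpecialConsumer(log), lengthLogger);
        Consumer<String> fromMixed = CombineUsage.<String>combine(mixed);
        fromMixed.accept("bar");

        System.out.println(log); // prints [special: foo, special: bar, length: 3]
    }
}
```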
You have forgotten a small thing in your method definition. It currently is:
<T> Consumer<T> combine(Stream<? extends Consumer<? super T>> consumers) {}
But you're returning Consumer<? super T>. So by changing the return type it almost works. You accept an argument consumers of type Stream<? extends Consumer<? super T>>. It still doesn't work, because you may be working with different subclasses and implementations of Consumer<? super T> (because of the upper-bounded wildcard extends). You can overcome this by casting every ? extends Consumer<? super T> in your Stream to a plain Consumer<? super T>, like the following:
<T> Consumer<? super T> combine(Stream<? extends Consumer<? super T>> consumers) {
    return consumers
            .filter(Objects::nonNull)
            .map(c -> (Consumer<? super T>) c)
            .reduce(Consumer::andThen)
            .orElse(noOpConsumer());
}
This should now work.
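Another way to normalize the element type, sketched here as an alternative to the cast rather than as part of the answer above, is to wrap each element in a lambda that is explicitly typed as Consumer<T> before reducing. The return type can then stay Consumer<T>. The class name `AdaptCombine` and the sample consumers are assumptions for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.stream.Stream;

public class AdaptCombine {

    public static <T> Consumer<T> combine(Stream<? extends Consumer<? super T>> consumers) {
        return consumers
                .filter(Objects::nonNull)
                // Wrap each element in a lambda whose type is exactly Consumer<T>
                .<Consumer<T>>map(c -> t -> c.accept(t))
                // All elements are now Consumer<T>, so the one-argument reduce applies
                .reduce(Consumer::andThen)
                .orElse(t -> {});
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        // Hypothetical sample consumers of two different supertypes of String
        Consumer<Object> objectLogger = o -> log.add("object: " + o);
        Consumer<CharSequence> lengthLogger = cs -> log.add("length: " + cs.length());

        Stream<Consumer<? super String>> consumers =
                Stream.<Consumer<? super String>>of(objectLogger, lengthLogger);
        Consumer<String> combined = AdaptCombine.<String>combine(consumers);
        combined.accept("abc");
        System.out.println(log); // prints [object: abc, length: 3]
    }
}
```

The trade-off is one extra wrapper object per element, in exchange for avoiding both the cast and the wildcard in the return type.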