I've stream of strings and nulls like
Stream<String> str1 = Stream.of("A","B","C",null,null,"D",null,"E","F",null,"G",null);
I want to reduce it to another stream, where any sequence of not null string joined together, ie like
Stream<String> str2 = Stream.of("ABC", "", "D", "EF","G")
First way, that i found - create collector that firstly reduce complete input stream to single object with list of all joined strings and then create new stream from it:
class Acc1 {
final private List<String> data = new ArrayList<>();
final private StringBuilder sb = new StringBuilder();
private void accept(final String s) {
if (s != null)
sb.append(s);
else {
data.add(sb.toString());
sb.setLength(0);
}
}
public static Collector<String,Acc1,Stream<String>> collector() {
return Collector.of(Acc1::new, Acc1::accept, (a,b)-> a, acc -> acc.data.stream());
}
}
...
Stream<String> str2 = str.collect(Acc1.collector());
But in this case before any use if str2, even as str2.findFirst(), input stream will be completely processed. It time and memory consuming operation and on infinity stream from some generator it will not work at all
Another way - create external object that will keep intermediate state and use it in flatMap():
class Acc2 {
final private StringBuilder sb = new StringBuilder();
Stream<String> accept(final String s) {
if (s != null) {
sb.append(s);
return Stream.empty();
} else {
final String result = sb.toString();
sb.setLength(0);
return Stream.of(result);
}
}
}
...
Acc2 acc = new Acc2();
Stream<String> str2 = str1.flatMap(acc::accept);
In this case from str1 will be retrieved only elemets that really accessed via str2.
But using of external object, created outside of stream processing, looks ugly for me and probably can cause some side effects, that i do not see now. Also if str2 will be used later with parallelStream() it will cause unpredictable result.
Is there any more correct implemetation of stream->stream reduction without these flaws?
Reduction or its mutable variant, collect
, is always an operation that will process all items. Your operation can be implemented via a custom Spliterator
, e.g.
public static Stream<String> joinGroups(Stream<String> s) {
Spliterator<String> sp=s.spliterator();
return StreamSupport.stream(
new Spliterators.AbstractSpliterator<String>(sp.estimateSize(),
sp.characteristics()&Spliterator.ORDERED | Spliterator.NONNULL) {
private StringBuilder sb = new StringBuilder();
private String last;
public boolean tryAdvance(Consumer<? super String> action) {
if(!sp.tryAdvance(str -> last=str))
return false;
while(last!=null) {
sb.append(last);
if(!sp.tryAdvance(str -> last=str)) break;
}
action.accept(sb.toString());
sb=new StringBuilder();
return true;
}
}, false);
}
which produces the intended groups, as you can test with
joinGroups(Stream.of("A","B","C",null,null,"D",null,"E","F",null,"G",null))
.forEach(System.out::println);
but also has the desired lazy behavior, testable via
joinGroups(
Stream.of("A","B","C",null,null,"D",null,"E","F",null,"G",null)
.peek(str -> System.out.println("consumed "+str))
).skip(1).filter(s->!s.isEmpty()).findFirst().ifPresent(System.out::println);
After a second thought, I came to this slightly more efficient variant. It will incorporate the StringBuilder
only if there are at least two String
s to join, otherwise, it will simply use the already existing sole String
instance or the literal ""
string for empty groups:
public static Stream<String> joinGroups(Stream<String> s) {
Spliterator<String> sp=s.spliterator();
return StreamSupport.stream(
new Spliterators.AbstractSpliterator<String>(sp.estimateSize(),
sp.characteristics()&Spliterator.ORDERED | Spliterator.NONNULL) {
private String next;
public boolean tryAdvance(Consumer<? super String> action) {
if(!sp.tryAdvance(str -> next=str))
return false;
String string=next;
if(string==null) string="";
else if(sp.tryAdvance(str -> next=str) && next!=null) {
StringBuilder sb=new StringBuilder().append(string);
do sb.append(next);while(sp.tryAdvance(str -> next=str) && next!=null);
string=sb.toString();
}
action.accept(string);
return true;
}
}, false);
}
It's quite hard to implement such scenarios using standard Stream API. In my free StreamEx library I extended standard Stream interface with methods which allow to perform so-called "partial reduction" which is exactly what is necessary here:
StreamEx<String> str1 = StreamEx.of("A","B","C",null,null,"D",null,"E","F",null,"G",null);
Stream<String> str2 = str1.collapse((a, b) -> a != null,
MoreCollectors.filtering(Objects::nonNull, Collectors.joining()));
str2.map(x -> '"'+x+'"').forEach(System.out::println);
Output:
"ABC"
""
"D"
"EF"
"G"
The StreamEx.collapse()
method performs a partial reduction of the stream using the supplied collector. The first argument is a predicate which applied to two adjacent original items and should return true if they must be reduced together. Here we just require that first of the pair is not null ((a, b) -> a != null
): this means that every group ends with null
and new group starts here. Now we need to join group letters together: this can be done by standard Collectors.joining()
collector. However we need also to filter out null
. We can do it using MoreCollectors.filtering
collector (actually the same collector will be available in Java 9 in Collectors class).
This implementation is completely lazy and quite friendly to parallel processing.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With