I have some large-ish text files that I want to process by grouping their lines.
I tried to use the new streaming features, like
return FileUtils.readLines(...)
.parallelStream()
.map(...)
.collect(groupingBy(pair -> pair[0]));
The problem is that, AFAIK, this generates a Map.
Is there any way to have high level code like the one above that generates, for example, a Stream of Entries?
UPDATE: What I'm looking for is something like Python's itertools.groupby. My files are already sorted (by pair[0]); I just want to load the groups one by one.
I already have an iterative solution. I'm just wondering if there's a more declarative way to do that. Btw, using Guava or another third-party library wouldn't be a big problem.
Streams are lazy because intermediate operations are not evaluated until a terminal operation is invoked. Each intermediate operation creates a new stream, stores the provided operation/function, and returns the new stream. The pipeline accumulates these newly created streams.
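To make the laziness concrete, here is a minimal sketch. The class name LazyStreamDemo and the visited list are made up for illustration. The pipeline is built with intermediate operations only, so nothing is recorded until a terminal operation pulls elements through.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LazyStreamDemo {
    // Hypothetical helper state: records which elements the pipeline has touched.
    static final List<Integer> visited = new ArrayList<>();

    static Stream<Integer> buildPipeline() {
        // Only intermediate operations here, so no element is processed yet.
        return Stream.of(1, 2, 3, 4)
                .peek(visited::add)
                .map(i -> i * 10);
    }

    public static void main(String[] args) {
        Stream<Integer> pipeline = buildPipeline();
        System.out.println("before terminal op: " + visited);

        // collect(...) is the terminal operation that triggers evaluation.
        List<Integer> out = pipeline.collect(Collectors.toList());
        System.out.println("after terminal op: " + visited + " -> " + out);
    }
}
```

The peek call is only there to observe the evaluation; it is not something you would normally keep in production code.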
Note that Java 8 added a new stream() method to the Collection interface. And we can create a stream from individual objects using Stream.
With Java 8, the Collection interface has two methods to generate a Stream: stream(), which returns a sequential stream with the collection as its source, and parallelStream(), which returns a possibly parallel stream with the collection as its source.
Introduced in Java 8, the Stream API is used to process collections of objects. A stream is a sequence of objects that supports various methods which can be pipelined to produce the desired result. A stream is not a data structure; instead, it takes input from collections, arrays, or I/O channels.
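A small sketch of the two entry points, assuming a plain ArrayList as the source. The class and method names (StreamSources, sumSequential, sumParallel) are hypothetical. Both variants compute the same result here; only the execution mode differs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class StreamSources {
    // Sequential stream over the collection.
    static int sumSequential(List<Integer> values) {
        return values.stream().mapToInt(Integer::intValue).sum();
    }

    // Possibly parallel stream over the same collection.
    static int sumParallel(List<Integer> values) {
        return values.parallelStream().mapToInt(Integer::intValue).sum();
    }

    public static void main(String[] args) {
        List<Integer> values = new ArrayList<>(Arrays.asList(1, 2, 3, 4, 5));
        System.out.println(sumSequential(values));
        System.out.println(sumParallel(values));
        System.out.println(values.stream().isParallel());
        System.out.println(values.parallelStream().isParallel());
    }
}
```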
The task you want to achieve is quite different from what grouping does. groupingBy does not rely on the order of the Stream’s elements but on the Map’s algorithm applied to the classifier Function’s result.
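To illustrate the point, here is a minimal sketch with made-up sample lines; the class name GroupingByDemo and the comma-separated format are assumptions for the example. Lines sharing a key end up in the same Map bucket even when they are not adjacent, and the whole input has to be consumed before the Map is available.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupingByDemo {
    // Groups lines by their first comma-separated field, regardless of position.
    static Map<String, List<String>> groupByFirstField(List<String> lines) {
        return lines.stream()
                .collect(Collectors.groupingBy(line -> line.split(",")[0]));
    }

    public static void main(String[] args) {
        // The two "a" lines are separated by a "b" line on purpose.
        List<String> lines = Arrays.asList("a,1", "b,2", "a,3");
        System.out.println(groupByFirstField(lines));
    }
}
```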
What you want is to fold adjacent items having a common property value into one List item. It is not even necessary to have the Stream sorted by that property as long as you can guarantee that all items having the same property value are clustered.
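The folding semantics can be sketched with a plain loop. This is an eager illustration only, not the lazy solution, and the names AdjacentFold and foldAdjacent are hypothetical. A new group starts whenever the key differs from the previous item's key, so a key that reappears later forms a separate group.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

public class AdjacentFold {
    // Folds runs of adjacent items with equal keys into (key, items) entries.
    static <T, G> List<Map.Entry<G, List<T>>> foldAdjacent(
            Iterable<T> items, Function<? super T, ? extends G> classifier) {
        List<Map.Entry<G, List<T>>> out = new ArrayList<>();
        List<T> current = null;
        G currentKey = null;
        for (T item : items) {
            G key = classifier.apply(item);
            // Start a new group on the first item or whenever the key changes.
            if (current == null || !Objects.equals(key, currentKey)) {
                current = new ArrayList<>();
                currentKey = key;
                out.add(new AbstractMap.SimpleImmutableEntry<>(key, current));
            }
            current.add(item);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("a,1", "a,2", "b,3", "a,4");
        System.out.println(foldAdjacent(lines, line -> line.split(",")[0]));
    }
}
```

With clustered input this matches what itertools.groupby would give; with unclustered input the trailing "a" line becomes its own group instead of being merged into the first one.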
Maybe it is possible to formulate this task as a reduction but to me the resulting structure looks too complicated.
So, unless direct support for this feature gets added to Streams, an iterator-based approach looks most pragmatic to me:
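import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class Folding<T,G> implements Spliterator<Map.Entry<G,List<T>>> {
    // Entry point: wraps the source stream in a sequential stream of (group, items) entries.
    static <T,G> Stream<Map.Entry<G,List<T>>> foldBy(
            Stream<? extends T> s, Function<? super T, ? extends G> f) {
        return StreamSupport.stream(new Folding<>(s.spliterator(), f), false);
    }

    private final Spliterator<? extends T> source;
    private final Function<? super T, ? extends G> pf;
    private final Consumer<T> c=this::addItem;
    // pending collects the group currently being read; result holds a completed group.
    private List<T> pending, result;
    private G pendingGroup, resultGroup;

    Folding(Spliterator<? extends T> s, Function<? super T, ? extends G> f) {
        source=s;
        pf=f;
    }

    // Called for each source item; completes the pending group when the key changes.
    private void addItem(T item) {
        G group=pf.apply(item);
        if(pending==null) pending=new ArrayList<>();
        else if(!pending.isEmpty()) {
            if(!Objects.equals(group, pendingGroup)) {
                if(pending.size()==1)
                    // Single-item group: hand out a compact list and reuse the buffer.
                    result=Collections.singletonList(pending.remove(0));
                else {
                    result=pending;
                    pending=new ArrayList<>();
                }
                resultGroup=pendingGroup;
            }
        }
        pendingGroup=group;
        pending.add(item);
    }

    @Override
    public boolean tryAdvance(Consumer<? super Map.Entry<G, List<T>>> action) {
        // Pull source items until a group is complete, then emit it.
        while(source.tryAdvance(c)) {
            if(result!=null) {
                action.accept(entry(resultGroup, result));
                result=null;
                return true;
            }
        }
        // Source exhausted: emit the last group, if there is one.
        if(pending!=null) {
            action.accept(entry(pendingGroup, pending));
            pending=null;
            return true;
        }
        return false;
    }

    private Map.Entry<G,List<T>> entry(G g, List<T> l) {
        return new AbstractMap.SimpleImmutableEntry<>(g, l);
    }

    // No characteristics, unknown size, and no splitting: strictly sequential.
    @Override
    public int characteristics() { return 0; }
    @Override
    public long estimateSize() { return Long.MAX_VALUE; }
    @Override
    public Spliterator<Map.Entry<G, List<T>>> trySplit() { return null; }
}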
The lazy nature of the resulting folded Stream can be best demonstrated by applying it to an infinite stream:
Folding.foldBy(Stream.iterate(0, i->i+1), i->i>>4)
.filter(e -> e.getKey()>5)
.findFirst().ifPresent(e -> System.out.println(e.getValue()));
cyclops-react, a library I contribute to, offers both sharding and grouping functionality that might do what you want.
ReactiveSeq<ListX<TYPE>> grouped = ReactiveSeq.fromCollection(FileUtils.readLines(...) )
.groupedStatefullyWhile((batch,next) -> batch.size()==0 ? true : next.equals(batch.get(0)));
The groupedStatefullyWhile operator allows elements to be grouped based on the current state of the batch. ReactiveSeq is a single-threaded sequential Stream.
Map<Key, Stream<Value>> sharded =
new LazyReact()
.fromCollection(FileUtils.readLines(...) )
.map(..)
.shard(shards, pair -> pair[0]);
This will create a LazyFutureStream (which implements java.util.stream.Stream) that processes the data in the file asynchronously and in parallel. It's lazy and won't start processing until data is pulled through.
The only caveat is that you need to define the shards beforehand, i.e. the 'shards' parameter above, which is a Map of async.Queues keyed by the shard key (possibly whatever pair[0] is?).
e.g.
Map<Integer,Queue<String>> shards;
There is a sharding example with video here and test code here