Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Is it possible to do a lazy groupby, returning a stream, in java 8?

I have some large-ish text files that I want to process by grouping its lines.

I tried to use the new streaming features, like

return FileUtils.readLines(...) 
            .parallelStream()
            .map(...)
            .collect(groupingBy(pair -> pair[0]));

The problem is that, AFAIK, this generates a Map.

Is there any way to have high level code like the one above that generates, for example, a Stream of Entries?

UPDATE: What I'm looking for is something like python's itertools.groupby. My files are already sorted (by pair[0]), I just want to load the groups one by one.

I already have an iterative solution. I'm just wondering if there's a more declarative way to do that. Btw, using guava or another 3rd party library wouldn't be a big problem.

like image 341
José Ricardo Avatar asked Sep 03 '14 20:09

José Ricardo


People also ask

Are Java 8 streams lazy?

Streams are lazy because intermediate operations are not evaluated until terminal operation is invoked. Each intermediate operation creates a new stream, stores the provided operation/function and return the new stream. The pipeline accumulates these newly created streams.

Does Java 8 support streams?

Note that Java 8 added a new stream() method to the Collection interface. And we can create a stream from individual objects using Stream.

What are two types of streams in Java 8?

With Java 8, Collection interface has two methods to generate a Stream. stream() − Returns a sequential stream considering collection as its source. parallelStream() − Returns a parallel Stream considering collection as its source.

What would be a good way to describe stream in Java 8?

Introduced in Java 8, the Stream API is used to process collections of objects. A stream is a sequence of objects that supports various methods which can be pipelined to produce the desired result. A stream is not a data structure instead it takes input from the Collections, Arrays or I/O channels.


2 Answers

The task you want to achieve is quite different from what grouping does. groupingBy does not rely on the order of the Stream’s elements but on the Map’s algorithm applied to the classifier Function’s result.

What you want is to fold adjacent items having a common property value into one List item. It is not even necessary to have the Stream sorted by that property as long as you can guaranty that all items having the same property value are clustered.

Maybe it is possible to formulate this task as a reduction but to me the resulting structure looks too complicated.

So, unless direct support for this feature gets added to the Streams, an iterator based approach looks most pragmatic to me:

class Folding<T,G> implements Spliterator<Map.Entry<G,List<T>>> {
    static <T,G> Stream<Map.Entry<G,List<T>>> foldBy(
            Stream<? extends T> s, Function<? super T, ? extends G> f) {
        return StreamSupport.stream(new Folding<>(s.spliterator(), f), false);
    }
    private final Spliterator<? extends T> source;
    private final Function<? super T, ? extends G> pf;
    private final Consumer<T> c=this::addItem;
    private List<T> pending, result;
    private G pendingGroup, resultGroup;

    Folding(Spliterator<? extends T> s, Function<? super T, ? extends G> f) {
        source=s;
        pf=f;
    }
    private void addItem(T item) {
        G group=pf.apply(item);
        if(pending==null) pending=new ArrayList<>();
        else if(!pending.isEmpty()) {
            if(!Objects.equals(group, pendingGroup)) {
                if(pending.size()==1)
                    result=Collections.singletonList(pending.remove(0));
                else {
                    result=pending;
                    pending=new ArrayList<>();
                }
                resultGroup=pendingGroup;
            }
        }
        pendingGroup=group;
        pending.add(item);
    }
    public boolean tryAdvance(Consumer<? super Map.Entry<G, List<T>>> action) {
        while(source.tryAdvance(c)) {
            if(result!=null) {
                action.accept(entry(resultGroup, result));
                result=null;
                return true;
            }
        }
        if(pending!=null) {
            action.accept(entry(pendingGroup, pending));
            pending=null;
            return true;
        }
        return false;
    }
    private Map.Entry<G,List<T>> entry(G g, List<T> l) {
        return new AbstractMap.SimpleImmutableEntry<>(g, l);
    }
    public int characteristics() { return 0; }
    public long estimateSize() { return Long.MAX_VALUE; }
    public Spliterator<Map.Entry<G, List<T>>> trySplit() { return null; }
}

The lazy nature of the resulting folded Stream can be best demonstrated by applying it to an infinite stream:

Folding.foldBy(Stream.iterate(0, i->i+1), i->i>>4)
       .filter(e -> e.getKey()>5)
       .findFirst().ifPresent(e -> System.out.println(e.getValue()));
like image 153
Holger Avatar answered Sep 22 '22 04:09

Holger


cyclops-react, I library a contribute to, offers both sharding and grouping funcitonality that might do what you want.

  ReactiveSeq<ListX<TYPE>> grouped = ReactiveSeq.fromCollection(FileUtils.readLines(...) )
             .groupedStatefullyWhile((batch,next) ->  batch.size()==0 ? true : next.equals(batch.get(0)));

The groupedStatefullyWhile operator allows elements to be grouped based on the current state of the batch. ReactiveSeq is a single threaded sequential Stream.

  Map<Key, Stream<Value> sharded = 
                  new LazyReact()
                 .fromCollection(FileUtils.readLines(...) )
                 .map(..)
                 .shard(shards, pair -> pair[0]);

This will create a LazyFutureStream (that implements java.util.stream.Stream), that will process the data in the file asynchronously and in parallel. It's lazy and won't start processing until data is pulled through.

The only caveat is that you need to define the shards beforehand. I.e. the 'shards' parameter above which is a Map of async.Queue's keyed by the key to the shard (possibly whatever pair[0] is?).

e.g.

Map<Integer,Queue<String>> shards;

There is a sharding example with video here and test code here

like image 28
John McClean Avatar answered Sep 23 '22 04:09

John McClean