
Partition a Java 8 Stream

How can I implement a "partition" operation on a Java 8 Stream? By partition I mean dividing a stream into sub-streams of a given size. It would be similar to Guava's Iterators.partition() method, except that ideally the partitions would be lazily-evaluated Streams rather than Lists.
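For context, the eager list-based behavior I'd like to mirror can be sketched with plain JDK code (no Guava dependency; the class and method names here are illustrative):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PartitionDemo {
    // Eagerly split a list into chunks of at most `size` elements,
    // mirroring the semantics of Guava's Iterators.partition().
    static <T> List<List<T>> partition(List<T> list, int size) {
        List<List<T>> chunks = new ArrayList<>();
        for (int i = 0; i < list.size(); i += size) {
            chunks.add(list.subList(i, Math.min(i + size, list.size())));
        }
        return chunks;
    }

    public static void main(String[] args) {
        System.out.println(partition(Arrays.asList(1, 2, 3, 4, 5), 2));
        // [[1, 2], [3, 4], [5]]
    }
}
```

The question is how to get this chunking behavior on a Stream pipeline, ideally without materializing each chunk as a List.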

Trader001 asked Sep 07 '15

1 Answer

It's impossible to partition an arbitrary source stream into fixed-size batches, because this would break parallel processing. When processing in parallel, you may not know how many elements the first sub-task contains after a split, so you cannot create the partitions for the next sub-task until the first is fully processed.

However, it is possible to create a stream of partitions from a random-access List. Such a feature is available, for example, in my StreamEx library:

List<Type> input = Arrays.asList(...);
Stream<List<Type>> stream = StreamEx.ofSubLists(input, partitionSize);

Or if you really want the stream of streams:

Stream<Stream<Type>> stream = StreamEx.ofSubLists(input, partitionSize).map(List::stream); 

If you don't want to depend on third-party libraries, you can implement such an ofSubLists method manually:

public static <T> Stream<List<T>> ofSubLists(List<T> source, int length) {
    if (length <= 0)
        throw new IllegalArgumentException("length = " + length);
    int size = source.size();
    if (size <= 0)
        return Stream.empty();
    int fullChunks = (size - 1) / length;
    return IntStream.range(0, fullChunks + 1).mapToObj(
        n -> source.subList(n * length, n == fullChunks ? size : (n + 1) * length));
}

This implementation looks a little bit long, but it takes into account some corner cases, like a list size close to Integer.MAX_VALUE.
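To see the helper in action, here is a self-contained usage sketch (the method is reproduced from above so the example compiles on its own; the class and variable names are mine):

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class SubListsDemo {
    public static <T> Stream<List<T>> ofSubLists(List<T> source, int length) {
        if (length <= 0)
            throw new IllegalArgumentException("length = " + length);
        int size = source.size();
        if (size <= 0)
            return Stream.empty();
        int fullChunks = (size - 1) / length;
        return IntStream.range(0, fullChunks + 1).mapToObj(
            n -> source.subList(n * length, n == fullChunks ? size : (n + 1) * length));
    }

    public static void main(String[] args) {
        // Seven elements split into chunks of three: the last chunk is shorter.
        List<Integer> input = IntStream.rangeClosed(1, 7).boxed().collect(Collectors.toList());
        List<List<Integer>> chunks = ofSubLists(input, 3).collect(Collectors.toList());
        System.out.println(chunks); // [[1, 2, 3], [4, 5, 6], [7]]
    }
}
```

Note that the sub-lists are views over the source list (via List.subList), so no element copying happens until a chunk is actually consumed.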


If you want a parallel-friendly solution for an unordered stream (so you don't care which stream elements are combined into a single batch), you may use a collector like this (thanks to @sibnick for inspiration):

public static <T, A, R> Collector<T, ?, R> unorderedBatches(int batchSize,
        Collector<List<T>, A, R> downstream) {
    class Acc {
        List<T> cur = new ArrayList<>();
        A acc = downstream.supplier().get();
    }
    BiConsumer<Acc, T> accumulator = (acc, t) -> {
        acc.cur.add(t);
        if (acc.cur.size() == batchSize) {
            downstream.accumulator().accept(acc.acc, acc.cur);
            acc.cur = new ArrayList<>();
        }
    };
    return Collector.of(Acc::new, accumulator,
            (acc1, acc2) -> {
                acc1.acc = downstream.combiner().apply(acc1.acc, acc2.acc);
                for (T t : acc2.cur) accumulator.accept(acc1, t);
                return acc1;
            }, acc -> {
                if (!acc.cur.isEmpty())
                    downstream.accumulator().accept(acc.acc, acc.cur);
                return downstream.finisher().apply(acc.acc);
            }, Collector.Characteristics.UNORDERED);
}

Usage example:

List<List<Integer>> list = IntStream.range(0, 20)
                                    .boxed().parallel()
                                    .collect(unorderedBatches(3, Collectors.toList()));

Result:

[[2, 3, 4], [7, 8, 9], [0, 1, 5], [12, 13, 14], [17, 18, 19], [10, 11, 15], [6, 16]] 

Such a collector is thread-safe, and it produces ordered batches when used with a sequential stream.
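As a quick check of the sequential behavior, here is a self-contained sketch (the collector is reproduced from above so it compiles on its own; the class name is mine):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class BatchDemo {
    public static <T, A, R> Collector<T, ?, R> unorderedBatches(int batchSize,
            Collector<List<T>, A, R> downstream) {
        class Acc {
            List<T> cur = new ArrayList<>();
            A acc = downstream.supplier().get();
        }
        BiConsumer<Acc, T> accumulator = (acc, t) -> {
            acc.cur.add(t);
            if (acc.cur.size() == batchSize) {
                downstream.accumulator().accept(acc.acc, acc.cur);
                acc.cur = new ArrayList<>();
            }
        };
        return Collector.of(Acc::new, accumulator,
                (acc1, acc2) -> {
                    acc1.acc = downstream.combiner().apply(acc1.acc, acc2.acc);
                    for (T t : acc2.cur) accumulator.accept(acc1, t);
                    return acc1;
                }, acc -> {
                    if (!acc.cur.isEmpty())
                        downstream.accumulator().accept(acc.acc, acc.cur);
                    return downstream.finisher().apply(acc.acc);
                }, Collector.Characteristics.UNORDERED);
    }

    public static void main(String[] args) {
        // Without .parallel(), elements arrive in encounter order,
        // so the batches come out in order as well.
        List<List<Integer>> batches = IntStream.range(0, 7).boxed()
                .collect(unorderedBatches(3, Collectors.toList()));
        System.out.println(batches); // [[0, 1, 2], [3, 4, 5], [6]]
    }
}
```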

If you want to apply an intermediate transformation to every batch, you may use the following version:

public static <T, AA, A, B, R> Collector<T, ?, R> unorderedBatches(int batchSize,
        Collector<T, AA, B> batchCollector,
        Collector<B, A, R> downstream) {
    return unorderedBatches(batchSize,
            Collectors.mapping(list -> list.stream().collect(batchCollector), downstream));
}

For example, this way you can sum the numbers in every batch on the fly:

List<Integer> list = IntStream.range(0, 20)
        .boxed().parallel()
        .collect(unorderedBatches(3, Collectors.summingInt(Integer::intValue),
                Collectors.toList()));
Tagir Valeev answered Oct 14 '22