How can a "partition" operation be implemented on a Java 8 Stream? By partition I mean dividing a stream into sub-streams of a given size. It would be similar to Guava's Iterators.partition() method, except that the partitions should ideally be lazily evaluated Streams rather than Lists.
First, we create a result map with two entries, one for each partition. The values are LinkedHashMaps so that insertion order is preserved. Then, we create a HashSet from the list, so that invoking set.contains(k) is an O(1) operation (otherwise, if we did list.
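The passage above is cut off, so the following is only a sketch of what it appears to describe, assuming the goal is to split a map's entries by whether each key occurs in a given list. The class name MapPartitionSketch and the method partitionByKeys are hypothetical names chosen for illustration.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

class MapPartitionSketch {

    // Hypothetical helper: splits `source` into entries whose key is in `keys` (true)
    // and all remaining entries (false).
    static <K, V> Map<Boolean, Map<K, V>> partitionByKeys(Map<K, V> source, Collection<K> keys) {
        // Result map with two entries, one per partition; LinkedHashMap keeps insertion order.
        Map<Boolean, Map<K, V>> result = new LinkedHashMap<>();
        result.put(true, new LinkedHashMap<>());
        result.put(false, new LinkedHashMap<>());

        // HashSet lookup is O(1) on average, unlike a linear scan of a list.
        Set<K> keySet = new HashSet<>(keys);

        source.forEach((k, v) -> result.get(keySet.contains(k)).put(k, v));
        return result;
    }
}
```

Iteration order inside each partition follows the iteration order of the source map, so passing a LinkedHashMap as the source keeps the original insertion order.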
The Collectors partitioningBy() method is a predefined method of the java.util.stream.Collectors class, used to partition a stream of objects (or a set of elements) based on a given predicate. There are two overloaded variants of the method.
No storage. Streams don't have storage for values; they carry values from a source (which could be a data structure, a generating function, an I/O channel, etc.) through a pipeline of computational steps.
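A small illustration of this point, as a sketch: the source here is a generating function that describes an infinite sequence, and the pipeline only pulls as many values as the terminal operation needs. The class and method names are made up for the example.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LazyPipelineExample {

    // The stream never stores the infinite sequence; values flow from the
    // generating function through filter and map until limit() is satisfied.
    static List<Integer> firstEvenSquares(int count) {
        return Stream.iterate(1, n -> n + 1)   // generating function as the source
                .filter(n -> n % 2 == 0)       // keep even numbers
                .map(n -> n * n)               // square them
                .limit(count)                  // stop after `count` results
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(firstEvenSquares(3)); // prints [4, 16, 36]
    }
}
```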
The partitioningBy() method always returns a Map with two entries: one for the elements where the Predicate is true, and one for the elements where it is false.
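To make the behaviour of partitioningBy() concrete, here is a minimal sketch covering both overloads: the one-argument form that collects each side into a List, and the two-argument form that takes a downstream collector. The class and method names are illustrative only.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class PartitioningByExample {

    // partitioningBy(Predicate): each side of the split is collected into a List.
    static Map<Boolean, List<Integer>> evensAndOdds(int endExclusive) {
        return IntStream.range(0, endExclusive).boxed()
                .collect(Collectors.partitioningBy(n -> n % 2 == 0));
    }

    // partitioningBy(Predicate, Collector): each side is reduced by the downstream collector.
    static Map<Boolean, Long> countEvensAndOdds(int endExclusive) {
        return IntStream.range(0, endExclusive).boxed()
                .collect(Collectors.partitioningBy(n -> n % 2 == 0, Collectors.counting()));
    }

    public static void main(String[] args) {
        System.out.println(evensAndOdds(6));      // prints {false=[1, 3, 5], true=[0, 2, 4]}
        System.out.println(countEvensAndOdds(5)); // prints {false=2, true=3}
    }
}
```

Note that this splits by a predicate into exactly two groups; it does not produce fixed-size batches, which is what the question asks for.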
It's impossible to partition an arbitrary source stream into fixed-size batches without breaking parallel processing. When processing in parallel, you may not know how many elements the first sub-task contains after the split, so you cannot create the partitions for the next sub-task until the first one is fully processed.
However, it is possible to create a stream of partitions from a random-access List. Such a feature is available, for example, in my StreamEx library:
List<Type> input = Arrays.asList(...);
Stream<List<Type>> stream = StreamEx.ofSubLists(input, partitionSize);
Or, if you really want a stream of streams:
Stream<Stream<Type>> stream = StreamEx.ofSubLists(input, partitionSize).map(List::stream);
If you don't want to depend on third-party libraries, you can implement such an ofSubLists method manually:
public static <T> Stream<List<T>> ofSubLists(List<T> source, int length) {
    if (length <= 0)
        throw new IllegalArgumentException("length = " + length);
    int size = source.size();
    if (size <= 0)
        return Stream.empty();
    // Index of the last chunk; computed from (size - 1) to avoid int overflow near MAX_VALUE.
    int fullChunks = (size - 1) / length;
    // Each chunk is a subList view; the last one ends at size and may be shorter.
    return IntStream.range(0, fullChunks + 1).mapToObj(
        n -> source.subList(n * length, n == fullChunks ? size : (n + 1) * length));
}
This implementation looks a little long, but it takes into account some corner cases, such as a list size close to Integer.MAX_VALUE.
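For reference, here is a self-contained usage sketch of the manual ofSubLists method. The method body is repeated from the answer; the wrapper class SubListsDemo and the helper chunk are names invented for this example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class SubListsDemo {

    // Same logic as the ofSubLists method shown in the answer.
    static <T> Stream<List<T>> ofSubLists(List<T> source, int length) {
        if (length <= 0)
            throw new IllegalArgumentException("length = " + length);
        int size = source.size();
        if (size <= 0)
            return Stream.empty();
        int fullChunks = (size - 1) / length;
        return IntStream.range(0, fullChunks + 1).mapToObj(
            n -> source.subList(n * length, n == fullChunks ? size : (n + 1) * length));
    }

    // Illustrative helper: materializes the stream of partitions into a list.
    static <T> List<List<T>> chunk(List<T> source, int length) {
        return ofSubLists(source, length).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
        System.out.println(chunk(input, 3)); // prints [[1, 2, 3], [4, 5, 6], [7]]

        // Stream of streams: sum each partition lazily.
        List<Integer> sums = ofSubLists(input, 3)
                .map(List::stream)
                .map(s -> s.mapToInt(Integer::intValue).sum())
                .collect(Collectors.toList());
        System.out.println(sums); // prints [6, 15, 7]
    }
}
```

The partitions are subList views of the source list, so no elements are copied; the source should not be structurally modified while the partitions are in use.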
If you want a parallel-friendly solution for an unordered stream (so you don't care which stream elements end up in the same batch), you can use a collector like this (thanks to @sibnick for the inspiration):
public static <T, A, R> Collector<T, ?, R> unorderedBatches(int batchSize,
        Collector<List<T>, A, R> downstream) {
    // Mutable accumulator: the batch being filled plus the downstream container.
    class Acc {
        List<T> cur = new ArrayList<>();
        A acc = downstream.supplier().get();
    }
    BiConsumer<Acc, T> accumulator = (acc, t) -> {
        acc.cur.add(t);
        // Hand a full batch to the downstream collector and start a new one.
        if (acc.cur.size() == batchSize) {
            downstream.accumulator().accept(acc.acc, acc.cur);
            acc.cur = new ArrayList<>();
        }
    };
    return Collector.of(Acc::new, accumulator,
        (acc1, acc2) -> {
            // Merge the completed batches, then re-add the leftovers of acc2 one by one.
            acc1.acc = downstream.combiner().apply(acc1.acc, acc2.acc);
            for (T t : acc2.cur)
                accumulator.accept(acc1, t);
            return acc1;
        },
        acc -> {
            // Flush the last, possibly incomplete, batch.
            if (!acc.cur.isEmpty())
                downstream.accumulator().accept(acc.acc, acc.cur);
            return downstream.finisher().apply(acc.acc);
        },
        Collector.Characteristics.UNORDERED);
}
Usage example:
List<List<Integer>> list = IntStream.range(0, 20)
    .boxed().parallel()
    .collect(unorderedBatches(3, Collectors.toList()));
Result:
[[2, 3, 4], [7, 8, 9], [0, 1, 5], [12, 13, 14], [17, 18, 19], [10, 11, 15], [6, 16]]
This collector is perfectly thread-safe and produces ordered batches for a sequential stream.
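The two properties claimed here can be checked with a small self-contained sketch. The collector is repeated from the answer; the wrapper class UnorderedBatchesDemo and the helper batches are names invented for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class UnorderedBatchesDemo {

    // Same collector as in the answer.
    static <T, A, R> Collector<T, ?, R> unorderedBatches(int batchSize,
            Collector<List<T>, A, R> downstream) {
        class Acc {
            List<T> cur = new ArrayList<>();
            A acc = downstream.supplier().get();
        }
        BiConsumer<Acc, T> accumulator = (acc, t) -> {
            acc.cur.add(t);
            if (acc.cur.size() == batchSize) {
                downstream.accumulator().accept(acc.acc, acc.cur);
                acc.cur = new ArrayList<>();
            }
        };
        return Collector.of(Acc::new, accumulator,
            (acc1, acc2) -> {
                acc1.acc = downstream.combiner().apply(acc1.acc, acc2.acc);
                for (T t : acc2.cur)
                    accumulator.accept(acc1, t);
                return acc1;
            },
            acc -> {
                if (!acc.cur.isEmpty())
                    downstream.accumulator().accept(acc.acc, acc.cur);
                return downstream.finisher().apply(acc.acc);
            },
            Collector.Characteristics.UNORDERED);
    }

    // Illustrative helper: batches the numbers 0..n-1, sequentially or in parallel.
    static List<List<Integer>> batches(int n, int batchSize, boolean parallel) {
        IntStream range = IntStream.range(0, n);
        if (parallel)
            range = range.parallel();
        return range.boxed().collect(unorderedBatches(batchSize, Collectors.toList()));
    }

    public static void main(String[] args) {
        // Sequential: batches follow encounter order.
        System.out.println(batches(8, 3, false)); // prints [[0, 1, 2], [3, 4, 5], [6, 7]]
        // Parallel: batch contents vary between runs, but no element is lost or duplicated.
        System.out.println(batches(20, 3, true));
    }
}
```

In the parallel case the grouping of elements is not deterministic, but the combiner re-feeds leftover elements through the accumulator, so at most one batch ends up smaller than the batch size.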
If you want to apply an intermediate transformation to every batch, you can use the following version:
public static <T, AA, A, B, R> Collector<T, ?, R> unorderedBatches(int batchSize,
        Collector<T, AA, B> batchCollector,
        Collector<B, A, R> downstream) {
    return unorderedBatches(batchSize,
        Collectors.mapping(list -> list.stream().collect(batchCollector), downstream));
}
For example, this way you can sum the numbers in every batch on the fly:
List<Integer> list = IntStream.range(0, 20)
    .boxed().parallel()
    .collect(unorderedBatches(3, Collectors.summingInt(Integer::intValue), Collectors.toList()));