In an ETL process I'm retrieving a lot of entities from a Spring Data Repository. I'm then using a parallel stream to map the entities to different ones. I can either use a consumer to store those new entities in another repository one by one, or collect them into a List and store that in a single bulk operation. The first is costly, while the latter might exceed the available memory.
Is there a good way to collect a certain amount of elements in the stream (like limit does), consume that chunk, and keep on going in parallel until all elements are processed?
No storage. Streams don't have storage for values; they carry values from a source (which could be a data structure, a generating function, an I/O channel, etc) through a pipeline of computational steps.
There are 3 ways to print the elements of a Stream in Java: forEach(), println() with collect(), and peek().
My approach to bulk operations with chunking is to use a partitioning spliterator wrapper, and another wrapper which overrides the default splitting policy (arithmetic progression of batch sizes in increments of 1024) to simple fixed-batch splitting. Use it like this:
Stream<OriginalType> existingStream = ...; Stream<List<OriginalType>> partitioned = partition(existingStream, 100, 1); partitioned.forEach(chunk -> ... process the chunk ...);
Here is the full code:
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators.AbstractSpliterator;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/**
 * Spliterator that groups the elements of a wrapped spliterator into lists ("partitions") of at
 * most {@code partitionSize} elements each. Every partition except possibly the last one holds
 * exactly {@code partitionSize} elements; empty partitions are never emitted.
 *
 * @param <E> element type of the wrapped spliterator
 */
public class PartitioningSpliterator<E> extends AbstractSpliterator<List<E>> {
  private final Spliterator<E> spliterator;
  private final int partitionSize;

  /**
   * Wraps {@code toWrap} so that its elements are delivered in chunks.
   *
   * @param toWrap the spliterator supplying the individual elements
   * @param partitionSize maximum number of elements per emitted list; must be positive
   * @throws IllegalArgumentException if {@code partitionSize} is zero or negative
   */
  public PartitioningSpliterator(Spliterator<E> toWrap, int partitionSize) {
    super(toWrap.estimateSize(), partitionCharacteristics(toWrap));
    if (partitionSize <= 0) {
      throw new IllegalArgumentException(
          "Partition size must be positive, but was " + partitionSize);
    }
    this.spliterator = toWrap;
    this.partitionSize = partitionSize;
  }

  /**
   * Derives the characteristics of the partitioned view from those of the source.
   *
   * <p>SORTED is removed: this class does not override {@code getComparator()}, whose default
   * implementation throws {@link IllegalStateException}, and {@code StreamSupport.stream} calls
   * {@code getComparator()} on any spliterator that reports SORTED. Forwarding the flag would
   * therefore make partitioning a sorted source fail at stream creation. NONNULL is added because
   * every emitted partition is a freshly created list.
   */
  private static int partitionCharacteristics(Spliterator<?> toWrap) {
    return (toWrap.characteristics() & ~Spliterator.SORTED) | Spliterator.NONNULL;
  }

  /**
   * Returns a sequential stream of partitions of {@code in}. Closing the returned stream also
   * closes {@code in}.
   *
   * @param in the stream to partition; it is consumed by this call
   * @param size maximum number of elements per partition; must be positive
   * @return a stream of lists holding the elements of {@code in} in chunks of {@code size}
   */
  public static <E> Stream<List<E>> partition(Stream<E> in, int size) {
    final Spliterator<List<E>> partitions = new PartitioningSpliterator<>(in.spliterator(), size);
    return StreamSupport.stream(partitions, false).onClose(in::close);
  }

  /**
   * Returns a sequential stream of partitions of {@code in} whose spliterator is additionally
   * wrapped in a {@code FixedBatchSpliterator} constructed with {@code batchSize}. Closing the
   * returned stream also closes {@code in}.
   *
   * @param in the stream to partition; it is consumed by this call
   * @param size maximum number of elements per partition; must be positive
   * @param batchSize batch size handed to the {@code FixedBatchSpliterator} wrapper
   * @return a stream of lists holding the elements of {@code in} in chunks of {@code size}
   */
  public static <E> Stream<List<E>> partition(Stream<E> in, int size, int batchSize) {
    final Spliterator<List<E>> partitions = new PartitioningSpliterator<>(in.spliterator(), size);
    final Spliterator<List<E>> batched = new FixedBatchSpliterator<>(partitions, batchSize);
    return StreamSupport.stream(batched, false).onClose(in::close);
  }

  /**
   * Pulls up to {@code partitionSize} elements from the wrapped spliterator into a new list and
   * passes that list to {@code action}.
   *
   * @return {@code false} if the wrapped spliterator had no elements left, {@code true} otherwise
   */
  @Override
  public boolean tryAdvance(Consumer<? super List<E>> action) {
    final List<E> partition = new ArrayList<>(partitionSize);
    // Fill until the partition is full or the source runs dry; the loop body is intentionally empty.
    while (partition.size() < partitionSize && spliterator.tryAdvance(partition::add)) {
      // element was appended by the method reference
    }
    if (partition.isEmpty()) {
      return false;
    }
    action.accept(partition);
    return true;
  }

  /**
   * Converts the wrapped spliterator's element estimate into a partition estimate by dividing by
   * {@code partitionSize} and rounding up. {@code Long.MAX_VALUE} is passed through unchanged.
   */
  @Override
  public long estimateSize() {
    final long elements = spliterator.estimateSize();
    if (elements == Long.MAX_VALUE) {
      return elements;
    }
    final long fullPartitions = elements / partitionSize;
    return elements % partitionSize > 0 ? fullPartitions + 1 : fullPartitions;
  }
}
import static java.util.Spliterators.spliterator;

import java.util.Comparator;
import java.util.Spliterator;
import java.util.function.Consumer;

/**
 * Base class for spliterators that split off fixed-size batches. Subclasses only supply
 * {@link #tryAdvance}; {@link #trySplit()} uses it to copy up to {@code batchSize} elements into
 * an array and returns an array-backed spliterator over that batch.
 *
 * @param <T> element type
 */
public abstract class FixedBatchSpliteratorBase<T> implements Spliterator<T> {
  private final int batchSize;
  private final int characteristics;
  private long est;

  /**
   * @param characteristics characteristics to report; ORDERED is always added, and SUBSIZED is
   *     added whenever SIZED is present
   * @param batchSize maximum number of elements per split-off batch; must be positive
   * @param est initial size estimate, or {@code Long.MAX_VALUE} if unknown
   * @throws IllegalArgumentException if {@code batchSize} is zero or negative
   */
  public FixedBatchSpliteratorBase(int characteristics, int batchSize, long est) {
    // Fail fast: a non-positive batch size would otherwise only surface inside trySplit() as an
    // ArrayIndexOutOfBoundsException or NegativeArraySizeException.
    if (batchSize <= 0) {
      throw new IllegalArgumentException("Batch size must be positive, but was " + batchSize);
    }
    int effective = characteristics | ORDERED;
    if ((effective & SIZED) != 0) {
      effective |= SUBSIZED;
    }
    this.characteristics = effective;
    this.batchSize = batchSize;
    this.est = est;
  }

  /** Same as the three-argument constructor with an unknown ({@code Long.MAX_VALUE}) estimate. */
  public FixedBatchSpliteratorBase(int characteristics, int batchSize) {
    this(characteristics, batchSize, Long.MAX_VALUE);
  }

  /** Same as the three-argument constructor with a batch size of 64 and an unknown estimate. */
  public FixedBatchSpliteratorBase(int characteristics) {
    this(characteristics, 64, Long.MAX_VALUE);
  }

  /**
   * Splits off the next batch of at most {@code batchSize} elements.
   *
   * @return an array-backed spliterator over the batch, or {@code null} if no element is left
   */
  @Override
  public Spliterator<T> trySplit() {
    final HoldingConsumer<T> holder = new HoldingConsumer<>();
    if (!tryAdvance(holder)) {
      return null;
    }
    final Object[] batch = new Object[batchSize];
    int count = 0;
    do {
      batch[count] = holder.value;
    } while (++count < batchSize && tryAdvance(holder));
    if (est != Long.MAX_VALUE) {
      // The initial estimate may have been too low; never let the remainder go negative.
      est = Math.max(0L, est - count);
    }
    return spliterator(batch, 0, count, characteristics());
  }

  /**
   * @return {@code null} when SORTED is among the characteristics
   * @throws IllegalStateException when SORTED is not among the characteristics
   */
  @Override
  public Comparator<? super T> getComparator() {
    if (!hasCharacteristics(SORTED)) {
      throw new IllegalStateException();
    }
    return null;
  }

  /** Returns the current estimate, which is reduced by each batch split off via trySplit(). */
  @Override
  public long estimateSize() {
    return est;
  }

  @Override
  public int characteristics() {
    return characteristics;
  }

  /** Consumer that simply remembers the most recent value it was handed. */
  static final class HoldingConsumer<T> implements Consumer<T> {
    Object value;

    @Override
    public void accept(T value) {
      this.value = value;
    }
  }
}
import static java.util.stream.StreamSupport.stream; import java.util.Spliterator; import java.util.function.Consumer; import java.util.stream.Stream; public class FixedBatchSpliterator<T> extends FixedBatchSpliteratorBase<T> { private final Spliterator<T> spliterator; public FixedBatchSpliterator(Spliterator<T> toWrap, int batchSize, long est) { super(toWrap.characteristics(), batchSize, est); this.spliterator = toWrap; } public FixedBatchSpliterator(Spliterator<T> toWrap, int batchSize) { this(toWrap, batchSize, toWrap.estimateSize()); } public FixedBatchSpliterator(Spliterator<T> toWrap) { this(toWrap, 64, toWrap.estimateSize()); } public static <T> Stream<T> withBatchSize(Stream<T> in, int batchSize) { return stream(new FixedBatchSpliterator<>(in.spliterator(), batchSize), true); } public static <T> FixedBatchSpliterator<T> batchedSpliterator(Spliterator<T> toWrap, int batchSize) { return new FixedBatchSpliterator<>(toWrap, batchSize); } @Override public boolean tryAdvance(Consumer<? super T> action) { return spliterator.tryAdvance(action); } @Override public void forEachRemaining(Consumer<? super T> action) { spliterator.forEachRemaining(action); } }
If you love us, you can donate to us via PayPal or buy us a coffee so we can maintain and grow! Thank you!
Donate Us With