Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Is there a good way to extract chunks of data from a java 8 stream?

Tags:

In an ETL process I'm retrieving a lot of entities from a Spring Data Repository. I'm then using a parallel stream to map the entities to different ones. I can either use a consumer to store those new entities in another repository one by one, or collect them into a List and store that in a single bulk operation. The former is costly, while the latter might exceed the available memory.

Is there a good way to collect a certain amount of elements in the stream (like limit does), consume that chunk, and keep on going in parallel until all elements are processed?

like image 420
Christoph Grimmer-Dietrich Avatar asked Aug 20 '14 15:08

Christoph Grimmer-Dietrich


People also ask

Do streams in Java 8 have limited storage?

No storage. Streams don't have storage for values; they carry values from a source (which could be a data structure, a generating function, an I/O channel, etc) through a pipeline of computational steps.

How do I print a List of objects from a Stream?

There are 3 ways to print the elements of a Stream in Java: forEach(), println() with collect(), and peek().


1 Answer

My approach to bulk operations with chunking is to use a partitioning spliterator wrapper, and another wrapper which overrides the default splitting policy (arithmetic progression of batch sizes in increments of 1024) to simple fixed-batch splitting. Use it like this:

// Usage sketch: partition(stream, 100, 1) wraps the stream so that each element handed to
// forEach is a List of up to 100 source elements; the third argument (1) is the fixed batch
// size used whenever the underlying spliterator is split. Note that partition(...) builds the
// stream with parallel=false, so call .parallel() on the result if chunks should be processed
// in parallel.
Stream<OriginalType> existingStream = ...; Stream<List<OriginalType>> partitioned = partition(existingStream, 100, 1); partitioned.forEach(chunk -> ... process the chunk ...); 

Here is the full code:

import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators.AbstractSpliterator;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/**
 * Spliterator that groups the elements of a wrapped spliterator into consecutive lists
 * ("partitions") of at most {@code partitionSize} elements. Only the last partition may be
 * shorter; an empty partition is never emitted.
 *
 * @param <E> element type of the wrapped spliterator
 */
public class PartitioningSpliterator<E> extends AbstractSpliterator<List<E>> {
  private final Spliterator<E> spliterator;
  private final int partitionSize;

  /**
   * @param toWrap        source of the elements to group
   * @param partitionSize maximum number of elements per emitted list; must be positive
   * @throws IllegalArgumentException if {@code partitionSize <= 0}
   */
  public PartitioningSpliterator(Spliterator<E> toWrap, int partitionSize) {
    // SORTED must not be inherited: the emitted lists have no ordering comparator, and the
    // default getComparator() throws IllegalStateException, which StreamSupport.stream(...)
    // would trigger for any SORTED source (e.g. a TreeSet stream or a stream after sorted()).
    super(toWrap.estimateSize(),
        (toWrap.characteristics() & ~Spliterator.SORTED) | Spliterator.NONNULL);
    if (partitionSize <= 0) {
      throw new IllegalArgumentException(
          "Partition size must be positive, but was " + partitionSize);
    }
    this.spliterator = toWrap;
    this.partitionSize = partitionSize;
  }

  /**
   * Returns a sequential stream of lists, each holding up to {@code size} elements of
   * {@code in}. Closing the returned stream also closes {@code in}.
   *
   * @param in   the stream to partition; it is consumed by this call
   * @param size maximum number of elements per list; must be positive
   */
  public static <E> Stream<List<E>> partition(Stream<E> in, int size) {
    final Spliterator<List<E>> chunks = new PartitioningSpliterator<>(in.spliterator(), size);
    return StreamSupport.stream(chunks, false).onClose(in::close);
  }

  /**
   * Same as {@link #partition(Stream, int)}, but additionally wraps the partitioning
   * spliterator in a {@code FixedBatchSpliterator} constructed with {@code batchSize}.
   * Closing the returned stream also closes {@code in}.
   *
   * @param in        the stream to partition; it is consumed by this call
   * @param size      maximum number of elements per list; must be positive
   * @param batchSize batch size handed to {@code FixedBatchSpliterator}
   */
  public static <E> Stream<List<E>> partition(Stream<E> in, int size, int batchSize) {
    final Spliterator<List<E>> chunks = new PartitioningSpliterator<>(in.spliterator(), size);
    return StreamSupport.stream(new FixedBatchSpliterator<>(chunks, batchSize), false)
        .onClose(in::close);
  }

  /**
   * Pulls up to {@code partitionSize} elements from the wrapped spliterator and passes them
   * to {@code action} as one list.
   *
   * @return {@code false} if the wrapped spliterator had no elements left
   */
  @Override
  public boolean tryAdvance(Consumer<? super List<E>> action) {
    final List<E> partition = new ArrayList<>(partitionSize);
    while (spliterator.tryAdvance(partition::add) && partition.size() < partitionSize) {
      // keep pulling until the partition is full or the source is exhausted
    }
    if (partition.isEmpty()) {
      return false;
    }
    action.accept(partition);
    return true;
  }

  /**
   * Converts the wrapped spliterator's element estimate into a partition count (rounded up).
   * {@code Long.MAX_VALUE} is passed through unchanged.
   */
  @Override
  public long estimateSize() {
    final long est = spliterator.estimateSize();
    if (est == Long.MAX_VALUE) {
      return est;
    }
    return est / partitionSize + (est % partitionSize > 0 ? 1 : 0);
  }
}

import static java.util.Spliterators.spliterator;

import java.util.Comparator;
import java.util.Spliterator;
import java.util.function.Consumer;

/**
 * Base class for spliterators whose {@link #trySplit()} always hands off a fixed number of
 * elements (the batch size). Subclasses only have to implement
 * {@link #tryAdvance(Consumer)}; splitting is done here by advancing up to {@code batchSize}
 * times and returning the collected elements as an array-backed spliterator.
 *
 * @param <T> element type
 */
public abstract class FixedBatchSpliteratorBase<T> implements Spliterator<T> {
  private final int batchSize;
  private final int characteristics;
  private long est;

  /**
   * @param characteristics characteristics to report; {@code ORDERED} is always added, and
   *                        {@code SUBSIZED} is added when {@code SIZED} is present
   * @param batchSize       number of elements handed off per split; must be positive
   * @param est             initial size estimate, or {@code Long.MAX_VALUE} if unknown
   * @throws IllegalArgumentException if {@code batchSize <= 0}
   */
  public FixedBatchSpliteratorBase(int characteristics, int batchSize, long est) {
    // Fail fast: a non-positive batch size would otherwise only surface inside trySplit()
    // as a NegativeArraySizeException or ArrayIndexOutOfBoundsException.
    if (batchSize <= 0) {
      throw new IllegalArgumentException("Batch size must be positive, but was " + batchSize);
    }
    int effective = characteristics | ORDERED;
    if ((effective & SIZED) != 0) {
      effective |= SUBSIZED;
    }
    this.characteristics = effective;
    this.batchSize = batchSize;
    this.est = est;
  }

  /** Same as the three-argument constructor with an unknown size estimate. */
  public FixedBatchSpliteratorBase(int characteristics, int batchSize) {
    this(characteristics, batchSize, Long.MAX_VALUE);
  }

  /** Uses a batch size of 64 and an unknown size estimate. */
  public FixedBatchSpliteratorBase(int characteristics) {
    this(characteristics, 64, Long.MAX_VALUE);
  }

  /**
   * Splits off the next batch of up to {@code batchSize} elements.
   *
   * @return an array-backed spliterator over the batch, or {@code null} if no element is left
   */
  @Override
  public Spliterator<T> trySplit() {
    final HoldingConsumer<T> holder = new HoldingConsumer<>();
    if (!tryAdvance(holder)) {
      return null;
    }
    final Object[] batch = new Object[batchSize];
    int count = 0;
    do {
      batch[count] = holder.value;
    } while (++count < batchSize && tryAdvance(holder));
    if (est != Long.MAX_VALUE) {
      // Never let the estimate go negative when the initial estimate was too low.
      est = Math.max(0L, est - count);
    }
    return spliterator(batch, 0, count, characteristics());
  }

  /**
   * @return {@code null} when this spliterator reports {@code SORTED}
   * @throws IllegalStateException if this spliterator does not report {@code SORTED}
   */
  @Override
  public Comparator<? super T> getComparator() {
    if (hasCharacteristics(SORTED)) {
      return null;
    }
    throw new IllegalStateException();
  }

  @Override
  public long estimateSize() {
    return est;
  }

  @Override
  public int characteristics() {
    return characteristics;
  }

  /** Consumer that simply remembers the most recently accepted value. */
  static final class HoldingConsumer<T> implements Consumer<T> {
    Object value;

    @Override
    public void accept(T value) {
      this.value = value;
    }
  }
}

import static java.util.stream.StreamSupport.stream;  import java.util.Spliterator; import java.util.function.Consumer; import java.util.stream.Stream;  public class FixedBatchSpliterator<T> extends FixedBatchSpliteratorBase<T> {   private final Spliterator<T> spliterator;    public FixedBatchSpliterator(Spliterator<T> toWrap, int batchSize, long est) {     super(toWrap.characteristics(), batchSize, est);     this.spliterator = toWrap;   }   public FixedBatchSpliterator(Spliterator<T> toWrap, int batchSize) {     this(toWrap, batchSize, toWrap.estimateSize());   }   public FixedBatchSpliterator(Spliterator<T> toWrap) {     this(toWrap, 64, toWrap.estimateSize());   }    public static <T> Stream<T> withBatchSize(Stream<T> in, int batchSize) {     return stream(new FixedBatchSpliterator<>(in.spliterator(), batchSize), true);   }    public static <T> FixedBatchSpliterator<T> batchedSpliterator(Spliterator<T> toWrap, int batchSize) {     return new FixedBatchSpliterator<>(toWrap, batchSize);   }    @Override public boolean tryAdvance(Consumer<? super T> action) {     return spliterator.tryAdvance(action);   }   @Override public void forEachRemaining(Consumer<? super T> action) {     spliterator.forEachRemaining(action);   } } 
like image 116
Marko Topolnik Avatar answered Oct 27 '22 14:10

Marko Topolnik