
Can a stream be created from an object that has only exposed the "readNext" portion of an iterator?

Tags:

java

java-8

I am trying to read from a CSV file but, because of its size, without loading it all into memory first.

The library I found for reading CSV is opencsv, which works really well, but only exposes two methods:

readAll() 

and

readNext() 

readAll is out, as I don't want everything in memory at the same time, so I'd like to read lazily from the file via readNext. Ideally, I'd like to wrap that reading in a stream.

The closest I've gotten was giving the readNext method to a Stream.generate construct,

Stream csvDataStream = Stream.generate(csvReader::readNext); 

but this obviously has the massive drawback of throwing an error once the iterator underlying csvReader is exhausted. I don't really want to wrap my entire program in a try/catch block because I'm using the language wrong. Is there a way to create a stream from something that only exposes a next method?

user3308774 asked Jun 27 '14 18:06


2 Answers

Here's a ready-made implementation from my project. I have an abstract spliterator which handles splitting into fixed-size batches and allows for efficient parallelization of processing of any kind of I/O-based stream source:

import static java.util.Spliterators.spliterator;

import java.util.Comparator;
import java.util.Spliterator;
import java.util.function.Consumer;

public abstract class FixedBatchSpliteratorBase<T> implements Spliterator<T> {
  private final int batchSize;
  private final int characteristics;
  private long est;

  public FixedBatchSpliteratorBase(int characteristics, int batchSize, long est) {
    characteristics |= ORDERED;
    if ((characteristics & SIZED) != 0) characteristics |= SUBSIZED;
    this.characteristics = characteristics;
    this.batchSize = batchSize;
    this.est = est;
  }
  public FixedBatchSpliteratorBase(int characteristics, int batchSize) {
    this(characteristics, batchSize, Long.MAX_VALUE);
  }
  public FixedBatchSpliteratorBase(int characteristics) {
    this(characteristics, 64, Long.MAX_VALUE);
  }

  @Override public Spliterator<T> trySplit() {
    final HoldingConsumer<T> holder = new HoldingConsumer<>();
    if (!tryAdvance(holder)) return null;
    final Object[] a = new Object[batchSize];
    int j = 0;
    do a[j] = holder.value; while (++j < batchSize && tryAdvance(holder));
    if (est != Long.MAX_VALUE) est -= j;
    return spliterator(a, 0, j, characteristics());
  }
  @Override public Comparator<? super T> getComparator() {
    if (hasCharacteristics(SORTED)) return null;
    throw new IllegalStateException();
  }
  @Override public long estimateSize() { return est; }
  @Override public int characteristics() { return characteristics; }

  static final class HoldingConsumer<T> implements Consumer<T> {
    Object value;
    @Override public void accept(T value) { this.value = value; }
  }
}

And here's the opencsv spliterator based on it:

public class CsvSpliterator extends FixedBatchSpliteratorBase<String[]> {
  private final CSVReader cr;

  CsvSpliterator(CSVReader cr, int batchSize) {
    super(NONNULL, batchSize);
    if (cr == null) throw new NullPointerException("CSVReader is null");
    this.cr = cr;
  }
  public CsvSpliterator(CSVReader cr) { this(cr, 100); }

  @Override public void forEachRemaining(Consumer<? super String[]> action) {
    if (action == null) throw new NullPointerException();
    uncheckRun(() -> { for (String[] row; (row = cr.readNext()) != null;) action.accept(row); });
  }
  @Override public boolean tryAdvance(Consumer<? super String[]> action) {
    if (action == null) throw new NullPointerException();
    return uncheckCall(() -> {
      final String[] row = cr.readNext();
      if (row == null) return false;
      action.accept(row);
      return true;
    });
  }
}

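where uncheckRun and uncheckCall are static helpers in a class named Util (RunnableExc is not shown here; from its use it appears to be a functional interface whose run() method may throw Exception):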

public static <T> T uncheckCall(Callable<T> callable) {
  try { return callable.call(); }
  catch (Exception e) { return sneakyThrow(e); }
}
public static void uncheckRun(RunnableExc r) {
  try { r.run(); } catch (Exception e) { sneakyThrow(e); }
}
public static <T> T sneakyThrow(Throwable e) {
  return Util.<RuntimeException, T>sneakyThrow0(e);
}
@SuppressWarnings("unchecked")
private static <E extends Throwable, T> T sneakyThrow0(Throwable t) throws E { throw (E)t; }

Usage:

import static java.util.stream.StreamSupport.stream;

....

final CSVReader cr = new CSVReader(new InputStreamReader(yourInputStream), separator, '"');
return stream(new CsvSpliterator(cr), true).onClose(() -> uncheckRun(cr::close));
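
One point about the usage above that is easy to miss: a handler registered with onClose runs only when the stream itself is closed, not when a terminal operation finishes. The sketch below shows this with a plain in-memory stream and a flag standing in for the reader; the class and method names (OnCloseDemo, run) are made up for illustration and are not part of the answer's code.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

final class OnCloseDemo {
    private OnCloseDemo() {}

    /**
     * Returns two flags: whether the close handler had run right after the
     * terminal operation, and whether it had run after try-with-resources
     * closed the stream.
     */
    static boolean[] run() {
        // Stand-in for "the CSVReader has been closed".
        AtomicBoolean closed = new AtomicBoolean(false);
        boolean closedAfterTerminalOp;
        try (Stream<String> rows = Stream.of("a", "b").onClose(() -> closed.set(true))) {
            rows.forEach(row -> { /* process the row */ });
            // The terminal operation alone does not trigger onClose handlers.
            closedAfterTerminalOp = closed.get();
        }
        // Leaving the try block calls Stream.close(), which runs the handler.
        return new boolean[] { closedAfterTerminalOp, closed.get() };
    }
}
```

So whoever receives the returned stream would typically consume it inside a try-with-resources block so that the reader is actually released.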
Marko Topolnik answered Nov 20 '22 01:11


Implement a Spliterator. Only tryAdvance needs a nontrivial implementation; trySplit can return null, characteristics() can return ORDERED, and estimateSize can return Long.MAX_VALUE. Then call StreamSupport.stream(spliterator, false) to make a sequential stream.
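
A minimal sketch of that approach, under some assumptions: the names NextStreams and streamUntilNull are hypothetical, the source is modeled as a plain Supplier that returns null when exhausted (as readNext does in the other answer's code), and Spliterators.AbstractSpliterator is used as a convenience base class. That base class supplies its own batching trySplit rather than returning null, which does not matter for a sequential stream.

```java
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

final class NextStreams {
    private NextStreams() {}

    /**
     * Builds a lazy, sequential stream that keeps calling readNext
     * until it returns null, which is treated as end of input.
     */
    static <T> Stream<T> streamUntilNull(Supplier<? extends T> readNext) {
        Spliterator<T> spliterator = new Spliterators.AbstractSpliterator<T>(
                Long.MAX_VALUE,                              // size unknown
                Spliterator.ORDERED | Spliterator.NONNULL) {
            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                T next = readNext.get();
                if (next == null) {
                    return false;        // source exhausted: end the stream
                }
                action.accept(next);
                return true;
            }
        };
        return StreamSupport.stream(spliterator, false);     // false = not parallel
    }
}
```

With opencsv, the supplier would have to wrap the call to csvReader.readNext() and rethrow its checked IOException as an unchecked exception (for example UncheckedIOException), since Supplier.get() cannot throw checked exceptions.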

Brian Goetz answered Nov 20 '22 02:11