I'm trying to read from a CSV file, but because of its size I don't want to load it all into memory first.
The library I found for reading CSV is opencsv, which works really well but only exposes two methods: readAll() and readNext().
readAll is out, as I don't want everything in memory at the same time, so I'd like to read lazily from the file via readNext. Ideally, I'd like to wrap that reading in a stream.
The closest I've gotten was passing the readNext method to a Stream.generate construct,
Stream csvDataStream = Stream.generate(csvReader::readNext);
but this obviously has the massive drawback of throwing an error once the iterator underlying csvReader is exhausted. I don't really want to wrap my entire program in a try/catch block just because I'm using the language wrong. Is there a way to create a stream from something that only exposes a next method?
Here's a ready-made implementation from my project. I have an abstract spliterator which handles splitting into fixed-size batches and allows for efficient parallelization of processing of any kind of I/O-based stream source:
import static java.util.Spliterators.spliterator;

import java.util.Comparator;
import java.util.Spliterator;
import java.util.function.Consumer;

public abstract class FixedBatchSpliteratorBase<T> implements Spliterator<T> {
    private final int batchSize;
    private final int characteristics;
    private long est;

    public FixedBatchSpliteratorBase(int characteristics, int batchSize, long est) {
        characteristics |= ORDERED;
        if ((characteristics & SIZED) != 0) characteristics |= SUBSIZED;
        this.characteristics = characteristics;
        this.batchSize = batchSize;
        this.est = est;
    }
    public FixedBatchSpliteratorBase(int characteristics, int batchSize) {
        this(characteristics, batchSize, Long.MAX_VALUE);
    }
    public FixedBatchSpliteratorBase(int characteristics) {
        this(characteristics, 64, Long.MAX_VALUE);
    }

    // Reads up to batchSize elements into an array and hands them off as a separate spliterator.
    @Override public Spliterator<T> trySplit() {
        final HoldingConsumer<T> holder = new HoldingConsumer<>();
        if (!tryAdvance(holder)) return null;
        final Object[] a = new Object[batchSize];
        int j = 0;
        do a[j] = holder.value; while (++j < batchSize && tryAdvance(holder));
        if (est != Long.MAX_VALUE) est -= j;
        return spliterator(a, 0, j, characteristics());
    }
    @Override public Comparator<? super T> getComparator() {
        if (hasCharacteristics(SORTED)) return null;
        throw new IllegalStateException();
    }
    @Override public long estimateSize() { return est; }
    @Override public int characteristics() { return characteristics; }

    static final class HoldingConsumer<T> implements Consumer<T> {
        Object value;
        @Override public void accept(T value) { this.value = value; }
    }
}
And here's the opencsv spliterator based on it:
public class CsvSpliterator extends FixedBatchSpliteratorBase<String[]> {
    private final CSVReader cr;

    CsvSpliterator(CSVReader cr, int batchSize) {
        super(NONNULL, batchSize);
        if (cr == null) throw new NullPointerException("CSVReader is null");
        this.cr = cr;
    }
    public CsvSpliterator(CSVReader cr) { this(cr, 100); }

    @Override public void forEachRemaining(Consumer<? super String[]> action) {
        if (action == null) throw new NullPointerException();
        uncheckRun(() -> { for (String[] row; (row = cr.readNext()) != null;) action.accept(row); });
    }
    @Override public boolean tryAdvance(Consumer<? super String[]> action) {
        if (action == null) throw new NullPointerException();
        return uncheckCall(() -> {
            final String[] row = cr.readNext();
            if (row == null) return false; // readNext returns null at end of input
            action.accept(row);
            return true;
        });
    }
}
where uncheckRun and uncheckCall are:
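import java.util.concurrent.Callable;

public final class Util {
    // Assumption: the post does not show RunnableExc. A Runnable-like interface
    // whose run() may throw a checked exception is the minimum the code needs.
    public interface RunnableExc { void run() throws Exception; }

    public static <T> T uncheckCall(Callable<T> callable) {
        try { return callable.call(); }
        catch (Exception e) { return sneakyThrow(e); }
    }
    public static void uncheckRun(RunnableExc r) {
        try { r.run(); } catch (Exception e) { sneakyThrow(e); }
    }
    // Rethrows any exception, checked or not, without declaring it.
    public static <T> T sneakyThrow(Throwable e) {
        return Util.<RuntimeException, T>sneakyThrow0(e);
    }
    @SuppressWarnings("unchecked")
    private static <E extends Throwable, T> T sneakyThrow0(Throwable t) throws E { throw (E)t; }
}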
Usage:
import static java.util.stream.StreamSupport.stream;
....
final CSVReader cr = new CSVReader(new InputStreamReader(yourInputStream), separator, '"');
return stream(new CsvSpliterator(cr), true).onClose(() -> uncheckRun(cr::close));
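Note that a handler registered with onClose runs only when the stream itself is closed; terminal operations do not close it. Here is a minimal sketch of that behaviour, using an in-memory list as a stand-in for the CSV source. The class and method names are hypothetical and not part of the code above:

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

public class OnCloseDemo {
    // Consumes the stream inside try-with-resources, so close() is called on exit.
    public static boolean closedWithTry() {
        AtomicBoolean closed = new AtomicBoolean(false);
        try (Stream<String> rows =
                 Arrays.asList("a", "b").stream().onClose(() -> closed.set(true))) {
            rows.forEach(row -> { });
        }
        return closed.get();
    }

    // Consumes the stream without closing it; the handler is never invoked.
    public static boolean closedWithoutTry() {
        AtomicBoolean closed = new AtomicBoolean(false);
        Arrays.asList("a", "b").stream().onClose(() -> closed.set(true)).forEach(row -> { });
        return closed.get();
    }

    public static void main(String[] args) {
        System.out.println(closedWithTry());
        System.out.println(closedWithoutTry());
    }
}
```

So a caller that receives the CSV-backed stream would probably want to consume it in a try-with-resources block; otherwise the reader stays open.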
Implement a Spliterator. You need only give the tryAdvance method a nontrivial implementation; trySplit can return null, characteristics() can return ORDERED, and estimateSize can return Long.MAX_VALUE. Then call StreamSupport.stream(spliterator, false) to make a stream.
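A minimal sketch of that approach follows. To keep it self-contained it uses BufferedReader.readLine as a stand-in for any source that only exposes a "next" method returning null at the end, which is how the first answer's code treats readNext. The class name NextOnlySpliterator and the helper lines are made up for illustration, and wrapping the IOException in UncheckedIOException is just one possible way to handle the checked exception:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class NextOnlySpliterator implements Spliterator<String> {
    // Stand-in for any source that exposes only a "next"-style method.
    private final BufferedReader reader;

    public NextOnlySpliterator(BufferedReader reader) { this.reader = reader; }

    // The only nontrivial method: pull one element, or report exhaustion.
    @Override public boolean tryAdvance(Consumer<? super String> action) {
        try {
            String line = reader.readLine(); // null signals end of input
            if (line == null) return false;
            action.accept(line);
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override public Spliterator<String> trySplit() { return null; }  // no splitting
    @Override public long estimateSize() { return Long.MAX_VALUE; }   // size unknown
    @Override public int characteristics() { return ORDERED; }

    // Wraps the spliterator in a sequential stream.
    public static Stream<String> lines(BufferedReader reader) {
        return StreamSupport.stream(new NextOnlySpliterator(reader), false);
    }

    public static void main(String[] args) {
        BufferedReader in = new BufferedReader(new StringReader("a,b\nc,d\n"));
        List<String> rows = lines(in).collect(Collectors.toList());
        System.out.println(rows); // prints [a,b, c,d]
    }
}
```

Because trySplit returns null, a stream built this way cannot be processed in parallel, which is the trade-off against the batching spliterator in the other answer.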