
Is there an elegant way to process a stream in chunks?

My exact scenario is inserting data to database in batches, so I want to accumulate DOM objects then every 1000, flush them.

I implemented it by putting code in the accumulator to detect fullness and then flush, but that seems wrong - the flush control should come from the caller.
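Simplified, my current approach looks something like this (DomObject and database::flushChunk stand in for my real types):

    List<DomObject> buffer = new ArrayList<>();
    stream.forEach(obj -> {
        buffer.add(obj);
        if (buffer.size() == 1000) {
            database.flushChunk(new ArrayList<>(buffer)); // flush a copy of the full batch
            buffer.clear();
        }
    });
    if (!buffer.isEmpty()) {
        database.flushChunk(buffer); // flush the final partial batch
    }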

I could convert the stream to a List then use subList in an iterative fashion, but that too seems clunky.

Is there a neat way to take action every n elements and then continue with the stream, while only processing the stream once?

asked Dec 20 '14 by Bohemian

1 Answer

Elegance is in the eye of the beholder. If you don't mind using a stateful function in groupingBy, you can do this:

    // assumes java.util.concurrent.atomic.AtomicInteger and
    // import static java.util.stream.Collectors.groupingBy;
    AtomicInteger counter = new AtomicInteger();
    stream.collect(groupingBy(x -> counter.getAndIncrement() / chunkSize))
          .values()
          .forEach(database::flushChunk);
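Note that the classifier relies on a mutable counter, so this is only safe for sequential streams: the AtomicInteger itself stays thread-safe under a parallel stream, but elements would be assigned to chunks in a nondeterministic order.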

This doesn't win any performance or memory usage points over your original solution because it will still materialize the entire stream before doing anything.

If you want to avoid materializing the list, the Stream API will not help you. You will have to get the stream's iterator or spliterator and do something like this:

    Spliterator<Integer> split = stream.spliterator();
    int chunkSize = 1000;

    while (true) {
        List<Integer> chunk = new ArrayList<>(chunkSize);
        // fill up to chunkSize elements, or fewer if the stream runs out
        for (int i = 0; i < chunkSize && split.tryAdvance(chunk::add); i++);
        if (chunk.isEmpty()) break;
        database.flushChunk(chunk);
    }
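If you'd rather keep the chunking inside a stream pipeline, the same loop can be packaged as a spliterator of its own. A minimal sketch (chunked is a helper name I made up, not a JDK method):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Spliterator;
    import java.util.Spliterators;
    import java.util.function.Consumer;
    import java.util.stream.Stream;
    import java.util.stream.StreamSupport;

    // Lazily wraps a stream into a stream of fixed-size chunks;
    // the last chunk may be smaller than chunkSize.
    static <T> Stream<List<T>> chunked(Stream<T> source, int chunkSize) {
        Spliterator<T> split = source.spliterator();
        return StreamSupport.stream(
            new Spliterators.AbstractSpliterator<List<T>>(Long.MAX_VALUE, Spliterator.ORDERED) {
                @Override
                public boolean tryAdvance(Consumer<? super List<T>> action) {
                    List<T> chunk = new ArrayList<>(chunkSize);
                    for (int i = 0; i < chunkSize && split.tryAdvance(chunk::add); i++);
                    if (chunk.isEmpty()) return false; // source exhausted
                    action.accept(chunk);
                    return true;
                }
            }, false);
    }

The caller can then write chunked(stream, 1000).forEach(database::flushChunk); and only one chunk is held in memory at a time.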
answered Sep 23 '22 by Misha