
How can I create a general purpose paging spliterator?

I would like to process a Java stream that reads from a source which must be accessed in pages. As a first approach, I implemented a paging iterator that simply requests the next page when the current page runs out of items, and then used StreamSupport.stream(..., false) over a spliterator created from that iterator to get a stream.
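For readers who want to see that first approach concretely, here is a minimal sketch of such a paging iterator. The names `NaivePagingDemo`, `PagingIterator` and `stream` are hypothetical and not taken from the question, and the sketch assumes that an empty page marks the end of the source. The iterator is wrapped with `Spliterators.spliteratorUnknownSize`, which is one standard way to get a spliterator from an iterator.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class NaivePagingDemo {

    /** Hypothetical iterator: fetches the next page only when the current one is exhausted. */
    static final class PagingIterator<T> implements Iterator<T> {
        private final BiFunction<Long, Long, List<T>> fetcher; // (offset, limit) -> page
        private final long pageSize;
        private long offset = 0;
        private Iterator<T> current = Collections.emptyIterator();
        private boolean exhausted = false;

        PagingIterator(BiFunction<Long, Long, List<T>> fetcher, long pageSize) {
            this.fetcher = fetcher;
            this.pageSize = pageSize;
        }

        @Override
        public boolean hasNext() {
            while (!current.hasNext() && !exhausted) {
                List<T> page = fetcher.apply(offset, pageSize);
                if (page.isEmpty()) {
                    exhausted = true; // assumption: an empty page marks the end
                } else {
                    offset += page.size();
                    current = page.iterator();
                }
            }
            return current.hasNext();
        }

        @Override
        public T next() {
            if (!hasNext()) throw new NoSuchElementException();
            return current.next();
        }
    }

    /** Wraps the iterator in a sequential stream; every fetch happens on the traversing thread. */
    static <T> Stream<T> stream(BiFunction<Long, Long, List<T>> fetcher, long pageSize) {
        Spliterator<T> sp = Spliterators.spliteratorUnknownSize(
                new PagingIterator<>(fetcher, pageSize), Spliterator.ORDERED);
        return StreamSupport.stream(sp, false);
    }

    public static void main(String[] args) {
        List<Integer> source = IntStream.range(0, 25).boxed().collect(Collectors.toList());
        List<Integer> fetchedOffsets = new ArrayList<>();
        BiFunction<Long, Long, List<Integer>> fetcher = (offset, limit) -> {
            fetchedOffsets.add(offset.intValue());
            int from = Math.min(offset.intValue(), source.size());
            int to = Math.min(from + limit.intValue(), source.size());
            return source.subList(from, to);
        };
        List<Integer> result = stream(fetcher, 10).collect(Collectors.toList());
        System.out.println(result.equals(source)); // true
        System.out.println(fetchedOffsets);        // [0, 10, 20, 25]
    }
}
```

Because the spliterator only ever sees an iterator, it has no way to ask for page N+1 while page N is still being processed, so the fetches are strictly one after another.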

As I found that my pages are quite expensive to fetch, I would like to access pages by way of a parallel stream. At this point I discovered that the parallelism provided by my naive approach is non-existent, due to the spliterator implementation that Java derives directly from an iterator. Since I actually know quite a lot about the elements I'd like to traverse (I know the total result count after requesting the first page, and the source supports an offset and limit), I think it should be possible to implement my own spliterator that achieves real concurrency, both in the work done on the elements of a page AND in the querying of pages.

I've been able to achieve the "work done on elements" concurrency quite easily, but in my initial implementation the querying of a page is only ever being done by the top-most spliterator and thus doesn't benefit from the division of work offered by the fork-join implementation.

How can I write a spliterator that achieves both of these goals?

For reference, I'll provide what I've done so far (I know that it doesn't divide up the queries appropriately).

    public final class PagingSourceSpliterator<T> implements Spliterator<T> {

        public static final long DEFAULT_PAGE_SIZE = 100;

        private Page<T> result;
        private Iterator<T> results;
        private boolean needsReset = false;
        private final PageProducer<T> generator;
        private long offset = 0L;
        private long limit = DEFAULT_PAGE_SIZE;

        public PagingSourceSpliterator(PageProducer<T> generator) {
            this.generator = generator;
        }

        public PagingSourceSpliterator(long pageSize, PageProducer<T> generator) {
            this.generator = generator;
            this.limit = pageSize;
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            if (hasAnotherElement()) {
                if (!results.hasNext()) {
                    loadPageAndPrepareNextPaging();
                }
                if (results.hasNext()) {
                    action.accept(results.next());
                    return true;
                }
            }
            return false;
        }

        @Override
        public Spliterator<T> trySplit() {
            // if we know there's another page, go ahead and hand off whatever
            // remains of this spliterator as a new spliterator for other
            // threads to work on, and then mark that next time something is
            // requested from this spliterator it needs to be reset to the head
            // of the next page
            if (hasAnotherPage()) {
                Spliterator<T> other = result.getPage().spliterator();
                needsReset = true;
                return other;
            } else {
                return null;
            }
        }

        @Override
        public long estimateSize() {
            if (limit == 0) {
                return 0;
            }
            ensureStateIsUpToDateEnoughToAnswerInquiries();
            return result.getTotalResults();
        }

        @Override
        public int characteristics() {
            return IMMUTABLE | ORDERED | DISTINCT | NONNULL | SIZED | SUBSIZED;
        }

        private boolean hasAnotherElement() {
            ensureStateIsUpToDateEnoughToAnswerInquiries();
            return isBound() && (results.hasNext() || hasAnotherPage());
        }

        private boolean hasAnotherPage() {
            ensureStateIsUpToDateEnoughToAnswerInquiries();
            return isBound() && (result.getTotalResults() > offset);
        }

        private boolean isBound() {
            return Objects.nonNull(results) && Objects.nonNull(result);
        }

        private void ensureStateIsUpToDateEnoughToAnswerInquiries() {
            ensureBound();
            ensureResetIfNecessary();
        }

        private void ensureBound() {
            if (!isBound()) {
                loadPageAndPrepareNextPaging();
            }
        }

        private void ensureResetIfNecessary() {
            if (needsReset) {
                loadPageAndPrepareNextPaging();
                needsReset = false;
            }
        }

        private void loadPageAndPrepareNextPaging() {
            // keep track of the overall result so that we can reference the original list and total size
            this.result = generator.apply(offset, limit);

            // make sure that the iterator we use to traverse a single page removes
            // results from the underlying list as we go so that we can simply pass
            // off the list spliterator for the trySplit rather than constructing a
            // new kind of spliterator for what remains.
            this.results = new DelegatingIterator<T>(result.getPage().listIterator()) {
                @Override
                public T next() {
                    T next = super.next();
                    this.remove();
                    return next;
                }
            };

            // update the paging for the next request and inquiries prior to the next request
            // we use the page of the actual result set instead of the limit in case the limit
            // was not respected exactly.
            this.offset += result.getPage().size();
        }

        public static class DelegatingIterator<T> implements Iterator<T> {

            private final Iterator<T> iterator;

            public DelegatingIterator(Iterator<T> iterator) {
                this.iterator = iterator;
            }

            @Override
            public boolean hasNext() {
                return iterator.hasNext();
            }

            @Override
            public T next() {
                return iterator.next();
            }

            @Override
            public void remove() {
                iterator.remove();
            }

            @Override
            public void forEachRemaining(Consumer<? super T> action) {
                iterator.forEachRemaining(action);
            }
        }
    }

And the source of my pages:

    public interface PageProducer<T> extends BiFunction<Long, Long, Page<T>> {
    }

And a page:

    public final class Page<T> {

        private long totalResults;
        private final List<T> page = new ArrayList<>();

        public long getTotalResults() {
            return totalResults;
        }

        public List<T> getPage() {
            return page;
        }

        // returns Page<T> rather than the raw type so that chained calls stay type-safe
        public Page<T> setTotalResults(long totalResults) {
            this.totalResults = totalResults;
            return this;
        }

        public Page<T> setPage(List<T> results) {
            this.page.clear();
            this.page.addAll(results);
            return this;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof Page)) {
                return false;
            }
            Page<?> page1 = (Page<?>) o;
            return totalResults == page1.totalResults && Objects.equals(page, page1.page);
        }

        @Override
        public int hashCode() {
            return Objects.hash(totalResults, page);
        }
    }

And a sample of getting a stream with "slow" paging for testing:

    private <T> Stream<T> asSlowPagedSource(long pageSize, List<T> things) {

        PageProducer<T> producer = (offset, limit) -> {

            try {
                Thread.sleep(5000L);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }

            int beginIndex = offset.intValue();
            int endIndex = Math.min(offset.intValue() + limit.intValue(), things.size());
            return new Page<T>().setTotalResults(things.size())
                    .setPage(things.subList(beginIndex, endIndex));
        };

        return StreamSupport.stream(new PagingSourceSpliterator<>(pageSize, producer), true);
    }
RutledgePaulV asked Jun 30 '16 16:06




1 Answer

The main reason your spliterator doesn’t get you closer to your goal is that it tries to split the pages rather than the source element space. If you know the total number of elements and have a source that can fetch a page via offset and limit, the most natural form of spliterator is one that encapsulates a range within these elements, e.g. via offset and limit or end. Splitting then means just splitting that range: adapt your spliterator’s offset to the split position and create a new spliterator representing the prefix, from the old offset to the split position.

    Before splitting:
          this spliterator: offset=x, end=y
    After splitting:
          this spliterator: offset=z, end=y
      returned spliterator: offset=x, end=z

    x <= z <= y

In the best case, z lies exactly in the middle between x and y, which produces balanced splits. In our case, we adapt it slightly so that the resulting working sets are multiples of the page size.
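To make the rounding concrete, the following small sketch computes z for a given range. The class and method names (`PageAlignedSplit`, `splitPoint`) are made up for illustration, and the sketch assumes that `start` already sits on a page boundary. The arithmetic follows the same steps as the `trySplit` method of the solution in this answer.

```java
public class PageAlignedSplit {

    /**
     * Returns the split position z for the range [start, end), rounded down to a
     * multiple of pageSize, or -1 if the range holds at most one page.
     * Assumes start is itself a multiple of pageSize.
     */
    static long splitPoint(long start, long end, long pageSize) {
        if (end - start <= pageSize) return -1; // nothing to split at page granularity
        long mid = (start + end) >>> 1;         // exact midpoint
        mid = mid / pageSize * pageSize;        // round down to a page boundary
        if (mid == start) mid += pageSize;      // keep the returned prefix non-empty
        return mid;
    }

    public static void main(String[] args) {
        System.out.println(splitPoint(0, 555_000, 10_000));       // 270000
        System.out.println(splitPoint(270_000, 555_000, 10_000)); // 410000
        System.out.println(splitPoint(0, 10_000, 10_000));        // -1
    }
}
```

The prefix [x, z) and the remainder [z, y) then both start on a page boundary, so no page is ever requested twice or cut in half by a split.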

This logic works without fetching any pages. If you defer fetching until the moment the framework wants to start traversal, i.e. after the splitting, the fetch operations can run in parallel. The biggest obstacle is that you need to fetch the first page in order to learn the total number of elements. The solution below separates this first fetch from the rest, which simplifies the implementation. It has to pass down the result of this first fetch, which is either consumed on the first traversal (in the sequential case) or returned as the first split-off prefix. That accepts one unbalanced split at this point, but nothing has to deal with it afterwards.

    import java.util.List;
    import java.util.Spliterator;
    import java.util.Spliterators;
    import java.util.function.Consumer;
    import java.util.function.LongConsumer;
    import java.util.stream.Stream;
    import java.util.stream.StreamSupport;

    public class PagingSpliterator<T> implements Spliterator<T> {
        public interface PageFetcher<T> {
            List<T> fetchPage(long offset, long limit, LongConsumer totalSizeSink);
        }
        public static final long DEFAULT_PAGE_SIZE = 100;

        public static <T> Stream<T> paged(PageFetcher<T> pageAccessor) {
            return paged(pageAccessor, DEFAULT_PAGE_SIZE, false);
        }
        public static <T> Stream<T> paged(PageFetcher<T> pageAccessor,
                                          long pageSize, boolean parallel) {
            if(pageSize<=0) throw new IllegalArgumentException();
            return StreamSupport.stream(() -> {
                PagingSpliterator<T> pgSp
                    = new PagingSpliterator<>(pageAccessor, 0, 0, pageSize);
                pgSp.danglingFirstPage
                    =spliterator(pageAccessor.fetchPage(0, pageSize, l -> pgSp.end=l));
                return pgSp;
            }, CHARACTERISTICS, parallel);
        }
        private static final int CHARACTERISTICS = IMMUTABLE|ORDERED|SIZED|SUBSIZED;

        private final PageFetcher<T> supplier;
        long start, end, pageSize;
        Spliterator<T> currentPage, danglingFirstPage;

        PagingSpliterator(PageFetcher<T> supplier,
                long start, long end, long pageSize) {
            this.supplier = supplier;
            this.start    = start;
            this.end      = end;
            this.pageSize = pageSize;
        }

        public boolean tryAdvance(Consumer<? super T> action) {
            for(;;) {
                if(ensurePage().tryAdvance(action)) return true;
                if(start>=end) return false;
                currentPage=null;
            }
        }
        public void forEachRemaining(Consumer<? super T> action) {
            do {
                ensurePage().forEachRemaining(action);
                currentPage=null;
            } while(start<end);
        }
        public Spliterator<T> trySplit() {
            if(danglingFirstPage!=null) {
                Spliterator<T> fp=danglingFirstPage;
                danglingFirstPage=null;
                start=fp.getExactSizeIfKnown();
                return fp;
            }
            if(currentPage!=null)
                return currentPage.trySplit();
            if(end-start>pageSize) {
                long mid=(start+end)>>>1;
                mid=mid/pageSize*pageSize;
                if(mid==start) mid+=pageSize;
                return new PagingSpliterator<>(supplier, start, start=mid, pageSize);
            }
            return ensurePage().trySplit();
        }
        /**
         * Fetch data immediately before traversing or sub-page splitting.
         */
        private Spliterator<T> ensurePage() {
            if(danglingFirstPage!=null) {
                Spliterator<T> fp=danglingFirstPage;
                danglingFirstPage=null;
                currentPage=fp;
                start=fp.getExactSizeIfKnown();
                return fp;
            }
            Spliterator<T> sp = currentPage;
            if(sp==null) {
                if(start>=end) return Spliterators.emptySpliterator();
                sp = spliterator(supplier.fetchPage(
                                     start, Math.min(end-start, pageSize), l->{}));
                start += sp.getExactSizeIfKnown();
                currentPage=sp;
            }
            return sp;
        }
        /**
         * Ensure that the sub-spliterator provided by the List is compatible with
         * ours, i.e. is {@code SIZED | SUBSIZED}. For standard List implementations,
         * the spliterators are, so the costs of dumping into an intermediate array
         * in the other case is irrelevant.
         */
        private static <E> Spliterator<E> spliterator(List<E> list) {
            Spliterator<E> sp = list.spliterator();
            if((sp.characteristics()&(SIZED|SUBSIZED))!=(SIZED|SUBSIZED))
                sp=Spliterators.spliterator(
                    StreamSupport.stream(sp, false).toArray(), IMMUTABLE | ORDERED);
            return sp;
        }
        public long estimateSize() {
            if(currentPage!=null) return currentPage.estimateSize();
            return end-start;
        }
        public int characteristics() {
            return CHARACTERISTICS;
        }
    }

It uses a specialized PageFetcher functional interface. An implementation invokes the accept method of the callback with the total size and returns a list of items. The paging spliterator simply delegates to the list’s spliterator for traversal. If the concurrency is significantly higher than the resulting number of pages, it may even benefit from splitting these page spliterators, which implies that random access lists, like ArrayList, are the preferred list type here.
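As a quick illustration of why a random access list suits this delegation, the sketch below inspects the spliterator of an `ArrayList`. The class name `ListSplitDemo` and the helper `isSizedAndSubsized` are hypothetical; the check itself is the same bit test that a `SIZED | SUBSIZED` compatibility check needs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ListSplitDemo {

    /** True if the spliterator reports both SIZED and SUBSIZED. */
    static boolean isSizedAndSubsized(Spliterator<?> sp) {
        int mask = Spliterator.SIZED | Spliterator.SUBSIZED;
        return (sp.characteristics() & mask) == mask;
    }

    public static void main(String[] args) {
        List<Integer> page = IntStream.range(0, 8).boxed()
                .collect(Collectors.toCollection(ArrayList::new));

        Spliterator<Integer> rest = page.spliterator();
        System.out.println(isSizedAndSubsized(rest));

        // trySplit hands off a prefix; this spliterator keeps the remainder
        Spliterator<Integer> prefix = rest.trySplit();
        System.out.println(prefix.estimateSize() + " + " + rest.estimateSize());
    }
}
```

An array-backed list can split by index arithmetic alone, so a single large page can still be divided among several threads without copying its elements.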

Adapting your example code to

    private static <T> Stream<T> asSlowPagedSource(long pageSize, List<T> things) {
        return PagingSpliterator.paged( (offset, limit, totalSizeSink) -> {
            totalSizeSink.accept(things.size());
            if(offset>things.size()) return Collections.emptyList();
            int beginIndex = (int)offset;
            assert beginIndex==offset;
            int endIndex = Math.min(beginIndex+(int)limit, things.size());
            System.out.printf("Page %6d-%6d:\t%s%n",
                              beginIndex, endIndex, Thread.currentThread());
            // artificial slowdown
            LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
            return things.subList(beginIndex, endIndex);
        }, pageSize, true);
    }

you may test it like

    List<Integer> samples=IntStream.range(0, 555_000).boxed().collect(Collectors.toList());
    List<Integer> result =asSlowPagedSource(10_000, samples)
        .collect(Collectors.toList());
    if(!samples.equals(result))
        throw new AssertionError();

Given enough free CPU cores, this demonstrates that the pages are fetched concurrently, and hence not in order, while the result is still correctly in encounter order. You may also test the sub-page concurrency, which applies when there are fewer pages than worker threads:

    Set<Thread> threads=ConcurrentHashMap.newKeySet();
    List<Integer> samples=IntStream.range(0, 1_000_000).boxed().collect(Collectors.toList());
    List<Integer> result=asSlowPagedSource(500_000, samples)
        .peek(x -> threads.add(Thread.currentThread()))
        .collect(Collectors.toList());
    if(!samples.equals(result))
        throw new AssertionError();
    System.out.println("Concurrency: "+threads.size());
Holger answered Oct 24 '22 14:10
