BufferedIterator implementation

Does anyone know of an open-source BufferedIterator, i.e. an iterator that eagerly fetches the next N elements on a background thread? There is an implementation in a TechRepublic article, but I assume it has not been thoroughly tested.

Iterators.buffer(Iterator toBuffer, int bufferSize) would be a nice addition to Guava. Has that been considered?

Brian Harris asked Jan 27 '10 18:01



1 Answer

The linked implementation appears to have been written for Java 1.4 and can be simplified a little using Guava and java.util.concurrent:

```java
import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

import com.google.common.collect.AbstractIterator;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

public final class Iterators2 {
    public static <E> Iterator<E> buffer(final Iterator<E>      source,
                                         int                    capacity) {
        return buffer(source, capacity, defaultExecutor);
    }

    public static <E> Iterator<E> buffer(final Iterator<E>      source,
                                         int                    capacity,
                                         final ExecutorService  exec) {
        if (capacity <= 0) return source;
        final BlockingQueue<E> queue = new ArrayBlockingQueue<E>(capacity);

        // Temporary storage for an element we fetched but could not fit in the queue.
        // A non-null value also signals that the inserter task has stopped.
        final AtomicReference<E> overflow = new AtomicReference<E>();
        final Runnable inserter = new Runnable() {
            @SuppressWarnings("unchecked")
            public void run() {
                E next = (E) END_MARKER;
                if (source.hasNext()) {
                    next = source.next();
                    // ArrayBlockingQueue does not allow nulls
                    if (next == null) next = (E) NULL_MARKER;
                }
                while (!queue.offer(next)) {
                    // Queue is full: save the element and stop.
                    overflow.set(next);
                    // If the queue is still full, the consumer's next take()
                    // will find the saved element and restart the inserter.
                    if (queue.remainingCapacity() == 0) return;
                    // Otherwise a slot was freed before the element was saved, so the
                    // consumer may have missed it. Reclaim it unless the consumer already has.
                    next = overflow.getAndSet(null);
                    if (next == null) return;
                }
                // Keep buffering elements as long as we can
                if (next != END_MARKER) exec.submit(this);
            }
        };
        // Fetch the first element.
        // The inserter will resubmit itself as necessary to fetch more elements.
        exec.submit(inserter);
        return new AbstractIterator<E>() {
            @Override
            protected E computeNext() {
                try {
                    E next = queue.take();
                    E overflowElem = overflow.getAndSet(null);
                    if (overflowElem != null) {
                        // We just took an element, so there is space in the queue
                        queue.put(overflowElem);
                        // Restart the inserter, unless the source is already exhausted
                        if (overflowElem != END_MARKER) exec.submit(inserter);
                    }
                    if (next == END_MARKER) {
                        return endOfData();
                    } else if (next == NULL_MARKER) {
                        return null;
                    } else {
                        return next;
                    }
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    return endOfData();
                }
            }
        };
    }

    private Iterators2() {
        throw new AssertionError(Iterators2.class + " is a static class and cannot be instantiated");
    }

    // Daemon threads, so idle prefetch threads do not keep the JVM alive
    private static final ExecutorService defaultExecutor =
        Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).build());

    private static final Object END_MARKER = new Object();

    private static final Object NULL_MARKER = new Object();
}
```

Note: the above implementation makes no attempt to handle exceptions thrown by the source iterator. If one is thrown, the inserter task terminates abruptly and the calling thread stays blocked in take() forever.
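One way to address that limitation is a simpler design, sketched here as an alternative rather than as a patch to the code above. A single producer task copies the source into the bounded queue, blocking in put() whenever the buffer is full. Any exception thrown by the source is forwarded to the consumer thread, where it is rethrown. The class name BufferedIterators and the Failure wrapper are hypothetical names for this sketch, and the multi-catch assumes Java 7 or later.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;

// Hypothetical helper: a single producer task fills a bounded queue and
// forwards any exception from the source to the consuming thread.
final class BufferedIterators {
    private static final Object END_MARKER = new Object();  // source exhausted
    private static final Object NULL_MARKER = new Object(); // stands in for null elements

    // Carries a failure from the producer thread to the consumer thread
    private static final class Failure {
        final Throwable cause;
        Failure(Throwable cause) { this.cause = cause; }
    }

    private BufferedIterators() {}

    static <E> Iterator<E> buffer(final Iterator<E> source, int capacity, ExecutorService exec) {
        if (capacity <= 0) return source;
        final BlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(capacity);

        exec.execute(new Runnable() {
            public void run() {
                try {
                    try {
                        while (source.hasNext()) {
                            E next = source.next();
                            queue.put(next == null ? NULL_MARKER : next); // blocks while full
                        }
                        queue.put(END_MARKER);
                    } catch (RuntimeException | Error e) {
                        queue.put(new Failure(e)); // hand the failure to the consumer
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // producer cancelled, e.g. by shutdownNow()
                }
            }
        });

        return new Iterator<E>() {
            private Object lookahead; // next queue item; null means "not fetched yet"

            public boolean hasNext() {
                if (lookahead == null) {
                    try {
                        lookahead = queue.take();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new IllegalStateException("Interrupted while waiting for the buffer", e);
                    }
                }
                if (lookahead instanceof Failure) {
                    // Rethrown on every call; the source's exception is kept as the cause
                    throw new RuntimeException("Source iterator failed", ((Failure) lookahead).cause);
                }
                return lookahead != END_MARKER;
            }

            @SuppressWarnings("unchecked")
            public E next() {
                if (!hasNext()) throw new NoSuchElementException();
                Object item = lookahead;
                lookahead = null;
                return item == NULL_MARKER ? null : (E) item;
            }

            public void remove() {
                throw new UnsupportedOperationException();
            }
        };
    }
}
```

This design has a cost: each buffered iterator occupies one executor thread for its whole lifetime, including while the buffer is full. The resubmitting inserter above avoids that. Also, if the caller abandons the iterator before reaching the end, the producer stays blocked in put() until its thread is interrupted, for example by exec.shutdownNow().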

finnw answered Sep 21 '22 18:09
