Does someone know of an open source BufferedIterator, where the next N elements are eagerly fetched on a background thread? Here is an implementation from a TechRepublic article, but I assume it has not been thoroughly tested.
Iterators.buffer(Iterator toBuffer, int bufferSize) would be a nice addition to Guava, has that been considered?
The linked implementation appears to have been written for Java 4 and can be simplified a little using guava and java.util.concurrent
:
import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.base.Throwables;
import com.google.common.collect.AbstractIterator;
import com.google.common.util.concurrent.Executors;
public abstract class Iterators2 {
public static <E> Iterator<E> buffer(final Iterator<E> source,
int capacity) {
return buffer(source, capacity, defaultExecutor);
}
public static <E> Iterator<E> buffer(final Iterator<E> source,
int capacity,
final ExecutorService exec) {
if (capacity <= 0) return source;
final BlockingQueue<E> queue = new ArrayBlockingQueue<E>(capacity);
// Temporary storage for an element we fetched but could not fit in the queue
final AtomicReference<E> overflow = new AtomicReference<E>();
final Runnable inserter = new Runnable() {
@SuppressWarnings("unchecked")
public void run() {
E next = (E) END_MARKER;
if (source.hasNext()) {
next = source.next();
// ArrayBlockingQueue does not allow nulls
if (next == null) next = (E) NULL_MARKER;
}
if (queue.offer(next)) {
// Keep buffering elements as long as we can
if (next != END_MARKER) exec.submit(this);
} else {
// Save the element. This also signals to the
// iterator that the inserter thread is blocked.
overflow.lazySet(next);
}
}
};
// Fetch the first element.
// The inserter will resubmit itself as necessary to fetch more elements.
exec.submit(inserter);
Iterator<E> iterator = new AbstractIterator<E>() {
protected E computeNext() {
try {
E next = queue.take();
E overflowElem = overflow.getAndSet(null);
if (overflowElem != null) {
// There is now a space in the queue
queue.put(overflowElem);
// Awaken the inserter thread
exec.submit(inserter);
}
if (next == END_MARKER) {
return endOfData();
} else if (next == NULL_MARKER) {
return null;
} else {
return next;
}
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
return endOfData();
}
}
};
return iterator;
}
protected Iterators2() {
throw Throwables.propagate(new InstantiationException(Iterators2.class + " is a static class and cannot be instantiated"));
}
private static ExecutorService defaultExecutor =
java.util.concurrent.Executors.newCachedThreadPool(Executors.daemonThreadFactory());
private static final Object END_MARKER = new Object();
private static final Object NULL_MARKER = new Object();
}
Note: the above implementation makes no attempt to handle exceptions in the source iterator (if one is thrown, the inserter task will terminate abruptly, leaving the calling thread deadlocked.)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With