Asynchronous Iterator

I have the following code:

while(slowIterator.hasNext()) {
  performLengthTask(slowIterator.next());
}

Because both the iterator and the task are slow, it makes sense to run them in separate threads. Here is a quick-and-dirty attempt at an Iterator wrapper:

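import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class AsyncIterator<T> implements Iterator<T> {
    private final BlockingQueue<T> queue = new ArrayBlockingQueue<T>(100);

    public AsyncIterator(final Iterator<T> delegate) {
        // producer thread: fills the queue from the slow delegate
        new Thread() {
            @Override
            public void run() {
                try {
                    while (delegate.hasNext()) {
                        queue.put(delegate.next()); // blocks while the queue is full
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }.start();
    }

    @Override
    public boolean hasNext() {
        return true; // the problem: there is no way to tell when the delegate is exhausted
    }

    @Override
    public T next() {
        try {
            return queue.take(); // blocks until the producer supplies an element
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    @Override
    public void remove() {
        throw new UnsupportedOperationException();
    }
}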

However, this implementation lacks support for hasNext(). It would of course be fine for hasNext() to block until it knows whether to return true or false. I could keep a peek object in my AsyncIterator, change hasNext() to take an object from the queue, and have next() return that peeked object. But this would cause hasNext() to block indefinitely once the end of the delegate iterator has been reached.

Instead of using the ArrayBlockingQueue, I could of course handle the thread communication myself:

private static class AsyncIterator<T> implements Iterator<T> {

  // both fields are guarded by "this": shared between the producer thread and the consumer
  private final Queue<T> queue = new LinkedList<T>();
  private boolean delegateDone = false;

  private AsyncIterator(final Iterator<T> delegate) {
    new Thread() {
      @Override
      public void run() {
        while (delegate.hasNext()) {
          final T next = delegate.next();
          synchronized (AsyncIterator.this) {
            queue.add(next);
            AsyncIterator.this.notify(); // wake a consumer waiting in hasNext()
          }
        }
        synchronized (AsyncIterator.this) {
          delegateDone = true;
          AsyncIterator.this.notify();
        }
      }
    }.start();
  }

  @Override
  public synchronized boolean hasNext() {
    // wait until an element is available or the producer has finished
    while (queue.isEmpty() && !delegateDone) {
      try {
        wait();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
    }
    return !queue.isEmpty(); // read while still holding the lock
  }

  @Override
  public synchronized T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return queue.remove();
  }

  @Override
  public void remove() {
    throw new UnsupportedOperationException();
  }
}

However, all the extra synchronization, wait() and notify() calls don't make the code any more readable, and it is easy for a race condition to hide somewhere.

Any better ideas?

Update

Yes, I know about the common observer/observable patterns. However, the usual implementations don't provide for an end to the flow of data, and they are not iterators.

I specifically want an iterator here because the loop above actually lives in an external library, and that library expects an Iterator.

yankee asked Jan 15 '14 17:01


2 Answers

This is a tricky one, but I think I got the right answer this time. (I deleted my first answer.)

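The answer is to use a sentinel. I haven't tested this code:

import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class AsyncIterator<T> implements Iterator<T> {

    // Marker placed on the queue once the delegate is exhausted.
    private static final Object SENTINEL = new Object();

    // Holds Object rather than T so the sentinel needs no unchecked cast.
    // Null elements are not supported: ArrayBlockingQueue rejects them.
    private final BlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(100);
    private Object next; // fetched by hasNext() but not yet returned by next()

    public AsyncIterator(final Iterator<T> delegate) {
        new Thread() {
            @Override
            public void run() {
                try {
                    while (delegate.hasNext()) {
                        queue.put(delegate.next()); // blocks while the queue is full
                    }
                    queue.put(SENTINEL);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }.start();
    }

    @Override
    public boolean hasNext() {
        if (next == null) {
            try {
                next = queue.take(); // blocks if necessary
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        // The sentinel stays cached, so repeated calls after the end keep returning false.
        return next != SENTINEL;
    }

    @Override
    @SuppressWarnings("unchecked")
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        T tmp = (T) next;
        next = null;
        return tmp;
    }

    @Override
    public void remove() {
        throw new UnsupportedOperationException();
    }
}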

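The insight here is that hasNext() needs to block until the next item is ready. It also needs some kind of quit condition, and it can't use an empty queue or a boolean flag for that because of threading issues: the queue can be momentarily empty while the producer is still working, and checking a flag and the queue separately is not atomic. A sentinel solves the problem without any explicit locking or synchronization; the BlockingQueue handles the hand-off between the two threads.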

Edit: cached "next" so hasNext() can be called more than once.
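One gap remains in the sentinel approach: if the delegate's hasNext() or next() throws a runtime exception on the producer thread, the sentinel is never enqueued and the consumer blocks in take() forever, which is exactly the hang the question worried about. The sketch below is a hypothetical variant of the same end-marker idea, not the code above: each queue entry is a wrapper, and the producer always sends a final "end" entry that may carry the delegate's exception, which hasNext() then rethrows on the consumer thread. The names FailureAwareAsyncIterator and Entry are illustrative, and making the producer a daemon thread is an assumption that suits a consumer which may stop early.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical variant of the sentinel approach: every queue entry is a wrapper,
// and the final "end" entry may carry a runtime exception thrown by the delegate.
final class FailureAwareAsyncIterator<T> implements Iterator<T> {

    private static final class Entry<E> {
        final E value;                  // the element (null is allowed here)
        final boolean end;              // true only for the end-of-stream marker
        final RuntimeException failure; // set if the delegate threw

        Entry(E value, boolean end, RuntimeException failure) {
            this.value = value;
            this.end = end;
            this.failure = failure;
        }
    }

    private final BlockingQueue<Entry<T>> queue = new ArrayBlockingQueue<>(100);
    private Entry<T> next; // fetched by hasNext(), not yet consumed by next()

    FailureAwareAsyncIterator(Iterator<T> delegate) {
        Thread producer = new Thread(() -> {
            RuntimeException failure = null;
            try {
                while (delegate.hasNext()) {
                    queue.put(new Entry<>(delegate.next(), false, null));
                }
            } catch (RuntimeException e) {
                failure = e; // forwarded to the consumer instead of being lost
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            try {
                queue.put(new Entry<>(null, true, failure)); // the end marker is always sent
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.setDaemon(true); // don't keep the JVM alive if the consumer stops early
        producer.start();
    }

    @Override
    public boolean hasNext() {
        if (next == null) {
            try {
                next = queue.take(); // blocks until an element or the end marker arrives
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        if (next.end && next.failure != null) {
            throw next.failure; // surface the delegate's exception on the consumer thread
        }
        return !next.end;
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        T value = next.value;
        next = null;
        return value;
    }
}
```

It is used exactly like the sentinel version. The cost is one small wrapper allocation per element; in exchange, null elements also pass through, which a bare ArrayBlockingQueue would reject.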

ccleve answered Nov 03 '22 00:11


Or save yourself the headache and use RxJava (the code below uses the RxJava 1.x API, package rx):

import java.util.Iterator;

import rx.Observable;
import rx.Scheduler;
import rx.observables.BlockingObservable;
import rx.schedulers.Schedulers;

public class RxAsyncIteratorExample {

    public static void main(String[] args) throws InterruptedException {
        final Iterator<Integer> slowIterator = new SlowIntegerIterator(3, 7300);

        // the scheduler you use here depends on the behaviour you want, but
        // io() (meant for blocking, I/O-bound work) is probably what you want
        Iterator<Integer> async = asyncIterator(slowIterator, Schedulers.io());
        while (async.hasNext()) {
            performLengthTask(async.next());
        }
    }

    public static <T> Iterator<T> asyncIterator(
            final Iterator<T> slowIterator,
            Scheduler scheduler) {

        final Observable<T> tObservable = Observable.from(new Iterable<T>() {
            @Override
            public Iterator<T> iterator() {
                return slowIterator;
            }
        }).subscribeOn(scheduler);

        return BlockingObservable.from(tObservable).getIterator();
    }

    /**
     * Uninteresting implementations...
     */
    public static void performLengthTask(Integer integer)
            throws InterruptedException {
        log("Running task for " + integer);
        Thread.sleep(10000L);
        log("Finished task for " + integer);
    }

    private static class SlowIntegerIterator implements Iterator<Integer> {
        private int count;
        private final long delay;

        public SlowIntegerIterator(int count, long delay) {
            this.count = count;
            this.delay = delay;
        }

        @Override
        public boolean hasNext() {
            return count > 0;
        }

        @Override
        public Integer next() {
            try {
                log("Starting long production " + count);
                Thread.sleep(delay);
                log("Finished long production " + count);
            }
            catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
            return count--;
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }
    }

    private static final long startTime = System.currentTimeMillis();

    private static void log(String s) {
        double time = ((System.currentTimeMillis() - startTime) / 1000d);
        System.out.println(time + ": " + s);
    }
}

Gives me:

0.031: Starting long production 3
7.332: Finished long production 3
7.332: Starting long production 2
7.333: Running task for 3
14.633: Finished long production 2
14.633: Starting long production 1
17.333: Finished task for 3
17.333: Running task for 2
21.934: Finished long production 1
27.334: Finished task for 2
27.334: Running task for 1
37.335: Finished task for 1
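The timestamps show the overlap at work. Taking n = 3 items, a production time p of about 7.3 s (the 7300 ms delay) and a task time t = 10 s, and ignoring scheduling overhead, a rough estimate of the total run time is:

```latex
\begin{aligned}
T_{\text{sequential}} &= n\,(p + t) = 3\,(7.3 + 10) = 51.9\ \text{s} \\
T_{\text{pipelined}} &\approx p + t + (n - 1)\max(p, t) = 7.3 + 10 + 2 \cdot 10 = 37.3\ \text{s}
\end{aligned}
```

After the first item, each later production (7.3 s) runs while an earlier task (10 s) is still going, so the slower stage sets the pace; the estimate matches the 37.335 s at which the last task finishes.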
Duncan Irvine answered Nov 02 '22 23:11