
How to handle a dynamic collection as stream?

Java 8 collections can be exposed as streams. However, calling stream() gives us only the current contents of the collection as a stream. What if my collection grows during stream processing, for example because the stream operations themselves add more data to it? Is there a simple and effective way to handle this situation?

(I tried calling Stream.concat() from within the stream processing operation, but I got an exception: Exception in thread "main" java.lang.IllegalStateException: stream has already been operated upon or closed)
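For illustration, here is a minimal sketch of one way that exception can arise. The class name StreamReuseDemo and the placeholder strings are hypothetical and not taken from my real code; the point is only that a stream cannot be passed to Stream.concat() while its own terminal operation is running.

```java
import java.util.stream.Stream;

class StreamReuseDemo {
  // Returns the message of the exception raised when a stream is
  // concatenated from inside its own terminal operation.
  static String reuse() {
    Stream<String> urls = Stream.of("a", "b");
    try {
      // forEach has already started consuming `urls`, so concat() rejects it.
      urls.forEach(u -> Stream.concat(urls, Stream.of("c")));
      return "no exception";
    } catch (IllegalStateException e) {
      return e.getMessage();
    }
  }

  public static void main(String[] args) {
    System.out.println(reuse());
  }
}
```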

Taking a specific example, suppose I have a concurrent queue of URLs.

Queue<Url> concurrentUrlQue = initUrlQueue();

Now I want to obtain a stream of this URL queue and process the URLs one by one. Processing involves removing a URL from the queue, reading the web page it points to, extracting URLs from that page, and adding those URLs to the concurrent queue.

concurrentUrlQue.stream().forEach((url)->readAndExtractUrls(url, concurrentUrlQue));

I want to be able to handle the above dynamically growing queue as a stream. (Further, I want to be able to process this dynamic queue using a parallel stream.)

Is there a simple way of achieving this using Java streams?

asked Apr 04 '17 14:04 by RajatJ


1 Answer

You need to write a spliterator that blocks while waiting for a new element.

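import java.util.Spliterators;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

class QueueSpliterator<T> extends Spliterators.AbstractSpliterator<T> {

  private final BlockingQueue<T> queue;

  public QueueSpliterator(BlockingQueue<T> queue) {
    // Unknown size (Long.MAX_VALUE) and no characteristics.
    super(Long.MAX_VALUE, 0);
    this.queue = queue;
  }

  @Override
  public boolean tryAdvance(Consumer<? super T> action) {
    try {
      // Blocks until an element becomes available.
      T element = queue.take();
      action.accept(element);
      return true;
    } catch (InterruptedException e) {
      // Restore the interrupt flag and end the stream.
      Thread.currentThread().interrupt();
      return false;
    }
  }
}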

Then you create a stream using that spliterator and handle it like an ordinary infinite stream.

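import java.util.Spliterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class Main {
  public static void main(String... args) {
    BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1000);

    // Producer thread fills the queue while the stream consumes it.
    new Thread(() -> {
      for (int i = 0; i < 1000; ++i) {
        try {
          queue.put(i);
        } catch (InterruptedException e) {
          throw new RuntimeException(e);
        }
      }
    }).start();

    Spliterator<Integer> queueSpliterator = new QueueSpliterator<>(queue);
    Stream<Integer> stream = StreamSupport.stream(queueSpliterator, false);

    // Never returns: once the queue is drained, take() blocks waiting for more.
    stream.forEach(System.out::println);
  }
}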
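
Because take() blocks forever, the stream above never ends. If you need it to terminate, one possible variant is sketched below, assuming that a quiet period on the queue means the work is done. The names DrainingQueueSpliterator, DrainingDemo and crawl are hypothetical, and the integer "discovery" rule is only a stand-in for readAndExtractUrls from the question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterators;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.StreamSupport;

// Hypothetical variant: ends the stream when the queue stays empty for the timeout.
class DrainingQueueSpliterator<T> extends Spliterators.AbstractSpliterator<T> {
  private final BlockingQueue<T> queue;
  private final long timeoutMillis;

  DrainingQueueSpliterator(BlockingQueue<T> queue, long timeoutMillis) {
    super(Long.MAX_VALUE, 0);
    this.queue = queue;
    this.timeoutMillis = timeoutMillis;
  }

  @Override
  public boolean tryAdvance(Consumer<? super T> action) {
    try {
      // Wait a bounded time instead of blocking indefinitely.
      T element = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
      if (element == null) {
        return false; // assumption: a quiet period means no more work
      }
      action.accept(element);
      return true;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }
}

class DrainingDemo {
  // Stand-in for crawling: each n below 8 "discovers" 2n and 2n+1.
  static List<Integer> crawl() {
    BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
    queue.add(1);
    List<Integer> visited = new ArrayList<>();
    StreamSupport.stream(new DrainingQueueSpliterator<Integer>(queue, 100), false)
        .forEach(n -> {
          visited.add(n);
          if (n < 8) {
            // Feed newly found work back into the queue being streamed.
            queue.add(2 * n);
            queue.add(2 * n + 1);
          }
        });
    return visited;
  }

  public static void main(String[] args) {
    System.out.println(crawl());
  }
}
```

This sketch is sequential only: the ArrayList is not thread-safe, and a timeout is a crude end-of-work signal, since a producer slower than the timeout would end the stream early.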

answered Oct 13 '22 08:10 by Nikita Marshalkin