Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Stream API and Queues: Subscribe to BlockingQueue stream-style

Let's say we have a Queue

BlockingQueue<String> queue= new LinkedBlockingQueue<>(); 

and some other thread puts values in it, then we read it like

while (true) {     String next = queue.take();     System.out.println("next message:" + next); } 

How can I iterate over this queue in stream style, while maintaining similar semantics to above code.

This code only traverses the current queue state:

queue.stream().forEach(e -> System.out.println(e)); 
like image 930
Mikhail Boyarsky Avatar asked May 04 '14 22:05

Mikhail Boyarsky


People also ask

What is the difference between queue and BlockingQueue in Java?

A blocking queue is a queue which provides insert and remove operations that block or keep waiting until they are performed. The blocking queues are usually used in Producer-Consumer frameworks. This interface extends Queue and exists since Java 5. Null elements are not allowed.

What is Strem API?

The Streams API allows JavaScript to programmatically access streams of data received over the network and process them as desired by the developer.

What is BlockingQueue used for?

BlockingQueue is a java Queue that support operations that wait for the queue to become non-empty when retrieving and removing an element, and wait for space to become available in the queue when adding an element.


1 Answers

I'm guessing a bit at what you're expecting, but I think I have a good hunch.

The stream of a queue, like iterating over a queue, represents the current contents of the queue. When the iterator or the stream reaches the tail of the queue, it doesn't block awaiting further elements to be added. The iterator or the stream is exhausted at that point and the computation terminates.

If you want a stream that consists of all current and future elements of the queue, you can do something like this:

Stream.generate(() -> {         try {             return queue.take();         } catch (InterruptedException ie) {             return "Interrupted!";         }     })     .filter(s -> s.endsWith("x"))     .forEach(System.out::println);    

(Unfortunately the need to handle InterruptedException makes this quite messy.)

Note that there is no way to close a queue, and there is no way for Stream.generate to stop generating elements, so this is effectively an infinite stream. The only way to terminate it is with a short-circuiting stream operation such as findFirst.

like image 176
Stuart Marks Avatar answered Sep 17 '22 21:09

Stuart Marks