 

One upstream stream feeding multiple downstream streams

I have a general Streams API problem I'd like to solve "efficiently". Suppose I have a (possibly very large, possibly infinite) stream. I want to pre-process it in some way, for example, filtering out some items and mutating others. Let's assume that this pre-processing is complex and both time- and compute-intensive, so I do not want to do it twice.

Next I want to apply two distinct sets of operations to the item sequence and terminate each distinct pipeline with a different stream-type construction. For an infinite stream, this would be a forEach; for a finite one, it might be a collector or whatever.

Clearly, I might collect the intermediate results into a list, then drag two separate streams off that list, processing each stream individually. That would work for a finite stream, though a) it seems "ugly" and b) it's potentially impractical for a very large stream, and flat out won't work for an infinite stream.
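Something like this, say, with a trivial String pre-process standing in for the real (expensive) work:

List<String> source = Arrays.asList(" a ", "", " b ", " a ");

// Pre-process once, materializing the results into a list...
List<String> prepped = source.stream()
  .filter(s -> !s.isEmpty())   // stand-in for the expensive filtering
  .map(String::trim)           // stand-in for the expensive mutation
  .collect(Collectors.toList());

// ...then run two completely independent streams over that list.
prepped.forEach(System.out::println);                 // first pipeline
long distinct = prepped.stream().distinct().count();  // second pipeline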

I guess I could use peek as a kind of "tee". I could then perform one chain of processing on the results downstream of peek and somehow coerce the Consumer in peek to do the "other" work, but then that second path is no longer a stream.

I have discovered that I can create a BlockingQueue, use peek to push things into that queue, and then obtain a stream from the queue. This seems to be a fine idea and actually works quite well, though I fail to understand how the stream gets closed (it actually does, but I cannot see how). Here's sample code illustrating this:

List<Student> ls = Arrays.asList(
  new Student("Fred", 2.3F)
  // more students (and Student definition) elided ...
);

BlockingQueue<Student> pipe = new LinkedBlockingQueue<>();

// Main thread: tee each element into the queue via peek, then print it.
ls.stream()
  .peek(s -> {
    try {
      pipe.put(s);
    } catch (InterruptedException ie) {
      ie.printStackTrace();
    }
  })
  .forEach(System.out::println);

// Second thread: stream the queue's current contents into a collector.
new Thread(() -> {
  Map<String, Double> map = pipe.stream()
    .collect(Collectors.groupingBy(s -> s.getName(),
             Collectors.averagingDouble(s -> s.getGpa())));
  map.forEach((k, v) ->
    System.out.println("Students called " + k + " average " + v));
}).start();

So, the first question is: is there a "better" way to do this?

Second question, how the heck is that stream on the BlockingQueue getting closed?

Cheers, Toby

Asked May 03 '15 by Toby Eggitt



1 Answer

Interesting problem. I'll answer the second question first, since it's a simpler issue.

Second question, how the heck is that stream on the BlockingQueue getting closed?

By "closed" I think you mean, the stream has certain number of elements and then it finishes, disregarding any elements that may be added to the queue in the future. The reason is that a stream on a queue represents only the current contents of a queue as of the time the stream is created. It doesn't represent any future elements, that is, those that some other thread might add in the future.

If you want a stream that represents the current and future contents of the queue, then you can use the technique described in this other answer: basically, use Stream.generate() to call queue.take(). I don't think this is what you want, though, so beyond the quick sketch below I won't discuss it further.
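For reference, that technique looks roughly like this, using the pipe queue and Student type from the question. (Note that take() throws the checked InterruptedException, so a bare queue::take method reference won't compile; this is a sketch, and the interruption policy shown is just one possibility.)

// A stream over the current *and future* contents of the queue: each pull
// blocks in take() until an element arrives. It never finishes on its own,
// so you need a downstream limit() or external interruption to stop it.
Stream<Student> live = Stream.generate(() -> {
    try {
        return pipe.take();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();  // restore the interrupt flag
        throw new RuntimeException(e);
    }
});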

Now to your larger issue.

You have a source of objects upon which you want to do some processing, including filtering. You then want to take the results and send them through two different downstream processing steps. Essentially you have a single producer and two consumers.

One of the fundamental issues you have to handle is how to deal with the cases where the different processing steps occur at different rates. Suppose that we've solved the issue of how to get a stream from the queue without the stream terminating prematurely. If the producer can produce elements faster than the consumer can process elements from this queue, the queue will accumulate elements until it fills all available memory.

You also have to decide how to deal with the different consumers processing elements at different rates. If one consumer is significantly slower than the other, either an arbitrary number of elements may need to be buffered (which might fill memory) or the faster consumer will have to be slowed down to match the average rate of the slower consumer.

Let me toss out a sketch of how you might proceed. I don't know your actual requirements, though, so I have no idea whether this will be satisfactory. One thing to note is that using parallel streams with this kind of application can be problematic, since parallel streams don't deal with blocking and load-balancing very well.

First, I'd start off with a stream processing elements from the producer and accumulating them into an ArrayBlockingQueue:

BlockingQueue<T> queue = new ArrayBlockingQueue<>(capacity);
producer.map(...)
        .filter(...)
        .forEach(queue::put);

(Note that put throws InterruptedException, so the method reference queue::put won't actually compile here. You'd need a small lambda with a try-catch block, or a helper method that wraps the call. Even then, it's not obvious what the right thing to do is if InterruptedException is caught.)
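For illustration, here's one shape such a helper could take. (The name uncheckedPut is mine, not a library method; restoring the interrupt flag and rethrowing unchecked is just one possible policy.)

// Wraps BlockingQueue.put() so it can be used where a Consumer is expected.
static <T> void uncheckedPut(BlockingQueue<T> queue, T element) {
    try {
        queue.put(element);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();  // restore the interrupt flag
        throw new RuntimeException(e);
    }
}

With it, the pipeline's last step becomes .forEach(t -> uncheckedPut(queue, t)).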

If the queue fills up, this will block the pipeline. Either run this sequentially in its own thread or, if in parallel, in a dedicated thread pool, to avoid blocking up the common pool.
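For example, something along these lines, reusing the hypothetical uncheckedPut helper from above:

// Run the producer pipeline on its own thread, so that blocking on a full
// queue stalls only this thread rather than the common fork-join pool.
ExecutorService producerThread = Executors.newSingleThreadExecutor();
producerThread.submit(() -> producer.forEach(t -> uncheckedPut(queue, t)));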

Next, the consumers:

while (true) {
    // wait until the queue is full, or a timeout has expired,
    // depending upon how frequently you want to continue
    // processing elements emitted by the producer
    List<T> list = new ArrayList<>();
    queue.drainTo(list);
    downstream1 = list.stream().filter(...).map(...).collect(...);
    downstream2 = list.stream().filter(...).map(...).collect(...);
    // deal with results downstream1 and downstream2
}

The key here is that the handoff from the producer to the consumers is done in batches with the drainTo method, which adds the elements of the queue to the destination and atomically empties the queue. This way, the consumers don't have to wait for the producer to finish its processing (which will never happen if the stream is infinite). In addition, each consumer operates on a known quantity of data and won't block in the midst of processing. Each consumer stream could thus conceivably be run in parallel, if that's helpful.
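To make that concrete, here's one way the loop could look with the question's Student type. (The poll timeout and the two downstream computations are placeholders I've invented.)

// A batched consumer loop: block briefly for the first element, then
// atomically drain whatever else has accumulated, and run both downstream
// pipelines over the same in-memory batch.
void consumeBatches(BlockingQueue<Student> queue) throws InterruptedException {
    while (!Thread.currentThread().isInterrupted()) {
        Student first = queue.poll(1, TimeUnit.SECONDS);  // wait for work
        if (first == null) continue;                      // timed out; try again
        List<Student> batch = new ArrayList<>();
        batch.add(first);
        queue.drainTo(batch);  // atomically grab everything else that's queued

        // Downstream 1: average GPA per name, as in the question.
        Map<String, Double> averages = batch.stream()
            .collect(Collectors.groupingBy(Student::getName,
                     Collectors.averagingDouble(Student::getGpa)));

        // Downstream 2: something different, e.g. count high-GPA students.
        long honors = batch.stream().filter(s -> s.getGpa() >= 3.5F).count();

        // ... deal with averages and honors ...
    }
}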

Here, I have the consumers running in lockstep. If you want the consumers to run at different rates, you'll have to construct additional queues (or something) to buffer up their workloads independently.
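One hypothetical arrangement, again reusing the uncheckedPut helper: give each consumer its own queue and have the producer tee every element into both, so each side buffers and drains at its own pace.

// Buffering doubles, but neither consumer can stall the other; the producer
// still blocks if either queue fills up.
BlockingQueue<Student> q1 = new ArrayBlockingQueue<>(1024);
BlockingQueue<Student> q2 = new ArrayBlockingQueue<>(1024);
producer.forEach(s -> { uncheckedPut(q1, s); uncheckedPut(q2, s); });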

If the consumers are overall slower than the producer, the queue will eventually fill up and block the producer, throttling it to the rate the consumers can accept. If the consumers are faster than the producer on average, then you may not need to worry about their relative processing rates. You can just have them loop and pick up whatever the producer has managed to put into the queue, or even have them block until something is available.

I should say that what I've outlined is a very simplistic approach to multi-stage pipelining. If your application is performance critical, you may find yourself doing a lot of work tuning memory consumption, load balancing, increasing throughput and reducing latency. There are other frameworks out there that may be more amenable to your application. You might take a look at the LMAX Disruptor, for example, though I don't have any experience with it myself.

Answered by Stuart Marks