Find Stream size before performing other operations

In my program, I repeatedly¹ collect Java 8 streams to reduce a collection of objects to a single one. The size of this collection can vary a lot throughout the execution: from 3 objects to hundreds.

public void findInterestingFoo(Stream<Foo> foos) {
    internalState.update(foos.collect(customCollector()));
}

In the process of optimizing my code and searching for bottlenecks, I made the stream parallel at some point. This worked at that point in time, as the collections were all fairly big. Later, after changing other parts and parameters of the program, the collections became smaller. I realized that not making the stream parallel was more efficient. This makes sense: the overhead of distributing work over multiple threads for 4 objects is simply not worth it. It is worth it for hundreds of objects though.

It would be really convenient if I could make only big streams parallel:

public void findInterestingFoo(Stream<Foo> foos) {
    if (isSmall(foos)) {
        internalState.update(foos.collect(customCollector()));
    } else {
        internalState.update(foos.parallel().collect(customCollector()));
    }
}

Of course, I can track the size myself when I create the stream from an array, from a collection, or by hand, because I then know which elements go into it. I am however interested in solving this in a generic way, so that no matter what kind of stream is passed to findInterestingFoo, it is handled appropriately and as efficiently as possible.

Something like count() might help, except it terminates the stream before I can collect it.
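
To make the problem concrete, here is a minimal sketch. The class name CountThenCollect and the sample data are made up for illustration. It calls count() and then tries to collect the same stream; the second terminal operation fails with an IllegalStateException.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class CountThenCollect {
    // Describes what happens when a stream is collected after being counted.
    static String countThenCollect(List<String> source) {
        Stream<String> foos = source.stream();
        long n = foos.count(); // terminal operation: consumes the stream
        try {
            // Second terminal operation on the same stream.
            List<String> all = foos.collect(Collectors.toList());
            return "collected " + all.size() + " of " + n;
        } catch (IllegalStateException e) {
            return "stream already consumed after counting " + n;
        }
    }

    public static void main(String[] args) {
        System.out.println(countThenCollect(Arrays.asList("a", "b", "c")));
    }
}
```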

I am well aware that streams are designed to not have a set size, in particular:

  • Possibly unbounded. While collections have a finite size, streams need not. Short-circuiting operations such as limit(n) or findFirst() can allow computations on infinite streams to complete in finite time. — java.util.stream package description

Still, I wonder if there is any way to determine how many elements are in a stream before performing any operations on it. Does a stream really not know that it is created from a finite collection?

__________
¹ Thousands of times. Optimizing this led to a speedup from about 1.5 to 0.5 seconds total running time in my case.

Just a student asked Jan 22 '18 11:01


1 Answer

In theory, you could do something like this:

import java.util.Spliterator;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public void findInterestingFoo(Stream<Foo> foos) {
    Spliterator<Foo> sp = foos.spliterator();
    // getExactSizeIfKnown() returns -1 if the size is not known.
    // Alternative: sp.estimateSize(), where Long.MAX_VALUE means "unknown".
    long size = sp.getExactSizeIfKnown();
    internalState.update(
        StreamSupport.stream(sp, size > PARALLEL_THRESHOLD)
                     .collect(customCollector()));
}

spliterator() is a terminal operation that consumes the input stream, but you can pass the Spliterator to StreamSupport.stream to construct a stream with exactly the same properties. The second parameter already tells whether the stream should be parallel.

In theory.

In practice, the current stream implementation will return different Spliterator implementations depending on whether the stream is parallel or not. This implies that recreating the stream as a parallel stream may end up with a stream that is incapable of doing parallel processing when the original stream wasn’t already parallel before calling spliterator().

It does work well, however, if there are no intermediate operations, e.g. when you directly pass in the Stream created from a collection or array.
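
As a rough illustration of when the size is available, the following sketch probes two streams. The class SizeProbe and its helper exactSize are hypothetical names, not part of any API. A stream taken directly from a list reports its exact size, while the same stream with a filter() step reports -1, because the number of elements that pass the filter cannot be known up front.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

class SizeProbe {
    // Consumes the stream and returns its exact size, or -1 if the size is not known.
    static long exactSize(Stream<?> stream) {
        return stream.spliterator().getExactSizeIfKnown();
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(1, 2, 3, 4);
        // Stream created directly from a collection: the size is known.
        System.out.println(exactSize(data.stream()));
        // After filter(), the exact size is no longer known.
        System.out.println(exactSize(data.stream().filter(x -> x % 2 == 0)));
    }
}
```

Note that exactSize consumes the stream it is given, so in real code you would keep the Spliterator and rebuild the stream from it, as in the method above.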

Calling parallel() before spliterator() gives you a parallel-capable Spliterator that you can still run sequentially if you decide to, and this works in a lot of cases. However, if the input stream contains stateful intermediate operations like sorted(), their execution mode may already be fixed to parallel at that point, even if you then perform the collect sequentially (or vice versa).
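
A minimal sketch of this variant, under the caveats just described. The names ParallelFirst, sizeAware and oneTo are invented for the example, and the threshold of 100 is an arbitrary assumption, not a recommendation. An unknown size (-1) falls below any threshold, so such streams run sequentially here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class ParallelFirst {
    static final long PARALLEL_THRESHOLD = 100; // hypothetical cut-off

    // Rebuilds the stream as parallel only if its exact size is known and large.
    static <T> Stream<T> sizeAware(Stream<T> in) {
        // Request parallel mode first so the Spliterator is able to split.
        Spliterator<T> sp = in.parallel().spliterator();
        long size = sp.getExactSizeIfKnown(); // -1 if not known
        return StreamSupport.stream(sp, size > PARALLEL_THRESHOLD);
    }

    // Helper for the demo: the list [1, 2, ..., n].
    static List<Integer> oneTo(int n) {
        return IntStream.rangeClosed(1, n).boxed()
                        .collect(Collectors.toCollection(ArrayList::new));
    }

    public static void main(String[] args) {
        List<Integer> small = Arrays.asList(1, 2, 3, 4);
        List<Integer> big = oneTo(1000);
        System.out.println(sizeAware(small.stream()).isParallel());
        System.out.println(sizeAware(big.stream()).isParallel());
        System.out.println(sizeAware(big.stream()).mapToInt(Integer::intValue).sum());
    }
}
```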


Another problem is of a fundamental nature. The number of elements doesn’t actually say whether there will be a benefit in parallel processing or not. This does depend on the per-element workload, which does not only depend on your terminal collect operation but also on the operations already chained to the stream before entering your method. Even if you conclude that your collector’s workload is already high enough to deserve parallel processing, it might be that the incoming stream has operations like skip, limit or distinct (on an ordered stream), which often run worse in parallel and would require an entirely different threshold.

A simpler solution is to let the caller decide, as the caller knows something about the size and nature of the stream. You don’t even need to add an option to your method’s signature as the caller can already make the decision by calling parallel() or sequential() on the stream before passing it to your method, and you can respect that by simply not changing the mode.
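
For illustration, a sketch of what that looks like from both sides. CallerDecides and findLargest are hypothetical stand-ins for your class and findInterestingFoo, and Collectors.reducing stands in for customCollector(). The method never calls parallel() or sequential(); the caller picks the mode.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class CallerDecides {
    // Collects the stream in whatever mode the caller chose; the mode is not changed here.
    static int findLargest(Stream<Integer> foos) {
        return foos.collect(Collectors.reducing(Integer.MIN_VALUE, Integer::max));
    }

    public static void main(String[] args) {
        List<Integer> small = Arrays.asList(3, 1, 4);
        List<Integer> big = Arrays.asList(3, 9, 4, 7, 2, 8); // imagine hundreds of elements

        // The caller knows the source is tiny and keeps it sequential.
        System.out.println(findLargest(small.stream()));
        // The caller knows the source is large and opts in to parallel processing.
        System.out.println(findLargest(big.parallelStream()));
    }
}
```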

Holger answered Oct 21 '22 08:10