In the Javadoc for the stream package, at the end of the Parallelism section, I read:
Most stream operations accept parameters that describe user-specified behavior, which are often lambda expressions. To preserve correct behavior, these behavioral parameters must be non-interfering, and in most cases must be stateless.
I have a hard time understanding this "in most cases". In which cases is it acceptable or desirable to have a stateful stream operation?
I mean, I know it is possible, especially when using sequential streams, but the same Javadoc clearly states:
Except for operations identified as explicitly nondeterministic, such as findAny(), whether a stream executes sequentially or in parallel should not change the result of the computation.
And also:
Note also that attempting to access mutable state from behavioral parameters presents you with a bad choice with respect to safety and performance; [...] The best approach is to avoid stateful behavioral parameters to stream operations entirely; there is usually a way to restructure the stream pipeline to avoid statefulness.
So, my question is: in which circumstances is it good practice to use a stateful stream operation (and not for methods working by side effect, such as forEach)?
A related question could be: why are there operations working by side effect, such as forEach? I always end up writing a good old for loop to avoid having side effects in my lambda expression.
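To make the Javadoc's advice concrete, here is a minimal sketch of the kind of restructuring it describes: the same transformation written once with a side-effecting forEach and once with collect(). The class and method names are hypothetical, chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class SideEffectVsCollect {

    // Stateful variant: the lambda mutates a list defined outside the pipeline.
    // This is only safe on a sequential stream, because ArrayList is not thread-safe.
    static List<String> upperWithSideEffect(List<String> words) {
        List<String> out = new ArrayList<>();
        words.stream().map(String::toUpperCase).forEach(out::add);
        return out;
    }

    // Restructured variant: accumulation is delegated to collect(),
    // so no lambda in the pipeline touches shared mutable state.
    static List<String> upperWithCollect(List<String> words) {
        return words.stream().map(String::toUpperCase).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("a", "b", "c");
        System.out.println(upperWithSideEffect(words)); // [A, B, C]
        System.out.println(upperWithCollect(words));    // [A, B, C]
    }
}
```

Both variants give the same result here; the difference only shows up once the stream is made parallel, where the first one would need synchronization.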
A stream supports two types of operation, distinguished by when they pull data elements from the data source: intermediate operations are lazy, and terminal operations are eager.
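In the Java Stream API, intermediate operations such as map() only describe work, and nothing runs until a terminal operation such as collect() is invoked. The following sketch records the order of events to show this; the logging inside the lambda is itself a side effect and is there purely for demonstration. The class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LazyVsEager {

    // Returns a log of what happened, in the order it happened.
    static List<String> run() {
        List<String> log = new ArrayList<>();

        // Intermediate operation: building the pipeline does not call the lambda.
        Stream<String> pipeline = Stream.of("a", "b")
                .map(s -> {
                    log.add("map " + s); // demonstration-only side effect
                    return s.toUpperCase();
                });
        log.add("pipeline built");

        // Terminal operation: this is the point where elements are pulled.
        List<String> result = pipeline.collect(Collectors.toList());
        log.add("collected " + result);
        return log;
    }

    public static void main(String[] args) {
        // prints [pipeline built, map a, map b, collected [A, B]]
        System.out.println(run());
    }
}
```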
In general, stateful stream processing is an application design pattern for processing an unbounded stream of events. Stateful stream processing means that state is shared between events (stream entities), and therefore past events can influence the way current events are processed.
And there are stateful operations, such as distinct(), limit(), sorted(), reduce(), and collect(), which may pass state from previously processed elements to the processing of the next element.
Examples of stateful stream lambdas:
collect(Collector): The Collector is by definition stateful, since it has to collect all the elements in a collection (state).
forEach(Consumer): The Consumer is by definition stateful, well, except if it's a black hole (no-op).
peek(Consumer): The Consumer is by definition stateful, because why peek if not to store it somewhere (e.g. log).
So, Collector and Consumer are two lambda interfaces that by definition are stateful.
All the others, e.g. Predicate, Function, UnaryOperator, BinaryOperator, and Comparator, should be stateless.
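As an illustration of the difference for a Predicate, the sketch below contrasts a stateless filter with a stateful one that remembers what it has seen. The helper firstTimeSeen() is hypothetical, written only for this example; the stateless route to the same result is the built-in distinct().

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class PredicateState {

    // Stateless: the answer depends only on the argument.
    static final Predicate<String> IS_SHORT = s -> s.length() <= 5;

    // Stateful (hypothetical helper): remembers every element it has seen,
    // so the answer for one element depends on earlier calls.
    static Predicate<String> firstTimeSeen() {
        Set<String> seen = new HashSet<>();
        return seen::add; // Set.add returns false for duplicates
    }

    static List<String> shortWords(List<String> in) {
        return in.stream().filter(IS_SHORT).collect(Collectors.toList());
    }

    // Works on a sequential stream only: HashSet is not thread-safe.
    static List<String> dedupStateful(List<String> in) {
        return in.stream().filter(firstTimeSeen()).collect(Collectors.toList());
    }

    // Same result without a stateful lambda: the library manages the state.
    static List<String> dedupBuiltIn(List<String> in) {
        return in.stream().distinct().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> in = Arrays.asList("apple", "banana", "apple", "kiwi");
        System.out.println(shortWords(in));    // [apple, apple, kiwi]
        System.out.println(dedupStateful(in)); // [apple, banana, kiwi]
        System.out.println(dedupBuiltIn(in));  // [apple, banana, kiwi]
    }
}
```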
I have a hard time understanding this "in most cases". In which cases is it acceptable or desirable to have a stateful stream operation?
Suppose the following scenario. You have a Stream<String> and you need to list the items in natural order, prefixing each one with its position number. So, for example, on input you have Banana, Apple and Grape. The output should be:
1. Apple
2. Banana
3. Grape
How do you solve this task with the Java Stream API? Pretty easily:
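import static java.util.Arrays.asList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

List<String> f = asList("Banana", "Apple", "Grape");
// Counter shared across map() calls: this is the state the lambda depends on
AtomicInteger number = new AtomicInteger(0);
String result = f.stream()
        .sorted()
        .sequential() // keep the pipeline sequential so the counter follows encounter order
        .map(i -> String.format("%d. %s", number.incrementAndGet(), i))
        .collect(Collectors.joining("\n"));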
Now if you look at this pipeline you'll see 3 stateful operations:
sorted(): stateful by definition. See the documentation for Stream.sorted():
This is a stateful intermediate operation
map(): by itself it could be stateless or not, but in this case it is not. To label positions you need to keep track of how many items have already been labeled.
collect(): a mutable reduction operation (from the docs for Stream.collect()). Mutable operations are stateful by definition, because they change (mutate) shared state.
There is some controversy about why sorted() is stateful. From the Stream API documentation:
Stateless operations, such as filter and map, retain no state from previously seen element when processing a new element -- each element can be processed independently of operations on other elements. Stateful operations, such as distinct and sorted, may incorporate state from previously seen elements when processing new elements.
So when applying the terms stateful and stateless to the Stream API, we're talking about the function that processes each element of a stream, not about a function that processes the stream as a whole.
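For comparison, the numbering task from this answer can also be written so that no lambda keeps a counter: sort first, then stream over the indices of the sorted list. This is only a sketch of one possible restructuring, not the approach taken in the answer; the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class NumberedList {

    static String numbered(List<String> items) {
        // Materialize the sorted order once, so positions are known up front.
        List<String> sorted = items.stream().sorted().collect(Collectors.toList());

        // Each label depends only on its own index, so the lambda is stateless.
        return IntStream.range(0, sorted.size())
                .mapToObj(i -> String.format("%d. %s", i + 1, sorted.get(i)))
                .collect(Collectors.joining("\n"));
    }

    public static void main(String[] args) {
        System.out.println(numbered(Arrays.asList("Banana", "Apple", "Grape")));
    }
}
```

The trade-off is an intermediate list in exchange for a lambda that would stay correct even if the index stream were made parallel.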
Also note that there is some confusion between the terms stateless and deterministic. They are not the same.
A deterministic function provides the same result given the same arguments.
A stateless function retains no state from previous calls.
Those are different definitions, and in general they do not depend on each other. Determinism is about a function's result value, while statelessness is about its implementation.
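A small sketch may help separate the two properties. The three functions below are hypothetical examples: one is stateful yet deterministic, one is stateless yet nondeterministic, and one is both stateful and nondeterministic with respect to its argument.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class DeterministicVsStateless {

    // Stateful but deterministic: a cache remembers earlier calls,
    // yet the same argument always yields the same result.
    static Function<Integer, Integer> cachedSquare() {
        Map<Integer, Integer> cache = new HashMap<>();
        return n -> cache.computeIfAbsent(n, k -> k * k);
    }

    // Stateless but nondeterministic: nothing is retained between calls,
    // yet the result depends on the clock rather than only on the argument.
    static long stamp(String s) {
        return s.length() + System.nanoTime();
    }

    // Stateful and nondeterministic with respect to its argument:
    // the same input gets a different label on each call.
    static Function<String, String> labeler() {
        AtomicInteger counter = new AtomicInteger();
        return s -> counter.incrementAndGet() + ". " + s;
    }

    public static void main(String[] args) {
        Function<Integer, Integer> sq = cachedSquare();
        System.out.println(sq.apply(4) + " " + sq.apply(4)); // 16 16

        Function<String, String> label = labeler();
        System.out.println(label.apply("x")); // 1. x
        System.out.println(label.apply("x")); // 2. x
    }
}
```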