
In which cases should Stream operations be stateful?

In the Javadoc for the stream package, at the end of the Parallelism section, I read:

Most stream operations accept parameters that describe user-specified behavior, which are often lambda expressions. To preserve correct behavior, these behavioral parameters must be non-interfering, and in most cases must be stateless.

I have a hard time understanding this "in most cases". In which cases is it acceptable or desirable to have a stateful stream operation?

I mean, I know it is possible, especially when using sequential streams, but the same Javadoc clearly states:

Except for operations identified as explicitly nondeterministic, such as findAny(), whether a stream executes sequentially or in parallel should not change the result of the computation.

And also:

Note also that attempting to access mutable state from behavioral parameters presents you with a bad choice with respect to safety and performance; [...] The best approach is to avoid stateful behavioral parameters to stream operations entirely; there is usually a way to restructure the stream pipeline to avoid statefulness.

So, my question is: in which circumstances is it a good practice to use a stateful stream operation (and not for methods working by side-effect, such as forEach)?

A related question could be: why are there operations working by side effect, such as forEach? I always end up doing a good old for loop to avoid having side-effects in my lambda expression.

FBB asked Oct 10 '15 20:10





2 Answers

Examples of stateful stream lambdas:

  • collect(Collector): The Collector is by definition stateful, since it has to collect all the elements in a collection (state).
  • forEach(Consumer): The Consumer is by definition stateful, well except if it's a black hole (no-op).
  • peek(Consumer): The Consumer is by definition stateful, because why peek if not to store it somewhere (e.g. log).

So, Collector and Consumer are two kinds of behavioral parameter that are stateful by definition (Collector is not itself a lambda interface, but it is built from lambdas).

All the others, e.g. Predicate, Function, UnaryOperator, BinaryOperator, and Comparator, should be stateless.
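To make the distinction concrete, here is a minimal sketch of the two kinds of state mentioned above. The class and method names are hypothetical and only serve the illustration. A Collector works on result containers that the stream library creates and merges, whereas a Consumer passed to forEach or forEachOrdered mutates something owned by the caller:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class CollectVsForEach {

    // Mutable reduction: the Collector supplies and merges its own result
    // containers, so the caller never shares a mutable list between threads.
    static List<Integer> viaCollect(int n) {
        return IntStream.range(0, n)
                .parallel()
                .boxed()
                .collect(Collectors.toList());
    }

    // Side effect: the Consumer appends to a list owned by the caller.
    // forEachOrdered handles one element at a time in encounter order;
    // plain forEach on a parallel stream would race on the ArrayList.
    static List<Integer> viaForEachOrdered(int n) {
        List<Integer> out = new ArrayList<>();
        IntStream.range(0, n)
                .parallel()
                .boxed()
                .forEachOrdered(out::add);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(viaCollect(5));
        System.out.println(viaForEachOrdered(5));
    }
}
```

Both methods should produce the same list here; the collect version is usually the one that scales better in parallel, because it needs no ordering constraint on a shared container.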

Andreas answered Oct 23 '22 01:10



I have a hard time understanding this "in most cases". In which cases is it acceptable or desirable to have a stateful stream operation?

Suppose the following scenario. You have a Stream<String> and you need to list the items in natural order, prefixing each one with its position number. For example, the input is Banana, Apple and Grape. The output should be:

1. Apple
2. Banana
3. Grape

How do you solve this task with the Java Stream API? Pretty easily:

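import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

List<String> f = Arrays.asList("Banana", "Apple", "Grape");

// Shared counter mutated inside the map() lambda: this is the stateful part.
AtomicInteger number = new AtomicInteger(0);
String result = f.stream()
  .sorted()
  .sequential() // labels are only correct if elements are processed in encounter order
  .map(i -> String.format("%d. %s", number.incrementAndGet(), i))
  .collect(Collectors.joining("\n"));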

Now if you look at this pipeline you'll see 3 stateful operations:

  • sorted() – stateful by definition. See the documentation for Stream.sorted():

    This is a stateful intermediate operation

  • map() – by itself could be stateless or not, but in this case it is not. To label positions you need to keep track of how many items have already been labeled;
  • collect() – is a mutable reduction operation (from the docs for Stream.collect()). Mutable operations are stateful by definition, because they change (mutate) shared state.
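For comparison, here is one possible way to restructure the numbering so that the map lambda keeps no state. This is only a sketch: the NumberedList class and its numbered method are hypothetical names, and the approach (sort first, then stream over indices) is one option among several, not the only way to do it.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class NumberedList {

    static String numbered(List<String> items) {
        // Sort once up front so that every position is known before labeling.
        List<String> sorted = items.stream().sorted().collect(Collectors.toList());

        // Each label depends only on its own index, so the lambda retains no
        // state between elements and stays correct in a parallel stream.
        return IntStream.range(0, sorted.size())
                .parallel()
                .mapToObj(i -> String.format("%d. %s", i + 1, sorted.get(i)))
                .collect(Collectors.joining("\n"));
    }

    public static void main(String[] args) {
        System.out.println(numbered(Arrays.asList("Banana", "Apple", "Grape")));
    }
}
```

The trade-off is an intermediate list, in exchange for a pipeline whose result does not depend on whether it runs sequentially or in parallel.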

There is some controversy about why sorted() is stateful. From the Stream API documentation:

Stateless operations, such as filter and map, retain no state from previously seen element when processing a new element -- each element can be processed independently of operations on other elements. Stateful operations, such as distinct and sorted, may incorporate state from previously seen elements when processing new elements.

So when the terms stateful and stateless are applied to the Stream API, they refer to the function that processes each element of a stream, not to the function that processes the stream as a whole.

Also note that there is some confusion between the terms stateless and deterministic. They are not the same.

A deterministic function provides the same result given the same arguments.

A stateless function retains no state from previous calls.

These are different definitions, and in the general case they do not depend on each other. Determinism is about a function's result value, while statelessness is about a function's implementation.
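A small sketch may help separate the two notions. The names below (StatelessVsDeterministic, CACHED_SQUARE, PLUS_NANO_TIME) are made up for this illustration. The first function retains state between calls yet always returns the same result for the same argument; the second retains nothing between calls yet can return a different result each time, because it reads the clock.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

public class StatelessVsDeterministic {

    // State retained across calls by the first function below.
    static final Map<Integer, Integer> CACHE = new ConcurrentHashMap<>();

    // Stateful but deterministic: results are cached between calls,
    // yet the same argument always yields the same value.
    static final Function<Integer, Integer> CACHED_SQUARE =
            n -> CACHE.computeIfAbsent(n, k -> k * k);

    // Stateless but not deterministic: nothing is kept between calls,
    // yet the result depends on the clock and not only on the argument.
    static final Function<Long, Long> PLUS_NANO_TIME =
            n -> n + System.nanoTime();

    public static void main(String[] args) {
        System.out.println(CACHED_SQUARE.apply(7));
        System.out.println(CACHED_SQUARE.apply(7));
        System.out.println(PLUS_NANO_TIME.apply(0L));
        System.out.println(PLUS_NANO_TIME.apply(0L));
    }
}
```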

Denis Bazhenov answered Oct 23 '22 01:10
