
What's the purpose of OutputMode in flatMapGroupsWithState? How/where is it used?

I'm exploring KeyValueGroupedDataset.flatMapGroupsWithState for arbitrary stateful aggregation in Spark Structured Streaming.

The signature of the KeyValueGroupedDataset.flatMapGroupsWithState operator is as follows:

flatMapGroupsWithState[S: Encoder, U: Encoder](
  outputMode: OutputMode,
  timeoutConf: GroupStateTimeout)(
  func: (K, Iterator[V], GroupState[S]) => Iterator[U]): Dataset[U]

What's the purpose of the OutputMode argument?

While reviewing the source code of FlatMapGroupsWithStateExec (the underlying physical operator), I could not find any place where the OutputMode is actually used.
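For context, here is a minimal sketch of how the operator is typically called. It assumes a local SparkSession and the built-in "rate" source; the object name RunningCountSketch and the state function countPerKey are hypothetical names made up for this illustration. The sketch passes the same mode to the operator and to writeStream, which is an assumption of the example rather than something established above.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

object RunningCountSketch {
  // Hypothetical state function: keeps a running count of the values seen per key.
  def countPerKey(
      key: Long,
      values: Iterator[Long],
      state: GroupState[Long]): Iterator[(Long, Long)] = {
    val updated = state.getOption.getOrElse(0L) + values.size
    state.update(updated)
    Iterator((key, updated))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("flatMapGroupsWithState-sketch")
      .getOrCreate()
    import spark.implicits._

    // The "rate" source emits (timestamp, value) rows; only value is kept here.
    val values = spark.readStream.format("rate").load().select($"value").as[Long]

    val counts = values
      .groupByKey((v: Long) => v % 3)
      // outputMode is the first argument, the one the question is about.
      .flatMapGroupsWithState[Long, (Long, Long)](
        OutputMode.Update, GroupStateTimeout.NoTimeout)(countPerKey)

    counts.writeStream
      .outputMode("update") // assumed to match the mode given to the operator
      .format("console")
      .start()
      .awaitTermination()
  }
}
```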

Jacek Laskowski asked Jul 07 '19 11:07




1 Answer

Indeed, I didn't find any uses either. I have several theories about that:

  1. The mode is there to stay consistent with the signature of the org.apache.spark.sql.catalyst.plans.logical.FlatMapGroupsWithState logical operator. If you check the apply method of org.apache.spark.sql.execution.SparkStrategies.BasicOperators, you will notice that the logical operator very often passes all of its parameters to the physical operator. It looks like a design guideline, but that is only my supposition.

  2. It can also be there for legacy reasons. FlatMapGroupsWithState evolved from MapGroupsWithState in order to enforce output mode semantics. It was implemented in this PR: https://github.com/apache/spark/pull/17197/files (SPARK-19858), which renamed MapGroupsWithState to FlatMapGroupsWithState and added outputMode as a parameter. Maybe, if my theory from the previous point is wrong, it is simply there because it passed the PR review and nobody objected afterwards, on the "it was already here" principle?

  3. Maybe the outputMode will be passed to the mapping function in the future? I found that the physical node used to save streaming aggregations (StateStoreSaveExec) uses the output mode to figure out which entries to persist in the state store. Maybe something similar will be added soon for the *withState transformations. For what it's worth, the parameter is described in the comment as:

    • @param outputMode the output mode of func
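
To make the first theory concrete, here is a toy model in plain Scala. None of these types are Spark's own classes; OutputMode, LogicalFlatMapGroupsWithState, PhysicalFlatMapGroupsWithState and ToyPlanner are all hypothetical names invented for this sketch. It only shows the pattern I mean: a planner that forwards every field of the logical node to the physical node, even a field the physical node never reads.

```scala
// Toy model only: these are NOT Spark's classes, just an illustration of the pattern.
sealed trait OutputMode
case object Append extends OutputMode
case object Update extends OutputMode

sealed trait LogicalPlan
final case class LogicalFlatMapGroupsWithState(
    func: String => String,
    outputMode: OutputMode,
    timeout: String) extends LogicalPlan

sealed trait PhysicalPlan {
  def execute(input: Seq[String]): Seq[String]
}

final case class PhysicalFlatMapGroupsWithState(
    func: String => String,
    outputMode: OutputMode, // carried along, but never read by execute
    timeout: String) extends PhysicalPlan {
  def execute(input: Seq[String]): Seq[String] = input.map(func)
}

object ToyPlanner {
  // Forwards every field of the logical node to the physical node unchanged.
  def plan(logical: LogicalPlan): PhysicalPlan = logical match {
    case LogicalFlatMapGroupsWithState(f, mode, timeout) =>
      PhysicalFlatMapGroupsWithState(f, mode, timeout)
  }
}
```

In this toy, planning the same function with Append or with Update yields physical nodes that produce identical results, which is the kind of "parameter that is present but unused" situation described in the question.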
Bartosz Konieczny answered Sep 21 '22 13:09
