I'm exploring KeyValueGroupedDataset.flatMapGroupsWithState for arbitrary stateful aggregation in Spark Structured Streaming. The signature of the KeyValueGroupedDataset.flatMapGroupsWithState operator is as follows:
    def flatMapGroupsWithState[S: Encoder, U: Encoder](
        outputMode: OutputMode,
        timeoutConf: GroupStateTimeout)(
        func: (K, Iterator[V], GroupState[S]) => Iterator[U]): Dataset[U]
What's the purpose of the OutputMode argument?
While reviewing the source code of FlatMapGroupsWithStateExec (the underlying physical operator), I could not find any place where the OutputMode is used.
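To make the question concrete, here is a minimal sketch of how the operator is typically called. It assumes Spark 3.x on the classpath, a local session, and MemoryStream (an internal source mainly meant for tests); the object name RunningCountSketch and the query name "counts" are made up for illustration. Note that an output mode appears twice: once as the operator argument in question and once on writeStream. The sketch keeps both set to Update.

```scala
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

// Hypothetical demo object: running count per key with flatMapGroupsWithState.
object RunningCountSketch {
  def run(): Map[String, Long] = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("flatMapGroupsWithState-sketch")
      .config("spark.sql.shuffle.partitions", "2") // keep the number of state store partitions small
      .getOrCreate()
    import spark.implicits._
    implicit val sqlContext: SQLContext = spark.sqlContext // MemoryStream needs an implicit SQLContext

    try {
      val input = MemoryStream[String]

      // S = Long (running count kept per key), U = (String, Long) (rows emitted by func)
      val counts = input.toDS()
        .groupByKey(word => word)
        .flatMapGroupsWithState[Long, (String, Long)](OutputMode.Update(), GroupStateTimeout.NoTimeout()) {
          (key: String, values: Iterator[String], state: GroupState[Long]) =>
            val newCount = state.getOption.getOrElse(0L) + values.size
            state.update(newCount)
            Iterator(key -> newCount) // one updated row per key seen in this micro-batch
        }

      // The query's own output mode is configured separately, on writeStream.
      val query = counts.writeStream
        .outputMode(OutputMode.Update())
        .format("memory")
        .queryName("counts")
        .start()

      try {
        input.addData("a", "b", "a")
        query.processAllAvailable()
        spark.table("counts").as[(String, Long)].collect().toMap
      } finally query.stop()
    } finally spark.stop()
  }
}
```

Keeping the two modes identical is the conservative choice; the sketch does not try to show what happens when they differ.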
OutputMode defines what data is written to a streaming sink when new data is available in a streaming DataFrame/Dataset.
Append output mode: only the new rows in the streaming DataFrame/Dataset are written to the sink. This is the default mode. Use outputMode("append") when you want to output only new rows to the sink.
Spark Structured Streaming uses readStream to monitor a folder and process files as they arrive in the directory in real time, and writeStream to write out the resulting DataFrame or Dataset.
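A minimal sketch of that readStream/writeStream flow in append mode follows. It is an illustration only: it assumes Spark 3.x, uses a freshly created temporary directory as a stand-in for the monitored folder, and writes to the in-memory sink so the result can be inspected; AppendFromFolderSketch and the query name "lines" are hypothetical.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode

// Hypothetical demo object: stream text files from a folder in append mode.
object AppendFromFolderSketch {
  def run(): Set[String] = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("append-from-folder-sketch")
      .getOrCreate()

    try {
      // Stand-in input folder: a fresh temp directory with one file already in it.
      val inputDir = Files.createTempDirectory("incoming")
      Files.write(inputDir.resolve("batch-1.txt"), "a\nb\n".getBytes(StandardCharsets.UTF_8))

      // readStream monitors the folder; each new file contributes rows in a "value" column.
      val lines = spark.readStream.text(inputDir.toString)

      // Append output mode: only rows added since the last trigger are written to the sink.
      val query = lines.writeStream
        .outputMode(OutputMode.Append()) // same as .outputMode("append")
        .format("memory")
        .queryName("lines")
        .start()

      try {
        query.processAllAvailable()
        spark.table("lines").collect().map(_.getString(0)).toSet
      } finally query.stop()
    } finally spark.stop()
  }
}
```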
Both the Spark Streaming and the Structured Streaming models use micro-batching (also called mini-batching) as their primary processing mechanism; the difference lies in the details: Spark Streaming uses DStreams, while Structured Streaming uses DataFrames to process the streams of data flowing into the engine.
Indeed, I didn't find any uses either. I have several theories about that:
The mode is there to keep the signature consistent with the org.apache.spark.sql.catalyst.plans.logical.FlatMapGroupsWithState logical operator. If you check the apply method of org.apache.spark.sql.execution.SparkStrategies.BasicOperators, you will notice that the logical operator very often passes all its parameters to the physical operator. I'm not sure, but it looks like a design guideline; that's only my supposition.
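To show what this pass-through pattern would mean for the question, here is a toy model in plain Scala. The classes FlatMapWithStateLogical and FlatMapWithStateExec and the plan function are hypothetical stand-ins, not Spark's real operators or planner: the physical node receives outputMode only because the planner copies every logical parameter, and its execute method never reads it.

```scala
// Hypothetical, heavily simplified stand-ins; these are NOT Spark's real classes.
object PassThroughPlanningSketch {
  sealed trait Mode
  case object AppendMode extends Mode
  case object UpdateMode extends Mode

  // Logical node: records what the user asked for, including the output mode.
  final case class FlatMapWithStateLogical(outputMode: Mode, timeout: String)

  // Physical node: receives every logical parameter, but execute() ignores outputMode.
  final case class FlatMapWithStateExec(outputMode: Mode, timeout: String) {
    def execute(rows: Seq[String]): Seq[String] = rows.map(_.toUpperCase) // stand-in for calling func
  }

  // Planner rule in the spirit of "copy all parameters from the logical to the physical node".
  def plan(node: FlatMapWithStateLogical): FlatMapWithStateExec =
    FlatMapWithStateExec(node.outputMode, node.timeout)
}
```

In this model, planning the same logical node with AppendMode or UpdateMode yields physical nodes that produce identical output, which matches the observation that the mode is carried along but unused at execution time.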
It could also be for a legacy reason. FlatMapGroupsWithState evolved from MapGroupsWithState in order to enforce output mode semantics. This was implemented in this PR: https://github.com/apache/spark/pull/17197/files (SPARK-19858), with MapGroupsWithState renamed to FlatMapGroupsWithState and outputMode added as a parameter. Maybe, if my theory from the previous point is wrong, it's simply here because it passed the PR review and nobody wanted to complain about it, following the "it was already here" principle?
Maybe in the future an outputMode will be passed to the mapping function? I found that the node used to save streaming aggregations (StateStoreSaveExec) uses the output mode to figure out which entries to persist in the state store. Maybe this will soon become a new feature of the *withState transformations, as the comment suggests:
- @param outputMode the output mode of func
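To illustrate the StateStoreSaveExec observation, here is a hypothetical, simplified model in plain Scala of how an output mode can decide which aggregates are emitted and which stay in the state store. It is not Spark's code; the Entry type, the save function, and the eviction rule (event time at or below the watermark) are assumptions chosen for the illustration.

```scala
// Hypothetical, simplified model of an output mode driving a state-saving operator;
// it is NOT StateStoreSaveExec's actual implementation.
object StateSaveModeSketch {
  sealed trait Mode
  case object Append extends Mode
  case object Update extends Mode
  case object Complete extends Mode

  // One aggregate held in the (hypothetical) state store.
  final case class Entry(key: String, count: Long, eventTime: Long, updatedInBatch: Boolean)

  /** Returns (rows emitted to the sink, entries kept in the state store). */
  def save(mode: Mode, entries: Seq[Entry], watermark: Long): (Seq[Entry], Seq[Entry]) = {
    // Entries at or below the watermark are treated as final and evictable.
    val (expired, live) = entries.partition(_.eventTime <= watermark)
    mode match {
      case Complete => (entries, entries)                       // emit everything, evict nothing
      case Append   => (expired, live)                          // emit only finalized entries, then evict them
      case Update   => (entries.filter(_.updatedInBatch), live) // emit changed entries, evict expired ones
    }
  }
}
```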