Why does Complete output mode require aggregation?

I work with the latest Structured Streaming in Apache Spark 2.2 and got the following exception:

org.apache.spark.sql.AnalysisException: Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets;;

Why does Complete output mode require a streaming aggregation? What would happen if Spark allowed Complete output mode with no aggregations in a streaming query?

scala> spark.version
res0: String = 2.2.0

import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.SQLContext
implicit val sqlContext: SQLContext = spark.sqlContext
val source = MemoryStream[(Int, Int)]
val ids = source.toDS.toDF("time", "id").
  withColumn("time", $"time" cast "timestamp"). // <-- convert time column from Int to Timestamp
  dropDuplicates("id").
  withColumn("time", $"time" cast "long")  // <-- convert time column back from Timestamp to Int

import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._
scala> val q = ids.
     |   writeStream.
     |   format("memory").
     |   queryName("dups").
     |   outputMode(OutputMode.Complete).  // <-- memory sink supports checkpointing for Complete output mode only
     |   trigger(Trigger.ProcessingTime(30.seconds)).
     |   option("checkpointLocation", "checkpoint-dir"). // <-- use checkpointing to save state between restarts
     |   start
org.apache.spark.sql.AnalysisException: Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets;;
Project [cast(time#10 as bigint) AS time#15L, id#6]
+- Deduplicate [id#6], true
   +- Project [cast(time#5 as timestamp) AS time#10, id#6]
      +- Project [_1#2 AS time#5, _2#3 AS id#6]
         +- StreamingExecutionRelation MemoryStream[_1#2,_2#3], [_1#2, _2#3]

  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:115)
  at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:232)
  at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278)
  at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:247)
  ... 57 elided
asked Aug 18 '17 by Jacek Laskowski

People also ask

What is output modes in Spark structured streaming?

The output mode specifies which rows of the result table are written to the sink. There are three values. Append mode (the default): only new rows are written to the sink. Complete mode: all rows of the result table are written on every trigger. Update mode: only rows that changed since the last trigger are written.

Which output mode is used in the case where the whole of the result table is outputted to the sink after a trigger?

Streaming – Complete Output Mode: an OutputMode in which all the rows in the streaming DataFrame/Dataset are written to the sink every time there are updates. Use outputMode("complete") when you aggregate the data and want to output the entire result table to the sink on every trigger.

Which of the output mode you will use when you know that you are not going to update any previous output and each batch will write new records only?

Append mode: only the new incoming rows are written to the sink, so use it when you only insert new data and never update previously output rows. It is the default output mode. It supports aggregations only when a watermark is defined, since an aggregate could otherwise still be updated by late data.

What is output mode?

OutputMode specifies what data will be written to a streaming sink when there is new data available in a streaming DataFrame/Dataset.
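The mode semantics above can be illustrated without Spark. This is a minimal plain-Scala sketch (all names are hypothetical, no Spark APIs) of how a running word-count result table would be emitted under Complete versus Update semantics across two micro-batches:

```scala
object OutputModeSketch {
  // Hypothetical streaming word count, processed in two micro-batches.
  val batch1 = Seq("a", "b", "a")
  val batch2 = Seq("b", "c")

  def count(words: Seq[String]): Map[String, Int] =
    words.groupBy(identity).map { case (w, ws) => w -> ws.size }

  val table1: Map[String, Int] = count(batch1)           // result table after trigger 1
  val table2: Map[String, Int] = count(batch1 ++ batch2) // result table after trigger 2

  // Complete mode: the sink receives the ENTIRE result table on every trigger.
  val completeOutput2: Map[String, Int] = table2

  // Update mode: the sink receives only rows that changed since the last trigger.
  val updateOutput2: Map[String, Int] =
    table2.filter { case (w, n) => !table1.get(w).contains(n) }
}
```

After the second trigger, Complete mode would emit all three keys, while Update mode would emit only "b" (its count changed) and "c" (new).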


2 Answers

From the Structured Streaming Programming Guide - other queries (excluding aggregations, mapGroupsWithState and flatMapGroupsWithState):

Complete mode not supported as it is infeasible to keep all unaggregated data in the Result Table.

To answer the question:

What would happen if Spark allowed Complete output mode with no aggregations in a streaming query?

Probably OOM: the result table would have to retain every input row ever seen, so memory use would grow without bound as the stream runs.

The puzzling part is why dropDuplicates("id") is not treated as a streaming aggregation. It is stateful (note the Deduplicate node in the plan), but Spark's UnsupportedOperationChecker only recognizes Aggregate operators (a groupBy with aggregate functions) as streaming aggregations.
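The growth argument can be made concrete without Spark. A hypothetical plain-Scala sketch (no Spark APIs) comparing how many rows the result table must retain with and without an aggregation:

```scala
object ResultTableGrowth {
  // Hypothetical micro-batches of (time, id) rows, as in the question.
  val batches: Seq[Seq[(Int, Int)]] =
    Seq(Seq((1, 1), (2, 2)), Seq((3, 1), (4, 3)), Seq((5, 2), (6, 4)))

  // Cumulative input after each trigger.
  private val cumulative: Seq[Seq[(Int, Int)]] =
    batches.scanLeft(Seq.empty[(Int, Int)])(_ ++ _).tail

  // No aggregation: Complete mode would have to keep EVERY row ever seen,
  // so the result table grows linearly with the input -- eventually OOM.
  val unaggregatedSizes: Seq[Int] = cumulative.map(_.size)

  // With an aggregation (e.g. a count per id): the result table is bounded
  // by the number of distinct keys, not by the total input size.
  val aggregatedSizes: Seq[Int] = cumulative.map(_.groupBy(_._2).size)
}
```

Here the unaggregated table holds 2, 4, then 6 rows (one per input row), while the aggregated table holds 2, 3, then 4 rows (one per distinct id).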

answered Nov 13 '22 by Alper t. Turker


I think the problem is the output mode. Instead of OutputMode.Complete, use OutputMode.Append as shown below. Append mode works here because dropDuplicates emits each row at most once and never updates previously written rows.

val q = ids
  .writeStream
  .format("memory")
  .queryName("dups")
  .outputMode(OutputMode.Append)
  .trigger(Trigger.ProcessingTime(30.seconds))
  .option("checkpointLocation", "checkpoint-dir")
  .start
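Why Append is a natural fit for deduplication can be shown with a plain-Scala sketch (hypothetical names, no Spark APIs): once an id has been emitted, later duplicates are dropped and the previously written row is never revised, which is exactly Append mode's contract.

```scala
object AppendDedupSketch {
  // Hypothetical micro-batches of ids arriving over three triggers.
  val batches: Seq[Seq[Int]] = Seq(Seq(1, 2), Seq(2, 3), Seq(1, 4))

  // Per trigger, emit only ids never seen before -- each row is appended
  // exactly once and no earlier output is ever updated.
  val emitted: Seq[Seq[Int]] =
    batches.scanLeft((Set.empty[Int], Seq.empty[Int])) {
      case ((seen, _), batch) =>
        val fresh = batch.distinct.filterNot(seen)
        (seen ++ fresh, fresh)
    }.tail.map(_._2)
}
```

For the batches above, the sink would receive [1, 2], then [3], then [4]: duplicates of 1 and 2 in later triggers produce no output at all.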
answered Nov 13 '22 by Thomas Okonkwo