 

Why does transform do side effects (println) only once in Structured Streaming?

Why is the select output printed for every batch, but "hello world" printed only once?

import org.apache.spark.sql.types._
val schema = StructType(
  StructField("id", LongType, nullable = false) ::
  StructField("name", StringType, nullable = false) ::
  StructField("score", DoubleType, nullable = false) :: Nil)

val in: DataFrame = sparkSession.readStream
  .schema(schema)
  .format("csv")
  .option("header", false)
  .option("maxFilesPerTrigger", 1)
  .option("delimiter", ";")
  .load("s3://xxxxxxxx")

val input: DataFrame = in.select("*")
  .transform { ds =>
    println("hello world")  // <-- Why is this printed only once?
    ds
  }

import org.apache.spark.sql.streaming.StreamingQuery
val query: StreamingQuery = input.writeStream
  .format("console")
  .start
asked Aug 23 '16 by T. Bombeke



1 Answer

I'm on Spark 2.1.0-SNAPSHOT here (built today), but I believe the behaviour hasn't changed between 2.0 and now.

$ ./bin/spark-submit --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0-SNAPSHOT
      /_/

Branch master
Compiled by user jacek on 2016-09-30T07:08:39Z
Revision 1fad5596885aab8b32d2307c0edecbae50d5bd7a
Url https://github.com/apache/spark.git
Type --help for more information.

In Spark's Structured Streaming, your streaming application is really just a trick to apply the same physical query plan to the input data sources over and over.

Please note that the physical query plan is what makes your Dataset (and the longer I work with Spark SQL, the less difference I see between queries and Datasets -- they're simply interchangeable these days).
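
You can verify that describing a query executes nothing. A minimal sketch, assuming it runs in the same session as the snippet above: the public queryExecution field on Dataset exposes the plans, and reading them triggers no data processing.

// A Dataset merely wraps a query plan description; printing the plan
// processes no data and runs no side effects beyond plan construction.
println(input.queryExecution.logical)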

When you describe a structured query (regardless of whether it is a one-off or a streaming query), it goes through four stages: parsing, analysis, optimization, and finally physical planning. You can review them using the explain(extended = true) method.

scala> input.explain(extended = true)
== Parsed Logical Plan ==
StreamingRelation DataSource(org.apache.spark.sql.SparkSession@5c4f07c1,json,List(),Some(StructType(StructField(id,LongType,false), StructField(name,StringType,false), StructField(score,DoubleType,false))),List(),None,Map(path -> input-json)), FileSource[input-json], [id#15L, name#16, score#17]

== Analyzed Logical Plan ==
id: bigint, name: string, score: double
StreamingRelation DataSource(org.apache.spark.sql.SparkSession@5c4f07c1,json,List(),Some(StructType(StructField(id,LongType,false), StructField(name,StringType,false), StructField(score,DoubleType,false))),List(),None,Map(path -> input-json)), FileSource[input-json], [id#15L, name#16, score#17]

== Optimized Logical Plan ==
StreamingRelation DataSource(org.apache.spark.sql.SparkSession@5c4f07c1,json,List(),Some(StructType(StructField(id,LongType,false), StructField(name,StringType,false), StructField(score,DoubleType,false))),List(),None,Map(path -> input-json)), FileSource[input-json], [id#15L, name#16, score#17]

== Physical Plan ==
StreamingRelation FileSource[input-json], [id#15L, name#16, score#17]

The stages are lazy and executed only once.

Once you've got the physical plan, the stages won't be executed again. Your Dataset pipeline is already computed, and the only missing piece is the data to flow through the pipe.
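
To see the contrast in code, put one side effect in transform (it runs once, on the driver, while the plan is being built) and one in a row-level function (it runs per record, per micro-batch, on the executors). A minimal sketch against the same in stream; the filter predicate is purely illustrative:

import org.apache.spark.sql.DataFrame

val contrasted: DataFrame = in
  .transform { ds =>
    println("plan-construction time")   // printed exactly once, on the driver
    ds
  }
  .filter { row =>
    println(s"execution time: $row")    // printed for every row of every batch
    row.getDouble(2) >= 0.0             // column 2 is score in the schema above
  }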

That's why you see "hello world" only once -- it was printed when the streaming query plan was "executed" to produce the physical plan. That happens exactly once, while the query is being described, so any side effects in transform fire at that point and never again.
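
If you actually need a side effect once per micro-batch, it has to live where the batches are processed, not where the plan is described. Newer Spark versions (2.4+) added foreachBatch for exactly this; note that it postdates the 2.1.0-SNAPSHOT above, so take this as a sketch against the newer API (perBatchQuery is just an illustrative name):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.StreamingQuery

// foreachBatch (Spark 2.4+) invokes its function once per micro-batch.
val perBatchQuery: StreamingQuery = input.writeStream
  .foreachBatch { (batch: DataFrame, batchId: Long) =>
    println(s"hello world, batch $batchId")  // printed on every trigger
    batch.show()
  }
  .start()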

An interesting case. Thanks a lot for bringing it up here!

answered Sep 22 '22 by Jacek Laskowski