Why is the select statement executed every batch, but the "hello world" printed only once?
import org.apache.spark.sql.types._

val schema = StructType(
  StructField("id", LongType, nullable = false) ::
  StructField("name", StringType, nullable = false) ::
  StructField("score", DoubleType, nullable = false) :: Nil)

val in: DataFrame = sparkSession.readStream
  .schema(schema)
  .format("csv")
  .option("header", false)
  .option("maxFilesPerTrigger", 1)
  .option("delimiter", ";")
  .load("s3://xxxxxxxx")

val input: DataFrame = in.select("*")
  .transform { ds =>
    println("hello world") // <-- Why is this printed out once?
    ds
  }

import org.apache.spark.sql.streaming.StreamingQuery

val query: StreamingQuery = input.writeStream
  .format("console")
  .start
I'm on Spark 2.1.0-SNAPSHOT here (built today), but I believe the behaviour hasn't changed between 2.0 and now.
$ ./bin/spark-submit --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0-SNAPSHOT
      /_/
Branch master
Compiled by user jacek on 2016-09-30T07:08:39Z
Revision 1fad5596885aab8b32d2307c0edecbae50d5bd7a
Url https://github.com/apache/spark.git
Type --help for more information.
In Spark's Structured Streaming, your streaming application is little more than a trick to apply the same physical query plan to the input data sources, over and over.

Please note that the physical query plan is what makes up your Dataset (and the longer I'm with Spark SQL, the less difference I see between queries and Datasets -- they're simply interchangeable these days).

When you describe a structured query (regardless of whether it is going to be a one-off or a streaming query), it goes through 4 stages: parsing, analysis, optimization, and finally physical planning. You can review them using the explain(extended = true) method.
scala> input.explain(extended = true)
== Parsed Logical Plan ==
StreamingRelation DataSource(org.apache.spark.sql.SparkSession@5c4f07c1,json,List(),Some(StructType(StructField(id,LongType,false), StructField(name,StringType,false), StructField(score,DoubleType,false))),List(),None,Map(path -> input-json)), FileSource[input-json], [id#15L, name#16, score#17]
== Analyzed Logical Plan ==
id: bigint, name: string, score: double
StreamingRelation DataSource(org.apache.spark.sql.SparkSession@5c4f07c1,json,List(),Some(StructType(StructField(id,LongType,false), StructField(name,StringType,false), StructField(score,DoubleType,false))),List(),None,Map(path -> input-json)), FileSource[input-json], [id#15L, name#16, score#17]
== Optimized Logical Plan ==
StreamingRelation DataSource(org.apache.spark.sql.SparkSession@5c4f07c1,json,List(),Some(StructType(StructField(id,LongType,false), StructField(name,StringType,false), StructField(score,DoubleType,false))),List(),None,Map(path -> input-json)), FileSource[input-json], [id#15L, name#16, score#17]
== Physical Plan ==
StreamingRelation FileSource[input-json], [id#15L, name#16, score#17]
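As a side note, the individual stages are also available programmatically on the Dataset's queryExecution (standard Spark SQL API, nothing streaming-specific), if you prefer to poke at them one by one:

scala> input.queryExecution.logical        // parsed logical plan
scala> input.queryExecution.analyzed       // analyzed logical plan
scala> input.queryExecution.optimizedPlan  // optimized logical plan
scala> input.queryExecution.sparkPlan      // physical plan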
The stages are lazy and executed only once.

Once you've got the physical plan, the stages won't be executed again. Your Dataset pipeline is already computed, and the only missing piece is the data flowing through the pipe.

That's why you see "hello world" only once -- it was printed when the streaming query plan was "executed" to produce the physical plan. That happened exactly once, while describing the source Dataset (and only the Dataset, so any side effects were triggered right then).
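There is also a more mundane ingredient: Dataset.transform itself applies the function you pass it right away, while the query is being described. Its body is essentially a one-liner (paraphrasing Spark's Dataset.scala, so check your version):

  // Roughly what Dataset.transform does under the covers:
  def transform[U](t: Dataset[T] => Dataset[U]): Dataset[U] = t(this)

So the println runs on the driver the moment transform is called; only the Dataset it returns takes part in every micro-batch.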
An interesting case. Thanks a lot for bringing it up here!
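As a follow-up for anyone who actually wants a side effect on every micro-batch: on Spark 2.4 and later, foreachBatch on the writer is the idiomatic place for it. A minimal sketch, adapting the query above (the typed val is just a name for this sketch; it also sidesteps an overload ambiguity some Scala versions hit with the Java foreachBatch variant):

  // Runs once per micro-batch (Spark 2.4+), not once per query
  val perBatch: (DataFrame, Long) => Unit = (batch, batchId) => {
    println(s"hello world for batch $batchId")
    batch.show(truncate = false)
  }

  val query: StreamingQuery = input.writeStream
    .foreachBatch(perBatch)
    .start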