I'm starting to learn Spark and am having a difficult time understanding the rationale behind Structured Streaming in Spark. Structured Streaming treats all arriving data as an unbounded input table, wherein every new item in the data stream is treated as a new row in the table. I have the following piece of code to read incoming files from the csvFolder:
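import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType

val spark = SparkSession.builder.appName("SimpleApp").getOrCreate()
// Every column is read as a string.
val csvSchema = new StructType().add("street", "string").add("city", "string")
  .add("zip", "string").add("state", "string").add("beds", "string")
  .add("baths", "string").add("sq__ft", "string").add("type", "string")
  .add("sale_date", "string").add("price", "string").add("latitude", "string")
  .add("longitude", "string")
// Treat files arriving in ./csvFolder/ as a stream and print each micro-batch.
val streamingDF = spark.readStream.schema(csvSchema).csv("./csvFolder/")
val query = streamingDF.writeStream
  .format("console")
  .start()
// Keep the application alive while the query runs.
query.awaitTermination()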
What happens if I dump a 1GB file into the folder? As per the specs, the streaming job is triggered every few milliseconds. If Spark encounters such a huge file in the next instant, won't it run out of memory while trying to load the file? Or does it automatically batch it? If yes, is this batching parameter configurable?
Structured Streaming is a high-level API for stream processing that became production-ready in Spark 2.2. Structured Streaming allows you to take the same operations that you perform in batch mode using Spark's structured APIs, and run them in a streaming fashion.
Spark Streaming (the older DStream API) receives real-time data and divides it into smaller batches for the execution engine. In contrast, Structured Streaming is built on the Spark SQL API for data stream processing. In the end, all the APIs are optimized by Spark's Catalyst optimizer and translated into RDDs for execution under the hood.
Internally, by default, Structured Streaming queries are processed using a micro-batch processing engine, which processes data streams as a series of small batch jobs, thereby achieving end-to-end latencies as low as 100 milliseconds and exactly-once fault-tolerance guarantees.
The key idea is to treat any data stream as an unbounded table: new records added to the stream are like rows being appended to the table. This allows us to treat both batch and streaming data as tables. Since tables and DataFrames/Datasets are semantically synonymous, the same batch-like DataFrame/Dataset queries can be applied to both batch and streaming data.
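To make the batch/stream symmetry concrete, here is a minimal sketch in which one transformation is written once and applied to both a batch and a streaming DataFrame. The object name `BatchAndStreamSketch`, the helper `salesPerCity`, and the `local[*]` master are assumptions made for illustration; the column names and the `./csvFolder/` path are taken from the question.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.StructType

object BatchAndStreamSketch {
  // Written once against the DataFrame API; it does not care whether
  // its input is bounded (batch) or unbounded (streaming).
  def salesPerCity(df: DataFrame): DataFrame =
    df.groupBy("city").count()

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("BatchAndStreamSketch")
      .master("local[*]") // assumption: local run for experimentation
      .getOrCreate()

    // Same columns as the schema in the question, all read as strings.
    val columns = Seq("street", "city", "zip", "state", "beds", "baths",
      "sq__ft", "type", "sale_date", "price", "latitude", "longitude")
    val csvSchema = columns.foldLeft(new StructType()) { (schema, name) =>
      schema.add(name, "string")
    }

    // Batch: the folder is read once as a bounded table.
    val batchDF = spark.read.schema(csvSchema).csv("./csvFolder/")
    salesPerCity(batchDF).show()

    // Streaming: the same folder is treated as a table that keeps growing.
    val streamDF = spark.readStream.schema(csvSchema).csv("./csvFolder/")
    val query = salesPerCity(streamDF).writeStream
      .outputMode("complete") // a streaming aggregation without a watermark cannot use append mode
      .format("console")
      .start()

    query.awaitTermination()
  }
}
```

The only difference between the two halves is `read` versus `readStream` (and `show` versus `writeStream`); the query logic in `salesPerCity` is shared.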
In the Structured Streaming model, this is how the query above is executed.
Question: If Spark encounters such a huge file in the next instant, won't it run out of memory while trying to load the file? Or does it automatically batch it? If yes, is this batching parameter configurable?
Answer: There should be no OOM, since the underlying RDD (DataFrame/Dataset) is lazily evaluated. Of course, you need to repartition before processing to ensure an even number of partitions, with the data spread uniformly across executors.
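On whether the batching is configurable: the file source accepts a `maxFilesPerTrigger` option that caps how many new files go into one micro-batch, and the write side accepts a trigger interval. The sketch below shows both. The object and method names, the one-file cap, the 10-second interval, and the `local[*]` master are illustrative assumptions, not recommendations. Note that the cap counts files, not bytes, so a single 1GB file still lands in one micro-batch; my understanding is that Spark then splits a plain CSV file into partitions (governed by `spark.sql.files.maxPartitionBytes`) rather than loading it whole, but that is worth verifying for your version and file format.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.types.StructType

object BoundedBatchSketch {
  // Hypothetical helper: a CSV stream that picks up at most `maxFiles`
  // new files per micro-batch.
  def cappedCsvStream(spark: SparkSession, schema: StructType,
                      path: String, maxFiles: Long): DataFrame =
    spark.readStream
      .schema(schema)
      .option("maxFilesPerTrigger", maxFiles)
      .csv(path)

  // Hypothetical helper: print each micro-batch, starting a new one
  // at the given processing-time interval (e.g. "10 seconds").
  def startConsoleQuery(df: DataFrame, interval: String): StreamingQuery =
    df.writeStream
      .format("console")
      .trigger(Trigger.ProcessingTime(interval))
      .start()

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("BoundedBatchSketch")
      .master("local[*]") // assumption: local run for experimentation
      .getOrCreate()

    // Same columns as the schema in the question, all read as strings.
    val columns = Seq("street", "city", "zip", "state", "beds", "baths",
      "sq__ft", "type", "sale_date", "price", "latitude", "longitude")
    val csvSchema = columns.foldLeft(new StructType()) { (schema, name) =>
      schema.add(name, "string")
    }

    val streamingDF = cappedCsvStream(spark, csvSchema, "./csvFolder/", maxFiles = 1L)
    val query = startConsoleQuery(streamingDF, "10 seconds")
    query.awaitTermination()
  }
}
```

If no trigger is set, the default behaviour is to start the next micro-batch as soon as the previous one finishes, so a slow batch delays the next one rather than piling up work in memory.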