There is a data lake of CSV files that's updated throughout the day. I'm trying to create a Spark Structured Streaming job with the Trigger.Once feature outlined in this blog post that periodically writes the data newly added to the CSV data lake into a Parquet data lake.
Here's what I have:
val df = spark
  .readStream
  .schema(s) // s is the predefined schema for the CSV files
  .csv("s3a://csv-data-lake-files")
The following command wrote all the data to the Parquet lake, but didn't stop after all the data was written (I had to manually cancel the job).
import org.apache.spark.sql.streaming.Trigger

processedDf
  .writeStream
  .trigger(Trigger.Once)
  .format("parquet")
  .option("checkpointLocation", "s3-path-to-checkpoint")
  .start("s3-path-to-parquet-lake")
The following job also worked, but didn't stop after all the data was written either (I had to manually cancel the job):
val query = processedDf
.writeStream
.trigger(Trigger.Once)
.format("parquet")
.option("checkpointLocation", "s3-path-to-checkpoint")
.start("s3-path-to-parquet-lake")
query.awaitTermination()
The following command stopped the query before any data got written.
val query = processedDf
.writeStream
.trigger(Trigger.Once)
.format("parquet")
.option("checkpointLocation", "s3-path-to-checkpoint")
.start("s3-path-to-parquet-lake")
query.stop()
How can I configure the writeStream query to wait until all the incremental data has been written to Parquet files and then stop?
I got Structured Streaming + Trigger.Once to work properly on a Parquet data lake.
I don't think it was working with the CSV data lake because the CSV data lake had a ton of small files in nested directories. Spark does not like working with lots of small CSV files (I think it needs to open each one to read the header) and really struggles when it has to glob S3 directories.
So I think the Spark Structured Streaming + Trigger.Once code is fine - the CSV reader just needs to handle this kind of layout better.
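One workaround (not from the original answer, just a sketch of the idea) is to periodically compact the many small CSV files into a smaller number of large ones with a plain batch job and point the stream at the compacted location; the paths, schema, and partition count below are hypothetical placeholders:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{LongType, StringType, StructType}

val spark = SparkSession.builder().appName("compact-csv").getOrCreate()

// Hypothetical schema standing in for the real CSV schema `s`.
val csvSchema = new StructType()
  .add("id", LongType)
  .add("value", StringType)

// Read the small files once as a batch, rewrite them as fewer, larger files,
// then run the streaming job against the compacted location instead.
spark.read
  .schema(csvSchema)
  .csv("s3a://csv-data-lake-files")
  .repartition(64) // tune so output files land well above the small-file range
  .write
  .mode("overwrite")
  .csv("s3a://csv-data-lake-files-compacted")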
The main purpose of structured streaming is to process data continuously without the need to start and stop streams when new data arrives. Read this for more details.
Starting from Spark 2.0.0, StreamingQuery has the method processAllAvailable, which waits for all source data to be processed and committed to the sink. Note that the Scala docs state this method is intended for testing purposes only.
Therefore the code should look like this (if you still want it):
query.processAllAvailable()
query.stop()
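For completeness, here is a minimal end-to-end sketch of the pipeline from the question with processAllAvailable wired in (the schema and S3 paths are placeholders, and processedDf is just the raw stream here):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.{LongType, StringType, StructType}

val spark = SparkSession.builder().appName("csv-to-parquet").getOrCreate()

// Placeholder schema; substitute the real CSV schema.
val s = new StructType()
  .add("id", LongType)
  .add("value", StringType)

val df = spark
  .readStream
  .schema(s)
  .csv("s3a://csv-data-lake-files")

val processedDf = df // any incremental transformations would go here

val query = processedDf
  .writeStream
  .trigger(Trigger.Once())
  .format("parquet")
  .option("checkpointLocation", "s3-path-to-checkpoint")
  .start("s3-path-to-parquet-lake")

// Block until everything currently available in the source has been processed
// and committed to the sink, then shut the query down.
query.processAllAvailable()
query.stop()

Note that with Trigger.Once the query is expected to terminate on its own after the single micro-batch, so query.awaitTermination() should also return once processing finishes; processAllAvailable is mainly useful for stopping a continuously triggered query once it has caught up.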