Using Spark Structured Streaming with Trigger.Once

There is a data lake of CSV files that's updated throughout the day. I'm trying to create a Spark Structured Streaming job with the Trigger.Once feature outlined in this blog post to periodically write the new data that has arrived in the CSV data lake to a Parquet data lake.

Here's what I have:

val df = spark
  .readStream
  .schema(s)
  .csv("s3a://csv-data-lake-files")

The following command wrote all the data to the Parquet lake, but didn't stop after all the data was written (I had to manually cancel the job).

processedDf
  .writeStream
  .trigger(Trigger.Once)
  .format("parquet")
  .option("checkpointLocation", "s3-path-to-checkpoint")
  .start("s3-path-to-parquet-lake")

The following job also worked, but didn't stop after all the data was written either (I had to manually cancel the job):

val query = processedDf
  .writeStream
  .trigger(Trigger.Once)
  .format("parquet")
  .option("checkpointLocation", "s3-path-to-checkpoint")
  .start("s3-path-to-parquet-lake")

query.awaitTermination()

The following command stopped the query before any data got written.

val query = processedDf
  .writeStream
  .trigger(Trigger.Once)
  .format("parquet")
  .option("checkpointLocation", "s3-path-to-checkpoint")
  .start("s3-path-to-parquet-lake")

query.stop()

How can I configure the writeStream query to wait until all the incremental data has been written to Parquet files and then stop?

asked Aug 16 '17 by Powers

2 Answers

I got Structured Streaming + Trigger.Once to work properly on a Parquet data lake.

I don't think it was working with the CSV data lake because that lake contains a huge number of small files in nested directories. Spark does not handle lots of small CSV files well (I think it needs to open them all to read the headers) and it really struggles when it has to glob nested S3 directories.

So I think the Spark Structured Streaming + Trigger.Once code is fine - the CSV reader just needs to get better at this kind of workload.
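
For reference, a minimal sketch of the Trigger.Once setup against the Parquet lake might look like the following. The bucket paths are hypothetical placeholders, and s is the same schema object as in the question, so adjust both to your own layout:

import org.apache.spark.sql.streaming.Trigger

// hypothetical paths - replace with your own buckets
val inputPath      = "s3a://parquet-data-lake-files"
val outputPath     = "s3a://parquet-output-lake"
val checkpointPath = "s3a://checkpoints/parquet-output-lake"

val df = spark
  .readStream
  .schema(s)                 // schema object from the question
  .parquet(inputPath)

val query = df
  .writeStream
  .trigger(Trigger.Once)     // process everything that's new, then shut down
  .format("parquet")
  .option("checkpointLocation", checkpointPath)
  .start(outputPath)

query.awaitTermination()     // returns once the single Trigger.Once batch finishes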

answered by Powers

The main purpose of Structured Streaming is to process data continuously, without needing to start and stop streams whenever new data arrives. Read this for more details.

Starting from Spark 2.0.0, StreamingQuery has a processAllAvailable method that waits for all available source data to be processed and committed to the sink. Please note that the Scala docs state this method is intended for testing purposes only.

Therefore, if you still want to stop the query explicitly, the code should look like this:

query.processAllAvailable()
query.stop()
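
Putting it together with the writeStream from the question, the whole flow would look roughly like this (processedDf and the S3 paths are the asker's placeholders):

import org.apache.spark.sql.streaming.Trigger

val query = processedDf
  .writeStream
  .trigger(Trigger.Once)
  .format("parquet")
  .option("checkpointLocation", "s3-path-to-checkpoint")
  .start("s3-path-to-parquet-lake")

// blocks until all data available at this point has been processed and committed
query.processAllAvailable()

// then shut the query down cleanly
query.stop()
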
answered by Yuriy Bondaruk