
How to write streaming DataFrame into multiple sinks in Spark Structured Streaming

I have a set of SQL rules which I need to apply on a streaming dataframe inside foreachBatch(). After applying those rules, the resultant/filtered dataframe should be written to multiple destinations like "delta" and "cosmos DB".

Below is what I have tried. Using the static DataFrame passed to the foreachBatch() function, I am trying to create a temp view as shown below.

df.writeStream
  .format("delta")
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()

def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long): Unit = {
    microBatchOutputDF.createOrReplaceTempView("testTable")
}

But when I run the code, it fails with the error: table or view 'testTable' not found.

Is it possible to create a temp table/view from the static DataFrame in Spark Structured Streaming?

Or how can I write to multiple sinks?

Shane asked Oct 23 '25 04:10


1 Answer

From the comments clarifying the OP's question:

"I have a set of SQL rules which I need to apply on the dataframe inside forEachBatch(). After applying the rules, the resultant/filtered dataframe will be written to multiple destinations like delta and cosmos DB."

Using foreachBatch allows you to:

  • Reuse existing batch data sources
  • Write to multiple locations

As I understand it, you want to apply different transformations to your streaming DataFrame and write the results to multiple locations. You can do that as follows:

import org.apache.spark.sql.DataFrame

df.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>

  // persist the micro-batch DataFrame because it is reused for several writes
  batchDF.persist()

  // apply SQL logic using `selectExpr` or just the DataFrame API
  // (the empty strings are placeholders for your own expressions)
  val deltaBatchDf = batchDF.selectExpr("")
  val cosmosBatchDf = batchDF.selectExpr("")

  // write to multiple sinks like you would do with batch DataFrames
  // add more locations if required
  deltaBatchDf.write.format("delta").options(...).save(...)
  cosmosBatchDf.write.format("cosmos").options(...).save(...)

  // free memory
  batchDF.unpersist()
}.start() // the streaming query only runs once it is started
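
If you would rather keep the rules as SQL text against a temp view, the following is a minimal sketch of one way to do it. It assumes the "not found" error arises because the query is run on a different SparkSession than the one the view was registered in, so it runs the SQL through batchDF.sparkSession. The object name, the rule text, the "rate" test source, and the two /tmp output paths are hypothetical stand-ins, not part of the original question; swap in your own Delta and Cosmos DB writers.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object TempViewInForeachBatch {

  // Hypothetical rule for illustration; replace with your own SQL rules.
  val ruleSql: String = "SELECT * FROM testTable WHERE value % 2 = 0"

  // A named method passed as `applyRulesAndWrite _`, as in the question.
  def applyRulesAndWrite(batchDF: DataFrame, batchId: Long): Unit = {
    // Register the view on the micro-batch DataFrame's own session.
    batchDF.createOrReplaceTempView("testTable")

    // Query the view through the same session that owns batchDF.
    val filtered = batchDF.sparkSession.sql(ruleSql)

    // The filtered result is written twice, so cache it once.
    filtered.persist()

    // Stand-in sinks (assumption): replace with format("delta") / Cosmos DB writes.
    filtered.write.mode("append").parquet("/tmp/sink_a")
    filtered.write.mode("append").json("/tmp/sink_b")

    filtered.unpersist()
    ()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("temp-view-in-foreachBatch")
      .getOrCreate()

    // The built-in "rate" source emits (timestamp, value) rows for testing.
    val df = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "5")
      .load()

    val query = df.writeStream
      .foreachBatch(applyRulesAndWrite _)
      .start()

    // Let a few micro-batches run, then shut down.
    query.awaitTermination(15000)
    query.stop()
    spark.stop()
  }
}
```

The key line is batchDF.sparkSession.sql(...): the view and the query then share one session, whereas calling spark.sql(...) on an outer session variable may not see the view.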

Michael Heil answered Oct 25 '25 23:10


