How to use an external trigger to stop a structured streaming query?

I am using Spark Structured Streaming and I want to check whether a stop file exists so that my program can exit.

I can do something like:

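import java.nio.file.{Files, Paths}
import org.apache.spark.sql.DataFrame

def main(args: Array[String]): Unit = {
    val query = SparkSession...load.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
      // stopFile is a placeholder for the path of the stop file
      if (Files.exists(Paths.get(stopFile))) sys.exit(0)
      // do some processing here
    }.start()
    // add a listener here to monitor the query
    query.awaitTermination()
}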

However, this check only runs when new rows are appended to the query's input. If no new rows arrive, the stop file has no effect at all.

Any better idea to implement this trigger?


Above is the question. Thanks to the accepted answer below, here is the code that finally worked for me.

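import java.nio.file.{Files, Paths}
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
import org.apache.spark.sql.streaming.StreamingQuery

object QueryManager {
  // Hypothetical stop-file path; replace with your own stop condition
  val StopFile = "/tmp/stop-streaming-query"
  // Stops the query as soon as the stop condition holds
  def queryTerminator(query: StreamingQuery): Runnable = new Runnable {
    override def run(): Unit = if (Files.exists(Paths.get(StopFile))) query.stop()
  }
  // Checks the stop condition once per second on a separate scheduler thread
  def listenTermination(query: StreamingQuery): ScheduledExecutorService = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    scheduler.scheduleWithFixedDelay(queryTerminator(query), 1, 1, TimeUnit.SECONDS)
    scheduler
  }
}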
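// and in the main method
import org.apache.spark.sql.DataFrame

def main(args: Array[String]): Unit = {
    val query = SparkSession...load.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
      // do some processing here
    }.start()
    // start the background thread that stops the query when the stop condition holds
    QueryManager.listenTermination(query)

    query.awaitTermination()

    // Threads from Executors.newSingleThreadScheduledExecutor() are non-daemon,
    // so the scheduler keeps the JVM alive after the query stops. Exiting
    // explicitly works; shutting the scheduler down, or creating it with a
    // daemon ThreadFactory, would also let the program end.
    System.exit(0)
}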

Please let me know if anything is wrong with it.

asked Jan 25 '23 12:01 by Litchy

1 Answer

Any better idea to implement this trigger?

A streaming query is a separate daemon thread of a Structured Streaming application. It runs forever until it is stopped using StreamingQuery.stop.

There are at least two ways to access a running streaming query:

  1. DataStreamWriter.start(), which returns the StreamingQuery it starts
  2. StreamingQueryManager (available as SparkSession.streams)
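A minimal sketch of both access paths, assuming a local SparkSession, the built-in rate source and console sink; the object and method names (AccessRunningQueries, startAndLookUp) are illustrative only. The first handle comes straight from start(), the second is looked up by the query's id through spark.streams.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQuery

object AccessRunningQueries {
  // Starts a toy query and returns it twice: once as returned by
  // DataStreamWriter.start() and once looked up via StreamingQueryManager.
  def startAndLookUp(spark: SparkSession): (StreamingQuery, StreamingQuery) = {
    // 1. DataStreamWriter.start() hands back a StreamingQuery directly
    val started: StreamingQuery = spark.readStream
      .format("rate")      // built-in source that generates test rows
      .load()
      .writeStream
      .format("console")
      .start()
    // 2. spark.streams finds the same query by id, from any thread
    //    that has access to the SparkSession
    val lookedUp: StreamingQuery = spark.streams.get(started.id)
    (started, lookedUp)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("access-queries").getOrCreate()
    val (_, lookedUp) = startAndLookUp(spark)
    spark.streams.active.foreach(q => println(s"active query: ${q.id}"))
    lookedUp.stop()   // stopping through either handle stops the same query
    spark.stop()
  }
}
```

Because the lookup only needs the session and the id, a control thread does not have to be handed the original StreamingQuery object.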

The idea is to have a "control thread" in your Structured Streaming application that would listen to stop requests (with the ID of the streaming query or queries) and simply execute stop on the running streaming queries.
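One way to sketch such a control thread with only the JDK, assuming a convention (not part of Spark) where a stop request is an empty file, named after the query ID, dropped into a request directory. StopRequestWatcher, start, requestDir and periodMillis are hypothetical names; the actual stop action is passed in as a callback so the watcher itself has no Spark dependency.

```scala
import java.io.File
import java.nio.file.Path
import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory, TimeUnit}
import scala.util.Try

// Hypothetical control thread: every `periodMillis` it lists `requestDir`,
// treats each file name as a query ID, calls `stop(id)`, and deletes the file.
object StopRequestWatcher {
  def start(requestDir: Path, periodMillis: Long)(stop: String => Unit): ScheduledExecutorService = {
    // Daemon thread: the watcher alone never keeps the JVM alive
    val factory = new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, "stop-request-watcher")
        t.setDaemon(true)
        t
      }
    }
    val scheduler = Executors.newSingleThreadScheduledExecutor(factory)
    val check = new Runnable {
      override def run(): Unit = {
        // listFiles returns null if the directory is missing or unreadable
        val files = Option(requestDir.toFile.listFiles()).getOrElse(Array.empty[File])
        files.filter(_.isFile).foreach { f =>
          // Swallow failures: an exception escaping run() would cancel
          // all later executions of a fixed-delay task
          Try(stop(f.getName))
          f.delete()
        }
      }
    }
    scheduler.scheduleWithFixedDelay(check, periodMillis, periodMillis, TimeUnit.MILLISECONDS)
    scheduler
  }
}
```

With Spark, the callback could be something like { id => Option(spark.streams.get(id)).foreach(_.stop()) }, after which creating a file named after a query's id in the request directory asks that query to stop. spark.streams.get(String) parses the name as a UUID, so a stray file with some other name makes the callback throw; the Try absorbs that and the file is deleted.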


Think of a Spark Structured Streaming application as a single-JVM application with multiple threads. You can add one more thread to control the others. That's the basic idea.

answered May 16 '23 11:05 by Jacek Laskowski