I am using spark structured streaming and I want to check if a stop
file exists to exit my program.
I can do something like:
def main(args: Array[String]) = {
val query = SparkSession...load.writeStream.foreachBatch{
if (stop file exist) exit(0)
// do some processing here
}.start()
// add Execute Listener here to listen query
query.awaitTermination()
}
However, this could only be triggered if there are new rows appended to this query table. If there are no new rows, the stop
file would have no impact whatsoever.
Any better idea to implement this trigger?
Above is the question, thanks to the accepted answer below, here is my code that finally works well.
object QueryManager {
def queryTerminator(query: StreamingQuery): Runnable = new Runnable {
override def run() = {if(stop condition) query.stop()}
}
def listenTermination(query: StreamingQuery) = {
Executors.newSingleThreadScheduledExecutor.scheduleWithFixedDelay(
queryTerminator(query), initialDelay=1, delay=1, SECONDS
)
}
}
// and in main method
def main(args: Array[String]) = {
val query = SparkSession...load.writeStream.foreachBatch{
// do some processing here
}.start()
// add Execute Listener here to listen query
QueryManager.listenTermination(query)
query.awaitTermination()
// I am not familar with scala,
// but it seems would not exit if we do not add this
System.exit(0)
}
Please let me know if there is any wrong about it.
Any better idea to implement this trigger?
A streaming query is a separate daemon thread of a Structured Streaming application. It runs forever until it is stopped using StreamingQuery.stop.
There are at least two ways to access a running streaming query:
The idea is to have a "control thread" in your Structured Streaming application that would listen to stop requests (with the ID of the streaming query or queries) and simply execute stop
on the running streaming queries.
Think of a Spark Structured Streaming application as a single-JVM application with multiple threads. You can have one more to control the threads. That's the basic idea.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With