I'm using Spark 2.1 and trying to stop a Structured Streaming query gracefully.
Is StreamingQuery.stop() a graceful stop? I haven't seen any detailed information on this method in the documentation:
void stop()
Stops the execution of this query if it is running. This method blocks until the threads performing execution has stopped.
Since: 2.0.0
By contrast, the older Spark Streaming API (DStreams) has an option to stop the execution of the streams while ensuring that all received data has been processed:
def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit
Stop the execution of the streams, with option of ensuring all received data has been processed.
stopSparkContext - if true, stops the associated SparkContext. The underlying SparkContext will be stopped regardless of whether this StreamingContext has been started.
stopGracefully - if true, stops gracefully by waiting for the processing of all received data to be completed
So the question is: how do I stop a Structured Streaming query gracefully?
For PySpark users, here is a Python port of @ASe's answer:
import time

# Helper method to stop a streaming query
def stop_stream_query(query, wait_time):
    """Stop a running streaming query once it is idle"""
    while query.isActive:
        status = query.status  # snapshot the status dict once per poll
        msg = status['message']
        data_avail = status['isDataAvailable']
        trigger_active = status['isTriggerActive']
        if not data_avail and not trigger_active and msg != "Initializing sources":
            print('Stopping query...')
            query.stop()
        time.sleep(0.5)

    # Okay, wait for the stop to happen (wait_time is in seconds)
    print('Awaiting termination...')
    query.awaitTermination(wait_time)
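One thing to keep in mind with the helper above: if data keeps arriving, the query may never look idle, so the loop can poll forever. Below is a minimal sketch of a bounded variant, assuming the same PySpark StreamingQuery status dict keys used above. The function name stop_query_with_deadline and its poll_interval / max_wait parameters are hypothetical, not part of the Spark API. It waits for an idle status, but stops the query anyway once max_wait seconds have passed.

```python
import time


def stop_query_with_deadline(query, poll_interval=0.5, max_wait=60.0):
    """Hypothetical bounded variant of stop_stream_query.

    Waits for the query to look idle, but stops it anyway after max_wait
    seconds. Returns True if an idle status was observed before stopping.
    `query` is assumed to be a PySpark StreamingQuery (anything exposing
    isActive, status and stop() works).
    """
    deadline = time.monotonic() + max_wait
    idle = False
    while query.isActive and time.monotonic() < deadline:
        status = query.status  # read the status dict once per poll
        idle = (not status['isDataAvailable']
                and not status['isTriggerActive']
                and status['message'] != "Initializing sources")
        if idle:
            break
        time.sleep(poll_interval)
    if query.isActive:
        # stop() blocks until the execution threads have stopped
        query.stop()
    return idle
```

The return value lets the caller tell whether the stop happened at an idle point or was forced by the deadline.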
If by "gracefully" you mean that the streaming query should finish processing the data it is working on, then void stop() will not do that. It only waits until the threads performing execution have stopped (as mentioned in the documentation), which doesn't mean that the processing will be completed.
For that, we need to wait until the current trigger of the query has completed, which we can check via StreamingQueryStatus, like this:
while (query.status.isTriggerActive) { Thread.sleep(100) } // poll until the current trigger finishes
This waits until the query has finished processing the current trigger. Then we can call query.stop().
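For PySpark, the same "wait for the current trigger, then stop" idea might look roughly like the sketch below. It assumes query is a PySpark StreamingQuery, whose status property returns a dict with an 'isTriggerActive' key; the function name stop_after_current_trigger and the poll_interval parameter are hypothetical. It sleeps between checks instead of busy-spinning.

```python
import time


def stop_after_current_trigger(query, poll_interval=0.1):
    """Hypothetical helper: wait for the running trigger to end, then stop.

    Assumes `query` is a PySpark StreamingQuery (or any object exposing
    isActive, a `status` dict with 'isTriggerActive', and stop()).
    """
    # Sleep between polls so the driver thread does not burn a CPU core.
    while query.isActive and query.status['isTriggerActive']:
        time.sleep(poll_interval)
    # Caveat: a new trigger may start between the last check and stop().
    if query.isActive:
        query.stop()  # blocks until the execution threads have stopped
```

Because of the race noted in the comment, this only narrows the window in which stop() interrupts a trigger; it does not guarantee that no trigger is in flight.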
I hope it helps!