
Stop Structured Streaming query gracefully

I'm using Spark 2.1 and trying to stop a Streaming query gracefully.

Is StreamingQuery.stop() a graceful stop? I haven't found any detailed information on this method in the documentation:

void stop()
Stops the execution of this query if it is running. This method blocks until the threads performing execution has stopped. Since: 2.0.0

By contrast, the older streaming API (DStreams) has an option to stop the execution of the streams while ensuring that all received data has been processed:

def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit
Stop the execution of the streams, with option of ensuring all received data has been processed.

stopSparkContext: if true, stops the associated SparkContext. The underlying SparkContext will be stopped regardless of whether this StreamingContext has been started.

stopGracefully: if true, stops gracefully by waiting for the processing of all received data to be completed

So the question is: how do I stop a Structured Streaming query gracefully?

shiv455 asked Aug 16 '17 15:08


2 Answers

For PySpark users, this is a Python port of @ASe's answer:

import time

# Helper method to stop a streaming query
def stop_stream_query(query, wait_time):
    """Stop a running streaming query once it is idle."""
    while query.isActive:
        # Take one status snapshot so all three fields describe the same moment
        status = query.status
        msg = status['message']
        data_avail = status['isDataAvailable']
        trigger_active = status['isTriggerActive']
        # Stop only when no data is pending, no trigger is running,
        # and the sources have finished initializing
        if not data_avail and not trigger_active and msg != "Initializing sources":
            print('Stopping query...')
            query.stop()
        time.sleep(0.5)

    # Wait (up to wait_time seconds) for the stop to take effect
    print('Awaiting termination...')
    query.awaitTermination(wait_time)
Brian Wylie answered Oct 25 '22 20:10


If by "gracefully" you mean that the streaming query should finish processing its data, then void stop() will not do that. It only waits until the threads performing execution have stopped (as mentioned in the documentation), which does not mean that the processing will be completed.

For that, we need to wait until the current trigger of the query is complete, which we can check via StreamingQueryStatus, like this:

while (query.status.isTriggerActive) { /* do nothing */ }

This loop waits until the query has finished processing the current trigger. After that, we can call query.stop().
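For PySpark, the same wait-then-stop idea could be sketched with a deadline, so that the polling loop cannot spin forever if triggers keep running. This is only a minimal sketch under stated assumptions: `stop_after_trigger`, `timeout_s`, and `poll_s` are hypothetical names introduced here, and the code assumes `query.status` is a dict with an `isTriggerActive` key, as in the Python port above.

```python
import time


def stop_after_trigger(query, timeout_s=30.0, poll_s=0.1):
    """Wait for the in-flight trigger to finish, then stop the query.

    Hypothetical helper (not part of Spark). Returns True if the trigger
    finished before stop() was called, False if the deadline passed first
    and stop() was called anyway.
    """
    deadline = time.monotonic() + timeout_s
    finished = True
    # Poll the status instead of busy-waiting on it
    while query.isActive and query.status["isTriggerActive"]:
        if time.monotonic() >= deadline:
            finished = False
            break
        time.sleep(poll_s)
    # Only call stop() if the query has not already terminated
    if query.isActive:
        query.stop()
    return finished
```

Calling `stop_after_trigger(query)` would then replace the bare loop plus `query.stop()`. Note that a new trigger may start between the last status check and the call to `stop()`, so this narrows the window for interrupting a batch rather than closing it.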

I hope it helps!

himanshuIIITian answered Oct 25 '22 18:10