 

Is there a way to dynamically stop Spark Structured Streaming?

In my scenario I have several datasets that arrive every now and then and that I need to ingest into our platform. The ingestion process involves several transformation steps, one of them being Spark. In particular, I use Spark Structured Streaming so far. The infrastructure also involves Kafka, from which Spark Structured Streaming reads data.

I wonder if there is a way to detect when there has been nothing to consume from a topic for a while, so that I can decide to stop the job. That is, I want to run it for the time it takes to consume a specific dataset and then stop it. For specific reasons we decided not to use the batch version of Spark.

Hence, is there any timeout or similar mechanism that can be used to detect that there is no more data coming in and that everything has been processed?

Thank you

MaatDeamon asked Sep 25 '18


People also ask

How do I stop Spark structured streaming?

The idea is to have the application continuously check for a marker file at a specified location. If you want to stop it gracefully, you only need to delete that file, and the job will stop only after finishing the current processing batch as well as any queued batches, so there is no loss of data.
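
A minimal sketch of that marker-file pattern (the helper name, path handling, and poll intervals below are placeholders of mine, not part of any Spark API):

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.streaming.StreamingQuery

def stopWhenMarkerDeleted(query: StreamingQuery, markerPath: String): Unit = {
  val fs = FileSystem.get(new URI(markerPath), new Configuration())
  while (query.isActive) {
    if (!fs.exists(new Path(markerPath))) {
      // marker file is gone: let the in-flight micro-batch finish, then stop
      while (query.status.isTriggerActive) Thread.sleep(500)
      query.stop()
    } else {
      Thread.sleep(5000) // poll for the marker every 5 seconds
    }
  }
}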

Do Spark streaming programs run continuously?

Users specify a streaming computation by writing a batch computation (using Spark's DataFrame/Dataset API), and the engine automatically incrementalizes this computation (runs it continuously).

Which property must a Spark Structured Streaming sink possess to ensure end-to-end exactly-once semantics?

Exactly-once semantics are only possible if the source is replayable and the sink is idempotent.

What is the difference between Spark streaming and structured streaming?

Spark Streaming receives real-time data and divides it into smaller batches for the execution engine. In contrast, Structured Streaming is built on the Spark SQL API for data stream processing. In the end, both APIs are optimized by the Spark Catalyst optimizer and translated into RDDs for execution under the hood.


3 Answers

Structured Streaming Monitoring Options

You can use query.lastProgress to get the timestamp of the last processed batch and build your shutdown logic around that. Don't forget to save your checkpoints to a durable, persistent, available store.
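
For example, a sketch that stops the query after a configurable idle window, based on the timestamp and row count reported by lastProgress (the idle threshold and helper name are mine):

import java.time.Instant
import org.apache.spark.sql.streaming.StreamingQuery

def stopWhenIdle(query: StreamingQuery, idleMs: Long): Unit = {
  var lastDataSeen = System.currentTimeMillis()
  while (query.isActive) {
    val progress = query.lastProgress // null until the first batch completes
    if (progress != null && progress.numInputRows > 0) {
      // progress.timestamp is an ISO-8601 string, e.g. "2018-09-25T12:00:00.000Z"
      lastDataSeen = Instant.parse(progress.timestamp).toEpochMilli
    }
    if (System.currentTimeMillis() - lastDataSeen > idleMs) {
      query.stop() // nothing consumed for idleMs: shut the query down
    }
    Thread.sleep(1000)
  }
}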

Michael West answered Nov 07 '22


Putting together a couple of pieces of advice:

  1. As @Michael West pointed out, there are listeners to track progress
  2. From what I gather, Structured Streaming doesn't yet support graceful shutdown

So one option is to periodically check for query activity, dynamically shutting down depending on a configurable state (when you determine no further progress can/should be made):

// where you configure your spark job...
spark.streams.addListener(shutdownListener(spark))

// your job code starts here by calling "start()" on the stream...

// periodically await termination, checking for your shutdown state
while(!spark.sparkContext.isStopped) {
  if (shutdown) {
    println(s"Shutting down since first batch has completed...")
    spark.streams.active.foreach(_.stop())
    spark.stop()
  } else {
    // wait 10 seconds before checking again if work is complete
    spark.streams.awaitAnyTermination(10000)
  }
}

Your listener can trigger the shutdown in a variety of ways. For instance, if you're only waiting on a single batch, then just shut down after the first update:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

var shutdown = false
def shutdownListener(spark: SparkSession) = new StreamingQueryListener() {
  override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = println("Query started: " + queryStarted.id)
  override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = println("Query terminated! " + queryTerminated.id)
  override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = shutdown = true
}

Or, if you need to shut down after more complicated state changes, you could parse the JSON body of queryProgress.progress to determine whether or not to shut down when a given onQueryProgress event fires.
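
For instance, a sketch of such a listener that only flags shutdown once a completed batch reports zero input rows (the zero-row heuristic is an assumption on my part, not a Spark guarantee):

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

def idleShutdownListener() = new StreamingQueryListener() {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    // event.progress.json holds the full JSON body; numInputRows suffices here
    if (event.progress.numInputRows == 0) shutdown = true
  }
}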

ecoe answered Nov 07 '22


You can probably use this:

import org.apache.spark.sql.streaming.StreamingQuery

def stopStreamQuery(query: StreamingQuery, awaitTerminationTimeMs: Long): Unit = {
  while (query.isActive) {
    try {
      // fewer than 10 rows in the last batch: treat the topic as drained
      if (query.lastProgress.numInputRows < 10) {
        query.stop()
        query.awaitTermination(awaitTerminationTimeMs)
      }
    } catch {
      // lastProgress is null until the first batch completes
      case e: NullPointerException => println("First Batch")
    }
    Thread.sleep(500)
  }
}

You can make the numInputRows threshold (10 here) a configurable variable.
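
Hypothetical usage, assuming df is your streaming DataFrame and the sink and checkpoint paths are placeholders:

val query = df.writeStream
  .format("parquet")
  .option("checkpointLocation", "/tmp/checkpoint") // placeholder path
  .option("path", "/tmp/output")                   // placeholder path
  .start()

stopStreamQuery(query, awaitTerminationTimeMs = 1000) // blocks until drained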

Nilesh Sinha answered Nov 07 '22