Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Monitoring Structured Streaming

I have a structured stream set up that is running just fine, but I was hoping to monitor it while it is running.

I have built an EventCollector

class EventCollector extends StreamingQueryListener{
  override def onQueryStarted(event: QueryStartedEvent): Unit = {
    println("Start")
  }

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    println(event.queryStatus.prettyJson)
  }

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {
    println("Term")
  }

I have built an EventCollector and added the listener to my spark session

val listener = new EventCollector()
spark.streams.addListener(listener)

Then I fire off the query

val query = inputDF.writeStream
  //.format("console")
  .queryName("Stream")
  .foreach(writer)
  .start()

query.awaitTermination()

However, onQueryProgress never gets hit. onQueryStarted does, but I was hoping to get the progress of the query at a certain interval to monitor how the queries are doing. Can anyone assist with this?

like image 751
Leyth G Avatar asked Dec 02 '16 17:12

Leyth G


People also ask

What is structured Streaming?

Structured Streaming is a high-level API for stream processing that became production-ready in Spark 2.2. Structured Streaming allows you to take the same operations that you perform in batch mode using Spark's structured APIs, and run them in a streaming fashion.

How do you handle late data in structured Streaming?

Watermarking is a feature in Spark Structured Streaming that is used to handle the data that arrives late. Spark Structured Streaming can maintain the state of the data that arrives, store it in memory, and update it accurately by aggregating it with the data that arrived late.

What is difference between DStream and structured Streaming?

Internally, a DStream is a sequence of RDDs. Spark receives real-time data and divides it into smaller batches for the execution engine. In contrast, Structured Streaming is built on the SparkSQL API for data stream processing.

What is readStream in Spark?

Spark Streaming uses readStream to monitors the folder and process files that arrive in the directory real-time and uses writeStream to write DataFrame or Dataset. Spark Streaming is a scalable, high-throughput, fault-tolerant streaming processing system that supports both batch and streaming workloads.


1 Answers

After much research into this topic, this is what I have found...

OnQueryProgress gets hit in between queries. I am not sure if this intentional functionality or not, but while we are streaming data from a file, the OnQueryProgress does not fire off.

A solution I have found was to rely on the foreach writer sink and perform my own analysis of performance within the process function. Unfortunately, we cannot access specific information about the query that is running. Or, I have not figured out how to yet. This is what I have implemented in my sandbox to analyze performance:

val writer = new ForeachWriter[rawDataRow] {
    def open(partitionId: Long, version: Long):Boolean = {
        //We end up here in between files
        true
    }
    def process(value: rawDataRow) = {
        counter += 1

        if(counter % 1000 == 0) {
            val currentTime = System.nanoTime()
            val elapsedTime = (currentTime - startTime)/1000000000.0

            println(s"Records Written:  $counter")
            println(s"Time Elapsed: $elapsedTime seconds")
        }
     }
}

An alternative way to get metrics:

Another way to get information about the running queries is to hit the GET endpoint that spark provides us.

http://localhost:4040/metrics

or

http://localhost:4040/api/v1/

Documentation here: http://spark.apache.org/docs/latest/monitoring.html

Update Number 2 Sept 2017: Tested on regular spark streaming, not structured streaming

Disclaimer, this may not apply to structured streaming, I need to set up a test bed to confirm. However, it does work with regular spark streaming(Consuming from Kafka in this example).

I believe, since spark streaming 2.2 has been released, new endpoints exist that can retrieve more metrics on the performance of the stream. This may have existed in previous versions and I just missed it, but I wanted to make sure it was documented for anyone else searching for this information.

http://localhost:4040/api/v1/applications/{applicationIdHere}/streaming/statistics

This is the endpoint that looks like it was added in 2.2 (Or it already existed and was just added the documentation, I'm not sure, I haven't checked).

Anyways, it adds metrics in this format for the streaming application specified:

{
  "startTime" : "2017-09-13T14:02:28.883GMT",
  "batchDuration" : 1000,
  "numReceivers" : 0,
  "numActiveReceivers" : 0,
  "numInactiveReceivers" : 0,
  "numTotalCompletedBatches" : 90379,
  "numRetainedCompletedBatches" : 1000,
  "numActiveBatches" : 0,
  "numProcessedRecords" : 39652167,
  "numReceivedRecords" : 39652167,
  "avgInputRate" : 771.722,
  "avgSchedulingDelay" : 2,
  "avgProcessingTime" : 85,
  "avgTotalDelay" : 87
}

This gives us the ability to build our own custom metric/monitoring applications using the REST endpoints that are exposed by Spark.

like image 165
Leyth G Avatar answered Oct 08 '22 08:10

Leyth G