I have a structured stream set up that is running just fine, but I was hoping to monitor it while it is running.
I have built an EventCollector:
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

class EventCollector extends StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = {
    println("Start")
  }

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    // In Spark 2.1+, the progress report is exposed as event.progress
    println(event.progress.prettyJson)
  }

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {
    println("Term")
  }
}
I then added the listener to my Spark session:
val listener = new EventCollector()
spark.streams.addListener(listener)
Then I fire off the query:
val query = inputDF.writeStream
  //.format("console")
  .queryName("Stream")
  .foreach(writer)
  .start()
query.awaitTermination()
However, onQueryProgress never gets hit. onQueryStarted does, but I was hoping to get the progress of the query at a regular interval to monitor how the query is doing. Can anyone assist with this?
After much research into this topic, this is what I have found...
onQueryProgress gets hit in between queries. I am not sure whether this is intentional functionality or not, but while we are streaming data from a file, onQueryProgress does not fire.
A solution I found was to rely on the foreach writer sink and perform my own performance analysis within the process function. Unfortunately, we cannot access specific information about the running query (or at least I have not figured out how to yet). This is what I implemented in my sandbox to analyze performance:
import org.apache.spark.sql.ForeachWriter

// Driver-side state for the toy metrics; on a real cluster each executor
// gets its own copy of these, so this only behaves as expected in local mode
var counter = 0L
val startTime = System.nanoTime()

val writer = new ForeachWriter[rawDataRow] {
  def open(partitionId: Long, version: Long): Boolean = {
    // We end up here in between files
    true
  }

  def process(value: rawDataRow): Unit = {
    counter += 1
    if (counter % 1000 == 0) {
      val currentTime = System.nanoTime()
      val elapsedTime = (currentTime - startTime) / 1000000000.0
      println(s"Records Written: $counter")
      println(s"Time Elapsed: $elapsedTime seconds")
    }
  }

  // close is abstract on ForeachWriter, so it must be implemented
  def close(errorOrNull: Throwable): Unit = {}
}
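Another option worth noting (assuming Spark 2.1+): the StreamingQuery handle returned by start() exposes status and lastProgress, so a background thread can poll it at whatever interval you like. This is a minimal sketch, and it is subject to the same caveat as above – lastProgress only updates when Spark generates a progress report:

import org.apache.spark.sql.streaming.StreamingQuery

def monitor(query: StreamingQuery, intervalMs: Long = 10000): Thread = {
  val t = new Thread(new Runnable {
    def run(): Unit = {
      while (query.isActive) {
        // lastProgress is null until the first progress report is generated
        Option(query.lastProgress).foreach(p => println(p.prettyJson))
        Thread.sleep(intervalMs)
      }
    }
  })
  t.setDaemon(true) // don't keep the JVM alive just for monitoring
  t.start()
  t
}

// usage, after val query = ...start():
// monitor(query)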
An alternative way to get information about the running queries is to hit the GET endpoints that Spark provides:
http://localhost:4040/metrics
or
http://localhost:4040/api/v1/
Documentation here: http://spark.apache.org/docs/latest/monitoring.html
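As a rough illustration (assuming the application's UI is up on the default port 4040), these endpoints can be read with nothing but the Scala standard library:

import scala.io.Source

object RestProbe {
  def main(args: Array[String]): Unit = {
    // Lists the running applications; each entry carries an "id" field
    // that later endpoints (like the statistics one below) require
    val apps = Source.fromURL("http://localhost:4040/api/v1/applications").mkString
    println(apps)
  }
}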
Update number 2, Sept 2017: tested on regular Spark Streaming, not Structured Streaming.
Disclaimer: this may not apply to Structured Streaming; I need to set up a test bed to confirm. However, it does work with regular Spark Streaming (consuming from Kafka in this example).
I believe that since Spark Streaming 2.2 has been released, new endpoints exist that can retrieve more metrics on the performance of the stream. This may have existed in previous versions and I just missed it, but I wanted to make sure it was documented for anyone else searching for this information.
http://localhost:4040/api/v1/applications/{applicationIdHere}/streaming/statistics
This endpoint looks like it was added in 2.2 (or it already existed and was just added to the documentation; I'm not sure, I haven't checked).
Anyway, it returns metrics in this format for the specified streaming application:
{
"startTime" : "2017-09-13T14:02:28.883GMT",
"batchDuration" : 1000,
"numReceivers" : 0,
"numActiveReceivers" : 0,
"numInactiveReceivers" : 0,
"numTotalCompletedBatches" : 90379,
"numRetainedCompletedBatches" : 1000,
"numActiveBatches" : 0,
"numProcessedRecords" : 39652167,
"numReceivedRecords" : 39652167,
"avgInputRate" : 771.722,
"avgSchedulingDelay" : 2,
"avgProcessingTime" : 85,
"avgTotalDelay" : 87
}
This gives us the ability to build our own custom metric/monitoring applications using the REST endpoints that are exposed by Spark.
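For example, a bare-bones monitor could poll the statistics endpoint on a timer and pull out whichever fields matter to you. A minimal sketch (the application id below is a hypothetical placeholder – fetch the real one from /api/v1/applications, and the naive regex just avoids pulling in a JSON library):

import scala.io.Source

object StreamingStatsMonitor {
  // Hypothetical application id for illustration only
  val appId = "app-20170913140228-0000"
  val url = s"http://localhost:4040/api/v1/applications/$appId/streaming/statistics"

  def main(args: Array[String]): Unit = {
    while (true) {
      val stats = Source.fromURL(url).mkString
      // Naive field extraction; a real monitor would parse the JSON properly
      val avgProc = """"avgProcessingTime"\s*:\s*([\d.]+)""".r
        .findFirstMatchIn(stats)
        .map(_.group(1))
      println(s"avgProcessingTime: ${avgProc.getOrElse("n/a")} ms")
      Thread.sleep(10000) // poll every 10 seconds
    }
  }
}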