I have Spark streaming application which basically gets a trigger message from Kafka which kick starts the batch processing which could potentially take up to 2 hours.
There were incidents where some of the jobs were hanging indefinitely and didn't get completed within the usual time and currently there is no way we could figure out the status of the job without checking the Spark UI manually. I want to have a way where the currently running spark jobs are hanging or not. So basically if it's hanging for more than 30 minutes I want to notify the users so they can take an action. What all options do I have?
I see I can use metrics from driver and executors. If I were to choose the most important one, it would be the last received batch records. When StreamingMetrics.streaming.lastReceivedBatch_records == 0
it probably means that Spark streaming job has been stopped or failed.
But in my scenario, I will receive only 1 streaming trigger event and then it will kick start the processing which may take up to 2 hours so I won't be able to rely on the records received.
Is there a better way? TIA
Increase driver and executor memory Out of memory issues and random crashes of the application were solved by increasing the memory from 20g per executor to 40g per executor as well as 40g for the driver. Happily, the machines in the production cluster were heavily provisioned with memory.
A checkpoint helps build fault-tolerant and resilient Spark applications. Spark Structured Streaming maintains an intermediate state on HDFS compatible file systems to recover from failures. To specify the checkpoint in a streaming query, we use the checkpointLocation parameter.
Now that the Direct API of Spark Streaming (we currently have version 2.3. 2) is deprecated and we recently added the Confluent platform (comes with Kafka 2.2.
Maybe a simple solution like.
At the start of the processing - launch a waiting thread.
val TWO_HOURS = 2 * 60 * 60 * 1000
val t = new Thread(new Runnable {
override def run(): Unit = {
try {
Thread.sleep(TWO_HOURS)
// send an email that job didn't end
} catch {
case _: Exception => _
}
}
})
And in the place where you can say that batch processing is ended
t.interrupt()
If processing is done within 2 hours - waiter thread is interrupted and e-mail is not sent. If processing is not done - e-mail will be sent.
Let me draw your attention towards Streaming Query listeners. These are quite amazing lightweight things that can monitor your streaming query progress.
In an application that has multiple queries, you can figure out which queries are lagging or have stopped due to some exception.
Please find below sample code to understand its implementation. I hope that you can use this and convert this piece to better suit your needs. Thanks!
spark.streams.addListener(new StreamingQueryListener() {
override def onQueryStarted(event: QueryStartedEvent) {
//logger message to show that the query has started
}
override def onQueryProgress(event: QueryProgressEvent) {
synchronized {
if(event.progress.name.equalsIgnoreCase("QueryName"))
{
recordsReadCount = recordsReadCount + event.progress.numInputRows
//Logger messages to show continuous progress
}
}
}
override def onQueryTerminated(event: QueryTerminatedEvent) {
synchronized {
//logger message to show the reason of termination.
}
}
})
YARN provides the REST API to check the status of application and status of cluster resource utilization as well.
with API call it will give a list of running applications and their start times and other details. you can have simple REST client that triggers maybe once in every 30 min or so and check if the job is running for more than 2 hours then send a simple mail alert.
Here is the API documentation:
https://hadoop.apache.org/docs/r2.7.3/hadoop-yarn/hadoop-yarn-site/ResourceManagerRest.html#Cluster_Applications_API
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With