I have a bash script that starts my standalone EC2 Spark cluster every night and executes an application. I would like to be notified when the application has finished so I can stop the cluster.
I was wondering if there is some sort of callback based on the Spark application's status.
I'm pretty new to Spark, so any other hint for solving this would be appreciated.
Thanks.
UPDATE:
With the JSON provided by http://<master-host>:8080/metrics/master/json
or http://<master-host>:8080/metrics/applications/json
I can get the status of the application (WAITING, RUNNING, FINISHED), but I can't get the status of the driver, which is what tells you whether the execution FAILED. I'm sure there must be a specific configuration to make the metrics show this, but I couldn't find it.
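For reference, polling that endpoint from the nightly script looks roughly like this. This is only a sketch: it assumes jq is available, and that the application gauges are keyed like application.<name>.<timestamp>.status, which is what my master exposed. As said, it can see FINISHED but not a failed driver:

# poll the applications metrics endpoint until the status gauge reads FINISHED
while true; do
    app_status=$(curl -s http://<master-host>:8080/metrics/applications/json \
        | jq -r '.gauges | to_entries[] | select(.key | endswith(".status")) | .value.value' \
        | tail -1)
    [ "$app_status" == "FINISHED" ] && break
    sleep 10
done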
In order to get this kind of status, I scraped the web UI served at http://<master-host>:8080 to find the driver executing my application and read its status.
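The scrape was nothing fancy, something along these lines (brittle HTML matching, and the pattern assumes the default master UI, where driver rows carry a driver-<timestamp>-<seq> id near a state column):

# pull the first driver row's state out of the master UI page
driver_state=$(curl -s http://<master-host>:8080 \
    | grep -A 5 "driver-" \
    | grep -m 1 -oE 'SUBMITTED|RUNNING|FINISHED|FAILED|KILLED|ERROR')
echo "driver state: $driver_state"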
Alternatively, if the application is submitted in standalone cluster mode, spark-submit itself can query the driver's status by its submission ID:
spark-submit --status $submitID --master $master 2>&1 | grep "success"
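A sketch of how that check can drive a nightly script, assuming the application was submitted with --deploy-mode cluster so a driver/submission ID was captured. The master URL and driver ID below are placeholders, and the exact --status output format varies by Spark version:

master="spark://<master-host>:6066"     # standalone REST submission endpoint (placeholder)
submitID="driver-20160101000000-0000"   # driver ID printed by the cluster-mode spark-submit (placeholder)

# poll until the driver leaves a non-terminal state
while spark-submit --status "$submitID" --master "$master" 2>&1 | grep -qE 'SUBMITTED|RUNNING'; do
    sleep 10
done

# exit 0 only if the driver reported success
spark-submit --status "$submitID" --master "$master" 2>&1 | grep -qE '"success" *: *true'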
Disclaimer: this example requires code changes, makes some assumptions about your service layout, and uses some internal-ish Spark classes.
After reading about hidden REST APIs and trying to wrap the SparkSubmit class to get Future objects, I found the SparkListener class. It has onJobStart/End, onApplicationStart/End, etc., for whatever granularity you require.
Here's a rough proof of concept for jobs, in an application's main method:
import javax.jms.{DeliveryMode, Session}

import org.apache.activemq.ActiveMQConnectionFactory
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerJobEnd, SparkListenerJobStart}

// ... build spark conf
val sparkContext = new SparkContext(sparkConf)

// programmatically register the listener
sparkContext.addSparkListener(new SparkListener {
  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    println(s"[ ${jobStart.jobId} ] Job started.")
  }

  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
    println(s"[ ${jobEnd.jobId} ] Job completed with result: ${jobEnd.jobResult}")
    // (our other services already rely on ActiveMQ)
    val connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616")
    val connection = connectionFactory.createConnection()
    connection.setClientID("Client_" + Math.random())
    connection.start()
    val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
    val sendQueue = session.createQueue("job_queue.spark_job_completed")
    val producer = session.createProducer(sendQueue)
    producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT)
    // ship the job id and result as a small JSON payload
    val textMessage = session.createTextMessage(
      s"""{"jobId" : "${jobEnd.jobId}", "jobResult" : "${jobEnd.jobResult}"}""")
    producer.send(textMessage)
    connection.close()
  }

  // the API only hands you the end time here :/
  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
    println(s"[ ${applicationEnd.time} ] Application completed.")
  }
})
// ... do spark work
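With this in place, any consumer of the job_queue.spark_job_completed queue (the nightly bash script included, e.g. through a small ActiveMQ client) can react to the completion message and stop the cluster; the jobResult field in the payload distinguishes a succeeded job from a failed one.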
Our team needed to notify external applications when Spark jobs/applications completed, on Spark 1.5.2. Also, the Spark UI wasn't easily reachable without lots of port-forwarding, so this approach let the jobs integrate with our preexisting monitoring tools.
I use a shell/bash script to submit the Spark application, so here is an example of how I get alerts about the application using a bash while loop; you should run spark-submit from the shell.
spark-submit ........
Once you have done the spark-submit, sleep for 5 seconds:
sleep 5s
Then start checking the status of your application with the while loop below; replace <your_application_name> with your application's name:
application_number=$(yarn application -list | grep <your_application_name> | sort -n | tail -1 | awk -F' ' '{print $1}')

while true; do
    # line 10 of `yarn application -status` output is "State : <...>" on this YARN version
    current_status=$(yarn application -status "$application_number" | sed -n '10p' | awk -F':' '{print $2}' | tr -d '[:space:]')
    if [ "$current_status" == "RUNNING" ]; then
        # still running: wait before polling again (without this sleep the loop would hammer the ResourceManager)
        sleep 5s
        continue
    fi
    # line 11 is "Final-State : <...>"; it stays UNDEFINED until the application terminates
    final_status=$(yarn application -status "$application_number" | sed -n '11p' | awk -F':' '{print $2}' | tr -d '[:space:]')
    if [ "$final_status" == "SUCCEEDED" ]; then
        echo "`date "+%Y_%m_%d-%H:%M:%S"`@@@ SPARK APPLICATION SUCCEEDED WITH $application_number" >> /log_folder/logfile`date +"%Y_%m_%d"`.log
        break
    elif [ "$final_status" == "FAILED" ]; then
        echo "`date "+%Y_%m_%d-%H:%M:%S"`@@@ SPARK APPLICATION FAILED WITH $application_number" >> /log_folder/logfile`date +"%Y_%m_%d"`.log
        break
    elif [ "$final_status" == "KILLED" ]; then
        echo "`date "+%Y_%m_%d-%H:%M:%S"`@@@ SPARK APPLICATION KILLED WITH $application_number" >> /log_folder/logfile`date +"%Y_%m_%d"`.log
        break
    fi
    # e.g. ACCEPTED with Final-State still UNDEFINED: keep waiting
    sleep 5s
done
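Once the loop exits, the application has reached a terminal state, so the same script can go on to stop the cluster, e.g. with $SPARK_HOME/sbin/stop-all.sh on a standalone install (the path is an assumption; adjust it to your deployment).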