 

Spark application finished callback

I have a bash script that starts my standalone EC2 Spark cluster every night and executes an application. I would like to be notified when the application has finished so I can stop the cluster.

I was wondering if there is some sort of callback based on the Spark application status.

I'm pretty new to Spark, so any other hint to solve this would be appreciated.

Thanks.

UPDATE:

With the JSON provided by http://<master-host>:8080/metrics/master/json or http://<master-host>:8080/metrics/applications/json I can get the status of the application (WAITING, RUNNING, FINISHED), but I can't get the status of the driver, which is what tells you whether the execution FAILED. I'm sure there must be a specific configuration for the metrics to show this, but I couldn't find it.
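For reference, polling that endpoint from bash could look roughly like the sketch below. This is a minimal sketch, assuming jq is installed and that the status gauge is exposed under a key of the form application.<AppName>.<timestamp>.status (the key layout may differ between Spark versions); MASTER_URL and APP_NAME are placeholders:

MASTER_URL="http://<master-host>:8080"
APP_NAME="MyApp"   # the name your application registers with

while true; do
  # Pull all gauges, keep the ones naming this application's status,
  # and take the value of the most recent one.
  status=$(curl -s "$MASTER_URL/metrics/applications/json" \
    | jq -r --arg app "$APP_NAME" \
        '.gauges | to_entries[]
         | select(.key | startswith("application." + $app))
         | select(.key | endswith(".status"))
         | .value.value' \
    | tail -n 1)
  echo "application status: ${status:-unknown}"
  case "$status" in
    FINISHED|FAILED|KILLED)
      break   # terminal state: safe to stop the cluster now
      ;;
  esac
  sleep 30
done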

In order to get this kind of status I scraped the web UI at http://<master-host>:8080 to find the driver executing my application and read its status.

[Screenshot: the Spark master web UI showing the driver's status]
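A rough sketch of such a scrape, for reference (fragile, since the master UI's HTML differs across Spark versions; <driver-id> stands for the submission ID printed by spark-submit in cluster deploy mode):

# Pull the master UI page, find the driver's row, and extract the first
# state keyword that appears near it.
driver_state=$(curl -s "http://<master-host>:8080" \
  | grep -A 5 "<driver-id>" \
  | grep -oE "SUBMITTED|RUNNING|FINISHED|FAILED|KILLED|ERROR" \
  | head -n 1)
echo "driver state: ${driver_state:-not found}"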

Marco asked Nov 26 '14

3 Answers

spark-submit --status $submitID --master $master 2>&1 | grep "success"
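This check fits naturally into a polling loop for the original use case (stopping the cluster afterwards). A minimal sketch, assuming the application was submitted with --deploy-mode cluster so that $submitID is the driver ID printed at submission time (e.g. driver-20141126231201-0000), and that the status output contains the driver state as a plain word (the exact output format varies across Spark versions):

# Poll the driver status until it reaches a terminal state.
while true; do
  state=$(spark-submit --status "$submitID" --master "$master" 2>&1 \
    | grep -oE "SUBMITTED|RUNNING|FINISHED|FAILED|KILLED|ERROR" \
    | head -n 1)
  case "$state" in
    FINISHED|FAILED|KILLED|ERROR)
      echo "driver $submitID ended in state: $state"
      break
      ;;
  esac
  sleep 30
done
# e.g. invoke your EC2 cluster shutdown script here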
Jeff Tsai answered Nov 14 '22


Disclaimer: this example requires code changes, makes some assumptions about service layout, and uses some internal-ish Spark classes.

After reading about the hidden REST API and trying to wrap the SparkSubmit class to get Future objects, I found the SparkListener class. It has onJobStart/End, onApplicationStart/End, etc. for whatever granularity you require.

Here's a rough proof of concept for jobs in an application's main method:

// imports needed by the snippet
import javax.jms.{DeliveryMode, Session}

import org.apache.activemq.ActiveMQConnectionFactory
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerJobEnd, SparkListenerJobStart}

// ... build spark conf
val sparkContext = new SparkContext(sparkConf)
// programmatically register the listener
sparkContext.addSparkListener(new SparkListener {

  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    println(s"[ ${jobStart.jobId} ] Job started.")
  }

  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
    println(s"[ ${jobEnd.jobId} ] Job completed with Result : ${jobEnd.jobResult}")
    //(our other services already rely on ActiveMQ)
    val connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616")
    val connection = connectionFactory.createConnection
    connection.setClientID("Client_" + Math.random())
    connection.start

    val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
    val sendQueue = session.createQueue("job_queue.spark_job_completed")

    val producer = session.createProducer(sendQueue)
    producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT)

    // no backslash-escaping inside triple-quoted strings, or the
    // backslashes end up in the JSON payload itself
    val textMessage = session.createTextMessage(
      s"""{"jobId" : "${jobEnd.jobId}", "jobResult" : "${jobEnd.jobResult}"}""")

    producer.send(textMessage)

    connection.close
  }

  //api just sends the time :/
  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
    println(s"[ ${applicationEnd.time} ] Application Completed.")
  }
})
// ... do spark work
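As an aside, if the listener is written as a named class with a zero-argument constructor, it can also be attached without touching the application code at all, via the spark.extraListeners configuration (available since Spark 1.3); the class name here is a hypothetical example:

spark-submit --conf spark.extraListeners=com.example.JobEndNotifier ...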

Our team needed to notify external applications when Spark jobs/applications completed (we were on Spark 1.5.2). The Spark UI also wasn't easily reachable without a lot of port-forwarding, so this approach let the jobs integrate directly with our preexisting monitoring tools.

Sources:

  1. https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/exercises/spark-exercise-custom-scheduler-listener.html
  2. https://myadventuresincoding.wordpress.com/2011/10/15/jms-how-to-do-synchronous-messaging-with-activemq-in-scala/
  3. http://arturmkrtchyan.com/apache-spark-hidden-rest-api
Tom H answered Nov 14 '22


I use a shell/bash script to submit the Spark application, so here is an example of how to get alerts about the application with a bash polling loop; you do the spark-submit from the shell.

spark-submit ........

Once you have done the spark-submit, sleep for 5 seconds:

sleep 5s

Then start checking the status of your application with the polling loop below, replacing <your_application_name> with your application's name:

# Grab the application ID of the most recent run of your application.
application_number=$(yarn application --list | grep "<your_application_name>" | sort -n | tail -1 | awk -F' ' '{print $1}')

while true; do
  # Line 10 of "yarn application -status" output is the "State" field;
  # strip the label and whitespace to leave just the value (e.g. RUNNING).
  current_status=$(yarn application -status "$application_number" | sed -n '10p' | awk -F':' '{print $2}' | tr -d ' ')

  if [ "$current_status" == "RUNNING" ]; then
    sleep 5s
    continue
  fi

  # Line 11 is the "Final-State" field: UNDEFINED until the application
  # reaches a terminal state, then SUCCEEDED, FAILED or KILLED.
  current_status_2=$(yarn application -status "$application_number" | sed -n '11p' | awk -F':' '{print $2}' | tr -d ' ')

  if [ "$current_status_2" == "UNDEFINED" ]; then
    sleep 5s
    continue
  fi

  if [ "$current_status_2" == "SUCCEEDED" ]; then
    echo "`date "+%Y_%m_%d-%H:%M:%S"`@@@  SPARK APPLICATION SUCCEEDED WITH $application_number" >> /log_folder/logfile`date +"%Y_%m_%d"`.log
  elif [ "$current_status_2" == "FAILED" ]; then
    echo "`date "+%Y_%m_%d-%H:%M:%S"`@@@  SPARK APPLICATION FAILED WITH $application_number" >> /log_folder/logfile`date +"%Y_%m_%d"`.log
  elif [ "$current_status_2" == "KILLED" ]; then
    echo "`date "+%Y_%m_%d-%H:%M:%S"`@@@  SPARK APPLICATION KILLED WITH $application_number" >> /log_folder/logfile`date +"%Y_%m_%d"`.log
  fi
  break
done
j pavan kumar answered Nov 14 '22