 

How to implement custom job listener/tracker in Spark?

I have a class like the one below, and when I run it from the command line I want to see progress status, something like:

10% completed... 
30% completed... 
100% completed...Job done!

I am using Spark 1.0 on YARN with the Java API.

import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class MyJavaWordCount {
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: MyJavaWordCount <master> <file>");
            System.exit(1);
        }
        System.out.println("args[0]: <master>="+args[0]);
        System.out.println("args[1]: <file>="+args[1]);

        JavaSparkContext ctx = new JavaSparkContext(
                args[0],
                "MyJavaWordCount",
                System.getenv("SPARK_HOME"),
                System.getenv("SPARK_EXAMPLES_JAR"));
        JavaRDD<String> lines = ctx.textFile(args[1], 1);

        // Split each line into words: FlatMapFunction<input, output>.
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            public Iterable<String> call(String s) {
                return Arrays.asList(s.split(" "));
            }
        });

        // Map each word to a (word, 1) pair: PairFunction<input, K, V>.
        JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String s) {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        // Sum the counts for each word.
        JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
        });

        List<Tuple2<String, Integer>> output = counts.collect();
        for (Tuple2<String, Integer> tuple : output) {
            System.out.println(tuple._1 + ": " + tuple._2);
        }
        System.exit(0);
    }
}


People also ask

How do I track my Spark job?

Click Analytics > Spark Analytics > Open the Spark Application Monitoring Page. Click Monitor > Workloads, and then click the Spark tab. This page displays the user names of the clusters that you are authorized to monitor and the number of applications that are currently running in each cluster.

What is SparkListener?

The Spark listener API allows developers to track events that Spark emits during application execution. Those events are typically application start/end, job start/end, stage start/end, etc. You can find the full list in the Spark JavaDoc. Spark listeners are easy to configure and easy to use for grabbing metrics.
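For illustration, a listener can also be registered through configuration rather than in code. This is only a sketch, assuming Spark 1.3 or later (which introduced the spark.extraListeners property); the class name com.example.MyListener is hypothetical:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class ListenerConfigExample {
    public static void main(String[] args) {
        // The listener class must have a zero-argument constructor
        // (or a constructor that takes a SparkConf).
        SparkConf conf = new SparkConf()
                .setAppName("MyJavaWordCount")
                .set("spark.extraListeners", "com.example.MyListener"); // hypothetical listener class
        JavaSparkContext ctx = new JavaSparkContext(conf);

        // ... run the job; scheduler events are also delivered to com.example.MyListener
        ctx.stop();
    }
}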

How do you cancel all jobs in Spark that have been scheduled?

You can use the following methods to cancel running Spark jobs: cancelJobGroup(String groupId) cancels active jobs for the specified group, and cancelAllJobs() cancels all jobs that have been scheduled or are running.
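For example, a minimal sketch with the Java API (the group id "wordcount-group" and the class name are just examples):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class CancelJobsExample {
    public static void main(String[] args) {
        JavaSparkContext ctx = new JavaSparkContext(
                new SparkConf().setAppName("CancelJobsExample").setMaster("local[*]"));

        // Tag all jobs submitted from this thread with a group id; the third
        // argument asks Spark to interrupt task threads on cancellation.
        ctx.setJobGroup("wordcount-group", "word count jobs", true);

        // ... trigger actions here; in practice the calls below would come from another thread
        ctx.cancelJobGroup("wordcount-group"); // cancel only the jobs in that group
        ctx.cancelAllJobs();                   // or cancel everything scheduled or running

        ctx.stop();
    }
}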

What is Spark context used for?

A SparkContext represents the connection to a Spark cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster. Only one SparkContext should be active per JVM.
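As a small sketch of that, assuming the Spark 2.x Java API (class and variable names are illustrative):

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.util.LongAccumulator;

public class ContextExample {
    public static void main(String[] args) {
        // Only one active context per JVM.
        JavaSparkContext ctx = new JavaSparkContext(
                new SparkConf().setAppName("ContextExample").setMaster("local[*]"));

        Broadcast<Integer> factor = ctx.broadcast(10);                 // broadcast variable
        LongAccumulator processed = ctx.sc().longAccumulator("rows");  // accumulator (Spark 2.x API)

        int sum = ctx.parallelize(Arrays.asList(1, 2, 3))              // RDD created from the context
                .map(x -> {
                    processed.add(1);
                    return x * factor.value();
                })
                .reduce(Integer::sum);

        System.out.println("sum=" + sum + ", rows=" + processed.value());
        ctx.stop();  // release the context so another one can be created
    }
}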


2 Answers

If you are using Spark with Scala, this code will help you add a Spark listener.

Create your SparkContext

val sc = new SparkContext(sparkConf)

Now you can add your Spark listener to the SparkContext:

import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerApplicationStart}

sc.addSparkListener(new SparkListener() {
  override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {
    println("Spark ApplicationStart: " + applicationStart.appName)
  }

  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
    println("Spark ApplicationEnd: " + applicationEnd.time)
  }
})

The SparkListener interface in the Spark scheduler package documents the full list of events you can listen for.



You should implement SparkListener. Just override whatever events you are interested in (job/stage/task start/end events), then call sc.addSparkListener(myListener).

It does not give you a straight-up percentage-based progress tracker, but at least you can track that progress is being made and its rough rate. The difficulty comes from how unpredictable the number of Spark stages can be, and also how the running times of each stage can be vastly different. The progress within a stage should be more predictable.
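For example, here is a minimal sketch of such a listener in the Java API (assuming Spark 2.x or later, where SparkListener is an abstract class that Java code can extend; the class name and the percentage logic are illustrative, not a built-in Spark feature):

import java.util.concurrent.atomic.AtomicInteger;

import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerJobEnd;
import org.apache.spark.scheduler.SparkListenerStageSubmitted;
import org.apache.spark.scheduler.SparkListenerTaskEnd;

// Hypothetical listener: counts tasks as stages are submitted and prints a
// rough completion percentage every time a task finishes.
public class ProgressListener extends SparkListener {
    private final AtomicInteger totalTasks = new AtomicInteger();
    private final AtomicInteger finishedTasks = new AtomicInteger();

    @Override
    public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) {
        // New stages can show up while the job runs, so the total is a moving target.
        totalTasks.addAndGet(stageSubmitted.stageInfo().numTasks());
    }

    @Override
    public void onTaskEnd(SparkListenerTaskEnd taskEnd) {
        int done = finishedTasks.incrementAndGet();
        int total = totalTasks.get();
        if (total > 0) {
            System.out.println((100 * done / total) + "% completed...");
        }
    }

    @Override
    public void onJobEnd(SparkListenerJobEnd jobEnd) {
        System.out.println("Job " + jobEnd.jobId() + " done!");
    }
}

You would register it right after creating the context, e.g. ctx.sc().addSparkListener(new ProgressListener()) when using a JavaSparkContext. Since the task total grows as new stages are submitted, the printed percentage is only a rough estimate, which matches the caveat above.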
