
How to know which stage of a job is currently running in Apache Spark?

Consider I have a job as follow in Spark;

CSV File ==> Filter By A Column ==> Taking Sample ==> Save As JSON

Now my requirement is: how do I know which step (fetching the file, filtering, or sampling) of the job is currently executing, programmatically (preferably using the Java API)? Is there any way to do this?

I can track jobs, stages and tasks using the SparkListener class, e.g. by tracking a stage ID. But how do I know which stage ID corresponds to which step in the job chain?

What I want is to send a notification to the user when, say, Filter By A Column has completed. For that I wrote a class that extends SparkListener, but I cannot find where to get the name of the currently executing transformation. Is it possible to track this at all?

import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerJobStart;
import org.apache.spark.scheduler.SparkListenerStageSubmitted;
import org.apache.spark.scheduler.SparkListenerTaskStart;

public class ProgressListener extends SparkListener {

  @Override
  public void onJobStart(SparkListenerJobStart jobStart)
  {
      // fires when a job starts; exposes the job ID and its stage infos
  }

  @Override
  public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted)
  {
      //System.out.println("Stage Name : " + stageSubmitted.stageInfo().name()); // gives only the action's call site, not the transformation name
  }

  @Override
  public void onTaskStart(SparkListenerTaskStart taskStart)
  {
      // there is no method like taskStart.name()
  }
}
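
For reference, the listener can be registered roughly like this (a minimal sketch; the app name, master URL and package name are placeholders):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

SparkConf conf = new SparkConf()
        .setAppName("progress-demo")   // placeholder app name
        .setMaster("local[*]");        // placeholder master
// Alternative: conf.set("spark.extraListeners", "my.pkg.ProgressListener");

JavaSparkContext sc = new JavaSparkContext(conf);
sc.sc().addSparkListener(new ProgressListener()); // register on the underlying SparkContext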
asked Feb 14 '17 by KOUSIK MANDAL




1 Answer

You cannot know exactly when, e.g., the filter operation starts or finishes.

That's because you have transformations (filter, map, ...) and actions (count, foreach, ...). Spark puts as many operations as possible into one stage, and that stage is then executed in parallel on the different partitions of your input. And here comes the problem.
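
To make that concrete, here is the questioner's chain as a hedged Dataset sketch (column name and paths are placeholders): the filter and sample are lazy transformations, and only the final write is an action that actually triggers a job.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import static org.apache.spark.sql.functions.col;

SparkSession spark = SparkSession.builder().appName("lazy-demo").master("local[*]").getOrCreate();

Dataset<Row> csv      = spark.read().option("header", "true").csv("input.csv"); // read the CSV
Dataset<Row> filtered = csv.filter(col("someColumn").equalTo("someValue"));     // transformation (lazy)
Dataset<Row> sampled  = filtered.sample(false, 0.1);                            // transformation (lazy)
sampled.write().json("output/");                                                // action: the job starts here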

Assume you have several workers and the following program

LOAD ==> MAP ==> FILTER ==> GROUP BY + Aggregation

This program will probably have two stages: the first stage loads the file and applies the map and filter. The output is then shuffled to create the groups. In the second stage, the aggregation is performed.
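
A hedged sketch of what such a program could look like in the Java RDD API (paths and the exact aggregation are placeholders); reduceByKey stands in for the GROUP BY + aggregation and introduces the shuffle boundary between the two stages.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

JavaSparkContext sc = new JavaSparkContext(
        new SparkConf().setAppName("stages-demo").setMaster("local[*]")); // placeholders

JavaPairRDD<String, Integer> counts = sc.textFile("input.txt")  // LOAD
        .map(String::trim)                                      // MAP    -- pipelined into stage 1
        .filter(line -> !line.isEmpty())                        // FILTER -- pipelined into stage 1
        .mapToPair(line -> new Tuple2<>(line, 1))
        .reduceByKey(Integer::sum);                             // GROUP BY + aggregation:
                                                                // shuffle boundary, runs in stage 2
counts.saveAsTextFile("output");                                // action: triggers both stages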

Now the problem is that you have several workers, and each processes a portion of your input data in parallel. That is, every executor in your cluster receives a copy of your program (the current stage) and executes it on its assigned partitions.

You see, you will have multiple instances of your map and filter operators executing in parallel, but not necessarily at the same time. In an extreme case, worker 1 will finish stage 1 before worker 20 has started at all (and therefore finish its filter operation before worker 20).

For RDDs, Spark uses the iterator model inside a stage. For Datasets in recent Spark versions, however, a single loop is generated over the partition and the transformations are executed inside it (whole-stage code generation). This means that in this case Spark itself does not really know when a transformation operator has finished for a single task!
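
As a hedged illustration (not something a listener can hook into), explain() prints the physical plan, where the fused operators appear under a single WholeStageCodegen node; paths and column names are placeholders:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import static org.apache.spark.sql.functions.col;

SparkSession spark = SparkSession.builder().appName("plan-demo").master("local[*]").getOrCreate();

Dataset<Row> df = spark.read().option("header", "true").csv("input.csv"); // placeholder path

// filter and sample end up inside one generated loop per partition,
// so there is no per-operator boundary for a listener to observe.
df.filter(col("someColumn").equalTo("someValue"))
  .sample(false, 0.1)
  .explain(); // prints the physical plan, including the WholeStageCodegen node(s)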

Long story short:

  1. You are not able to know when an operation inside a stage finishes.
  2. Even if you could, there are multiple instances that would finish at different times.

That said, I have had the same problem myself:

In our Piglet project (please allow some advertisement ;-) ) we generate Spark code from Pig Latin scripts, and we wanted to profile those scripts. I ended up inserting a mapPartitions operator between all user operators that sends the partition ID and the current time to a server, which then evaluates the messages. However, this solution also has its limitations... and I'm not completely satisfied yet.
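
A hedged sketch of that idea, with mapPartitionsWithIndex as the marker operator (the predicate, the paths and the println standing in for the call to the monitoring server are placeholders; this is not the actual Piglet code):

import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;

JavaRDD<String> filtered = sc.textFile("input.csv")              // sc: a JavaSparkContext
        .filter(line -> line.contains("someValue"));             // placeholder predicate

// Marker operator inserted after the filter. Draining the upstream iterator
// forces the filter to run for this partition before the report is made
// (at the cost of buffering the whole partition in memory).
JavaRDD<String> marked = filtered.mapPartitionsWithIndex((partitionId, rows) -> {
    List<String> buffered = new ArrayList<>();
    rows.forEachRemaining(buffered::add);
    // In place of println, send (partitionId, timestamp) to the monitoring server.
    System.out.println("filter finished for partition " + partitionId
            + " at " + System.currentTimeMillis());
    return buffered.iterator();
}, true);

marked.saveAsTextFile("output");  // the action that triggers the job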

However, unless you are able to modify the programs, I'm afraid you cannot achieve what you want.

answered Oct 17 '22 by hage