 

How to properly wait for an Apache Spark Launcher job when launching it from another application?

I am trying to avoid a "while(true)" solution for waiting until my Apache Spark job is done, but without success.

I have a Spark application which is supposed to process some data and put the result into a database. I call it from my Spring service and would like to wait until the job is done.

Example:

Launcher with method:

// launcher is assumed to be a pre-configured org.apache.spark.launcher.SparkLauncher instance
@Override
public void run(UUID docId, String query) throws Exception {
    launcher.addAppArgs(docId.toString(), query);

    SparkAppHandle sparkAppHandle = launcher.startApplication();

    sparkAppHandle.addListener(new SparkAppHandle.Listener() {
        @Override
        public void stateChanged(SparkAppHandle handle) {
            System.out.println(handle.getState() + " new state");
        }

        @Override
        public void infoChanged(SparkAppHandle handle) {
            System.out.println(handle.getState() + " new state");
        }
    });

    System.out.println(sparkAppHandle.getState().toString());
}

How do I wait properly until the state of the handle is "FINISHED"?

asked Mar 28 '16 by Alex Aniska


People also ask

What happens when Spark job is submitted?

When a client submits Spark user application code, the driver implicitly converts the code containing transformations and actions into a logical directed acyclic graph (DAG).

How do I schedule a Spark job in production?

By default, Spark's scheduler runs jobs in FIFO fashion. Each job is divided into “stages” (e.g. map and reduce phases), and the first job gets priority on all available resources while its stages have tasks to launch, then the second job gets priority, etc.

How do I run a Spark job in local mode?

It is very simple: when no --master flag is passed to spark-shell, pyspark, spark-submit, or any other launch command, Spark runs in local mode. Alternatively, you can pass the --master option with local as its argument, which defaults to a single thread.
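
For illustration, here is a minimal sketch of the same idea expressed with SparkLauncher (the class this thread is about) rather than the command line; the jar path and main class below are placeholders, not taken from the question:

import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;

public class LocalModeExample {
    public static void main(String[] args) throws Exception {
        // "local[*]" runs Spark in-process using all available cores;
        // plain "local" would use a single thread.
        SparkAppHandle handle = new SparkLauncher()
                .setAppResource("/path/to/your-spark-app.jar")   // placeholder jar
                .setMainClass("com.example.YourSparkJob")        // placeholder main class
                .setMaster("local[*]")
                .startApplication();

        System.out.println("Submitted, current state: " + handle.getState());
    }
}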


1 Answer

I am also using SparkLauncher from a Spring application. Here is a summary of the approach that I took (by following examples in the JavaDoc).

The @Service used to launch the job also implements SparkAppHandle.Listener and passes a reference to itself via .startApplication(), e.g.

import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;
...
@Service
public class JobLauncher implements SparkAppHandle.Listener {
...
// sparkMaster, sparkDeployMode and sparkHome are configuration fields elided here
...
private SparkAppHandle launchJob(String mainClass, String[] args) throws Exception {

    String appResource = getAppResourceName();

    // Passing "this" to startApplication() registers this service as the listener
    SparkAppHandle handle = new SparkLauncher()
        .setAppResource(appResource)
        .addAppArgs(args)
        .setMainClass(mainClass)
        .setMaster(sparkMaster)
        .setDeployMode(sparkDeployMode)
        .setSparkHome(sparkHome)
        .setConf(SparkLauncher.DRIVER_MEMORY, "2g")
        .startApplication(this);

    LOG.info("Launched [" + mainClass + "] from [" + appResource + "] State [" + handle.getState() + "]");

    return handle;
}

/**
* Callback method for changes to the Spark Job
*/
@Override
public void infoChanged(SparkAppHandle handle) {

    LOG.info("Spark App Id [" + handle.getAppId() + "] Info Changed.  State [" + handle.getState() + "]");

}

/**
* Callback method for changes to the Spark Job's state
*/
@Override
public void stateChanged(SparkAppHandle handle) {

    LOG.info("Spark App Id [" + handle.getAppId() + "] State Changed. State [" + handle.getState() + "]");

}

Using this approach, one can take action when the state changes to "FAILED", "FINISHED" or "KILLED".
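
If the goal is to actually block the calling thread until the job reaches one of those terminal states, the same listener mechanism can be combined with a CountDownLatch. The following is a minimal sketch of that idea (class and method names are illustrative, not from the original code); SparkAppHandle.State.isFinal() returns true for FINISHED, FAILED and KILLED:

import java.util.concurrent.CountDownLatch;

import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;

public class BlockingLauncher {

    public SparkAppHandle.State launchAndWait(SparkLauncher launcher) throws Exception {
        CountDownLatch done = new CountDownLatch(1);

        SparkAppHandle handle = launcher.startApplication(new SparkAppHandle.Listener() {
            @Override
            public void stateChanged(SparkAppHandle handle) {
                // Release the latch once the job is in a terminal state
                if (handle.getState().isFinal()) {
                    done.countDown();
                }
            }

            @Override
            public void infoChanged(SparkAppHandle handle) {
                // no action needed here
            }
        });

        done.await();   // blocks until the listener observes FINISHED, FAILED or KILLED
        return handle.getState();
    }
}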

I hope this information is helpful to you.

answered Sep 18 '22 by tegatai