I am trying to avoid a "while(true)" solution while waiting for my Apache Spark job to finish, but without success.
I have a Spark application that is supposed to process some data and write the result to a database. I call it from my Spring service and would like to wait until the job is done.
Example:
A launcher with the following method:
@Override
public void run(UUID docId, String query) throws Exception {
    launcher.addAppArgs(docId.toString(), query);
    SparkAppHandle sparkAppHandle = launcher.startApplication();

    sparkAppHandle.addListener(new SparkAppHandle.Listener() {
        @Override
        public void stateChanged(SparkAppHandle handle) {
            System.out.println(handle.getState() + " new state");
        }

        @Override
        public void infoChanged(SparkAppHandle handle) {
            System.out.println(handle.getState() + " new state");
        }
    });

    System.out.println(sparkAppHandle.getState().toString());
}
How do I wait properly until the state of the handle is FINISHED?
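For reference, one common way to block without a busy loop (a sketch, assuming the same launcher field as in the snippet above) is to wait on a java.util.concurrent.CountDownLatch that the listener releases once the handle reaches a final state:

import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import org.apache.spark.launcher.SparkAppHandle;

public void runAndWait(UUID docId, String query) throws Exception {
    launcher.addAppArgs(docId.toString(), query);
    CountDownLatch done = new CountDownLatch(1);

    SparkAppHandle handle = launcher.startApplication(new SparkAppHandle.Listener() {
        @Override
        public void stateChanged(SparkAppHandle h) {
            // isFinal() is true for terminal states such as FINISHED, FAILED and KILLED
            if (h.getState().isFinal()) {
                done.countDown();
            }
        }

        @Override
        public void infoChanged(SparkAppHandle h) { }
    });

    done.await();  // blocks the calling thread until the job reaches a final state
    System.out.println("Final state: " + handle.getState());
}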
What happens when a Spark job is submitted? When a client submits Spark user application code, the driver implicitly converts the code containing transformations and actions into a logical directed acyclic graph (DAG).
By default, Spark's scheduler runs jobs in FIFO fashion. Each job is divided into “stages” (e.g. map and reduce phases), and the first job gets priority on all available resources while its stages have tasks to launch, then the second job gets priority, etc.
So, how do you run Spark in local mode? It is very simple. When we do not pass a --master flag to spark-shell, pyspark, spark-submit, or any other binary, it runs in local mode. Alternatively, we can pass the --master option with local as its argument, which defaults to a single thread.
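The same applies when launching programmatically. A minimal sketch with SparkLauncher (the jar path and main class below are hypothetical placeholders):

import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;

public class LocalModeExample {
    public static void main(String[] args) throws Exception {
        // Run in local mode: "local[*]" uses one worker thread per core,
        // while plain "local" uses a single thread.
        SparkAppHandle handle = new SparkLauncher()
                .setAppResource("/path/to/my-spark-job.jar")  // hypothetical path
                .setMainClass("com.example.MySparkJob")       // hypothetical class
                .setMaster("local[*]")
                .startApplication();
    }
}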
I am also using SparkLauncher from a Spring application. Here is a summary of the approach I took (following the examples in the JavaDoc).
The @Service used to launch the job also implements SparkAppHandle.Listener and passes a reference to itself via startApplication(this), e.g.
...
...
@Service
public class JobLauncher implements SparkAppHandle.Listener {
...
...
...
private SparkAppHandle launchJob(String mainClass, String[] args) throws Exception {
    String appResource = getAppResourceName();

    SparkAppHandle handle = new SparkLauncher()
            .setAppResource(appResource)
            .addAppArgs(args)
            .setMainClass(mainClass)
            .setMaster(sparkMaster)
            .setDeployMode(sparkDeployMode)
            .setSparkHome(sparkHome)
            .setConf(SparkLauncher.DRIVER_MEMORY, "2g")
            .startApplication(this);  // register this @Service as the listener

    LOG.info("Launched [" + mainClass + "] from [" + appResource + "] State [" + handle.getState() + "]");

    return handle;
}
/**
 * Callback method for changes to the Spark job's info.
 */
@Override
public void infoChanged(SparkAppHandle handle) {
    LOG.info("Spark App Id [" + handle.getAppId() + "] Info Changed. State [" + handle.getState() + "]");
}

/**
 * Callback method for changes to the Spark job's state.
 */
@Override
public void stateChanged(SparkAppHandle handle) {
    LOG.info("Spark App Id [" + handle.getAppId() + "] State Changed. State [" + handle.getState() + "]");
}
Using this approach, one can take action when the state changes to "FAILED", "FINISHED" or "KILLED".
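For example, stateChanged could branch on the terminal states like this (a sketch; the follow-up actions are placeholders):

@Override
public void stateChanged(SparkAppHandle handle) {
    SparkAppHandle.State state = handle.getState();
    LOG.info("Spark App Id [" + handle.getAppId() + "] State Changed. State [" + state + "]");

    // isFinal() is true once the application is no longer running
    if (state.isFinal()) {
        switch (state) {
            case FINISHED:
                // e.g. record success in the database
                break;
            case FAILED:
            case KILLED:
                // e.g. record the failure and notify the caller
                break;
            default:
                break;
        }
    }
}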
I hope this information is helpful to you.