We run Spark on YARN in yarn-client mode
to compute several business jobs, but sometimes a task runs for a very long time.
We don't set a timeout, but I don't think Spark's default task timeout would be as long as what we see here (1.7 h).
Can anyone give me an idea for working around this issue?
Spark supports two modes for running on YARN, "yarn-cluster" mode and "yarn-client" mode.
In yarn-cluster mode, the Spark driver runs inside an application master process which is managed by YARN on the cluster, and the client can go away after initiating the application. In yarn-client mode, the driver runs in the client process, and the application master is only used for requesting resources from YARN.
A Spark application can be submitted in two ways: cluster mode and client mode. In cluster mode, the driver starts inside the cluster on one of the worker machines, so the client can submit the job and disconnect. In client mode, the driver starts in the client process.
Spark has no built-in way to kill a task just because it is taking too long.
However, I found a way to handle this using speculation.
With speculation enabled, if one or more tasks in a stage are running slowly, they will be re-launched as speculative copies.
spark.speculation true
spark.speculation.multiplier 2
spark.speculation.quantile 0
Note: setting spark.speculation.quantile to 0 means speculation can kick in from the very first tasks of a stage, so use it with caution. I use it because some of my jobs slow down over time due to GC. You should know when it is appropriate to use this; it's not a silver bullet.
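To make the two settings above more concrete, here is a rough sketch of how they interact, based on their documented meaning: the quantile is the fraction of tasks that must finish before speculation is considered, and the multiplier is how many times slower than the median a task must be. This is a simplified model and not Spark's scheduler code; the class and method names are made up for illustration, and the rule that at least one task must have finished (so a median exists) is an assumption.

```java
import java.util.Arrays;

class SpeculationSketch {
    // Simplified model of spark.speculation.quantile / multiplier.
    // Hypothetical helper, not part of Spark's API.
    static boolean shouldSpeculate(long[] finishedMs, int totalTasks, long runningMs,
                                   double multiplier, double quantile) {
        // Assumption: at least one task must have finished so a median exists.
        int minFinished = Math.max((int) Math.floor(quantile * totalTasks), 1);
        if (finishedMs.length < minFinished) {
            return false;
        }
        long[] sorted = finishedMs.clone();
        Arrays.sort(sorted);
        long median = sorted[sorted.length / 2];
        // A running task is a candidate once it exceeds multiplier * median.
        return runningMs > multiplier * median;
    }

    public static void main(String[] args) {
        long[] finished = {10_000L, 12_000L, 11_000L};
        // quantile 0, multiplier 2: a 30 s task is compared against 2 * 11 s.
        System.out.println(shouldSpeculate(finished, 100, 30_000L, 2.0, 0.0));
        // Same task with quantile 0.75: only 3 of 100 tasks are done, so no speculation yet.
        System.out.println(shouldSpeculate(finished, 100, 30_000L, 2.0, 0.75));
    }
}
```

With a quantile of 0, the only thing holding speculation back in this model is the multiplier, which is why the setting should be used carefully.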
Some relevant links: http://apache-spark-user-list.1001560.n3.nabble.com/Does-Spark-always-wait-for-stragglers-to-finish-running-td14298.html and http://mail-archives.us.apache.org/mod_mbox/spark-user/201506.mbox/%3CCAPmMX=rOVQf7JtDu0uwnp1xNYNyz4xPgXYayKex42AZ_9Pvjug@mail.gmail.com%3E
Update
I found a fix for my issue (it might not work for everyone). I had a bunch of simulations running per task, so I added a timeout around each run. If a simulation takes too long (due to data skew for that specific run), it times out.
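// Needs java.util.concurrent.* (ExecutorService, Executors, Callable, Future, TimeUnit, TimeoutException).
ExecutorService executor = Executors.newCachedThreadPool();
Callable<SimResult> task = () -> simulator.run();
Future<SimResult> future = executor.submit(task);
try {
    // Wait at most one minute for this simulation to finish.
    // get() also throws InterruptedException and ExecutionException,
    // which the enclosing method has to catch or declare.
    result = future.get(1, TimeUnit.MINUTES);
} catch (TimeoutException ex) {
    // Interrupt the worker thread; the simulator has to check for the interrupt.
    future.cancel(true);
    SPARKLOG.info("Task timed out");
}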
Make sure you handle the interrupt inside the simulator's main loop, like this:
if (Thread.currentThread().isInterrupted()) {
    throw new InterruptedException();
}
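For reference, the two pieces above can be combined into one self-contained sketch. The names TimeoutSketch, slowSimulation and runWithTimeout are made up for illustration, and slowSimulation is only a stand-in for simulator.run(); the point is that future.cancel(true) stops the work only because the loop checks the interrupt flag.

```java
import java.util.concurrent.*;

class TimeoutSketch {
    // Hypothetical stand-in for simulator.run(): loops until done or interrupted.
    static int slowSimulation(int steps) throws InterruptedException {
        int done = 0;
        for (int i = 0; i < steps; i++) {
            // Cooperative cancellation: stop as soon as an interrupt is seen.
            if (Thread.currentThread().isInterrupted()) {
                throw new InterruptedException("simulation cancelled");
            }
            Thread.sleep(10); // sleep() also throws InterruptedException on interrupt
            done++;
        }
        return done;
    }

    // Returns the result, or null if the run exceeded the timeout.
    static Integer runWithTimeout(Callable<Integer> work, long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<Integer> future = executor.submit(work);
        try {
            return future.get(timeout, unit);
        } catch (TimeoutException ex) {
            future.cancel(true); // interrupts the worker thread
            return null;
        } finally {
            executor.shutdownNow(); // do not leak the worker thread
        }
    }

    public static void main(String[] args) throws Exception {
        // Short run: finishes well inside the timeout.
        System.out.println(runWithTimeout(() -> slowSimulation(3), 2, TimeUnit.SECONDS));
        // Long run: exceeds the timeout and is cancelled.
        System.out.println(runWithTimeout(() -> slowSimulation(1000), 100, TimeUnit.MILLISECONDS));
    }
}
```

Shutting the executor down in the finally block is a choice made for this sketch; inside a Spark task you would more likely reuse one executor for all simulations in the task.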