Spark: incorrect behaviour when throwing SparkException in EMR

I'm running a Spark job on EMR with YARN as the resource manager, on 2 nodes. I need to purposely fail the step if my condition is not met, so that the next step doesn't execute (as per the step configuration). To achieve this I throw a custom exception after inserting a log message in DynamoDB.

It runs fine, but the record in DynamoDB is inserted twice.

Below is my code.

if(<condition>) {
  <method call to insert in dynamo> 
  throw new SparkException(<msg>);
  return;
}

If I remove the line that throws the exception, it works fine, but then the step completes successfully.

How can I make the step fail without the log message being inserted twice?

Thanks for the help.

Regards, Sorabh

Asked Sep 26 '17 by Sorabh Kumar



1 Answer

The DynamoDB message was probably inserted twice because your error condition was hit and processed by two different executors. Spark divides the work among its executors, and those executors don't share any state.
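
To make that concrete, here is a minimal sketch of the failure mode, assuming the condition check and the DynamoDB write happen inside executor-side code (isBad and insertDynamoLog are hypothetical stand-ins, not your actual methods). The closure runs once per task, and a failed task can be retried on another executor, so the side effect can fire more than once:

import org.apache.spark.SparkException
import org.apache.spark.sql.SparkSession

object DuplicateInsertSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("duplicate-side-effect-sketch").getOrCreate()

    def isBad(s: String): Boolean = s.isEmpty                    // hypothetical stand-in for your condition
    def insertDynamoLog(s: String): Unit = println(s"log: $s")   // hypothetical stand-in for the DynamoDB insert

    // This closure is shipped to the executors. If the task that hits the bad
    // record fails, Spark can re-run it (retries, speculative execution), so
    // insertDynamoLog may execute more than once, on different executors.
    spark.sparkContext.parallelize(Seq("a", "", "c")).foreach { record =>
      if (isBad(record)) {
        insertDynamoLog(record)
        throw new SparkException("bad record encountered")
      }
    }
    spark.stop()
  }
}

On a real cluster, how many inserts you end up with depends on how many times the failing task is attempted.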

I'm not sure what is driving your requirement to have the Spark step fail, but I would suggest tracking that failure case in your application code rather than trying to have Spark die directly. In other words, write code that detects the error and passes it back to your Spark driver, then act on it as appropriate.

One way to do this would be to use an accumulator to count any errors that occur as you are processing your data. It would look roughly like this (I'm assuming Scala and DataFrames, but you can adapt it to RDDs and/or Python as needed):

import org.apache.spark.sql.functions.udf
import spark.implicits._  // for the $"column" syntax

// Count error rows on the executors; read the total back on the driver.
val accum = sc.longAccumulator("Error Counter")

def doProcessing(a: String, b: String): String = {
  if (condition) {       // your error condition
    accum.add(1)
    null
  } else {
    doComputation(a, b)
  }
}
val doProcessingUdf = udf(doProcessing _)

val resultDf = df.withColumn("result", doProcessingUdf($"a", $"b"))

resultDf.write.format(..).save(..)  // Accumulator value is not populated until an action runs!

if (accum.value > 0) {
  // An error was detected during computation! Do whatever needs to be done.
  <insert dynamo message here>
}

One nice thing about this approach is that, if you want feedback while the job is running, you can watch the accumulator values in the Spark UI. For reference, here is the documentation on accumulators: http://spark.apache.org/docs/latest/rdd-programming-guide.html#accumulators
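
If the EMR step itself still needs to fail so that the next step doesn't run, one option (a sketch building on the snippet above; insertDynamoMessage is a hypothetical stand-in for the DynamoDB call) is to log once and then throw from the driver after the action has finished. The write then happens exactly once, on the driver, and the uncaught exception fails the application, which marks the step as failed:

import org.apache.spark.SparkException

// accum is the longAccumulator from the snippet above;
// insertDynamoMessage is a hypothetical stand-in for the DynamoDB logging call.
if (accum.value > 0) {
  insertDynamoMessage(s"${accum.value} rows failed the check")  // runs once, on the driver
  // An uncaught exception in the driver fails the application, so the EMR step
  // is reported as failed and later steps are skipped or the cluster is
  // terminated, depending on the step's ActionOnFailure setting.
  throw new SparkException(s"Aborting: ${accum.value} validation errors")
}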

Answered Nov 11 '22 by Ryan Widmaier