Spark SQL exception handling

To handle Spark exceptions in RDD operations, I can use the following approach, which adds an additional exceptions column:

import scala.util.{Failure, Success, Try}
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val df: DataFrame = ...

val rddWithExcep = df.rdd.map { row: Row =>
  val memberIdStr = row.getAs[String]("member_id")
  // Parse member_id; on success keep the value, on failure record the error
  val memberIdInt = Try(memberIdStr.toInt) match {
    case Success(integer) => List(integer, null)
    case Failure(ex)      => List(null, ex.toString)
  }
  Row.fromSeq(row.toSeq.toList ++ memberIdInt)
}

val castWithExceptionSchema = StructType(df.schema.fields ++ Array(
  StructField("member_id_int", IntegerType, true),
  StructField("exceptions", StringType, true)))

val castExcepDf = sparkSession.sqlContext.createDataFrame(rddWithExcep, castWithExceptionSchema)

castExcepDf.printSchema()
castExcepDf.show()

Is it possible to handle such exceptions in Spark SQL? Currently, in case of any error, Spark SQL simply returns a null value and hides the error.

For example, division by 0 results in a null value rather than an error. In my opinion this is a very serious issue in Spark SQL, because it can silently produce unexpected/wrong data that you won't even notice.
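
For illustration, a minimal sketch of the default behaviour (assuming a SparkSession named spark):

// With ANSI mode off (the default), dividing by zero silently
// yields null instead of raising an error
spark.sql("SELECT 1/0 AS divideByZero").show()
// +------------+
// |divideByZero|
// +------------+
// |        null|
// +------------+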

Is it possible to override this behavior and let Spark fail with an appropriate detailed exception?

asked Nov 12 '19 by alexanoid

People also ask

How does Spark handle out of memory exception?

You can resolve it by adjusting the partition size: increase the value of spark.sql.shuffle.partitions.
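
For example, a hedged sketch of bumping that setting (400 is just an illustrative value, not a recommendation):

// Increase the number of shuffle partitions (the default is 200);
// more, smaller partitions reduce per-task memory pressure
spark.conf.set("spark.sql.shuffle.partitions", "400")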

What is permissive mode in Spark?

The PERMISSIVE mode sets field values to null when corrupted records are detected. By default, if you don't specify the mode parameter, Spark uses PERMISSIVE.
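
A minimal sketch of reading with an explicit mode (the path, schema, and corrupt-record column name are illustrative):

// PERMISSIVE keeps malformed rows, nulls out corrupt fields, and can
// capture the raw record in a dedicated column
val parsed = spark.read
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .schema("member_id INT, name STRING, _corrupt_record STRING")
  .json("path/to/records.json")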

How do you throw an exception in Pyspark?

As a Python developer, you can choose to throw an exception if a condition occurs. To throw (or raise) an exception, use the raise keyword.
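
In this document's Scala code, the analogous pattern is a UDF that throws instead of returning null; a hedged sketch (the UDF name is hypothetical, and Spark surfaces the error wrapped in a SparkException at task runtime):

import org.apache.spark.sql.functions.{col, udf}

// Hypothetical strict parser: fails the job on bad input
// instead of silently producing null
val strictToInt = udf { s: String =>
  try s.toInt
  catch {
    case e: NumberFormatException =>
      throw new IllegalArgumentException(s"Cannot parse '$s' as Int", e)
  }
}

df.withColumn("member_id_int", strictToInt(col("member_id")))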


1 Answer

Since Spark 3.0, you can set the property spark.sql.ansi.enabled to true in your Spark session to throw an exception and stop execution instead of saving a null value in the column. However, the failure will be global rather than on a row-by-row basis. See the ANSI Compliance page in Spark's documentation for more details.

So the following code snippet:

import org.apache.spark.sql.functions.col
import sparkSession.implicits._

sparkSession.conf.set("spark.sql.ansi.enabled", "true")

Seq(1, 2, 3).toDF("MyCol")
  .withColumn("divideByZero", col("MyCol") / 0)
  .show(false)

throws the following exception:

Exception in thread "main" org.apache.spark.SparkArithmeticException: divide by zero
    at org.apache.spark.sql.errors.QueryExecutionErrors$.divideByZeroError(QueryExecutionErrors.scala:140)
    at org.apache.spark.sql.catalyst.expressions.DivModLike.eval(arithmetic.scala:437)
    at org.apache.spark.sql.catalyst.expressions.DivModLike.eval$(arithmetic.scala:425)
    at org.apache.spark.sql.catalyst.expressions.Divide.eval(arithmetic.scala:534)
    at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:168)
    at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(InterpretedMutableProjection.scala:97)
    at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$$anonfun$apply$43.$anonfun$applyOrElse$80(Optimizer.scala:1840)
    at scala.collection.immutable.List.map(List.scala:293)
    at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$$anonfun$apply$43.applyOrElse(Optimizer.scala:1840)
    at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$$anonfun$apply$43.applyOrElse(Optimizer.scala:1835)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:486)
    at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1128)
    at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1127)
    at org.apache.spark.sql.catalyst.plans.logical.OrderPreservingUnaryNode.mapChildren(LogicalPlan.scala:206)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:486)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:486)
    at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1128)
    at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1127)
    at org.apache.spark.sql.catalyst.plans.logical.OrderPreservingUnaryNode.mapChildren(LogicalPlan.scala:206)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:486)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:486)
    at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1128)
    at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1127)
    at org.apache.spark.sql.catalyst.plans.logical.OrderPreservingUnaryNode.mapChildren(LogicalPlan.scala:206)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:486)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformWithPruning(TreeNode.scala:447)
    at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$.apply(Optimizer.scala:1835)
    at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$.apply(Optimizer.scala:1833)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:211)
    at scala.collection.IndexedSeqOptimized.foldLeft(IndexedSeqOptimized.scala:60)
    at scala.collection.IndexedSeqOptimized.foldLeft$(IndexedSeqOptimized.scala:68)
    at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:38)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:208)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:200)
    at scala.collection.immutable.List.foreach(List.scala:431)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:200)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:179)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:179)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:138)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:196)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:196)
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:134)
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:130)
    at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:148)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:166)
    at org.apache.spark.sql.execution.QueryExecution.withCteMap(QueryExecution.scala:73)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:163)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:163)
    at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:214)
    at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:259)
    at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:228)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:98)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3704)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2728)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2935)
    at org.apache.spark.sql.Dataset.getRows(Dataset.scala:287)
    ...
answered Oct 23 '22 by Vincent Doba