
Spark Dataframe Nested Case When Statement

I need to implement the SQL logic below on a Spark DataFrame:

SELECT KEY,
    CASE WHEN tc in ('a','b') THEN 'Y'
         WHEN tc in ('a') AND amt > 0 THEN 'N'
         ELSE NULL END REASON
FROM dataset1;

My input DataFrame is as below:

val dataset1 = Seq((66, "a", "4"), (67, "a", "0"), (70, "b", "4"), (71, "d", "4")).toDF("KEY", "tc", "amt")

dataset1.show()
+---+---+---+
|KEY| tc|amt|
+---+---+---+
| 66|  a|  4|
| 67|  a|  0|
| 70|  b|  4|
| 71|  d|  4|
+---+---+---+

I have implemented the nested case when statement as:

dataset1.withColumn("REASON", when(col("tc").isin("a", "b"), "Y")
  .otherwise(when(col("tc").equalTo("a") && col("amt").geq(0), "N")
    .otherwise(null))).show()
+---+---+---+------+
|KEY| tc|amt|REASON|
+---+---+---+------+
| 66|  a|  4|     Y|
| 67|  a|  0|     Y|
| 70|  b|  4|     Y|
| 71|  d|  4|  null|
+---+---+---+------+

The readability of the above logic, with its nested "otherwise" calls, gets messy as the when statements nest deeper.

Is there any better way of implementing nested case when statements in Spark DataFrames?

— asked by RaAm on Oct 09 '17




2 Answers

There is no real nesting here, so there is no need for otherwise. All you need is a chained when:

import org.apache.spark.sql.functions.{lit, when}
import spark.implicits._

when($"tc" isin ("a", "b"), "Y")
  .when($"tc" === "a" && $"amt" >= 0, "N")

ELSE NULL is implicit so you can omit it completely.
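
For instance, applied to the example DataFrame from the question, the chained version reproduces the output shown above (a minimal sketch, reusing the dataset1 defined in the question):

dataset1.withColumn("REASON",
    when($"tc" isin ("a", "b"), "Y")
      .when($"tc" === "a" && $"amt" >= 0, "N"))
  .show()
+---+---+---+------+
|KEY| tc|amt|REASON|
+---+---+---+------+
| 66|  a|  4|     Y|
| 67|  a|  0|     Y|
| 70|  b|  4|     Y|
| 71|  d|  4|  null|
+---+---+---+------+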

The pattern you use is more applicable for folding over a data structure:

val cases = Seq(
  ($"tc" isin ("a", "b"), "Y"),
  ($"tc" === "a" && $"amt" >= 0, "N")
)

where when-otherwise naturally follows the recursion pattern and null provides the base case:

cases.foldLeft(lit(null)) {
  case (acc, (expr, value)) => when(expr, value).otherwise(acc)
}
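
Note that foldLeft wraps each accumulated expression in otherwise, so later entries in cases take precedence over earlier ones; reverse the list (or use foldRight) if the listed order should be checked first. A minimal sketch of wiring the folded expression into the DataFrame (the name reasonCol is just illustrative):

// Bind the folded expression and apply it as a regular Column.
val reasonCol = cases.foldLeft(lit(null).cast("string")) {
  case (acc, (cond, value)) => when(cond, value).otherwise(acc)
}

dataset1.withColumn("REASON", reasonCol).show()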

Please note that it is impossible to reach the "N" outcome with this chain of conditions: if tc equals "a", it is captured by the first clause, and if it does not, it fails both predicates and defaults to NULL. You should instead write:

when($"tc" === "a" && $"amt" >= 0, "N")
 .when($"tc" isin ("a", "b"), "Y")
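
With this order the amt check runs before the catch-all "Y" branch; applied to the example dataset1 it should produce (a sketch, derived from the data above):

dataset1.withColumn("REASON",
    when($"tc" === "a" && $"amt" >= 0, "N")
      .when($"tc" isin ("a", "b"), "Y"))
  .show()
+---+---+---+------+
|KEY| tc|amt|REASON|
+---+---+---+------+
| 66|  a|  4|     N|
| 67|  a|  0|     N|
| 70|  b|  4|     Y|
| 71|  d|  4|  null|
+---+---+---+------+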
— answered by zero323 on Sep 18 '22


For more complex logic, I prefer to use UDFs for better readability:

import org.apache.spark.sql.functions.udf

// Note: as in the question, the first branch already captures tc == "a",
// so the "N" branch is unreachable unless the conditions are reordered.
val selectCase = udf((tc: String, amt: String) =>
  if (Seq("a", "b").contains(tc)) "Y"
  else if (tc == "a" && amt.toInt <= 0) "N"
  else null
)


dataset1.withColumn("REASON", selectCase(col("tc"), col("amt")))
  .show
— answered by Raphael Roth on Sep 22 '22