
How to use a partial function composed with orElse as a UDF in Spark

As the title states, I would like to use a partial function composed with orElse as a UDF in Spark. Here is an example that can be run in the Spark shell:

//Imports (spark-shell may already provide these; needed in a compiled job)
import org.apache.spark.sql.functions.{col, udf}
import sqlContext.implicits._  // for toDF

val df = sc.parallelize(1 to 15).toDF("num")
df.show

//Testing out a normal udf - this works
val gt5: (Int => String) = num => (num > 5).toString
val gt5Udf = udf(gt5)
df.withColumn("gt5", gt5Udf(col("num"))).show

//Now create a udf of a partial function composed with orElse
val baseline: PartialFunction[Int, String] = { case _ => "baseline" }
val ge3: PartialFunction[Int, String] = { case x if x >= 3 => ">=3" }
val ge7: PartialFunction[Int, String] = { case x if x >= 7 => ">=7" }
val ge12: PartialFunction[Int, String] = { case x if x >= 12 => ">=12" }

val composed: PartialFunction[Int, String] = ge12 orElse ge7 orElse ge3 orElse baseline
val composedUdf = udf(composed)

//This fails (but this is what I'd like to do)
df.withColumn("pf", composedUdf(col("num"))).show

//Use a partial function not composed with orElse - this works
val baselineUdf = udf(baseline)
df.withColumn("pf", baselineUdf(col("num"))).show 

I'm currently running this on a three-node standalone cluster with the following configuration:

  • spark: 1.6.0
  • hdfs: 2.4.1
  • scala: 2.10.5

I found what I think is a clue in this answer: Why Scala can serialize Function but not PartialFunction?

so I tried:

scala> composed.isInstanceOf[Serializable]
res: Boolean = false

scala> composedUdf.isInstanceOf[Serializable]
res: Boolean = true

scala> baseline.isInstanceOf[Serializable]
res: Boolean = true

scala> baselineUdf.isInstanceOf[Serializable]
res: Boolean = true

I'm getting fuzzy here, but it seems that composing partial functions with orElse yields an object that is no longer Serializable?
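The same behaviour reproduces in a plain Scala REPL without Spark at all (a minimal check on Scala 2.10.5, consistent with the results above):

// A partial function literal is Serializable (as seen above);
// the OrElse wrapper returned by orElse is not.
val pf: PartialFunction[Int, String] = { case x if x > 0 => "pos" }
pf.isInstanceOf[Serializable]              // true
(pf orElse pf).isInstanceOf[Serializable]  // false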

I think the most informative errors are:

org.apache.spark.SparkException: Task not serializable
...
Caused by: java.io.NotSerializableException: scala.PartialFunction$OrElse
...

How do I fix that? Or am I off base?

Thanks in advance for any help!

asked Oct 07 '16 by ajp619


2 Answers

It should work if you lift it and wrap it in another function. The wrapper is a plain anonymous function, which is serializable, and it only captures the individual partial-function literals (which are serializable too); the non-serializable PartialFunction$OrElse chain is built inside the function body at call time, so it never has to be shipped to the executors.

val composed: Int => Option[String] = 
  x => (ge12 orElse ge7 orElse ge3 orElse baseline).lift.apply(x)
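For completeness, a usage sketch (reusing the df, col, and udf from the question); since baseline is a catch-all, the lifted function always returns Some, and a None would simply show up as null in the column:

// Sketch: the wrapper is a plain Function1, so udf() serializes it without issues.
val composedUdf = udf(composed)
df.withColumn("pf", composedUdf(col("num"))).show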
answered by lpiepiora


While this doesn't directly address your problem, I would like to suggest an alternative solution using SQL functions.

First you'll have to import the required functions:

import org.apache.spark.sql.functions.{when, lit}

and some implicits for brevity:

import sqlContext.implicits._

Next you can express the same conditions as in your code:

val baseline = lit("baseline")
val ge3 = when($"num" >= 3,  ">=3")
val ge7 = when($"num" >= 7, ">=7")
val ge12 = when($"num" >= 12, ">=12")

val composed = ge12 otherwise (ge7 otherwise (ge3 otherwise baseline))

In this form it is a little less elegant, but you can compose expressions like this effortlessly using the standard collection API (foldLeft / foldRight), and, unlike a UDF, the result can be optimized by the Catalyst optimizer, as sketched below.
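For instance, a minimal sketch (not part of the original answer) of building the same nested expression with foldRight, reusing the thresholds, labels, and baseline defined above:

// Hypothetical list of (threshold, label) pairs, highest first so it is checked first.
val cases = Seq((12, ">=12"), (7, ">=7"), (3, ">=3"))

// foldRight produces the same nesting as the hand-written version:
// when(num >= 12, ">=12") otherwise (when(num >= 7, ">=7") otherwise (... otherwise baseline))
val composedFolded = cases.foldRight(baseline) {
  case ((threshold, label), fallback) => when($"num" >= threshold, label) otherwise fallback
}

df.withColumn("pf", composedFolded).show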

answered by zero323