I get org.apache.spark.SparkException: Task not serializable
when I try to execute the following on Spark 1.4.1:
    import java.sql.{Date, Timestamp}
    import java.text.SimpleDateFormat

    object ConversionUtils {
      val iso8601 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX")

      def tsUTC(s: String): Timestamp = new Timestamp(iso8601.parse(s).getTime)

      val castTS = udf[Timestamp, String](tsUTC _)
    }

    val df = frame.withColumn("ts", ConversionUtils.castTS(frame("ts_str")))
    df.first
Here, frame is a DataFrame that lives within a HiveContext. That data frame itself does not have any issues.
I have similar UDFs for integers and they work without any problem. However, the one with timestamps seems to cause problems. According to the documentation, java.sql.Timestamp implements Serializable, so that's not the problem. The same is true for SimpleDateFormat, as can be seen here.
This leads me to believe it's the UDF that's causing the problem, but I'm not sure what exactly is wrong or how to fix it.
The relevant section of the trace:
    Caused by: java.io.NotSerializableException: ...
    Serialization stack:
        - object not serializable (class: ..., value: ...$ConversionUtils$@63ed11dd)
        - field (class: ...$ConversionUtils$$anonfun$3, name: $outer, type: class ...$ConversionUtils$)
        - object (class ...$ConversionUtils$$anonfun$3, <function1>)
        - field (class: org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2, name: func$2, type: interface scala.Function1)
        - object (class org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2, <function1>)
        - field (class: org.apache.spark.sql.catalyst.expressions.ScalaUdf, name: f, type: interface scala.Function1)
        - object (class org.apache.spark.sql.catalyst.expressions.ScalaUdf, scalaUDF(ts_str#2683))
        - field (class: org.apache.spark.sql.catalyst.expressions.Alias, name: child, type: class org.apache.spark.sql.catalyst.expressions.Expression)
        - object (class org.apache.spark.sql.catalyst.expressions.Alias, scalaUDF(ts_str#2683) AS ts#7146)
        - element of array (index: 35)
        - array (class [Ljava.lang.Object;, size 36)
        - field (class: scala.collection.mutable.ArrayBuffer, name: array, type: class [Ljava.lang.Object;)
        - object (class scala.collection.mutable.ArrayBuffer,
On a side note, preferring built-in Spark SQL functions also cuts down testing effort, since everything is executed on Spark's side. These functions are written and tuned by the Spark developers, so a hand-written UDF is unlikely to perform better.
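For the conversion in the question, a built-in cast may already be enough. This is only a sketch; whether ISO 8601 strings with the 'T' separator and millisecond/zone suffix cast cleanly on Spark 1.4.1 would need checking against the actual data:

    // Sketch: use the built-in string-to-timestamp cast instead of a UDF.
    // Assumes "ts_str" is in a format Spark's cast accepts.
    val dfBuiltin = frame.withColumn("ts", frame("ts_str").cast("timestamp"))
    dfBuiltin.first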
The reason Python UDFs are slow is probably that PySpark UDFs are not implemented in the most optimized way. As the linked article notes, Spark added a Python API in version 0.7 with support for user-defined functions; those functions operate one row at a time and have to move data between the JVM and the Python worker, which adds serialization and invocation overhead.
User-Defined Functions (UDFs) are user-programmable routines that act on one row. This documentation lists the classes that are required for creating and registering UDFs. It also contains examples that demonstrate how to define and register UDFs and invoke them in Spark SQL.
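As a minimal illustration of that define-register-invoke flow (the function name strLen and the table people are made up for the example, not taken from the question):

    // Register a Scala function as a SQL UDF and call it from a query.
    sqlContext.udf.register("strLen", (s: String) => s.length)
    sqlContext.sql("SELECT name, strLen(name) AS name_len FROM people").show()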
The serialization stack shows the UDF's anonymous function holding an $outer reference to the ConversionUtils object, which is itself not serializable, so the whole object fails when the task is shipped. Try:

    object ConversionUtils extends Serializable {
      ...
    }
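A sketch of what the corrected object could look like, staying close to the code in the question (turning iso8601 into a def that builds a new formatter per call is an extra precaution because SimpleDateFormat is not thread-safe, not something the fix itself requires):

    import java.sql.Timestamp
    import java.text.SimpleDateFormat
    import org.apache.spark.sql.functions.udf

    // The object is now serializable, so the closure's $outer reference
    // no longer breaks task serialization.
    object ConversionUtils extends Serializable {
      // New formatter per call: SimpleDateFormat instances are not thread-safe.
      private def iso8601 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX")

      def tsUTC(s: String): Timestamp = new Timestamp(iso8601.parse(s).getTime)

      val castTS = udf[Timestamp, String](tsUTC _)
    }

    val df = frame.withColumn("ts", ConversionUtils.castTS(frame("ts_str")))
    df.first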