Spark: Task not Serializable for UDF on DataFrame


I get org.apache.spark.SparkException: Task not serializable when I try to execute the following on Spark 1.4.1:

import java.sql.{Date, Timestamp}
import java.text.SimpleDateFormat
import org.apache.spark.sql.functions.udf

object ConversionUtils {
  val iso8601 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX")

  def tsUTC(s: String): Timestamp = new Timestamp(iso8601.parse(s).getTime)

  val castTS = udf[Timestamp, String](tsUTC _)
}

val df = frame.withColumn("ts", ConversionUtils.castTS(frame("ts_str")))
df.first

Here, frame is a DataFrame created from a HiveContext; the DataFrame itself has no issues.
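For context, a minimal sketch of the kind of setup implied above (the HiveContext construction and the table name are assumptions, not from the original post):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

val sc = new SparkContext(new SparkConf().setAppName("udf-example"))
val hiveContext = new HiveContext(sc)
// "events" is a hypothetical Hive table with a string column ts_str
val frame = hiveContext.sql("SELECT ts_str FROM events")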

I have similar UDFs for integers, and they work without any problem. The one with timestamps, however, causes problems. According to the documentation, java.sql.Timestamp implements Serializable, so that's not the problem; the same is true for SimpleDateFormat.

This leads me to believe it's the UDF itself that's causing problems, but I'm not sure what's wrong or how to fix it.

The relevant section of the trace:

Caused by: java.io.NotSerializableException: ...
Serialization stack:
        - object not serializable (class: ..., value: ...$ConversionUtils$@63ed11dd)
        - field (class: ...$ConversionUtils$$anonfun$3, name: $outer, type: class ...$ConversionUtils$)
        - object (class ...$ConversionUtils$$anonfun$3, <function1>)
        - field (class: org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2, name: func$2, type: interface scala.Function1)
        - object (class org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2, <function1>)
        - field (class: org.apache.spark.sql.catalyst.expressions.ScalaUdf, name: f, type: interface scala.Function1)
        - object (class org.apache.spark.sql.catalyst.expressions.ScalaUdf, scalaUDF(ts_str#2683))
        - field (class: org.apache.spark.sql.catalyst.expressions.Alias, name: child, type: class org.apache.spark.sql.catalyst.expressions.Expression)
        - object (class org.apache.spark.sql.catalyst.expressions.Alias, scalaUDF(ts_str#2683) AS ts#7146)
        - element of array (index: 35)
        - array (class [Ljava.lang.Object;, size 36)
        - field (class: scala.collection.mutable.ArrayBuffer, name: array, type: class [Ljava.lang.Object;)
        - object (class scala.collection.mutable.ArrayBuffer,
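Reading the trace, it's not Timestamp or SimpleDateFormat that is reported as unserializable but the ConversionUtils singleton itself, which the eta-expanded function tsUTC _ holds via its $outer field. The failure can be reproduced without Spark at all (a minimal sketch using plain JDK serialization; the byte-array sink is just illustrative):

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

val oos = new ObjectOutputStream(new ByteArrayOutputStream())
// Serializing the function value drags in its $outer reference to the
// ConversionUtils object, reproducing the NotSerializableException directly.
oos.writeObject(ConversionUtils.tsUTC _)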
Asked Apr 22 '16 by Ian


1 Answer

Try:

object ConversionUtils extends Serializable {
  ...
}
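This works because Spark must ship the UDF's closure to the executors, and the closure includes the $outer reference to the ConversionUtils singleton shown in the stack trace; marking the object Serializable lets that reference be serialized. An alternative, not from the original answer, is to avoid capturing the object at all by constructing everything inside the function literal (a sketch; note that SimpleDateFormat is not thread-safe, so building it inside the function also sidesteps that hazard):

import java.sql.Timestamp
import java.text.SimpleDateFormat
import org.apache.spark.sql.functions.udf

// The lambda captures nothing from any enclosing object, so only the
// (serializable) function value itself is shipped to the executors.
val castTS = udf { (s: String) =>
  val iso8601 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX")
  new Timestamp(iso8601.parse(s).getTime)
}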
Answered Sep 20 '22 by David Griffin