
Scala and Spark UDF function

I made a simple UDF to convert or extract some values from a time field in a temp table in Spark. I registered the function, but when I call it from SQL it throws a NullPointerException. Below are my function and the process of executing it. I am using Zeppelin. Strangely, this was working yesterday but it stopped working this morning.

Function

def convert( time:String ) : String = {
  val sdf = new java.text.SimpleDateFormat("HH:mm")
  val time1 = sdf.parse(time)
  return sdf.format(time1)
}

Register the Function

sqlContext.udf.register("convert",convert _)

Test the function without SQL; this works:

convert("12:12:12") // returns "12:12"

Test the function with SQL in Zeppelin; this fails:

%sql
select convert(time) from temptable limit 10

Structure of temptable

root
 |-- date: string (nullable = true)
 |-- time: string (nullable = true)
 |-- serverip: string (nullable = true)
 |-- request: string (nullable = true)
 |-- resource: string (nullable = true)
 |-- protocol: integer (nullable = true)
 |-- sourceip: string (nullable = true)

Part of the stack trace I am getting:

java.lang.NullPointerException
    at org.apache.hadoop.hive.ql.exec.FunctionRegistry.getFunctionInfo(FunctionRegistry.java:643)
    at org.apache.hadoop.hive.ql.exec.FunctionRegistry.getFunctionInfo(FunctionRegistry.java:652)
    at org.apache.spark.sql.hive.HiveFunctionRegistry.lookupFunction(hiveUdfs.scala:54)
    at org.apache.spark.sql.hive.HiveContext$$anon$3.org$apache$spark$sql$catalyst$analysis$OverrideFunctionRegistry$$super$lookupFunction(HiveContext.scala:376)
    at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$$anonfun$lookupFunction$2.apply(FunctionRegistry.scala:44)
    at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$$anonfun$lookupFunction$2.apply(FunctionRegistry.scala:44)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$class.lookupFunction(FunctionRegistry.scala:44)
asked Jul 28 '16 by fanbondi

People also ask

What is UDF in Spark Scala?

User-Defined Functions (UDFs) are user-programmable routines that act on one row. This documentation lists the classes that are required for creating and registering UDFs. It also contains examples that demonstrate how to define and register UDFs and invoke them in Spark SQL.

How do you write UDF in Scala?

In Spark, you create a UDF by writing a function in whichever language you are using with Spark. For example, if you are using Spark with Scala, you write the function in Scala and then either wrap it with the udf() function or register it as a UDF, for use with DataFrames or SQL respectively, as in the sketch below.
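
A minimal sketch of both routes (assuming a SparkSession named spark and a DataFrame df with a string column name; on Spark 1.x, sqlContext plays the role of spark):

import org.apache.spark.sql.functions.{col, udf}

// a plain Scala function
val toUpper = (s: String) => s.toUpperCase

// 1) wrap it with udf() for the DataFrame API
val toUpperUDF = udf(toUpper)
df.withColumn("name_upper", toUpperUDF(col("name")))

// 2) register it by name for use in SQL strings
df.createOrReplaceTempView("people")
spark.udf.register("to_upper", toUpper)
spark.sql("SELECT to_upper(name) FROM people")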

Is it good to use UDF in Spark?

UDFs are very useful for extending Spark's vocabulary, but they come with significant performance overhead. They are black boxes to the Spark optimizer, blocking several helpful optimizations such as WholeStageCodegen and null optimization.
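
One way to see the difference (a sketch, assuming a DataFrame df with a string column name) is to compare the physical plans of a built-in function and an equivalent UDF:

import org.apache.spark.sql.functions.{col, udf, upper}

val upperUDF = udf((s: String) => s.toUpperCase)

df.select(upper(col("name"))).explain()    // built-in: fully visible to the optimizer
df.select(upperUDF(col("name"))).explain() // UDF: appears as an opaque call in the plan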

Why should we avoid UDF in Spark?

It is well known that the use of UDFs (User-Defined Functions) in Apache Spark, especially through the Python API, can compromise application performance. For this reason, at Damavis we try to avoid them as much as possible in favour of native functions or SQL.
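
Applied to the question above, the UDF can be dropped entirely: if time is always an HH:mm:ss string, its first five characters are already the HH:mm value, so the built-in substring function does the same work while staying visible to the optimizer. A sketch, assuming the temptable from the question:

import org.apache.spark.sql.functions.{col, substring}

// "12:12:12" -> "12:12", no UDF involved
val shortened = sqlContext.table("temptable").withColumn("time", substring(col("time"), 1, 5))

// or directly in SQL
sqlContext.sql("SELECT substring(time, 1, 5) AS time FROM temptable LIMIT 10")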


2 Answers

Use udf instead of defining a function directly:

import org.apache.spark.sql.functions._

val convert = udf[String, String](time => {
  val sdf = new java.text.SimpleDateFormat("HH:mm")
  val time1 = sdf.parse(time)
  sdf.format(time1)
})

A udf's input parameters are Columns (or a single Column), and its return type is also Column:

case class UserDefinedFunction protected[sql] (
    f: AnyRef,
    dataType: DataType,
    inputTypes: Option[Seq[DataType]]) {

  def apply(exprs: Column*): Column = {
    Column(ScalaUDF(f, dataType, exprs.map(_.expr), inputTypes.getOrElse(Nil)))
  }
}
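
With that signature, the wrapped convert composes directly with the DataFrame API. A usage sketch, assuming the temptable from the question is still registered:

import org.apache.spark.sql.functions.col

// pass a Column in, get a Column back
val df = sqlContext.table("temptable")
df.select(convert(col("time")).as("short_time")).show(10)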
answered by Rockie Yang


You have to define your function as a UDF.

import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.{col, udf}

val convertUDF: UserDefinedFunction = udf((time:String) => {
  val sdf = new java.text.SimpleDateFormat("HH:mm")
  val time1 = sdf.parse(time)
  sdf.format(time1)
})

Next, you would apply your UDF to your DataFrame.

// assuming your DataFrame is already defined
dataFrame.withColumn("time", convertUDF(col("time"))) // reusing the name "time" replaces the existing column

Now, as to your actual problem: one reason you are receiving this error could be that your DataFrame contains rows in which time is null. If you filter them out before you apply the UDF, you should be able to continue without a problem.

dataFrame.filter(col("time").isNotNull)
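
Alternatively (a sketch, not part of the original suggestion), you can make the UDF itself null-tolerant, so a null time yields a null result instead of a NullPointerException inside parse():

import org.apache.spark.sql.functions.udf

// Option(null) is None, so a null input short-circuits to a null output
val convertSafeUDF = udf((time: String) =>
  Option(time).map { t =>
    val sdf = new java.text.SimpleDateFormat("HH:mm")
    sdf.format(sdf.parse(t))
  }.orNull
)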

I'm curious what else causes a NullPointerException when running a UDF, other than it encountering a null. If you found a reason different from my suggestion, I'd be glad to know.

answered by kfkhalili