In Spark SQL, how do you register and use a generic UDF?

In my project, I want to implement an ADD (+) function, but my parameters may be LongType, DoubleType, or IntType. I use sqlContext.udf.register("add", XXX), but I don't know how to write XXX so that the function is generic.

asked Apr 28 '16 by yjxyjx


People also ask

How do I register a function as UDF in Spark Scala?

In Spark, you create a UDF by writing a function in the language you use with Spark. For example, if you are using Spark with Scala, you write the function in Scala and then either wrap it with the udf() function to use it on a DataFrame, or register it as a UDF to use it in SQL.
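A minimal sketch of both approaches (the function name `addOne` is illustrative; this assumes a Spark 2.x `SparkSession`, whereas in Spark 1.x the same `register` call lives on `sqlContext.udf`):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

val spark = SparkSession.builder.master("local[*]").appName("udf-demo").getOrCreate()
import spark.implicits._

// A plain Scala function
val addOne = (x: Int) => x + 1

val df = Seq(1, 2, 3).toDF("n")

// 1) Wrap it with udf() to use it on a DataFrame
val addOneUdf = udf(addOne)
df.withColumn("n1", addOneUdf($"n")).show

// 2) Register it under a name to use it in SQL
spark.udf.register("addOne", addOne)
df.createOrReplaceTempView("t")
spark.sql("SELECT n, addOne(n) AS n1 FROM t").show
```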

Why UDF are not recommended in Spark?

It is well known that the use of UDFs (User Defined Functions) in Apache Spark, especially via the Python API, can compromise application performance. For this reason, at Damavis we try to avoid them as much as possible in favour of native functions or SQL.
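For instance, a sum of columns needs no UDF at all; native column arithmetic stays visible to the Catalyst optimizer, unlike an opaque UDF (a sketch assuming a Spark 2.x `SparkSession`):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[*]").appName("native-demo").getOrCreate()
import spark.implicits._

val df = Seq((1, 2L), (3, 4L)).toDF("c1", "c2")

// Native expression: the optimizer can see through and optimize this
df.withColumn("add", $"c1" + $"c2").show
```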


1 Answer

You can create a generic UDF by packing your values into a single struct column with struct($"col1", $"col2") and having your UDF work off of that. The struct gets passed into your UDF as a Row object, so you can do something like this:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._

val multiAdd = udf[Double, Row](r => {
  var n = 0.0
  // Convert each field to Double regardless of its numeric type;
  // any other type (e.g. String or null) will throw a MatchError
  r.toSeq.foreach(n1 => n = n + (n1 match {
    case l: Long   => l.toDouble
    case i: Int    => i.toDouble
    case d: Double => d
    case f: Float  => f.toDouble
  }))
  n
})

val df = Seq((1.0,2),(3.0,4)).toDF("c1","c2")
df.withColumn("add", multiAdd(struct($"c1", $"c2"))).show
+---+---+---+
| c1| c2|add|
+---+---+---+
|1.0|  2|3.0|
|3.0|  4|7.0|
+---+---+---+

You can even do interesting things like take a variable number of columns as input. In fact, our UDF defined above already does that:

val df = Seq((1, 2L, 3.0f,4.0),(5, 6L, 7.0f,8.0)).toDF("int","long","float","double")

df.printSchema
root
 |-- int: integer (nullable = false)
 |-- long: long (nullable = false)
 |-- float: float (nullable = false)
 |-- double: double (nullable = false)

df.withColumn("add", multiAdd(struct($"int", $"long", $"float", $"double"))).show
+---+----+-----+------+----+
|int|long|float|double| add|
+---+----+-----+------+----+
|  1|   2|  3.0|   4.0|10.0|
|  5|   6|  7.0|   8.0|26.0|
+---+----+-----+------+----+

You can even add a hard-coded number into the mix:

df.withColumn("add", multiAdd(struct(lit(100), $"int", $"long"))).show
+---+----+-----+------+-----+
|int|long|float|double|  add|
+---+----+-----+------+-----+
|  1|   2|  3.0|   4.0|103.0|
|  5|   6|  7.0|   8.0|111.0|
+---+----+-----+------+-----+

If you want to use the UDF in SQL syntax, you can do:

sqlContext.udf.register("multiAdd", (r: Row) => {
  var n = 0.0
  r.toSeq.foreach(n1 => n = n + (n1 match {
    case l: Long => l.toDouble
    case i: Int => i.toDouble
    case d: Double => d
    case f: Float => f.toDouble
  }))
  n
})
df.registerTempTable("df")

//  Note that 'int' and 'long' are column names
sqlContext.sql("SELECT *, multiAdd(struct(int, long)) as add from df").show
+---+----+-----+------+----+
|int|long|float|double| add|
+---+----+-----+------+----+
|  1|   2|  3.0|   4.0| 3.0|
|  5|   6|  7.0|   8.0|11.0|
+---+----+-----+------+----+

This works too:

sqlContext.sql("SELECT *, multiAdd(struct(*)) as add from df").show
+---+----+-----+------+----+
|int|long|float|double| add|
+---+----+-----+------+----+
|  1|   2|  3.0|   4.0|10.0|
|  5|   6|  7.0|   8.0|26.0|
+---+----+-----+------+----+
answered Oct 30 '22 by David Griffin