 

How can I pass extra parameters to UDFs in Spark SQL?

I want to parse the date columns in a DataFrame, and for each date column the resolution may change (e.g. 2011/01/10 => 2011/01 if the resolution is set to "Month").

I wrote the following code:

def convertDataFrame(dataframe: DataFrame, schema: Array[FieldDataType], resolution: Array[DateResolutionType]): DataFrame = {
  import org.apache.spark.sql.functions._

  val convertDateFunc = udf { (x: String, resolution: DateResolutionType) =>
    SparkDateTimeConverter.convertDate(x, resolution)
  }
  val convertDateTimeFunc = udf { (x: String, resolution: DateResolutionType) =>
    SparkDateTimeConverter.convertDateTime(x, resolution)
  }

  val allColNames = dataframe.columns
  val allCols = allColNames.map(name => dataframe.col(name))

  val mappedCols = for (i <- allCols.indices) yield {
    schema(i) match {
      case FieldDataType.Date     => convertDateFunc(allCols(i), resolution(i))
      case FieldDataType.DateTime => convertDateTimeFunc(allCols(i), resolution(i))
      case _                      => allCols(i)
    }
  }

  dataframe.select(mappedCols: _*)
}

However, it doesn't work: it seems I can only pass Columns to UDFs. I also wonder whether it would be very slow to convert the DataFrame to an RDD and apply the function on each row.

Does anyone know the correct solution? Thank you!

asked Feb 22 '16 by DarkZero

People also ask

Why are Spark UDFs slow?

As of 30 October 2017, Spark has introduced vectorized UDFs for PySpark. The reason Python UDFs are slow is probably that PySpark UDFs are not implemented in the most optimized way: Spark added a Python API in version 0.7, with support for user-defined functions.

How do you aggregate in Spark?

You need to define a key or grouping for the aggregation. You can also define an aggregation function that specifies how the transformations are applied to the columns. Given multiple input values, the aggregation function generates one result for each group.
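For illustration, a minimal sketch of a grouped aggregation in the Scala API (df, department and salary are placeholder names):

import org.apache.spark.sql.functions.avg

// One output row per department, carrying the average salary of that group.
df.groupBy("department").agg(avg("salary").as("avg_salary"))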

How do PySpark UDFs work?

A PySpark UDF is a User Defined Function used to create a reusable function in Spark. Once a UDF is created, it can be reused on multiple DataFrames and in SQL (after registering). The default return type of udf() is StringType.
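That question refers to PySpark, but the same define-once, reuse-everywhere pattern exists in the Scala API; a small sketch (df and the name column are placeholders):

import org.apache.spark.sql.functions.{col, udf}

val upper = udf((s: String) => s.toUpperCase)            // define once
val withUpper = df.select(upper(col("name")))            // reuse on DataFrames
df.createOrReplaceTempView("people")
spark.udf.register("upper_udf", (s: String) => s.toUpperCase)
spark.sql("SELECT upper_udf(name) FROM people")          // and in SQL after registering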

What are UDTFs in Spark SQL?

In addition, Hive also supports UDTFs (User Defined Tabular Functions), which take one row as input and return multiple rows as output. To use Hive UDFs/UDAFs/UDTFs, the user should register them in Spark and then use them in Spark SQL queries.
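As an illustration, registering a Hive function from Scala could look roughly like this; the class name com.example.ExplodeJson and the events table are hypothetical, and the SparkSession must be built with Hive support:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
// Register a Hive UDTF from a class available on the classpath (hypothetical name).
spark.sql("CREATE TEMPORARY FUNCTION explode_json AS 'com.example.ExplodeJson'")
spark.sql("SELECT explode_json(payload) FROM events").show()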


2 Answers

Just use a little bit of currying:

def convertDateFunc(resolution: DateResolutionType) = udf((x: String) =>
  SparkDateTimeConverter.convertDate(x, resolution))

and use it as follows:

case FieldDataType.Date => convertDateFunc(resolution(i))(allCols(i)) 
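Putting it together, the original convertDataFrame could look roughly like this; a sketch that keeps the question's FieldDataType, DateResolutionType and SparkDateTimeConverter, which are defined elsewhere:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.udf

def convertDataFrame(dataframe: DataFrame,
                     schema: Array[FieldDataType],
                     resolution: Array[DateResolutionType]): DataFrame = {
  def convertDateFunc(res: DateResolutionType) =
    udf((x: String) => SparkDateTimeConverter.convertDate(x, res))
  def convertDateTimeFunc(res: DateResolutionType) =
    udf((x: String) => SparkDateTimeConverter.convertDateTime(x, res))

  val allCols = dataframe.columns.map(dataframe.col)
  val mappedCols = allCols.indices.map { i =>
    schema(i) match {
      case FieldDataType.Date     => convertDateFunc(resolution(i))(allCols(i))
      case FieldDataType.DateTime => convertDateTimeFunc(resolution(i))(allCols(i))
      case _                      => allCols(i)
    }
  }
  dataframe.select(mappedCols: _*)
}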

On a side note, you should take a look at sql.functions.trunc and sql.functions.date_format. These should do at least part of the job without using UDFs at all.
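For instance, month resolution could be handled with the built-ins alone (the eventDate column name is just for illustration):

import org.apache.spark.sql.functions.{col, date_format, trunc}

// Truncate a date column to the first day of its month...
df.select(trunc(col("eventDate"), "month"))
// ...or render it as a "yyyy/MM" string, matching the 2011/01 example in the question.
df.select(date_format(col("eventDate"), "yyyy/MM"))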

Note:

In Spark 2.2 or later you can use the typedLit function:

import org.apache.spark.sql.functions.typedLit 

which supports a wider range of literals, such as Seq or Map.
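For example, typedLit makes it possible to pass a whole Seq as a constant argument to a UDF (the column and values below are made up):

import org.apache.spark.sql.functions.{col, typedLit, udf}

val allowed = typedLit(Seq("Month", "Day"))
val isAllowed = udf((res: String, allowed: Seq[String]) => allowed.contains(res))
df.select(isAllowed(col("resolution"), allowed))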

answered Nov 01 '22 by zero323


You can create a literal Column to pass to a UDF using the lit(...) function defined in org.apache.spark.sql.functions.

For example:

val takeRight = udf((s: String, i: Int) => s.takeRight(i))
df.select(takeRight($"stringCol", lit(1)))
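Note that lit only handles simple literal values (numbers, strings, booleans and the like), so for the custom DateResolutionType in the question you would need to pass something lit can represent, such as the resolution's string name, or fall back to the currying / typedLit approaches above.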
answered Nov 01 '22 by Michael Armbrust