I want to parse the date columns in a DataFrame, and for each date column the resolution for the date may change (e.g. 2011/01/10 => 2011/01 if the resolution is set to "Month").
I wrote the following code:
def convertDataFrame(dataframe: DataFrame,
                     schema: Array[FieldDataType],
                     resolution: Array[DateResolutionType]): DataFrame = {
  import org.apache.spark.sql.functions._

  val convertDateFunc = udf { (x: String, resolution: DateResolutionType) =>
    SparkDateTimeConverter.convertDate(x, resolution)
  }
  val convertDateTimeFunc = udf { (x: String, resolution: DateResolutionType) =>
    SparkDateTimeConverter.convertDateTime(x, resolution)
  }

  val allColNames = dataframe.columns
  val allCols = allColNames.map(name => dataframe.col(name))
  val mappedCols = for (i <- allCols.indices) yield {
    schema(i) match {
      case FieldDataType.Date     => convertDateFunc(allCols(i), resolution(i))
      case FieldDataType.DateTime => convertDateTimeFunc(allCols(i), resolution(i))
      case _                      => allCols(i)
    }
  }

  dataframe.select(mappedCols: _*)
}
However it doesn't work. It seems that I can only pass Columns to UDFs. I also wonder whether it would be very slow if I converted the DataFrame to an RDD and applied the function on each row.
Does anyone know the correct solution? Thank you!
Since 30 October 2017, Spark has introduced vectorized UDFs for PySpark. The reason a Python UDF is slow is probably that PySpark UDFs are not implemented in the most optimized way. According to the paragraph from the linked article, Spark added a Python API in version 0.7, with support for user-defined functions.
Just use a little bit of currying:
def convertDateFunc(resolution: DateResolutionType) = udf((x:String) => SparkDateTimeConverter.convertDate(x, resolution))
and use it as follows:
case FieldDataType.Date => convertDateFunc(resolution(i))(allCols(i))
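Putting it all together, the whole method could then look like this. This is only a minimal sketch, reusing the asker's FieldDataType, DateResolutionType and SparkDateTimeConverter from the question:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.udf

def convertDataFrame(dataframe: DataFrame,
                     schema: Array[FieldDataType],
                     resolution: Array[DateResolutionType]): DataFrame = {
  // The resolution is captured in the closure, so each UDF only receives Columns.
  def convertDateFunc(res: DateResolutionType) =
    udf((x: String) => SparkDateTimeConverter.convertDate(x, res))
  def convertDateTimeFunc(res: DateResolutionType) =
    udf((x: String) => SparkDateTimeConverter.convertDateTime(x, res))

  val allCols = dataframe.columns.map(dataframe.col)
  val mappedCols = allCols.indices.map { i =>
    schema(i) match {
      case FieldDataType.Date     => convertDateFunc(resolution(i))(allCols(i))
      case FieldDataType.DateTime => convertDateTimeFunc(resolution(i))(allCols(i))
      case _                      => allCols(i)
    }
  }
  dataframe.select(mappedCols: _*)
}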
On a side note you should take a look at sql.functions.trunc and sql.functions.date_format. These should do at least part of the job without using UDFs at all.
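For example, for a "Month" resolution the built-in functions alone can do the conversion. A small sketch, assuming Spark 2.2+ (for the to_date overload with a format string) and a hypothetical df with a string column eventDate holding values like "2011/01/10":

import org.apache.spark.sql.functions.{col, date_format, to_date, trunc}

// Parse the string into a proper date column first.
val parsed = df.withColumn("eventDate", to_date(col("eventDate"), "yyyy/MM/dd"))

// trunc keeps the date type, snapped to the first day of the month (2011-01-01)...
val truncated = parsed.withColumn("eventMonth", trunc(col("eventDate"), "month"))

// ...while date_format renders it as a string at the desired resolution ("2011/01").
val formatted = parsed.withColumn("eventMonth", date_format(col("eventDate"), "yyyy/MM"))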
Note: in Spark 2.2 or later you can use the typedLit function:
import org.apache.spark.sql.functions.typedLit
It supports a wider range of literals, such as Seq or Map.
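A quick sketch of what typedLit buys you over lit, with a hypothetical df and yearCol column:

import org.apache.spark.sql.functions.{col, typedLit, udf}

// typedLit can build literal Columns from Scala collections, which plain lit cannot.
val allowedYears = typedLit(Seq("2011", "2012"))
val monthLookup  = typedLit(Map("Jan" -> 1, "Feb" -> 2))

// A UDF can then bind the literal Seq as an ordinary argument.
val isAllowed = udf((year: String, years: Seq[String]) => years.contains(year))
df.select(isAllowed(col("yearCol"), allowedYears))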
You can create a literal Column to pass to a UDF using the lit(...) function defined in org.apache.spark.sql.functions. For example:
val takeRight = udf((s: String, i: Int) => s.takeRight(i))
df.select(takeRight($"stringCol", lit(1)))
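The lit(...) call is what turns the constant into a Column; without it the call would not compile, which mirrors the original problem of passing a non-Column value to a UDF. A minimal sketch, assuming a df with a stringCol column:

import org.apache.spark.sql.functions.{lit, udf}

val takeRight = udf((s: String, i: Int) => s.takeRight(i))

// Compiles: both arguments are Columns.
df.select(takeRight(df("stringCol"), lit(1)))

// Would not compile: 1 is a plain Int, not a Column.
// df.select(takeRight(df("stringCol"), 1))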