 

Spark Build Custom Column Function, user defined function

I’m using Scala and want to build my own DataFrame function. For example, I want to treat a column like an array, iterate through each element, and make a calculation.

To start off, I’m trying to implement my own getMax method. So column x would have the values [3,8,2,5,9], and the expected output of the method would be 9.

Here is what it looks like in Scala

def getMax(inputArray: Array[Int]): Int = {
   var maxValue = inputArray(0)
   for (i <- 1 until inputArray.length if inputArray(i) > maxValue) {
     maxValue = inputArray(i)
   }
   maxValue
}

This is what I have so far, and I get this error:

"value length is not a member of org.apache.spark.sql.Column",

and I don't know how else to iterate through the column.

def getMax(col: Column): Column = {
  var maxValue = col(0)
  for (i <- 1 until col.length if col(i) > maxValue) {
    maxValue = col(i)
  }
  maxValue
}

Once I am able to implement my own method, I will create a column function:

val value_max: org.apache.spark.sql.Column = getMax(df.col("value")).as("value_max")

And then I hope to be able to use this in a SQL statement, for example

val sample = sqlContext.sql("SELECT value_max(x) FROM table")

and the expected output would be 9, given the input column [3,8,2,5,9].

I am following an answer from another thread, "Spark Scala - How do I iterate rows in dataframe, and add calculated values as new columns of the data frame", where they create a private method for standard deviation. The calculations I will do will be more complex than this (e.g. I will be comparing each element in the column), so am I going in the correct direction, or should I be looking more into User Defined Functions?

asked Apr 11 '16 by other15

People also ask

Can I use UDF in Spark SQL?

In Spark, you create a UDF by writing a function in the language you prefer to use with Spark. For example, if you are using Spark with Scala, you write the UDF in Scala and either wrap it with the udf() function to use it on a DataFrame or register it as a UDF to use it in SQL.
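For instance, here is a minimal sketch of both routes (the function, column, and table names are hypothetical; sqlContext and df are assumed, as in the question):

import org.apache.spark.sql.functions.udf

// A plain Scala function (hypothetical example).
val toUpper = (s: String) => s.toUpperCase

// Wrap it with udf() to use it on a DataFrame...
val toUpperUDF = udf(toUpper)
val withUpper = df.withColumn("name_upper", toUpperUDF(df("name")))

// ...or register it under a name to use it in SQL
// (assumes df has been registered as a temporary table named "table").
sqlContext.udf.register("to_upper", toUpper)
val fromSql = sqlContext.sql("SELECT to_upper(name) FROM table")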

Is it good to use UDF in Spark?

User Defined Functions are an important feature of Spark SQL that helps extend the language by adding custom constructs. UDFs are very useful for extending Spark's vocabulary, but they come with significant performance overhead.

Why should we avoid UDF in Spark?

Caveats of using Spark UDFs: why are they not ideal? When we use a UDF, we lose the optimizations Spark applies to our DataFrame/Dataset, because a UDF is effectively a black box to Spark's optimizer.
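As a minimal sketch of that point (assuming a numeric column named "value"): the first filter below is a Catalyst expression Spark can analyze and optimize, while the UDF version is opaque to the optimizer.

import org.apache.spark.sql.functions.{col, udf}

// Built-in expression: the optimizer can inspect and optimize this predicate.
val optimized = df.filter(col("value") > 5)

// Equivalent UDF: a black box the optimizer cannot look inside.
val gtFive = udf((v: Int) => v > 5)
val opaque = df.filter(gtFive(col("value")))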

Which function is used to create a user defined function in Spark?

It can be created using the udf() method.


1 Answer

In a Spark DataFrame, you can't iterate through the elements of a Column using the approaches you thought of because a Column is not an iterable object.

However, to process the values of a column, you have some options and the right one depends on your task:

1) Using the existing built-in functions

Spark SQL already has plenty of useful functions for processing columns, including aggregation and transformation functions. Most of them you can find in the functions package (documentation here). Some others (binary functions in general) you can find directly in the Column object (documentation here). So, if you can use them, it's usually the best option. Note: don't forget the Window Functions.
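For the getMax example in the question, a built-in aggregation already covers it. Here is a sketch, assuming a numeric column named "value" and, for the grouped variant, a hypothetical "key" column:

import org.apache.spark.sql.functions.max

// Maximum over all rows of the "value" column, returned as a one-row DataFrame.
val valueMax = df.agg(max(df("value")).as("value_max"))

// Or the maximum per group, if that is what's needed.
val valueMaxPerKey = df.groupBy("key").agg(max("value").as("value_max"))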

2) Creating a UDF

If you can't complete your task with the built-in functions, you may consider defining a UDF (User Defined Function). UDFs are useful when you can process each item of a column independently and you expect to produce a new column with the same number of rows as the original one (not an aggregated column). This approach is quite simple: first you define a plain function, then you register it as a UDF, and then you use it. Example:

def myFunc: (String => String) = { s => s.toLowerCase }

import org.apache.spark.sql.functions.udf
val myUDF = udf(myFunc)

val newDF = df.withColumn("newCol", myUDF(df("oldCol")))

For more information, here's a nice article.
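If you also want to call the function from SQL, as in the question, you can register it under a name. A sketch, reusing myFunc from above (the SQL function name "my_lower" and the table name are hypothetical):

// Register the same function for use in SQL statements.
sqlContext.udf.register("my_lower", myFunc)

// Make the DataFrame visible to SQL, if it isn't already.
df.registerTempTable("table")

val sample = sqlContext.sql("SELECT my_lower(oldCol) FROM table")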

3) Using a UDAF

If your task is to create aggregated data, you can define a UDAF (User Defined Aggregation Function). I don't have a lot of experience with this, but I can point you to a nice tutorial:

https://ragrawal.wordpress.com/2015/11/03/spark-custom-udaf-example/
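As a rough, untested sketch of the shape such a UDAF takes, here is a user defined aggregation that computes the maximum of an integer column, mirroring the getMax idea from the question (the class name is made up):

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

class MaxUDAF extends UserDefinedAggregateFunction {
  // One integer input column.
  def inputSchema: StructType = StructType(StructField("value", IntegerType) :: Nil)
  // The aggregation buffer holds the running maximum.
  def bufferSchema: StructType = StructType(StructField("max", IntegerType) :: Nil)
  def dataType: DataType = IntegerType
  def deterministic: Boolean = true

  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = Int.MinValue
  }

  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) buffer(0) = math.max(buffer.getInt(0), input.getInt(0))
  }

  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = math.max(buffer1.getInt(0), buffer2.getInt(0))
  }

  def evaluate(buffer: Row): Any = buffer.getInt(0)
}

// Usage sketch:
// val result = df.agg(new MaxUDAF()(df("value")).as("value_max"))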

4) Fall back to RDD processing

If you really can't use the options above, or if your processing task depends on different rows to process a single one and it's not an aggregation, then I think you will have to select the column you want and process it using the corresponding RDD. Example:

val singleColumnDF = df.select("column")

val myRDD = singleColumnDF.rdd

// process myRDD
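For the getMax case, the processing step could look roughly like this (a sketch, assuming the selected column holds integers):

// Pull the values out of the Row objects and reduce to the maximum, skipping nulls.
val maxValue = myRDD
  .filter(row => !row.isNullAt(0))
  .map(row => row.getInt(0))
  .reduce((a, b) => math.max(a, b))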

So, those are the options I could think of. I hope this helps.

answered Oct 16 '22 by Daniel de Paula