Applying a function to a Spark DataFrame column

Coming from R, I am used to easily doing operations on columns. Is there an easy way to take this function that I've written in Scala

def round_tenths_place( un_rounded:Double ) : Double = {
    val rounded = BigDecimal(un_rounded).setScale(1, BigDecimal.RoundingMode.HALF_UP).toDouble
    return rounded
}

and apply it to one column of a dataframe? Kind of what I hoped this would do:

 bid_results.withColumn("bid_price_bucket", round_tenths_place(bid_results("bid_price")) )

I haven't found an easy way and am struggling to figure out how to do this. There's got to be an easier way than converting the dataframe to an RDD, selecting the right field from the RDD of rows, and mapping the function across all of the values, yeah? And also something more succinct than creating a SQL table and then doing this with a Spark SQL UDF?

asked Feb 05 '16 by Michael Discenza

People also ask

How do you use a lambda function in PySpark?

A PySpark DataFrame doesn't have a map() transformation for applying a lambda function; when you want to apply a custom transformation, you need to convert the DataFrame to an RDD and apply the map() transformation there.
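
A minimal sketch of that round trip (the DataFrame and column names here are hypothetical), assuming a SparkSession named spark:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1.0,), (2.5,), (3.7,)], ["x"])

# convert to an RDD, apply the lambda with map(), then rebuild a DataFrame
doubled = df.rdd.map(lambda row: (row.x * 2,)).toDF(["x_doubled"])
doubled.show()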

How do you assign a value to a column in PySpark?

Method 1: use the lit() function. Select from the table with the select() method, passing the column names (or "*" for the whole table) as the first arguments and lit() wrapped around the constant value as the last.
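
For example, a sketch with a hypothetical DataFrame df and column name:

from pyspark.sql.functions import lit

# keep every existing column and append a constant-valued one
df.select("*", lit(0.5).alias("discount_rate"))

# withColumn() achieves the same in one step
df.withColumn("discount_rate", lit(0.5))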

How do I change the value of a column in a Spark DataFrame?

You can replace column values of a PySpark DataFrame using the SQL string functions regexp_replace(), translate(), and overlay().
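
For instance, a sketch using regexp_replace() on a hypothetical status column of a DataFrame df:

from pyspark.sql.functions import regexp_replace

# replace every occurrence of "N/A" in the status column with "unknown"
df.withColumn("status", regexp_replace("status", "N/A", "unknown"))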

What does take() do in Spark?

take(num) returns the first num elements of the RDD. It works by first scanning one partition and using the results from that partition to estimate the number of additional partitions needed to satisfy the limit.
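
A quick illustration, assuming a SparkSession named spark:

spark.sparkContext.parallelize(range(100)).take(5)
# returns [0, 1, 2, 3, 4]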


1 Answer

You can define a UDF as follows:

import org.apache.spark.sql.functions.udf
import spark.implicits._ // assuming a SparkSession named spark; enables the $"..." column syntax

val round_tenths_place_udf = udf(round_tenths_place _)
bid_results.withColumn(
  "bid_price_bucket", round_tenths_place_udf($"bid_price"))

although the built-in round expression uses exactly the same logic as your function and should be more than enough, not to mention much more efficient:

import org.apache.spark.sql.functions.round

bid_results.withColumn("bid_price_bucket", round($"bid_price", 1))

See also:

  • Updating a dataframe column in spark
  • How to apply a function to a column of a Spark DataFrame?
answered Oct 26 '22 by zero323