
Create new column with function in Spark Dataframe

I'm trying to figure out the new DataFrame API in Spark. It seems like a good step forward, but I'm having trouble doing something that should be pretty simple. I have a dataframe with 2 columns, "ID" and "Amt". As a generic example, say I want to return a new column called "Code" that holds a code based on the value of "Amt". I can write a function something like this:

    def coder(myAmt: Integer): String = {
      if (myAmt > 100) "Little"
      else "Big"
    }

When I try to use it like this:

    val myDF = sqlContext.parquetFile("hdfs:/to/my/file.parquet")
    myDF.withColumn("Code", coder(myDF("Amt")))

I get a type mismatch error:

    found   : org.apache.spark.sql.Column
    required: Integer

I've tried changing the input type of my function to org.apache.spark.sql.Column, but then the function fails to compile because the if statement wants a Boolean.

Am I doing this wrong? Is there a better/another way to do this than using withColumn?

Thanks for your help.

asked May 13 '15 by J Calbreath

2 Answers

Let's say you have an "Amt" column in your schema:

    import org.apache.spark.sql.functions._

    val myDF = sqlContext.parquetFile("hdfs:/to/my/file.parquet")
    val coder: (Int => String) = (arg: Int) => { if (arg < 100) "little" else "big" }
    val sqlfunc = udf(coder)
    myDF.withColumn("Code", sqlfunc(col("Amt")))

I think withColumn is the right way to add a column.
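For reference, the same UDF can be defined and applied without the intermediate coder value; a minimal sketch, assuming the same myDF and an integer "Amt" column (codeUdf and coded are hypothetical names):

    import org.apache.spark.sql.functions.{col, udf}

    // Wrap the plain Scala function as a Spark UDF so it can accept a Column
    val codeUdf = udf((amt: Int) => if (amt < 100) "little" else "big")

    // Apply it to the "Amt" column and inspect the result
    val coded = myDF.withColumn("Code", codeUdf(col("Amt")))
    coded.show()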

answered Sep 22 '22 by yjshen


We should avoid defining UDFs where possible, because of the serialization and deserialization overhead they impose on column data.

You can achieve the same result with Spark's built-in when function, as below:

    import org.apache.spark.sql.functions.when

    val myDF = sqlContext.parquetFile("hdfs:/to/my/file.parquet")
    myDF.withColumn("Code", when(myDF("Amt") < 100, "Little").otherwise("Big"))
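If more than two codes are needed, when calls can be chained before the final otherwise; a minimal sketch with hypothetical thresholds:

    // Conditions are checked in order; rows matching none fall through to otherwise
    myDF.withColumn("Code",
      when(myDF("Amt") < 100, "Little")
        .when(myDF("Amt") < 1000, "Medium")
        .otherwise("Big"))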
answered Sep 22 '22 by Ramesh Maharjan