 

Creating a new Spark DataFrame with new column value based on column in first dataframe Java

This should be easy, but... Using Spark 1.6.1, I have DataFrame #1 with columns A, B, C, with values:

A  B  C
1  2  A
2  2  A
3  2  B
4  2  C

I then create a new DataFrame with a new column D like so:

DataFrame df2 = df1.withColumn("D", df1.col("C"));

So far so good, but I actually want the value in column D to be conditional, i.e.:

// pseudo code
if (col C == "A") then col D = "X"
else if (col C == "B") then col D = "Y"
else col D = "Z"
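In plain Java terms (an ordinary method, not Spark code, and MapC/mapC are just names I made up to pin down the mapping I want), that would be:

```java
// Plain-Java illustration of the intended C -> D mapping (hypothetical helper)
public class MapC {
    static String mapC(String c) {
        if ("A".equals(c)) {
            return "X";
        } else if ("B".equals(c)) {
            return "Y";
        } else {
            return "Z"; // anything other than "A" or "B" (including null) maps to "Z"
        }
    }

    public static void main(String[] args) {
        System.out.println(mapC("A")); // X
        System.out.println(mapC("B")); // Y
        System.out.println(mapC("C")); // Z
    }
}
```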

I'll then drop column C and rename D to C. I've tried looking at the Column functions but nothing appears to fit the bill; I thought of using df1.rdd().map() and iterating over the rows but aside from not actually managing to get it to work, I kind of thought that the whole point of DataFrames was to move away from the RDD abstraction?

Unfortunately I have to do this in Java (and of course Spark with Java is not optimal!!). It seems like I'm missing the obvious and am happy to be shown to be an idiot when presented with the solution!

asked May 07 '16 by user1128482

People also ask

How do I create a new column in a Spark DataFrame?

In PySpark, to add a new column with a constant value to a DataFrame, use the lit() function (from pyspark.sql.functions import lit). lit() takes the constant value you want to add and returns a Column type; to add a NULL/None value, use lit(None).

How do I change the value of a column in a Spark DataFrame?

You can replace column values of PySpark DataFrame by using SQL string functions regexp_replace(), translate(), and overlay() with Python examples.

What are the two arguments for the withColumn() function?

The withColumn() function takes two arguments, the first argument is the name of the new column and the second argument is the value of the column in Column type.


3 Answers

I believe you can use when to achieve that. Additionally, you probably can replace the old column directly. For your example, the code would be something like:

import static org.apache.spark.sql.functions.*;

Column newCol = when(col("C").equalTo("A"), "X")
    .when(col("C").equalTo("B"), "Y")
    .otherwise("Z");

DataFrame df2 = df1.withColumn("C", newCol);

For more details about when, check the Column Javadoc.
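If you do want to keep the original column temporarily and follow the drop/rename route described in the question instead, a sketch using the same Spark 1.6 Java API (untested here) would be:

```java
import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.DataFrame;

Column newCol = when(col("C").equalTo("A"), "X")
    .when(col("C").equalTo("B"), "Y")
    .otherwise("Z");

// Add D, drop the old C, then rename D back to C
DataFrame df2 = df1.withColumn("D", newCol)
    .drop("C")
    .withColumnRenamed("D", "C");
```

Overwriting "C" directly with withColumn, as in the answer above, achieves the same result in one step.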

answered Oct 12 '22 by Daniel de Paula


Thanks to Daniel I have resolved this :)

The missing piece was the static import of the sql functions

import static org.apache.spark.sql.functions.*;

I must have tried a million different ways of using when, but got compile failures/runtime errors because I didn't do the import. Once imported, Daniel's answer was spot on!

answered Oct 12 '22 by user1128482


You may also use a UDF to do the same job. Just write a simple if-then-else structure inside it (Scala, filling in the mapping from the question):

import org.apache.spark.sql.functions.udf

val customFunct = udf { (c: String) =>
  if (c == "A") "X"
  else if (c == "B") "Y"
  else "Z"
}

val newDF = df.withColumn("C", customFunct(df("C")))
answered Oct 12 '22 by sudeepgupta90