 

Spark Scala: How to transform a column in a DF

I have a dataframe in Spark with many columns and a udf that I defined. I want the same dataframe back, except with one column transformed. Furthermore, my udf takes in a string and returns a timestamp. Is there an easy way to do this? I tried

val test = myDF.select("my_column").rdd.map(r => getTimestamp(r))  

but this returns an RDD and just with the transformed column.

Asked May 04 '16 by mt88

People also ask

How do I change a column value in a Spark DataFrame?

You can update a PySpark DataFrame column using withColumn(), select(), or sql(). Since DataFrames are distributed, immutable collections, you can't actually change column values in place; when you "change" a value with withColumn() or any other approach, PySpark returns a new DataFrame with the updated values.

Does withColumn replace an existing column?

withColumn() creates a new column with the given name. If a column with that name already exists, the new column replaces it: the old one is dropped.

How do I add a column to a DataFrame in Spark Scala?

You can add multiple columns to a Spark DataFrame in several ways. If you want to add a known set of columns, you can simply chain withColumn() calls or use select(). However, if you need to add multiple columns after applying some transformations, you can use either map() or foldLeft().


1 Answer

If you really need to use your function, I can suggest two options:

1) Using map / toDF:

import org.apache.spark.sql.Row
import sqlContext.implicits._

def getTimestamp: (String => java.sql.Timestamp) = // your function here

val test = myDF.select("my_column").rdd.map {
  case Row(string_val: String) => (string_val, getTimestamp(string_val))
}.toDF("my_column", "new_column")

2) Using UDFs (UserDefinedFunction):

import org.apache.spark.sql.functions._

def getTimestamp: (String => java.sql.Timestamp) = // your function here

val newCol = udf(getTimestamp).apply(col("my_column")) // creates the new column
val test = myDF.withColumn("new_column", newCol)       // adds the new column to the original DF

There's more detail about Spark SQL UDFs in this nice article by Bill Chambers.
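Both options above leave getTimestamp as a placeholder. A minimal sketch of such a function, assuming the input strings follow a yyyy-MM-dd HH:mm pattern (a guess, not something stated in the question), could look like:

```scala
import java.sql.Timestamp
import java.text.SimpleDateFormat

// Hypothetical getTimestamp: parse a string into a java.sql.Timestamp.
// The pattern "yyyy-MM-dd HH:mm" is an assumption; adjust it to your data.
def getTimestamp: String => Timestamp = { s =>
  val format = new SimpleDateFormat("yyyy-MM-dd HH:mm")
  new Timestamp(format.parse(s).getTime)
}

println(getTimestamp("2016-05-04 23:05"))  // prints 2016-05-04 23:05:00.0
```

Note that SimpleDateFormat is not thread-safe, so the sketch builds a fresh instance per call; that keeps it safe to wrap in a Spark udf running on executor threads.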


Alternatively,

If you just want to transform a StringType column into a TimestampType column you can use the unix_timestamp column function available since Spark SQL 1.5:

val test = myDF
  .withColumn("new_column", unix_timestamp(col("my_column"), "yyyy-MM-dd HH:mm").cast("timestamp"))

Note: For Spark 1.5.x, it is necessary to multiply the result of unix_timestamp by 1000 before casting to timestamp (issue SPARK-11724). The resulting code would be:

val test = myDF
  .withColumn("new_column", (unix_timestamp(col("my_column"), "yyyy-MM-dd HH:mm") * 1000L).cast("timestamp"))
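The factor of 1000 exists because unix_timestamp returns epoch seconds while java.sql.Timestamp is built on epoch milliseconds. A plain-Scala sketch of the arithmetic (the sample value is illustrative, not from the question):

```scala
import java.sql.Timestamp

// unix_timestamp yields epoch seconds; Timestamp works in epoch milliseconds,
// hence multiplying by 1000 before the cast on Spark 1.5.x.
val epochSeconds = 1462406700L               // illustrative sample value
val ts = new Timestamp(epochSeconds * 1000L)
println(ts.getTime)                          // prints 1462406700000
```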

Edit: Added udf option

Answered Sep 28 '22 by Daniel de Paula