I have a dataframe in Spark with many columns and a udf that I defined. I want the same dataframe back, except with one column transformed. Furthermore, my udf takes in a string and returns a timestamp. Is there an easy way to do this? I tried
val test = myDF.select("my_column").rdd.map(r => getTimestamp(r))
but this returns an RDD, and only with the transformed column.
You can update a Spark DataFrame column using withColumn(), select(), or sql(). Since DataFrames are distributed, immutable collections, you can't really change column values in place; when you "change" a value using withColumn() or any other approach, Spark returns a new DataFrame with the updated values.
withColumn() creates a new column with the given name. If a column with that name already exists, the new column replaces it and the old one is dropped.
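For example, a minimal sketch of both behaviours (df and the amount column are hypothetical names):

import org.apache.spark.sql.functions.col

// Same name: the existing "amount" column is replaced by the cast version
val updated = df.withColumn("amount", col("amount").cast("double"))

// New name: "amount_doubled" is added and all original columns are kept
val extended = df.withColumn("amount_doubled", col("amount") * 2)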
You can add multiple columns to a Spark DataFrame in several ways. If you want to add a known set of columns, you can easily do so by chaining withColumn() calls or with a single select(). However, sometimes you may need to add multiple columns after applying some transformations; in that case you can use either map() or foldLeft(), as sketched below.
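A minimal sketch of the foldLeft() variant (the column names and expressions are assumptions for illustration):

import org.apache.spark.sql.functions.{col, lit, upper}

// Hypothetical new columns, each defined by a Column expression
val newCols = Seq(
  "flag"  -> lit(true),
  "upper" -> upper(col("my_column"))
)

// foldLeft threads the DataFrame through one withColumn call per entry
val withAll = newCols.foldLeft(myDF) { case (df, (name, expr)) =>
  df.withColumn(name, expr)
}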
If you really need to use your function, I can suggest two options:
1) Using map / toDF:
import org.apache.spark.sql.Row
import sqlContext.implicits._

def getTimestamp: (String => java.sql.Timestamp) = // your function here

val test = myDF.select("my_column").rdd.map {
  case Row(string_val: String) => (string_val, getTimestamp(string_val))
}.toDF("my_column", "new_column")
2) Using UDFs (UserDefinedFunction):
import org.apache.spark.sql.functions._

def getTimestamp: (String => java.sql.Timestamp) = // your function here

val newCol = udf(getTimestamp).apply(col("my_column")) // creates the new column
val test = myDF.withColumn("new_column", newCol)       // adds the new column to the original DF
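Both options leave getTimestamp as a placeholder. A minimal implementation might look like the following (the "yyyy-MM-dd HH:mm" input format is an assumption; adjust it to your data):

import java.sql.Timestamp
import java.text.SimpleDateFormat

// Assumed input format; null or unparseable values yield null so bad
// rows don't fail the whole job
def getTimestamp: (String => Timestamp) = { s =>
  if (s == null) null
  else try {
    val format = new SimpleDateFormat("yyyy-MM-dd HH:mm")
    new Timestamp(format.parse(s).getTime)
  } catch {
    case _: java.text.ParseException => null
  }
}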
There's more detail about Spark SQL UDFs in this nice article by Bill Chambers.
Alternatively, if you just want to transform a StringType column into a TimestampType column, you can use the unix_timestamp column function, available since Spark SQL 1.5:
val test = myDF.withColumn(
  "new_column",
  unix_timestamp(col("my_column"), "yyyy-MM-dd HH:mm").cast("timestamp")
)
Note: for Spark 1.5.x, it is necessary to multiply the result of unix_timestamp by 1000 before casting to timestamp (issue SPARK-11724). The resulting code would be:
val test = myDF.withColumn(
  "new_column",
  (unix_timestamp(col("my_column"), "yyyy-MM-dd HH:mm") * 1000L).cast("timestamp")
)
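For reference, here is a self-contained sketch of the unix_timestamp approach end to end (the sample data is made up for illustration):

import org.apache.spark.sql.functions.{col, unix_timestamp}
import sqlContext.implicits._

// Hypothetical sample data
val myDF = Seq("2015-11-24 10:15", "2015-11-25 09:00").toDF("my_column")

val test = myDF.withColumn(
  "new_column",
  unix_timestamp(col("my_column"), "yyyy-MM-dd HH:mm").cast("timestamp")
)

test.printSchema() // new_column should be of type timestamp
test.show(false)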
Edit: Added udf option