I need to append multiple columns to an existing Spark DataFrame. The column names are given in a List, and the values for the new columns are constant. For example, given these input columns and DataFrame:
val columnsNames=List("col1","col2")
val data = Seq(("one", 1), ("two", 2), ("three", 3), ("four", 4))
After appending both columns, assuming the constant values are "val1" for col1 and "val2" for col2, the output DataFrame should be:
+-----+---+----+----+
|   _1| _2|col1|col2|
+-----+---+----+----+
|  one|  1|val1|val2|
|  two|  2|val1|val2|
|three|  3|val1|val2|
| four|  4|val1|val2|
+-----+---+----+----+
I have written a function to append the columns:
def appendColumns(cols: List[String], ds: DataFrame): DataFrame = {
  cols match {
    case Nil       => ds
    // note: lit(h) uses the column name itself as the constant value
    case h :: tail => appendColumns(tail, ds.withColumn(h, lit(h)))
  }
}
Is there a better, more functional way to do it? Thanks.
Yes, there is a better and simpler way. Your function makes one call to withColumn per new column. With many columns, Catalyst (the engine that optimizes Spark queries) can get overwhelmed; I experienced this in the past with a similar use case, and I've even seen it cause an OOM on the driver when experimenting with thousands of columns. To avoid stressing Catalyst (and to write less code ;-) ), you can simply use select, as below, to get this done in a single Spark command:
import org.apache.spark.sql.functions.{col, lit}
import spark.implicits._
val data = Seq(("one", 1), ("two", 2), ("three", 3), ("four", 4)).toDF
// let's assume that we have a map that associates column names to their values
val columnMap = Map("col1" -> "val1", "col2" -> "val2")
// let's create the new columns from the map: one constant column per entry
val newCols = columnMap.map { case (name, value) => lit(value).as(name) }
// select the old columns + the new ones in a single projection
data.select(data.columns.map(col) ++ newCols: _*).show
+-----+---+----+----+
|   _1| _2|col1|col2|
+-----+---+----+----+
|  one|  1|val1|val2|
|  two|  2|val1|val2|
|three|  3|val1|val2|
| four|  4|val1|val2|
+-----+---+----+----+
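If you are on Spark 3.3 or later (an assumption; older versions do not have this method), Dataset.withColumns accepts a Map[String, Column] and adds all the columns in one projection, so you get the same single-step benefit without building the select list by hand. A minimal sketch; the object and method names are hypothetical:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.lit

object WithColumnsSketch {
  // Assumes Spark 3.3+, where Dataset.withColumns(Map[String, Column]) exists.
  // Turns each (name -> constant) entry into (name -> lit(constant)) and adds them all at once.
  def appendConstants(df: DataFrame, constants: Map[String, String]): DataFrame =
    df.withColumns(constants.map { case (name, value) => name -> lit(value) })

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("withColumns-sketch").getOrCreate()
    import spark.implicits._

    val data = Seq(("one", 1), ("two", 2), ("three", 3), ("four", 4)).toDF
    appendConstants(data, Map("col1" -> "val1", "col2" -> "val2")).show()
    spark.stop()
  }
}
```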
Rather than recursion, I think a foldLeft is the more general approach, at least for a limited number of columns. Using a Databricks notebook:
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import spark.implicits._
val columnNames = Seq("c3", "c4")
val df = Seq(("one", 1), ("two", 2), ("three", 3), ("four", 4)).toDF("c1", "c2")
// adds one column per name; each new column's constant value is its own name
def addCols(df: DataFrame, columns: Seq[String]): DataFrame =
  columns.foldLeft(df)((acc, colName) => acc.withColumn(colName, lit(colName)))
val df2 = addCols(df, columnNames)
df2.show(false)
returns:
+-----+---+---+---+
|c1   |c2 |c3 |c4 |
+-----+---+---+---+
|one  |1  |c3 |c4 |
|two  |2  |c3 |c4 |
|three|3  |c3 |c4 |
|four |4  |c3 |c4 |
+-----+---+---+---+
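The addCols output above repeats each column's name as its value (c3, c4), while the question wants a separate constant per column. A minimal sketch, assuming the constants are supplied as (name, value) pairs, folds over those pairs instead; addConstCols and the object name are hypothetical:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.lit

object FoldLeftConstants {
  // Folds over (name, value) pairs so each new column gets its own constant,
  // instead of reusing the column name as the value.
  def addConstCols(df: DataFrame, constants: Seq[(String, String)]): DataFrame =
    constants.foldLeft(df) { case (acc, (name, value)) =>
      acc.withColumn(name, lit(value))
    }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("foldLeft-sketch").getOrCreate()
    import spark.implicits._

    val df = Seq(("one", 1), ("two", 2), ("three", 3), ("four", 4)).toDF("c1", "c2")
    addConstCols(df, Seq("c3" -> "val1", "c4" -> "val2")).show(false)
    spark.stop()
  }
}
```

This still issues one withColumn per column, so the same caveat about many columns applies.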
Please beware of the hidden cost of calling withColumn repeatedly: https://medium.com/@manuzhang/the-hidden-cost-of-spark-withcolumn-8ffea517c015. The article covers a slightly different context, but the other answer alludes to the same issue with its select approach.
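Following that advice, here is a sketch of an alternative to addCols with a similar shape that adds every column in a single select rather than one withColumn per column. The function name and the (name, value) input are hypothetical:

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, lit}

object SelectConstants {
  // Builds all the new columns up front and adds them in one select,
  // so the plan gets a single projection instead of one per withColumn call.
  def addColsWithSelect(df: DataFrame, constants: Seq[(String, String)]): DataFrame = {
    val existing: Seq[Column] = df.columns.toSeq.map(col)
    val added: Seq[Column] = constants.map { case (name, value) => lit(value).as(name) }
    df.select(existing ++ added: _*)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("select-sketch").getOrCreate()
    import spark.implicits._

    val df = Seq(("one", 1), ("two", 2), ("three", 3), ("four", 4)).toDF("c1", "c2")
    addColsWithSelect(df, Seq("c3" -> "val1", "c4" -> "val2")).show(false)
    spark.stop()
  }
}
```

One behavioral difference: withColumn replaces an existing column with the same name, whereas this select appends a second column with that name, so check for name clashes first.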