I currently have code in which I repeatedly apply the same procedure to multiple DataFrame columns via multiple chains of .withColumn, and I want to create a function to streamline the procedure. In my case, I am finding cumulative sums over columns aggregated by keys:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.sum

val newDF = oldDF
  .withColumn("cumA", sum("A").over(Window.partitionBy("ID").orderBy("time")))
  .withColumn("cumB", sum("B").over(Window.partitionBy("ID").orderBy("time")))
  .withColumn("cumC", sum("C").over(Window.partitionBy("ID").orderBy("time")))
  //.withColumn(...)
What I would like is either something like:
def createCumulativeColums(cols: Array[String], df: DataFrame): DataFrame = {
  // Implement the above cumulative sums, partitioning, and ordering
}
or better yet:
def withColumns(cols: Array[String], df: DataFrame, f: function): DataFrame = {
  // Implement a udf/arbitrary function on all the specified columns
}
As background: the withColumn() function takes two arguments, the name of the output column and its value as a Column. It returns a new DataFrame with a column of that name; if a column with the same name already exists, it is replaced. withColumn() is one of the most commonly used Spark SQL transformations: it can derive a column from other columns, change a column's values, convert an existing column's datatype, create a new column, and more.
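For example, a minimal sketch of both behaviors (df and the column names here are hypothetical):

import org.apache.spark.sql.functions.col

// Adds a new column "doubledA" derived from "A":
val withDoubled = df.withColumn("doubledA", col("A") * 2)

// Reuses the existing name "A": the old column is replaced,
// here by a copy cast to a different datatype:
val retyped = withDoubled.withColumn("A", col("A").cast("double"))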
The triple equals operator === is conventionally a type-safe equality operator in Scala libraries (somewhat analogous to JavaScript's strict equality). Spark defines it as a method on Column that creates a new Column comparing the Column on the left with the operand on the right, evaluating to a boolean for each row.
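For instance, a small sketch (df and the "status" column are illustrative):

import org.apache.spark.sql.functions.col

// Builds a boolean Column and uses it as a filter predicate:
val active = df.filter(col("status") === "active")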
You can use select with varargs, including *:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.sum
import spark.implicits._

df.select($"*" +: Seq("A", "B", "C").map(c =>
  sum(c).over(Window.partitionBy("ID").orderBy("time")).alias(s"cum$c")
): _*)
This:

- Seq("A", ...).map(...) maps the column names to their windowed cumulative-sum expressions,
- $"*" +: ... prepends all of the existing columns to that sequence,
- ... : _* unpacks the combined sequence into the varargs expected by select,
and can be generalized as:
import org.apache.spark.sql.{Column, DataFrame}

/**
 * @param cols a sequence of columns to transform
 * @param df an input DataFrame
 * @param f a function to be applied on each col in cols
 */
def withColumns(cols: Seq[String], df: DataFrame, f: String => Column) =
  df.select($"*" +: cols.map(c => f(c)): _*)
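With this helper, the question's cumulative sums could be invoked as follows (a sketch; w and newDF are illustrative names, and the alias is applied inside f because this select-based version uses f(c) directly as the output expression):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.sum

val w = Window.partitionBy("ID").orderBy("time")
val newDF = withColumns(Seq("A", "B", "C"), oldDF, c => sum(c).over(w).alias(s"cum$c"))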
If you find the withColumn syntax more readable, you can use foldLeft:
Seq("A", "B", "C").foldLeft(df)((df, c) => df.withColumn(s"cum$c", sum(c).over(Window.partitionBy("ID").orderBy("time"))) )
which can be generalized, for example, to:
/**
 * @param cols a sequence of columns to transform
 * @param df an input DataFrame
 * @param f a function to be applied on each col in cols
 * @param name a function mapping from input to output name.
 */
def withColumns(cols: Seq[String], df: DataFrame,
                f: String => Column,
                name: String => String = identity) =
  cols.foldLeft(df)((df, c) => df.withColumn(name(c), f(c)))
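A sketch of calling this variant on the question's data (w and cumDF are illustrative names), with the name function supplying the cum prefix:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.sum

val w = Window.partitionBy("ID").orderBy("time")
val cumDF = withColumns(Seq("A", "B", "C"), oldDF, c => sum(c).over(w), c => s"cum$c")

One trade-off worth noting: each withColumn call adds a projection to the logical plan, so for a large number of columns the single select variant above tends to produce a simpler plan.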