Spark/Scala repeated calls to withColumn() using the same function on multiple columns


I currently have code that repeatedly applies the same procedure to multiple DataFrame columns via chained .withColumn calls, and I would like to write a function to streamline it. In my case, I am computing cumulative sums over several columns, partitioned by a key:

    val newDF = oldDF
      .withColumn("cumA", sum("A").over(Window.partitionBy("ID").orderBy("time")))
      .withColumn("cumB", sum("B").over(Window.partitionBy("ID").orderBy("time")))
      .withColumn("cumC", sum("C").over(Window.partitionBy("ID").orderBy("time")))
      //.withColumn(...)

What I would like is either something like:

    def createCumulativeColums(cols: Array[String], df: DataFrame): DataFrame = {
      // Implement the above cumulative sums, partitioning, and ordering
    }

or better yet:

    def withColumns(cols: Array[String], df: DataFrame, f: function): DataFrame = {
      // Implement a udf/arbitrary function on all the specified columns
    }
Damian Satterthwaite-Phillips asked Dec 30 '16 17:12

1 Answer

You can use select with varargs including *:

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.sum
    import spark.implicits._

    df.select($"*" +: Seq("A", "B", "C").map(c =>
      sum(c).over(Window.partitionBy("ID").orderBy("time")).alias(s"cum$c")
    ): _*)


  • Maps column names to window expressions with Seq("A", ...).map(...).
  • Prepends all pre-existing columns with $"*" +: ....
  • Unpacks the combined sequence into varargs with ... : _*.

This can be generalized as:

    import org.apache.spark.sql.{Column, DataFrame}

    /**
     * @param cols a sequence of columns to transform
     * @param df an input DataFrame
     * @param f a function to be applied on each col in cols
     */
    // Requires spark.implicits._ in scope for the $"*" syntax.
    def withColumns(cols: Seq[String], df: DataFrame, f: String => Column) =
      df.select($"*" +: cols.map(c => f(c)): _*)
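The prepend-and-unpack idiom used in the select-based snippets above can be tried without Spark. The following is only a minimal sketch in script style: the local `select` is a hypothetical stand-in for a varargs method such as DataFrame.select, and plain strings stand in for Column values.

```scala
// Hypothetical stand-in for a varargs method such as DataFrame.select(cols: Column*).
def select(cols: String*): List[String] = cols.toList

// Map each input name to a derived name, as the answer does with window expressions.
val derived = Seq("A", "B", "C").map(c => s"cum$c")

// `+:` prepends one element to the sequence; `: _*` passes the sequence as varargs.
val result = select("*" +: derived: _*)

println(result.mkString(", "))
```

Because `+:` binds more tightly than the `: _*` ascription, the whole prepended sequence is expanded into the varargs parameter, which is why no extra parentheses are needed.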

If you find withColumn syntax more readable you can use foldLeft:

    Seq("A", "B", "C").foldLeft(df)((df, c) =>
      df.withColumn(s"cum$c", sum(c).over(Window.partitionBy("ID").orderBy("time")))
    )

which can be generalized for example to:

    /**
     * @param cols a sequence of columns to transform
     * @param df an input DataFrame
     * @param f a function to be applied on each col in cols
     * @param name a function mapping from input to output name.
     */
    def withColumns(cols: Seq[String], df: DataFrame,
        f: String => Column, name: String => String = identity) =
      cols.foldLeft(df)((df, c) => df.withColumn(name(c), f(c)))
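As a usage illustration, the foldLeft-based helper can be applied to the cumulative-sum case from the question. This is a sketch under assumptions rather than a definitive implementation: the sample rows, the local SparkSession settings, and the names `w` and `result` are made up for the example, and the script style assumes something like spark-shell or a worksheet with Spark on the classpath.

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.sum

// Local session for experimentation; in spark-shell this returns the existing session.
val spark = SparkSession.builder().master("local[*]").appName("cumulative-sums").getOrCreate()
import spark.implicits._

// Same shape as the foldLeft-based withColumns above.
def withColumns(cols: Seq[String], df: DataFrame,
    f: String => Column, name: String => String = identity): DataFrame =
  cols.foldLeft(df)((acc, c) => acc.withColumn(name(c), f(c)))

// Made-up sample data: two IDs with strictly increasing time values.
val df = Seq(
  (1, 1, 10, 1, 5),
  (1, 2, 20, 2, 5),
  (1, 3, 30, 3, 5),
  (2, 1, 7, 0, 2),
  (2, 2, 8, 1, 3)
).toDF("ID", "time", "A", "B", "C")

// Define the window once and reuse it for every column.
val w = Window.partitionBy("ID").orderBy("time")

// Adds cumA, cumB and cumC next to the original columns.
val result = withColumns(Seq("A", "B", "C"), df, c => sum(c).over(w), c => s"cum$c")

result.orderBy("ID", "time").show()
```

Defining the window specification once keeps the partitioning and ordering in a single place. Note that with an ordered window and no explicit frame, Spark uses a range frame up to the current row, so rows that share the same time value within an ID receive the same cumulative sum.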
zero323 answered Sep 19 '22 06:09
