Adding a column of rowsums across a list of columns in Spark Dataframe

I have a Spark dataframe with several columns. I want to add a column to the dataframe that is the sum of a subset of the columns.

For example, my data looks like this:

ID var1 var2 var3 var4 var5
a   5     7    9    12   13
b   6     4    3    20   17
c   4     9    4    6    9
d   1     2    6    8    1

I want to add a column that sums the values of specific columns for each row:

ID var1 var2 var3 var4 var5   sums
a   5     7    9    12   13    46
b   6     4    3    20   17    50
c   4     9    4    6    9     32
d   1     2    6    8    1     18

I know it is possible to add columns together if you know the specific columns to add:

val newdf = df.withColumn("sumofcolumns", df("var1") + df("var2"))

But is it possible to pass a list of column names and add them together? This answer, which is basically what I want but uses the Python API instead of Scala (Add column sum as new column in PySpark dataframe), suggests something like this would work:

//Select columns to sum
val columnstosum = List("var1", "var2", "var3", "var4", "var5")

// Create new column called sumofcolumns which is sum of all columns listed in columnstosum
val newdf = df.withColumn("sumofcolumns", df.select(columnstosum.head, columnstosum.tail: _*).sum)

This throws the error "value sum is not a member of org.apache.spark.sql.DataFrame". Is there a way to sum across a list of columns?

Thanks in advance for your help.

asked Jun 03 '16 by Sarah


2 Answers

You should try the following:

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._

val sc: SparkContext = ...
val sqlContext = new SQLContext(sc)

// Needed for the toDF() conversion below
import sqlContext.implicits._

val input = sc.parallelize(Seq(
  ("a", 5, 7, 9, 12, 13),
  ("b", 6, 4, 3, 20, 17),
  ("c", 4, 9, 4, 6 , 9),
  ("d", 1, 2, 6, 8 , 1)
)).toDF("ID", "var1", "var2", "var3", "var4", "var5")

// The columns to sum, as Column expressions
val columnsToSum = List(col("var1"), col("var2"), col("var3"), col("var4"), col("var5"))

// reduce(_ + _) folds the list into a single Column expression: var1 + var2 + ... + var5
val output = input.withColumn("sums", columnsToSum.reduce(_ + _))

output.show()

Then the result is:

+---+----+----+----+----+----+----+
| ID|var1|var2|var3|var4|var5|sums|
+---+----+----+----+----+----+----+
|  a|   5|   7|   9|  12|  13|  46|
|  b|   6|   4|   3|  20|  17|  50|
|  c|   4|   9|   4|   6|   9|  32|
|  d|   1|   2|   6|   8|   1|  18|
+---+----+----+----+----+----+----+
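
If the columns to sum aren't known ahead of time, the list can also be built from the DataFrame's schema. A minimal sketch, assuming (purely for illustration) that every column to be summed starts with the var prefix:

// Assumption for illustration: the columns to sum share the "var" prefix
val columnsToSum = input.columns.filter(_.startsWith("var")).map(col).toList
val output = input.withColumn("sums", columnsToSum.reduce(_ + _))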
answered Oct 22 '22 by Paweł Jurczenko


Plain and simple:

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{lit, col}

// Fold the given columns into a single sum expression, starting from a literal 0
def sum_(cols: Column*) = cols.foldLeft(lit(0))(_ + _)

val columnstosum = Seq("var1", "var2", "var3", "var4", "var5").map(col)
df.select(sum_(columnstosum: _*))

with the Python equivalent:

from functools import reduce
from operator import add
from pyspark.sql.functions import lit, col

def sum_(*cols):
    # Fold the columns into a single sum expression, starting from a literal 0
    return reduce(add, cols, lit(0))

columnstosum = [col(x) for x in ["var1", "var2", "var3", "var4", "var5"]]
df.select("*", sum_(*columnstosum))

Both will produce null for the sum if any value in the row is null. You can use DataFrameNaFunctions.fill or the coalesce function to avoid that.
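
For example, a minimal Scala sketch of the coalesce approach, treating nulls as zero (columnstosum is the list defined above):

import org.apache.spark.sql.functions.coalesce

// Replace null with 0 in each column before summing, so a single null value
// doesn't turn the whole row sum into null
val safeSum = columnstosum.map(c => coalesce(c, lit(0))).reduce(_ + _)
val withSums = df.withColumn("sums", safeSum)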

answered Oct 22 '22 by zero323