
How to slice and sum elements of array column?

I would like to sum the elements of an array column (or apply other aggregate functions to them) using Spark SQL.

I have a table like this:

+-------+-------+---------------------------------+
|dept_id|dept_nm|                      emp_details|
+-------+-------+---------------------------------+
|     10|Finance|        [100, 200, 300, 400, 500]|
|     20|     IT|                [10, 20, 50, 100]|
+-------+-------+---------------------------------+

I would like to sum the values in the emp_details column for each row.

Expected query:

sqlContext.sql("select sum(emp_details) from mytable").show

Expected result

1500
180

I should also be able to sum over a range of elements, like:

sqlContext.sql("select sum(slice(emp_details,0,3)) from mytable").show

Expected result

600
80

When I call sum on the array column, it fails (as expected) because sum requires a numeric argument, not an array.

I think we need to create a UDF for this, but how?

Will I face any performance hits with UDFs? And is there any solution other than a UDF?

serious_black asked Oct 20 '16 09:10




1 Answer

Spark 2.4.0

As of Spark 2.4, Spark SQL supports higher-order functions for manipulating complex data structures, including arrays.

The "modern" solution would be as follows:

scala> input.show(false)
+-------+-------+-------------------------+
|dept_id|dept_nm|emp_details              |
+-------+-------+-------------------------+
|10     |Finance|[100, 200, 300, 400, 500]|
|20     |IT     |[10, 20, 50, 100]        |
+-------+-------+-------------------------+

scala> input.createOrReplaceTempView("mytable")

scala> val sqlText = "select dept_id, dept_nm, aggregate(emp_details, 0, (acc, value) -> acc + value) as sum from mytable"
scala> sql(sqlText).show
+-------+-------+----+
|dept_id|dept_nm| sum|
+-------+-------+----+
|     10|Finance|1500|
|     20|     IT| 180|
+-------+-------+----+
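
The range sum from the question can be handled the same way by wrapping the array in slice before aggregating. The following is a minimal sketch, assuming Spark 2.4 or later and a local session (the app name and the `sliced` value are illustrative). Note that Spark SQL's slice(array, start, length) uses 1-based indices, so the first three elements are slice(emp_details, 1, 3), not slice(emp_details, 0, 3) as written in the question.

```scala
import org.apache.spark.sql.SparkSession

// In spark-shell a session already exists; getOrCreate simply reuses it.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("array-slice-sum") // illustrative name
  .getOrCreate()
import spark.implicits._

val input = Seq(
  (10, "Finance", Seq(100, 200, 300, 400, 500)),
  (20, "IT", Seq(10, 20, 50, 100))
).toDF("dept_id", "dept_nm", "emp_details")
input.createOrReplaceTempView("mytable")

// slice(array, start, length): start is 1-based, length counts elements.
// aggregate then folds the sliced array into a single sum per row.
val sliced = spark.sql("""
  select dept_id, dept_nm,
         aggregate(slice(emp_details, 1, 3), 0, (acc, value) -> acc + value) as sum
  from mytable
""")

sliced.show()
```

For the sample data this should give 600 for Finance and 80 for IT, matching the result expected in the question.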

You can read more about higher-order functions in the following articles and video:

  1. Introducing New Built-in and Higher-Order Functions for Complex Data Types in Apache Spark 2.4
  2. Working with Nested Data Using Higher Order Functions in SQL on Databricks
  3. An Introduction to Higher Order Functions in Spark SQL with Herman van Hovell (Databricks)

Spark 2.3.2 and earlier

DISCLAIMER: I would not recommend this approach (even though it got the most upvotes) because of the deserialization that Spark SQL does to execute Dataset.map. The query forces Spark to deserialize the data and load it onto the JVM (from memory regions that Spark manages outside the JVM). That will inevitably lead to more frequent GCs and hence worse performance.

One solution would be to use the Dataset API, where the combination of Spark SQL and Scala can show its power.

scala> val inventory = Seq(
     |   (10, "Finance", Seq(100, 200, 300, 400, 500)),
     |   (20, "IT", Seq(10, 20, 50, 100))).toDF("dept_id", "dept_nm", "emp_details")
inventory: org.apache.spark.sql.DataFrame = [dept_id: int, dept_nm: string ... 1 more field]

// I'm too lazy today for a case class
scala> inventory.as[(Long, String, Seq[Int])].
  map { case (deptId, deptName, details) => (deptId, deptName, details.sum) }.
  toDF("dept_id", "dept_nm", "sum").
  show
+-------+-------+----+
|dept_id|dept_nm| sum|
+-------+-------+----+
|     10|Finance|1500|
|     20|     IT| 180|
+-------+-------+----+

I'm leaving the slice part as an exercise as it's equally simple.
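
For reference, one way the slice part could look with the same Dataset approach is sketched below. It assumes a local session (the app name and the `firstThree` value are illustrative) and carries the same deserialization cost described in the disclaimer above. Scala's Seq.slice(from, until) is 0-based and excludes the end index, so slice(0, 3) takes the first three elements.

```scala
import org.apache.spark.sql.SparkSession

// In spark-shell a session already exists; getOrCreate simply reuses it.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("dataset-slice-sum") // illustrative name
  .getOrCreate()
import spark.implicits._

val inventory = Seq(
  (10, "Finance", Seq(100, 200, 300, 400, 500)),
  (20, "IT", Seq(10, 20, 50, 100))
).toDF("dept_id", "dept_nm", "emp_details")

// Deserialize each row into a tuple, slice the Scala Seq, and sum it.
// Seq.slice(0, 3) keeps indices 0, 1 and 2.
val firstThree = inventory.as[(Int, String, Seq[Int])]
  .map { case (deptId, deptName, details) =>
    (deptId, deptName, details.slice(0, 3).sum)
  }
  .toDF("dept_id", "dept_nm", "sum")

firstThree.show()
```

For the sample data this should give 600 for Finance and 80 for IT.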

Jacek Laskowski answered Nov 12 '22 15:11