PySpark aggregation function for "any value"

I have a PySpark DataFrame with a column A, B columns that are functionally dependent on A (A -> B), and C columns that I want to aggregate for each A. For example:

A | B | C
----------
A | 1 | 6
A | 1 | 7
B | 2 | 8
B | 2 | 4

I wish to group by A, present any value of B, and run an aggregation (let's say SUM) on C.

The expected result would be:

A | B | C
----------
A | 1 | 13
B | 2 | 12

SQL-wise I would do:

SELECT A, COALESCE(B) as B, SUM(C) as C
FROM T
GROUP BY A

What is the PySpark way to do that?

I can group by A and B together, or select MIN(B) for each A, for example:

df.groupBy('A').agg(F.min('B').alias('B'), F.sum('C').alias('C'))

or

df.groupBy(['A','B']).agg(F.sum('C').alias('C'))

but that seems inefficient. Is there anything similar to SQL's COALESCE in PySpark?

Thanks

asked Feb 25 '18 by Dimgold



1 Answer

You'll just need to use first instead:

from pyspark.sql.functions import first, sum, col
from pyspark.sql import Row

# Sample data reproducing the question's table
array = [Row(A="A", B=1, C=6),
         Row(A="A", B=1, C=7),
         Row(A="B", B=2, C=8),
         Row(A="B", B=2, C=4)]

# Assumes a Spark shell where sqlContext and sc are already defined
df = sqlContext.createDataFrame(sc.parallelize(array))

# first(B) picks an arbitrary B per group; sum(C) aggregates C
results = df.groupBy(col("A")).agg(first(col("B")).alias("B"), sum(col("C")).alias("C"))

Let's now check the results:

results.show()
# +---+---+---+
# |  A|  B|  C|
# +---+---+---+
# |  B|  2| 12|
# |  A|  1| 13|
# +---+---+---+
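
Since the question was phrased in SQL, here is the same aggregation expressed through Spark SQL's first() aggregate, as a sketch; it assumes a SparkSession named spark (on older versions, sqlContext.sql works the same way) and registers the DataFrame as a temporary view named T:

# Register the DataFrame so it can be queried with SQL
df.createOrReplaceTempView("T")

# first(B) picks an arbitrary value of B per group, just like the DataFrame API above
spark.sql("""
    SELECT A, first(B) AS B, SUM(C) AS C
    FROM T
    GROUP BY A
""").show()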

From the comments:

Is first here computationally equivalent to any?

groupBy causes a shuffle, so non-deterministic behaviour is to be expected.

This is confirmed in the documentation of first:

Aggregate function: returns the first value in a group. The function by default returns the first value it sees. It will return the first non-null value it sees when ignoreNulls is set to true. If all values are null, then null is returned. Note: the function is non-deterministic because its results depend on the order of rows, which may be non-deterministic after a shuffle.
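
As a small sketch of the ignoreNulls behaviour described in that docstring (in the Python API the keyword is spelled ignorenulls; the df_nulls DataFrame and a SparkSession named spark are assumptions for illustration):

from pyspark.sql import Row
from pyspark.sql.functions import first

# A group where one of the B values is null
df_nulls = spark.createDataFrame([Row(A="A", B=None), Row(A="A", B=1)])

df_nulls.groupBy("A").agg(
    first("B").alias("maybe_null"),                  # may return null, depending on row order
    first("B", ignorenulls=True).alias("non_null")   # skips nulls, returns 1 here
).show()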

So yes, computationally they are the same, and that's one of the reasons you need to use sorting if you need deterministic behaviour.
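
If determinism matters, one option (my sketch, not part of the original answer) is to fall back to an order-independent aggregate such as min or max, which is valid here precisely because B is functionally dependent on A:

from pyspark.sql.functions import min as min_, sum as sum_

# min(B) and first(B) return the same value when B is constant within each group,
# but min is order-independent and therefore deterministic.
deterministic = df.groupBy("A").agg(
    min_("B").alias("B"),
    sum_("C").alias("C"),
)
deterministic.show()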

I hope this helps!

answered Oct 17 '22 by eliasah