I have a PySpark DataFrame with an A field, a few B fields that depend on A (A -> B), and C fields that I want to aggregate per A. For example:
A | B | C
----------
A | 1 | 6
A | 1 | 7
B | 2 | 8
B | 2 | 4
I wish to group by A, keep any value of B, and run an aggregation (let's say SUM) on C.
The expected result would be:
A | B | C
----------
A | 1 | 13
B | 2 | 12
SQL-wise I would do:
SELECT A, COALESCE(B) as B, SUM(C) as C
FROM T
GROUP BY A
What is the PySpark way to do that?
I can group by A and B together, or select MIN(B) per A, for example:
df.groupBy('A').agg(F.min('B').alias('B'),F.sum('C').alias('C'))
or
df.groupBy(['A','B']).agg(F.sum('C').alias('C'))
but that seems inefficient. Is there anything similar to SQL's COALESCE in PySpark?
Thanks
You'll just need to use first instead:
from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import first, sum, col

spark = SparkSession.builder.getOrCreate()

array = [Row(A="A", B=1, C=6),
         Row(A="A", B=1, C=7),
         Row(A="B", B=2, C=8),
         Row(A="B", B=2, C=4)]
df = spark.createDataFrame(array)

# first(B) picks one of the (identical) B values per group; sum(C) aggregates C
results = df.groupBy(col("A")).agg(first(col("B")).alias("B"),
                                   sum(col("C")).alias("C"))
Let's now check the results:
results.show()
# +---+---+---+
# | A| B| C|
# +---+---+---+
# | B| 2| 12|
# | A| 1| 13|
# +---+---+---+
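If you prefer to stay close to the SQL phrasing from the question, the same aggregation can also be expressed with Spark SQL, using first where the question sketched COALESCE(B). This is a minimal sketch, assuming the df and spark defined above; the view name T is just an illustrative choice:

# Register the DataFrame as a temporary view so it can be queried with SQL
df.createOrReplaceTempView("T")

# first(B) picks one of the (identical) B values per group,
# exactly like the DataFrame version above
spark.sql("""
    SELECT A, first(B) AS B, SUM(C) AS C
    FROM T
    GROUP BY A
""").show()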
From the comments:

Is first here computationally equivalent to any?

groupBy causes a shuffle, so non-deterministic behaviour is to be expected.
Which is confirmed in the documentation of first:

Aggregate function: returns the first value in a group. The function by default returns the first values it sees. It will return the first non-null value it sees when ignoreNulls is set to true. If all values are null, then null is returned. Note: The function is non-deterministic because its results depend on the order of rows, which may be non-deterministic after a shuffle.
So yes, computationally they are the same, and that's one of the reasons you need to use sorting if you need deterministic behaviour.
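As a hedged illustration of that last point (not part of the original answer): one way to make the picked B value independent of shuffle order is to collect the B values per group, sort them inside the aggregation, and take the first element:

from pyspark.sql.functions import collect_list, sort_array, sum, col

# sort_array(collect_list(B))[0] always returns the smallest B in each group,
# so the result no longer depends on row order after the shuffle
deterministic = df.groupBy(col("A")).agg(
    sort_array(collect_list(col("B")))[0].alias("B"),
    sum(col("C")).alias("C"))
deterministic.show()

In this sample data B is constant within each group, so this returns the same values as first, just with a guaranteed choice.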
I hope this helps!