 

GroupBy and concat array columns in PySpark


I have this data frame:

df = sc.parallelize([(1, [1, 2, 3]), (1, [4, 5, 6]), (2, [2]), (2, [3])]).toDF(["store", "values"])

+-----+---------+
|store|   values|
+-----+---------+
|    1|[1, 2, 3]|
|    1|[4, 5, 6]|
|    2|      [2]|
|    2|      [3]|
+-----+---------+

and I would like to convert it into the following df:

+-----+------------------+
|store|      values      |
+-----+------------------+
|    1|[1, 2, 3, 4, 5, 6]|
|    2|            [2, 3]|
+-----+------------------+

I did this:

from pyspark.sql import functions as F

df.groupBy("store").agg(F.collect_list("values"))

but the result contains these WrappedArrays:

+-----+----------------------------------------------+
|store|collect_list(values)                          |
+-----+----------------------------------------------+
|1    |[WrappedArray(1, 2, 3), WrappedArray(4, 5, 6)]|
|2    |[WrappedArray(2), WrappedArray(3)]            |
+-----+----------------------------------------------+

Is there any way to transform the WrappedArrays into concatenated arrays? Or can I do it differently?

Thank you!

asked Jan 23 '18 by Carmen Pérez Carrillo


People also ask

How do I concatenate multiple columns in PySpark?

The concat() function of PySpark SQL is used to concatenate multiple DataFrame columns into a single column. It can concatenate string, binary, and compatible array columns.
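For illustration (a minimal sketch, not from the original page; the columns s1, s2, a1, a2 are made up, and array concatenation with concat() needs Spark 2.4+):

from pyspark.sql import functions as F

df_c = sc.parallelize([("foo", "bar", [1, 2], [3])]).toDF(["s1", "s2", "a1", "a2"])

df_c.select(
    F.concat("s1", "s2").alias("strings"),  # string concatenation -> "foobar"
    F.concat("a1", "a2").alias("arrays"),   # array concatenation (Spark 2.4+) -> [1, 2, 3]
).show()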

How do you use group by and count in PySpark?

PySpark groupBy count is used to get the number of records in each group. First call groupBy() on the DataFrame, which groups the records by the values of one or more columns, and then call count() to get the number of records per group.
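For example, on the question's df above (illustrative only):

df.groupBy("store").count().show()
# +-----+-----+
# |store|count|
# +-----+-----+
# |    1|    2|
# |    2|    2|
# +-----+-----+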

What does collect_list do in Spark?

The Spark function collect_list() aggregates values into an ArrayType column, typically after a group by or over a window partition.
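A short sketch of both uses, on the question's df (illustrative only):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# after a group by: one row per store, with the "values" arrays gathered into a list
grouped = df.groupBy("store").agg(F.collect_list("values"))

# over a window partition: keeps one row per input row, each carrying the collected list
w = Window.partitionBy("store")
windowed = df.withColumn("all_values", F.collect_list("values").over(w))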

How does groupBy work in PySpark?

Similar to the SQL GROUP BY clause, the PySpark groupBy() function collects identical data into groups on a DataFrame so that aggregations such as count, sum, avg, min, and max can be run on the grouped data.
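A minimal sketch with hypothetical per-store amounts (names and data made up for illustration):

from pyspark.sql import functions as F

sales = sc.parallelize([("a", 10), ("a", 20), ("b", 5)]).toDF(["store", "amount"])

sales.groupBy("store").agg(
    F.count("amount").alias("n"),
    F.sum("amount").alias("total"),
    F.avg("amount").alias("mean"),
    F.min("amount").alias("lo"),
    F.max("amount").alias("hi"),
).show()
# store a -> n=2, total=30, mean=15.0, lo=10, hi=20
# store b -> n=1, total=5,  mean=5.0,  lo=5,  hi=5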


2 Answers

You need a flattening UDF; starting from your own df:

spark.version  # u'2.2.0'

from pyspark.sql import functions as F
import pyspark.sql.types as T

def fudf(val):
    return reduce(lambda x, y: x + y, val)

flattenUdf = F.udf(fudf, T.ArrayType(T.IntegerType()))

df2 = df.groupBy("store").agg(F.collect_list("values"))
df2.show(truncate=False)
# +-----+----------------------------------------------+
# |store|collect_list(values)                          |
# +-----+----------------------------------------------+
# |1    |[WrappedArray(1, 2, 3), WrappedArray(4, 5, 6)]|
# |2    |[WrappedArray(2), WrappedArray(3)]            |
# +-----+----------------------------------------------+

df3 = df2.select("store", flattenUdf("collect_list(values)").alias("values"))
df3.show(truncate=False)
# +-----+------------------+
# |store|values            |
# +-----+------------------+
# |1    |[1, 2, 3, 4, 5, 6]|
# |2    |[2, 3]            |
# +-----+------------------+

UPDATE (after comment):

The above snippet works only with Python 2, where reduce is a built-in. With Python 3, you should modify the UDF as follows:

import functools

def fudf(val):
    return functools.reduce(lambda x, y: x + y, val)

Tested with Spark 2.4.4.
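A further option on Spark 2.4 or later is the built-in pyspark.sql.functions.flatten, which avoids the UDF; this is a sketch of that alternative rather than part of the UDF approach above, applied to the question's df:

from pyspark.sql import functions as F

# flatten() concatenates the arrays gathered by collect_list (Spark 2.4+ only)
df.groupBy("store") \
  .agg(F.flatten(F.collect_list("values")).alias("values")) \
  .show(truncate=False)
# +-----+------------------+
# |store|values            |
# +-----+------------------+
# |1    |[1, 2, 3, 4, 5, 6]|
# |2    |[2, 3]            |
# +-----+------------------+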

answered Sep 25 '22 by desertnaut


For a simple problem like this, you could also use the explode function. I don't know the performance characteristics versus the selected UDF answer, though.

from pyspark.sql import functions as F

df = sc.parallelize([(1, [1, 2, 3]), (1, [4, 5, 6]), (2, [2]), (2, [3])]).toDF(['store', 'values'])

df2 = df.withColumn('values', F.explode('values'))
# +-----+------+
# |store|values|
# +-----+------+
# |    1|     1|
# |    1|     2|
# |    1|     3|
# |    1|     4|
# |    1|     5|
# |    1|     6|
# |    2|     2|
# |    2|     3|
# +-----+------+

df3 = df2.groupBy('store').agg(F.collect_list('values').alias('values'))
# +-----+------------------+
# |store|values            |
# +-----+------------------+
# |1    |[4, 5, 6, 1, 2, 3]|
# |2    |[2, 3]            |
# +-----+------------------+

Note: you could use F.collect_set() in the aggregation or .drop_duplicates() on df2 to remove duplicate values.
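A brief sketch of both options, building on the df2 defined above:

# option 1: collect only distinct values during the aggregation (result order not guaranteed)
df3_set = df2.groupBy('store').agg(F.collect_set('values').alias('values'))

# option 2: drop duplicate (store, values) rows first, then collect as before
df3_dedup = (
    df2.drop_duplicates(['store', 'values'])
       .groupBy('store')
       .agg(F.collect_list('values').alias('values'))
)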

If you want to maintain ordered values in the collected list, I found the following method in another SO answer:

from pyspark.sql.window import Window

w = Window.partitionBy('store').orderBy('values')
df3 = df2.withColumn('ordered_value_lists', F.collect_list('values').over(w))
# +-----+------+-------------------+
# |store|values|ordered_value_lists|
# +-----+------+-------------------+
# |1    |1     |[1]                |
# |1    |2     |[1, 2]             |
# |1    |3     |[1, 2, 3]          |
# |1    |4     |[1, 2, 3, 4]       |
# |1    |5     |[1, 2, 3, 4, 5]    |
# |1    |6     |[1, 2, 3, 4, 5, 6] |
# |2    |2     |[2]                |
# |2    |3     |[2, 3]             |
# +-----+------+-------------------+

df4 = df3.groupBy('store').agg(F.max('ordered_value_lists').alias('values'))
df4.show(truncate=False)
# +-----+------------------+
# |store|values            |
# +-----+------------------+
# |1    |[1, 2, 3, 4, 5, 6]|
# |2    |[2, 3]            |
# +-----+------------------+

If the values themselves don't determine the order, you can use F.posexplode() and use the 'pos' column in your window functions instead of 'values' to determine order. Note: you will also need a higher level order column to order the original arrays, then use the position in the array to order the elements of the array.

df = sc.parallelize([(1, [1, 2, 3], 1), (1, [4, 5, 6], 2), (2, [2], 1), (2, [3], 2)]).toDF(['store', 'values', 'array_order'])
# +-----+---------+-----------+
# |store|values   |array_order|
# +-----+---------+-----------+
# |1    |[1, 2, 3]|1          |
# |1    |[4, 5, 6]|2          |
# |2    |[2]      |1          |
# |2    |[3]      |2          |
# +-----+---------+-----------+

df2 = df.select('*', F.posexplode('values'))
# +-----+---------+-----------+---+---+
# |store|values   |array_order|pos|col|
# +-----+---------+-----------+---+---+
# |1    |[1, 2, 3]|1          |0  |1  |
# |1    |[1, 2, 3]|1          |1  |2  |
# |1    |[1, 2, 3]|1          |2  |3  |
# |1    |[4, 5, 6]|2          |0  |4  |
# |1    |[4, 5, 6]|2          |1  |5  |
# |1    |[4, 5, 6]|2          |2  |6  |
# |2    |[2]      |1          |0  |2  |
# |2    |[3]      |2          |0  |3  |
# +-----+---------+-----------+---+---+

w = Window.partitionBy('store').orderBy('array_order', 'pos')
df3 = df2.withColumn('ordered_value_lists', F.collect_list('col').over(w))
# +-----+---------+-----------+---+---+-------------------+
# |store|values   |array_order|pos|col|ordered_value_lists|
# +-----+---------+-----------+---+---+-------------------+
# |1    |[1, 2, 3]|1          |0  |1  |[1]                |
# |1    |[1, 2, 3]|1          |1  |2  |[1, 2]             |
# |1    |[1, 2, 3]|1          |2  |3  |[1, 2, 3]          |
# |1    |[4, 5, 6]|2          |0  |4  |[1, 2, 3, 4]       |
# |1    |[4, 5, 6]|2          |1  |5  |[1, 2, 3, 4, 5]    |
# |1    |[4, 5, 6]|2          |2  |6  |[1, 2, 3, 4, 5, 6] |
# |2    |[2]      |1          |0  |2  |[2]                |
# |2    |[3]      |2          |0  |3  |[2, 3]             |
# +-----+---------+-----------+---+---+-------------------+

df4 = df3.groupBy('store').agg(F.max('ordered_value_lists').alias('values'))
# +-----+------------------+
# |store|values            |
# +-----+------------------+
# |1    |[1, 2, 3, 4, 5, 6]|
# |2    |[2, 3]            |
# +-----+------------------+

Edit: If you'd like to keep some columns along for the ride and they don't need to be aggregated, you can include them in the groupBy or rejoin them after aggregation (examples below). If they do require aggregation, only group by 'store' and just add whatever aggregation function you need on the 'other' column/s to the .agg() call.

from pyspark.sql import functions as F

df = sc.parallelize([(1, [1, 2, 3], 'a'), (1, [4, 5, 6], 'a'), (2, [2], 'b'), (2, [3], 'b')]).toDF(['store', 'values', 'other'])
# +-----+---------+-----+
# |store|   values|other|
# +-----+---------+-----+
# |    1|[1, 2, 3]|    a|
# |    1|[4, 5, 6]|    a|
# |    2|      [2]|    b|
# |    2|      [3]|    b|
# +-----+---------+-----+

df2 = df.withColumn('values', F.explode('values'))
# +-----+------+-----+
# |store|values|other|
# +-----+------+-----+
# |    1|     1|    a|
# |    1|     2|    a|
# |    1|     3|    a|
# |    1|     4|    a|
# |    1|     5|    a|
# |    1|     6|    a|
# |    2|     2|    b|
# |    2|     3|    b|
# +-----+------+-----+

df3 = df2.groupBy('store', 'other').agg(F.collect_list('values').alias('values'))
# +-----+-----+------------------+
# |store|other|            values|
# +-----+-----+------------------+
# |    1|    a|[1, 2, 3, 4, 5, 6]|
# |    2|    b|            [2, 3]|
# +-----+-----+------------------+

df4 = (
    df.drop('values')
    .join(
        df2.groupBy('store')
        .agg(F.collect_list('values').alias('values')),
        on=['store'], how='inner'
    )
    .drop_duplicates()
)
# +-----+-----+------------------+
# |store|other|            values|
# +-----+-----+------------------+
# |    1|    a|[1, 2, 3, 4, 5, 6]|
# |    2|    b|            [2, 3]|
# +-----+-----+------------------+
answered Sep 22 '22 by Mike Souder