
Combine array of maps into single map in pyspark dataframe

Is there a function similar to collect_list or collect_set that aggregates a column of maps into a single map in a (grouped) pyspark dataframe? For example, this function might have the following behavior:

>>> df.show()

+--+---------------------------------+
|id|                             map |
+--+---------------------------------+
| 1|                    Map(k1 -> v1)|
| 1|                    Map(k2 -> v2)|
| 1|                    Map(k3 -> v3)|
| 2|                    Map(k5 -> v5)|
| 3|                    Map(k6 -> v6)|
| 3|                    Map(k7 -> v7)|
+--+---------------------------------+

>>> df.groupBy('id').agg(collect_map('map')).show()

+--+----------------------------------+
|id|                 collect_map(map) |
+--+----------------------------------+
| 1| Map(k1 -> v1, k2 -> v2, k3 -> v3)|
| 2|                     Map(k5 -> v5)|
| 3|           Map(k6 -> v6, k7 -> v7)|
+--+----------------------------------+

It probably wouldn't be too difficult to produce the desired result using one of the other collect_* aggregations and a udf, but it seems like something like this should already exist.

Asked May 01 '17 by DavidWayne

1 Answer

I know it is probably poor form to provide an answer to your own question before others have had a chance to answer, but in case someone is looking for a udf based version, here is one possible answer.

from pyspark.sql.functions import udf, collect_list
from pyspark.sql.types import MapType, StringType

# Merge a list of dicts into one dict; on duplicate keys, later maps win.
combineMap = udf(lambda maps: {key: m[key] for m in maps for key in m},
                 MapType(StringType(), StringType()))

df.groupBy('id') \
  .agg(collect_list('map').alias('maps')) \
  .select('id', combineMap('maps').alias('combined_map')) \
  .show()
Answered Sep 30 '22 by DavidWayne