PySpark - create DataFrame grouping columns into a map-type structure

My DataFrame has the following structure:

-------------------------
| Brand | type | amount |
-------------------------
|   B   |  a   |   10   |
|   B   |  b   |   20   |
|   C   |  c   |   30   |
-------------------------

I want to reduce the number of rows by grouping type and amount into a single column of type Map, so that Brand becomes unique and MAP_type_AMOUNT holds a key/value pair for each type/amount combination.

I think Spark SQL might have some functions to help with this, or do I have to get the RDD behind the DataFrame and do my "own" conversion to a map type?

Expected:

---------------------------
| Brand | MAP_type_AMOUNT |
---------------------------
|   B   | {a: 10, b: 20}  |
|   C   | {c: 30}         |
---------------------------
Alg_D asked Aug 06 '17

People also ask

How do I create a column map in Spark?

We can create a map column using the createMapType() function on the DataTypes class. This method takes two arguments, keyType and valueType, and both must be types that extend DataType. The referenced snippet creates a "mapCol" object of type MapType with keys and values of String type.
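createMapType() belongs to the Java/Scala DataTypes class; in PySpark the equivalent is constructing MapType directly. A minimal sketch of a schema with a "mapCol" map column of String keys and values (the column names here are illustrative):

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, MapType

spark = SparkSession.builder.getOrCreate()

# "mapCol" holds String keys and String values, the Python analogue
# of DataTypes.createMapType(StringType, StringType) on the JVM side.
schema = StructType([
    StructField("name", StringType(), True),
    StructField("mapCol", MapType(StringType(), StringType()), True),
])

df = spark.createDataFrame([("B", {"a": "10"})], schema)
df.printSchema()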

How do you define a map type in PySpark?

To create a PySpark MapType, use the MapType() constructor. MapType key points: the first parameter, keyType, specifies the type of the map's keys; the second parameter, valueType, specifies the type of its values.
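A short sketch of the constructor; the optional third parameter, valueContainsNull, defaults to True:

from pyspark.sql.types import MapType, StringType, IntegerType

# First param keyType, second param valueType,
# optional third param valueContainsNull (defaults to True).
map_type = MapType(StringType(), IntegerType())

print(map_type.keyType)            # type of the map's keys
print(map_type.valueType)          # type of the map's values
print(map_type.valueContainsNull)  # True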

How do I combine columns in Spark data frame?

Spark SQL functions provide concat() to concatenate two or more DataFrame columns into a single column. It can also take columns of different data types and concatenate them into a single column; for example, it supports String, Int, Boolean, and also arrays.
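A quick sketch of concat() joining a String column and an Int column (concat_ws() is the variant that inserts a separator); the example DataFrame is made up for illustration:

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("B", 10)], ["Brand", "Amount"])

# concat() joins the values end-to-end; the Int column is implicitly
# cast to string when mixed with string arguments.
df.select(F.concat(df.Brand, F.lit("_"), df.Amount).alias("joined")).show()
# +------+
# |joined|
# +------+
# |  B_10|
# +------+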


1 Answer

Slight improvement to Prem's answer (sorry I can't comment yet)

Use func.create_map instead of func.struct; see the documentation.

import pyspark.sql.functions as func

# Build the example DataFrame (assumes an active SparkContext `sc`).
df = sc.parallelize([('B', 'a', 10), ('B', 'b', 20),
                     ('C', 'c', 30)]).toDF(['Brand', 'Type', 'Amount'])

# Wrap each (Type, Amount) pair in a single-entry map, then collect
# the maps into a list per Brand.
df_converted = df.groupBy("Brand").agg(
    func.collect_list(func.create_map(func.col("Type"),
                                      func.col("Amount"))).alias("MAP_type_AMOUNT"))

print(df_converted.collect())

Output:

[Row(Brand='B', MAP_type_AMOUNT=[{'a': 10}, {'b': 20}]),
 Row(Brand='C', MAP_type_AMOUNT=[{'c': 30}])]
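
Note that this produces a list of single-entry maps per Brand rather than the one merged map shown in the question's expected output. On Spark 2.4+ the group can be collapsed into a single map; a sketch using map_from_entries():

# Assumes Spark >= 2.4, where map_from_entries() is available:
# collect the (Type, Amount) pairs as structs, then turn the list into one map.
df_merged = df.groupBy("Brand").agg(
    func.map_from_entries(
        func.collect_list(func.struct("Type", "Amount"))
    ).alias("MAP_type_AMOUNT"))

print(df_merged.collect())
# [Row(Brand='B', MAP_type_AMOUNT={'a': 10, 'b': 20}),
#  Row(Brand='C', MAP_type_AMOUNT={'c': 30})]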
osbon123 answered Oct 23 '22