
Add a new key/value pair to a Spark MapType column


I have a DataFrame with a MapType field.

>>> from pyspark.sql.functions import *
>>> from pyspark.sql.types import *
>>> fields = StructType([
...         StructField('timestamp',      TimestampType(), True),
...         StructField('other_field',    StringType(), True),
...         StructField('payload',        MapType(
...                                         keyType=StringType(),
...                                         valueType=StringType()),
...                                                     True),   ])
>>> import datetime
>>> rdd = sc.parallelize([[datetime.datetime.now(), 'this should be in', {'akey': 'aValue'}]])
>>> df = rdd.toDF(fields)
>>> df.show()
+--------------------+-----------------+-------------------+
|           timestamp|      other_field|            payload|
+--------------------+-----------------+-------------------+
|2018-01-10 12:56:...|this should be in|Map(akey -> aValue)|
+--------------------+-----------------+-------------------+

I would like to add other_field as a key in the payload map.

I know I can use a udf:

>>> def _add_to_map(name, value, map_field):
...     map_field[name] = value
...     return map_field
...
>>> add_to_map = udf(_add_to_map, MapType(StringType(),StringType()))
>>> df.select(add_to_map(lit('other_field'), 'other_field', 'payload')).show(1, False)
+------------------------------------------------------+
|PythonUDF#_add_to_map(other_field,other_field,payload)|
+------------------------------------------------------+
|Map(other_field -> this should be in, akey -> aValue) |
+------------------------------------------------------+

Is there a way to do this without a udf?

asked Jan 10 '18 by zemekeneng


2 Answers

Here is one way to do it without a udf, if you know the keys ahead of time: use the create_map function. As to whether or not this is more performant, I don't know.

from pyspark.sql.functions import col, lit, create_map

df.select(
    create_map(
        lit('other_field'),
        col('other_field'),
        lit('akey'),
        col('payload')['akey']
    )
).show(n=1, truncate=False)

Output:

+-----------------------------------------------------+
|map(other_field, other_field, akey, payload['akey']) |
+-----------------------------------------------------+
|Map(other_field -> this should be in, akey -> aValue)|
+-----------------------------------------------------+
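
If you also want to keep the other columns and overwrite payload in place, a minimal sketch along the same lines (using the df from the question) would be:

from pyspark.sql.functions import col, lit, create_map

# Sketch: rebuild `payload` in place instead of selecting a new, auto-named column.
df.withColumn(
    'payload',
    create_map(
        lit('other_field'), col('other_field'),
        lit('akey'), col('payload')['akey']
    )
).show(n=1, truncate=False)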

Update

Here is a way to do it without having to hardcode the dictionary keys. Unfortunately it involves one collect() operation.

Mockup some data

Firstly, let's modify your original dataframe to include one more key-value pair in the MapType() field.

from pyspark.sql.functions import col, lit, create_map
import datetime
rdd = sc.parallelize(
    [
        [
            datetime.datetime.now(),
            'this should be in',
            {'akey': 'aValue', 'bkey': 'bValue'}
        ]
    ]
)
df = rdd.toDF(fields)
df.show(n=1, truncate=False)

This creates the following:

+--------------------------+-----------------+-----------------------------------+
|timestamp                 |other_field      |payload                            |
+--------------------------+-----------------+-----------------------------------+
|2018-01-10 17:37:58.859603|this should be in|Map(bkey -> bValue, akey -> aValue)|
+--------------------------+-----------------+-----------------------------------+

Get the map's keys

Using explode() and collect(), you can get the keys like so:

from pyspark.sql.functions import explode

keys = [
    x['key'] for x in (df.select(explode("payload"))
                        .select("key")
                        .distinct()
                        .collect())
]
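
If you are on Spark 2.3 or later, map_keys() is an equivalent way to build the same list; a minimal sketch of that variant:

from pyspark.sql.functions import explode, map_keys

# Variant sketch: map_keys() extracts the map's keys as an array column,
# which is then exploded, de-duplicated and collected, exactly as above.
keys = [
    row['key'] for row in (
        df.select(explode(map_keys('payload')).alias('key'))
          .distinct()
          .collect()
    )
]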

Create a new map with all of the fields

Now use create_map as above, but use the information from keys to create the key-value pairs dynamically. I used reduce(add, ...) because create_map expects its inputs to be a flat sequence of alternating keys and values; I couldn't think of a cleaner way to flatten the list of pairs (an equivalent flattening with itertools.chain is sketched after the final result below).

from functools import reduce  # needed on Python 3, where reduce is no longer a builtin
from operator import add

df.select(
    create_map(
        *([lit('other_field'), col('other_field')]
          + reduce(add, [[lit(k), col('payload').getItem(k)] for k in keys]))
    )
).show(n=1, truncate=False)

Final result:

+---------------------------------------------------------------------------+
|map(other_field, other_field, akey, payload['akey'], bkey, payload['bkey'])|
+---------------------------------------------------------------------------+
|Map(other_field -> this should be in, akey -> aValue, bkey -> bValue)      |
+---------------------------------------------------------------------------+
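
For reference, the flattening step can also be written with itertools.chain instead of reduce(add, ...); a minimal sketch that should give the same result:

from itertools import chain
from pyspark.sql.functions import col, lit, create_map

# Same idea: build the flat [lit(key), value_column, ...] argument list for create_map,
# here by chaining the per-key pairs instead of reducing with add.
pairs = chain.from_iterable(
    [lit(k), col('payload').getItem(k)] for k in keys
)
df.select(
    create_map(lit('other_field'), col('other_field'), *pairs)
).show(n=1, truncate=False)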

References

  1. pyspark: Create MapType Column from existing columns

  2. PySpark converting a column of type 'map' to multiple columns in a dataframe

answered Sep 20 '22 by pault


Using map_concat & create_map (pyspark 2.4+):

(
    df.withColumn(
        "new_map",
        F.map_concat(
            "old_map",
            F.create_map(F.lit("key"), F.lit("val"))
        )
    )
)

You can add multiple keys at once thanks to F.create_map, but F.map_concat won't replace keys that already exist in the old map.
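
Applied to the DataFrame from the question (a sketch, assuming Spark 2.4+ and the df defined above), this might look like:

from pyspark.sql import functions as F

# Sketch: merge other_field into the existing payload map with map_concat.
df.withColumn(
    "payload",
    F.map_concat(
        "payload",
        F.create_map(F.lit("other_field"), F.col("other_field"))
    )
).show(truncate=False)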

answered Sep 21 '22 by utkarshgupta137