I have a DataFrame with a MapType field.
>>> from pyspark.sql.functions import *
>>> from pyspark.sql.types import *
>>> fields = StructType([
...     StructField('timestamp', TimestampType(), True),
...     StructField('other_field', StringType(), True),
...     StructField('payload', MapType(
...         keyType=StringType(),
...         valueType=StringType()),
...         True),
... ])
>>> import datetime
>>> rdd = sc.parallelize([[datetime.datetime.now(), 'this should be in', {'akey': 'aValue'}]])
>>> df = rdd.toDF(fields)
>>> df.show()
+--------------------+-----------------+-------------------+
| timestamp| other_field| payload|
+--------------------+-----------------+-------------------+
|2018-01-10 12:56:...|this should be in|Map(akey -> aValue)|
+--------------------+-----------------+-------------------+
I would like to add the other_field as a key in the payload field.
I know I can use a udf:
>>> def _add_to_map(name, value, map_field):
... map_field[name] = value
... return map_field
...
>>> add_to_map = udf(_add_to_map, MapType(StringType(), StringType()))
>>> df.select(add_to_map(lit('other_field'), 'other_field', 'payload')).show(1, False)
+------------------------------------------------------+
|PythonUDF#_add_to_map(other_field,other_field,payload)|
+------------------------------------------------------+
|Map(other_field -> this should be in, akey -> aValue) |
+------------------------------------------------------+
Is there a way to do this without a udf?
Here is one way to do it without a udf, if you know the keys ahead of time: use the create_map function. As to whether or not this is more performant, I don't know.
from pyspark.sql.functions import col, lit, create_map

df.select(
    create_map(
        lit('other_field'),
        col('other_field'),
        lit('akey'),
        col('payload')['akey']
    )
).show(n=1, truncate=False)
Output:
+-----------------------------------------------------+
|map(other_field, other_field, akey, payload['akey']) |
+-----------------------------------------------------+
|Map(other_field -> this should be in, akey -> aValue)|
+-----------------------------------------------------+
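As a side note, if you want to keep the other columns and overwrite payload in place rather than select a single map column, a minimal sketch along the same lines (same df as above) would be:

from pyspark.sql.functions import col, lit, create_map

# Overwrite the payload column, keeping timestamp and other_field intact
df.withColumn(
    'payload',
    create_map(
        lit('other_field'), col('other_field'),
        lit('akey'), col('payload')['akey']
    )
).show(n=1, truncate=False)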
Here is a way to do it without having to hardcode the dictionary keys. Unfortunately it involves one collect() operation. First, let's modify your original DataFrame to include one more key-value pair in the MapType() field.
from pyspark.sql.functions import col, lit, create_map
import datetime

rdd = sc.parallelize(
    [
        [
            datetime.datetime.now(),
            'this should be in',
            {'akey': 'aValue', 'bkey': 'bValue'}
        ]
    ]
)
df = rdd.toDF(fields)
df.show(n=1, truncate=False)
Which creates the following:
+--------------------------+-----------------+-----------------------------------+
|timestamp |other_field |payload |
+--------------------------+-----------------+-----------------------------------+
|2018-01-10 17:37:58.859603|this should be in|Map(bkey -> bValue, akey -> aValue)|
+--------------------------+-----------------+-----------------------------------+
Using explode() and collect(), you can get the keys like so:
from pyspark.sql.functions import explode

keys = [
    x['key'] for x in (
        df.select(explode("payload"))
          .select("key")
          .distinct()
          .collect()
    )
]
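With the sample data above, keys should come back as something like ['akey', 'bkey'] (the ordering after distinct() is not guaranteed).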
Now use create_map as above, but use the information from keys to create the key-value pairs dynamically. I used reduce(add, ...) because create_map expects the inputs to be key-value pairs in order; I couldn't think of another way to flatten the list (though a sketch with itertools.chain follows the output below).
from operator import add
from functools import reduce  # reduce is not a builtin on Python 3

df.select(
    create_map(
        *([lit('other_field'), col('other_field')] +
          reduce(add, [[lit(k), col('payload').getItem(k)] for k in keys]))
    )
).show(n=1, truncate=False)
+---------------------------------------------------------------------------+
|map(other_field, other_field, akey, payload['akey'], bkey, payload['bkey'])|
+---------------------------------------------------------------------------+
|Map(other_field -> this should be in, akey -> aValue, bkey -> bValue) |
+---------------------------------------------------------------------------+
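For what it's worth, itertools.chain.from_iterable would be another way to flatten the pair list; here is a minimal sketch, assuming the same df and keys as above:

from itertools import chain
from pyspark.sql.functions import col, lit, create_map

# Flatten the [[lit(key), value_col], ...] pairs into one flat argument list
pairs = [lit('other_field'), col('other_field')] + list(
    chain.from_iterable([lit(k), col('payload').getItem(k)] for k in keys)
)
df.select(create_map(*pairs)).show(n=1, truncate=False)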
Using map_concat & create_map (PySpark 2.4+):
import pyspark.sql.functions as F

(
    df.withColumn(
        "new_map",
        F.map_concat(
            "old_map",  # the existing MapType column
            F.create_map(F.lit("key"), F.lit("val"))
        )
    )
)
You can add multiple keys at once thanks to F.create_map, but F.map_concat won't replace old keys.
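Applied to the question's df, a minimal sketch (assuming payload is the existing map column and other_field supplies the new entry) could be:

import pyspark.sql.functions as F

df.withColumn(
    "payload",
    F.map_concat(
        "payload",  # existing MapType column
        F.create_map(F.lit("other_field"), F.col("other_field"))
    )
).show(n=1, truncate=False)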