I need to creeate an new Spark DF MapType Column based on the existing columns where column name is the key and the value is the value.
As Example - i've this DF:
rdd = sc.parallelize([('123k', 1.3, 6.3, 7.6), ('d23d', 1.5, 2.0, 2.2), ('as3d', 2.2, 4.3, 9.0) ]) schema = StructType([StructField('key', StringType(), True), StructField('metric1', FloatType(), True), StructField('metric2', FloatType(), True), StructField('metric3', FloatType(), True)]) df = sqlContext.createDataFrame(rdd, schema) +----+-------+-------+-------+ | key|metric1|metric2|metric3| +----+-------+-------+-------+ |123k| 1.3| 6.3| 7.6| |d23d| 1.5| 2.0| 2.2| |as3d| 2.2| 4.3| 9.0| +----+-------+-------+-------+
I'm already so far that i can create a structType from this:
nameCol = struct([name for name in df.columns if ("metric" in name)]).alias("metric") df2 = df.select("key", nameCol) +----+-------------+ | key| metric| +----+-------------+ |123k|[1.3,6.3,7.6]| |d23d|[1.5,2.0,2.2]| |as3d|[2.2,4.3,9.0]| +----+-------------+
But what i need is an metric column with am MapType where the key is the column name:
+----+-------------------------+ | key| metric| +----+-------------------------+ |123k|Map(metric1 -> 1.3, me...| |d23d|Map(metric1 -> 1.5, me...| |as3d|Map(metric1 -> 2.2, me...| +----+-------------------------+
Any hints how i can transform the data?
Thanks!
PySpark Concatenate Using concat() concat() function of Pyspark SQL is used to concatenate multiple DataFrame columns into a single column. It can also be used to concatenate column types string, binary, and compatible array columns.
Solution: PySpark SQL function create_map() is used to convert selected DataFrame columns to MapType , create_map() takes a list of columns you wanted to convert as an argument and returns a MapType column.
In Spark 2.0 or later you can use create_map
. First some imports:
from pyspark.sql.functions import lit, col, create_map from itertools import chain
create_map
expects an interleaved sequence of keys
and values
which can be created for example like this:
metric = create_map(list(chain(*( (lit(name), col(name)) for name in df.columns if "metric" in name )))).alias("metric")
and used with select
:
df.select("key", metric)
With example data the result is:
+----+---------------------------------------------------------+ |key |metric | +----+---------------------------------------------------------+ |123k|Map(metric1 -> 1.3, metric2 -> 6.3, metric3 -> 7.6) | |d23d|Map(metric1 -> 1.5, metric2 -> 2.0, metric3 -> 2.2) | |as3d|Map(metric1 -> 2.2, metric2 -> 4.3, metric3 -> 9.0) | +----+---------------------------------------------------------+
If you use an earlier version of Spark you'll have to use UDF:
from pyspark.sql import Column from pyspark.sql.functions import struct from pyspark.sql.types import DataType, DoubleType, StringType, MapType def as_map(*cols: str, key_type: DataType=DoubleType()) -> Column: args = [struct(lit(name), col(name)) for name in cols] as_map_ = udf( lambda *args: dict(args), MapType(StringType(), key_type) ) return as_map_(*args)
which could be used as follows:
df.select("key", as_map(*[name for name in df.columns if "metric" in name]).alias("metric"))
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With