
pyspark: Create MapType Column from existing columns


I need to create a new Spark DataFrame column of MapType based on the existing columns, where the column name is the key and the column value is the value.

As an example, I have this DataFrame:

from pyspark.sql.types import StructType, StructField, StringType, FloatType

rdd = sc.parallelize([('123k', 1.3, 6.3, 7.6),
                      ('d23d', 1.5, 2.0, 2.2),
                      ('as3d', 2.2, 4.3, 9.0)])

schema = StructType([StructField('key', StringType(), True),
                     StructField('metric1', FloatType(), True),
                     StructField('metric2', FloatType(), True),
                     StructField('metric3', FloatType(), True)])

df = sqlContext.createDataFrame(rdd, schema)

+----+-------+-------+-------+
| key|metric1|metric2|metric3|
+----+-------+-------+-------+
|123k|    1.3|    6.3|    7.6|
|d23d|    1.5|    2.0|    2.2|
|as3d|    2.2|    4.3|    9.0|
+----+-------+-------+-------+

I've already gotten far enough to create a struct column from this:

from pyspark.sql.functions import struct

nameCol = struct([name for name in df.columns if "metric" in name]).alias("metric")
df2 = df.select("key", nameCol)

+----+-------------+
| key|       metric|
+----+-------------+
|123k|[1.3,6.3,7.6]|
|d23d|[1.5,2.0,2.2]|
|as3d|[2.2,4.3,9.0]|
+----+-------------+

But what I need is a metric column of MapType, where the key is the column name:

+----+-------------------------+
| key|                   metric|
+----+-------------------------+
|123k|Map(metric1 -> 1.3, me...|
|d23d|Map(metric1 -> 1.5, me...|
|as3d|Map(metric1 -> 2.2, me...|
+----+-------------------------+

Any hints on how I can transform the data?

Thanks!

Michael Weber asked Dec 22 '16




1 Answer

In Spark 2.0 or later you can use create_map. First some imports:

from pyspark.sql.functions import lit, col, create_map
from itertools import chain

create_map expects an interleaved sequence of keys and values, which can be built, for example, like this:

metric = create_map(list(chain(*(
    (lit(name), col(name)) for name in df.columns if "metric" in name
)))).alias("metric")

and used with select:

df.select("key", metric) 

With example data the result is:

+----+---------------------------------------------------+
|key |metric                                             |
+----+---------------------------------------------------+
|123k|Map(metric1 -> 1.3, metric2 -> 6.3, metric3 -> 7.6)|
|d23d|Map(metric1 -> 1.5, metric2 -> 2.0, metric3 -> 2.2)|
|as3d|Map(metric1 -> 2.2, metric2 -> 4.3, metric3 -> 9.0)|
+----+---------------------------------------------------+
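In case the generator expression above is hard to read, here is a minimal sketch of what it expands to for the example columns, plus a lookup by key afterwards (same names as above, nothing new assumed):

from pyspark.sql.functions import col, create_map, lit

# Explicit form of the same map: keys and values interleaved
metric = create_map(
    lit("metric1"), col("metric1"),
    lit("metric2"), col("metric2"),
    lit("metric3"), col("metric3")
).alias("metric")

df2 = df.select("key", metric)

# Individual entries can be read back by key
df2.select("key", col("metric")["metric2"].alias("metric2")).show()

df2.printSchema() should report the metric column as map<string,float>.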

If you use an earlier version of Spark you'll have to use a UDF:

from pyspark.sql import Column
from pyspark.sql.functions import col, lit, struct, udf
from pyspark.sql.types import DataType, DoubleType, StringType, MapType

def as_map(*cols: str, value_type: DataType = DoubleType()) -> Column:
    # Wrap each column in a (name, value) struct so the UDF receives pairs
    args = [struct(lit(name), col(name)) for name in cols]
    # dict() treats each incoming Row as a (key, value) tuple
    as_map_ = udf(
        lambda *args: dict(args),
        MapType(StringType(), value_type)
    )
    return as_map_(*args)

which could be used as follows:

df.select("key",      as_map(*[name for name in df.columns if "metric" in name]).alias("metric")) 
zero323 answered Sep 23 '22