
PySpark create new column with mapping from a dict

Using Spark 1.6, I have a Spark DataFrame column (named, let's say, col1) with the values A, B, C, DS, DNS, E, F, G and H, and I want to create a new column (say col2) containing the mapped values from the dict below. How do I map this? (For instance, 'A' needs to be mapped to 'S', etc.)

dict = {'A': 'S', 'B': 'S', 'C': 'S', 'DS': 'S', 'DNS': 'S', 'E': 'NS', 'F': 'NS', 'G': 'NS', 'H': 'NS'} 
asked Mar 23 '17 by ad_s



2 Answers

An inefficient but version-independent solution uses a Python UDF (every row has to be round-tripped through the Python worker):

from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

def translate(mapping):
    def translate_(col):
        return mapping.get(col)
    return udf(translate_, StringType())

df = sc.parallelize([('DS', ), ('G', ), ('INVALID', )]).toDF(['key'])
mapping = {
    'A': 'S', 'B': 'S', 'C': 'S', 'DS': 'S', 'DNS': 'S',
    'E': 'NS', 'F': 'NS', 'G': 'NS', 'H': 'NS'}

df.withColumn("value", translate(mapping)("key"))

with the result:

+-------+-----+
|    key|value|
+-------+-----+
|     DS|    S|
|      G|   NS|
|INVALID| null|
+-------+-----+

A much more efficient approach (Spark >= 2.0, Spark < 3.0) is to create a MapType literal:

from pyspark.sql.functions import col, create_map, lit
from itertools import chain

mapping_expr = create_map([lit(x) for x in chain(*mapping.items())])

df.withColumn("value", mapping_expr.getItem(col("key")))
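Here chain(*mapping.items()) flattens the dict into the alternating key, value sequence that create_map expects, so the whole lookup table is embedded in the query plan as a single map literal and no Python worker round-trip is needed.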

with the same result:

+-------+-----+
|    key|value|
+-------+-----+
|     DS|    S|
|      G|   NS|
|INVALID| null|
+-------+-----+

but a more efficient execution plan:

== Physical Plan ==
*Project [key#15, keys: [B,DNS,DS,F,E,H,C,G,A], values: [S,S,S,NS,NS,NS,S,NS,S][key#15] AS value#53]
+- Scan ExistingRDD[key#15]

compared to the UDF version:

== Physical Plan ==
*Project [key#15, pythonUDF0#61 AS value#57]
+- BatchEvalPython [translate_(key#15)], [key#15, pythonUDF0#61]
   +- Scan ExistingRDD[key#15]

In Spark >= 3.0, getItem should be replaced with __getitem__ ([]), i.e.:

df.withColumn("value", mapping_expr[col("key")]).show() 
answered Sep 18 '22 by zero323


Sounds like the simplest solution would be to use the replace function: http://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.replace

mapping = {
    'A': '1',
    'B': '2'
}
df2 = df.replace(to_replace=mapping, subset=['yourColName'])
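Note that replace rewrites the matching values in place rather than adding a new column. As a rough sketch for the original question (assuming the question's column names col1 and col2, which are not part of this answer's snippet), you could copy the column first and remap only the copy:

from pyspark.sql.functions import col

mapping = {
    'A': 'S', 'B': 'S', 'C': 'S', 'DS': 'S', 'DNS': 'S',
    'E': 'NS', 'F': 'NS', 'G': 'NS', 'H': 'NS'}

# copy col1 into a new column, then remap only the copy
df2 = (df
       .withColumn('col2', col('col1'))
       .replace(to_replace=mapping, subset=['col2']))

Unlike the getItem/UDF approaches above, values missing from the mapping are left unchanged rather than set to null.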
answered Sep 20 '22 by Haim Bendanan