Using Spark 1.6, I have a Spark DataFrame column (named, let's say, col1) with values A, B, C, DS, DNS, E, F, G and H, and I want to create a new column (say col2) with the values from the dict below. How do I map this? (E.g. 'A' needs to be mapped to 'S', etc.)
dict = {'A': 'S', 'B': 'S', 'C': 'S', 'DS': 'S', 'DNS': 'S', 'E': 'NS', 'F': 'NS', 'G': 'NS', 'H': 'NS'}
Solution: The PySpark SQL function create_map() is used to convert selected DataFrame columns to MapType. create_map() takes as an argument a list of columns (alternating keys and values) and returns a MapType column.
In PySpark, to add a new column with a constant value to a DataFrame, use the lit() function (from pyspark.sql.functions import lit). lit() takes the constant value you want to add and returns a Column; to add a NULL / None, use lit(None).
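A minimal sketch of lit() in isolation (assuming an existing DataFrame df; the column names here are illustrative):

from pyspark.sql.functions import lit

df.withColumn("flag", lit("S"))                   # constant string column
df.withColumn("empty", lit(None).cast("string"))  # typed NULL column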
PySpark doesn't have a dictionary type; instead it uses MapType to store dictionary objects. Below is an example of creating a DataFrame with a MapType column using pyspark.sql.types.MapType inside a StructType schema.
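A sketch of such a schema, assuming a SparkSession named spark (i.e. Spark >= 2.0); the field names are illustrative:

from pyspark.sql.types import StructType, StructField, StringType, MapType

schema = StructType([
    StructField("key", StringType()),
    StructField("properties", MapType(StringType(), StringType()))
])
# each Python dict becomes a MapType value in the column
df_map = spark.createDataFrame([("A", {"A": "S"})], schema)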
Inefficient solution with UDF (version independent):
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

def translate(mapping):
    def translate_(col):
        # dict.get returns None for keys missing from the mapping
        return mapping.get(col)
    return udf(translate_, StringType())

df = sc.parallelize([('DS', ), ('G', ), ('INVALID', )]).toDF(['key'])

mapping = {
    'A': 'S', 'B': 'S', 'C': 'S', 'DS': 'S', 'DNS': 'S',
    'E': 'NS', 'F': 'NS', 'G': 'NS', 'H': 'NS'}

df.withColumn("value", translate(mapping)("key"))
with the result:
+-------+-----+
|    key|value|
+-------+-----+
|     DS|    S|
|      G|   NS|
|INVALID| null|
+-------+-----+
Much more efficient (Spark >= 2.0, Spark < 3.0) is to create a MapType literal:
from pyspark.sql.functions import col, create_map, lit
from itertools import chain

# flatten the dict into [key1, value1, key2, value2, ...] column literals
mapping_expr = create_map([lit(x) for x in chain(*mapping.items())])

df.withColumn("value", mapping_expr.getItem(col("key")))
with the same result:
+-------+-----+
|    key|value|
+-------+-----+
|     DS|    S|
|      G|   NS|
|INVALID| null|
+-------+-----+
but more efficient execution plan:
== Physical Plan ==
*Project [key#15, keys: [B,DNS,DS,F,E,H,C,G,A], values: [S,S,S,NS,NS,NS,S,NS,S][key#15] AS value#53]
+- Scan ExistingRDD[key#15]
compared to the UDF version, which has to ship every row to a Python worker and back:
== Physical Plan ==
*Project [key#15, pythonUDF0#61 AS value#57]
+- BatchEvalPython [translate_(key#15)], [key#15, pythonUDF0#61]
   +- Scan ExistingRDD[key#15]
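You can reproduce these plans yourself with DataFrame.explain(), using the df, mapping, and mapping_expr defined above (the exact plan text varies by Spark version):

df.withColumn("value", translate(mapping)("key")).explain()
df.withColumn("value", mapping_expr.getItem(col("key"))).explain()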
In Spark >= 3.0, getItem should be replaced with __getitem__ ([]), i.e.:
df.withColumn("value", mapping_expr[col("key")]).show()
Sounds like the simplest solution would be to use the replace function: http://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.replace
mapping = {'A': '1', 'B': '2'}
df2 = df.replace(to_replace=mapping, subset=['yourColName'])
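Note that, unlike the MapType lookup, replace() leaves values that are absent from the mapping unchanged rather than turning them into null. A quick sketch against the example df from above, reusing the original 'A' -> 'S' mapping:

mapping = {'A': 'S', 'B': 'S', 'C': 'S', 'DS': 'S', 'DNS': 'S',
           'E': 'NS', 'F': 'NS', 'G': 'NS', 'H': 'NS'}
df.replace(to_replace=mapping, subset=['key']).show()
# 'INVALID' is not a key in the mapping, so it stays 'INVALID' (not null)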