 

PySpark - Add map function as column

I have a pyspark DataFrame

a = [
    ('Bob', 562),
    ('Bob', 880),
    ('Bob', 380),
    ('Sue', 85),
    ('Sue', 963)
]
df = spark.createDataFrame(a, ["Person", "Amount"])

I need to create a column that hashes the Amount and returns the hash. The problem is I can't use a UDF, so I have used a mapping function:

df.rdd.map(lambda x: hash(x["Amount"]))
asked Apr 17 '18 by Bryce Ramgovind

People also ask

How do I map a column in PySpark?

Solution: The PySpark SQL function create_map() is used to convert selected DataFrame columns to a MapType column. create_map() takes the list of columns you want to convert as its arguments and returns a MapType column.
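
For instance, a minimal sketch against the question's DataFrame (the output column name "amount_map" is made up for illustration):

import pyspark.sql.functions as f

# create_map() takes alternating key and value columns; here the key is
# the literal string "Amount" and the value is the Amount column itself.
df.withColumn(
    "amount_map",
    f.create_map(f.lit("Amount"), f.col("Amount"))
).show(truncate=False)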

How do you apply a function to a column in PySpark?

Import the udf function from pyspark.sql.functions, wrap the Python function you want to apply as a UDF (optionally with an explicit return type), and pass the column name to it inside withColumn or select.
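
For illustration only (the question above explicitly rules out UDFs), a minimal sketch of applying a function to a column via a UDF; hash_amount is a made-up name:

from pyspark.sql.functions import udf
from pyspark.sql.types import LongType

# Wrap a plain Python function as a UDF with an explicit return type,
# then apply it to the "Amount" column.
hash_amount = udf(lambda amount: hash(str(amount)), LongType())
df.withColumn("Hash", hash_amount("Amount")).show()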

How do I create a column map in spark?

We can create a map column using the createMapType() function on the DataTypes class. This method takes two arguments, keyType and valueType, as mentioned above, and both arguments should be of a type that extends DataType. This snippet creates a "mapCol" object of type MapType with keys and values of String type.
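
That createMapType() call belongs to the Java/Scala DataTypes API; in PySpark the equivalent is the MapType class from pyspark.sql.types. A minimal sketch, with made-up column names:

from pyspark.sql.types import StructType, StructField, StringType, MapType

# "mapCol" is a MapType column with String keys and String values.
schema = StructType([
    StructField("name", StringType(), True),
    StructField("mapCol", MapType(StringType(), StringType()), True),
])

df2 = spark.createDataFrame([("Bob", {"city": "NYC"})], schema)
df2.printSchema()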


1 Answer

If you can't use a udf, you can use the map function, but as you've currently written it there will only be one column. To keep all the columns, do the following:

df = df.rdd\
    .map(lambda x: (x["Person"], x["Amount"], hash(str(x["Amount"]))))\
    .toDF(["Person", "Amount", "Hash"])

df.show()
#+------+------+--------------------+
#|Person|Amount|                Hash|
#+------+------+--------------------+
#|   Bob|   562|-4340709941618811062|
#|   Bob|   880|-7718876479167384701|
#|   Bob|   380|-2088598916611095344|
#|   Sue|    85|    7168043064064671|
#|   Sue|   963|-8844931991662242457|
#+------+------+--------------------+

Note: In this case, hash(x["Amount"]) is not very interesting so I changed it to hash Amount converted to a string.

Essentially you have to map the row to a tuple containing all of the existing columns and add in the new column(s).

If your columns are too many to enumerate, you could also just add a tuple to the existing row.

df = df.rdd\
    .map(lambda x: x + (hash(str(x["Amount"])),))\
    .toDF(df.columns + ["Hash"])

I should also point out that if hashing the values is your end goal, there is also a pyspark function pyspark.sql.functions.hash that can be used to avoid the serialization to rdd:

import pyspark.sql.functions as f
df.withColumn("Hash", f.hash("Amount")).show()
#+------+------+----------+
#|Person|Amount|      Hash|
#+------+------+----------+
#|   Bob|   562|  51343841|
#|   Bob|   880|1241753636|
#|   Bob|   380| 514174926|
#|   Sue|    85|1944150283|
#|   Sue|   963|1665082423|
#+------+------+----------+

This uses a different hashing algorithm than the Python builtin: Spark's hash is based on MurmurHash3 and returns a 32-bit integer, whereas Python's hash() produces a 64-bit, process-dependent value for strings.
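
A quick way to see the difference (an illustration only; the Python value will vary between runs, since hash() of strings is salted per interpreter process unless PYTHONHASHSEED is set):

import pyspark.sql.functions as f

# Spark's hash() is Murmur3-based and stable across runs;
# Python's built-in hash() for strings is not.
df.select("Amount", f.hash("Amount").alias("spark_hash")).show()
print(hash("562"))  # will generally not match the Spark value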

answered Sep 27 '22 by pault