I have a PySpark DataFrame:
a = [
('Bob', 562),
('Bob',880),
('Bob',380),
('Sue',85),
('Sue',963)
]
df = spark.createDataFrame(a, ["Person", "Amount"])
I need to create a column that hashes Amount and returns the hash. The problem is I can't use a UDF, so I have used a mapping function:
df.rdd.map(lambda x: hash(x["Amount"]))
If you can't use udf, you can use the map function, but as you've currently written it there will only be one column. To keep all the columns, do the following:
df = df.rdd\
.map(lambda x: (x["Person"], x["Amount"], hash(str(x["Amount"]))))\
.toDF(["Person", "Amount", "Hash"])
df.show()
#+------+------+--------------------+
#|Person|Amount| Hash|
#+------+------+--------------------+
#| Bob| 562|-4340709941618811062|
#| Bob| 880|-7718876479167384701|
#| Bob| 380|-2088598916611095344|
#| Sue| 85| 7168043064064671|
#| Sue| 963|-8844931991662242457|
#+------+------+--------------------+
Note: In this case, hash(x["Amount"]) is not very interesting (CPython's hash() of a small integer is just the integer itself), so I changed it to hash Amount converted to a string.
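To make that concrete, here is a quick check in plain Python (a minimal sketch, not part of the original answer). Also note that Python 3 salts string hashes per process, so the exact Hash values shown above will differ between runs unless PYTHONHASHSEED is fixed.
# hash() of a small int just returns the int, so hashing the raw
# Amount would simply reproduce the Amount column.
print(hash(562))          # 562
# Hashing the string form gives a non-trivial value, but it is
# salted per process in Python 3 (controlled by PYTHONHASHSEED).
print(hash(str(562)))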
Essentially you have to map the row to a tuple containing all of the existing columns and add in the new column(s).
If there are too many columns to enumerate, you can also just append a tuple to the existing row:
df = df.rdd\
.map(lambda x: x + (hash(str(x["Amount"])),))\
.toDF(df.columns + ["Hash"])
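A variant of the same idea (my own sketch, not from the original answer) is to rebuild each Row by name, so toDF() can infer the column names without you listing them:
from pyspark.sql import Row

# Rebuild each row as a Row with the original fields plus the new Hash field.
# (On Spark < 3.0, Row kwargs are sorted alphabetically, so column order may differ.)
df = df.rdd\
.map(lambda x: Row(**x.asDict(), Hash=hash(str(x["Amount"]))))\
.toDF()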
I should also point out that if hashing the values is your end goal, there is also a pyspark function pyspark.sql.functions.hash that can be used to avoid the serialization to rdd:
import pyspark.sql.functions as f
df.withColumn("Hash", f.hash("Amount")).show()
#+------+------+----------+
#|Person|Amount| Hash|
#+------+------+----------+
#| Bob| 562| 51343841|
#| Bob| 880|1241753636|
#| Bob| 380| 514174926|
#| Sue| 85|1944150283|
#| Sue| 963|1665082423|
#+------+------+----------+
Note that this uses a different hashing algorithm than the Python built-in (Spark's hash is Murmur3-based), so the values won't match the ones produced by hash() above.
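As an aside (a minimal sketch of mine, not part of the original answer; the RowHash column name is just for illustration), pyspark.sql.functions.hash also accepts multiple columns, so you can hash a combination of fields in one call:
import pyspark.sql.functions as f

# hash() takes one or more columns and hashes the combination,
# e.g. to fingerprint the whole (Person, Amount) pair.
df.withColumn("RowHash", f.hash("Person", "Amount")).show()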