I am trying to normalize columns in a Spark DataFrame using Python.
My dataset:
------------------------------------
userID | Name | Revenue | No.of.Days
------------------------------------
1      | A    | 12560   | 45
2      | B    | 2312890 | 90
.      | .    | .       | .
.      | .    | .       | .
.      | .    | .       | .
------------------------------------
In this dataset, I have to leave userID and Name as they are and normalize the Revenue and No.of.Days columns.
The output should look like this:

------------------------------------
userID | Name | Revenue | No.of.Days
------------------------------------
1      | A    | 0.5     | 0.5
2      | B    | 0.9     | 1
.      | .    | 1       | 0.4
.      | .    | 0.6     | .
.      | .    | .       | .
------------------------------------
The formula used for normalizing the values in each column is

val = (ei - min) / (max - min)

where
ei  = column value at the i-th position
min = minimum value in that column
max = maximum value in that column
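For example, for a column with min = 45 and max = 150, the value 90 normalizes to (90 - 45) / (150 - 45) ≈ 0.43.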
How can I do this in easy steps using PySpark?
Hope the following code meets your requirement.
Code:
df = spark.createDataFrame([(1, 'A', 12560, 45),
                            (2, 'B', 42560, 90),
                            (3, 'C', 31285, 120),
                            (4, 'D', 10345, 150)],
                           ["userID", "Name", "Revenue", "No_of_Days"])
print("Before Scaling :")
df.show(5)
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
# UDF to extract the single value from the MinMaxScaler output vector
# and convert it to a plain double, rounded to 3 decimal places
unlist = udf(lambda x: round(float(list(x)[0]), 3), DoubleType())
# Iterating over the columns to be scaled
for i in ["Revenue", "No_of_Days"]:
    # VectorAssembler Transformation - converting the column to vector type
    assembler = VectorAssembler(inputCols=[i], outputCol=i + "_Vect")

    # MinMaxScaler Transformation
    scaler = MinMaxScaler(inputCol=i + "_Vect", outputCol=i + "_Scaled")

    # Pipeline of VectorAssembler and MinMaxScaler
    pipeline = Pipeline(stages=[assembler, scaler])

    # Fitting the pipeline, replacing the scaled vector with a double and dropping the helper column
    df = pipeline.fit(df).transform(df) \
        .withColumn(i + "_Scaled", unlist(i + "_Scaled")) \
        .drop(i + "_Vect")
print("After Scaling :")
df.show(5)
Output:
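With the sample data above, the scaled dataframe should look roughly like this (the original columns are kept and the scaled values are added as new columns):

+------+----+-------+----------+--------------+-----------------+
|userID|Name|Revenue|No_of_Days|Revenue_Scaled|No_of_Days_Scaled|
+------+----+-------+----------+--------------+-----------------+
|     1|   A|  12560|        45|         0.069|              0.0|
|     2|   B|  42560|        90|           1.0|            0.429|
|     3|   C|  31285|       120|          0.65|            0.714|
|     4|   D|  10345|       150|           0.0|              1.0|
+------+----+-------+----------+--------------+-----------------+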
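If you prefer to avoid the ML pipeline, the same min-max formula from the question can be applied directly with DataFrame aggregations. Below is a minimal sketch, assuming df still holds the original unscaled columns; the column list mirrors the example schema above, so adjust it to your real data:

from pyspark.sql import functions as F

# Columns to normalize (matching the example schema; change as needed)
cols_to_scale = ["Revenue", "No_of_Days"]

# Collect the min and max of every column in a single pass over the data
agg_exprs = []
for c in cols_to_scale:
    agg_exprs += [F.min(c).alias(c + "_min"), F.max(c).alias(c + "_max")]
stats = df.agg(*agg_exprs).collect()[0]

# Apply val = (ei - min) / (max - min) to each column
# (assumes max != min for every column, otherwise this divides by zero)
for c in cols_to_scale:
    mn, mx = stats[c + "_min"], stats[c + "_max"]
    df = df.withColumn(c + "_Scaled", F.round((F.col(c) - mn) / (mx - mn), 3))

df.show(5)

On Spark 3.0+ you could also replace the unlist UDF in the pipeline version with pyspark.ml.functions.vector_to_array, which avoids the Python UDF overhead.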