Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Scale(Normalise) a column in SPARK Dataframe - Pyspark

I am trying to normalize a column in SPARK DataFrame using python.

My dataset:

--------------------------
userID|Name|Revenue|No.of.Days|
--------------------------
1      A     12560    45
2      B     2312890  90
.      .       .       .
.      .       .       .
.      .       .       .
--------------------------

In this dataset, except the userID and Name, I have to normalize the Revenue and No.of Days.

The output should look like this


userID|Name|Revenue|No.of.Days|
--------------------------
1      A     0.5     0.5
2      B     0.9       1
.      .       1     0.4
.      .     0.6       .
.      .       .       .
--------------------------

The formula used to calculate or normalizing the values in each column is

val = (ei-min)/(max-min)
ei = column value at i th position
min = min value in that column
max = max value in that column

How can I do this in easy steps using PySpark?

like image 519
Jack Daniel Avatar asked Oct 31 '16 06:10

Jack Daniel


Video Answer


1 Answers

Hope the following code suffices your requirement.

Code :

df = spark.createDataFrame([ (1, 'A',12560,45),
                             (1, 'B',42560,90),
                             (1, 'C',31285,120),
                             (1, 'D',10345,150)
                           ], ["userID", "Name","Revenue","No_of_Days"])

print("Before Scaling :")
df.show(5)

from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# UDF for converting column type from vector to double type
unlist = udf(lambda x: round(float(list(x)[0]),3), DoubleType())

# Iterating over columns to be scaled
for i in ["Revenue","No_of_Days"]:
    # VectorAssembler Transformation - Converting column to vector type
    assembler = VectorAssembler(inputCols=[i],outputCol=i+"_Vect")

    # MinMaxScaler Transformation
    scaler = MinMaxScaler(inputCol=i+"_Vect", outputCol=i+"_Scaled")

    # Pipeline of VectorAssembler and MinMaxScaler
    pipeline = Pipeline(stages=[assembler, scaler])

    # Fitting pipeline on dataframe
    df = pipeline.fit(df).transform(df).withColumn(i+"_Scaled", unlist(i+"_Scaled")).drop(i+"_Vect")

print("After Scaling :")
df.show(5)

Output:

Output

like image 95
Rinaz Belhaj Avatar answered Sep 21 '22 14:09

Rinaz Belhaj