How to normalize or standardize data with multiple columns/variables in Spark using Scala?

I am new to Apache Spark and Scala. I have a data set like this, which I am reading from a CSV file and converting into an RDD using Scala.

+--------+------+---------+
| recent | Freq | Monitor |
+--------+------+---------+
|      1 | 1234 |  199090 |
|      4 | 2553 |  198613 |
|      6 | 3232 |  199090 |
|      1 | 8823 |  498831 |
|      7 | 2902 |  890000 |
|      8 | 7991 |  081097 |
|      9 | 7391 |  432370 |
|     12 | 6138 |  864981 |
|      7 | 6812 |  749821 |
+--------+------+---------+

I want to calculate the z-score values, i.e. standardize the data. So I am calculating the z-score for each column and then trying to combine them so that I get a standard scale.

Here is my code for calculating the z-score of the first column:

// parse the first column as Double
val scores = sorted.map(_.split(",")(0).toDouble).cache()
val count = scores.count
val mean = scores.sum / count
// squared deviations from the mean
val devs = scores.map(score => (score - mean) * (score - mean))
val stddev = Math.sqrt(devs.sum / count)
// z-score of every value in the first column
val zscore = scores.map(x => math.round((x - mean) / stddev))

How do I calculate this for each column? Or is there another way to normalize or standardize the data?

My requirement is to assign a rank (or scale).

Thanks

asked Apr 20 '16 by Niranjanp
People also ask

How do I sort multiple columns in Spark?

In Spark, we can use the sort() function of the DataFrame to sort multiple columns. If you want ascending or descending order, use asc and desc on the Column.
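
For example, a minimal sketch (assuming df is a DataFrame with the columns from the question, loaded as in the second answer below):

import org.apache.spark.sql.functions.col

// Sort by "recent" ascending, then break ties by "Freq" descending
val sortedDf = df.sort(col("recent").asc, col("Freq").desc)
sortedDf.show()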

What is normalization in Spark?

Data normalization is a required data preparation step for many Machine Learning algorithms. These algorithms are sensitive to the relative values of the feature attributes. Data normalization is the process of bringing all the attribute values within some desired range.

Should I standardize or normalize my data?

Normalization is useful when your data has varying scales and the algorithm you are using does not make assumptions about the distribution of your data, such as k-nearest neighbors and artificial neural networks. Standardization assumes that your data has a Gaussian (bell curve) distribution.

How do you normalize variable data?

When we normalize a variable, we first shift the scale so that it starts at 0 and then compress it so that it ends at 1. We do so by first subtracting the minimum value and then dividing by the new maximum value (which is the old maximum minus the old minimum).
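
As a rough illustration of that formula in plain Scala (using the question's "recent" column as the example values):

// Min-max normalization: shift so the minimum becomes 0, then divide by the new maximum
val values = Seq(1.0, 4.0, 6.0, 1.0, 7.0, 8.0, 9.0, 12.0, 7.0) // the "recent" column
val minV = values.min
val maxV = values.max
val normalized = values.map(v => (v - minV) / (maxV - minV))
// every value in normalized now falls in [0, 1]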


2 Answers

If you want to standardize the columns, you can use the StandardScaler class from Spark MLlib. The data should be in the form of an RDD[Vector], where Vector comes from the MLlib linalg package. You can choose to standardize with the mean, the standard deviation, or both.

import org.apache.spark.mllib.feature.StandardScaler
import org.apache.spark.mllib.linalg.Vectors

val data = sc.parallelize(Array(
    Array(1.0,2.0,3.0),
    Array(4.0,5.0,6.0),
    Array(7.0,8.0,9.0),
    Array(10.0,11.0,12.0)))

// Converting RDD[Array] to RDD[Vectors]
val features = data.map(a => Vectors.dense(a))
// Creating a Scaler model that standardizes with both mean and SD
val scaler = new StandardScaler(withMean = true, withStd = true).fit(features)
// Scale features using the scaler model
val scaledFeatures = scaler.transform(features)

The scaledFeatures RDD contains the z-scores of all columns.

Hope this answer helps. Check the Documentation for more info.
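
If you want to check the scaler or compute the statistics yourself, MLlib's Statistics.colStats returns the column-wise mean and variance of an RDD[Vector]. A minimal sketch, reusing the features RDD from above:

import org.apache.spark.mllib.stat.Statistics

// Column-wise summary statistics over the RDD[Vector]
val summary = Statistics.colStats(features)
println(summary.mean)      // per-column means
println(summary.variance)  // per-column variances (take the square root for the standard deviation)

Note that MLlib uses the corrected sample standard deviation (dividing by n - 1), so the result can differ slightly from the population formula in the question.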

answered Nov 15 '22 by ar7

You may want to use the code below to perform standard scaling on the required columns. VectorAssembler is used to select the columns that need to be transformed, and StandardScaler lets you choose whether to center with the mean and/or scale with the standard deviation.

Code:

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.feature.StandardScaler

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.read.format("csv").option("header", "true").option("inferSchema", "true").load("/user/hadoop/data/your_dataset.csv")
df.show()

// Assemble the columns to be scaled into a single vector column
val assembler = new VectorAssembler().setInputCols(Array("recent", "Freq", "Monitor")).setOutputCol("features")

val transformVector = assembler.transform(df)

// Scale with the standard deviation only; use setWithMean(true) to also center the data
val scaler = new StandardScaler().setInputCol("features").setOutputCol("scaledFeatures").setWithStd(true).setWithMean(false)

val scalerModel = scaler.fit(transformVector)
val scaledData = scalerModel.transform(transformVector)

scaledData.show(20, false)
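
If you want normalization to a [0, 1] range rather than z-scores, the same pipeline works with spark.ml's MinMaxScaler in place of StandardScaler. A minimal sketch, reusing transformVector from above:

import org.apache.spark.ml.feature.MinMaxScaler

// Rescale each assembled feature column to the [0, 1] range
val minMaxScaler = new MinMaxScaler().setInputCol("features").setOutputCol("normFeatures")
val normData = minMaxScaler.fit(transformVector).transform(transformVector)
normData.show(20, false)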
answered Nov 15 '22 by Akash Singh