Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

MinMax Normalization in scala

I have an org.apache.spark.sql.DataFrame with multiple columns. I want to scale 1 column (lat_long_dist) using MinMax Normalization or any technique to scale the data between -1 and 1 and retain the data type as org.apache.spark.sql.DataFrame

scala> val df = sqlContext.csvFile("tenop.csv")
df: org.apache.spark.sql.DataFrame = [gst_id_matched: string,
  ip_crowding: string, lat_long_dist: double, stream_name_1: string]

I found the StandardScaler option but that requires to transform the dataset before I can do the transformation.Is there a simple clean way.

like image 206
steven Avatar asked Nov 25 '15 19:11

steven


3 Answers

Here's another suggestion when you are already playing with Spark.

Why don't you use MinMaxScaler in ml package?

Let's try this with the same example from zero323.

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.ml.feature.MinMaxScaler
import org.apache.spark.sql.functions.udf

val df = sc.parallelize(Seq(
  (1L, 0.5), (2L, 10.2), (3L, 5.7), (4L, -11.0), (5L, 22.3)
)).toDF("k", "v")

//val df.map(r => Vectors.dense(Array(r.getAs[Double]("v"))))

val vectorizeCol = udf( (v:Double) => Vectors.dense(Array(v)) )
val df2 = df.withColumn("vVec", vectorizeCol(df("v"))

val scaler = new MinMaxScaler()
    .setInputCol("vVec")
    .setOutputCol("vScaled")
    .setMax(1)
    .setMin(-1)

scaler.fit(df2).transform(df2).show
+---+-----+-------+--------------------+
|  k|    v|   vVec|             vScaled|
+---+-----+-------+--------------------+
|  1|  0.5|  [0.5]|[-0.3093093093093...|
|  2| 10.2| [10.2]|[0.27327327327327...|
|  3|  5.7|  [5.7]|[0.00300300300300...|
|  4|-11.0|[-11.0]|              [-1.0]|
|  5| 22.3| [22.3]|               [1.0]|
+---+-----+-------+--------------------+

Take advantage of scaling multiple columns at once.

val df = sc.parallelize(Seq(
    (1.0, -1.0, 2.0),
    (2.0, 0.0, 0.0),
    (0.0, 1.0, -1.0)
)).toDF("a", "b", "c")

import org.apache.spark.ml.feature.VectorAssembler

val assembler = new VectorAssembler()
    .setInputCols(Array("a", "b", "c"))
    .setOutputCol("features")

val df2 = assembler.transform(df)

// Reusing the scaler instance above with the same min(-1) and max(1) 
scaler.setInputCol("features").setOutputCol("scaledFeatures").fit(df2).transform(df2).show
+---+----+----+--------------+--------------------+
|  a|   b|   c|      features|      scaledFeatures|
+---+----+----+--------------+--------------------+
|1.0|-1.0| 2.0|[1.0,-1.0,2.0]|      [0.0,-1.0,1.0]|
|2.0| 0.0| 0.0| [2.0,0.0,0.0]|[1.0,0.0,-0.33333...|
|0.0| 1.0|-1.0|[0.0,1.0,-1.0]|     [-1.0,1.0,-1.0]|
+---+----+----+--------------+--------------------+
like image 97
Lyle Avatar answered Oct 05 '22 12:10

Lyle


I guess what you want is something like this

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{min, max, lit}

val df = sc.parallelize(Seq(
  (1L, 0.5), (2L, 10.2), (3L, 5.7), (4L, -11.0), (5L, 22.3)
)).toDF("k", "v")

val (vMin, vMax) = df.agg(min($"v"), max($"v")).first match {
  case Row(x: Double, y: Double) => (x, y)
}

val scaledRange = lit(2) // Range of the scaled variable
val scaledMin = lit(-1)  // Min value of the scaled variable
val vNormalized = ($"v" - vMin) / (vMax - vMin) // v normalized to (0, 1) range

val vScaled = scaledRange * vNormalized + scaledMin

df.withColumn("vScaled", vScaled).show

// +---+-----+--------------------+
// |  k|    v|             vScaled|
// +---+-----+--------------------+
// |  1|  0.5| -0.3093093093093092|
// |  2| 10.2| 0.27327327327327344|
// |  3|  5.7|0.003003003003003...|
// |  4|-11.0|                -1.0|
// |  5| 22.3|                 1.0|
// +---+-----+--------------------+
like image 44
zero323 Avatar answered Oct 05 '22 13:10

zero323


there is another solution. Take codes from Matt, Lyle and zero323, thanks!

import org.apache.spark.ml.feature.{MinMaxScaler, VectorAssembler}

val df = sc.parallelize(Seq(
  (1L, 0.5), (2L, 10.2), (3L, 5.7), (4L, -11.0), (5L, 22.3)
)).toDF("k", "v")

val assembler = new VectorAssembler().setInputCols(Array("v")).setOutputCol("vVec")
val df2= assembler.transform(df)

val scaler = new MinMaxScaler().setInputCol("vVec").setOutputCol("vScaled").setMax(1).setMin(-1)

scaler.fit(df2).transform(df2).show

result:

+---+-----+-------+--------------------+
|  k|    v|   vVec|             vScaled|
+---+-----+-------+--------------------+
|  1|  0.5|  [0.5]|[-0.3093093093093...|
|  2| 10.2| [10.2]|[0.27327327327327...|
|  3|  5.7|  [5.7]|[0.00300300300300...|
|  4|-11.0|[-11.0]|              [-1.0]|
|  5| 22.3| [22.3]|               [1.0]|
+---+-----+-------+--------------------+

btw: the other solutions produce error on my side

java.lang.IllegalArgumentException: requirement failed: Column vVec must be of type struct<type:tinyint,size:int,indices:array<int>,values:array<double>> but was actually struct<type:tinyint,size:int,indices:array<int>,values:array<double>>.
  at scala.Predef$.require(Predef.scala:224)
  at org.apache.spark.ml.util.SchemaUtils$.checkColumnType(SchemaUtils.scala:43)
  at org.apache.spark.ml.feature.MinMaxScalerParams$class.validateAndTransformSchema(MinMaxScaler.scala:67)
  at org.apache.spark.ml.feature.MinMaxScaler.validateAndTransformSchema(MinMaxScaler.scala:93)
  at org.apache.spark.ml.feature.MinMaxScaler.transformSchema(MinMaxScaler.scala:129)
  at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:74)
  at org.apache.spark.ml.feature.MinMaxScaler.fit(MinMaxScaler.scala:119)
  ... 50 elided

THANKS A LOT whatever!

like image 41
ShawnXuan Avatar answered Oct 05 '22 12:10

ShawnXuan