I have an org.apache.spark.sql.DataFrame with multiple columns. I want to scale 1 column (lat_long_dist) using MinMax Normalization or any technique to scale the data between -1 and 1 and retain the data type as org.apache.spark.sql.DataFrame
scala> val df = sqlContext.csvFile("tenop.csv")
df: org.apache.spark.sql.DataFrame = [gst_id_matched: string,
ip_crowding: string, lat_long_dist: double, stream_name_1: string]
I found the StandardScaler option but that requires to transform the dataset before I can do the transformation.Is there a simple clean way.
Here's another suggestion when you are already playing with Spark.
Why don't you use MinMaxScaler in ml package?
Let's try this with the same example from zero323.
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.ml.feature.MinMaxScaler
import org.apache.spark.sql.functions.udf
val df = sc.parallelize(Seq(
(1L, 0.5), (2L, 10.2), (3L, 5.7), (4L, -11.0), (5L, 22.3)
)).toDF("k", "v")
//val df.map(r => Vectors.dense(Array(r.getAs[Double]("v"))))
val vectorizeCol = udf( (v:Double) => Vectors.dense(Array(v)) )
val df2 = df.withColumn("vVec", vectorizeCol(df("v"))
val scaler = new MinMaxScaler()
.setInputCol("vVec")
.setOutputCol("vScaled")
.setMax(1)
.setMin(-1)
scaler.fit(df2).transform(df2).show
+---+-----+-------+--------------------+
| k| v| vVec| vScaled|
+---+-----+-------+--------------------+
| 1| 0.5| [0.5]|[-0.3093093093093...|
| 2| 10.2| [10.2]|[0.27327327327327...|
| 3| 5.7| [5.7]|[0.00300300300300...|
| 4|-11.0|[-11.0]| [-1.0]|
| 5| 22.3| [22.3]| [1.0]|
+---+-----+-------+--------------------+
Take advantage of scaling multiple columns at once.
val df = sc.parallelize(Seq(
(1.0, -1.0, 2.0),
(2.0, 0.0, 0.0),
(0.0, 1.0, -1.0)
)).toDF("a", "b", "c")
import org.apache.spark.ml.feature.VectorAssembler
val assembler = new VectorAssembler()
.setInputCols(Array("a", "b", "c"))
.setOutputCol("features")
val df2 = assembler.transform(df)
// Reusing the scaler instance above with the same min(-1) and max(1)
scaler.setInputCol("features").setOutputCol("scaledFeatures").fit(df2).transform(df2).show
+---+----+----+--------------+--------------------+
| a| b| c| features| scaledFeatures|
+---+----+----+--------------+--------------------+
|1.0|-1.0| 2.0|[1.0,-1.0,2.0]| [0.0,-1.0,1.0]|
|2.0| 0.0| 0.0| [2.0,0.0,0.0]|[1.0,0.0,-0.33333...|
|0.0| 1.0|-1.0|[0.0,1.0,-1.0]| [-1.0,1.0,-1.0]|
+---+----+----+--------------+--------------------+
I guess what you want is something like this
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{min, max, lit}
val df = sc.parallelize(Seq(
(1L, 0.5), (2L, 10.2), (3L, 5.7), (4L, -11.0), (5L, 22.3)
)).toDF("k", "v")
val (vMin, vMax) = df.agg(min($"v"), max($"v")).first match {
case Row(x: Double, y: Double) => (x, y)
}
val scaledRange = lit(2) // Range of the scaled variable
val scaledMin = lit(-1) // Min value of the scaled variable
val vNormalized = ($"v" - vMin) / (vMax - vMin) // v normalized to (0, 1) range
val vScaled = scaledRange * vNormalized + scaledMin
df.withColumn("vScaled", vScaled).show
// +---+-----+--------------------+
// | k| v| vScaled|
// +---+-----+--------------------+
// | 1| 0.5| -0.3093093093093092|
// | 2| 10.2| 0.27327327327327344|
// | 3| 5.7|0.003003003003003...|
// | 4|-11.0| -1.0|
// | 5| 22.3| 1.0|
// +---+-----+--------------------+
there is another solution. Take codes from Matt, Lyle and zero323, thanks!
import org.apache.spark.ml.feature.{MinMaxScaler, VectorAssembler}
val df = sc.parallelize(Seq(
(1L, 0.5), (2L, 10.2), (3L, 5.7), (4L, -11.0), (5L, 22.3)
)).toDF("k", "v")
val assembler = new VectorAssembler().setInputCols(Array("v")).setOutputCol("vVec")
val df2= assembler.transform(df)
val scaler = new MinMaxScaler().setInputCol("vVec").setOutputCol("vScaled").setMax(1).setMin(-1)
scaler.fit(df2).transform(df2).show
result:
+---+-----+-------+--------------------+
| k| v| vVec| vScaled|
+---+-----+-------+--------------------+
| 1| 0.5| [0.5]|[-0.3093093093093...|
| 2| 10.2| [10.2]|[0.27327327327327...|
| 3| 5.7| [5.7]|[0.00300300300300...|
| 4|-11.0|[-11.0]| [-1.0]|
| 5| 22.3| [22.3]| [1.0]|
+---+-----+-------+--------------------+
btw: the other solutions produce error on my side
java.lang.IllegalArgumentException: requirement failed: Column vVec must be of type struct<type:tinyint,size:int,indices:array<int>,values:array<double>> but was actually struct<type:tinyint,size:int,indices:array<int>,values:array<double>>.
at scala.Predef$.require(Predef.scala:224)
at org.apache.spark.ml.util.SchemaUtils$.checkColumnType(SchemaUtils.scala:43)
at org.apache.spark.ml.feature.MinMaxScalerParams$class.validateAndTransformSchema(MinMaxScaler.scala:67)
at org.apache.spark.ml.feature.MinMaxScaler.validateAndTransformSchema(MinMaxScaler.scala:93)
at org.apache.spark.ml.feature.MinMaxScaler.transformSchema(MinMaxScaler.scala:129)
at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:74)
at org.apache.spark.ml.feature.MinMaxScaler.fit(MinMaxScaler.scala:119)
... 50 elided
THANKS A LOT whatever!
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With