
How to use Spark QuantileDiscretizer on multiple columns

All,

I have an ML pipeline set up as below:

import org.apache.spark.ml.feature.QuantileDiscretizer
import org.apache.spark.sql.types.{StructType,StructField,DoubleType}    
import org.apache.spark.ml.Pipeline
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import scala.util.Random

val nRows = 10000
val nCols = 1000
val data = sc.parallelize(0 to nRows-1).map { _ => Row.fromSeq(Seq.fill(nCols)(Random.nextDouble)) }
val schema = StructType((0 to nCols-1).map { i => StructField("C" + i, DoubleType, true) } )
val df = spark.createDataFrame(data, schema)
df.cache()

//Get continuous feature name and discretize them
val continuous = df.dtypes.filter(_._2 == "DoubleType").map (_._1)
val discretizers = continuous.map(c => new QuantileDiscretizer().setInputCol(c).setOutputCol(s"${c}_disc").setNumBuckets(3).fit(df))
val pipeline = new Pipeline().setStages(discretizers)
val model = pipeline.fit(df)

When I run this, Spark seems to set up each discretizer as a separate job. Is there a way to run all the discretizers as a single job, with or without a pipeline? Thanks for the help, appreciate it.

asked Apr 26 '17 by sramalingam24

People also ask

How do I add multiple columns in spark?

You can add multiple columns to a Spark DataFrame in several ways. If you want to add a known set of columns, you can do so easily by chaining withColumn() or by using select(). However, sometimes you may need to add multiple columns after applying some transformations; in that case you can use either map() or foldLeft(), as sketched below.
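
For example, a minimal sketch of both approaches against the df built in the question above (the derived column names here are made up purely for illustration):

import org.apache.spark.sql.functions.{col, lit}

// Known set of columns: chain withColumn() (or build the projection with select())
val withTwoCols = df
  .withColumn("C0_doubled", col("C0") * 2)
  .withColumn("label", lit(0.0))

// Columns derived programmatically: foldLeft over the column names
val colsToScale = Seq("C0", "C1", "C2")
val scaled = colsToScale.foldLeft(df) { (acc, c) =>
  acc.withColumn(s"${c}_scaled", col(c) * 100)
}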

How do I merge two columns in spark in Scala?

Spark SQL functions provide concat() to concatenate two or more DataFrame columns into a single column. It can also take columns of different data types and concatenate them into a single column.
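
A minimal sketch, assuming a hypothetical DataFrame with first_name and last_name string columns:

import org.apache.spark.sql.functions.{col, concat, concat_ws, lit}
import spark.implicits._

val people = Seq(("John", "Doe"), ("Jane", "Roe")).toDF("first_name", "last_name")

people
  .withColumn("full_name", concat(col("first_name"), lit(" "), col("last_name")))
  // concat_ws() takes an explicit separator instead of interleaving lit(" ")
  .withColumn("full_name_ws", concat_ws(" ", col("first_name"), col("last_name")))
  .show(false)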

How many rows of data can spark handle?

By default, Spark with Scala, Java, or Python (PySpark) fetches only 20 rows from DataFrame show(), not all rows, and column values are truncated to 20 characters. To display more than 20 rows and full column values from a Spark/PySpark DataFrame, you need to pass arguments to the show() method.
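
For example, on the df built in the question above:

df.show()                      // default: first 20 rows, values truncated to 20 characters
df.show(50)                    // first 50 rows
df.show(50, truncate = false)  // first 50 rows, full column values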

What is QuantileDiscretizer?

QuantileDiscretizer takes a column with continuous features and outputs a column with binned categorical features. The number of bins can be set using the numBuckets parameter.
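
As a minimal single-column sketch on the df built in the question above, bucketing C0 into 3 quantile-based bins:

import org.apache.spark.ml.feature.QuantileDiscretizer

val singleDisc = new QuantileDiscretizer()
  .setInputCol("C0")
  .setOutputCol("C0_disc")
  .setNumBuckets(3)

// fit() computes the quantile splits; transform() assigns bucket indices 0.0, 1.0, 2.0
singleDisc.fit(df).transform(df).select("C0", "C0_disc").show(5)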


1 Answer

Support for this feature was added in Spark 2.3.0. See the release docs:

  • Multiple column support for several feature transformers:
    • [SPARK-13030]: OneHotEncoderEstimator (Scala/Java/Python)
    • [SPARK-22397]: QuantileDiscretizer (Scala/Java)
    • [SPARK-20542]: Bucketizer (Scala/Java/Python)

You can now use setInputCols and setOutputCols to specify multiple columns, although this does not yet seem to be reflected in the official docs. With this patch, performance is greatly improved compared to handling each column in a separate job.

Your example may be adapted as follows:

import org.apache.spark.ml.feature.QuantileDiscretizer
import org.apache.spark.sql.types.{StructType,StructField,DoubleType}    
import org.apache.spark.ml.Pipeline
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import scala.util.Random

val nRows = 10000
val nCols = 1000
val data = sc.parallelize(0 to nRows-1).map { _ => Row.fromSeq(Seq.fill(nCols)(Random.nextDouble)) }
val schema = StructType((0 to nCols-1).map { i => StructField("C" + i, DoubleType, true) } )
val df = spark.createDataFrame(data, schema)
df.cache()

//Get continuous feature name and discretize them
val continuous = df.dtypes.filter(_._2 == "DoubleType").map (_._1)

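// Spark 2.3.0+: a single QuantileDiscretizer can discretize all input columns in one job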
val discretizer = new QuantileDiscretizer()
  .setInputCols(continuous)
  .setOutputCols(continuous.map(c => s"${c}_disc"))
  .setNumBuckets(3)

val pipeline = new Pipeline().setStages(Array(discretizer))
val model = pipeline.fit(df)
model.transform(df)
answered Sep 28 '22 by jarias