All,
I have a ml pipeline setup as below
import org.apache.spark.ml.feature.QuantileDiscretizer
import org.apache.spark.sql.types.{StructType,StructField,DoubleType}
import org.apache.spark.ml.Pipeline
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import scala.util.Random
val nRows = 10000
val nCols = 1000
val data = sc.parallelize(0 to nRows-1).map { _ => Row.fromSeq(Seq.fill(nCols)(Random.nextDouble)) }
val schema = StructType((0 to nCols-1).map { i => StructField("C" + i, DoubleType, true) } )
val df = spark.createDataFrame(data, schema)
df.cache()
//Get continuous feature name and discretize them
val continuous = df.dtypes.filter(_._2 == "DoubleType").map (_._1)
val discretizers = continuous.map(c => new QuantileDiscretizer().setInputCol(c).setOutputCol(s"${c}_disc").setNumBuckets(3).fit(df))
val pipeline = new Pipeline().setStages(discretizers)
val model = pipeline.fit(df)
When i run this, spark seems to setup each discretizer as a separate job. Is there a way to run all the discretizers as a single job with or without a pipeline? Thanks for the help, appreciate it.
You can add multiple columns to Spark DataFrame in several ways if you wanted to add a known set of columns you can easily do by chaining withColumn() or on select(). However, sometimes you may need to add multiple columns after applying some transformations n that case you can use either map() or foldLeft().
Using concat() Function to Concatenate DataFrame Columns Spark SQL functions provide concat() to concatenate two or more DataFrame columns into a single Column. It can also take columns of different Data Types and concatenate them into a single column.
By default Spark with Scala, Java, or with Python (PySpark), fetches only 20 rows from DataFrame show() but not all rows and the column value is truncated to 20 characters, In order to fetch/display more than 20 rows and column full value from Spark/PySpark DataFrame, you need to pass arguments to the show() method.
QuantileDiscretizer takes a column with continuous features and outputs a column with binned categorical features. The number of bins can be set using the numBuckets parameter.
support for this feature has been added in Spark 2.3.0. See release docs
You can now use setInputCols
and setOutputCols
to specify multiple columns, although it seems not to be yet reflected in the official docs. The performance has been greatly increased with this new patch when compared to dealing with each column one job at a time.
Your example may be adapted as follows:
import org.apache.spark.ml.feature.QuantileDiscretizer
import org.apache.spark.sql.types.{StructType,StructField,DoubleType}
import org.apache.spark.ml.Pipeline
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import scala.util.Random
val nRows = 10000
val nCols = 1000
val data = sc.parallelize(0 to nRows-1).map { _ => Row.fromSeq(Seq.fill(nCols)(Random.nextDouble)) }
val schema = StructType((0 to nCols-1).map { i => StructField("C" + i, DoubleType, true) } )
val df = spark.createDataFrame(data, schema)
df.cache()
//Get continuous feature name and discretize them
val continuous = df.dtypes.filter(_._2 == "DoubleType").map (_._1)
val discretizer = new QuantileDiscretizer()
.setInputCols(continuous)
.setOutputCols(continuous.map(c => s"${c}_disc"))
.setNumBuckets(3)
val pipeline = new Pipeline().setStages(Array(discretizer))
val model = pipeline.fit(df)
model.transform(df)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With