Using Spark ML's OneHotEncoder on multiple columns

I've been able to create a pipeline that indexes multiple string columns at once, but I'm stuck on encoding them: unlike the indexer, the encoder is not an estimator, so, following the OneHotEncoder example in the docs, I never call fit.

import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler, OneHotEncoder}
import org.apache.spark.ml.Pipeline

val data = sqlContext.read.parquet("s3n://map2-test/forecaster/intermediate_data")

val df = data.select("win","bid_price","domain","size", "form_factor").na.drop()


//indexing columns
val stringColumns = Array("domain","size", "form_factor")
val index_transformers: Array[org.apache.spark.ml.PipelineStage] = stringColumns.map(
  cname => new StringIndexer()
    .setInputCol(cname)
    .setOutputCol(s"${cname}_index")
)

// Add the rest of your pipeline like VectorAssembler and algorithm
val index_pipeline = new Pipeline().setStages(index_transformers)
val index_model = index_pipeline.fit(df)
val df_indexed = index_model.transform(df)


//encoding columns
val indexColumns  = df_indexed.columns.filter(x => x contains "index")
val one_hot_encoders: Array[org.apache.spark.ml.PipelineStage] = indexColumns.map(
    cname => new OneHotEncoder()
     .setInputCol(cname)
     .setOutputCol(s"${cname}_vec")
)



val one_hot_pipeline = new Pipeline().setStages(one_hot_encoders)
val df_encoded = one_hot_pipeline.transform(df_indexed)

The OneHotEncoder object doesn't have a fit method, so putting it in the same pipeline as the indexers does not work: it throws an error when I call fit on the pipeline. I also cannot call transform on the pipeline built from the array of pipeline stages, one_hot_encoders.

I have not found a good way to use OneHotEncoder without individually creating an encoder and calling transform on it for each column I want to encode.

Michael Discenza asked Dec 08 '15 22:12


1 Answer

Spark >= 3.0

In Spark 3.0 OneHotEncoderEstimator has been renamed to OneHotEncoder:

import org.apache.spark.ml.feature.{OneHotEncoder, OneHotEncoderModel}

val encoder = new OneHotEncoder()
  .setInputCols(indexColumns)
  .setOutputCols(indexColumns map (name => s"${name}_vec"))
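
As a rough end-to-end sketch of the Spark 3.x variant (the local SparkSession, the toy rows and the name toy are assumptions made for illustration, not the asker's data), indexing and encoding several columns could look like this. It also relies on StringIndexer accepting multiple columns, which it does since Spark 3.0:

```scala
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}
import org.apache.spark.sql.SparkSession

// Assumption: a local session and made-up rows stand in for the real data.
val spark = SparkSession.builder().master("local[*]").appName("ohe-sketch").getOrCreate()
import spark.implicits._

val toy = Seq(
  ("a.com", "300x250", "phone"),
  ("b.com", "728x90", "tablet"),
  ("a.com", "300x250", "desktop")
).toDF("domain", "size", "form_factor")

val stringColumns = Array("domain", "size", "form_factor")
val indexColumns = stringColumns.map(c => s"${c}_index")

// Index all string columns with a single multi-column StringIndexer.
val indexed = new StringIndexer()
  .setInputCols(stringColumns)
  .setOutputCols(indexColumns)
  .fit(toy)
  .transform(toy)

// OneHotEncoder is an estimator here, so it has to be fitted before transforming.
val encoder = new OneHotEncoder()
  .setInputCols(indexColumns)
  .setOutputCols(indexColumns.map(c => s"${c}_vec"))

val encoded = encoder.fit(indexed).transform(indexed)
encoded.show(truncate = false)
```

As with the Spark 2.3 estimator, the encoder must be fitted even when it is used outside a Pipeline.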

Spark >= 2.3

Spark 2.3 introduced two new classes, OneHotEncoderEstimator and OneHotEncoderModel, which require fitting even when used outside a Pipeline and operate on multiple columns at the same time.

import org.apache.spark.ml.feature.{OneHotEncoderEstimator, OneHotEncoderModel}

val encoder = new OneHotEncoderEstimator()
  .setInputCols(indexColumns)
  .setOutputCols(indexColumns map (name => s"${name}_vec"))


encoder.fit(df_indexed).transform(df_indexed)

Spark < 2.3

Even if the transformers you use don't require fitting, you have to call the fit method to create a PipelineModel, which can then be used to transform data.

one_hot_pipeline.fit(df_indexed).transform(df_indexed)

On a side note you can combine indexing and encoding into a single Pipeline:

val pipeline = new Pipeline()
  .setStages(index_transformers ++ one_hot_encoders)

val model = pipeline.fit(df)
model.transform(df)
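
For newer versions, a single combined pipeline might be sketched as below. This assumes the Spark 3.x OneHotEncoder rather than the pre-2.3 transformer used above, and the toy rows, the column roles and the output column name features are illustrative assumptions. It also appends the VectorAssembler step that the comment in the question hints at:

```scala
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, VectorAssembler}
import org.apache.spark.sql.SparkSession

// Assumption: a local session and made-up rows stand in for the real data.
val spark = SparkSession.builder().master("local[*]").appName("pipeline-sketch").getOrCreate()
import spark.implicits._

val toy = Seq(
  (1.0, 0.5, "a.com", "300x250", "phone"),
  (0.0, 0.7, "b.com", "728x90", "tablet"),
  (1.0, 0.2, "a.com", "300x250", "desktop")
).toDF("win", "bid_price", "domain", "size", "form_factor")

val stringColumns = Array("domain", "size", "form_factor")

// One indexer per string column, as in the question.
val indexers: Array[PipelineStage] = stringColumns.map { c =>
  new StringIndexer().setInputCol(c).setOutputCol(s"${c}_index")
}

// A single multi-column encoder replaces the array of per-column encoders.
val encoder = new OneHotEncoder()
  .setInputCols(stringColumns.map(c => s"${c}_index"))
  .setOutputCols(stringColumns.map(c => s"${c}_vec"))

// Collect the numeric column and the encoded vectors into one feature vector.
val assembler = new VectorAssembler()
  .setInputCols(Array("bid_price") ++ stringColumns.map(c => s"${c}_vec"))
  .setOutputCol("features")

val stages: Array[PipelineStage] = indexers ++ Array[PipelineStage](encoder, assembler)
val model = new Pipeline().setStages(stages).fit(toy)
val features = model.transform(toy).select("win", "features")
```

Fitting once on the raw DataFrame means the same PipelineModel can later be applied to new data without refitting the indexers or the encoder.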

Edit:

The error you see means that one of your columns contains an empty String. It is accepted by the indexer but cannot be used for encoding. Depending on your requirements, you can drop these rows or use a dummy label. Unfortunately you cannot use NULLs until SPARK-11569 is resolved.
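
Both options can be sketched roughly as follows. The helper name fillEmpty, the placeholder label "unknown" and the toy rows are hypothetical choices for illustration, not part of Spark or of the question:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, when}

// Assumption: a local session and made-up rows stand in for the real data.
val spark = SparkSession.builder().master("local[*]").appName("empty-label-sketch").getOrCreate()
import spark.implicits._

val toy = Seq(("a.com", "phone"), ("", "tablet")).toDF("domain", "form_factor")
val stringColumns = Seq("domain", "form_factor")

// Hypothetical helper: replace empty strings with a dummy label in the given columns.
def fillEmpty(df: DataFrame, columns: Seq[String], label: String = "unknown"): DataFrame =
  columns.foldLeft(df) { (acc, c) =>
    acc.withColumn(c, when(col(c) === "", label).otherwise(col(c)))
  }

// Option 1: keep every row, mapping "" to the dummy label.
val cleaned = fillEmpty(toy, stringColumns)

// Option 2: drop rows where any of the string columns is empty.
val dropped = toy.filter(stringColumns.map(c => col(c) =!= "").reduce(_ && _))
```

Either result can then be passed to the indexing and encoding steps in place of the original DataFrame.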

zero323 answered Oct 25 '22 18:10