OneHotEncoder in Spark Dataframe in Pipeline

I've been trying to get an example running in Spark and Scala with the adult dataset.

Using Scala 2.11.8 and Spark 1.6.1.

The problem (for now) lies in the number of categorical features in that dataset, all of which need to be encoded as numbers before a Spark ML algorithm can do its job.

So far I have this:

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.OneHotEncoder
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object Adult {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Adult example").setMaster("local[*]")
    val sparkContext = new SparkContext(conf)
    val sqlContext = new SQLContext(sparkContext)

    val data = sqlContext.read
      .format("com.databricks.spark.csv")
      .option("header", "true") // Use first line of all files as header
      .option("inferSchema", "true") // Automatically infer data types
      .load("src/main/resources/adult.data")

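    // Columns whose inferred type is string
    val categoricals = data.dtypes filter (_._2 == "StringType")
    // One OneHotEncoder per string column
    val encoders = categoricals map (cat => new OneHotEncoder().setInputCol(cat._1).setOutputCol(cat._1 + "_encoded"))
    // Feature column names, with each string column replaced by its encoded version
    val features = data.dtypes filterNot (_._1 == "label") map (tuple => if(tuple._2 == "StringType") tuple._1 + "_encoded" else tuple._1)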

    val lr = new LogisticRegression()
      .setMaxIter(10)
      .setRegParam(0.01)
    val pipeline = new Pipeline()
      .setStages(encoders ++ Array(lr))

    val model = pipeline.fit(data)
  }
}

However, this doesn't work. The DataFrame passed to pipeline.fit still contains the original string features, so it throws an exception. How can I remove these "StringType" columns in a pipeline? Or maybe I'm doing it completely wrong, in which case I'm happy to hear any suggestions :).

I chose this approach because I have an extensive background in Python and Pandas, but am trying to learn both Scala and Spark.

asked Jun 02 '16 08:06 by Tim



1 Answer

There is one thing here that can be rather confusing if you're used to higher-level frameworks: you have to index the features before you can use the encoder. As explained in the API docs:

one-hot encoder (...) maps a column of category indices to a column of binary vectors, with at most a single one-value per row that indicates the input category index.
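To make the two steps concrete, here is a minimal plain-Scala sketch (no Spark involved) of what a StringIndexer followed by a OneHotEncoder computes for a single column. It assumes StringIndexer's default frequency ordering (most frequent label gets index 0) and OneHotEncoder's default `dropLast = true`; breaking frequency ties alphabetically is an assumption of this sketch, and `OneHotSketch` with its methods is a hypothetical helper, not Spark API.

```scala
// Plain-Scala illustration (no Spark) of StringIndexer + OneHotEncoder
// semantics for one column. All names here are hypothetical.
object OneHotSketch {

  // Map each distinct label to an index: the most frequent label gets 0.
  // Ties are broken alphabetically (an assumption of this sketch).
  def indexLabels(values: Seq[String]): Map[String, Int] =
    values
      .groupBy(identity)
      .map { case (label, occurrences) => (label, occurrences.size) }
      .toSeq
      .sortBy { case (label, count) => (-count, label) }
      .map(_._1)
      .zipWithIndex
      .toMap

  // Encode an index as a dense 0/1 vector. With dropLast = true the vector
  // has numCategories - 1 slots and the last category maps to all zeros.
  def oneHot(index: Int, numCategories: Int, dropLast: Boolean = true): Vector[Double] = {
    val size = if (dropLast) numCategories - 1 else numCategories
    Vector.tabulate(size)(i => if (i == index) 1.0 else 0.0)
  }

  // Index the column first, then one-hot encode the indices.
  def encode(values: Seq[String]): Seq[Vector[Double]] = {
    val labelToIndex = indexLabels(values)
    values.map(v => oneHot(labelToIndex(v), labelToIndex.size))
  }
}
```

With `Seq("foo", "bar")` this gives `bar` index 0 and the vector `[1.0]`, while `foo`, the last category, becomes the all-zeros vector, matching the `x_idx` and `x_enc` columns in the Spark output of the example that follows.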

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{StringIndexer, OneHotEncoder}

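// toDF relies on the SQL implicits, which spark-shell imports automatically
val df = Seq((1L, "foo"), (2L, "bar")).toDF("id", "x")

// Names of all string-typed columns
val categoricals = df.dtypes.filter (_._2 == "StringType") map (_._1)

// Step 1: map each string column to a column of category indices
val indexers = categoricals.map (
  c => new StringIndexer().setInputCol(c).setOutputCol(s"${c}_idx")
)

// Step 2: one-hot encode the index columns produced by the indexers
val encoders = categoricals.map (
  c => new OneHotEncoder().setInputCol(s"${c}_idx").setOutputCol(s"${c}_enc")
)

// Indexers must run before the encoders, so they come first in the stage list
val pipeline = new Pipeline().setStages(indexers ++ encoders)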

val transformed = pipeline.fit(df).transform(df)
transformed.show

// +---+---+-----+-------------+
// | id|  x|x_idx|        x_enc|
// +---+---+-----+-------------+
// |  1|foo|  1.0|    (1,[],[])|
// |  2|bar|  0.0|(1,[0],[1.0])|
// +---+---+-----+-------------+

As you can see, there is no need to drop the string columns from the pipeline. In practice, OneHotEncoder accepts a numeric column with a NominalAttribute, a BinaryAttribute, or no type attribute at all.
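One way to see the attribute handling at work is to attach a `NominalAttribute` to a numeric index column by hand. This is a sketch assuming Spark 2.x or later (it uses `SparkSession` and `org.apache.spark.ml.linalg.Vector`, which Spark 1.6.1 does not have in this form); `NominalAttributeSketch`, `declareNominal` and `encodedSize` are hypothetical helper names. Declaring three categories should make the encoder size its vectors from the metadata (three categories minus the dropped last one), whereas without metadata the size is inferred from the values actually present.

```scala
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.attribute.NominalAttribute
import org.apache.spark.ml.feature.OneHotEncoder
import org.apache.spark.ml.linalg.{Vector => MLVector}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object NominalAttributeSketch {

  // Mark `column` as nominal with a fixed number of categories, even if not
  // all of them occur in the data. Keeps only that column.
  def declareNominal(df: DataFrame, column: String, numValues: Int): DataFrame = {
    val meta = NominalAttribute.defaultAttr
      .withName(column)
      .withNumValues(numValues)
      .toMetadata()
    df.select(col(column).as(column, meta))
  }

  // One-hot encode `column` and return the size of the first output vector.
  // Wrapping the encoder in a Pipeline works whether OneHotEncoder is a
  // Transformer (Spark 2.x) or an Estimator (Spark 3.x).
  def encodedSize(df: DataFrame, column: String): Int = {
    val encoder = new OneHotEncoder().setInputCol(column).setOutputCol(s"${column}_enc")
    val out = new Pipeline().setStages(Array(encoder)).fit(df).transform(df)
    out.select(s"${column}_enc").head().getAs[MLVector](0).size
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("Nominal sketch").master("local[*]").getOrCreate()
    import spark.implicits._

    // Only indices 0 and 1 occur in the data.
    val df = Seq(0.0, 1.0, 0.0).toDF("x_idx")
    println(encodedSize(df, "x_idx"))                             // size inferred from data
    println(encodedSize(declareNominal(df, "x_idx", 3), "x_idx")) // size taken from metadata
    spark.stop()
  }
}
```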
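Applied to the question, the remaining step is to collect the numeric and encoded columns into one vector column, since LogisticRegression reads its input from a `features` column (and a numeric `label` column) by default. The sketch below swaps in `VectorAssembler` for that step, which the answer itself does not cover. It assumes Spark 2.x or later (`SparkSession`), a string label column called `income`, and a few hypothetical toy rows in place of the real file; `AdultPipelineSketch` and `buildPipeline` are hypothetical names.

```scala
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, VectorAssembler}
import org.apache.spark.sql.{DataFrame, SparkSession}

object AdultPipelineSketch {

  // Index -> encode -> assemble -> classify for every string column except
  // the label, which is indexed into a numeric "label" column.
  def buildPipeline(data: DataFrame, labelCol: String): Pipeline = {
    val categoricals = data.dtypes.collect { case (name, "StringType") if name != labelCol => name }
    val numerics = data.dtypes.collect { case (name, dtype) if dtype != "StringType" => name }

    val indexers = categoricals.map(c => new StringIndexer().setInputCol(c).setOutputCol(s"${c}_idx"))
    val encoders = categoricals.map(c => new OneHotEncoder().setInputCol(s"${c}_idx").setOutputCol(s"${c}_enc"))
    val labelIndexer = new StringIndexer().setInputCol(labelCol).setOutputCol("label")

    // Only numeric and encoded columns go into the feature vector, so the
    // original string columns can stay in the DataFrame untouched.
    val assembler = new VectorAssembler()
      .setInputCols(numerics ++ categoricals.map(c => s"${c}_enc"))
      .setOutputCol("features")

    val lr = new LogisticRegression().setMaxIter(10).setRegParam(0.01)

    val stages: Seq[PipelineStage] =
      indexers.toSeq ++ encoders.toSeq ++ Seq(labelIndexer, assembler, lr)
    new Pipeline().setStages(stages.toArray)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("Adult sketch").master("local[*]").getOrCreate()
    import spark.implicits._

    // Hypothetical toy rows standing in for src/main/resources/adult.data.
    val data = Seq(
      (39, "State-gov", "<=50K"),
      (50, "Self-emp", ">50K"),
      (38, "Private", "<=50K"),
      (53, "Private", ">50K")
    ).toDF("age", "workclass", "income")

    val model = buildPipeline(data, "income").fit(data)
    model.transform(data).select("features", "label", "prediction").show()
    spark.stop()
  }
}
```

Because the assembler names its input columns explicitly, nothing has to be dropped: the string columns simply pass through the pipeline unused.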

answered Oct 13 '22 02:10 by zero323