
How to encode categorical features in Apache Spark

I have a set of data based on which I want to create a classification model. Each row has the following form:


There are about 1M users, 2 classes, and 1M products. Next I would like to create sparse vectors (something MLlib already supports), but to apply that function it seems I first have to create the dense vectors (with the 0s). In other words, I have to binarize my data. What is the easiest (or most elegant) way of doing that?

Since I am new to MLlib, could you provide a concrete example? I am using MLlib 1.2.


I have ended up with the following piece of code, but it turns out to be really slow. Any other ideas, given that I can only use MLlib 1.2?

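// Group the third field by (first, second) field, then build one dense 0/1 vector per group.
val data = test11.map(x => ((x(0), x(1)), x(2))).groupByKey().map(x => (x._1, x._2.toArray)).map { x =>
  val lt: Array[Double] = new Array[Double](test12.size)
  val id = x._1._1
  val cl = x._1._2
  val dt = x._2
  var i = -1
  // For each element y of test12, store 1.0 if this group contains y, else 0.0.
  test12.foreach { y => i += 1; lt(i) = if (dt contains y) 1.0 else 0.0 }
  val vs = Vectors.dense(lt)
  (id, cl, vs)
}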
user706838 asked Aug 07 '15 07:08


2 Answers

You can use spark.ml's OneHotEncoder.

You first use:

OneHotEncoder.categories(rdd, categoricalFields)

Where categoricalFields is the sequence of indexes at which your RDD contains categorical data. Given a dataset and the indexes of the columns that are categorical variables, categories returns a structure that, for each field, describes the values present in the dataset. That map is meant to be used as input to the encode method:

OneHotEncoder.encode(rdd, categories)

Which returns your vectorized RDD[Array[T]].

Francois G answered Oct 09 '22 23:10

If using the built-in OneHotEncoder is not an option and you have only a single variable, implementing a poor man's one-hot encoding is more or less straightforward. First, let's create some example data:

import org.apache.spark.mllib.linalg.{Vector, Vectors}

val rdd = sc.parallelize(List(
    Array("user1", "class1", "product1"),
    Array("user1", "class1", "product2"),
    Array("user1", "class1", "product5"),
    Array("user2", "class1", "product2"),
    Array("user2", "class1", "product5"),
    Array("user3", "class2", "product1")))

Next we have to create a mapping from a value to the index:

val prodMap = sc.broadcast(rdd.map(_(2)).distinct.zipWithIndex.collectAsMap)

and a simple encoding function:

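def encodeProducts(products: Iterable[String]): Vector = {
    // Look up each product's index and mark it with 1.0; all other positions stay 0.
    val indexed = products.map(product => (prodMap.value(product).toInt, 1.0)).toSeq
    Vectors.sparse(prodMap.value.size, indexed)
}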

Finally we can apply it to the dataset:

rdd.map(x => ((x(0), x(1)), x(2))).groupByKey.mapValues(encodeProducts)
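
Since the stated goal is a classification model, the encoded vectors would typically be wrapped in LabeledPoint before training. The following is a minimal sketch of that step, not part of the original answer. It assumes a spark-shell session where `sc` is already defined, and the names `rows`, `index`, and `toLabel`, as well as the mapping of class1 to 0.0, are made up for illustration.

```scala
import org.apache.spark.SparkContext._  // pair-RDD implicits (needed on Spark 1.2)
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// Assumption: `sc` is the SparkContext provided by spark-shell.
val rows = sc.parallelize(Seq(
  Array("user1", "class1", "product1"),
  Array("user1", "class1", "product2"),
  Array("user3", "class2", "product1")))

// Same idea as prodMap above: product -> index, broadcast to the executors.
val index = sc.broadcast(rows.map(_(2)).distinct.zipWithIndex.collectAsMap)

// Hypothetical label mapping: class1 -> 0.0, anything else -> 1.0.
def toLabel(cls: String): Double = if (cls == "class1") 0.0 else 1.0

val points = rows
  .map(r => ((r(0), r(1)), r(2)))
  .groupByKey
  .map { case ((_, cls), products) =>
    // distinct guards against a product listed twice for the same user
    val pairs = products.toSeq.distinct.map(p => (index.value(p).toInt, 1.0))
    LabeledPoint(toLabel(cls), Vectors.sparse(index.value.size, pairs))
  }
```

The resulting RDD[LabeledPoint] is the input type that MLlib's classifiers expect. The user id is dropped here because it is an identifier rather than a feature.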

It is relatively easy to extend the above to handle multiple variables.
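
One possible way to handle several variables is to give each categorical column its own block of indices, offset by the sizes of the columns before it. The sketch below uses plain Scala collections so the index arithmetic can be checked without a cluster. The second column ("red"/"blue") and the names `maps`, `offsets`, and `encodeRow` are hypothetical and do not come from the question.

```scala
// One value -> index map per categorical column, built like prodMap above.
// The second column is a made-up example feature.
val maps: Seq[Map[String, Int]] = Seq(
  Map("product1" -> 0, "product2" -> 1, "product5" -> 2),
  Map("red" -> 0, "blue" -> 1)
)

// Column k starts where the previous columns end: offsets are 0 and 3 here.
val offsets: Seq[Int] = maps.scanLeft(0)(_ + _.size).init
val totalSize: Int = maps.map(_.size).sum

// Returns the (index, 1.0) pairs for one row, where row(k) is the value of column k.
def encodeRow(row: Seq[String]): Seq[(Int, Double)] =
  row.zipWithIndex.map { case (value, k) => (offsets(k) + maps(k)(value), 1.0) }

val encoded = encodeRow(Seq("product5", "blue"))
// encoded == Seq((2, 1.0), (4, 1.0))
```

With Spark on the classpath, `Vectors.sparse(totalSize, encoded)` would turn these pairs into a single feature vector of length 5.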


If the number of products is too large to make broadcasting useful, it should be possible to use a join instead. First we can create a similar mapping from product to index, but keep it as an RDD:

import org.apache.spark.HashPartitioner

val nPartitions = ???

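val prodMapRDD = rdd
     .map(_(2)).distinct.zipWithIndex  // (product, index) pairs, as in prodMap
     .partitionBy(new HashPartitioner(nPartitions))
     .cache()  // reused by the count below and by the join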

val nProducts = prodMapRDD.count // Should be < Int.MaxValue

Next we reshape the input RDD into a pair RDD keyed by product:

val pairs = rdd
    .map(rec => (rec(2), (rec(0), rec(1))))
    .partitionBy(new HashPartitioner(nPartitions))

Finally we can join both:

def indicesToVec(n: Int)(indices: Iterable[Long]): Vector = {
     Vectors.sparse(n, indices.map(x => (x.toInt, 1.0)).toSeq)
}

pairs.join(prodMapRDD).values.groupByKey.mapValues(indicesToVec(nProducts.toInt))

zero323 answered Oct 09 '22 21:10
