 

How to prepare training data in MLlib

TL;DR: How do I use MLlib to train on my wiki data (text & category) for prediction against tweets?

I'm having trouble figuring out how to convert my tokenized wiki data so that it can be trained with either NaiveBayes or LogisticRegression. My goal is to use the trained model for comparison against tweets*. I've tried pipelines with LR, and HashingTF with IDF for NaiveBayes, but I keep getting wrong predictions. Here's what I've tried:

*Note that I would like to use the many categories in the wiki data as my labels. I've only seen binary classification (it's one category or another), so is it possible to do what I want?

Pipeline with LR

import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, RegexTokenizer}
import org.apache.spark.mllib.linalg.Vector
// running in spark-shell; otherwise import sqlContext.implicits._ for toDF

case class WikiData(category: String, text: String)
case class LabeledData(category: String, text: String, label: Double)

val wikiData = sc.parallelize(List(WikiData("Spark", "this is about spark"), WikiData("Hadoop","then there is hadoop")))

val categoryMap = wikiData.map(x=>x.category).distinct.zipWithIndex.mapValues(x=>x.toDouble/1000).collectAsMap

val labeledData = wikiData.map(x=>LabeledData(x.category, x.text, categoryMap.get(x.category).getOrElse(0.0))).toDF

val tokenizer = new RegexTokenizer()
  .setInputCol("text")
  .setOutputCol("words")
  .setPattern("\\W+") // split on non-word characters
val hashingTF = new HashingTF()
  .setNumFeatures(1000)
  .setInputCol(tokenizer.getOutputCol)
  .setOutputCol("features")
val lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.01)
val pipeline = new Pipeline()
  .setStages(Array(tokenizer, hashingTF, lr))

val model = pipeline.fit(labeledData)

model.transform(labeledData).show

Naive Bayes

import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.feature.HashingTF
import org.apache.spark.mllib.linalg.Vector

val hashingTF = new HashingTF()
val tf: RDD[Vector] = hashingTF.transform(documentsAsWordSequenceAlready)

import org.apache.spark.mllib.feature.IDF

tf.cache()
val idf = new IDF().fit(tf)
val tfidf: RDD[Vector] = idf.transform(tf)

// alternative: ignore terms that appear in fewer than 2 documents
val idfMinDoc = new IDF(minDocFreq = 2).fit(tf)
val tfidfMinDoc: RDD[Vector] = idfMinDoc.transform(tf)

// to create tfidfLabeled (below) I ran a map to set the labels...but again it seems they have to be 1.0 or 0.0?
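// A hedged sketch of one way tfidfLabeled could be built: zip a parallel
// RDD of labels with the TF-IDF vectors (zip assumes both RDDs keep the
// same ordering and partitioning; "labels" is a hypothetical name). The
// labels can be any of 0.0, 1.0, 2.0, ...; they are not restricted to 0.0/1.0.
import org.apache.spark.mllib.regression.LabeledPoint

val labels: RDD[Double] = ??? // one label per document, same order as tfidf
val tfidfLabeled = labels.zip(tfidf).map {
  case (label, features) => LabeledPoint(label, features)
}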

import org.apache.spark.mllib.classification.NaiveBayes

NaiveBayes.train(tfidfLabeled)
  .predict(hashingTF.transform(tweet))
  .collect
asked by Justin Pihony
1 Answer

ML LogisticRegression doesn't support multinomial classification yet, but it is supported by both MLlib NaiveBayes and MLlib LogisticRegressionWithLBFGS. In the first case it should work by default:

import org.apache.spark.mllib.classification.NaiveBayes

val nbModel = new NaiveBayes()
  .setModelType("multinomial") // this is the default value
  .run(train) // train: RDD[LabeledPoint]

but for logistic regression you should provide the number of classes:

import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS

val model = new LogisticRegressionWithLBFGS()
  .setNumClasses(n) // n = number of classes
  .run(trainingData) // trainingData: RDD[LabeledPoint]

Preprocessing is quite a broad topic and it is hard to give meaningful advice without access to your data, so everything you find below is just a wild guess:

  • as far as I understand, you use wiki data for training and tweets for testing. If that's true it is, generally speaking, a bad idea: you can expect both sets to use significantly different vocabulary, grammar and spelling
  • a simple regex tokenizer can perform pretty well on standardized text, but from my experience it won't work well on informal text like tweets
  • HashingTF can be a good way to obtain a baseline model, but it is an extremely simplified approach, especially if you don't apply any filtering steps. If you decide to use it you should at least increase the number of features or use the default value (2^20); see the sketch after this list
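For illustration, a minimal sketch of widening the feature space (1 << 20 matches the 2^20 default of MLlib's HashingTF; the column names are assumptions carried over from the question's tokenizer output):

import org.apache.spark.ml.feature.HashingTF

val hashingTF = new HashingTF()
  .setNumFeatures(1 << 20)  // 2^20 buckets, far fewer hash collisions than 1000
  .setInputCol("words")     // assumed tokenizer output column
  .setOutputCol("features")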

EDIT (Preparing data for Naive Bayes with IDF)

  • using ML Pipelines:
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{HashingTF, IDF}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.sql.Row

val tokenizer = ??? // e.g. the RegexTokenizer defined in the question

val hashingTF = new HashingTF()
  .setNumFeatures(1000)
  .setInputCol(tokenizer.getOutputCol)
  .setOutputCol("rawFeatures")

val idf = new IDF()
  .setInputCol(hashingTF.getOutputCol)
  .setOutputCol("features")

val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, idf))
val model = pipeline.fit(labeledData)

model
 .transform(labeledData)
 .select($"label", $"features")
 .map{case Row(label: Double, features: Vector) => LabeledPoint(label, features)}
  • using MLlib transformers:
import org.apache.spark.mllib.feature.{HashingTF, IDF, IDFModel}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.regression.LabeledPoint

val labeledData = wikiData.map(x => 
  LabeledData(x.category, x.text, categoryMap.get(x.category).getOrElse(0.0)))

val p = "\\W+".r
val raw = labeledData.map{
    case LabeledData(_, text, label) => (label, p.split(text))}

val hashingTF: org.apache.spark.mllib.feature.HashingTF = new HashingTF(1000)
val tf = raw.map{case (label, text) => (label, hashingTF.transform(text))}

val idf: org.apache.spark.mllib.feature.IDFModel = new IDF().fit(tf.map(_._2))
tf.map{
  case (label, rawFeatures) => LabeledPoint(label, idf.transform(rawFeatures))}

Note: since MLlib transformers require JVM access, the MLlib version won't work in PySpark. If you prefer Python you have to split the data, transform, and zip.
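Either route ends with an RDD[LabeledPoint] that can be fed straight to MLlib NaiveBayes. A minimal sketch (trainData is a hypothetical name for the result of either mapping above):

import org.apache.spark.mllib.classification.NaiveBayes
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

val trainData: RDD[LabeledPoint] = ??? // result of either mapping above
val nbModel = NaiveBayes.train(trainData, lambda = 1.0, modelType = "multinomial")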

EDIT (Preparing data for ML algorithms):

While the following piece of code looks valid at first glance,

val categoryMap = wikiData
  .map(x=>x.category)
  .distinct
  .zipWithIndex
  .mapValues(x=>x.toDouble/1000)
  .collectAsMap

val labeledData = wikiData.map(x=>LabeledData(
    x.category, x.text, categoryMap.get(x.category).getOrElse(0.0))).toDF

it won't generate valid labels for ML algorithms.

First of all, ML expects labels to be in {0.0, 1.0, ..., (n-1).0}, where n is the number of classes. In your example pipeline one of the classes gets the label 0.001, so you'll get an error like this:

ERROR LogisticRegression: Classification labels should be in {0 to 0} Found 1 invalid labels.

The obvious solution is to avoid division when you generate the mapping:

.mapValues(x=>x.toDouble)

While this will work for LogisticRegression, other ML algorithms will still fail. For example, with RandomForestClassifier you'll get:

RandomForestClassifier was given input with invalid label column label, without the number of classes specified. See StringIndexer.

What is interesting is that the ML version of RandomForestClassifier, unlike its MLlib counterpart, doesn't provide a method to set the number of classes. It turns out it expects special attributes to be set on the DataFrame column. The simplest approach is to use the StringIndexer mentioned in the error message:

import org.apache.spark.ml.feature.StringIndexer

val indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("label")

val pipeline = new Pipeline()
  .setStages(Array(indexer, tokenizer, hashingTF, idf, lr))

val model = pipeline.fit(wikiData.toDF)
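From there predictions can be mapped back to category names. A hedged sketch, assuming Spark 1.5+ where IndexToString is available (tweetsDF is a hypothetical DataFrame of tweets with a "text" column):

import org.apache.spark.ml.feature.{IndexToString, StringIndexerModel}

// recover the string labels learned by the indexer (stage 0 of the pipeline)
val labels = model.stages(0).asInstanceOf[StringIndexerModel].labels

val converter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedCategory")
  .setLabels(labels)

converter.transform(model.transform(tweetsDF)).select("text", "predictedCategory").show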
answered by zero323