I have labeled vectors (LabeledPoints) tagged by some group number. For every group I need to create a separate Logistic Regression classifier:
import org.apache.log4j.{Level, Logger}
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.{Vector, Vectors}
object Scratch {
  val train = Seq(
    (1, LabeledPoint(0, Vectors.sparse(3, Seq((0, 1.0), (2, 3.0))))),
    (1, LabeledPoint(0, Vectors.sparse(3, Seq((1, 1.5), (2, 4.0))))),
    (1, LabeledPoint(0, Vectors.sparse(3, Seq((0, 2.0), (1, 1.0), (2, 3.5))))),
    (8, LabeledPoint(0, Vectors.sparse(3, Seq((0, 3.0), (2, 7.0))))),
    (8, LabeledPoint(0, Vectors.sparse(3, Seq((0, 1.0), (1, 3.0))))),
    (8, LabeledPoint(0, Vectors.sparse(3, Seq((0, 1.5), (2, 4.0)))))
  )
  def main(args: Array[String]) {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    // set up environment
    val conf = new SparkConf()
      .setMaster("local[5]")
      .setAppName("Scratch")
      .set("spark.executor.memory", "2g")
    val sc = new SparkContext(conf)
    val trainRDD = sc.parallelize(train)
    val modelByGroup = trainRDD.groupByKey().map({case (group, iter) =>
      (group, new LogisticRegressionWithLBFGS().run(iter))})
  }
}
new LogisticRegressionWithLBFGS().run(iter) does not compile because run works with an RDD and not with the iterator that groupByKey returns.
Please advise how to build as many classifiers as there are groups (tags) in the input data.
Update - demonstrates that nested RDD iteration does not work:
import org.apache.log4j.{Level, Logger}
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.{Vector, Vectors}
object Scratch {
  val train = Seq(
    (1, LabeledPoint(0, Vectors.sparse(3, Seq((0, 1.0), (2, 3.0))))),
    (1, LabeledPoint(0, Vectors.sparse(3, Seq((1, 1.5), (2, 4.0))))),
    (1, LabeledPoint(0, Vectors.sparse(3, Seq((0, 2.0), (1, 1.0), (2, 3.5))))),
    (8, LabeledPoint(0, Vectors.sparse(3, Seq((0, 3.0), (2, 7.0))))),
    (8, LabeledPoint(0, Vectors.sparse(3, Seq((0, 1.0), (1, 3.0))))),
    (8, LabeledPoint(0, Vectors.sparse(3, Seq((0, 1.5), (2, 4.0)))))
  )
  def main(args: Array[String]) {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    // set up environment
    val conf = new SparkConf()
      .setMaster("local[5]")
      .setAppName("Scratch")
      .set("spark.executor.memory", "2g")
    val sc = new SparkContext(conf)
    val trainRDD = sc.parallelize(train)
    val keys : RDD[Int] = trainRDD.map({case (key,_) => key}).distinct
    for (key <- keys) {
      // key is Int here!
      // Get train data for the current group (key):
      val groupTrain = trainRDD.filter({case (x, _) => x == key }).cache()
      /**
       * Which results in org.apache.spark.SparkException:
       * RDD transformations and actions can only be invoked by the driver,
       * not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid
       * because the values transformation and count action cannot be performed inside of the rdd1.map transformation.
       * For more information, see SPARK-5063. at org.apache.spark.rdd.RDD.sc(RDD.scala:87)
       */
    }
  }
}
Looks like there is no way to use transformations inside other transformations, correct?
If you are training a classifier on each group, you don't need MLlib. MLlib is designed for distributed data sets, and yours are not: you have a bunch of local sets on each worker. You can just use a local machine learning library such as Weka on each group inside a map function.
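As a rough illustration of the "local learner per group" idea, here is a minimal sketch in plain Scala. It does not use Weka; `LocalLogReg`, `train` and `predict` are hypothetical names for a hand-written stand-in (batch gradient descent, no intercept, dense features), chosen only so the example needs no extra dependency. The trailing comment shows how such a function might be called per group, assuming the `trainRDD` from the question.

```scala
// Hypothetical stand-in for a local (non-distributed) learner such as Weka:
// binary logistic regression trained by plain batch gradient descent.
object LocalLogReg {
  private def sigmoid(z: Double): Double = 1.0 / (1.0 + math.exp(-z))

  private def dot(w: Array[Double], x: Array[Double]): Double =
    w.indices.map(i => w(i) * x(i)).sum

  // points: (label in {0.0, 1.0}, dense feature array); returns the learned weights.
  def train(points: Iterable[(Double, Array[Double])],
            iterations: Int = 500,
            stepSize: Double = 0.1): Array[Double] = {
    val data = points.toArray
    require(data.nonEmpty, "cannot train on an empty group")
    val dim = data.head._2.length
    val w = Array.fill(dim)(0.0)
    for (_ <- 0 until iterations) {
      // Gradient of the log-loss, summed over all points of this group.
      val grad = Array.fill(dim)(0.0)
      for ((y, x) <- data) {
        val err = sigmoid(dot(w, x)) - y
        for (i <- 0 until dim) grad(i) += err * x(i)
      }
      // Step against the averaged gradient.
      for (i <- 0 until dim) w(i) -= stepSize * grad(i) / data.length
    }
    w
  }

  def predict(w: Array[Double], x: Array[Double]): Double =
    if (sigmoid(dot(w, x)) >= 0.5) 1.0 else 0.0
}

// Inside Spark this could be applied once per group, for example
// (assuming trainRDD: RDD[(Int, LabeledPoint)] as in the question):
//   trainRDD.groupByKey().mapValues { iter =>
//     LocalLogReg.train(iter.map(p => (p.label, p.features.toArray)))
//   }
```

This only works when every group fits in the memory of a single worker, since groupByKey materializes all values of a key in one place.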
EDIT:
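// Collect the distinct keys to the driver, so the loop below runs on the
// driver and not inside another RDD transformation.
val keys = wholeRDD.map(_._1).distinct.collect
// A bare List() would infer List[Nothing], so give the element type explicitly
// (assumed here to be MLlib's LogisticRegressionModel, as in the question).
var models = List.empty[LogisticRegressionModel]
for (key <- keys) {
  val valuesForKey = wholeRDD.filter(_._1 == key)
  // train model
  ...
  models = model :: models
}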
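For reference, the driver-side loop above might be filled in roughly as follows for the MLlib classifier from the question. This is a sketch under assumptions, not a tested recipe: `PerGroupModels` and `trainPerGroup` are hypothetical names, the input is assumed to be an `RDD[(Int, LabeledPoint)]` like `trainRDD`, and the number of distinct keys is assumed to be small enough to collect to the driver.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.classification.{LogisticRegressionModel, LogisticRegressionWithLBFGS}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors

object PerGroupModels {
  // Trains one model per distinct key. The keys are collected first, so the
  // loop runs on the driver and every filter/run call is an ordinary
  // top-level RDD operation (no RDD is used inside another transformation).
  def trainPerGroup(data: RDD[(Int, LabeledPoint)]): Map[Int, LogisticRegressionModel] = {
    data.cache() // the RDD is scanned once per key
    val keys: Array[Int] = data.map(_._1).distinct().collect()
    keys.map { key =>
      // Keep only the points of the current group and drop the key.
      val groupTrain: RDD[LabeledPoint] = data.filter(_._1 == key).map(_._2).cache()
      val model = new LogisticRegressionWithLBFGS().run(groupTrain)
      groupTrain.unpersist()
      key -> model
    }.toMap
  }
}
```

Each key costs one pass over the cached data, so this suits a modest number of groups; with very many small groups the local-learner approach is likely the better fit.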