 

How to create correct data frame for classification in Spark ML

I am trying to run a random forest classification using the Spark ML API, but I am having issues creating the right DataFrame input for the pipeline.

Here is sample data:

age,hours_per_week,education,sex,salaryRange
38,40,"hs-grad","male","A"
28,40,"bachelors","female","A"
52,45,"hs-grad","male","B"
31,50,"masters","female","B"
42,40,"bachelors","male","B"

age and hours_per_week are integers, while the other features, including the label salaryRange, are categorical (String).

Loading this CSV file (let's call it sample.csv) can be done with the Spark CSV library like this:

val data = sqlContext.csvFile("/home/dusan/sample.csv") 
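
For reference, the same load can also be written with the generic DataFrameReader API (a sketch, assuming the spark-csv package is on the classpath and the file has a header row, as the sample above does):

val data = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .load("/home/dusan/sample.csv")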

By default all columns are imported as strings, so we need to change "age" and "hours_per_week" to Int:

val toInt = udf[Int, String]( _.toInt)
val dataFixed = data.withColumn("age", toInt(data("age"))).withColumn("hours_per_week", toInt(data("hours_per_week")))
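
The UDF works, but the same conversion can also be expressed with Column.cast, which avoids the user-defined function entirely (a minimal sketch on the same data frame):

val dataFixed = data
  .withColumn("age", data("age").cast("int"))
  .withColumn("hours_per_week", data("hours_per_week").cast("int"))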

Just to check how the schema looks now:

scala> dataFixed.printSchema
root
 |-- age: integer (nullable = true)
 |-- hours_per_week: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- salaryRange: string (nullable = true)

Then let's set up the cross-validator and the pipeline:

val rf = new RandomForestClassifier()
val pipeline = new Pipeline().setStages(Array(rf))
val cv = new CrossValidator().setNumFolds(10).setEstimator(pipeline).setEvaluator(new BinaryClassificationEvaluator)
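
As a side note, CrossValidator is normally also given a parameter grid via setEstimatorParamMaps; a minimal sketch using ParamGridBuilder (the maxDepth values are purely illustrative, and this is unrelated to the error below):

import org.apache.spark.ml.tuning.ParamGridBuilder

val paramGrid = new ParamGridBuilder()
  .addGrid(rf.maxDepth, Array(5, 10))
  .build()
cv.setEstimatorParamMaps(paramGrid)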

The error shows up when running this line:

val cmModel = cv.fit(dataFixed) 

java.lang.IllegalArgumentException: Field "features" does not exist.
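
The message means the pipeline looked for a column literally named "features". By default, ML estimators expect a label column of type Double and a features column of type Vector, i.e. a frame shaped like this hand-built example (the values are made up):

import org.apache.spark.mllib.linalg.Vectors

val training = sqlContext.createDataFrame(Seq(
  (0.0, Vectors.dense(38.0, 40.0)),
  (1.0, Vectors.dense(52.0, 45.0))
)).toDF("label", "features")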

It is possible to set the label column and feature column on RandomForestClassifier; however, I have 4 columns as predictors (features), not only one.

How should I organize my data frame so it has the label and features columns organized correctly?

For your convenience, here is the full code:

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.tuning.CrossValidator
import org.apache.spark.ml.Pipeline
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.mllib.linalg.{Vector, Vectors}

object SampleClassification {

  def main(args: Array[String]): Unit = {

    // set spark context
    val conf = new SparkConf().setAppName("Simple Application").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._
    import com.databricks.spark.csv._

    // load data using the databricks "Spark CSV Library"
    val data = sqlContext.csvFile("/home/dusan/sample.csv")

    // by default all columns are imported as string, so we need to change
    // "age" and "hours_per_week" to Int
    val toInt = udf[Int, String]( _.toInt)
    val dataFixed = data.withColumn("age", toInt(data("age"))).withColumn("hours_per_week", toInt(data("hours_per_week")))

    val rf = new RandomForestClassifier()

    val pipeline = new Pipeline().setStages(Array(rf))

    val cv = new CrossValidator().setNumFolds(10).setEstimator(pipeline).setEvaluator(new BinaryClassificationEvaluator)

    // this fails with:
    // java.lang.IllegalArgumentException: Field "features" does not exist.
    val cmModel = cv.fit(dataFixed)
  }
}

Thanks for the help!

asked Jun 24 '15 by Dusan Grubjesic




1 Answer

As of Spark 1.4, you can use the Transformer org.apache.spark.ml.feature.VectorAssembler. Just provide the column names you want to be features:

val assembler = new VectorAssembler()
  .setInputCols(Array("col1", "col2", "col3"))
  .setOutputCol("features")

and add it to your pipeline.
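
For the data in the question, a full pipeline could look roughly like this (a sketch, not tested against that exact dataset; StringIndexer turns the string label and the categorical columns into numeric indices first, and the "_idx" column names are just illustrative):

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}

// index the string label into the default "label" column
val labelIndexer = new StringIndexer()
  .setInputCol("salaryRange")
  .setOutputCol("label")

// index the categorical predictors
val eduIndexer = new StringIndexer().setInputCol("education").setOutputCol("education_idx")
val sexIndexer = new StringIndexer().setInputCol("sex").setOutputCol("sex_idx")

// assemble all predictors into a single "features" vector column
val assembler = new VectorAssembler()
  .setInputCols(Array("age", "hours_per_week", "education_idx", "sex_idx"))
  .setOutputCol("features")

val rf = new RandomForestClassifier()

val pipeline = new Pipeline()
  .setStages(Array(labelIndexer, eduIndexer, sexIndexer, assembler, rf))

RandomForestClassifier then picks up the default "label" and "features" columns without further configuration.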

answered Sep 26 '22 by WeiChing 林煒清