I am trying to create a Spark ML Pipeline with the Random Forest Classifier to perform classification (not regression), but I am getting an error saying the label column in my training set should be a double instead of an integer. I am following instructions from these pages:
"Classification and regression - spark.ml" (apache.org)
"How to create correct data frame for classification in Spark ML" (stack overflow.com)
"Spark MLLib - Predict Store Sales with ML Pipelines" (sparktutorials.net)
I have a Spark DataFrame with the following columns:
scala> df.show(5)
+-------+----------+----------+---------+-----+
| userId|duration60|duration30|duration1|label|
+-------+----------+----------+---------+-----+
|user000| 11| 21| 35| 3|
|user001| 28| 41| 28| 4|
|user002| 17| 6| 8| 2|
|user003| 39| 29| 0| 1|
|user004| 26| 23| 25| 3|
+-------+----------+----------+---------+-----+
scala> df.printSchema()
root
|-- userId: string (nullable = true)
|-- duration60: integer (nullable = true)
|-- duration30: integer (nullable = true)
|-- duration1: integer (nullable = true)
|-- label: integer (nullable = true)
I am using the feature columns duration60, duration30, and duration1 to predict the categorical column label.
I then set up my Spark script like so:
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.sql.SQLContext
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.classification.{RandomForestClassificationModel, RandomForestClassifier}
import org.apache.spark.ml.{Pipeline, PipelineModel}
Logger.getLogger("org").setLevel(Level.ERROR)
Logger.getLogger("akka").setLevel(Level.ERROR)
val sqlContext = new SQLContext(sc)
val df = sqlContext.read.
format("com.databricks.spark.csv").
option("header", "true"). // Use first line of all files as header
option("inferSchema", "true"). // Automatically infer data types
load("/tmp/features.csv").
withColumnRenamed("satisfaction", "label").
select("userId", "duration60", "duration30", "duration1", "label")
val assembler = new VectorAssembler().
setInputCols(Array("duration60", "duration30", "duration1")).
setOutputCol("features")
val randomForest = new RandomForestClassifier().
setLabelCol("label").
setFeaturesCol("features").
setNumTrees(10)
val pipeline = new Pipeline().setStages(Array(assembler, randomForest))
val model = pipeline.fit(df)
The transformed DataFrame looks like this:
scala> assembler.transform(df).show(5)
+-------+----------+----------+---------+-----+----------------+
| userId|duration60|duration30|duration1|label| features|
+-------+----------+----------+---------+-----+----------------+
|user000| 11| 21| 35| 3|[11.0,21.0,35.0]|
|user001| 28| 41| 28| 4|[28.0,41.0,28.0]|
|user002| 17| 6| 8| 2| [17.0,6.0,8.0]|
|user003| 39| 29| 0| 1| [39.0,29.0,0.0]|
|user004| 26| 23| 25| 3|[26.0,23.0,25.0]|
+-------+----------+----------+---------+-----+----------------+
However, the last line throws an exception:
java.lang.IllegalArgumentException: requirement failed: Column label must be of type DoubleType but was actually IntegerType.
What does this mean, and how do I fix it?
Why does the label column need to be a double? I am doing classification, not regression, so I thought a string or an integer would be appropriate; a double-valued label usually implies regression.
Cast the label column to DoubleType, since that is the type the algorithm expects. Spark ML represents class labels internally as double-valued indices (0.0, 1.0, 2.0, ...), so even for classification the label column must be of DoubleType.
import org.apache.spark.sql.types._
import sqlContext.implicits._ // provides the 'label symbol syntax; spark-shell imports this automatically

df.withColumn("label", 'label cast DoubleType)
So, in your application, right where you define val df, add the cast as the last step of the chain:
import org.apache.spark.sql.types._
import sqlContext.implicits._ // for the 'label symbol syntax

val df = sqlContext.read.
  format("com.databricks.spark.csv").
  option("header", "true").      // Use first line of all files as header
  option("inferSchema", "true"). // Automatically infer data types
  load("/tmp/features.csv").
  withColumnRenamed("satisfaction", "label").
  select("userId", "duration60", "duration30", "duration1", "label").
  withColumn("label", 'label cast DoubleType) // <-- HERE
Note that I've used the 'label symbol (a single quote ' followed by a name) to reference the column label, which I could also have done using $"label", col("label"), df("label"), or column("label").
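For reference, these are equivalent ways of writing the same cast (the 'label and $"label" forms rely on import sqlContext.implicits._, which spark-shell provides automatically):

import org.apache.spark.sql.functions.{col, column}
import org.apache.spark.sql.types.DoubleType

df.withColumn("label", 'label cast DoubleType)            // symbol
df.withColumn("label", $"label".cast(DoubleType))          // string interpolator
df.withColumn("label", col("label").cast(DoubleType))      // col() function
df.withColumn("label", df("label").cast(DoubleType))       // apply on the DataFrame
df.withColumn("label", column("label").cast(DoubleType))   // column() function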
In PySpark:
from pyspark.sql.types import DoubleType
df = df.withColumn("label", df.label.cast(DoubleType()))