I am trying to implement a document classifier using Apache Spark MLlib and I am having some problems representing the data. My code is the following:
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.ml.feature.Tokenizer
import org.apache.spark.ml.feature.HashingTF
import org.apache.spark.ml.feature.IDF
import org.apache.spark.mllib.regression.LabeledPoint
val sql = new SQLContext(sc)
// Load raw data from a TSV file
val raw = sc.textFile("data.tsv").map(_.split("\t").toSeq)
// Convert the RDD to a dataframe
val schema = StructType(List(StructField("class", StringType), StructField("content", StringType)))
val dataframe = sql.createDataFrame(raw.map(row => Row(row(0), row(1))), schema)
// Tokenize
val tokenizer = new Tokenizer().setInputCol("content").setOutputCol("tokens")
val tokenized = tokenizer.transform(dataframe)
// TF-IDF
val htf = new HashingTF().setInputCol("tokens").setOutputCol("rawFeatures").setNumFeatures(500)
val tf = htf.transform(tokenized)
tf.cache
val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
val idfModel = idf.fit(tf)
val tfidf = idfModel.transform(tf)
// Create labeled points
val labeled = tfidf.map(row => LabeledPoint(row.getDouble(0), row.get(4)))
I need to use DataFrames to generate the tokens and create the TF-IDF features. The problem appears when I try to convert this DataFrame to an RDD[LabeledPoint]. I map over the DataFrame rows, but the get method of Row returns Any, not the type defined in the DataFrame schema (Vector). Therefore, I cannot construct the RDD I need to train an ML model.
What is the best way to get an RDD[LabeledPoint] after calculating TF-IDF?
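The typing issue described above can be reproduced on a single hand-built Row. This is a minimal sketch, assuming spark-sql and spark-mllib are on the classpath; RowTypingDemo and the values in it are hypothetical, chosen only to mirror the column order of tfidf (class, content, tokens, rawFeatures, features). It shows that get is declared to return Any regardless of the schema, while getAs[T] lets the caller name the expected type.

```scala
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.sql.Row

// Hypothetical demo: a Row shaped like one row of the tfidf DataFrame.
object RowTypingDemo {
  val row: Row = Row(
    "1",                                        // class (StringType in the schema)
    "spark is fast",                            // content
    Seq("spark", "is", "fast"),                 // tokens
    Vectors.sparse(500, Array(7), Array(3.0)),  // rawFeatures
    Vectors.sparse(500, Array(7), Array(0.4))   // features
  )

  // Row.get is declared to return Any, whatever the schema says.
  val untyped: Any = row.get(4)

  // getAs[T] is get(i).asInstanceOf[T]: the caller names the type, and a
  // wrong choice surfaces as a ClassCastException when the value is used.
  val features: Vector = row.getAs[Vector](4)

  // The label column holds a String, so read it as one and convert it.
  val label: Double = row.getString(0).toDouble
}
```

Note that calling getDouble(0) on this row would fail at runtime, because the stored value is a String.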
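Casting the object worked for me. Make sure Vector refers to MLlib's vector type: without an explicit import, Vector resolves to Scala's scala.collection.immutable.Vector and the LabeledPoint constructor will not accept it. Also, since the "class" column is declared as StringType, read it with getString and convert it, because getDouble(0) throws a ClassCastException on a String.
Try:
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.regression.LabeledPoint
// Create labeled points (assumes the class labels are numeric strings such as "0" or "1")
val labeled = tfidf.map(row => LabeledPoint(row.getString(0).toDouble, row(4).asInstanceOf[Vector]))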
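A variant that avoids hard-coded column positions is to select the needed columns by name and pattern-match each Row into typed values. This is a sketch under the same assumptions as the question (Spark 1.x, where spark.ml transformers produce org.apache.spark.mllib.linalg.Vector, and class labels that are numeric strings); the object name TfIdfToLabeledPoints is hypothetical.

```scala
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row}

object TfIdfToLabeledPoints {
  // Selects "class" and "features" by name, so the code does not depend on
  // where the pipeline placed those columns, then converts each Row.
  // A row that does not match the pattern (e.g. a null label) fails with a MatchError.
  def convert(tfidf: DataFrame): RDD[LabeledPoint] =
    tfidf.select("class", "features").rdd.map {
      case Row(label: String, features: Vector) =>
        LabeledPoint(label.toDouble, features)
    }
}
```

Usage would be val labeled = TfIdfToLabeledPoints.convert(tfidf). On Spark 2.0 and later, spark.ml transformers emit org.apache.spark.ml.linalg.Vector instead, so the pattern would need to match that type and convert it with org.apache.spark.mllib.linalg.Vectors.fromML before building the LabeledPoint.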