Let's say I have a DataFrame (that I read in from a csv on HDFS) and I want to train some algorithms on it via MLlib. How do I convert the rows into LabeledPoints or otherwise utilize MLlib on this dataset?
MLlib automated MLflow tracking is deprecated on clusters that run Databricks Runtime 10.1 ML and above, and it is disabled by default on clusters running Databricks Runtime 10.2 ML and above. Instead, use MLflow PySpark ML autologging by calling mlflow.pyspark.ml.autolog().
Is MLlib deprecated? No. MLlib includes both the RDD-based API and the DataFrame-based API. The RDD-based API is now in maintenance mode.
MLlib supports preprocessing, munging, model training, and making predictions at scale. You can even use models trained in MLlib to make predictions in Structured Streaming.
spark.ml provides a higher-level API built on top of DataFrames for constructing ML pipelines. MLlib will still support the RDD-based API in spark.mllib with bug fixes, but it will not add new features to it.
The RDD-based MLlib API is on its way to deprecation, so you should prefer the DataFrame-based API (spark.ml) instead.
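That said, if you do still need LabeledPoints for the RDD-based API, converting DataFrame rows is a short map over df.rdd. A minimal sketch, assuming df is the DataFrame read from the CSV, every column is already numeric, and the first column is the label (those assumptions are mine, not from the question):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

//Assumes all columns are numeric and the first one is the label
val labeledPoints = df.rdd.map { row =>
  val label = row.getDouble(0)
  val features = Vectors.dense((1 until row.length).map(row.getDouble).toArray)
  LabeledPoint(label, features)
}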
Generally the input to these MLlib APIs is a DataFrame containing two columns: label and features. There are various ways to build this DataFrame, including low-level data types like org.apache.spark.mllib.linalg.{Vector, Vectors}, org.apache.spark.mllib.regression.LabeledPoint, and org.apache.spark.mllib.linalg.{Matrix, Matrices}, all of which take numeric values for the label and features. Words can be converted to vectors using org.apache.spark.ml.feature.{Word2Vec, Word2VecModel}. The data types documentation explains more: https://spark.apache.org/docs/latest/mllib-data-types.html
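With the DataFrame-based API, the usual way to get a numeric features column out of a CSV is org.apache.spark.ml.feature.VectorAssembler. A minimal sketch, assuming a header row and numeric columns; the path and column names here are hypothetical:

import org.apache.spark.ml.feature.VectorAssembler

val df = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("hdfs:///path/to/data.csv") //hypothetical path

//Assemble the numeric input columns into a single "features" vector column
val assembler = new VectorAssembler()
  .setInputCols(Array("x1", "x2", "x3")) //hypothetical column names
  .setOutputCol("features")

val assembled = assembler.transform(df)

Most spark.ml estimators read "features" and "label" columns by default, so once the label column is named and cast accordingly, this DataFrame can go straight into fit.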
Once the input DataFrame with label and features columns is created, instantiate the MLlib estimator and pass the DataFrame to its fit method to get a model, then call transform or predict on the model to get results.
Example -
The training file looks like -
<numeric label> <space-separated words>
//Imports used by the example
import org.apache.spark.ml.classification.LinearSVC
import org.apache.spark.ml.feature.Word2Vec
import org.apache.spark.ml.linalg.Vector
import spark.implicits._

//Build word vectors from the training text
//The input is expected to have a single string column named "value",
//each row holding "<label> <words...>"
val trainingData = spark.read.parquet("<path to training file>")
val sampleDataDf = trainingData
  .map { r =>
    val s = r.getAs[String]("value").split(" ")
    val label = s.head.toDouble //first token is the numeric label
    val featureWords = s.tail   //remaining tokens are the feature words
    (label, featureWords)
  }.toDF("label", "feature_words")

val word2Vec = new Word2Vec()
  .setInputCol("feature_words")
  .setOutputCol("feature_vectors")
  .setMinCount(0)
  .setMaxIter(10)

//Build the Word2Vec model
val model = word2Vec.fit(sampleDataDf)

//Convert the training text to vectors, keeping the labels
val wVectors = model.transform(sampleDataDf)

//Train a LinearSVC model on the word vectors
val svmModel = new LinearSVC()
  .setFeaturesCol("feature_vectors")
  .setLabelCol("label")
  .setMaxIter(100)
  .setRegParam(0.01)
  .setThreshold(0.5)
  .fit(wVectors) //use the word vectors created above

//Predict new data, same format as the training data
val predictionData = spark.read.parquet("<path to prediction file>")
val pDataDf = predictionData
  .map { r =>
    val s = r.getAs[String]("value").split(" ")
    val label = s.head.toDouble
    val featureWords = s.tail
    (label, featureWords)
  }.toDF("label", "feature_words")

val pVectors = model.transform(pDataDf)
val predictionResult = pVectors.map { r =>
  val words = r.getAs[Seq[String]]("feature_words")
  val v = r.getAs[Vector]("feature_vectors")
  val c = svmModel.predict(v) //per-row predict; public since Spark 3.0
  s"$c ${words.mkString(" ")}"
}
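Rather than calling predict row by row (the model's predict method is only public since Spark 3.0), you can also let the fitted model score the whole DataFrame with transform, which appends a prediction column. A short sketch reusing the names from the example above:

//Score the whole DataFrame at once; transform adds "prediction" and "rawPrediction" columns
val predictions = svmModel.transform(pVectors)
predictions.select("label", "prediction").show()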