I want to compute TF-IDF features separately on the title and description fields, and then combine those features with a VectorAssembler so that the final classifier operates on a single feature vector.
It works fine if I use a single serial flow that is simply

titleTokenizer -> titleHashingTF -> VectorAssembler

But I need both branches feeding into the assembler, like so:

titleTokenizer -> titleHashingTF ------------------\
                                                    -> VectorAssembler
descriptionTokenizer -> descriptionHashingTF ------/
Code here:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer, StringIndexer, VectorAssembler}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.log4j.{Level, Logger}

object SimplePipeline {
  def main(args: Array[String]) {
    // setup boilerplate
    val conf = new SparkConf()
      .setAppName("Pipeline example")
    val sc = new SparkContext(conf)
    val spark = SparkSession
      .builder()
      .appName("Session for SimplePipeline")
      .getOrCreate()

    val all_df = spark.read.json("file:///Users/me/data.json")
    val numLabels = all_df.count()

    // split into training and testing
    val Array(training, testing) = all_df.randomSplit(Array(0.75, 0.25))
    val nTraining = training.count()
    val nTesting = testing.count()
    println(s"Loaded $nTraining training labels...")
    println(s"Loaded $nTesting testing labels...")

    // convert string labels to integers
    val indexer = new StringIndexer()
      .setInputCol("rating")
      .setOutputCol("label")

    // tokenize our string inputs
    val titleTokenizer = new Tokenizer()
      .setInputCol("title")
      .setOutputCol("title_words")
    val descriptionTokenizer = new Tokenizer()
      .setInputCol("description")
      .setOutputCol("description_words")

    // count term frequencies
    val titleHashingTF = new HashingTF()
      .setNumFeatures(1000)
      .setInputCol(titleTokenizer.getOutputCol)
      .setOutputCol("title_tfs")
    val descriptionHashingTF = new HashingTF()
      .setNumFeatures(1000)
      .setInputCol(descriptionTokenizer.getOutputCol)
      .setOutputCol("description_tfs")

    // combine features together
    val assembler = new VectorAssembler()
      .setInputCols(Array(titleHashingTF.getOutputCol, descriptionHashingTF.getOutputCol))
      .setOutputCol("features")

    // set params for our model
    val lr = new LogisticRegression()
      .setMaxIter(10)
      .setRegParam(0.01)

    // pipeline that combines all stages
    val stages = Array(indexer, titleTokenizer, titleHashingTF, descriptionTokenizer, descriptionHashingTF, assembler, lr)
    val pipeline = new Pipeline().setStages(stages)

    // Fit the pipeline to training documents.
    val model = pipeline.fit(training)

    // Make predictions.
    val predictions = model.transform(testing)

    // Select example rows to display.
    predictions.select("label", "rawPrediction", "prediction").show()

    sc.stop()
  }
}
and my data file is simply a newline-delimited file of JSON objects:
{"title" : "xxxxxx", "description" : "yyyyy" .... }
{"title" : "zzzzzz", "description" : "zxzxzx" .... }
The error I get is very long and difficult to understand, but the important part (I think) is a java.lang.NullPointerException:
ERROR Executor: Exception in task 0.0 in stage 9.0 (TID 12)
org.apache.spark.SparkException: Failed to execute user defined function($anonfun$createTransformFunc$1: (string) => array<string>)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:215)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:957)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:948)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:888)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:948)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:694)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
at org.apache.spark.ml.feature.Tokenizer$$anonfun$createTransformFunc$1.apply(Tokenizer.scala:39)
at org.apache.spark.ml.feature.Tokenizer$$anonfun$createTransformFunc$1.apply(Tokenizer.scala:39)
... 23 more
How should I properly craft my Pipeline to do this?
(Also I'm completely new to Scala)
The problem here is that you don't validate the data, and some of the values are NULL. This is easy to reproduce:
val df = Seq((1, Some("abcd bcde cdef")), (2, None)).toDF("id", "description")
val tokenizer = new Tokenizer().setInputCol("description")
tokenizer.transform(df).foreach(_ => ())
org.apache.spark.SparkException: Failed to execute user defined function($anonfun$createTransformFunc$1: (string) => array<string>)
at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1072)
...
Caused by: java.lang.NullPointerException
at org.apache.spark.ml.feature.Tokenizer$$anonfun$createTransformFunc$1.apply(Tokenizer.scala:39)
...
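If you're not sure which of your input columns actually contains the nulls, a quick check along these lines (a sketch assuming the all_df from your question, with title and description as the text columns) will narrow it down:

// Count records where either text column is null (hypothetical check on all_df)
val nullCount = all_df.filter(all_df("title").isNull || all_df("description").isNull).count()
println(s"Records with a null title or description: $nullCount")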
You can, for example, drop the affected rows:
tokenizer.transform(df.na.drop(Array("description")))
or replace the nulls with empty strings:
tokenizer.transform(df.na.fill(Map("description" -> "")))
whichever makes more sense in your application.
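Applied to the pipeline from your question, a minimal sketch (assuming all_df is your raw DataFrame and that title and description are the only columns that can be null) would clean the data once, before the split, so both Tokenizer stages only ever see non-null strings:

// Replace null titles/descriptions with empty strings before splitting
val cleaned_df = all_df.na.fill(Map(
  "title" -> "",
  "description" -> ""
))
// or, to discard incomplete records instead:
// val cleaned_df = all_df.na.drop(Array("title", "description"))

val Array(training, testing) = cleaned_df.randomSplit(Array(0.75, 0.25))

Either way, the rest of the Pipeline (tokenizers, HashingTF stages, VectorAssembler, LogisticRegression) can stay exactly as you wrote it.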