NullPointerException in org.apache.spark.ml.feature.Tokenizer

I want to compute TF-IDF features on the title and description fields separately, and then combine those features in a VectorAssembler so that the final classifier can operate on them.

It works fine if I use a single serial flow that is simply

titleTokenizer -> titleHashingTF -> VectorAssembler

But I need both like so:

titleTokenizer       -> titleHashingTF       \
                                              -> VectorAssembler
descriptionTokenizer -> descriptionHashingTF /

Code here:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer, StringIndexer, VectorAssembler}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row

import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

import org.apache.log4j.{Level, Logger}


object SimplePipeline {
    def main(args: Array[String]): Unit = {

        // setup boilerplate
        val conf = new SparkConf()
            .setAppName("Pipeline example")
        val sc = new SparkContext(conf)
        val spark = SparkSession
            .builder()
            .appName("Session for SimplePipeline")
            .getOrCreate()

        val all_df = spark.read.json("file:///Users/me/data.json")
        val numLabels = all_df.count()

        // split into training and testing
        val Array(training, testing) = all_df.randomSplit(Array(0.75, 0.25))
        val nTraining = training.count()
        val nTesting = testing.count()

        println(s"Loaded $nTraining training labels...")
        println(s"Loaded $nTesting testing labels...")

        // convert string labels to integers
        val indexer = new StringIndexer()
            .setInputCol("rating")
            .setOutputCol("label")

        // tokenize our string inputs
        val titleTokenizer = new Tokenizer()
            .setInputCol("title")
            .setOutputCol("title_words")
        val descriptionTokenizer = new Tokenizer()
            .setInputCol("description")
            .setOutputCol("description_words")

        // count term frequencies
        val titleHashingTF = new HashingTF()
            .setNumFeatures(1000)
            .setInputCol(titleTokenizer.getOutputCol)
            .setOutputCol("title_tfs")
        val descriptionHashingTF = new HashingTF()
            .setNumFeatures(1000)
            .setInputCol(descriptionTokenizer.getOutputCol)
            .setOutputCol("description_tfs")

        // combine features together
        val assembler = new VectorAssembler()
            .setInputCols(Array(titleHashingTF.getOutputCol, descriptionHashingTF.getOutputCol))
            .setOutputCol("features")

        // set params for our model
        val lr = new LogisticRegression()
            .setMaxIter(10)
            .setRegParam(0.01)

        // pipeline that combines all stages
        val stages = Array(indexer, titleTokenizer, titleHashingTF, descriptionTokenizer, descriptionHashingTF, assembler, lr)
        val pipeline = new Pipeline().setStages(stages)

        // Fit the pipeline to training documents.
        val model = pipeline.fit(training)

        // Make predictions.
        val predictions = model.transform(testing)

        // Select example rows to display.
        predictions.select("label", "rawPrediction", "prediction").show()

        sc.stop()
    }
}

and my data file is simply a newline-delimited file of JSON objects (JSON Lines):

{"title" : "xxxxxx", "description" : "yyyyy" .... }
{"title" : "zzzzzz", "description" : "zxzxzx" .... }

The error I get is very long and difficult to understand, but the important part (I think) is the java.lang.NullPointerException:

ERROR Executor: Exception in task 0.0 in stage 9.0 (TID 12)
org.apache.spark.SparkException: Failed to execute user defined function($anonfun$createTransformFunc$1: (string) => array<string>)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:215)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:957)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:948)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:888)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:948)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:694)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
    at org.apache.spark.ml.feature.Tokenizer$$anonfun$createTransformFunc$1.apply(Tokenizer.scala:39)
    at org.apache.spark.ml.feature.Tokenizer$$anonfun$createTransformFunc$1.apply(Tokenizer.scala:39)
    ... 23 more

How should I be properly crafting my Pipeline to do this?

(Also, I'm completely new to Scala.)

asked Jan 15 '17 by lollercoaster

1 Answer

The problem here is that you don't validate the data, and some of the values are null (spark.read.json reads a record with a missing or null field as null in that column). It is pretty easy to reproduce:

// in spark-shell, where spark.implicits._ is already in scope
val df = Seq((1, Some("abcd bcde cdef")), (2, None)).toDF("id", "description")

// no setOutputCol needed here; Tokenizer falls back to a default output column
val tokenizer = new Tokenizer().setInputCol("description")
tokenizer.transform(df).foreach(_ => ())
org.apache.spark.SparkException: Failed to execute user defined function($anonfun$createTransformFunc$1: (string) => array<string>)
  at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1072)
...
Caused by: java.lang.NullPointerException
  at org.apache.spark.ml.feature.Tokenizer$$anonfun$createTransformFunc$1.apply(Tokenizer.scala:39)
...

You can, for example, drop the offending rows:

tokenizer.transform(df.na.drop(Array("description")))

or replace these with empty strings:

tokenizer.transform(df.na.fill(Map("description" -> "")))

whichever makes more sense in your application.
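
To wire the fill approach into the pipeline from the question, a minimal sketch (reusing the training, testing, and pipeline values defined there, and assuming the columns are named title and description as in your schema) could look like the following. The same cleaning has to be applied to both splits, because the Tokenizer will hit the same nulls at prediction time:

// Hypothetical integration: fill null titles/descriptions with empty
// strings on both splits before fitting and transforming
val fillMap = Map("title" -> "", "description" -> "")

val cleanTraining = training.na.fill(fillMap)
val cleanTesting = testing.na.fill(fillMap)

val model = pipeline.fit(cleanTraining)
val predictions = model.transform(cleanTesting)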

answered by zero323