
DataFrame filter gives NullPointerException

In Spark 1.6.0 I have a data frame with a column that holds a job description, like:

Description
bartender
bartender
employee
taxi-driver
...

I retrieve a list of unique values from that column with:

val jobs = people.select("Description").distinct().rdd.map(r => r(0).asInstanceOf[String]).repartition(4)

I then try, for each job description, to retrieve people with that job and do something, but I get a NullPointerException:

jobs.foreach { ajob =>
  var peoplewithjob = people.filter($"Description" === ajob)
  // ... do stuff
}

I don't understand why this happens, because every job has been extracted from the people data frame, so there should be at least one person with that job... any hint is more than welcome! Here's the stack trace:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 4.0 failed 1 times, most recent failure: Lost task 3.0 in stage 4.0 (TID 206, localhost): java.lang.NullPointerException
at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:131)
at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$withPlan(DataFrame.scala:2165)
at org.apache.spark.sql.DataFrame.filter(DataFrame.scala:799)
at jago.Run$$anonfun$main$1.apply(Run.scala:89)
at jago.Run$$anonfun$main$1.apply(Run.scala:82)
at scala.collection.Iterator$class.foreach(Iterator.scala:742)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$32.apply(RDD.scala:912)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$32.apply(RDD.scala:912)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
asked Feb 21 '16 by user299791

1 Answer

It happens because Spark doesn't support nested actions or transformations: the people DataFrame is referenced inside jobs.foreach, which runs on the executors, where the DataFrame's SQLContext is null, hence the NullPointerException in the DataFrame constructor. If you want to operate on distinct values extracted from the DataFrame, you have to fetch the results to the driver and iterate locally:

// or jobs.toLocalIterator
jobs.collect().foreach { ajob =>
  val peoplewithjob = people.filter($"Description" === ajob)
  // ... do stuff
}
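If the list of distinct jobs is large, jobs.toLocalIterator streams one partition at a time to the driver instead of materializing the whole list at once with collect, at the cost of launching one Spark job per partition.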

Depending on what kind of transformations you apply as "do stuff", it can be a better idea to simply groupBy and aggregate:

people.groupBy($"Description").agg(...)    
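For example, if "do stuff" boils down to counting people per job, a single distributed pass replaces the driver-side loop entirely. This is a minimal sketch with a hypothetical aggregation, since the original "do stuff" isn't shown:

import org.apache.spark.sql.functions.count

// Hypothetical aggregation: one row per distinct Description,
// computed in a single pass on the cluster instead of a per-job filter loop
val jobCounts = people
  .groupBy($"Description")
  .agg(count($"Description").as("people_count"))

jobCounts.show()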
answered Oct 06 '22 by zero323