NullPointerException in Spark RDD map when submitted as a spark job

We're trying to submit a Spark job (Spark 2.0, Hadoop 2.7.2) but for some reason we're receiving a rather cryptic NPE in EMR. Everything runs just fine as a Scala program, so we're not really sure what's causing the issue. Here's the stack trace:

18:02:55,271 ERROR Utils:91 - Aborting task
java.lang.NullPointerException
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply$mcV$sp(WriterContainer.scala:253)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1325)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:258)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

As far as we can tell this is occurring in the following method:

def process(dataFrame: DataFrame, S3bucket: String) = {
  dataFrame.map(row =>
      "text|label"
  ).coalesce(1).write.mode(SaveMode.Overwrite).text(S3bucket)
}

We've narrowed it down to the map function, as this version works when submitted as a Spark job:

def process(dataFrame: DataFrame, S3bucket: String) = {
  dataFrame.coalesce(1).write.mode(SaveMode.Overwrite).text(S3bucket)
}

Does anyone have any idea what might be causing this issue? Also, how can we resolve it? We're pretty stumped.

asked Feb 06 '23 by cscan


1 Answer

I think the NullPointerException is thrown by a worker when it tries to access a SparkContext object that is present only on the driver, not on the workers.
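This capture problem can be sketched without any Spark dependency (a toy example; `Pipeline` and `NotSerializable` are hypothetical names, not the asker's code): a lambda that reads a field implicitly captures `this`, and Java serialization, which Spark uses to ship closures to executors, fails when that enclosing object is not serializable.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object ClosureDemo {
  // Stand-in for a driver-only resource such as a SparkContext.
  class NotSerializable

  class Pipeline {
    val ctx = new NotSerializable
    // BAD: reading the field `ctx` makes the lambda capture `this`,
    // so the whole non-serializable Pipeline rides along with it.
    val bad: Int => Int = x => { require(ctx != null); x + 1 }
    // GOOD: copy what you need into a local value first; the closure
    // then captures only that small serializable value.
    val good: Int => Int = { val inc = 1; x => x + inc }
  }

  // True iff `obj` survives Java serialization, as a closure must
  // in order to be shipped to a Spark worker.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream).writeObject(obj)
      true
    } catch { case _: NotSerializableException => false }
}
```

Serializing `bad` fails with a NotSerializableException while `good` serializes fine; the same rule is why a map body must not touch the SparkContext or anything reachable only from the driver.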

coalesce() repartitions your data. When you request a single partition, it will try to squeeze all of the data into one partition*, which can put a lot of pressure on the memory footprint of your application.

In general, it is a good idea not to shrink your partitions down to just 1.
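Here is a toy model of that pressure (no Spark; `PartitionDemo` and its helpers are made-up names for illustration): the largest partition is what a single task must hold and process at once, and with one partition that is the entire dataset.

```scala
object PartitionDemo {
  // Round-robin split of `xs` into `n` partitions: a rough stand-in
  // for what repartitioning does to a dataset.
  def split[A](xs: Seq[A], n: Int): Seq[Seq[A]] =
    (0 until n).map(i => xs.zipWithIndex.collect { case (x, j) if j % n == i => x })

  // The biggest partition is the per-executor memory hot spot.
  def maxPartitionSize[A](xs: Seq[A], n: Int): Int =
    split(xs, n).map(_.size).max
}
```

With 1000 records, `maxPartitionSize(records, 10)` is 100, but `maxPartitionSize(records, 1)` is the full 1000, so the single task writing the output has to handle everything by itself.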

For more, read this: Spark NullPointerException with saveAsTextFile and this.


  • In case you are not sure what a partition is, I explained it to myself in memoryOverhead issue in Spark.
answered Feb 09 '23 by gsamaras