Why does Spark throw NotSerializableException: org.apache.hadoop.io.NullWritable with sequence files?
My code (very simple):
import org.apache.hadoop.io.{BytesWritable, NullWritable}
sc.sequenceFile[NullWritable, BytesWritable](in).repartition(1000).saveAsSequenceFile(out, None)
The exception:
org.apache.spark.SparkException: Job aborted: Task 1.0:66 had a not serializable result: java.io.NotSerializableException: org.apache.hadoop.io.NullWritable
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
So it is possible to read non-serializable types into an RDD, i.e. to have an RDD of something that is not serializable (which seems counterintuitive). But once you perform an operation on that RDD that requires serialization, such as repartition, the elements need to be serializable. Moreover, it turns out that those weird SomethingWritable classes, although invented for the sole purpose of serializing things, do not actually implement java.io.Serializable :(. So you must map these things to byte arrays and back again:
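sc.sequenceFile[NullWritable, BytesWritable](in)
  .map(_._2.copyBytes())  // copy each value out as a plain, serializable Array[Byte]
  .repartition(1000)
  .map(a => (NullWritable.get(), new BytesWritable(a)))  // wrap back into Writables for saving
  .saveAsSequenceFile(out, None)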
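To see why the detour through byte arrays helps, here is a minimal sketch that needs neither Spark nor Hadoop. It assumes Java serialization is in play, which is consistent with the java.io.NotSerializableException in the stack trace. FakeWritable is a hypothetical stand-in for a Hadoop Writable (a class that carries bytes but does not implement java.io.Serializable); SerializationSketch and its helper methods are likewise made up for illustration.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, NotSerializableException}
import java.io.{ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for a Hadoop Writable: it holds bytes but does NOT
// implement java.io.Serializable.
final class FakeWritable(val bytes: Array[Byte])

object SerializationSketch {
  // Serialize a value with plain Java serialization.
  def javaSerialize(value: AnyRef): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(value) finally out.close()
    buffer.toByteArray
  }

  // Read a value back from its Java-serialized form.
  def javaDeserialize[T](data: Array[Byte]): T = {
    val in = new ObjectInputStream(new ByteArrayInputStream(data))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  // True if Java serialization rejects the value.
  def failsToSerialize(value: AnyRef): Boolean =
    try { javaSerialize(value); false }
    catch { case _: NotSerializableException => true }

  def main(args: Array[String]): Unit = {
    val record = new FakeWritable("payload".getBytes("UTF-8"))

    // The wrapper object itself cannot be serialized.
    println(failsToSerialize(record)) // prints "true"

    // The raw byte array inside it can, and it survives a round trip.
    val restored = javaDeserialize[Array[Byte]](javaSerialize(record.bytes))
    println(new String(restored, "UTF-8")) // prints "payload"
  }
}
```

The workaround has the same shape: unwrap to Array[Byte] before the step that needs serialization, then wrap back into Writables just before saving.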
Also see: https://stackoverflow.com/a/22594142/1586965