I want to process a large text file, "mydata.txt" (the actual file is about 30 GB), with Spark. Its record delimiter is "\ |" followed by "\n". Because sc.textFile splits records on "\n" by default, I set the "textinputformat.record.delimiter" property of org.apache.hadoop.conf.Configuration to "\ |\n" to specify the record delimiter. Here is a sample of the data:
AAAAA_|BBBBB_|
CCCCC\
DDDDD
EEEEE_FFFFFFFFFFFF\ |
GGGGG_|HHHHH_|
IIIII\
GGGGG\
KKKKK_|LLLLLLLLLLL\ |
MMMM_|NNNNN_|OOOOO\ |
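To make the intended parsing concrete, here is a minimal plain-Scala sketch (no Spark involved) of how the sample should break into records and fields. The object name DelimiterSketch and its helpers are hypothetical, and the sketch assumes that the delimiter is matched literally and that the file ends with a newline:

```scala
import java.util.regex.Pattern

object DelimiterSketch {
  // Same literals as in the question: a record ends with backslash, space, pipe, newline.
  val RecordDelimiter = "\\ |\n"
  // Regex for the field separator "_|" (the pipe has to be escaped in a regex).
  val FieldSep = "_\\|"

  // Split raw text into records on the literal delimiter.
  // Pattern.quote turns the delimiter into a literal pattern for String.split.
  def records(text: String): Array[String] =
    text.split(Pattern.quote(RecordDelimiter))

  // Number of fields in one record, using the same regex as the filter in the question.
  def fieldCount(record: String): Int = record.split(FieldSep).length

  // The sample data above, assuming a trailing newline after the last line.
  val sample: String =
    "AAAAA_|BBBBB_|\nCCCCC\\\nDDDDD\nEEEEE_FFFFFFFFFFFF\\ |\n" +
    "GGGGG_|HHHHH_|\nIIIII\\\nGGGGG\\\nKKKKK_|LLLLLLLLLLL\\ |\n" +
    "MMMM_|NNNNN_|OOOOO\\ |\n"
}
```

With this sample, DelimiterSketch.records(DelimiterSketch.sample) gives three records with 3, 4, and 3 fields, so all three would pass a filter that requires at least 3 fields.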
Next I executed the following code in spark-shell:
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
val LINE_DELIMITER = "\\ |\n"
val FIELD_SEP = "_\\|"
val conf = new Configuration
conf.set("textinputformat.record.delimiter", LINE_DELIMITER)
val raw_data = sc.newAPIHadoopFile("mydata.txt", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf).map(_._2.toString)
So far so good. However, filtering the RDD and then running an action on it fails:
scala> val data = raw_data.filter(x => x.split(FIELD_SEP).size >= 3)
data: org.apache.spark.rdd.RDD[String] = FilteredRDD[4] at filter at <console>:22
scala> data.collect
org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.hadoop.conf.Configuration
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:772)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:715)
    at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:699)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1203)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
scala> data.foreach(println)
org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.hadoop.conf.Configuration
    ...
Why can't I manipulate the RDD "data", when everything works fine with sc.textFile("mydata.txt")?
And how can I fix it?
You are getting this exception because your closure captures the org.apache.hadoop.conf.Configuration object, which is not serializable:
Caused by: java.io.NotSerializableException: org.apache.hadoop.conf.Configuration
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
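The capture can be reproduced outside Spark. Roughly, in spark-shell the vals you define (conf, FIELD_SEP) become members of wrapper objects generated by the REPL, so a function that reads FIELD_SEP drags the wrapper, and with it conf, into serialization. The sketch below imitates that with plain Java serialization, assuming Scala 2.12 or later. FakeConf, ShellLine, and trySerialize are hypothetical names, FakeConf only stands in for Configuration, and Spark's own closure cleaning is ignored:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object ClosureCaptureSketch {
  // Hypothetical stand-in for org.apache.hadoop.conf.Configuration: not Serializable.
  class FakeConf

  // Plays the role of the object that holds the vals typed into the shell.
  class ShellLine extends Serializable {
    val conf = new FakeConf
    val fieldSep = "_\\|"

    // Reads the field fieldSep, so the function references `this` (and through it conf).
    def capturing: String => Boolean = x => x.split(fieldSep).length >= 3

    // Copies the field into a local val first, so the function only holds a String.
    def nonCapturing: String => Boolean = {
      val sep = fieldSep
      x => x.split(sep).length >= 3
    }
  }

  // Java-serializes obj in memory.
  // Returns the serialized size, or the exception message (the offending class name).
  def trySerialize(obj: AnyRef): Either[String, Int] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try { out.writeObject(obj); out.flush(); Right(bytes.size()) }
    catch { case e: NotSerializableException => Left(e.getMessage) }
    finally out.close()
  }
}
```

Here trySerialize(new ShellLine().capturing) should come back as a Left naming FakeConf, while the nonCapturing variant should serialize without complaint.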
You can do two things: 
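1. Register Configuration with a Kryo serializer (note that Spark serializes closures with Java serialization by default, so this may not be enough on its own) OR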
2. Just mark your conf variable as @transient, which tells the serializer to skip it instead of shipping it with the closure:
scala> @transient val conf = new Configuration
conf: org.apache.hadoop.conf.Configuration = Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml
scala> val raw_data = sc.newAPIHadoopFile("../test.txt", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf).map(_._2.toString)
14/11/28 00:54:03 INFO MemoryStore: ensureFreeSpace(32937) called with curMem=70594, maxMem=278302556
14/11/28 00:54:03 INFO MemoryStore: Block broadcast_4 stored as values in memory (estimated size 32.2 KB, free 265.3 MB)
raw_data: org.apache.spark.rdd.RDD[String] = MappedRDD[5] at map at <console>:18
scala> val data = raw_data.filter{x => x.split(FIELD_SEP).size >= 3}
data: org.apache.spark.rdd.RDD[String] = FilteredRDD[6] at filter at <console>:22
scala> data.count
14/11/28 00:54:16 INFO FileInputFormat: Total input paths to process : 1
14/11/28 00:54:16 INFO SparkContext: Starting job: count at <console>:25
14/11/28 00:54:16 INFO DAGScheduler: Got job 2 (count at <console>:25) with 1 output partitions (allowLocal=false)
14/11/28 00:54:16 INFO DAGScheduler: Final stage: Stage 2(count at <console>:25)
14/11/28 00:54:16 INFO DAGScheduler: Parents of final stage: List()
14/11/28 00:54:16 INFO DAGScheduler: Missing parents: List()
14/11/28 00:54:16 INFO DAGScheduler: Submitting Stage 2 (FilteredRDD[6] at filter at <console>:22), which has no missing parents
14/11/28 00:54:16 INFO MemoryStore: ensureFreeSpace(4488) called with curMem=103531, maxMem=278302556
14/11/28 00:54:16 INFO MemoryStore: Block broadcast_5 stored as values in memory (estimated size 4.4 KB, free 265.3 MB)
14/11/28 00:54:16 INFO DAGScheduler: Submitting 1 missing tasks from Stage 2 (FilteredRDD[6] at filter at <console>:22)
14/11/28 00:54:16 INFO TaskSchedulerImpl: Adding task set 2.0 with 1 tasks
14/11/28 00:54:16 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 2, localhost, PROCESS_LOCAL, 1223 bytes)
14/11/28 00:54:16 INFO Executor: Running task 0.0 in stage 2.0 (TID 2)
14/11/28 00:54:16 INFO NewHadoopRDD: Input split: file:/Users/ssimanta/spark/test.txt:0+123
14/11/28 00:54:16 INFO Executor: Finished task 0.0 in stage 2.0 (TID 2). 1731 bytes result sent to driver
14/11/28 00:54:16 INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID 2) in 19 ms on localhost (1/1)
14/11/28 00:54:16 INFO DAGScheduler: Stage 2 (count at <console>:25) finished in 0.019 s
14/11/28 00:54:16 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool 
14/11/28 00:54:16 INFO DAGScheduler: Job 2 finished: count at <console>:25, took 0.025300 s
res5: Long = 1
scala> data.collect
14/11/28 00:55:16 INFO SparkContext: Starting job: collect at <console>:25
14/11/28 00:55:16 INFO DAGScheduler: Got job 3 (collect at <console>:25) with 1 output partitions (allowLocal=false)
14/11/28 00:55:16 INFO DAGScheduler: Final stage: Stage 3(collect at <console>:25)
14/11/28 00:55:16 INFO DAGScheduler: Parents of final stage: List()
14/11/28 00:55:16 INFO DAGScheduler: Missing parents: List()
14/11/28 00:55:16 INFO DAGScheduler: Submitting Stage 3 (FilteredRDD[6] at filter at <console>:22), which has no missing parents
14/11/28 00:55:16 INFO MemoryStore: ensureFreeSpace(4504) called with curMem=108019, maxMem=278302556
14/11/28 00:55:16 INFO MemoryStore: Block broadcast_6 stored as values in memory (estimated size 4.4 KB, free 265.3 MB)
14/11/28 00:55:16 INFO DAGScheduler: Submitting 1 missing tasks from Stage 3 (FilteredRDD[6] at filter at <console>:22)
14/11/28 00:55:16 INFO TaskSchedulerImpl: Adding task set 3.0 with 1 tasks
14/11/28 00:55:16 INFO TaskSetManager: Starting task 0.0 in stage 3.0 (TID 3, localhost, PROCESS_LOCAL, 1223 bytes)
14/11/28 00:55:16 INFO Executor: Running task 0.0 in stage 3.0 (TID 3)
14/11/28 00:55:16 INFO NewHadoopRDD: Input split: file:/Users/ssimanta/spark/test.txt:0+123
14/11/28 00:55:16 INFO Executor: Finished task 0.0 in stage 3.0 (TID 3). 1717 bytes result sent to driver
14/11/28 00:55:16 INFO TaskSetManager: Finished task 0.0 in stage 3.0 (TID 3) in 16 ms on localhost (1/1)
14/11/28 00:55:16 INFO DAGScheduler: Stage 3 (collect at <console>:25) finished in 0.017 s
14/11/28 00:55:16 INFO TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool 
14/11/28 00:55:16 INFO DAGScheduler: Job 3 finished: collect at <console>:25, took 0.021439 s
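res6: Array[String] = Array(MMMM_|NNNNN_|OOOOO\ |)
Note that this session never calls conf.set("textinputformat.record.delimiter", LINE_DELIMITER), so the file is split on the default "\n". That is why only the last line passes the filter and still ends in "\ |". In your own code, keep the conf.set call before sc.newAPIHadoopFile.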
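To see what @transient does on its own, here is a small plain-Scala round trip through Java serialization. FakeConf, Holder, and roundTrip are hypothetical names, and FakeConf again only stands in for Configuration. The transient field is skipped when writing, so it comes back as null after reading. That should be harmless in this case, on the assumption that conf is only needed on the driver when the RDD is created:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object TransientSketch {
  // Hypothetical stand-in for Configuration: deliberately not Serializable.
  class FakeConf

  class Holder extends Serializable {
    // @transient makes Java serialization skip this field when a Holder is written.
    @transient val conf: FakeConf = new FakeConf
    // Ordinary field: written and restored as usual.
    val fieldSep: String = "_\\|"
  }

  // Serializes obj to an in-memory buffer and reads it back.
  def roundTrip[T <: AnyRef](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

After TransientSketch.roundTrip(new TransientSketch.Holder), the copy has conf == null and fieldSep intact. Without @transient, writeObject would throw NotSerializableException for FakeConf.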