
Serialization problems using Function implementations with Spark

I'm having trouble understanding Spark Function implementations in Java. The documentation gives three ways to use functions in map and reduce:

  1. via lambda
  2. via inline classes implementing Function and Function2
  3. via inner classes implementing Function and Function2

The trouble is that I can't manage to make 2. and 3. work. For instance, this code:

public int countInline(String path) {

    String master = "local";
    SparkConf conf = new SparkConf().setAppName("charCounterInLine")
            .setMaster(master);
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaRDD<String> lines = sc.textFile(path);

    JavaRDD<Integer> lineLengths = lines
            .map(new Function<String, Integer>() {
                public Integer call(String s) {
                    return s.length();
                }
            });
    return lineLengths.reduce(new Function2<Integer, Integer, Integer>() {
        public Integer call(Integer a, Integer b) {
            return a + b;
        }
    }); // the line causing the error 
}

gives me this error:

14/07/09 11:23:20 INFO DAGScheduler: Failed to run reduce at CharCounter.java:42
[WARNING]
java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:483)
        at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:297)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: Hadoop.Spark.basique.CharCounter
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:770)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:713)
        at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:697)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1176)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Right now, I can avoid this problem by implementing Function and Function2 in public outer classes. However, that was more a lucky guess than a well-thought-out decision, and since I can't get the documentation examples to work, I suspect there are things I don't understand.

To conclude, my questions are:

  • How can I make 2. and 3. work?
  • Why does only the lambda work?
  • Are there other ways to use functions?
asked Jul 09 '14 by merours

2 Answers

The relevant part of this stack trace is:

Task not serializable: java.io.NotSerializableException: Hadoop.Spark.basique.CharCounter

When you define your functions as inner classes, their enclosing object is pulled into the function's closure and serialized along with them. If that class is not serializable, or contains a non-serializable field, you'll run into this error.

You have a few options here:

  • Mark the non-serializable fields of the enclosing object as transient.
  • Define your functions as top-level (outer) classes.
  • Define your functions as static nested classes (see the sketch after this list).
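
For instance, here is a minimal sketch of the static-nested-class option; the class and method names are illustrative, not taken from your code:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;

public class CharCounter {

    // Static nested classes keep no hidden reference to the enclosing
    // CharCounter instance, so only the function objects themselves
    // (Serializable via the Function/Function2 interfaces) are shipped
    // to the executors.
    static class LineLength implements Function<String, Integer> {
        @Override
        public Integer call(String s) {
            return s.length();
        }
    }

    static class Sum implements Function2<Integer, Integer, Integer> {
        @Override
        public Integer call(Integer a, Integer b) {
            return a + b;
        }
    }

    public int countChars(JavaRDD<String> lines) {
        return lines.map(new LineLength()).reduce(new Sum());
    }
}

Defining the same classes as separate top-level classes works for the same reason, which is why the workaround you stumbled onto succeeds.
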
answered Sep 27 '22 by Josh Rosen

Adding "implements Serializable" for enclosing class can solve the problem. It is serializing enclosing class because inner class is a member of that, but enclosing class is not serializable it seems.

answered Sep 27 '22 by Pushpender