Use Serializable lambda in Spark JavaRDD transformation

I am trying to understand the following code.

// File: LambdaTest.java

package test;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class LambdaTest implements Ops {

  public static void main(String[] args) {
    new LambdaTest().job();
  }

  public void job() {
    SparkConf conf = new SparkConf()
      .setAppName(LambdaTest.class.getName())
      .setMaster("local[*]");
    JavaSparkContext jsc = new JavaSparkContext(conf);

    List<Integer>              lst  = Arrays.asList(1, 2, 3, 4, 5, 6);
    JavaRDD<Integer>           rdd  = jsc.parallelize(lst);
    Function<Integer, Integer> func1 = (Function<Integer, Integer> & Serializable) x -> x * x;
    Function<Integer, Integer> func2 = x -> x * x;

    System.out.println(func1.getClass());  //test.LambdaTest$$Lambda$8/390374517
    System.out.println(func2.getClass());  //test.LambdaTest$$Lambda$9/208350681

    this.doSomething(rdd, func1);  // works
    this.doSomething(rdd, func2);  // org.apache.spark.SparkException: Task not serializable
  }
}

// File: Ops.java

package test;

import org.apache.spark.api.java.JavaRDD;
import java.util.function.Function;    

public interface Ops {

  default void doSomething(JavaRDD<Integer> rdd, Function<Integer, Integer> func) {
    rdd.map(x -> x + func.apply(x))
       .collect()
       .forEach(System.out::println);
  }

}

The only difference is that func1 is cast with an intersection type that adds a Serializable bound, while func2 is not.

Looking at the runtime classes of the two functions, both are anonymous classes generated under the LambdaTest class.

Both are used in an RDD transformation defined in an interface, so I would expect that the two functions, and LambdaTest itself, must be serializable.

As you can see, LambdaTest does not implement the Serializable interface, so I thought neither function should work. But surprisingly, func1 works.
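
For what it's worth, the same difference shows up with plain JDK serialization, without Spark at all. A minimal sketch (the CastCheck class name is only for illustration; it assumes nothing beyond the JDK):

// File: CastCheck.java -- only the intersection-cast lambda survives serialization
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

public class CastCheck {

  public static void main(String[] args) throws Exception {
    // Intersection cast: the compiler marks the lambda as serializable.
    Function<Integer, Integer> castLambda =
        (Function<Integer, Integer> & Serializable) x -> x * x;
    // Plain lambda: its only target type is java.util.function.Function.
    Function<Integer, Integer> plainLambda = x -> x * x;

    try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
      out.writeObject(castLambda);   // succeeds
      System.out.println("castLambda serialized");
      out.writeObject(plainLambda);  // throws java.io.NotSerializableException
    }
  }
}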

The stack trace for func2 is the following:

Serialization stack:
    - object not serializable (class: test.LambdaTest$$Lambda$9/208350681, value: test.LambdaTest$$Lambda$9/208350681@61d84e08)
    - element of array (index: 0)
    - array (class [Ljava.lang.Object;, size 1)
    - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
    - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=interface fr.leboncoin.etl.jobs.test.Ops, functionalInterfaceMethod=org/apache/spark/api/java/function/Function.call:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic fr/leboncoin/etl/jobs/test/Ops.lambda$doSomething$1024e30a$1:(Ljava/util/function/Function;Ljava/lang/Integer;)Ljava/lang/Integer;, instantiatedMethodType=(Ljava/lang/Integer;)Ljava/lang/Integer;, numCaptured=1])
    - writeReplace data (class: java.lang.invoke.SerializedLambda)
    - object (class fr.leboncoin.etl.jobs.test.Ops$$Lambda$10/1470295349, fr.leboncoin.etl.jobs.test.Ops$$Lambda$10/1470295349@4e1459ea)
    - field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)
    - object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
    ... 19 more

It seems that if a function is bound with Serializable, the object containing it does not need to be serialized, which confuses me.

Any explanation on this is highly appreciated.

------------------------------ Updates ------------------------------

I have tried using an abstract class instead of an interface:

//File: AbstractTest.java

package test;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;
import java.util.List;

public class AbstractTest {

  public static void main(String[] args) {
    new AbstractTest().job();
  }

  public void job() {
    SparkConf conf = new SparkConf()
      .setAppName(AbstractTest.class.getName())
      .setMaster("local[*]");
    JavaSparkContext jsc = new JavaSparkContext(conf);

    List<Integer>    lst = Arrays.asList(1, 2, 3, 4, 5, 6);
    JavaRDD<Integer> rdd = jsc.parallelize(lst);

    Ops ops = new Ops() {

      @Override
      public Integer apply(Integer x) {
        return x + 1;
      }
    };

    System.out.println(ops.getClass()); // class fr.leboncoin.etl.jobs.test.AbstractTest$1
    ops.doSomething(rdd);
  }
}

// File: Ops.java

package test;

import org.apache.spark.api.java.JavaRDD;

import java.io.Serializable;

public abstract class Ops implements Serializable {

  public abstract Integer apply(Integer x);

  public void doSomething(JavaRDD<Integer> rdd) {
    rdd.map(x -> x + apply(x))
       .collect()
       .forEach(System.out::println);
  }
}

It does not work either, even though the Ops class is compiled into a separate file from the AbstractTest class. The ops object's class name is fr.leboncoin.etl.jobs.test.AbstractTest$1. According to the following stack trace, it seems that AbstractTest needs to be serialized in order to serialize AbstractTest$1.

Serialization stack:
    - object not serializable (class: test.AbstractTest, value: test.AbstractTest@21ac5eb4)
    - field (class: test.AbstractTest$1, name: this$0, type: class test.AbstractTest)
    - object (class test.AbstractTest$1, test.AbstractTest$1@36fc05ff)
    - element of array (index: 0)
    - array (class [Ljava.lang.Object;, size 1)
    - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
    - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class fr.leboncoin.etl.jobs.test.Ops, functionalInterfaceMethod=org/apache/spark/api/java/function/Function.call:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeSpecial fr/leboncoin/etl/jobs/test/Ops.lambda$doSomething$6d6228b6$1:(Ljava/lang/Integer;)Ljava/lang/Integer;, instantiatedMethodType=(Ljava/lang/Integer;)Ljava/lang/Integer;, numCaptured=1])
    - writeReplace data (class: java.lang.invoke.SerializedLambda)
    - object (class fr.leboncoin.etl.jobs.test.Ops$$Lambda$8/208350681, fr.leboncoin.etl.jobs.test.Ops$$Lambda$8/208350681@4acb2510)
    - field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)
    - object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
    ... 19 more
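
A possible workaround, sketched here only as an illustration (the AbstractTestStatic and plusOne names are made up): create the Ops subclass in a static context, or as a named top-level or static nested class, so the anonymous instance holds no synthetic this$0 reference to the enclosing test class.

// File: AbstractTestStatic.java -- illustrative sketch only
package test;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;
import java.util.List;

public class AbstractTestStatic {

  // Created in a static context: the anonymous Ops subclass has no enclosing
  // instance, hence no this$0 field dragging the outer class into the graph.
  private static Ops plusOne() {
    return new Ops() {
      @Override
      public Integer apply(Integer x) {
        return x + 1;
      }
    };
  }

  public static void main(String[] args) {
    SparkConf conf = new SparkConf()
        .setAppName(AbstractTestStatic.class.getName())
        .setMaster("local[*]");
    JavaSparkContext jsc = new JavaSparkContext(conf);

    List<Integer>    lst = Arrays.asList(1, 2, 3, 4, 5, 6);
    JavaRDD<Integer> rdd = jsc.parallelize(lst);

    // Only the Serializable Ops instance (and the lambda inside doSomething)
    // needs to be shipped to the workers now.
    plusOne().doSomething(rdd);
  }
}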
Asked Aug 04 '15 by Hao Ren

1 Answer

LambdaTest doesn't need to be Serializable as it's not being sent over the wire - there's no reason to do that.

On the other hand, both func1 and func2 do have to be Serializable, because Spark uses them to perform computation on the RDD, so their code has to be sent over the wire to the worker nodes. Notice that even though you write everything in the same class, your lambdas are compiled into separate synthetic methods, and since they do not capture the enclosing instance here, the whole class does not have to be sent over the wire, so the outer class does not need to be Serializable.
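
To illustrate the capture point (an added sketch, not part of the original answer; the CaptureDemo class and offset field are hypothetical): if the lambda did touch an instance member, it would capture the enclosing instance, and the outer class would then also need to be Serializable.

// Illustrative sketch: a lambda that reads an instance field captures 'this'.
import java.io.Serializable;
import java.util.function.Function;

public class CaptureDemo {

  private final int offset = 10;  // hypothetical field, for illustration only

  Function<Integer, Integer> make() {
    // Reading 'offset' makes the lambda capture the enclosing CaptureDemo
    // instance; serializing the lambda would then also try to serialize
    // CaptureDemo and fail, because CaptureDemo is not Serializable.
    return (Function<Integer, Integer> & Serializable) x -> x + offset;
  }
}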

As for why func1 works: when you do not use a cast, the Java compiler infers the lambda's type from the assignment context, so the code generated for func2 simply implements java.util.function.Function, since that is the target variable's type. When the required type cannot be inferred from the context (here, the compiler has no way of knowing that func1 also has to be Serializable, since that requirement comes from Spark), you can use an intersection cast, as in your example, to provide the type explicitly. The code generated for func1 then implements both the Function and Serializable interfaces, and the compiler does not try to infer the type on its own.

You can find this described in the State of the Lambda document, under section 5, "Contexts for Target Typing".
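
A related takeaway, sketched here as an assumption rather than something spelled out in the answer: if Ops.doSomething declares Spark's own org.apache.spark.api.java.function.Function, which already extends Serializable, the lambda at the call site picks up a serializable target type and no intersection cast is needed.

// File: Ops.java -- alternative signature, illustrative sketch only
package test;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;  // extends java.io.Serializable

public interface Ops {

  default void doSomething(JavaRDD<Integer> rdd, Function<Integer, Integer> func) {
    // func.call declares 'throws Exception', and so does the call method of the
    // Spark Function implemented by the lambda below, so no extra handling is needed.
    rdd.map(x -> x + func.call(x))
       .collect()
       .forEach(System.out::println);
  }
}

At the call site, this.doSomething(rdd, x -> x * x) then works without the (... & Serializable) cast.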

Answered Sep 20 '22 by Mateusz Dymczyk