Understanding Apache Spark RDD task serialization

I am trying to understand how task serialization works in Spark and am a bit confused by some mixed results I'm getting in a test I've written.

I have some test code (simplified for the sake of this post) that does the following across more than one node:

object TestJob {
  def run(): Unit = {
    val rdd = ...
    val helperObject = new Helper() // Helper does NOT impl Serializable and is a vanilla class
    rdd.map(element => {
      helperObject.transform(element)
    }).collect()
  }
}

When I execute run(), the job bombs out with a "task not serializable" exception as expected since helperObject is not serializable. HOWEVER, when I alter it a little, like this:

trait HelperComponent {
  val helperObject = new Helper()
}

object TestJob extends HelperComponent {
  def run(): Unit = {
    val rdd = ...
    rdd.map(element => {
      helperObject.transform(element)
    }).collect()
  }
}

The job executes successfully for some reason. Could someone help me to understand why this might be? What exactly gets serialized by Spark and sent to the workers in each case above?

I am using Spark version 2.1.1.

Thank you!

asked Oct 26 '17 at 22:10 by simonl


1 Answer

Could someone help me to understand why this might be?

In your first snippet, helperObject is a local variable declared inside run. As such, it is captured (closed over) by the function you pass to map, so that wherever the closure ends up executing, everything it references is available. Because Helper is not serializable, Spark's ClosureCleaner rejects the closure when it checks that it can be serialized, which is the "task not serializable" exception you see.
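
If you need the first shape to work, the usual options are to make Helper implement Serializable, or to construct the helper on the executors instead of the driver. Here is a minimal sketch of the second option, using a hypothetical stand-in for your Helper (assuming, as in your code, it has a no-arg constructor):

import org.apache.spark.SparkContext

// Stand-in for the Helper in the question: a plain class, not Serializable.
class Helper {
  def transform(s: String): String = s.toUpperCase
}

object TestJobWorkaround {
  def run(sc: SparkContext): Unit = {
    val rdd = sc.parallelize(Seq("a", "b", "c"))

    // Build the helper inside mapPartitions, i.e. on the executor, once per
    // partition. No Helper instance is captured by the closure on the driver,
    // so nothing non-serializable has to be shipped.
    val result = rdd.mapPartitions { iter =>
      val helperObject = new Helper()
      iter.map(helperObject.transform)
    }.collect()

    result.foreach(println)
  }
}

The trade-off is that the helper is rebuilt once per partition rather than once per JVM, which only matters if its construction is expensive.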

In your second snippet, the value is no longer a local variable in the method scope; it is a member of the TestJob singleton (technically this is an object declaration, but it is compiled to an ordinary JVM class in the end).

This is meaningful in Spark because every worker node in the cluster already has the JARs needed to execute your code. So instead of serializing TestJob in its entirety for rdd.map, when Spark spins up an executor process on one of your workers, it loads TestJob locally via a ClassLoader and initializes it there, constructing its own Helper, just like any other JVM class in a non-distributed application.
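
You can see this concretely by giving the helper an identity that records the JVM it was built in: with the trait/object pattern from your second snippet, each executor constructs its own copy during class initialization rather than receiving a serialized one from the driver. A sketch (the names below are hypothetical, not from your code):

import java.lang.management.ManagementFactory
import org.apache.spark.SparkContext

// Records the JVM ("pid@host") in which the instance was constructed.
class IdentifyingHelper {
  val createdIn: String = ManagementFactory.getRuntimeMXBean.getName
  def transform(s: String): String = s"$s (helper built in $createdIn)"
}

trait HelperComponent {
  val helperObject = new IdentifyingHelper()
}

object TestJobDemo extends HelperComponent {
  def run(sc: SparkContext): Unit = {
    // The closure refers to the TestJobDemo singleton by name; each executor
    // loads and initializes the singleton locally, constructing its own
    // IdentifyingHelper. No helper instance is serialized on the driver.
    sc.parallelize(1 to 4, 2)
      .map(i => helperObject.transform(i.toString))
      .collect()
      .foreach(println)
  }
}

Run against a real cluster, the createdIn values come from the executor JVMs, not the driver, which is exactly the ClassLoader-based behaviour described above.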

To conclude, the reason you don't see this blow up is that the helper instance is no longer serialized at all, because of the change in where and how you declared it.

answered Nov 04 '22 at 03:11 by Yuval Itzchakov