Before sc.runJob invokes dagScheduler.runJob, the function to be applied to the RDD is "cleaned" by ClosureCleaner.clean.

Why does Spark have to do this? What is the purpose?
Ankur Dave, a fellow Spark Committer, wrote a good explanation of ClosureCleaner on Quora, reproduced below:
When Scala constructs a closure, it determines which outer variables the closure will use and stores references to them in the closure object. This allows the closure to work properly even when it's called from a different scope than it was created in.
Scala sometimes errs on the side of capturing too many outer variables (see SI-1419). That's harmless in most cases, because the extra captured variables simply don't get used (though this prevents them from getting GC'd). But it poses a problem for Spark, which has to send closures across the network so they can be run on slaves. When a closure contains unnecessary references, it wastes network bandwidth. More importantly, some of the references may point to non-serializable objects, and Spark will fail to serialize the closure.
To work around this bug in Scala, the ClosureCleaner traverses the object at runtime and prunes the unnecessary references. Since it does this at runtime, it can be more accurate than the Scala compiler can. Spark can then safely serialize the cleaned closure.
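The capture problem described above can be sketched without Spark at all. The following is a minimal, hypothetical example (the `Wrapper` class and `serialize` helper are illustrative, not Spark code) that uses plain Java serialization as a stand-in for Spark shipping a task over the network. A closure that references an instance field implicitly captures the whole enclosing object; copying the field into a local `val` first is the manual version of what ClosureCleaner automates.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// A deliberately non-serializable class (it does not extend Serializable),
// standing in for driver-side state that should never travel with a task.
class Wrapper(val factor: Int) {

  // The closure body reads `factor`, which the compiler rewrites as
  // `this.factor`, so the closure captures the entire Wrapper instance.
  def multiplier: Int => Int = (x: Int) => x * factor

  // The usual workaround: copy the field into a local val, so the closure
  // captures only an Int instead of the whole (non-serializable) Wrapper.
  def multiplierSafe: Int => Int = {
    val localFactor = factor
    (x: Int) => x * localFactor
  }
}

// Returns true if `obj` survives Java serialization, false if a
// NotSerializableException is thrown along the way.
def serialize(obj: AnyRef): Boolean =
  try {
    val oos = new ObjectOutputStream(new ByteArrayOutputStream())
    oos.writeObject(obj)
    oos.close()
    true
  } catch {
    case _: NotSerializableException => false
  }

val w = new Wrapper(3)
println(w.multiplier(5))             // 15
println(w.multiplierSafe(5))         // 15
println(serialize(w.multiplierSafe)) // true: only the Int was captured
// serialize(w.multiplier) will typically fail, because the captured
// Wrapper instance is not serializable.
```

ClosureCleaner performs the equivalent of the `multiplierSafe` rewrite automatically: it inspects the closure's bytecode at runtime and nulls out captured outer references that the closure body never actually uses, so that only genuinely needed (and serializable) state is shipped to executors.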