Background
Here's my situation: I'm trying to create a class that filters an RDD based on some feature of the contents, but that feature can be different in different scenarios so I'd like to parameterize that with a function. Unfortunately, I seem to be running into issues with the way Scala captures its closures. Even though my function is serializable, the class is not.
The example in the Spark source on closure cleaning seems to suggest my situation can't be solved, but I'm convinced there's a way to achieve what I'm trying to do by creating the right (smaller) closure.
My Code
class MyFilter(getFeature: Element => String, other: NonSerializable) {
  def filter(rdd: RDD[Element]): RDD[Element] = {
    // All my complicated logic I want to share
    rdd.filter { elem => getFeature(elem) == "myTargetString" }
  }
}
Simplified Example
class Foo(f: Int => Double, rdd: RDD[Int]) {
  def go(data: RDD[Int]) = data.map(f)
}
val works = new Foo(_.toDouble, otherRdd)
works.go(myRdd).collect() // works
val myMap = Map(1 -> 10d)
val complicatedButSerializableFunc: Int => Double = x => myMap.getOrElse(x, 0d)
val doesntWork = new Foo(complicatedButSerializableFunc, otherRdd)
doesntWork.go(myRdd).collect() // fails with the exception below
org.apache.spark.SparkException: Task not serializable
Caused by: java.io.NotSerializableException: $iwC$$iwC$Foo
Serialization stack:
- object not serializable (class: $iwC$$iwC$Foo, value: $iwC$$iwC$Foo@61e33118)
- field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, name: foo, type: class $iwC$$iwC$Foo)
- object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC@47d6a31a)
- field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1, name: $outer, type: class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC)
- object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1, <function1>)
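// Even though serializing the function by itself works
import java.io.{FileOutputStream, ObjectOutputStream}
val out = new ObjectOutputStream(new FileOutputStream("test.obj"))
out.writeObject(complicatedButSerializableFunc) // works
out.close()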
Questions
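Why does the first simplified example not attempt to serialize Foo, but the second one does?
How can I pass in a reference to a serializable function without including a reference to Foo in my closure?

Found the answer with the help of this article.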
Essentially, when creating the closure for a given function, Scala will include the entire enclosing object for any complex field referenced (if someone has a good explanation for why this doesn't happen in the first simple example, I'll accept that answer). The solution is to pass the serializable value to a different function so that only the minimal reference is kept, much like the old JavaScript trick of wrapping a for-loop body in a function when registering event listeners.
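The capture behaviour can be seen without Spark. The following is a minimal sketch using plain Java serialization, assuming Scala 2.12 or later. Holder and canSerialize are hypothetical names introduced for illustration; Holder stands in for a non-serializable class like Foo. A lambda whose body reads a field has to capture the whole instance, while a lambda that reads a local copy captures only that copy.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object ClosureCaptureDemo {
  // Hypothetical helper: true if Java serialization accepts the object.
  def canSerialize(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(obj); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }

  // Deliberately NOT Serializable (hypothetical stand-in for Foo).
  class Holder(f: Int => Double) {
    // The lambda body reads the field `f`, so the lambda captures `this`.
    def capturesThis: Int => Double = x => f(x)

    // The field is copied into a local first, so the lambda captures only `local`.
    def capturesLocal: Int => Double = {
      val local = f
      x => local(x)
    }
  }

  def main(args: Array[String]): Unit = {
    val holder = new Holder(_.toDouble)
    println(canSerialize(holder.capturesThis))  // false: Holder is dragged along
    println(canSerialize(holder.capturesLocal)) // true: only the function is captured
  }
}
```

This only illustrates the general mechanism; in a Spark shell the REPL wraps each line in its own object, so the exact chain of captured objects can differ from what this sketch shows.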
Example
def enclose[E, R](enclosed: E)(func: E => R): R = func(enclosed)
class Foo(f: Int => Double, somethingNonserializable: RDD[String]) {
  def go(data: RDD[Int]) = enclose(f) { actualFunction => data.map(actualFunction) }
}
Or, with a JS-style self-executing anonymous function:
def go(data: RDD[Int]) = ((actualFunction: Int => Double) => data.map(actualFunction))(f)
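The same helper can be applied to the MyFilter class from the question. The sketch below is Spark-free so that it runs on its own: Element and NonSerializable are hypothetical stand-ins for the question's types, and the predicate method is an assumed name for the function that would be handed to rdd.filter. Writing the predicate through an ObjectOutputStream checks that MyFilter itself is not pulled into the closure.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object EncloseDemo {
  // Same helper as in the answer above.
  def enclose[E, R](enclosed: E)(func: E => R): R = func(enclosed)

  // Hypothetical stand-ins for the question's types.
  case class Element(feature: String)
  class NonSerializable

  class MyFilter(getFeature: Element => String, other: NonSerializable) {
    // The function that would be passed to rdd.filter.
    // Only `feature` (a copy of getFeature) is captured, not `this`.
    def predicate: Element => Boolean =
      enclose(getFeature) { feature =>
        (elem: Element) => feature(elem) == "myTargetString"
      }
  }

  def main(args: Array[String]): Unit = {
    val p = new MyFilter(_.feature, new NonSerializable).predicate
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    out.writeObject(p) // would throw NotSerializableException if MyFilter were captured
    out.close()
    println(p(Element("myTargetString"))) // true
  }
}
```

In the real class, the body of filter would become rdd.filter(predicate), or the enclose call could be inlined around rdd.filter directly, as in the Foo example.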