Spark TaskNotSerializable when using anonymous function

Background

Here's my situation: I'm trying to create a class that filters an RDD based on some feature of its contents, but that feature differs between scenarios, so I'd like to parameterize it with a function. Unfortunately, I seem to be running into issues with the way Scala captures closures: even though my function is serializable, the class is not.

The example in the Spark source on closure cleaning seems to suggest that my situation can't be solved, but I'm convinced there's a way to do what I want by creating the right (smaller) closure.

My Code

import org.apache.spark.rdd.RDD

class MyFilter(getFeature: Element => String, other: NonSerializable) {
  def filter(rdd: RDD[Element]): RDD[Element] = {
    // All my complicated logic I want to share
    rdd.filter { elem => getFeature(elem) == "myTargetString" }
  }
}
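The capture described above can be reproduced without Spark by pushing a closure through plain Java serialization, which is roughly the check Spark performs on task closures before shipping them. This is a sketch under assumptions: `isSerializable` and `PredicateBuilder` are hypothetical names (the latter standing in for `MyFilter`), `String` stands in for `Element`, and the RDD is left out so only the predicate is tested.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical helper: true if `obj` survives plain Java serialization.
def isSerializable(obj: AnyRef): Boolean =
  try {
    new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
    true
  } catch {
    case _: NotSerializableException => false
  }

// Hypothetical stand-in for MyFilter: an ordinary class, NOT Serializable.
class PredicateBuilder(getFeature: String => String) {
  // `getFeature` here really means `this.getFeature`, so the lambda captures `this`.
  def predicate: String => Boolean = elem => getFeature(elem) == "myTargetString"
}

val feature: String => String = s => s.trim
val builder = new PredicateBuilder(feature)

println(isSerializable(feature))           // true: the function on its own is fine
println(isSerializable(builder.predicate)) // false: the lambda drags in PredicateBuilder
```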

Simplified Example

import org.apache.spark.rdd.RDD

class Foo(f: Int => Double, rdd: RDD[Int]) {
  def go(data: RDD[Int]) = data.map(f)
}

val works = new Foo(_.toDouble, otherRdd)
works.go(myRdd).collect() // works

val myMap = Map(1 -> 10d)
val complicatedButSerializableFunc: Int => Double = x => myMap.getOrElse(x, 0d)
val doesntWork = new Foo(complicatedButSerializableFunc, otherRdd)
doesntWork.go(myRdd).collect() // craps out

org.apache.spark.SparkException: Task not serializable
Caused by: java.io.NotSerializableException: $iwC$$iwC$Foo
Serialization stack:
    - object not serializable (class: $iwC$$iwC$Foo, value: $iwC$$iwC$Foo@61e33118)
    - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, name: foo, type: class $iwC$$iwC$Foo)
    - object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC@47d6a31a)
    - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1, name: $outer, type: class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC)
    - object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1, <function1>)

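// Even though serializing the function on its own works:
import java.io.{FileOutputStream, ObjectOutputStream}

val out = new ObjectOutputStream(new FileOutputStream("test.obj"))
out.writeObject(complicatedButSerializableFunc) // works
out.close()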

Questions

  1. Why does the first simplified example not attempt to serialize all of Foo, while the second one does?
  2. How can I get a reference to my serializable function without including a reference to Foo in my closure?
asked Feb 08 '23 21:02 by Patrick


1 Answer

Found the answer with the help of this article.

Essentially, when a closure refers to a field of an enclosing object, Scala captures a reference to the whole enclosing object rather than just that field (if someone has a good explanation for why this doesn't happen in the first simple example, I'll accept that answer). The solution is to pass the serializable value into a separate function, so that the closure only keeps a reference to that function's parameter. This is very similar to the old JavaScript idiom of wrapping a for-loop body in a function so that each event listener closes over its own copy of the loop variable.
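A minimal sketch of that idea, checked with plain Java serialization rather than Spark: a lambda that refers to the field captures the enclosing object, while handing the field's value to a separate function yields a lambda that captures only that function's parameter. `isSerializable`, `matchesTarget` and `PredicateBuilder` are hypothetical names used for illustration.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical helper: true if `obj` survives plain Java serialization.
def isSerializable(obj: AnyRef): Boolean =
  try {
    new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
    true
  } catch {
    case _: NotSerializableException => false
  }

// Hypothetical helper outside any class: the returned lambda refers only to
// the parameter `g`, so `g` is the only thing it captures.
def matchesTarget(g: String => String): String => Boolean =
  elem => g(elem) == "myTargetString"

// Hypothetical, non-serializable stand-in for the question's class.
class PredicateBuilder(getFeature: String => String) {
  // Refers to the field (`this.getFeature`): captures the whole object.
  def viaField: String => Boolean = elem => getFeature(elem) == "myTargetString"
  // Passes the field's value to another function: captures only that function.
  def viaParameter: String => Boolean = matchesTarget(getFeature)
}

val builder = new PredicateBuilder(s => s.trim)
println(isSerializable(builder.viaField))     // false
println(isSerializable(builder.viaParameter)) // true
```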

Example

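// Binds `enclosed` to a fresh parameter, so closures built inside `func`
// refer to that parameter instead of to a field of the enclosing object
def enclose[E, R](enclosed: E)(func: E => R): R = func(enclosed)

class Foo(f: Int => Double, somethingNonserializable: RDD[String]) {
  def go(data: RDD[Int]) = enclose(f) { actualFunction => data.map(actualFunction) }
}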

Or, with a JS-style self-executing anonymous function:

def go(data: RDD[Int]) = ((actualFunction: Int => Double) => data.map(actualFunction))(f)
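To check the `enclose` approach without a cluster, the predicate from `MyFilter` in the question can be built the same way and run through Java serialization. This is a sketch under assumptions: `String` stands in for `Element`, the RDD is left out so only the predicate is tested, and `isSerializable` and `PredicateBuilder` are hypothetical names.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical helper: true if `obj` survives plain Java serialization.
def isSerializable(obj: AnyRef): Boolean =
  try {
    new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
    true
  } catch {
    case _: NotSerializableException => false
  }

// The answer's helper, unchanged.
def enclose[E, R](enclosed: E)(func: E => R): R = func(enclosed)

// Hypothetical, non-serializable stand-in for MyFilter.
class PredicateBuilder(getFeature: String => String) {
  // The outer lambda runs immediately on the caller's side; the inner lambda
  // it returns refers only to the parameter `g`, not to `this`.
  def predicate: String => Boolean =
    enclose(getFeature) { g => (elem: String) => g(elem) == "myTargetString" }
}

val builder = new PredicateBuilder(s => s.trim)
println(isSerializable(builder.predicate))     // true
println(builder.predicate(" myTargetString ")) // true
```

Inside `MyFilter.filter`, a predicate built this way could then be handed to `rdd.filter`. Copying the field into a local `val` before building the lambda has the same effect; that is the pattern shown in the "Passing Functions to Spark" section of the Spark programming guide.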
answered Feb 15 '23 11:02 by Patrick