I'm trying to run the following code in Scala on the Spark framework, but I get a warning about an extremely large task size (about 8 MB):
tidRDD: RDD[ItemSet]
mh: MineHelper
x: ItemSet
broadcast_tid: Broadcast[Array[ItemSet]]
count: Int

tidRDD.flatMap(x => mh.mineFreqSets(x, broadcast_tid.value, count)).collect()
The reason I added the MineHelper class was to make it serialisable, and it only contains the method shown above. An ItemSet is a class with 3 private members and a few getter/setter methods, nothing out of the ordinary. I feel this is the correct way to approach the problem, but Spark thinks otherwise. Am I making some gaping error, or is it something small that's wrong?
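For reference, the classes involved are roughly of this shape (simplified; the member and method names below are just placeholders, not the exact code):

class MineHelper extends Serializable {
  // the only method the helper contains
  def mineFreqSets(x: ItemSet, allSets: Array[ItemSet], count: Int): Seq[ItemSet] = ???
}

class ItemSet(private var items: Set[Int],
              private var support: Int,
              private var tid: Int) extends Serializable {
  def getItems: Set[Int] = items
  def setItems(v: Set[Int]): Unit = { items = v }
  // ... a couple more getters/setters for the other two members
}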
Here's the warning:
WARN TaskSetManager: Stage 1 contains a task of very large size (8301 KB). The maximum recommended task size is 100 KB.
You're probably closing over `this`, which forces the whole enclosing object to be serialized with the task.
You probably have something like the following:
class Foo {
  val outer = ???
  def f(rdd: RDD[ItemSet]): RDD[ItemSet] = {
    rdd.map(x => outer.g(x))
  }
}
In this case, when Spark serializes the task, it needs the instance of the enclosing `Foo` as well. Indeed, when you reference `outer`, you really mean `this.outer`.
A simple fix is to copy the outer values into local variables first:
class Foo {
  val outer = ???
  def f(rdd: RDD[ItemSet]): RDD[ItemSet] = {
    val _outer = outer // local variable
    rdd.map(x => _outer.g(x)) // no reference to `this`
  }
}
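Applied to your code, assuming the flatMap call sits inside a class whose members are mh, broadcast_tid and count (I'm guessing at the surrounding structure), the same trick would look like this:

// Copy the members the closure needs into local vals first, so only
// these small values are serialized, not the whole enclosing object.
val localMh = mh                 // MineHelper must itself be Serializable and small
val localTid = broadcast_tid     // only the broadcast handle is captured, not the array
val localCount = count

tidRDD.flatMap(x => localMh.mineFreqSets(x, localTid.value, localCount)).collect()

If the task is still large after that, check that MineHelper (and anything it references) doesn't carry a copy of the data itself; anything big should go through a broadcast variable, as you already do for the ItemSet array.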