To begin with, I'm using Scala 2.10.4 and the example below runs on Spark 1.6 (though I doubt Spark has anything to do with this, it's just a serialization issue).
Here's my problem: assume I have a trait Base that is implemented by, say, two classes B1 and B2. Now I have a generic trait that is extended by a collection of classes, one of them being parameterized over subtypes of Base, e.g. (here I keep Spark's notion of RDD, but it could be anything else as long as it gets serialized; Something is just a result, whatever it actually is):
trait Foo[T] { def function(rdd: RDD[T]): Something }
class Foo1[B <: Base] extends Foo[B] { def function(rdd: RDD[B]): Something = ... }
class Foo2 extends Foo[A] { def function(rdd: RDD[A]): Something = ... }
...
Now I need an object that will take an RDD[T] (assume no ambiguity here, it's just a simplified version) and return the Something corresponding to the result of the function matching type T. It should also work for Array[T] with a merging strategy. So far it looks like:
object Obj {
  def compute[T: TypeTag](rdd: RDD[T]): Something = {
    typeOf[T] match {
      case t if t <:< typeOf[A] =>
        val foo = new Foo[T]
        foo.function(rdd)
      case t if t <:< typeOf[Array[A]] =>
        val foo = new Foo[A]
        foo.function(rdd.map(x => mergeArray(x.asInstanceOf[Array[A]])))
      case t if t <:< typeOf[Base] =>
        val foo = new Foo[T]
        foo.function(rdd)
      // here it gets ugly...
      case t if t <:< typeOf[Array[_]] => // doesn't fall through with Array[Base]... why?
        val tt = getSubInfo[T](0)
        val tpe = tt.tpe
        val foo = new Foo[tpe.type]
        foo.function(rdd.map(x => (x._1, mergeArray(x._2.asInstanceOf[Array[tpe.type]]))))
    }
  }

  // strategy to transform arrays of T into a T object when possible
  private def mergeArray[T: TypeTag](a: Array[T]): T = ...

  // extracts the subtype, e.g. for Array[Int] position 0 yields a type tag for Int;
  // I can provide the code, but it is not fundamental to understanding the problem
  private def getSubInfo[T: TypeTag](i: Int): TypeTag[_] = ...
}
Unfortunately, it seems to work fine on a local machine, but when it gets sent to Spark (i.e. serialized), I get an org.apache.spark.SparkException: Task not serializable with:
Caused by: java.io.NotSerializableException: scala.reflect.internal.Symbols$PackageClassSymbol
Serialization stack:
- object not serializable (class: scala.reflect.internal.Symbols$PackageClassSymbol, value: package types)
- field (class: scala.reflect.internal.Types$ThisType, name: sym, type: class scala.reflect.internal.Symbols$Symbol)
I do have a workaround (quite obvious: enumerate the possibilities), but out of curiosity, is there a way to fix this? And why aren't Symbols serializable, whereas their equivalents in Manifests were?
Thanks for the help.
TypeTags are now generally serializable in Scala but, oddly, Types are not directly (which is odd, because TypeTags contain Symbols, which are not serializable :-/).
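As a quick illustration of that claim, here is a small sketch that tries plain Java serialization on a TypeTag and on a Type. The object name and the serializes helper are made up for this example, and the outcome assumes a Scala version in which TypeTags serialize, as noted above:
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import scala.reflect.runtime.universe._
import scala.util.Try

object TagVsTypeCheck extends App {
  // Attempts plain Java serialization of a value, roughly what Spark's closure serializer does.
  def serializes(value: Any): Boolean =
    Try(new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(value)).isSuccess

  println(serializes(typeTag[List[Int]])) // expected true: the TypeTag serializes
  println(serializes(typeOf[List[Int]]))  // expected false: the Type drags non-serializable Symbols along
}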
This might do what you want:
// the implicit TypeTag constructor parameter is serialized
abstract class TypeAware[T: TypeTag] extends Serializable {
  def typ: Type = _typeCached

  @transient
  lazy val _typeCached: Type = typeOf[T]
}

trait Foo[T] extends Serializable {
  def function(rdd: RDD[T]): Something
  def typ: Type
}

class Concrete[T: TypeTag] extends TypeAware[T] with Foo[T] with Serializable {
  def function(rdd: RDD[T]): Something = ??? // ... impl here? ...
}