I've written two case classes that extend the abstract class Base. I have a list of each class (listA and listB). When I merge these two lists, I can't convert the final list to an Apache Spark 1.6.1 Dataset.
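import org.apache.spark.sql.Dataset
import sqlContext.implicits._ // provides toDS() (sc and sqlContext as in spark-shell)
abstract class Base
case class A(name: String) extends Base
case class B(age: Int) extends Base
val listA: List[A] = A("foo") :: A("bar") :: Nil
val listB: List[B] = B(10) :: B(20) :: Nil
// The compiler infers the least upper bound of A and B for the merged list
val list: List[Base with Product with Serializable] = listA ++ listB
// toDS() returns a Dataset, not an RDD; this line fails at runtime
val result: Dataset[Base with Product with Serializable] = sc.parallelize(list).toDS()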
Apache Spark raises this exception:
A needed class was not found. This could be due to an error in your runpath. Missing class: no Java class corresponding to Base with Product with Serializable found
java.lang.NoClassDefFoundError: no Java class corresponding to Base with Product with Serializable found
at scala.reflect.runtime.JavaMirrors$JavaMirror.typeToJavaClass(JavaMirrors.scala:1299)
at scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:192)
at scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:54)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:50)
at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:41)
Creating the RDD from list with sc.parallelize doesn't throw any exception, but converting that RDD to a Dataset with the toDS() method throws the exception above.
First, you can get a saner type for list by making it a List[Base] explicitly, or by declaring abstract class Base extends Product with Serializable if the intention is for it to be extended only by case classes/objects. But this isn't enough, because:
Spark 1.6 comes with support for automatically generating encoders for a wide variety of types, including primitive types (e.g. String, Integer, Long), Scala case classes, and Java Beans.
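A small plain-Scala sketch (no Spark needed) of the two ways to get a saner type for the merged list. The object SaneTypes, the Base2/A2/B2 hierarchy, and the hasType helper are hypothetical names chosen for illustration; hasType only compiles when a value's static type is exactly the given type, so each check documents what the compiler infers.

```scala
object SaneTypes {
  // Hierarchy from the question: Base does not extend Product with Serializable
  abstract class Base
  case class A(name: String) extends Base
  case class B(age: Int) extends Base

  // Variant where the base class already mixes in Product with Serializable
  abstract class Base2 extends Product with Serializable
  case class A2(name: String) extends Base2
  case class B2(age: Int) extends Base2

  // Compile-time helper: hasType[T](value) compiles only if value's static type is exactly T
  final class HasType[T] {
    def apply[U](value: U)(implicit ev: U =:= T): U = value
  }
  def hasType[T]: HasType[T] = new HasType[T]

  // Without annotations the compiler infers the least upper bound of A and B
  val inferred = List(A("foo")) ++ List(B(10))
  hasType[List[Base with Product with Serializable]](inferred)

  // Fix 1: annotate the element type explicitly
  val explicit: List[Base] = List(A("foo")) ++ List(B(10))
  hasType[List[Base]](explicit)

  // Fix 2: Base2 already extends Product with Serializable, so the inferred type is List[Base2]
  val inferred2 = List(A2("foo")) ++ List(B2(10))
  hasType[List[Base2]](inferred2)
}
```

Either fix gives Spark a plain List[Base] instead of a compound type, but as the answer notes, an encoder for the abstract Base still has to come from somewhere.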
Note that abstract classes like Base are not supported, and custom encoders aren't supported either. You could, however, try the kryo encoder (or javaSerialization, as a last resort); see How to store custom objects in Dataset?.
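Here is a complete working example:
import org.apache.spark.sql.{Encoder, Encoders}
import sqlContext.implicits._ // provides toDS() (sc and sqlContext as in spark-shell)
abstract class Base extends Serializable with Product
case class A(name: String) extends Base
case class B(age: Int) extends Base
// Kryo encoder: each Base is stored as a single binary column
object BaseEncoder {
  implicit def baseEncoder: Encoder[Base] = Encoders.kryo[Base]
}
// The encoder must be in scope where toDS() is called
import BaseEncoder._
val listA: Seq[A] = Seq(A("a"), A("b"))
val listB: Seq[B] = Seq(B(1), B(2))
val list: Seq[Base] = listA ++ listB
val ds = sc.parallelize(list).toDS()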
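For comparison, a minimal standalone sketch of the last-resort javaSerialization encoder mentioned above, assuming Spark 1.6 (spark-core and spark-sql) on the classpath and a local master. The object name JavaSerializationSketch and its run method are hypothetical. It builds the Dataset with sqlContext.createDataset instead of going through an RDD, and shows that elements come back typed as Base, so pattern matching recovers the concrete case classes.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SQLContext}

// Same hierarchy as in the working example
abstract class Base extends Serializable with Product
case class A(name: String) extends Base
case class B(age: Int) extends Base

object JavaSerializationSketch {
  def run(): Seq[String] = {
    // Local context; master and app name are arbitrary choices for this sketch
    val conf = new SparkConf().setMaster("local[2]").setAppName("base-encoder-sketch")
    val sc = new SparkContext(conf)
    try {
      val sqlContext = new SQLContext(sc)

      // Last-resort alternative to kryo: Java-serialize each Base into one binary column
      implicit val baseEncoder: Encoder[Base] = Encoders.javaSerialization[Base]

      val list: Seq[Base] = Seq(A("a"), A("b"), B(1), B(2))
      val ds: Dataset[Base] = sqlContext.createDataset(list)

      // Elements come back as Base, so pattern matching recovers the case classes
      ds.collect().toSeq.map {
        case A(name) => s"A:$name"
        case B(age)  => s"B:$age"
      }
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = println(run().mkString(", "))
}
```

The trade-off is the same as with kryo: the Dataset holds a single opaque binary column, so Spark cannot select or filter on name or age without deserializing the objects first.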