No Java class corresponding to Product with Serializable with Base found

I've written two case classes that extend an abstract class Base. I have a list of each class (listA and listB). When I merge these two lists, I can't convert the resulting list to an Apache Spark 1.6.1 Dataset.

abstract class Base

case class A(name: String) extends Base
case class B(age: Int) extends Base

val listA: List[A] = A("foo")::A("bar")::Nil
val listB: List[B] = B(10)::B(20)::Nil
val list: List[Base with Product with Serializable] = listA ++ listB

val result = sc.parallelize(list).toDS()

Apache Spark raises this exception:

A needed class was not found. This could be due to an error in your runpath. Missing class: no Java class corresponding to Base with Product with Serializable found
java.lang.NoClassDefFoundError: no Java class corresponding to Base with Product with Serializable found
    at scala.reflect.runtime.JavaMirrors$JavaMirror.typeToJavaClass(JavaMirrors.scala:1299)
    at scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:192)
    at scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:54)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:50)
    at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:41)

Creating the RDD from the list doesn't throw any exception, but converting the RDD to a Dataset with the toDS() method throws the exception above.

Asked Oct 18 '22 by Milad Khajavi
1 Answer

First, you can get a saner type for list either by annotating it as List[Base] explicitly, or by making Base extend Product with Serializable if the intention is for it only to be extended by case classes/objects. But this isn't enough, because
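As a plain-Scala illustration (no Spark needed), here is a minimal sketch of that first point: every case class mixes in Product and Serializable, so the least upper bound of A and B picks up those traits unless Base already carries them.

```scala
// Base itself extends Product with Serializable, so the compiler
// infers List[Base] for the concatenation instead of the unwieldy
// List[Base with Product with Serializable].
sealed abstract class Base extends Product with Serializable

case class A(name: String) extends Base
case class B(age: Int) extends Base

object TypeDemo {
  def main(args: Array[String]): Unit = {
    val listA: List[A] = List(A("foo"), A("bar"))
    val listB: List[B] = List(B(10), B(20))

    // No explicit annotation needed; this is now inferred as List[Base].
    val list: List[Base] = listA ++ listB

    println(list.length)
  }
}
```

The sealed modifier is optional here; it just documents that Base is only meant to be extended by the case classes in this file.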

Spark 1.6 comes with support for automatically generating encoders for a wide variety of types, including primitive types (e.g. String, Integer, Long), Scala case classes, and Java Beans.

Note that abstract classes like Base are not supported, and custom encoders aren't supported either. However, you could try the kryo (or, as a last resort, javaSerialization) encoder; see How to store custom objects in Dataset?.

Here is a complete working example:

import org.apache.spark.sql.{Encoder, Encoders}

abstract class Base extends Serializable with Product

case class A(name: String) extends Base

case class B(age: Int) extends Base

object BaseEncoder {
  // Encoder and Encoders live in org.apache.spark.sql, not org.apache.spark
  implicit def baseEncoder: Encoder[Base] = Encoders.kryo[Base]
}


val listA: Seq[A] = Seq(A("a"), A("b"))
val listB: Seq[B] = Seq(B(1), B(2))
val list: Seq[Base] = listA ++ listB

val ds = sc.parallelize(list).toDS
Answered Oct 21 '22 by Alexey Romanov