
How to create a Spark Dataset or DataFrame from case classes that contain Enums

I have been trying to create a Spark Dataset from case classes that contain Enums, but I'm not able to. I'm using Spark version 1.6.0. The exception complains that no encoder is found for my Enum. Is it not possible in Spark to have enums in the data?

Code:

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object MyEnum extends Enumeration {
  type MyEnum = Value
  val Hello, World = Value
}

case class MyData(field: String, other: MyEnum.Value)

object EnumTest {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val sqlCtx = new SQLContext(sc)

    import sqlCtx.implicits._

    // Fails at runtime: Spark 1.6 has no built-in encoder for scala.Enumeration#Value
    val df = sc.parallelize(Array(MyData("hello", MyEnum.World))).toDS()

    println(s"df: ${df.collect().mkString(",")}")
  }

}

Error:

Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for com.company.MyEnum.Value
- field (class: "scala.Enumeration.Value", name: "other")
- root class: "com.company.MyData"
at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor(ScalaReflection.scala:597)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor$1.apply(ScalaReflection.scala:509)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor$1.apply(ScalaReflection.scala:502)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor(ScalaReflection.scala:502)
at org.apache.spark.sql.catalyst.ScalaReflection$.extractorsFor(ScalaReflection.scala:394)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:54)
at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:41)
at com.company.EnumTest$.main(EnumTest.scala:22)
at com.company.EnumTest.main(EnumTest.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
asked Sep 23 '16 by ErikHabanero

1 Answer

You can create your own encoder, for example a Kryo-backed one, and bring it into implicit scope:

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object MyEnum extends Enumeration {
  type MyEnum = Value
  val Hello, World = Value
}

case class MyData(field: String, other: MyEnum.Value)

object MyDataEncoders {
  // Kryo-backed encoder for MyData; Spark stores each record as a single binary column.
  implicit def myDataEncoder: org.apache.spark.sql.Encoder[MyData] =
    org.apache.spark.sql.Encoders.kryo[MyData]
}

object EnumTest {
  import MyDataEncoders._

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val sqlCtx = new SQLContext(sc)

    import sqlCtx.implicits._

    // With the Kryo encoder in implicit scope, toDS() now succeeds.
    val df = sc.parallelize(Array(MyData("hello", MyEnum.World))).toDS()

    println(s"df: ${df.collect().mkString(",")}")
  }
}
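
One trade-off worth noting: Encoders.kryo serializes each MyData record into a single binary column, so the resulting Dataset has no columnar schema to select or filter on. If you can change the case class, an alternative is to keep the enum as a String inside the Dataset and convert at the boundaries with Enumeration.withName. Below is a minimal sketch of that approach; the MyDataRow mirror class and EnumAsStringTest object are illustrative names, not part of the original answer:

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

// Mirror of MyData that stores the enum by name, so Spark's built-in
// product encoder applies and the schema stays columnar.
case class MyDataRow(field: String, other: String)

object EnumAsStringTest {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
    val sqlCtx = new SQLContext(sc)

    import sqlCtx.implicits._

    // Enum -> String on the way in.
    val ds = sc.parallelize(Seq(MyDataRow("hello", MyEnum.World.toString))).toDS()

    // String -> enum on the way out, via Enumeration.withName.
    val restored = ds.collect().map(r => MyData(r.field, MyEnum.withName(r.other)))
    println(restored.mkString(","))
  }
}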
answered Nov 10 '22 by Carlos Vilchez