I'm trying to create a Dataset from an RDD y:
Pattern: y: RDD[(MyObj1, scala.Iterable[MyObj2])]
So I created an encoder explicitly:
implicit def tuple2[A1, A2](
  implicit e1: Encoder[A1],
           e2: Encoder[A2]
): Encoder[(A1, A2)] = Encoders.tuple[A1, A2](e1, e2)

// Create the Dataset
val z = spark.createDataset(y)(tuple2[MyObj1, Iterable[MyObj2]])
This code compiles without errors, but when I run it I get this error:
Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for scala.Iterable[org.bean.input.MyObj2]
- field (class: "scala.collection.Iterable", name: "_2")
- root class: "scala.Tuple2"
at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:625)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$10.apply(ScalaReflection.scala:619)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$10.apply(ScalaReflection.scala:607)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.immutable.List.flatMap(List.scala:344)
at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:607)
at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:438)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:71)
at org.apache.spark.sql.Encoders$.product(Encoders.scala:275)
at org.apache.spark.sql.LowPrioritySQLImplicits$class.newProductEncoder(SQLImplicits.scala:233)
at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:33)
Some explanation of my objects (MyObj1 & MyObj2):
- MyObj1 :
case class MyObj1(
  id: String,
  `type`: String   // `type` is a reserved word in Scala, hence the backticks
)
- MyObj2 :
trait MyObj2 {
val o_state:Option[String]
val n_state:Option[String]
val ch_inf: MyObj1
val state_updated:MyObj3
}
Any help please?
Spark doesn't provide an Encoder for Iterable, so unless you want to use Encoders.kryo or Encoders.javaSerialization this won't work.
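For illustration only, a minimal sketch of the Kryo route, reusing the names from your question (spark, y, MyObj1, MyObj2); the whole pair is serialized as binary, so the resulting Dataset has a single binary column rather than a proper schema:

import org.apache.spark.sql.Encoders

// Hedged sketch: serialize the entire pair with Kryo instead of relying on
// Spark's reflection-based encoders. Assumes `spark` and `y` are as in the question.
val z = spark.createDataset(y)(Encoders.kryo[(MyObj1, Iterable[MyObj2])])

// The Dataset has one opaque binary column, so any field-level operation
// requires deserializing first with map / flatMap.

This always works, but you lose the columnar structure and the ability to use SQL / DataFrame operations on individual fields.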
The closest subclass of Iterable for which Spark does provide Encoders is Seq, so that is probably the one you should use here. Otherwise refer to How to store custom objects in Dataset?
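A minimal sketch of the Seq route, again with your names; note it assumes MyObj2 (and MyObj3) are types Spark's reflection-based encoders can handle, e.g. case classes rather than the trait shown above, otherwise you are back to the linked question or the Kryo fallback:

import spark.implicits._

// Hedged sketch: convert the Iterable values to Seq, for which Spark does
// provide an Encoder, then let spark.implicits._ derive the tuple encoder.
// Only works if MyObj1 and MyObj2 are themselves encodable (e.g. case classes).
val z = spark.createDataset(y.mapValues(_.toSeq))   // Dataset[(MyObj1, Seq[MyObj2])]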