I need to write a job that reads a Dataset[Row] and converts it to a Dataset[CustomClass], where CustomClass is a protobuf class.
val protoEncoder = Encoders.bean(classOf[CustomClass])
val transformedRows = rows.map {
  case Row(f1: String, f2: Long) =>
    val pbufClass = CustomClass.newBuilder()
      .setF1(f1)
      .setF2(f2)
    pbufClass.build()
}(protoEncoder)
However, it looks like protobuf classes are not really Java beans, and I get an NPE on the following:
val x = Encoders.bean(classOf[CustomClass])
How does one ensure that the job can emit a dataset of type Dataset[CustomClass], where CustomClass is the protobuf class? Any pointers/examples on writing a custom encoder for the class?
NPE:
val encoder2 = Encoders.bean(classOf[CustomClass])
java.lang.NullPointerException
at org.spark_project.guava.reflect.TypeToken.method(TypeToken.java:465)
at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:126)
at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:125)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:125)
at org.apache.spark.sql.catalyst.JavaTypeInference$.inferDataType(JavaTypeInference.scala:55)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.javaBean(ExpressionEncoder.scala:89)
at org.apache.spark.sql.Encoders$.bean(Encoders.scala:142)
... 48 elided
The Bean encoder internally uses
JavaTypeInference.serializerFor(protoClass)
If I try to do the same in my custom encoder, I get a more descriptive error message:
Caused by: java.lang.UnsupportedOperationException: Cannot infer type for class xxx.yyy.CustomClass because it is not bean-compliant
at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$serializerFor(JavaTypeInference.scala:430)
at org.apache.spark.sql.catalyst.JavaTypeInference$.serializerFor(JavaTypeInference.scala:337)
at xxx.yyy..EncoderHolder$.protoEncoder(xxx.scala:69)
at xxx.yyy..EncoderHolder$.encoder$lzycompute$1(xxx.scala:82)
at xxx.yyy..EncoderHolder$.encoder$1(xxx.scala:82)
at xxx.yyy..EncoderHolder$.liftedTree1$1(xxx.scala:84)
at xxx.yyy..EncoderHolder$.<init>(xxx.scala:81)
at xxx.yyy..EncoderHolder$.<clinit>(xxx.scala)
My experience with Encoders is not very promising, and at this point I would recommend not spending more time on this.
I'd rather think about alternatives: work with Spark its way, and map the result of the Spark computation to the protobuf-generated class only at the very last step.
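A minimal sketch of that last-step mapping, reusing the CustomClass builder calls (setF1 / setF2) from the question; it assumes the final result is small enough to collect to the driver (for large outputs the same conversion could run inside foreachPartition instead):

// Keep the Spark computation untyped (Dataset[Row]/DataFrame) and build the
// protobuf objects only after leaving Spark's encoder machinery.
val results: Array[CustomClass] = rows
  .collect()                                  // bring the (small) result to the driver
  .map { case Row(f1: String, f2: Long) =>
    CustomClass.newBuilder()
      .setF1(f1)
      .setF2(f2)
      .build()
  }

This sidesteps the encoder problem entirely, because no Encoder[CustomClass] is ever needed.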
For converting a Row to a protobuf class you can use sparksql-protobuf:
This library provides utilities to work with Protobuf objects in SparkSQL. It provides a way to read parquet files written by SparkSQL back as an RDD of compatible protobuf objects. It can also convert an RDD of protobuf objects into a DataFrame.
Add the dependencies to your build.sbt file:
resolvers += Resolver.jcenterRepo

libraryDependencies ++= Seq(
  "com.github.saurfang" %% "sparksql-protobuf" % "0.1.2",
  "org.apache.parquet" % "parquet-protobuf" % "1.8.1"
)
You can follow the examples from the library to get started (Example 1, Example 2).
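For orientation, a rough sketch of how such a conversion typically looks with this library; the import paths, ProtoParquetRDD, and the createDataFrame overload for protobuf RDDs are taken from the library's README and should be treated as assumptions to verify against the version you depend on (sc and sqlContext are the usual Spark shell handles):

// Assumed package paths from the sparksql-protobuf README; verify against your version.
import com.github.saurfang.parquet.proto.spark.ProtoParquetRDD
import com.github.saurfang.parquet.proto.spark.sql._

// Read a parquet file written by SparkSQL back as an RDD of protobuf objects.
val protoRdd = new ProtoParquetRDD(sc, "path/to/output.parquet", classOf[CustomClass])

// Or go the other way: turn an RDD of protobuf objects into a DataFrame.
val df = sqlContext.createDataFrame(protoRdd)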
I hope this helps!