
How to map rows to protobuf-generated class?

I need to write a job that reads a Dataset[Row] and converts it to a Dataset[CustomClass], where CustomClass is a protobuf class.

val protoEncoder = Encoders.bean(classOf[CustomClass])
val transformedRows = rows.map {
  case Row(f1: String, f2: Long) => {
    val pbufClass = CustomClass.newBuilder()

However, it looks like Protobuf classes are not really Java Beans, and I get an NPE on the following:

val x =  Encoders.bean(classOf[CustomClass])

How can I ensure that the job emits a Dataset[CustomClass], where CustomClass is the protobuf class? Any pointers or examples on writing a custom encoder for the class?


val encoder2 = Encoders.bean(classOf[CustomClass])
  at org.spark_project.guava.reflect.TypeToken.method(TypeToken.java:465)
  at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:126)
  at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:125)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
  at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:125)
  at org.apache.spark.sql.catalyst.JavaTypeInference$.inferDataType(JavaTypeInference.scala:55)
  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.javaBean(ExpressionEncoder.scala:89)
  at org.apache.spark.sql.Encoders$.bean(Encoders.scala:142)
  ... 48 elided

The Bean encoder internally uses


If I try to do the same in my custom encoder, I get a more descriptive error message:

Caused by: java.lang.UnsupportedOperationException: Cannot infer type for class xxx.yyy.CustomClass because it is not bean-compliant
        at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$serializerFor(JavaTypeInference.scala:430)
        at org.apache.spark.sql.catalyst.JavaTypeInference$.serializerFor(JavaTypeInference.scala:337)
        at xxx.yyy..EncoderHolder$.protoEncoder(xxx.scala:69)
        at xxx.yyy..EncoderHolder$.encoder$lzycompute$1(xxx.scala:82)
        at xxx.yyy..EncoderHolder$.encoder$1(xxx.scala:82)
        at xxx.yyy..EncoderHolder$.liftedTree1$1(xxx.scala:84)
        at xxx.yyy..EncoderHolder$.<init>(xxx.scala:81)
        at xxx.yyy..EncoderHolder$.<clinit>(xxx.scala)
Apurva asked Jun 26 '17 21:06


2 Answers

My experience with Encoders has not been very promising, and at this point I would recommend not spending more time on this.

I'd rather think about alternatives: work with Spark on its own terms, and map the result of the Spark computation to the protobuf-generated class at the very last step.
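One way this could look, as a rough sketch rather than a tested recipe: do all the Spark work on the Dataset[Row], and convert to the protobuf class in a final map that uses an opaque encoder, so Spark never tries to infer bean properties. The helper name toProto and the object ProtoLastStep are made up for illustration. The sketch assumes the generated class is java.io.Serializable, which is the case for classes generated by the standard protobuf-java compiler.

```scala
import com.google.protobuf.Message
import org.apache.spark.sql.{Dataset, Encoder, Encoders, Row}
import scala.reflect.ClassTag

object ProtoLastStep {
  // Hypothetical helper (not part of Spark or protobuf): converts rows to
  // protobuf messages as the final step of a job.
  def toProto[T <: Message : ClassTag](rows: Dataset[Row])(build: Row => T): Dataset[T] = {
    // Opaque encoder: each message is stored as a single binary column using
    // Java serialization, so no bean-style type inference is attempted.
    // Assumption: T is java.io.Serializable at runtime.
    implicit val protoEncoder: Encoder[T] = Encoders.javaSerialization[T]
    rows.map(build)
  }
}

// Possible usage with the class from the question; setF1/setF2 are
// hypothetical setter names, the real ones depend on the .proto definition:
//   ProtoLastStep.toProto(rows) { case Row(f1: String, f2: Long) =>
//     CustomClass.newBuilder().setF1(f1).setF2(f2).build()
//   }
```

The trade-off is that the resulting Dataset has one binary column, so filters, joins and aggregations on individual fields need to happen before this conversion.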

Jacek Laskowski answered Oct 23 '22 03:10


To convert a Row to a Protobuf class, you can use sparksql-protobuf.

This library provides utilities to work with Protobuf objects in SparkSQL. It provides a way to read parquet file written by SparkSQL back as an RDD of the compatible protobuf object. It can also convert RDD of protobuf objects into DataFrame.

Add a dependency to your build.sbt file:

resolvers += Resolver.jcenterRepo

libraryDependencies ++= Seq(
    "com.github.saurfang" %% "sparksql-protobuf" % "0.1.2",
    "org.apache.parquet" % "parquet-protobuf" % "1.8.1"
)


You can follow some examples from the library to get started:

Example 1

Example 2

I hope this helps!

koiralo answered Oct 23 '22 05:10
