I am trying to convert a Spark RDD to a Spark SQL DataFrame with toDF(). I have used this function successfully many times, but in this case I'm getting a compiler error:
error: value toDF is not a member of org.apache.spark.rdd.RDD[com.example.protobuf.SensorData]
Here is my code:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// SensorData is an auto-generated class
import com.example.protobuf.SensorData

object MyApplication {
  def loadSensorDataToRdd(): RDD[SensorData] = ???

  def main(argv: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("My application")
    conf.set("io.compression.codecs", "com.hadoop.compression.lzo.LzopCodec")
    val sc = new SparkContext(conf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

    val sensorDataRdd = loadSensorDataToRdd()
    val sensorDataDf = sensorDataRdd.toDF() // <-- CAUSES COMPILER ERROR
  }
}
I am guessing that the problem is with the SensorData class, which is a Java class that was auto-generated from a Protocol Buffer. What can I do to convert the RDD to a DataFrame?
The reason for the compilation error is that there is no Encoder in scope to convert an RDD of com.example.protobuf.SensorData to a Dataset of com.example.protobuf.SensorData.
Encoders (ExpressionEncoders, to be exact) are used to convert InternalRow objects into JVM objects according to the schema (usually a case class or a Java bean).
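That is also why toDF() normally works out of the box: importing sqlContext.implicits._ brings implicit Encoders for case classes (Product types) into scope. A minimal sketch of the usual case, with a made-up Reading case class:

// Case classes get an Encoder for free via sqlContext.implicits._,
// which is why toDF() usually "just works". Reading is made up here.
case class Reading(id: String, value: Double)

import sqlContext.implicits._ // provides an implicit Encoder[Reading]
val readingsDf = sc.parallelize(Seq(Reading("a", 1.0))).toDF()

A protobuf-generated Java class is neither a case class nor covered by those implicits, hence the error.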
There is hope: you can create an Encoder for the custom Java class using the bean method of the org.apache.spark.sql.Encoders object, whose scaladoc reads:

Creates an encoder for Java Bean of type T.
Something like the following:

import org.apache.spark.sql.{Encoder, Encoders}

implicit val sensorDataEncoder: Encoder[SensorData] =
  Encoders.bean(classOf[com.example.protobuf.SensorData])
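With that implicit in scope, alongside sqlContext.implicits._, the failing line should compile. A sketch of the call site, assuming Spark 2.x (where SQLImplicits offers the rddToDatasetHolder conversion for any T with an implicit Encoder[T]):

// Assumes Spark 2.x: sqlContext.implicits._ provides rddToDatasetHolder,
// which applies to any RDD[T] once an implicit Encoder[T] is in scope.
import sqlContext.implicits._

val sensorDataDf = sensorDataRdd.toDF() // now resolves

Alternatively, sqlContext.createDataset(sensorDataRdd) should also pick up the encoder, and calling toDF on the resulting Dataset gives the same DataFrame.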
If SensorData uses unsupported types, you'll have to map the RDD[SensorData] to an RDD of some simpler type(s), e.g. a tuple of the fields, and only then expect toDF to work.
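A minimal sketch of that fallback, assuming SensorData exposes bean-style getters named getId and getValue (hypothetical names; adjust to the actual generated accessors):

import sqlContext.implicits._

// getId and getValue are hypothetical; protobuf-generated classes
// expose one getter per message field.
val simpleRdd: RDD[(String, Double)] =
  sensorDataRdd.map(sd => (sd.getId, sd.getValue))

// Tuples of supported types have built-in encoders via the implicits,
// so toDF compiles; column names are supplied explicitly.
val sensorDataDf = simpleRdd.toDF("id", "value")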