
How to create a custom Encoder in Spark 2.X Datasets?

Spark Datasets move away from Rows to Encoders for POJOs and primitives. The Catalyst engine uses an ExpressionEncoder to convert columns in a SQL expression. However, there do not appear to be other subclasses of Encoder available to use as a template for our own implementations.

Here is an example of code that works in Spark 1.X with DataFrames but does not compile in Spark 2.X with Datasets:

// Map each row to a tuple; has_id, has_channels, has_height, has_width and log
// are defined elsewhere in the enclosing code.
df.map(row => {
    val id: String = if (!has_id) "" else row.getAs[String]("id")
    val label: String = row.getAs[String]("label")
    val channels: Int = if (!has_channels) 0 else row.getAs[Int]("channels")
    val height: Int = if (!has_height) 0 else row.getAs[Int]("height")
    val width: Int = if (!has_width) 0 else row.getAs[Int]("width")
    // The data column may hold either a String or raw bytes
    val data: Array[Byte] = row.getAs[Any]("data") match {
      case str: String => str.getBytes
      case arr: Array[Byte] => arr
      case _ => {
        log.error("Unsupported value type")
        null
      }
    }
    (id, label, channels, height, width, data)
  }).persist(StorageLevel.DISK_ONLY)

We get the following compiler error:

Error:(56, 11) Unable to find encoder for type stored in a Dataset.
Primitive types (Int, String, etc) and Product types (case classes) are supported 
by importing spark.implicits._  Support for serializing other types will be added in future releases.
    df.map(row => {
          ^

So there should be some way to:

  • Define/implement our custom Encoder
  • Apply it when performing a mapping on the DataFrame (which is now a Dataset of type Row)
  • Register the Encoder for use by other custom code

I am looking for code that successfully performs these steps.

WestCoastProjects asked Jun 08 '16 15:06





2 Answers

As far as I am aware, nothing really changed since 1.6, and the solutions described in "How to store custom objects in Dataset?" are the only available options. Nevertheless, your current code should work just fine with the default encoders for product types.
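For a type that is neither a primitive nor a product, one commonly used fallback is a Kryo-based encoder from Encoders.kryo. The following is only a minimal sketch: ImageRecord, KryoEncoderSketch and the column names are hypothetical names invented for illustration, and the local SparkSession is an assumption. Declaring the encoder as an implicit val is the closest thing to "registering" it, since any code with that val in scope will pick it up.

```scala
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}

// Hypothetical class: not a primitive and not a case class,
// so spark.implicits._ has no encoder for it.
class ImageRecord(val id: String, val data: Array[Byte]) extends Serializable

object KryoEncoderSketch {
  // An implicit val makes the encoder available to every Dataset
  // operation that has this object's members in scope.
  implicit val imageRecordEncoder: Encoder[ImageRecord] =
    Encoders.kryo[ImageRecord]

  def run(): Array[String] = {
    // Assumption: a local session is good enough for a demo.
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("kryo-encoder-sketch")
      .getOrCreate()
    import spark.implicits._ // needed here only for toDF
    try {
      val df = Seq(("a", "xyz"), ("b", "uvw")).toDF("id", "data")
      // map compiles because imageRecordEncoder is found implicitly
      val records = df.map { row =>
        new ImageRecord(
          row.getAs[String]("id"),
          row.getAs[String]("data").getBytes("UTF-8"))
      }
      records.collect().map(_.id).sorted
    } finally {
      spark.stop()
    }
  }
}
```

The trade-off is that a Kryo-encoded Dataset stores each object as a single binary column, so the individual fields are not visible to SQL expressions.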

To get some insight into why your code worked in 1.x and may not work in 2.0.0, check the signatures. In 1.x, DataFrame.map is a method that takes a function Row => T and transforms an RDD[Row] into an RDD[T].

In 2.0.0, DataFrame.map also takes a function of type Row => T, but it transforms a Dataset[Row] (a.k.a. DataFrame) into a Dataset[T], hence T requires an Encoder. If you want the "old" behavior, use the RDD explicitly:

df.rdd.map(row => ???)
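A slightly fuller sketch of the same idea, assuming a local SparkSession and made-up column names (RddMapSketch, "id" and "channels" are illustrative only). Because RDD.map needs a ClassTag rather than an Encoder, the mapping function can return any type:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

object RddMapSketch {
  def run(): Array[(String, Int)] = {
    // Assumption: a local session is good enough for a demo.
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("rdd-map-sketch")
      .getOrCreate()
    import spark.implicits._ // needed here only for toDF
    try {
      val df = Seq(("a", 3), ("b", 5)).toDF("id", "channels")
      // df.rdd is an RDD[Row]; mapping it requires no Encoder
      val rdd: RDD[(String, Int)] = df.rdd
        .map(row => (row.getAs[String]("id"), row.getAs[Int]("channels")))
        .persist(StorageLevel.DISK_ONLY)
      rdd.collect().sortBy(_._1)
    } finally {
      spark.stop()
    }
  }
}
```

The result is a plain RDD, so Dataset-level optimizations no longer apply to anything downstream of the map.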

For Dataset[Row] map, see "Encoder error while trying to map dataframe row to updated row".
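For the Row => Row case, one approach seen with Spark 2.x is to build an encoder from the output schema with RowEncoder. Treat the following as a sketch under assumptions: RowEncoder lives in the internal catalyst package, its entry point is understood to have changed in newer Spark releases, and RowEncoderSketch, the schema and the column names are invented for illustration.

```scala
import org.apache.spark.sql.{Encoder, Row, SparkSession}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object RowEncoderSketch {
  def run(): Array[Row] = {
    // Assumption: a local session is good enough for a demo.
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("row-encoder-sketch")
      .getOrCreate()
    import spark.implicits._ // needed here only for toDF
    try {
      val df = Seq(("a", 1), ("b", 2)).toDF("id", "channels")
      // The schema must describe the rows produced by the map function
      val outSchema = StructType(Seq(
        StructField("id", StringType, nullable = true),
        StructField("channels", IntegerType, nullable = false)))
      // Spark 2.x style; check the RowEncoder API of your Spark version
      val rowEncoder: Encoder[Row] = RowEncoder(outSchema)
      val doubled = df.map { row =>
        Row(row.getAs[String]("id"), row.getAs[Int]("channels") * 2)
      }(rowEncoder)
      doubled.orderBy("id").collect()
    } finally {
      spark.stop()
    }
  }
}
```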

zero323 answered Oct 05 '22 02:10



Did you import the implicit encoders?

import spark.implicits._
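As a rough illustration of where that import has to sit, here is a cut-down version of the mapping from the question. It is only a sketch: the local SparkSession, the object name ImplicitEncoderSketch and the three-column sample data are assumptions. Note that spark must be a stable identifier (a val), and the import must be in scope at the point where map is called.

```scala
import org.apache.spark.sql.SparkSession

object ImplicitEncoderSketch {
  def run(): Array[(String, Int, Array[Byte])] = {
    // Assumption: a local session is good enough for a demo.
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("implicit-encoder-sketch")
      .getOrCreate()
    // Brings encoders for primitives, tuples and case classes into scope
    import spark.implicits._
    try {
      val df = Seq(("a", 3, "xyz")).toDF("id", "channels", "data")
      val ds = df.map { row =>
        val data: Array[Byte] = row.getAs[Any]("data") match {
          case s: String      => s.getBytes("UTF-8")
          case b: Array[Byte] => b
          case other =>
            throw new IllegalArgumentException(s"Unsupported value type: $other")
        }
        // A tuple is a Product, so the default encoder applies
        (row.getAs[String]("id"), row.getAs[Int]("channels"), data)
      }
      ds.collect()
    } finally {
      spark.stop()
    }
  }
}
```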

http://spark.apache.org/docs/2.0.0-preview/api/scala/index.html#org.apache.spark.sql.Encoder

eyal edelman answered Oct 05 '22 04:10
