Generic T as Spark Dataset[T] constructor

In the following snippet, the tryParquet function tries to load a Dataset from a Parquet file if one exists. If not, it computes, persists, and returns the Dataset plan that was provided:

import scala.util.{Try, Success, Failure}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Dataset
import spark.implicits._ // needed for .toDF and .as[MyRow]; assumes a SparkSession named spark

sealed trait CustomRow

case class MyRow(
  id: Int,
  name: String
) extends CustomRow

val ds: Dataset[MyRow] =
  Seq((1, "foo"),
      (2, "bar"),
      (3, "baz")).toDF("id", "name").as[MyRow]

def tryParquet[T <: CustomRow](session: SparkSession, path: String, target: Dataset[T]): Dataset[T] =
    Try(session.read.parquet(path)) match {
      case Success(df) => df.as[T] // <---- compile error here
      case Failure(_)  => {
        target.write.parquet(path)
        target
      }
    }

val readyDS: Dataset[MyRow] =
    tryParquet(spark, "/path/to/file.parq", ds)

However, this produces a compile error on df.as[T]:

Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._

Support for serializing other types will be added in future releases.

case Success(df) => df.as[T]

One can circumvent this problem by making tryParquet return an untyped DataFrame and letting the caller cast it to the desired type. However, is there any solution if we want the type to be managed internally by the function?
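
For illustration, a minimal sketch of that untyped workaround (the name tryParquetUntyped and the call shown are hypothetical):

import scala.util.{Try, Success, Failure}
import org.apache.spark.sql.{DataFrame, SparkSession}

def tryParquetUntyped(session: SparkSession, path: String, target: DataFrame): DataFrame =
  Try(session.read.parquet(path)) match {
    case Success(df) => df // a DataFrame needs no Encoder
    case Failure(_)  =>
      target.write.parquet(path)
      target
  }

// The caller recovers the typed view afterwards:
// val readyDS: Dataset[MyRow] = tryParquetUntyped(spark, "/path/to/file.parq", ds.toDF()).as[MyRow]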

asked Dec 14 '22 by Jivan
1 Answer

It looks like this is possible by adding an Encoder context bound to the type parameter:

import org.apache.spark.sql.Encoder

def tryParquet[T <: CustomRow: Encoder](...)

This way, the compiler can prove that an Encoder[T] is available when df.as[T] constructs the objects.
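
Put together, a minimal sketch of the adjusted function (the body is the same as in the question; only the context bound is added):

import scala.util.{Try, Success, Failure}
import org.apache.spark.sql.{Dataset, Encoder, SparkSession}

def tryParquet[T <: CustomRow: Encoder](session: SparkSession, path: String, target: Dataset[T]): Dataset[T] =
  Try(session.read.parquet(path)) match {
    case Success(df) => df.as[T] // compiles now: the context bound puts an Encoder[T] in scope
    case Failure(_)  =>
      target.write.parquet(path)
      target
  }

// Usage is unchanged; the Encoder[MyRow] comes from spark.implicits._:
// val readyDS: Dataset[MyRow] = tryParquet(spark, "/path/to/file.parq", ds)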

answered Dec 21 '22 by Jivan