In the following snippet, the tryParquet function tries to load a Dataset from a Parquet file if it exists. If the file does not exist, it computes, persists, and returns the Dataset plan that was provided:
import scala.util.{Try, Success, Failure}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Dataset
import spark.implicits._ // needed for toDF and the .as[MyRow] Encoder

sealed trait CustomRow

case class MyRow(
  id: Int,
  name: String
) extends CustomRow

val ds: Dataset[MyRow] =
  Seq((1, "foo"),
      (2, "bar"),
      (3, "baz")).toDF("id", "name").as[MyRow]
def tryParquet[T <: CustomRow](session: SparkSession, path: String, target: Dataset[T]): Dataset[T] =
  Try(session.read.parquet(path)) match {
    case Success(df) => df.as[T] // <---- compile error here
    case Failure(_) => {
      target.write.parquet(path)
      target
    }
  }
val readyDS: Dataset[MyRow] =
  tryParquet(spark, "/path/to/file.parq", ds)
However, this produces a compile error on df.as[T]:
Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._
Support for serializing other types will be added in future releases.
case Success(df) => df.as[T]
One can circumvent this problem by changing tryParquet to return an untyped DataFrame and letting the caller cast it to the desired case class. However, is there a solution where the type is managed internally by the function?
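For illustration, a minimal sketch of that workaround might look like the following. The name tryParquetUntyped and the caller-side variable are made up for this example; only the return type and the call site differ from the original code:

import scala.util.{Try, Success, Failure}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

// Workaround sketch: return an untyped DataFrame and let the caller re-type it.
def tryParquetUntyped[T <: CustomRow](session: SparkSession, path: String, target: Dataset[T]): DataFrame =
  Try(session.read.parquet(path)) match {
    case Success(df) => df          // no .as[T] here, so no Encoder is needed inside the function
    case Failure(_) =>
      target.write.parquet(path)
      target.toDF()                 // expose the freshly persisted Dataset as a DataFrame
  }

// The caller performs the cast back to the case class:
val readyFromDF: Dataset[MyRow] =
  tryParquetUntyped(spark, "/path/to/file.parq", ds).as[MyRow]

This compiles because the Encoder is only required at the call site, where the concrete type MyRow is known, but it pushes the responsibility for typing onto every caller.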
It looks like this is possible by adding an Encoder context bound to the type parameter:
import org.apache.spark.sql.Encoder
def tryParquet[T <: CustomRow: Encoder](...)
This way the compiler can prove that an Encoder is available when df.as[T] constructs the objects.
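Assuming the rest of the function stays the same, the complete version with the context bound might look like this sketch; the body simply repeats the original logic:

import scala.util.{Try, Success, Failure}
import org.apache.spark.sql.{Dataset, Encoder, SparkSession}

// Sketch: identical logic, but T now carries an implicit Encoder, so df.as[T] compiles.
def tryParquet[T <: CustomRow: Encoder](session: SparkSession, path: String, target: Dataset[T]): Dataset[T] =
  Try(session.read.parquet(path)) match {
    case Success(df) => df.as[T]    // the Encoder[T] from the context bound satisfies .as[T]
    case Failure(_) =>
      target.write.parquet(path)
      target
  }

// The call site is unchanged; Encoder[MyRow] is supplied by spark.implicits._:
val readyDS: Dataset[MyRow] =
  tryParquet(spark, "/path/to/file.parq", ds)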