Kryo encoder vs. RowEncoder in Spark Dataset

The purpose of the following examples is to understand the difference between the two encoders in a Spark Dataset.

I can do this:

val df = Seq((1, "a"), (2, "d")).toDF("id", "value")

import org.apache.spark.sql.{Encoder, Encoders, Row}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._

val myStructType = StructType(Seq(StructField("id", IntegerType), StructField("value", StringType)))
implicit val myRowEncoder = RowEncoder(myStructType)

val ds = df.map{case row => row}
ds.show

//+---+-----+
//| id|value|
//+---+-----+
//|  1|    a|
//|  2|    d|
//+---+-----+

I can also do this:

val df = Seq((1, "a"), (2, "d")).toDF("id", "value")

import org.apache.spark.sql.{Encoder, Encoders, Row}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._

implicit val myKryoEncoder: Encoder[Row] = Encoders.kryo[Row] 

val ds = df.map{case row => row}
ds.show

//+--------------------+
//|               value|
//+--------------------+
//|[01 00 6F 72 67 2...|
//|[01 00 6F 72 67 2...|
//+--------------------+

The only difference between the two snippets is that one uses the Kryo encoder and the other uses RowEncoder.

Question:

  • What is the difference between the two?
  • Why does one display encoded values while the other displays human-readable values?
  • When should we use which?
asked Aug 31 '25 by jack


2 Answers

Encoders.kryo simply creates an encoder that serializes objects of type T using Kryo.

RowEncoder is a Scala object with apply and other factory methods. RowEncoder can create an ExpressionEncoder[Row] from a schema. Internally, apply creates a BoundReference for the Row type and returns an ExpressionEncoder[Row] built from the input schema, a CreateNamedStruct serializer (created with the internal serializerFor method), a deserializer for the schema, and the Row type.

RowEncoder knows about schema and uses it for serialization and deserialization.
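As a rough sketch of the practical consequence, assuming the question's df and encoders are in scope: with the RowEncoder the mapped Dataset keeps its named columns, while with the Kryo encoder the whole Row collapses into one binary column and column-level operations stop working.

// With the RowEncoder implicit in scope, the schema survives the map
val dsRow = df.map { row => row }   // Dataset[Row] with columns id and value
dsRow.select("id").show()           // column-level access still works

// With the Kryo Encoder[Row] implicit instead, the Row is serialized into a
// single binary column named "value", so select("id") fails with
// "cannot resolve 'id'" and Catalyst has no schema to optimize against.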

Kryo is significantly faster and more compact than Java serialization (often as much as 10x), but does not support all Serializable types and requires you to register the classes you’ll use in the program in advance for best performance.

Kryo is a good fit for efficiently storing large datasets and for network-intensive applications.
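As a hedged sketch of the registration point (this configures RDD and shuffle serialization, not Dataset encoders; the class names are hypothetical):

import org.apache.spark.SparkConf

// Hypothetical application classes registered with Kryo up front
class MyClassA
class MyClassB

val conf = new SparkConf()
  .setAppName("kryo-registration-example")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[MyClassA], classOf[MyClassB]))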

For more information you can refer to these links:

  • https://jaceklaskowski.gitbooks.io/mastering-spark-sql/content/spark-sql-RowEncoder.html
  • https://jaceklaskowski.gitbooks.io/mastering-spark-sql/content/spark-sql-Encoders.html
  • https://medium.com/@knoldus/kryo-serialization-in-spark-55b53667e7ab
  • https://stackoverflow.com/questions/58946987/what-are-the-pros-and-cons-of-java-serialization-vs-kryo-serialization#:~:text=Kryo%20is%20significantly%20faster%20and,in%20advance%20for%20best%20performance.

answered Sep 05 '25 by Chandan


According to Spark's documentation, Spark SQL does NOT use Kryo or Java serialization by default.

Kryo is for RDDs, not DataFrames or Datasets, so the question is a little off-beam as far as I know.

Does Kryo help in SparkSQL? This elaborates on custom objects, but...

UPDATED Answer after some free time

Your examples are not really what I would call custom types. They are just structs of primitives, so there is no issue.

Kryo is a serializer; Datasets and DataFrames use Encoders for the columnar advantage. Kryo is used internally by Spark for shuffling.

This user-defined example, case class Foo(name: String, position: Point), is one we can handle with a DS or DF encoder or via Kryo. But what is the point, when Tungsten and Catalyst work by "understanding the structure of the data" and can therefore optimize? With Kryo you also get a single binary value, and I have found few examples of how to work with it successfully, e.g. in a JOIN.

KRYO Example

import org.apache.spark.sql.{Encoder, Encoders}
import spark.implicits._   // spark is the active SparkSession (e.g. in spark-shell)

case class Point(a: Int, b: Int)
case class Foo(name: String, position: Point)

// Kryo-based encoders: each object is serialized into a single binary column
implicit val PointEncoder: Encoder[Point] = Encoders.kryo[Point]
implicit val FooEncoder: Encoder[Foo] = Encoders.kryo[Foo]

val ds = Seq(Foo("bar", Point(0, 0))).toDS
ds.show()

returns:

+--------------------+
|               value|
+--------------------+
|[01 00 D2 02 6C 6...|
+--------------------+
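A small follow-up sketch, assuming the ds above: the typed API still works because the Kryo bytes are deserialized back into Foo objects, but the untyped column view is lost.

// The typed API deserializes the Kryo binary back into Foo instances
ds.map(_.name).show()   // one row containing "bar"

// The untyped view is gone: the schema is a single binary column named "value",
// so ds.select("name") fails with "cannot resolve 'name'".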

Encoder for DS using case class Example

import spark.implicits._   // product (case class) encoders are derived implicitly

case class Point(a: Int, b: Int)
case class Foo(name: String, position: Point)

// No Kryo encoders in scope: the derived product encoder keeps the schema
val ds = Seq(Foo("bar", Point(0, 0))).toDS
ds.show()

returns:

+----+--------+
|name|position|
+----+--------+
| bar|  [0, 0]|
+----+--------+

This strikes me as the way to go with Spark, Tungsten, Catalyst.
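A brief sketch of what that buys you, assuming the second ds above: because the derived product encoder exposes the nested schema, Catalyst can address individual fields as columns.

// Nested fields are visible to Catalyst as columns
ds.select($"name", $"position.a").show()
ds.filter($"position.b" === 0).show()
ds.printSchema()   // name: string, position: struct<a: int, b: int>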

Now, things get more complicated when an Any is involved, but Any is not a good thing:

import org.apache.spark.sql.types._

val data = Seq(
    ("sublime", Map(
      "good_song" -> "santeria",
      "bad_song" -> "doesn't exist")
    ),
    ("prince_royce", Map(
      "good_song" -> 4,            // an Int among Strings makes this a Map[String, Any]
      "bad_song" -> "back it up")
    )
  )

// The intended schema; never reached because the DataFrame cannot be built
val schema = List(
    ("name", StringType, true),
    ("songs", MapType(StringType, StringType, true), true)
  )

val rdd = spark.sparkContext.parallelize(data)

rdd.collect

val df = spark.createDataFrame(rdd)   // fails: no encoder can be derived for Any
df.show()
df.printSchema()

returns:

java.lang.UnsupportedOperationException: No Encoder found for Any
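One hedged workaround, assuming the intent was string-valued song maps all along: normalize the Any values to String so the derived product encoders apply and no Kryo is needed.

// Normalize the mixed values so the map becomes Map[String, String]
val cleaned = data.map { case (name, songs) =>
  (name, songs.map { case (k, v) => (k, v.toString) })
}

val fixedDf = spark.createDataFrame(spark.sparkContext.parallelize(cleaned))
  .toDF("name", "songs")
fixedDf.show(false)
fixedDf.printSchema()   // songs: map (key: string, value: string)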

This example is interesting as a valid custom-object use case: Spark No Encoder found for java.io.Serializable in Map[String, java.io.Serializable]. But I would stay away from such constructions.

Conclusions

  • Kryo vs Encoder vs Java Serialization in Spark? states that Kryo is for RDDs, but that is largely legacy; internally Spark can still use it. Not 100% correct, but to the point.

  • Spark: Dataset Serialization is also an informative link.

  • Things have evolved, and the spirit is to not use Kryo for Datasets and DataFrames.

Hope this helps.

answered Sep 05 '25 by thebluephantom