Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Why is Spark performing worse when using Kryo serialization?

I enabled Kryo serialization for my Spark job, enabled the setting to require registration, and ensured all my types were registered.

val conf = new SparkConf()
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryo.registrationRequired", "true")
conf.registerKryoClasses(classes)
conf.registerAvroSchemas(avroSchemas: _*)

Wallclock-time performance of the job worsened by about 20% and the number of bytes shuffled increased by almost 400%.

This seems really surprising to me, given the Spark documentation's suggestion that Kryo should be better.

Kryo is significantly faster and more compact than Java serialization (often as much as 10x)

I manually invoked the serialize method on instances of Spark's org.apache.spark.serializer.KryoSerializer and org.apache.spark.serializer.JavaSerializer with an example of my data. The results were consistent with the suggestions in the Spark documentation: Kryo produced 98 bytes; Java produced 993 bytes. That really is a 10x improvement.

A possibly confounding factor is that the objects which are being serialized and shuffled implement the Avro GenericRecord interface. I tried registering the Avro schemas in the SparkConf, but that showed no improvement.

I tried making new classes to shuffle the data which were simple Scala case classes, not including any of the Avro machinery. It didn't improve the shuffle performance or number of bytes exchanged.

The Spark code ends up boiling down to following:

case class A(
    f1: Long,
    f2: Option[Long],
    f3: Int,
    f4: Int,
    f5: Option[String],
    f6: Option[Int],
    f7: Option[String],
    f8: Option[Int],
    f9: Option[Int],
    f10: Option[Int],
    f11: Option[Int],
    f12: String,
    f13: Option[Double],
    f14: Option[Int],
    f15: Option[Double],
    f16: Option[Double],
    f17: List[String],
    f18: String) extends org.apache.avro.specific.SpecificRecordBase {
  def get(f: Int) : AnyRef = ???
  def put(f: Int, value: Any) : Unit = ???
  def getSchema(): org.apache.avro.Schema = A.SCHEMA$
}
object A extends AnyRef with Serializable {
  val SCHEMA$: org.apache.avro.Schema = ???
}

case class B(
    f1: Long
    f2: Long
    f3: String
    f4: String) extends org.apache.avro.specific.SpecificRecordBase {
  def get(field$ : Int) : AnyRef = ???
  def getSchema() : org.apache.avro.Schema = B.SCHEMA$
  def put(field$ : Int, value : Any) : Unit = ???
}
object B extends AnyRef with Serializable {
  val SCHEMA$ : org.apache.avro.Schema = ???
}

def join(as: RDD[A], bs: RDD[B]): (Iterable[A], Iterable[B]) = {
  val joined = as.map(a => a.f1 -> a) cogroup bs.map(b => b.f1 -> b)
  joined.map { case (_, asAndBs) => asAndBs }
}

Do you have any idea what might be going on or how I could get the better performance that should be available from Kryo?

like image 523
Leif Wickland Avatar asked Jan 09 '17 17:01

Leif Wickland


People also ask

Why is KRYO serialization faster in spark?

Kryo is significantly faster and more compact than Java serialization (often as much as 10x), but does not support all Serializable types and requires you to register the classes you'll use in the program in advance for best performance.

What is KRYO serialization in spark?

KryoSerializer") . This setting configures the serializer used for not only shuffling data between worker nodes but also when serializing RDDs to disk.

Can we use KRYO serializer in PySpark?

Kryo won't make a major impact on PySpark because it just stores data as byte[] objects, which are fast to serialize even with Java. But it may be worth a try — you would just set the spark. serializer configuration and trying not to register any classe.

Why is KRYO serialized?

Kryo is a fast and efficient binary object graph serialization framework for Java. The goals of the project are high speed, low size, and an easy to use API. The project is useful any time objects need to be persisted, whether to a file, database, or over the network.


Video Answer


1 Answers

If your single record size is too small and having huge number of records might make your job slow.Try to increase the buffer size and see whether it makes any improvement.

Try the below one if not done already..

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Now it's 24 Mb of buffer by default instead of 0.064 Mb
  .set("spark.kryoserializer.buffer.mb","24") 

Ref:https://ogirardot.wordpress.com/2015/01/09/changing-sparks-default-java-serialization-to-kryo/

like image 139
Biju CD Avatar answered Oct 15 '22 22:10

Biju CD