I enabled Kryo serialization for my Spark job, enabled the setting to require registration, and ensured all my types were registered.
val conf = new SparkConf()
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryo.registrationRequired", "true")
conf.registerKryoClasses(classes)
conf.registerAvroSchemas(avroSchemas: _*)
Wallclock-time performance of the job worsened by about 20% and the number of bytes shuffled increased by almost 400%.
This seems really surprising to me, given the Spark documentation's suggestion that Kryo should be better.
Kryo is significantly faster and more compact than Java serialization (often as much as 10x)
I manually invoked the serialize method on instances of Spark's org.apache.spark.serializer.KryoSerializer and org.apache.spark.serializer.JavaSerializer with an example of my data. The results were consistent with the suggestions in the Spark documentation: Kryo produced 98 bytes; Java produced 993 bytes. That really is a 10x improvement.
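A comparison of this kind can be sketched roughly as follows. This is only an illustration, not the code that produced the numbers above: the Sample case class and the SerializedSizeSketch object are hypothetical stand-ins, and it assumes that both serializers can be instantiated from a bare SparkConf without a running SparkContext.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer, Serializer}

// Hypothetical stand-in for one shuffled record; not the real type from the job.
case class Sample(id: Long, label: Option[String], tags: List[String])

object SerializedSizeSketch {
  // Serialize a single value and report how many bytes the serializer produced.
  def sizeOf(serializer: Serializer, value: Sample): Int =
    serializer.newInstance().serialize(value).remaining()

  def main(args: Array[String]): Unit = {
    // loadDefaults = false keeps the sketch independent of system properties.
    val conf = new SparkConf(false)
      .registerKryoClasses(Array(classOf[Sample]))

    val sample = Sample(1L, Some("label"), List("a", "b"))

    println(s"Kryo bytes: ${sizeOf(new KryoSerializer(conf), sample)}")
    println(s"Java bytes: ${sizeOf(new JavaSerializer(conf), sample)}")
  }
}
```

Note that this measures one object in isolation, which is not necessarily how records are batched and written during a shuffle, so the per-object ratio may not carry over to shuffle sizes.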
A possibly confounding factor is that the objects being serialized and shuffled implement the Avro GenericRecord interface. I tried registering the Avro schemas in the SparkConf, but that showed no improvement.
I tried making new classes to shuffle the data which were simple Scala case classes, not including any of the Avro machinery. It didn't improve the shuffle performance or the number of bytes exchanged.
The Spark code ends up boiling down to the following:
case class A(
f1: Long,
f2: Option[Long],
f3: Int,
f4: Int,
f5: Option[String],
f6: Option[Int],
f7: Option[String],
f8: Option[Int],
f9: Option[Int],
f10: Option[Int],
f11: Option[Int],
f12: String,
f13: Option[Double],
f14: Option[Int],
f15: Option[Double],
f16: Option[Double],
f17: List[String],
f18: String) extends org.apache.avro.specific.SpecificRecordBase {
def get(f: Int) : AnyRef = ???
def put(f: Int, value: Any) : Unit = ???
def getSchema(): org.apache.avro.Schema = A.SCHEMA$
}
object A extends AnyRef with Serializable {
val SCHEMA$: org.apache.avro.Schema = ???
}
case class B(
f1: Long,
f2: Long,
f3: String,
f4: String) extends org.apache.avro.specific.SpecificRecordBase {
def get(field$ : Int) : AnyRef = ???
def getSchema() : org.apache.avro.Schema = B.SCHEMA$
def put(field$ : Int, value : Any) : Unit = ???
}
object B extends AnyRef with Serializable {
val SCHEMA$ : org.apache.avro.Schema = ???
}
def join(as: RDD[A], bs: RDD[B]): RDD[(Iterable[A], Iterable[B])] = {
val joined = as.map(a => a.f1 -> a) cogroup bs.map(b => b.f1 -> b)
joined.map { case (_, asAndBs) => asAndBs }
}
Do you have any idea what might be going on or how I could get the better performance that should be available from Kryo?
Kryo is significantly faster and more compact than Java serialization (often as much as 10x), but does not support all Serializable types and requires you to register the classes you'll use in the program in advance for best performance.
Setting spark.serializer to org.apache.spark.serializer.KryoSerializer configures the serializer used not only for shuffling data between worker nodes but also when serializing RDDs to disk.
Kryo won't make a major impact on PySpark because it just stores data as byte[] objects, which are fast to serialize even with Java. But it may be worth a try: you would just set the spark.serializer configuration and not register any classes.
Kryo is a fast and efficient binary object graph serialization framework for Java. The goals of the project are high speed, low size, and an easy to use API. The project is useful any time objects need to be persisted, whether to a file, database, or over the network.
If each record is very small and you have a huge number of records, that might be what makes your job slow. Try increasing the buffer size and see whether it makes any improvement.
Try the following if you have not done so already:
val conf = new SparkConf()
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// Use a 24 MB buffer instead of the 0.064 MB default
.set("spark.kryoserializer.buffer.mb","24")
Ref: https://ogirardot.wordpress.com/2015/01/09/changing-sparks-default-java-serialization-to-kryo/
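As far as I know, spark.kryoserializer.buffer.mb is deprecated from Spark 1.4 onward in favor of spark.kryoserializer.buffer and spark.kryoserializer.buffer.max, which take size strings. A minimal sketch of the equivalent setup on a newer version might look like this; the KryoBufferConfigSketch object and the 1m and 128m values are illustrative assumptions, not tuned recommendations.

```scala
import org.apache.spark.SparkConf

// Hypothetical wrapper object, used only to keep the sketch self-contained.
object KryoBufferConfigSketch {
  val conf: SparkConf = new SparkConf(false)
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    // Initial size of the Kryo buffer (illustrative value).
    .set("spark.kryoserializer.buffer", "1m")
    // Maximum size the Kryo buffer is allowed to grow to (illustrative value).
    .set("spark.kryoserializer.buffer.max", "128m")
}
```

Whether a larger buffer helps here would need to be measured; the buffer mainly matters when individual serialized objects do not fit in it.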