 

Kryo in Apache Spark

The Spark documentation states that all you have to do is register your classes and set two properties on the conf:

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[MyClass1])
    kryo.register(classOf[MyClass2])
  }
}

val conf = new SparkConf().setMaster(...).setAppName(...)
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryo.registrator", "mypackage.MyRegistrator")
val sc = new SparkContext(conf)

I have implemented this in my code, yet I still get serialization errors when trying to sort a key/value sequence file of (Text, Text) (that is, org.apache.hadoop.io.Text). My version of MyRegistrator looks like this:

import com.esotericsoftware.kryo.Kryo
import org.apache.hadoop.io.Text
import org.apache.spark.serializer.KryoRegistrator

class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[Text])
  }
}

I have also added logging to MyRegistrator and see no log statements. I even purposely misspelled the registrator class name, and the job still does not error out, which suggests the setting is being ignored entirely. There has to be more to this than what the documentation lets on. Is there anything else I need to do?

I am using Apache Spark 1.0.2.
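One way to check whether Kryo settings are actually reaching the driver is to pass them on the spark-submit command line and turn on strict registration. This is a sketch, not from the original post: the class name `mypackage.MyJob` and jar name are hypothetical placeholders, and `spark.kryo.registrationRequired` may not be available in every 1.x release.

```shell
# Hypothetical job class and jar; the --conf flags mirror the SparkConf
# settings above. spark.kryo.registrationRequired (where supported) makes
# Spark fail fast on any class that was not explicitly registered, so a
# registrator that is never invoked surfaces as an error instead of a
# silent fallback to Java serialization.
spark-submit \
  --class mypackage.MyJob \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.kryo.registrator=mypackage.MyRegistrator \
  --conf spark.kryo.registrationRequired=true \
  myjob.jar
```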

Thanks

asked Feb 11 '26 15:02 by Crackerman

1 Answer

I was able to figure out how to fix this issue. I upgraded Apache Spark to 1.1.0 and it started working. I did not change any code at all; the only thing I changed was my POM. To prove that the upgrade was what fixed it, I commented out all references to Kryo in my code and reran: the job then failed with a serialization error, confirming that the Kryo registration was now actually in effect.
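For reference, the POM change amounts to bumping the Spark dependency version. This is a sketch assuming the standard Maven coordinates for Spark core on Scala 2.10; adjust the artifact to match your build:

```xml
<!-- Maven dependency for Spark core; only the version changes (1.0.2 -> 1.1.0). -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>1.1.0</version>
</dependency>
```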

answered Feb 16 '26 11:02 by Crackerman


