The Spark documentation states that all you have to do is register your class and add two variables to the conf:
    import com.esotericsoftware.kryo.Kryo
    import org.apache.spark.serializer.KryoRegistrator

    class MyRegistrator extends KryoRegistrator {
      override def registerClasses(kryo: Kryo) {
        kryo.register(classOf[MyClass1])
        kryo.register(classOf[MyClass2])
      }
    }

    val conf = new SparkConf().setMaster(...).setAppName(...)
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    conf.set("spark.kryo.registrator", "mypackage.MyRegistrator")
    val sc = new SparkContext(conf)
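One way to check whether Kryo is actually in effect is to make unregistered classes an error rather than a silent fallback. A minimal sketch, assuming the `spark.kryo.registrationRequired` setting (which tells Kryo to throw instead of writing out full class names for unregistered classes):

    // Optional: fail fast if any class reaches Kryo without being registered.
    // If the registrator never ran, the job will now fail loudly.
    conf.set("spark.kryo.registrationRequired", "true")

With this set, a job that shuffles an unregistered class dies with an explicit Kryo error, which makes it much easier to tell whether your registrator was ever invoked.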
I have implemented this in my code, yet I still get serialization errors when trying to sort a key/value sequence file of (Text, Text). My version of MyRegistrator looks like this:
    import com.esotericsoftware.kryo.Kryo
    import org.apache.hadoop.io.Text
    import org.apache.spark.serializer.KryoRegistrator

    class MyRegistrator extends KryoRegistrator {
      override def registerClasses(kryo: Kryo) {
        kryo.register(classOf[Text])
      }
    }
I added logging to MyRegistrator and see no log output. I even deliberately misspelled the registrator's class name, and the job did not error out. There has to be more to this than what the documentation lets on. Is there anything else I need to do?
I am using Apache Spark 1.0.2.
Thanks
I was able to figure out how to fix this issue. I upgraded Apache Spark to 1.1.0 and it started working. I did not change any code at all; the only thing I changed was my POM. To prove that Kryo was now actually being used, I commented out all references to Kryo in my code and reran the job; it then failed with a serialization error.
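For reference, here is an end-to-end sketch of the working setup on Spark 1.1.0. The input/output paths, the `SortSeqFile` object name, and the `mypackage` prefix are placeholders, not details from the original post:

    import com.esotericsoftware.kryo.Kryo
    import org.apache.hadoop.io.Text
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.serializer.KryoRegistrator

    class MyRegistrator extends KryoRegistrator {
      override def registerClasses(kryo: Kryo) {
        kryo.register(classOf[Text])
      }
    }

    object SortSeqFile {
      def main(args: Array[String]) {
        val conf = new SparkConf()
          .setAppName("SortTextSeqFile")
          .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          .set("spark.kryo.registrator", "mypackage.MyRegistrator")
        val sc = new SparkContext(conf)

        // sequenceFile reuses the same Writable instances for every record,
        // so copy each value before it is cached or shuffled.
        val pairs = sc.sequenceFile(args(0), classOf[Text], classOf[Text])
          .map { case (k, v) => (k.toString, new Text(v)) }

        // The Text values cross the shuffle here, which is where Kryo
        // serialization (and the registration above) actually matters.
        pairs.sortByKey().saveAsTextFile(args(1))
        sc.stop()
      }
    }

Keys are converted to String because Hadoop's `Text` does not provide the Scala `Ordering` that `sortByKey` needs; the values stay as `Text` and exercise the Kryo registration during the shuffle.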