We have a Hadoop cluster on which we store data that is serialized to bytes using Kryo (a serialization framework). The Kryo version we used to do this was forked from the official release 2.21 to apply our own patches for issues we had experienced with Kryo. The current Kryo version 2.22 also fixes these issues, but with different solutions. As a result, we cannot simply change the Kryo version we use, because that would mean we could no longer read the data already stored on our Hadoop cluster. To address this problem, we want to run a Hadoop job that reads the existing data, deserializes it with our old Kryo fork, and serializes it again with the new Kryo version.
The problem is that it is not trivial to use two different versions of the same class in one Java program (more precisely, in a Hadoop job's mapper class).
How is it possible to deserialize and serialize an object with two different versions of the same serialization framework in one Hadoop job?
The first approach that came to mind was to rename the packages in our own Kryo branch using the relocation feature of the Maven Shade plugin and release it under a different artifact ID, so that we could depend on both artifacts in our conversion job project. We would then instantiate one Kryo object of each version and use the old one for deserialization and the new one for serializing the objects again.
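As a rough illustration only: assuming our fork were relocated under a made-up "oldkryo" prefix and published as a separate artifact (MyRecord and oldBytes are placeholders for one of our domain classes and its stored bytes), the conversion code could hold both versions side by side:

// Illustration: forked Kryo 2.21 relocated under the "oldkryo" prefix,
// official Kryo 2.22 kept at its original coordinates.
oldkryo.com.esotericsoftware.kryo.Kryo oldKryo = new oldkryo.com.esotericsoftware.kryo.Kryo();
com.esotericsoftware.kryo.Kryo newKryo = new com.esotericsoftware.kryo.Kryo();

// Read a record with the old fork ...
MyRecord record = oldKryo.readObject(
        new oldkryo.com.esotericsoftware.kryo.io.Input(oldBytes), MyRecord.class);

// ... and write it back with the new release.
java.io.ByteArrayOutputStream baos = new java.io.ByteArrayOutputStream();
com.esotericsoftware.kryo.io.Output output = new com.esotericsoftware.kryo.io.Output(baos);
newKryo.writeObject(output, record);
output.close();
byte[] newBytes = baos.toByteArray();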
Problems
We don't use Kryo directly in our Hadoop jobs, but rather access it through several layers of our own libraries. For each of these libraries, it would be necessary to build and release an additional version that is compiled against the renamed Kryo packages.
To make things even messier, we also use Kryo serializers provided by other third-party libraries, for which we would have to do the same thing.
The second approach we came up with was to not depend on Kryo at all in the Maven project which contains the conversion job, but to load the required classes for each version from a JAR stored in Hadoop's distributed cache. Serializing an object would then look something like this:
public byte[] serialize(Object foo, JarClassLoader cl) throws Exception {
    // Load the Kryo class through the version-specific class loader
    final Class<?> kryoClass = cl.loadClass("com.esotericsoftware.kryo.Kryo");
    Object kryo = kryoClass.getConstructor().newInstance();

    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    final Class<?> outputClass = cl.loadClass("com.esotericsoftware.kryo.io.Output");
    Object output = outputClass.getConstructor(OutputStream.class).newInstance(baos);

    // Invoke Kryo#writeObject(Output, Object) via reflection
    Method writeObject = kryoClass.getMethod("writeObject", outputClass, Object.class);
    writeObject.invoke(kryo, output, foo);

    outputClass.getMethod("close").invoke(output);
    baos.close();
    return baos.toByteArray();
}
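For completeness, the corresponding deserialization through the version-specific class loader might look roughly like this (a sketch only; the Kryo method and class names come from its public API, everything else, including the byte[]-based signature, is our own choice):

public Object deserialize(byte[] bytes, String className, JarClassLoader cl) throws Exception {
    final Class<?> kryoClass = cl.loadClass("com.esotericsoftware.kryo.Kryo");
    Object kryo = kryoClass.getConstructor().newInstance();

    final Class<?> inputClass = cl.loadClass("com.esotericsoftware.kryo.io.Input");
    Object input = inputClass.getConstructor(byte[].class).newInstance((Object) bytes);

    // The domain class is resolved by the caller's (default) class loader
    Method readObject = kryoClass.getMethod("readObject", inputClass, Class.class);
    return readObject.invoke(kryo, input, Class.forName(className));
}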
Problems
Though this approach might work for instantiating an unconfigured Kryo object and serializing / restoring some object, we use a much more complex Kryo configuration, including several custom serializers, registered class ids, et cetera. For example, we were unable to figure out a way to set custom serializers for classes without getting a NoClassDefFoundError; the following code does not work:
Class<?> kryoClass = this.loadClass("com.esotericsoftware.kryo.Kryo");
Object kryo = kryoClass.getConstructor().newInstance();
Method addDefaultSerializer = kryoClass.getMethod("addDefaultSerializer", Class.class, Class.class);
addDefaultSerializer.invoke(kryo, URI.class, URISerializer.class); // throws NoClassDefFoundError
The last line throws a

java.lang.NoClassDefFoundError: com/esotericsoftware/kryo/Serializer

because the URISerializer class references Kryo's Serializer class and tries to load it through its own class loader (the system class loader), which does not know the Serializer class.
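A quick way to see the mismatch, assuming the variables from the snippet above:

// URISerializer was loaded by the system class loader, so the JVM resolves its
// superclass com.esotericsoftware.kryo.Serializer against the system class path,
// where no Kryo JAR is present.
System.out.println(URISerializer.class.getClassLoader()); // system/application class loader
System.out.println(kryoClass.getClassLoader());           // the JarClassLoader holding Kryo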
Currently the most promising approach seems to be to use an independent intermediate serialization, e.g. JSON using Gson or similar, and then to run two separate jobs: one that deserializes the data with our old Kryo fork and writes it in the intermediate format, and a second one that reads the intermediate format and serializes it again with the new Kryo version.
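A rough sketch of the intermediate step using Gson's toJson / fromJson (MyRecord is a made-up domain class; record stands for an object already deserialized by the old Kryo fork):

Gson gson = new Gson();

// Job 1: deserialize with the old Kryo fork, then write the object out as JSON
String json = gson.toJson(record);

// Job 2: read the JSON back and serialize the result with the new Kryo version
MyRecord restored = gson.fromJson(json, MyRecord.class);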
Problems
The biggest problem with this solution is that it roughly doubles the space consumed by the data while it is being processed. Moreover, we would need another serialization method that works without problems on all of our data, which we would have to investigate first.
I would use the multiple classloaders approach.
(Package renaming will also work. It does seem ugly, but this is a one-off hack, so beauty and correctness can take a back seat. Intermediate serialization seems risky: there is a reason you are using Kryo, and that reason would be negated by using a different intermediate form.)
The overall design would be:
child classloaders:    Old Kryo        New Kryo   <-- both with simple wrappers
                           \              /
                            \            /
                             \          /
                              \        /
                                  |
default classloader:   domain model; controller for the re-serialization
Load a Jar with the modified Kryo version and the wrapper code. The wrapper has a static 'main' method with one argument: the name of the file to deserialize. Call the main method via reflection from the default classloader:
Class<?> deserializer = deserializerClassLoader.loadClass("com.example.deserializer.Main");
Method mainIn = deserializer.getMethod("main", String.class);
Object graph = mainIn.invoke(null, "/path/to/input/file");
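A minimal sketch of what such a deserializer wrapper could look like, compiled against the old Kryo fork and packaged together with it in its own JAR (the class and package names follow the snippet above; the configuration placeholder stands for the custom serializers and class-id registrations):

package com.example.deserializer;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import java.io.FileInputStream;

public class Main {

    // Returns the deserialized object graph; the controller receives it via reflection.
    public static Object main(String inputFile) throws Exception {
        Kryo kryo = new Kryo();
        // ... apply the old configuration here: custom serializers, registered class ids ...
        try (Input input = new Input(new FileInputStream(inputFile))) {
            return kryo.readClassAndObject(input);
        }
    }
}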
When the call returns, load a second Jar with the new serialization framework and another simple wrapper. This wrapper has a static 'main' method that takes the object graph to serialize and the name of the file to write it to. Call the main method via reflection from the default classloader:
Class<?> serializer = serializerClassLoader.loadClass("com.example.serializer.Main");
Method mainOut = serializer.getMethod("main", Object.class, String.class);
mainOut.invoke(null, graph, "/path/to/output/file");
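And a matching sketch of the serializer wrapper, compiled against the new Kryo 2.22 and packaged in its own JAR (again, the configuration line is only a placeholder):

package com.example.serializer;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Output;
import java.io.FileOutputStream;

public class Main {

    public static void main(Object graph, String outputFile) throws Exception {
        Kryo kryo = new Kryo();
        // ... apply the new (Kryo 2.22) configuration here ...
        try (Output output = new Output(new FileOutputStream(outputFile))) {
            kryo.writeClassAndObject(output, graph);
        }
    }
}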
Considerations
In the code fragments, one classloader is created for each object serialization and deserialization. You probably want to create the classloaders only once, look up the main methods and loop over the files, something like:
for (String file : files) {
    Object graph = mainIn.invoke(null, file + ".in");
    mainOut.invoke(null, graph, file + ".out");
}
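The one-time setup could look roughly like this (the JAR file names are placeholders; note that the parent of both loaders is the default class loader, which holds the domain model but no Kryo classes):

ClassLoader parent = getClass().getClassLoader();
ClassLoader deserializerClassLoader = new URLClassLoader(
        new URL[] { new File("old-kryo-wrapper.jar").toURI().toURL() }, parent);
ClassLoader serializerClassLoader = new URLClassLoader(
        new URL[] { new File("new-kryo-wrapper.jar").toURI().toURL() }, parent);

Method mainIn = deserializerClassLoader.loadClass("com.example.deserializer.Main")
        .getMethod("main", String.class);
Method mainOut = serializerClassLoader.loadClass("com.example.serializer.Main")
        .getMethod("main", Object.class, String.class);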
Do the domain objects have any reference to any Kryo class? If so, you have difficulties. Your first approach should be to examine these references and eliminate them. One way to ensure that you have done this is to make sure the default classloader does not have access to any Kryo version; if the domain objects reference Kryo in any way, the reference will fail (with a NoClassDefFoundError if the class is referenced directly, or a ClassNotFoundException if reflection is used).
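A quick sanity check from the default classloader / glue code:

try {
    Class.forName("com.esotericsoftware.kryo.Kryo");
    throw new IllegalStateException("Kryo is visible from the default classloader");
} catch (ClassNotFoundException expected) {
    // good: the domain model and glue code cannot accidentally pull in Kryo
}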
For your second approach, you can create two JAR files that contain the serializer and all the dependencies for the old and new versions of your serializer, as shown here. Then create a map-reduce job that loads each version of your code in a separate class loader, and add some glue code in the middle which deserializes with the old code and then serializes with the new code.
You will have to be careful that your domain object is loaded in the same class loader as your glue code, and that the serialization/deserialization code resolves the domain object against that same class loader, so that both sides see the same domain object class.
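A sketch of how that glue code could be wired into a mapper, assuming each wrapper JAR exposes a static byte[]-based entry point ("deserialize(byte[])" / "serialize(Object)"); the method names, the BytesWritable key/value types and the JAR paths are all assumptions, not part of the original answer:

import java.io.File;
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Mapper;

public class ReserializeMapper
        extends Mapper<BytesWritable, NullWritable, BytesWritable, NullWritable> {

    private Method oldDeserialize;
    private Method newSerialize;

    @Override
    protected void setup(Context context) throws IOException {
        try {
            // The parent loader holds the domain model and the glue code, but no Kryo.
            ClassLoader parent = getClass().getClassLoader();
            ClassLoader oldCl = new URLClassLoader(
                    new URL[] { new File("old-kryo-wrapper.jar").toURI().toURL() }, parent);
            ClassLoader newCl = new URLClassLoader(
                    new URL[] { new File("new-kryo-wrapper.jar").toURI().toURL() }, parent);
            oldDeserialize = oldCl.loadClass("com.example.deserializer.Main")
                    .getMethod("deserialize", byte[].class);
            newSerialize = newCl.loadClass("com.example.serializer.Main")
                    .getMethod("serialize", Object.class);
        } catch (ReflectiveOperationException e) {
            throw new IOException(e);
        }
    }

    @Override
    protected void map(BytesWritable key, NullWritable value, Context context)
            throws IOException, InterruptedException {
        try {
            // Old Kryo reads the stored bytes, new Kryo writes them back out.
            Object graph = oldDeserialize.invoke(null, (Object) key.copyBytes());
            byte[] reserialized = (byte[]) newSerialize.invoke(null, graph);
            context.write(new BytesWritable(reserialized), NullWritable.get());
        } catch (ReflectiveOperationException e) {
            throw new IOException(e);
        }
    }
}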