 

Spark - Serializing an object with a non-serializable member

I am going to ask this question in the context of Spark, because that's what I'm facing, but this might be a plain Java issue.

In our spark job, we have a Resolver which needs to be used in all of our workers (it's used in a udf). The problem is that it's not serializable and we cannot change it to be so. The solution was to put it as a member of another class which is serializable.

So we ended up with:

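import java.io.Serializable;

public class Analyzer implements Serializable {
    // Resolver is not Serializable, so it is excluded from serialization
    transient Resolver resolver;

    public Analyzer() {
        System.out.println("Initializing a Resolver...");
        resolver = new Resolver();
    }

    // Delegates to the (non-serializable) Resolver
    public int resolve(String key) {
        return resolver.find(key);
    }
}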

We then broadcast this class using the Spark API:

 val analyzer = sparkContext.broadcast(new Analyzer())

(more information about Spark broadcast variables can be found in the Spark documentation)

We then proceed to use analyzer in a UDF, as part of our spark code, with something like:

val resolve = udf((key: String) => analyzer.value.resolve(key))
val result = myDataFrame.select("key", resolve("key")).count()

This all works as expected, but it leaves me wondering.

Resolver does not implement Serializable and is, therefore, marked as transient, meaning it does not get serialized along with its owner object Analyzer.

But as you can see clearly from the code above, the resolve() method uses resolver, so it must not be null. And indeed it isn't. The code works.

So if the field is not passed through serialization, how is the resolver member instantiated?

My initial thought was that maybe the Analyzer constructor is called on the receiving side (i.e. the Spark worker), but then I would expect to see the line "Initializing a Resolver..." printed several times. But it's only printed once, which probably indicates that it's only called once, right before the object is passed to the broadcast API. So why isn't resolver null?

Am I missing something about JVM serialization or Spark serialization?

How does this code even work?

Spark runs on YARN, in cluster mode. spark.serializer is set to org.apache.spark.serializer.KryoSerializer.

asked Jan 21 '18 19:01 by summerbulb




1 Answers

So if the field is not passed through serialization, how is the resolver member instantiated?

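It is instantiated by the new Resolver() call in the Analyzer constructor, which runs again on the executor when Kryo deserializes the broadcast value. Spark's KryoSerializer lets Kryo create the object through its no-arg constructor, and Kryo's default FieldSerializer then skips the transient field instead of overwriting it. This happens when Spark calls: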

kryo.readClassAndObject(input).asInstanceOf[T]
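To see why the choice of serializer matters, here is a minimal sketch in plain Java (it does not use Kryo itself). Standard java.io deserialization of a Serializable class does not run that class's constructor, so a transient field comes back null; creating the instance through its no-arg constructor, which (assuming Spark configures Kryo's DefaultInstantiatorStrategy, as I understand it does) is what Kryo tries first, runs the constructor again and repopulates the field. TransientDemo, FakeResolver and the helper methods are hypothetical names for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientDemo {
    // Hypothetical stand-in for the real, non-serializable Resolver.
    static class FakeResolver {
        int find(String key) { return key.length(); }
    }

    // Same shape as the question's Analyzer, plus a counter for constructor calls.
    static class Analyzer implements Serializable {
        static int constructorCalls = 0; // static, so never serialized
        transient FakeResolver resolver;

        public Analyzer() {
            constructorCalls++;
            resolver = new FakeResolver();
        }

        int resolve(String key) { return resolver.find(key); }
    }

    // Writes and reads an object back with plain java.io serialization.
    static Object javaRoundTrip(Object obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }

    // Creates an instance through its no-arg constructor, imitating the
    // first thing Kryo's default instantiator strategy attempts.
    static <T> T viaNoArgConstructor(Class<T> type) {
        try {
            return type.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        Analyzer original = new Analyzer();
        Analyzer viaJava = (Analyzer) javaRoundTrip(original);
        Analyzer viaCtor = viaNoArgConstructor(Analyzer.class);

        System.out.println("constructor calls: " + Analyzer.constructorCalls);        // prints: constructor calls: 2
        System.out.println("java-deserialized resolver: " + viaJava.resolver);         // prints: java-deserialized resolver: null
        System.out.println("constructed resolver set: " + (viaCtor.resolver != null)); // prints: constructed resolver set: true
    }
}
```

The java.io round trip leaves the counter at 1 because only Object's constructor runs during deserialization; with Kryo's constructor-based instantiation, the Analyzer constructor (and its println) runs on the deserializing side.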

My initial thought was that maybe the Analyzer constructor is called on the receiving side (i.e. the spark worker), but then I would expect to see the line "Initializing a Resolver..." printed several times. But it's only printed once, which is probably an indication to the fact that it's only called once

That's not how a broadcast variable works. When an executor first needs the broadcast variable, it checks whether the object is already in its BlockManager. If it isn't, it fetches the serialized data from the driver and/or other executors, deserializes it (which is when the Analyzer constructor runs) and caches the result in its own BlockManager. Every later task on that executor reuses the cached instance, so the constructor runs once per executor, not once per task. Also note that anything printed on an executor goes to that executor's stdout (on YARN, the container logs), not to the driver's output.

This is documented in the Scaladoc of TorrentBroadcast (which is the default broadcast implementation):

* The driver divides the serialized object into small chunks and
* stores those chunks in the BlockManager of the driver.
*
* On each executor, the executor first attempts to fetch the object from its BlockManager. If
* it does not exist, it then uses remote fetches to fetch the small chunks from the driver and/or
* other executors if available. Once it gets the chunks, it puts the chunks in its own
* BlockManager, ready for other executors to fetch from.
*
* This prevents the driver from being the bottleneck in sending out multiple copies of the
* broadcast data (one per executor).
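A heavily simplified sketch of the per-executor caching described above, assuming one in-memory map per executor in place of the BlockManager and no chunking or peer-to-peer fetching. Every name here (BroadcastCacheSketch, SimulatedExecutor, driverStore) is hypothetical and not part of Spark's API. It counts how often the payload is deserialized when ten tasks on two executors read it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

public class BroadcastCacheSketch {
    // Serialized broadcast payloads held by the "driver", keyed by broadcast id.
    static final Map<Long, byte[]> driverStore = new HashMap<>();
    // Counts how many times any executor had to deserialize a payload.
    static int deserializations = 0;

    static void broadcast(long id, Serializable value) {
        driverStore.put(id, serialize(value));
    }

    // One per simulated executor JVM; blockCache plays the role of the BlockManager.
    static class SimulatedExecutor {
        private final Map<Long, Object> blockCache = new HashMap<>();

        Object getBroadcastValue(long id) {
            // Only the first lookup on this executor fetches and deserializes.
            return blockCache.computeIfAbsent(id, key -> {
                deserializations++;
                return deserialize(driverStore.get(key));
            });
        }
    }

    static byte[] serialize(Serializable value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        broadcast(1L, "resolver-config");
        SimulatedExecutor[] executors = {new SimulatedExecutor(), new SimulatedExecutor()};
        // Ten tasks spread round-robin over two executors.
        for (int task = 0; task < 10; task++) {
            executors[task % 2].getBroadcastValue(1L);
        }
        System.out.println("deserializations: " + deserializations); // prints: deserializations: 2
    }
}
```

The only point of the sketch is that deserialization, and therefore any constructor the deserializer runs, happens once per executor JVM rather than once per task.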

If we remove the transient, it fails, and the stack trace leads to Kryo

That is probably because some field inside your Resolver class can't be serialized even by Kryo, which does not rely on the Serializable interface at all.
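For contrast, plain Java serialization rejects a non-transient, non-serializable field up front with a NotSerializableException naming the offending class. Kryo does not check for Serializable, which is why its failure surfaces deeper, on whatever inside Resolver it can't handle. A minimal sketch using only java.io, with hypothetical class names (NonTransientDemo, FakeResolver, EagerAnalyzer):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class NonTransientDemo {
    // Hypothetical stand-in for Resolver: deliberately not Serializable.
    static class FakeResolver { }

    // Like Analyzer, but with the transient modifier removed.
    static class EagerAnalyzer implements Serializable {
        FakeResolver resolver = new FakeResolver();
    }

    // Returns "ok" or a short description of why java.io serialization failed.
    static String trySerialize(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return "ok";
        } catch (NotSerializableException e) {
            // The message is the name of the class that is not Serializable.
            return "NotSerializableException: " + e.getMessage();
        } catch (IOException e) {
            return "IOException: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(trySerialize(new EagerAnalyzer()));
        // prints (default package): NotSerializableException: NonTransientDemo$FakeResolver
    }
}
```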

answered Sep 26 '22 11:09 by Yuval Itzchakov