How does Apache Spark send functions to other machines under the hood

I started playing with Pyspark to do some data processing. It was interesting to me that I could do something like

rdd.map(lambda x : (x['somekey'], 1)).reduceByKey(lambda x,y: x+y).count()

And it would send the logic in these functions over potentially numerous machines to execute in parallel.

Now, coming from a Java background, if I wanted to send an object containing some methods to another machine, that machine would need to know the class definition of the object im streaming over the network. Recently java had the idea of Functional Interfaces, which would create an implementation of that interface for me at compile time (ie. MyInterface impl = ()->System.out.println("Stuff");)

Where MyInterface would just have one method, 'doStuff()'

However, if I wanted to send such a function over the wire, the destination machine would need to know the implementation (impl itself) in order to call its 'doStuff()' method.

My question boils down to... How does Spark, written in Scala, actually send functionality to other machines? I have a couple hunches:

  1. The driver streams class definitions to other machines, and those machines dynamically load them with a class loader. Then the driver streams the objects and the machines know what they are, and can execute on them.
  2. Spark has a set of methods defined on all machines (core libraries) which are all that are needed for anything I could pass it. That is, my passed function is converted into one or more function calls on the core library. (Seems unlikely since the lambda can be just about anything, including instantiating other objects inside)


Edit: Spark is written in Scala, but I was interested in hearing how this might be approached in Java (Where a function can not exist unless its in a class, thus changing the class definition which needs updated on worker nodes).

Edit 2: This is the problem in java in case of confusion:

public class Playground
    private static interface DoesThings
        public void doThing();
    public void func() throws Exception {
        Socket s = new Socket("addr", 1234);
        ObjectOutputStream oos = new ObjectOutputStream(s.getOutputStream());
        oos.writeObject("Hello!"); // Works just fine, you're just sending a string
        oos.writeObject((DoesThings)()->System.out.println("Hey, im doing a thing!!")); // Sends the object, but error on other machine

        DoesThings dt = (DoesThings)()->System.out.println("Hey, im doing a thing!!");

The System.out,println(dt.getClass()) returns: "class JohnLibs.Playground$$Lambda$1/23237446"

Now, assume that the Interface definition wasn't in the same file, it was in a shared file both machines had. But this driver program, func(), essentially creates a new type of class which implements DoesThings. As you can see, the destination machine is not going to know what JohnLibs.Playground$$Lambda$1/23237446 is, even though it knows what DoesThings is. It all comes down to you cant pass a function without it being bound to a class. In python you could just send a String with the definition, and then execute that string (Since its interpreted). Perhaps thats what spark does, since it uses scala instead of java (If scala can have functions outside of classes)

2 Answers

Java bytecode, which is, of course, what both Java and Scala are compiled to, was created specifically to be platform independent. So, if you have a classfile you can move it to any other machine, regardless of "silicon" architecture, and provided it has a JVM of at least that verion, it will run. James Gosling and his team did this deliberately to allow code to move between machines right from the very start, and it was easy to demonstrate in Java 0.98 (the first version I played with).

When the JVM tries to load a class, it uses an instance of a ClassLoader. Classloaders encompass two things, the ability to fetch the binary of a bytecode file, and the ability to load the code (verify its integrity, convert it into an in-memory instance of java.lang.Class, and make it available to other code in the system). At Java 1, you mostly had to write your own classloader if you wanted to take control of how the byes were loaded, although there was a sun-specific AppletClassLoader, which was written to load classfiles from http, rather than from the file system.

A little later, at Java 1.2, the "how to fetch the bytes of the classfile" part was separated out in the URLClassloader. That could use any supported protocol to load classes. Indeed, the protocol support mechanism was and is extensible via pluggable protocol handlers. So, now you can load classes from anywhere without the risk of making mistakes in the harder part, which is how you verify and install the class.

Along with that, Java's RMI mechanism allows a serialized object (the class name, along with the "state" part of an object) to be wrapped in a MarshaledObject. This adds "where this class may be loaded from", represented as a URL. RMI automates the conversion of real objects in memory to MarshaledObjects and also shipping them around on the network. If a JVM receives a marshaled object for which it already has the class definition, it always uses that class definition (for security). If not, however, then provided a bunch of criteria are met (security, and just plain working correctly, criteria) then the classfile may be loaded from that remote server, allowing a JVM to load classes for which it has never seen the definitions. (Obviously, the code for such systems must typically be written against ubiquitous interfaces--if not, there's going to be a lot of reflection going on!)

Now, I don't know (indeed, I found your question trying to determine the same thing whether Spark uses RMI infrastructure (I do know that hadoop does not, because, seemingly because the authors wanted to create their own system--which is fun and educational of course--rather than use a flexible, configurable, extensively-tested, including security tested!- system.)

However, all that has to happen to make this work in general are the steps that I outlined for RMI, those requirements are essentially:

1) Objects can be serialized into some byte sequence format understood by all participants

2) When objects are sent across the wire the receiving end must have some way to obtain the classfile that defines them. This can be a) pre-installation, b) RMI's approach of "here's where to find this" or c) the sending system sends the jar. Any of these can work

3) Security should probably be maintained. In RMI, this requirement was rather "in your face", but I don't see it in Spark, so they either hid the configuration, or perhaps just fixed what it can do.

Anyway, that's not really an answer, since I described principles, with a specific example, but not the actual specific answer to your question. I'd still like to find that!

Toby Eggitt

When you submit a spark application to the cluster, your code is deployed to all worker nodes, so your class and function definitions exist on all nodes.

