I've seen from this question that Spark nodes effectively "communicate directly", but I'm less concerned with the theory and more with the implementation. Here it shows, in the "Encryption" section near the bottom of the page, that you can configure Spark to use a number of SSL protocols for security, which suggests, to me at least, that it uses some form of HTTP(S) for communication. My question is effectively two parts: what protocol do Spark nodes use to communicate, and how is the data formatted for this transfer?
The shuffle service is a proxy through which Spark executors fetch blocks, so its lifecycle is independent of the lifecycle of any executor. Apache Spark provides an extensible framework for plugging in different shuffle service implementations. A shuffle service can run on a Spark worker node or even outside the Spark worker entirely.
Spark uses a master/slave architecture, with one central coordinator (the driver) that communicates with many distributed workers (executors). The driver and each of the executors run in their own Java processes.
You start the ExternalShuffleService using the start-shuffle-service.sh shell script, and enable its use by the driver and executors via spark.shuffle.service.enabled.
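For illustration, a minimal sketch of setting that property programmatically (setting it in spark-defaults.conf or on the spark-submit command line works just as well):

import org.apache.spark.SparkConf

// Sketch: enabling the external shuffle service for an application.
val conf = new SparkConf()
  .set("spark.shuffle.service.enabled", "true")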
Once connected, Spark acquires executors on nodes in the cluster, which are processes that run computations and store data for your application. Next, it sends your application code (defined by JAR or Python files passed to SparkContext) to the executors. Finally, SparkContext sends tasks for the executors to run.
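To make that concrete, here is a minimal, self-contained driver program (the object name DriverExample is made up for illustration). The closure passed to map is part of the application code that the driver ships to the executors:

import org.apache.spark.{SparkConf, SparkContext}

object DriverExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("driver-example"))
    // The function literal below travels from the driver to the executors,
    // which run it against their partitions of the data.
    val squares = sc.parallelize(1 to 10).map(x => x * x).collect()
    println(squares.mkString(", "))
    sc.stop()
  }
}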
Spark uses RPC (Netty) to communicate between the executor processes. You can look into the NettyRpcEndpointRef class to see the actual implementation.
For shuffling data, we start from the BlockManager, which is responsible for providing data blocks; there is one per executor process. Internally it uses a BlockStoreShuffleReader, which manages the reads from different executors using a SerializerManager. This manager holds the actual serializer, which is defined by the spark.serializer property:
val serializer = instantiateClassFromConf[Serializer](
  "spark.serializer", "org.apache.spark.serializer.JavaSerializer")
logDebug(s"Using serializer: ${serializer.getClass}")
When the BlockManager attempts to read a block, it uses the serializer from that underlying configuration. It can be either a KryoSerializer or a JavaSerializer, depending on your setting.
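For example, switching to Kryo is a one-line configuration change (a sketch; registering classes is optional but lets Kryo encode them more compactly):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Optional: pre-register classes that will travel through the shuffle.
  .registerKryoClasses(Array(classOf[Array[Int]], classOf[Array[String]]))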
Bottom line: for reading and writing shuffled data, Spark uses the user-defined serializer.
For task serialization, this is a little different.
Spark uses a variable called closureSerializer, which defaults to JavaSerializerInstance, meaning Java serialization. You can see this inside the DAGScheduler.submitMissingTasks method:
val taskBinaryBytes: Array[Byte] = stage match {
  case stage: ShuffleMapStage =>
    JavaUtils.bufferToArray(
      closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
  case stage: ResultStage =>
    JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
}
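To get a feel for what JavaSerializerInstance does with a closure, here is a plain Java-serialization round trip of a Scala function; this is only an illustration of the mechanism, not Spark's actual code path:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Illustration only: Java-serialize a closure, then bring it back to life.
val closure: Int => Int = x => x * 2

val bytesOut = new ByteArrayOutputStream()
val objOut = new ObjectOutputStream(bytesOut)
objOut.writeObject(closure) // works because Scala function literals are serializable
objOut.close()

val objIn = new ObjectInputStream(new ByteArrayInputStream(bytesOut.toByteArray))
val restored = objIn.readObject().asInstanceOf[Int => Int]
println(restored(21)) // 42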
The actual object that gets serialized and sent to each executor is called TaskDescription:
def encode(taskDescription: TaskDescription): ByteBuffer = {
  val bytesOut = new ByteBufferOutputStream(4096)
  val dataOut = new DataOutputStream(bytesOut)

  dataOut.writeLong(taskDescription.taskId)
  dataOut.writeInt(taskDescription.attemptNumber)
  dataOut.writeUTF(taskDescription.executorId)
  dataOut.writeUTF(taskDescription.name)
  dataOut.writeInt(taskDescription.index)

  // Write files.
  serializeStringLongMap(taskDescription.addedFiles, dataOut)

  // Write jars.
  serializeStringLongMap(taskDescription.addedJars, dataOut)

  // Write properties.
  dataOut.writeInt(taskDescription.properties.size())
  taskDescription.properties.asScala.foreach { case (key, value) =>
    dataOut.writeUTF(key)
    // SPARK-19796 -- writeUTF doesn't work for long strings, which can happen for property values
    val bytes = value.getBytes(StandardCharsets.UTF_8)
    dataOut.writeInt(bytes.length)
    dataOut.write(bytes)
  }

  // Write the task. The task is already serialized, so write it directly to the byte buffer.
  Utils.writeByteBuffer(taskDescription.serializedTask, bytesOut)

  dataOut.close()
  bytesOut.close()
  bytesOut.toByteBuffer
}
It is then sent over RPC from the CoarseGrainedSchedulerBackend.launchTasks method:
executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
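On the executor side there is a matching decode step that reads the fields back in the same order encode wrote them. A minimal sketch of reading just the header fields (the helper name decodeHeader is mine, not Spark's):

import java.io.{ByteArrayInputStream, DataInputStream}

// Sketch: read back the fixed header fields in the order encode wrote them.
def decodeHeader(bytes: Array[Byte]): (Long, Int, String, String, Int) = {
  val in = new DataInputStream(new ByteArrayInputStream(bytes))
  val taskId        = in.readLong()
  val attemptNumber = in.readInt()
  val executorId    = in.readUTF()
  val name          = in.readUTF()
  val index         = in.readInt()
  (taskId, attemptNumber, executorId, name, index)
}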
What I've shown so far covers launching tasks. For shuffling data, Spark holds a BlockStoreShuffleReader, which manages the reads from the different executors.