
How do Spark Nodes communicate during a Shuffle?

Tags:

apache-spark

I've seen from another question that Spark nodes effectively "communicate directly", but I'm less concerned with the theory and more with the implementation. The Spark documentation shows, in the "Encryption" section near the bottom of the page, that you can configure Spark to use a number of SSL protocols for security, which suggests, to me at least, that it uses some form of HTTP(S) for communication. My question has two parts: what protocol do Spark nodes use to communicate, and how is the data formatted for this transfer?

asked Aug 16 '17 at 13:08 by Chance


People also ask

What is shuffle service in Spark?

The shuffle service is a proxy through which Spark executors fetch shuffle blocks, so its lifecycle is independent of the lifecycle of any executor. Apache Spark provides an extensible framework for plugging in different implementations of the shuffle service, which can run on a Spark worker node or even outside the Spark worker.
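To make the "independent lifecycle" point concrete, here is a toy sketch in which every name is hypothetical and storage is in memory only. In Spark the map output is written to the node's local disk and the service serves it from there; in this sketch a map stands in for that storage. The key idea is that the reducer asks the long-lived service, not the executor that produced the data.

```scala
import java.nio.charset.StandardCharsets
import scala.collection.mutable

// Hypothetical toy model; none of these names are Spark classes.
final case class ToyBlockId(shuffleId: Int, mapId: Int, reduceId: Int)

/** Long-lived stand-in for the shuffle service: it keeps map output after executors exit. */
final class ToyShuffleService {
  private val blocks = mutable.Map.empty[ToyBlockId, Array[Byte]]
  def register(id: ToyBlockId, data: Array[Byte]): Unit = blocks.update(id, data)
  def fetch(id: ToyBlockId): Option[Array[Byte]] = blocks.get(id)
}

/** Short-lived executor: writes one block per reduce partition, then may go away. */
final class ToyExecutor(service: ToyShuffleService) {
  def runMapTask(shuffleId: Int, mapId: Int, outputs: Seq[String]): Unit =
    outputs.zipWithIndex.foreach { case (records, reduceId) =>
      service.register(ToyBlockId(shuffleId, mapId, reduceId), records.getBytes(StandardCharsets.UTF_8))
    }
}

object ToyReducer {
  /** A reducer asks the service, not the (possibly dead) executor, for its block. */
  def read(service: ToyShuffleService, id: ToyBlockId): Option[String] =
    service.fetch(id).map(bytes => new String(bytes, StandardCharsets.UTF_8))
}
```

Once a `ToyExecutor` has run its map task, it can be discarded and `ToyReducer.read` still finds the block, which is the property that lets Spark remove executors without losing their shuffle output.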

What mechanism does Spark use to communicate between the driver and the executors?

Spark uses a master/slave architecture: one central coordinator (the driver) communicates with many distributed workers (executors). The driver and each of the executors run in their own Java processes.

How do I start a Spark shuffle service?

You start the ExternalShuffleService with the start-shuffle-service.sh shell script and enable its use by the driver and executors with the spark.shuffle.service.enabled property.

How do Spark clusters work?

Once connected, Spark acquires executors on nodes in the cluster, which are processes that run computations and store data for your application. Next, it sends your application code (defined by JAR or Python files passed to SparkContext) to the executors. Finally, SparkContext sends tasks for the executors to run.


1 Answer

Spark uses RPC (Netty) to communicate between the executor processes. You can look into the NettyRpcEndpointRef class to see the actual implementation.
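Because TCP is just a byte stream, any RPC layer needs a way to mark where one message ends and the next begins. A common technique, and one Netty supports out of the box with decoders such as `LengthFieldBasedFrameDecoder`, is to prefix each message with its length. The sketch below shows that generic idea with plain JDK streams; the 4-byte length field and the `Framing` object are assumptions for illustration, not Spark's actual wire format.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import java.nio.charset.StandardCharsets

// Generic length-prefixed framing sketch; NOT Spark's actual wire format.
object Framing {
  /** Writes each payload as [4-byte big-endian length][payload bytes]. */
  def writeFrames(payloads: Seq[Array[Byte]]): Array[Byte] = {
    val bytesOut = new ByteArrayOutputStream()
    val dataOut = new DataOutputStream(bytesOut)
    payloads.foreach { p =>
      dataOut.writeInt(p.length)
      dataOut.write(p)
    }
    dataOut.flush()
    bytesOut.toByteArray
  }

  /** Splits the byte stream back into payloads using the length prefixes. */
  def readFrames(bytes: Array[Byte]): Vector[Array[Byte]] = {
    val in = new DataInputStream(new ByteArrayInputStream(bytes))
    val frames = Vector.newBuilder[Array[Byte]]
    while (in.available() > 0) {
      val buf = new Array[Byte](in.readInt())
      in.readFully(buf)
      frames += buf
    }
    frames.result()
  }
}
```

The receiver never has to guess: it reads four bytes, learns how many bytes belong to the current message, and reads exactly that many.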

For shuffling data, we start from the BlockManager, which is responsible for providing data blocks; there is one per executor process. Internally, a BlockStoreShuffleReader manages the reads from different executors using a SerializerManager. This manager holds the actual serializer, which is defined by the spark.serializer property:

```scala
val serializer = instantiateClassFromConf[Serializer](
  "spark.serializer", "org.apache.spark.serializer.JavaSerializer")
logDebug(s"Using serializer: ${serializer.getClass}")
```

When the BlockManager attempts to read a block, it uses the serializer from that configuration. It can be either a KryoSerializer or a JavaSerializer (the default), depending on your spark.serializer setting.
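The config-driven choice of serializer can be sketched without Spark. The `ToySerializer` trait, both implementations, the `toy.serializer` key and the name-to-factory registry are all hypothetical: Spark itself reflectively instantiates whatever class name `spark.serializer` holds, while this sketch swaps in a plain lookup table, and the UTF-8 serializer only plays the role Kryo would play as the more compact alternative.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.nio.charset.StandardCharsets

// Hypothetical, simplified stand-in for a pluggable serializer abstraction.
trait ToySerializer {
  def serialize(value: String): Array[Byte]
  def deserialize(bytes: Array[Byte]): String
}

/** Uses JDK object serialization (similar in spirit to JavaSerializer). */
final class JdkToySerializer extends ToySerializer {
  def serialize(value: String): Array[Byte] = {
    val bytesOut = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytesOut)
    out.writeObject(value)
    out.close()
    bytesOut.toByteArray
  }
  def deserialize(bytes: Array[Byte]): String = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try in.readObject().asInstanceOf[String] finally in.close()
  }
}

/** Compact alternative: raw UTF-8 bytes, no stream header or type info. */
final class Utf8ToySerializer extends ToySerializer {
  def serialize(value: String): Array[Byte] = value.getBytes(StandardCharsets.UTF_8)
  def deserialize(bytes: Array[Byte]): String = new String(bytes, StandardCharsets.UTF_8)
}

object ToySerializerConf {
  // Hypothetical registry; Spark instead instantiates the configured class by name.
  private val registry: Map[String, () => ToySerializer] = Map(
    "jdk" -> (() => new JdkToySerializer),
    "utf8" -> (() => new Utf8ToySerializer)
  )

  /** Picks the serializer named in the config, defaulting to the JDK one. */
  def fromConf(conf: Map[String, String]): ToySerializer = {
    val name = conf.getOrElse("toy.serializer", "jdk")
    val factory = registry.getOrElse(name, throw new IllegalArgumentException(s"Unknown serializer: $name"))
    factory()
  }
}
```

Every component that reads or writes blocks asks the same configuration, so both ends of a transfer agree on the format without negotiating it per message.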

Bottom line: for reading and writing shuffle data, Spark uses the user-defined serializer.
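To show what "formatted by the serializer" can mean for a shuffle block, here is a rough sketch using plain JDK serialization: records are written back to back into one stream, and the reader pulls them out until the stream ends. `ShuffleBlockSketch` is a hypothetical name, and writing each key and value as a separate object is an illustrative choice rather than a claim about Spark's exact block layout.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, EOFException, ObjectInputStream, ObjectOutputStream}

object ShuffleBlockSketch {
  /** Serializes (key, value) records one after another into a single block. */
  def writeBlock(records: Seq[(String, Int)]): Array[Byte] = {
    val bytesOut = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytesOut)
    records.foreach { case (k, v) =>
      out.writeObject(k)
      out.writeObject(Int.box(v))
    }
    out.close()
    bytesOut.toByteArray
  }

  /** Reads records back until the stream is exhausted (signalled by EOFException). */
  def readBlock(bytes: Array[Byte]): Vector[(String, Int)] = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    val result = Vector.newBuilder[(String, Int)]
    try {
      while (true) {
        val k = in.readObject().asInstanceOf[String]
        val v = in.readObject().asInstanceOf[Integer].intValue
        result += (k -> v)
      }
    } catch {
      case _: EOFException => ()
    } finally in.close()
    result.result()
  }
}
```

Swapping the serializer changes only the bytes inside the block; the reader still consumes a stream of records in the order they were written.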


For task serialization, this is a little different.

Spark uses a variable called closureSerializer, which defaults to JavaSerializerInstance, meaning Java serialization. You can see this inside the DAGScheduler.submitMissingTasks method:

```scala
val taskBinaryBytes: Array[Byte] = stage match {
  case stage: ShuffleMapStage =>
    JavaUtils.bufferToArray(
      closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
  case stage: ResultStage =>
    JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
}
```
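The task binary is essentially a pair such as `(rdd, func)` passed through Java serialization. The sketch below mimics that with a hypothetical `AddOffset` function class (a case class, so it is `Serializable`, much as the closures Spark ships must be) and a `Vector[Int]` standing in for the RDD partition; `TaskBinarySketch` is likewise a made-up name. The "executor side" deserializes the pair and applies the function.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

/** Hypothetical user function: case classes are Serializable, so it can be shipped. */
final case class AddOffset(offset: Int) extends (Int => Int) {
  def apply(x: Int): Int = x + offset
}

object TaskBinarySketch {
  /** "Driver side": serializes a (data, function) pair into one byte array. */
  def serialize(payload: (Vector[Int], Int => Int)): Array[Byte] = {
    val bytesOut = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytesOut)
    out.writeObject(payload)
    out.close()
    bytesOut.toByteArray
  }

  /** "Executor side": deserializes the pair and runs the function over the data. */
  def runTask(bytes: Array[Byte]): Vector[Int] = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    val (data, func) =
      try in.readObject().asInstanceOf[(Vector[Int], Int => Int)]
      finally in.close()
    data.map(func)
  }
}
```

If the function captured something that is not serializable, `writeObject` would fail on the driver, which is the same class of error Spark reports as "Task not serializable".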

The actual object that gets serialized and sent to each executor is a TaskDescription, which is encoded like this:

```scala
def encode(taskDescription: TaskDescription): ByteBuffer = {
  val bytesOut = new ByteBufferOutputStream(4096)
  val dataOut = new DataOutputStream(bytesOut)

  dataOut.writeLong(taskDescription.taskId)
  dataOut.writeInt(taskDescription.attemptNumber)
  dataOut.writeUTF(taskDescription.executorId)
  dataOut.writeUTF(taskDescription.name)
  dataOut.writeInt(taskDescription.index)

  // Write files.
  serializeStringLongMap(taskDescription.addedFiles, dataOut)

  // Write jars.
  serializeStringLongMap(taskDescription.addedJars, dataOut)

  // Write properties.
  dataOut.writeInt(taskDescription.properties.size())
  taskDescription.properties.asScala.foreach { case (key, value) =>
    dataOut.writeUTF(key)
    // SPARK-19796 -- writeUTF doesn't work for long strings, which can happen for property values
    val bytes = value.getBytes(StandardCharsets.UTF_8)
    dataOut.writeInt(bytes.length)
    dataOut.write(bytes)
  }

  // Write the task. The task is already serialized, so write it directly to the byte buffer.
  Utils.writeByteBuffer(taskDescription.serializedTask, bytesOut)

  dataOut.close()
  bytesOut.close()
  bytesOut.toByteBuffer
}
```
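The `encode` method is a hand-rolled binary layout: fixed-width numbers, `writeUTF` strings, and length-prefixed byte arrays for property values, so a matching decoder must read the fields in exactly the same order. The sketch below pairs an encoder with a decoder for a hypothetical, trimmed-down `TaskHeader` (no index, files, jars, or task bytes) and keeps the SPARK-19796 trick of writing property values as a length plus raw bytes, since `writeUTF` rejects strings whose encoded form exceeds 65,535 bytes.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import java.nio.charset.StandardCharsets

/** Hypothetical, trimmed-down stand-in for TaskDescription. */
final case class TaskHeader(
    taskId: Long,
    attemptNumber: Int,
    executorId: String,
    name: String,
    properties: Map[String, String])

object TaskHeaderCodec {
  def encode(t: TaskHeader): Array[Byte] = {
    val bytesOut = new ByteArrayOutputStream(4096)
    val dataOut = new DataOutputStream(bytesOut)
    dataOut.writeLong(t.taskId)
    dataOut.writeInt(t.attemptNumber)
    dataOut.writeUTF(t.executorId)
    dataOut.writeUTF(t.name)
    dataOut.writeInt(t.properties.size)
    t.properties.foreach { case (key, value) =>
      dataOut.writeUTF(key)
      // Length-prefixed raw bytes, because writeUTF caps the encoded string at 65535 bytes.
      val bytes = value.getBytes(StandardCharsets.UTF_8)
      dataOut.writeInt(bytes.length)
      dataOut.write(bytes)
    }
    dataOut.close()
    bytesOut.toByteArray
  }

  /** Reads the fields back in exactly the order encode wrote them. */
  def decode(bytes: Array[Byte]): TaskHeader = {
    val in = new DataInputStream(new ByteArrayInputStream(bytes))
    try {
      val taskId = in.readLong()
      val attemptNumber = in.readInt()
      val executorId = in.readUTF()
      val name = in.readUTF()
      val numProps = in.readInt()
      val props = (0 until numProps).map { _ =>
        val key = in.readUTF()
        val valueBytes = new Array[Byte](in.readInt())
        in.readFully(valueBytes)
        key -> new String(valueBytes, StandardCharsets.UTF_8)
      }.toMap
      TaskHeader(taskId, attemptNumber, executorId, name, props)
    } finally in.close()
  }
}
```

For example, decoding the encoding of a `TaskHeader` whose property value is 70,000 characters long gives back an equal `TaskHeader`, whereas calling `writeUTF` directly on that value throws `java.io.UTFDataFormatException`.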

The encoded task is then sent over RPC from the CoarseGrainedSchedulerBackend.launchTasks method:

```scala
executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
```
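`send` here is one-way: the driver hands off `LaunchTask` and does not wait for a reply (Spark's `RpcEndpointRef` also offers `ask`, which returns a `Future`). The toy endpoint below, with a queue-backed inbox drained by a single thread, is a hypothetical in-process sketch of those two styles; the message classes and `ToyExecutorEndpoint` are invented for illustration and are not Spark's `NettyRpcEnv`.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

// Hypothetical message types; names echo Spark's but these are toy classes.
sealed trait ToyMessage
final case class ToyLaunchTask(taskId: Long, serializedTask: Array[Byte]) extends ToyMessage
final case class ToyAskRunning(reply: Promise[Seq[Long]]) extends ToyMessage
case object ToyStop extends ToyMessage

final class ToyExecutorEndpoint {
  private val inbox = new LinkedBlockingQueue[ToyMessage]()

  // Messages are handled one at a time, in arrival order, by a single thread.
  private val loop = new Thread(() => {
    var running = Vector.empty[Long]
    var stopped = false
    while (!stopped) {
      inbox.take() match {
        case ToyLaunchTask(taskId, _) => running :+= taskId
        case ToyAskRunning(reply)     => reply.success(running)
        case ToyStop                  => stopped = true
      }
    }
  })
  loop.setDaemon(true)
  loop.start()

  /** One-way: enqueue and return immediately; no reply is expected. */
  def send(msg: ToyMessage): Unit = inbox.put(msg)

  /** Request/response: the loop thread completes the reply later. */
  def askRunning(): Future[Seq[Long]] = {
    val reply = Promise[Seq[Long]]()
    inbox.put(ToyAskRunning(reply))
    reply.future
  }

  /** Asks the loop to finish and waits for it. */
  def stop(): Unit = {
    inbox.put(ToyStop)
    loop.join()
  }
}
```

Because a single thread drains the inbox in FIFO order, an `askRunning()` issued after two `send(ToyLaunchTask(...))` calls sees both task ids.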

What I've shown so far covers launching tasks. For the shuffle data itself, the BlockStoreShuffleReader described above manages the reads from the different executors.

answered Sep 18 '22 at 03:09 by Yuval Itzchakov