What operations contribute to Spark Task Deserialization time?

Tags:

apache-spark

I have some jobs where tasks are dominated by task deserialization time. The tasks themselves complete in about 10 seconds after 3 minutes of task deserialization.

What are the exact boundaries of this metric? What resource limitations most often contribute to long deserialization times?

asked Feb 23 '16 by Larsenal


People also ask

What is task Deserialization time in Spark?

Summary metrics for all tasks are represented in a table and in a timeline. They include task deserialization time, task duration, GC time (the total JVM garbage collection time), and result serialization time (the time spent serializing the task result on an executor before sending it back to the driver).
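
Beyond the web UI, the same per-task numbers can be pulled programmatically. A minimal sketch, assuming a Scala job with an existing SparkContext named sc (the listener class name is mine; SparkListener, SparkListenerTaskEnd and the TaskMetrics fields are standard Spark, with times reported in milliseconds):

    import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

    // Logs deserialization time vs. run time for every finished task.
    class DeserializationTimeListener extends SparkListener {
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
        val metrics = taskEnd.taskMetrics
        if (metrics != null) { // metrics can be missing for failed tasks
          println(s"stage=${taskEnd.stageId} task=${taskEnd.taskInfo.taskId} " +
            s"deserializeMs=${metrics.executorDeserializeTime} runMs=${metrics.executorRunTime}")
        }
      }
    }

    // Register it on an existing SparkContext:
    // sc.addSparkListener(new DeserializationTimeListener)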

What causes scheduler delay in Spark?

Spark relies on data locality and tries to execute tasks as close to the data as possible to minimize data transfer. A task location can be either a host or a pair of a host and an executor. If no available executor satisfies a task's preferred locality, the task keeps waiting until a locality timeout is reached, and that wait shows up as scheduler delay.
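
That locality timeout is configurable, so if tasks are mostly sitting in scheduler delay you can make the scheduler give up on locality sooner. A sketch with illustrative values only (spark.locality.wait is a standard property; its default is 3s):

    import org.apache.spark.{SparkConf, SparkContext}

    // Fall back to less-local executors after 1s instead of the default 3s.
    val conf = new SparkConf()
      .setAppName("locality-wait-demo")
      .set("spark.locality.wait", "1s")
    val sc = new SparkContext(conf)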

How Spark decides number of tasks?

The number of CPU cores available to an executor determines how many tasks can be executed in parallel for an application at any given time.
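
In other words, the parallelism ceiling is cores per executor times number of executors. A sketch with made-up numbers (spark.executor.instances applies to YARN and Kubernetes deployments):

    import org.apache.spark.{SparkConf, SparkContext}

    // Up to 4 cores x 10 executors = 40 tasks running at the same time.
    val conf = new SparkConf()
      .setAppName("parallelism-demo")
      .set("spark.executor.cores", "4")
      .set("spark.executor.instances", "10")
    val sc = new SparkContext(conf)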


1 Answer

A quick look at the source code on master (https://github.com/kayousterhout/spark-1/blob/master/core/src/main/scala/org/apache/spark/executor/Executor.scala#L179) shows what the executor does before it starts running a task.

It's essentially this:

    val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
    updateDependencies(taskFiles, taskJars)
    task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)

    // If this task has been killed before we deserialized it, let's quit now. Otherwise,
    // continue executing the task.
    if (killed) {
      // Throw an exception rather than returning, because returning within a try{} block
      // causes a NonLocalReturnControl exception to be thrown. The NonLocalReturnControl
      // exception will be caught by the catch block, leading to an incorrect ExceptionFailure
      // for the task.
      throw new TaskKilledException
    }

    attemptedTask = Some(task)
    logDebug("Task " + taskId + "'s epoch is " + task.epoch)
    env.mapOutputTracker.updateEpoch(task.epoch)

From the (taskFiles, taskJars, taskBytes) line I suspect that each task is resolving and deserializing its JAR dependencies; in my case I have a 136 MB fat JAR that isn't helping.
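
A related contributor worth checking is closure size: anything captured by the functions you pass to transformations is serialized into each task and has to be deserialized on the executor. A sketch of the usual broadcast-variable workaround, with illustrative names (sc, bigLookup) that are not from the question:

    import org.apache.spark.SparkContext

    def tagKeys(sc: SparkContext, bigLookup: Map[String, String]) = {
      val data = sc.parallelize(Seq("a", "b", "c"))

      // Anti-pattern: bigLookup is captured in the closure and shipped with every task.
      // data.map(k => bigLookup.getOrElse(k, "?"))

      // Better: broadcast it once per executor and reference the handle in the closure.
      val lookupBc = sc.broadcast(bigLookup)
      data.map(k => lookupBc.value.getOrElse(k, "?"))
    }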

answered Sep 23 '22 by Larsenal