
Large task size for simplest program

I am trying to run the simplest program with Spark

import org.apache.spark.{SparkContext, SparkConf}

object LargeTaskTest {

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("DataTest").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val dat = (1 to 10000000).toList
    val data = sc.parallelize(dat).cache()
    for(i <- 1 to 100){
      println(data.reduce(_ + _))
    }
  }   
}

I get the following warning message after each iteration:

WARN TaskSetManager: Stage 0 contains a task of very large size (9767 KB). The maximum recommended task size is 100 KB.

Increasing the data size increases the reported task size. This suggests to me that the driver is shipping the "dat" object to all executors, but I can't for the life of me see why, since the only operation on my RDD is reduce, which basically has no closure. Any ideas?

Asked by Ulysse Mizrahi
1 Answer

Because you create the very large list locally first, the parallelize method tries to ship the whole list to the Spark workers as part of a task, hence the warning message you receive. As an alternative, you can parallelize a much smaller list and then use flatMap to explode it into the larger list. This also has the benefit of creating the larger set of numbers in parallel. For example:

import org.apache.spark.{SparkContext, SparkConf}

object LargeTaskTest extends App {

  val conf = new SparkConf().setAppName("DataTest").setMaster("local[*]")
  val sc = new SparkContext(conf)
  val dat = (0 to 99).toList
  val data = sc.parallelize(dat).cache().flatMap(i => (1 to 1000000).map(j => j * 100 + i))
  println(data.count()) //100000000
  println(data.reduce(_ + _))
  sc.stop()
}

EDIT:

Ultimately the local collection being parallelized has to be pushed to the executors. The parallelize method creates an instance of ParallelCollectionRDD:

def parallelize[T: ClassTag](
      seq: Seq[T],
      numSlices: Int = defaultParallelism): RDD[T] = withScope {
    assertNotStopped()
    new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
  }

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L730

ParallelCollectionRDD creates a number of partitions equal to numSlices:

  override def getPartitions: Array[Partition] = {
    val slices = ParallelCollectionRDD.slice(data, numSlices).toArray
    slices.indices.map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray
  }

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala#L96

numSlices defaults to sc.defaultParallelism, which on my machine is 4. So even after slicing, each partition still contains a very large list, which needs to be pushed to an executor.
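
To put numbers on that, here is a rough back-of-the-envelope sketch (illustrative only, not Spark internals): 10,000,000 elements split over 4 default partitions is 2,500,000 elements per task, and at roughly 4 bytes per serialized Int that comes to about 9766 KB per task, in line with the 9767 KB reported in the warning.

object TaskSizeEstimate extends App {
  val elements = 10000000
  val numSlices = 4                        // sc.defaultParallelism on my machine
  val perPartition = elements / numSlices  // 2,500,000 elements per task
  val approxBytes = perPartition * 4L      // ~4 bytes per serialized Int (rough estimate)
  println(s"~${approxBytes / 1024} KB per task") // ~9765 KB, close to the warning's 9767 KB
}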

The Scaladoc for SparkContext.parallelize includes the note "@note Parallelize acts lazily", and ParallelCollectionRDD contains this comment:

// TODO: Right now, each split sends along its full data, even if later down the RDD chain it gets
// cached. It might be worthwhile to write the data to a file in the DFS and read it in the split
// instead.

So it appears that the problem surfaces when you call reduce, because that is the point at which the partitions are sent to the executors, but the root cause is that you are calling parallelize on a very large list. Generating the large list within the executors is a better approach, IMHO.
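
As an aside, if your Spark version provides SparkContext.range (added around Spark 1.5), it generates each partition's numbers lazily on the executor side, so no multi-megabyte collection is ever serialized into a task. A minimal sketch of the same computation using it (the RangeTest name is just for illustration; note the RDD holds Longs rather than Ints):

import org.apache.spark.{SparkContext, SparkConf}

object RangeTest extends App {
  val conf = new SparkConf().setAppName("DataTest").setMaster("local[*]")
  val sc = new SparkContext(conf)
  // range is end-exclusive and builds each partition on the executors,
  // so the driver ships only the range bounds, not the data itself
  val data = sc.range(1L, 10000001L).cache()
  println(data.reduce(_ + _))
  sc.stop()
}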

Answered by mattinbits