Why is rdd.map(identity).cache slow when rdd items are big?

I found out that using .map( identity ).cache on an RDD becomes very slow when the items are big, while it is pretty much instantaneous otherwise.

Note: this is probably related to this question, but here I provide a very precise example (that can be executed directly in spark-shell):

// simple function to profile execution time (in ms)
def profile[R](code: => R): R = {
  val t = System.nanoTime
  val out = code
  println(s"time = ${(System.nanoTime - t)/1000000}ms")
  out
}

// create some big size item
def bigContent() = (1 to 1000).map( i => (1 to 1000).map( j => (i,j) ).toMap )

// create rdd
val n = 1000 // size of the rdd

val rdd = sc.parallelize(1 to n).map( k => bigContent() ).cache
rdd.count // to trigger caching

// profiling
profile( rdd.count )                 // around 12 ms
profile( rdd.map(identity).count )   // same
profile( rdd.cache.count )           // same
profile( rdd.map(identity).cache.count ) // 5700 ms !!!

I first expected that it was the time needed to create a new RDD (container). But if I use an RDD of the same size but with small items, there is only a tiny difference in execution time:

val rdd = sc.parallelize(1 to n).cache
rdd.count

profile( rdd.count )                 // around 9 ms
profile( rdd.map(identity).count )   // same
profile( rdd.cache.count )           // same
profile( rdd.map(identity).cache.count ) // 15 ms

So it looks like caching actually copies the data. I thought it might also be losing time serializing it, but I checked that cache is used with the default MEMORY_ONLY persistence:

rdd.getStorageLevel == StorageLevel.MEMORY_ONLY // true

=> So, is caching copying data, or is it something else?

This is really a major limitation for my application, because I started with a design that uses something similar to rdd = rdd.map(f: Item => Item).cache, with many such functions f applied in arbitrary order (an order I cannot determine beforehand); see the sketch below.
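
For concreteness, here is a minimal sketch of that design (Item, applyAll, and fs are placeholder names, not my actual code):

// Placeholder sketch: chain an arbitrary, runtime-determined sequence of
// Item => Item functions, caching each stage so later stages can reuse it.
type Item = IndexedSeq[Map[Int, Int]]

def applyAll(start: org.apache.spark.rdd.RDD[Item],
             fs: Seq[Item => Item]): org.apache.spark.rdd.RDD[Item] =
  fs.foldLeft(start)((rdd, f) => rdd.map(f).cache())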

I am using Spark 1.6.0.

Edit

When I look at the Spark UI -> Stages tab -> the last stage (i.e. 4), all tasks have pretty much the same data with:

  • duration = 3s (it went down to 3s, but that's still 2.9s too much :-\ )
  • scheduler 10ms
  • task deserialization 20ms
  • GC 0.1s (all tasks have that, but why would GC be triggered???)
  • result serialization 0ms
  • getting result 0ms
  • peak exec mem 0.0B
  • input size 7.0MB/125
  • no errors
asked by Juh_

1 Answer

A jstack of the process running the org.apache.spark.executor.CoarseGrainedExecutorBackend during the slow caching reveals the following:

"Executor task launch worker-4" #76 daemon prio=5 os_prio=0 tid=0x00000000030a4800 nid=0xdfb runnable [0x00007fa5f28dd000]
   java.lang.Thread.State: RUNNABLE
  at java.util.IdentityHashMap.resize(IdentityHashMap.java:481)
  at java.util.IdentityHashMap.put(IdentityHashMap.java:440)
  at org.apache.spark.util.SizeEstimator$SearchState.enqueue(SizeEstimator.scala:176)
  at org.apache.spark.util.SizeEstimator$.visitArray(SizeEstimator.scala:251)
  at org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:211)
  at org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:203)
  at org.apache.spark.util.SizeEstimator$$anonfun$sampleArray$1.apply$mcVI$sp(SizeEstimator.scala:284)
  at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
  at org.apache.spark.util.SizeEstimator$.sampleArray(SizeEstimator.scala:276)
  at org.apache.spark.util.SizeEstimator$.visitArray(SizeEstimator.scala:260)
  at org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:211)
  at org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:203)
  at org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:70)
  at org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:78)
  at org.apache.spark.util.collection.SizeTracker$class.afterUpdate(SizeTracker.scala:70)
  at org.apache.spark.util.collection.SizeTrackingVector.$plus$eq(SizeTrackingVector.scala:31)
  at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:285)
  at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
  at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
  at org.apache.spark.scheduler.Task.run(Task.scala:89)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)


"Executor task launch worker-5" #77 daemon prio=5 os_prio=0 tid=0x00007fa6218a9800 nid=0xdfc runnable [0x00007fa5f34e7000]
   java.lang.Thread.State: RUNNABLE
  at java.util.IdentityHashMap.put(IdentityHashMap.java:428)
  at org.apache.spark.util.SizeEstimator$SearchState.enqueue(SizeEstimator.scala:176)
  at org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply(SizeEstimator.scala:224)
  at org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply(SizeEstimator.scala:223)
  at scala.collection.immutable.List.foreach(List.scala:318)
  at org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:223)
  at org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:203)
  at org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:70)
  at org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:78)
  at org.apache.spark.util.collection.SizeTracker$class.afterUpdate(SizeTracker.scala:70)
  at org.apache.spark.util.collection.SizeTrackingVector.$plus$eq(SizeTrackingVector.scala:31)
  at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:285)
  at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
  at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
  at org.apache.spark.scheduler.Task.run(Task.scala:89)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)

The SizeEstimator makes sense as one of the main costs of caching something that is ostensibly already in memory, since proper size estimation for unknown objects can be fairly difficult. If you look at the visitSingleObject method, you can see that it relies heavily on reflection, calling getClassInfo to access runtime type information. Not only does the full object hierarchy get traversed, but each nested member gets checked against an IdentityHashMap to detect which references refer to the same concrete object instance, which is why the stack traces show so much time spent in those IdentityHashMap operations.
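
To make the mechanism concrete, here's a simplified sketch (not Spark's actual SizeEstimator code) of the same traversal pattern: walk the object graph reflectively, using an IdentityHashMap so each concrete instance is visited exactly once:

import java.util.IdentityHashMap
import scala.collection.mutable

// Simplified illustration of the traversal pattern inside Spark's
// SizeEstimator; counts objects instead of estimating bytes.
def countReachableObjects(root: AnyRef): Long = {
  val visited = new IdentityHashMap[AnyRef, AnyRef]()
  val queue = mutable.Queue[AnyRef](root)
  var count = 0L
  while (queue.nonEmpty) {
    val obj = queue.dequeue()
    if (obj != null && !visited.containsKey(obj)) {
      visited.put(obj, obj)   // identity-based dedup of shared references
      count += 1
      obj match {
        case arr: Array[AnyRef] =>
          arr.foreach(e => if (e != null) queue.enqueue(e))
        case _: Array[_] => // primitive array: no references to follow
        case other =>
          // reflectively enqueue every non-static, non-primitive field,
          // walking up the class hierarchy
          var cls: Class[_] = other.getClass
          while (cls != null) {
            for (f <- cls.getDeclaredFields
                 if !java.lang.reflect.Modifier.isStatic(f.getModifiers)
                 if !f.getType.isPrimitive) {
              f.setAccessible(true)
              val child = f.get(other)
              if (child != null) queue.enqueue(child)
            }
            cls = cls.getSuperclass
          }
      }
    }
  }
  count
}

Running countReachableObjects(bigContent()) on one of your items would report roughly millions of objects, which is the scale of work each SizeEstimator sample has to do.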

In the case of your example objects, each item is basically a sequence of maps from boxed integers to boxed integers; presumably Scala's implementation of the inner map holds an array as well, which explains the visitSingleObject -> List.foreach -> visitSingleObject -> visitSingleObject call hierarchy. In any case, there are lots of inner objects to visit: each item holds 1000 maps of 1000 entries each, and every entry carries boxed keys, boxed values, and internal hashtable nodes, so a single item reaches millions of objects. And the SizeEstimator sets up a fresh IdentityHashMap for each object it samples.

In the case where you measure:

profile( rdd.cache.count )

this doesn't exercise the caching logic at all: the RDD has already been successfully cached, and Spark is smart enough not to re-run it. You can isolate the exact cost of the caching logic, independently of the extra map(identity) transformation, by profiling fresh RDD creation and caching directly; here's my Spark session continuing from your last few lines:

scala> profile( rdd.count )
time = 91ms
res1: Long = 1000

scala> profile( rdd.map(identity).count )
time = 112ms
res2: Long = 1000

scala> profile( rdd.cache.count )
time = 59ms
res3: Long = 1000

scala> profile( rdd.map(identity).cache.count )
time = 6564ms                                                                   
res4: Long = 1000

scala> profile( sc.parallelize(1 to n).map( k => bigContent() ).count )
time = 14990ms                                                                  
res5: Long = 1000

scala> profile( sc.parallelize(1 to n).map( k => bigContent() ).cache.count )
time = 22229ms                                                                  
res6: Long = 1000

scala> profile( sc.parallelize(1 to n).map( k => bigContent() ).map(identity).cache.count )
time = 21922ms                                                                  
res7: Long = 1000

So you can see that the slowness didn't come from running through a map transformation per se; rather, the ~6s here appears to be the fundamental cost of running the caching logic over 1000 objects when each object has something like ~1,000,000 to ~10,000,000 inner objects (depending on how the Map implementation is laid out; e.g., the extra visitArray nesting in the top stack trace hints that the HashMap impl has nested arrays, which makes sense for a typical dense linear-probing data structure inside each hashtable entry).

For your concrete use case, you should err on the side of lazy caching if possible: there's overhead associated with caching intermediate results, and that's not a good tradeoff if you're not really going to reuse those results in lots of separate downstream transformations. But as you mention in your question, if you're indeed using one RDD to branch out into multiple different downstream transformations, you might indeed need the caching step if the original transformations are at all expensive; see the branching pattern sketched below.
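
Here's a hypothetical sketch of such a branching pattern, where the expensive base RDD is computed once and then reused by several separate actions (variable names are placeholders):

val base = sc.parallelize(1 to n).map( k => bigContent() ).cache()
base.count()                           // first action: computes the partitions and fills the cache
val totalMaps = base.map(_.size).sum() // second action: reads from the cached blocks
val n2 = base.map(identity).count()    // also reads from the cached blocks, no recompute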

The workaround is to try to use inner data structures that are more amenable to constant-time size calculations (e.g. arrays of primitives), where you save a lot of cost by avoiding iterating over huge numbers of wrapper objects and depending on reflection for each of them in the SizeEstimator.

I tried things like Array[Array[Int]] and even though there's still nonzero overhead, it's 10x better for a similar data size:

scala> def bigContent2() = (1 to 1000).map( i => (1 to 1000).toArray ).toArray
bigContent2: ()Array[Array[Int]]

scala> val rdd = sc.parallelize(1 to n).map( k => bigContent2() ).cache
rdd: org.apache.spark.rdd.RDD[Array[Array[Int]]] = MapPartitionsRDD[23] at map at <console>:28

scala> rdd.count // to trigger caching
res16: Long = 1000                                                              

scala> 

scala> // profiling

scala> profile( rdd.count )
time = 29ms
res17: Long = 1000

scala> profile( rdd.map(identity).count )
time = 42ms
res18: Long = 1000

scala> profile( rdd.cache.count )
time = 34ms
res19: Long = 1000

scala> profile( rdd.map(identity).cache.count )
time = 763ms                                                                    
res20: Long = 1000

To illustrate just how bad the cost of reflection on any fancier objects is: if I remove the last toArray there, so that each bigContent ends up as a scala.collection.immutable.IndexedSeq[Array[Int]], the performance goes back to within ~2x the slowness of the original IndexedSeq[Map[Int,Int]] case:

scala> def bigContent3() = (1 to 1000).map( i => (1 to 1000).toArray )
bigContent3: ()scala.collection.immutable.IndexedSeq[Array[Int]]

scala> val rdd = sc.parallelize(1 to n).map( k => bigContent3() ).cache
rdd: org.apache.spark.rdd.RDD[scala.collection.immutable.IndexedSeq[Array[Int]]] = MapPartitionsRDD[27] at map at <console>:28

scala> rdd.count // to trigger caching
res21: Long = 1000                                                              

scala> 

scala> // profiling

scala> profile( rdd.count )
time = 27ms
res22: Long = 1000

scala> profile( rdd.map(identity).count )
time = 39ms
res23: Long = 1000

scala> profile( rdd.cache.count )
time = 37ms
res24: Long = 1000

scala> profile( rdd.map(identity).cache.count )
time = 2781ms                                                                   
res25: Long = 1000

As discussed in the comment section, you can also consider using the MEMORY_ONLY_SER StorageLevel: as long as there's an efficient serializer, it can quite possibly be cheaper than the recursive reflection used in SizeEstimator. To do that you'd just replace cache() with persist(StorageLevel.MEMORY_ONLY_SER); as mentioned in this other question, cache() is conceptually the same thing as persist(StorageLevel.MEMORY_ONLY).

import org.apache.spark.storage.StorageLevel
profile( rdd.map(identity).persist(StorageLevel.MEMORY_ONLY_SER).count )

I actually tried this on both Spark 1.6.1 and Spark 2.0.0-preview running with everything else about the cluster configuration exactly the same (using Google Cloud Dataproc's "1.0" and "preview" image-versions, respectively). Unfortunately the MEMORY_ONLY_SER trick didn't appear to help in Spark 1.6.1:

scala> profile( rdd.map(identity).persist(StorageLevel.MEMORY_ONLY_SER).count )
time = 6709ms                                                                   
res19: Long = 1000

scala> profile( rdd.map(identity).cache.count )
time = 6126ms                                                                   
res20: Long = 1000

scala> profile( rdd.map(identity).persist(StorageLevel.MEMORY_ONLY).count )
time = 6214ms                                                                   
res21: Long = 1000

But in Spark 2.0.0-preview it seemed to improve performance by 10x:

scala> profile( rdd.map(identity).persist(StorageLevel.MEMORY_ONLY_SER).count )
time = 500ms
res18: Long = 1000

scala> profile( rdd.map(identity).cache.count )
time = 5353ms                                                                   
res19: Long = 1000

scala> profile( rdd.map(identity).persist(StorageLevel.MEMORY_ONLY).count )
time = 5927ms                                                                   
res20: Long = 1000

This could vary depending on your objects, though; a speedup would only be expected if serialization itself doesn't also rely on tons of reflection. If you're able to use Kryo serialization effectively, then you're likely to see improvement using MEMORY_ONLY_SER for these large objects.
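
As a sketch, enabling Kryo looks something like the following when building the SparkConf (the app name is a placeholder; registering your concrete classes is optional but lets Kryo write compact class identifiers instead of full class names):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("cache-profiling")   // placeholder app name
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // optional: register the concrete classes you cache
  .registerKryoClasses(Array(classOf[Array[Array[Int]]]))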

answered by Dennis Huo