
Spark: out of memory when broadcasting objects

I tried to broadcast a not-so-large map (~70 MB when saved to HDFS as a text file) and got out-of-memory errors. I tried increasing the driver memory to 11 GB and the executor memory to 11 GB, but still got the same error. spark.storage.memoryFraction is set to 0.3, and there isn't much data (less than 1 GB) cached either.

When the map is only around 2 MB, there's no problem. Is there a size limit when broadcasting objects? How can I make this work with the bigger map? Thank you!

Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
    at java.util.IdentityHashMap.resize(IdentityHashMap.java:469)
    at java.util.IdentityHashMap.put(IdentityHashMap.java:445)
    at org.apache.spark.util.SizeEstimator$SearchState.enqueue(SizeEstimator.scala:159)
    at org.apache.spark.util.SizeEstimator$.visitArray(SizeEstimator.scala:229)
    at org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:194)
    at org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:186)
    at org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:54)
    at org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:78)
    at org.apache.spark.util.collection.SizeTracker$class.afterUpdate(SizeTracker.scala:70)
    at org.apache.spark.util.collection.SizeTrackingVector.$plus$eq(SizeTrackingVector.scala:31)
    at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:278)
    at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:165)
    at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:143)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:801)
    at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:648)
    at org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:1006)
    at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:99)
    at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:85)
    at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
    at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:63)
    at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1327)

Edit: adding more information as requested in the comments (see the consolidated SparkConf sketch after the list):

  • I use spark-submit to submit the compiled jar in client mode, running Spark 1.5.0
  • spark.yarn.executor.memoryOverhead 600
  • set("spark.kryoserializer.buffer.max", "256m")
  • set("spark.speculation", "true")
  • set("spark.storage.memoryFraction", "0.3")
  • set("spark.driver.memory", "15G")
  • set("spark.executor.memory", "11G")
  • I tried set("spark.sql.tungsten.enabled", "false") and it didn't help.
  • The master machine has 60 GB of memory, of which around 30 GB is used for Spark/YARN. I'm not sure how much heap my job gets, but there aren't many other processes running at the same time, and the map itself is only around 70 MB.
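
Taken together, the settings above amount to roughly the following SparkConf (a sketch; the app name is a placeholder, not from the original post):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("BroadcastJob") // hypothetical app name
  .set("spark.kryoserializer.buffer.max", "256m")
  .set("spark.speculation", "true")
  .set("spark.storage.memoryFraction", "0.3")
  .set("spark.driver.memory", "15G")  // has no effect in client mode, see the answer below
  .set("spark.executor.memory", "11G")
val sc = new SparkContext(conf)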

Some code related to the broadcasting:

// Build a plain local Map on the driver from the DataFrame
val mappingAllLocal: Map[String, Int] = mappingAll.rdd.map(r => (r.getAs[String](0), r.getAs[Int](1))).collectAsMap().toMap
// I can save the above mappingAllLocal to HDFS as text, and it's around 70 MB
val mappingAllBrd = sc.broadcast(mappingAllLocal) // <-- this is where the out of memory happens
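
For context, executors read the broadcast value via .value inside a closure, e.g. (hypothetical usage; someRdd is a placeholder RDD[String], not from the original post):

val ids = someRdd.map { key =>
  mappingAllBrd.value.getOrElse(key, -1) // executor-local lookup, no shuffle
}
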
asked Jul 27 '16 by EXP0


People also ask

How do I fix an out-of-memory error in Spark?

One common fix is to increase the number of shuffle partitions by raising the value of spark.sql.shuffle.partitions.
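
For illustration, in Spark 1.x this can be set on the SQLContext (the value 400 is arbitrary; the default is 200):

sqlContext.setConf("spark.sql.shuffle.partitions", "400")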

Can Spark run out of memory?

Yes. Out-of-memory errors at the executor level are a very common issue with Spark applications, and they can have various causes. Some of the most common are high concurrency, inefficient queries, and incorrect configuration.

What exactly happens during broadcasting in Spark?

In a broadcast join, Spark first collects the data to the driver and then broadcasts it back out to the executors. In other words, the driver does a collect to gather all the data, then ships a full copy to every executor.
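
As a concrete example, Spark SQL exposes this pattern through the broadcast hint (a sketch; largeDF and smallDF are placeholder DataFrames):

import org.apache.spark.sql.functions.broadcast

// Hints Spark to collect smallDF to the driver and broadcast it,
// so the join needs no shuffle of largeDF.
val joined = largeDF.join(broadcast(smallDF), "key")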

How much data can we broadcast in Spark?

The maximum size for a broadcast table is 8 GB. Spark also maintains an internal size threshold (spark.sql.autoBroadcastJoinThreshold) below which broadcast joins are applied automatically.
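
That threshold is configurable, e.g. (10 MB is Spark's default; setting it to -1 disables automatic broadcast joins):

sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (10 * 1024 * 1024).toString)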


1 Answer

Using set("spark.driver.memory", "15G") has no effect in client mode: by the time your application code runs, the driver JVM has already been started with its default heap size. You have to pass the setting on the command line, e.g. --conf spark.driver.memory=15G, when submitting the application to increase the driver's heap size.
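
For example (the class and jar names are placeholders, not from the original post):

spark-submit \
  --class com.example.MyApp \
  --master yarn \
  --deploy-mode client \
  --conf spark.driver.memory=15G \
  myapp.jar

The equivalent spark-submit shorthand --driver-memory 15G does the same thing.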

answered Oct 09 '22 by Kien Truong