I tried to broadcast a not-so-large map (~70 MB when saved to HDFS as a text file), and I got out-of-memory errors. I tried increasing the driver memory to 11G and the executor memory to 11G, and I still get the same error. memory.fraction is set to 0.3, and not much data (less than 1G) is cached either.
When the map is only around 2 MB, there is no problem. I wonder if there is a size limit on broadcast objects. How can I make this work with the bigger map? Thank you!
Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
at java.util.IdentityHashMap.resize(IdentityHashMap.java:469)
at java.util.IdentityHashMap.put(IdentityHashMap.java:445)
at org.apache.spark.util.SizeEstimator$SearchState.enqueue(SizeEstimator.scala:159)
at org.apache.spark.util.SizeEstimator$.visitArray(SizeEstimator.scala:229)
at org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:194)
at org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:186)
at org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:54)
at org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:78)
at org.apache.spark.util.collection.SizeTracker$class.afterUpdate(SizeTracker.scala:70)
at org.apache.spark.util.collection.SizeTrackingVector.$plus$eq(SizeTrackingVector.scala:31)
at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:278)
at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:165)
at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:143)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:801)
at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:648)
at org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:1006)
at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:99)
at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:85)
at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:63)
at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1327)
Edit: adding more information in response to the comments.
Some code related to the broadcast:
val mappingAllLocal: Map[String, Int] = mappingAll.rdd.map(r => (r.getAs[String](0), r.getAs[Int](1))).collectAsMap().toMap
// I can save the above mappingAll to HDFS, and it's around 70 MB
val mappingAllBrd = sc.broadcast(mappingAllLocal) // <-- this is where the out of memory happens
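For context, the broadcast value would typically be read back on the executors like this; a minimal sketch, where keysRdd and the default value of -1 are illustrative and not part of the original code:

// Hypothetical usage sketch: look up the broadcast map locally on each executor.
val ids = keysRdd.map { key =>
  // mappingAllBrd.value is the broadcast Map[String, Int], deserialized once per executor
  mappingAllBrd.value.getOrElse(key, -1)
}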
You can resolve it by adjusting the number of partitions: increase the value of spark.sql.shuffle.partitions.
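For example, a hedged sketch of changing that setting, assuming an existing SparkSession named spark (the value 400 is arbitrary):

// Illustrative only: raise the number of shuffle partitions (the default is 200).
spark.conf.set("spark.sql.shuffle.partitions", "400")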
Out of memory at the executor level is a very common issue with Spark applications, and it can have several causes. Some of the most common are high concurrency, inefficient queries, and incorrect configuration.
Broadcasting in Spark: Spark broadcasts by first gathering the data from the executors to the driver and then having the driver send it back out to the executors. In other words, the driver does a collect to get all the data, and then it broadcasts that data back to the executors.
The maximum size for a broadcast table is 8 GB. Spark also internally maintains a size threshold on tables to decide when to apply broadcast joins automatically.
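That automatic threshold is spark.sql.autoBroadcastJoinThreshold. As a hedged sketch, you can raise it or force a broadcast join explicitly; largeDf, smallDf, the join column "id", and the 100 MB value below are placeholders:

import org.apache.spark.sql.functions.broadcast

// Illustrative only: tables below this size (in bytes) are broadcast automatically.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", (100 * 1024 * 1024).toString)

// Explicit broadcast hint on the smaller side of the join.
val joined = largeDf.join(broadcast(smallDf), Seq("id"))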
Using set("spark.driver.memory", "15G") has no effect in client mode. You have to pass the command line parameter --conf="spark.driver.memory=15G" when submitting the application in order to increase the driver's heap size, because by the time the SparkConf is read in your code the driver JVM has already started.
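For example, a hedged submit command (the class and jar names are placeholders; the shorthand --driver-memory 15G is equivalent):

# Illustrative spark-submit invocation; substitute your own main class and jar.
spark-submit \
  --conf spark.driver.memory=15G \
  --class com.example.MyApp \
  my-app.jar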