I've got an Apache Spark cluster with one master node and three worker nodes. The worker nodes have 32 cores and 124 GB of memory each. I've also got a dataset in HDFS with around 650 million text records. This dataset is a number of serialized RDDs, read in like so:
import org.apache.spark.mllib.linalg.{Vector, Vectors, SparseVector}
val vectors = sc.objectFile[(String, SparseVector)]("hdfs://mn:8020/data/*")
I'd like to extract a sample of one million of these records to do some analytics, so I figured I'd try:
val sample = vectors.takeSample(false, 10000, 0)
However, that eventually fails with this error message:
15/08/25 09:48:27 ERROR Utils: Uncaught exception in thread task-result-getter-3
java.lang.OutOfMemoryError: Java heap space
at org.apache.spark.scheduler.DirectTaskResult$$anonfun$readExternal$1.apply$mcV$sp(TaskResult.scala:64)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239)
at org.apache.spark.scheduler.DirectTaskResult.readExternal(TaskResult.scala:61)
at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:69)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:89)
at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:79)
at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1772)
at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Exception in thread "task-result-getter-3" java.lang.OutOfMemoryError: Java heap space
at org.apache.spark.scheduler.DirectTaskResult$$anonfun$readExternal$1.apply$mcV$sp(TaskResult.scala:64)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239)
at org.apache.spark.scheduler.DirectTaskResult.readExternal(TaskResult.scala:61)
at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:69)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:89)
at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$r
I understand that I'm running out of heap space (on the driver, I think?), and that makes sense. Running hadoop fs -du -s /path/to/data shows that the dataset takes up 2575 GB on disk, though that is only ~850 GB of actual data once HDFS's 3x replication is accounted for.
So, my question is: what can I do to extract this sample of 1,000,000 records (which I later plan on serializing to disk)? I know I could just run takeSample() with smaller sample sizes and aggregate them later, but I suspect I'm simply not setting the right configuration, or doing something else wrong, which is preventing me from doing this the way I'd like.
You can try to resolve it by increasing the number of shuffle partitions, i.e. raising the value of spark.sql.shuffle.partitions.
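For reference, a minimal sketch of how that property could be set when building the context; the value 400 is only an illustrative guess, and note that spark.sql.shuffle.partitions only governs Spark SQL shuffles, while plain RDD jobs are controlled by spark.default.parallelism:
import org.apache.spark.{SparkConf, SparkContext}
// Hypothetical configuration: raise the shuffle partition count above the default of 200.
val conf = new SparkConf()
  .setAppName("sampling-job")
  .set("spark.sql.shuffle.partitions", "400")
val sc = new SparkContext(conf)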
Out of memory at the executor level is a very common issue with Spark applications, and it may be due to various reasons. Some of the most common are high concurrency, inefficient queries, and incorrect configuration.
The most likely cause of this exception is that not enough heap memory is allocated to the Java virtual machines (JVMs) that are launched as executors or drivers as part of the Spark application.
Use the --conf option to increase memory overhead when you run spark-submit. If increasing the memory overhead doesn't solve the problem, reduce the number of executor cores.
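For example, a spark-submit invocation along these lines; the memory sizes, core count, class name, and jar are placeholders to adapt to your cluster, and since the stack trace above points at the driver's task-result-getter, raising --driver-memory is likely the most relevant knob here:
spark-submit \
  --master yarn \
  --driver-memory 8g \
  --executor-memory 16g \
  --executor-cores 4 \
  --conf spark.yarn.executor.memoryOverhead=4096 \
  --class com.example.SamplingJob \
  sampling-job.jar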
When working with big data, it is seldom a good idea to collect intermediate results at the driver node. Instead, it is almost always better to keep the data distributed across your cluster. The same holds true for the sample you want to take.
If you want to sample 1,000,000 elements of your data set and write them to disk afterwards, then why not take the sample and write it to disk without ever collecting it at the driver? The following code snippet should do exactly that:
// zipWithIndex pairs each record with its index; keep the first 1,000,000 records.
val sample = vectors.zipWithIndex().filter(_._2 < 1000000).map(_._1)
sample.saveAsObjectFile("path to file")
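Note that zipWithIndex keeps the first 1,000,000 records by position rather than drawing a random sample. If randomness matters, the sample() transformation also keeps the data distributed; a minimal sketch, assuming a fraction of roughly 1,000,000 out of the ~650 million records and a hypothetical output path:
// Fraction that should yield approximately 1,000,000 of the ~650 million records.
val fraction = 1000000.0 / 650000000.0
// sample() is a transformation, so the sampled records never pass through the driver.
val randomSample = vectors.sample(withReplacement = false, fraction = fraction, seed = 0L)
randomSample.saveAsObjectFile("hdfs://mn:8020/data-sample")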