 

Spark job running out of heap memory on takeSample

I've got an Apache Spark cluster with one master node and three worker nodes. Each worker node has 32 cores and 124 GB of memory. I've also got a dataset in HDFS with around 650 million records. This dataset is a set of serialized RDDs, read in like so:

import org.apache.spark.mllib.linalg.{Vector, Vectors, SparseVector}
val vectors = sc.objectFile[(String, SparseVector)]("hdfs://mn:8020/data/*")

I'd like to extract a sample of one million of these records to do some analytics, so I figured I'd try val sample = vectors.takeSample(false, 10000, 0). However, that eventually fails with this error message:

 15/08/25 09:48:27 ERROR Utils: Uncaught exception in thread task-result-getter-3
java.lang.OutOfMemoryError: Java heap space
        at org.apache.spark.scheduler.DirectTaskResult$$anonfun$readExternal$1.apply$mcV$sp(TaskResult.scala:64)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239)
        at org.apache.spark.scheduler.DirectTaskResult.readExternal(TaskResult.scala:61)
        at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:69)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:89)
        at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:79)
        at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
        at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1772)
        at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Exception in thread "task-result-getter-3" java.lang.OutOfMemoryError: Java heap space
        at org.apache.spark.scheduler.DirectTaskResult$$anonfun$readExternal$1.apply$mcV$sp(TaskResult.scala:64)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239)
        at org.apache.spark.scheduler.DirectTaskResult.readExternal(TaskResult.scala:61)
        at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:69)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:89)
        at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$r

I understand that I'm running out of heap space (on the driver, I think?), and that makes sense. Running hadoop fs -du -s /path/to/data shows that the dataset takes up 2575 GB on disk (though the data itself is only ~850 GB).

So, my question is: what can I do to extract this sample of 1,000,000 records (which I later plan on serializing to disk)? I know I could just call takeSample() with smaller sample sizes and aggregate them later, but I suspect I'm simply missing the right configuration or doing something wrong, which is preventing me from doing this the way I'd like.

asked Aug 25 '15 by npp1993
People also ask

How do you fix heap memory in Spark?

You can often resolve it by increasing the number of shuffle partitions, i.e. raising the value of spark.sql.shuffle.partitions.
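A minimal sketch of how that setting can be changed at runtime (assuming a Spark 2.x+ SparkSession bound to the name spark; 400 is an arbitrary illustrative value, the right number depends on data volume and cluster size):

// Raise the number of partitions Spark SQL uses for shuffles.
// The value "400" is a placeholder, not a recommendation.
spark.conf.set("spark.sql.shuffle.partitions", "400")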

Can Spark run out of memory?

Yes. Out-of-memory errors at the executor level are a very common issue with Spark applications and can have several causes; the most common are high concurrency, inefficient queries, and incorrect configuration.

Why do we get out of memory error in Spark?

The most likely cause of this exception is that not enough heap memory is allocated to the Java virtual machines (JVMs). These JVMs are launched as executors or drivers as part of the Apache Spark application.
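For example, the heap available to those JVMs can be raised when the job is submitted. A sketch only; the memory values are placeholders, and the class and jar names are hypothetical:

# 8g/16g are placeholder sizes; com.example.SampleJob and sample-job.jar are hypothetical
spark-submit \
  --driver-memory 8g \
  --executor-memory 16g \
  --class com.example.SampleJob \
  sample-job.jar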

How do I increase memory in executor Spark?

Use the --conf option to increase memory overhead when you run spark-submit. If increasing the memory overhead doesn't solve the problem, then reduce the number of executor cores.
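A sketch of what that might look like (the property name depends on the Spark version: spark.yarn.executor.memoryOverhead on older releases running on YARN, spark.executor.memoryOverhead from Spark 2.3 onwards; the values, class, and jar below are placeholders):

# illustrative values only
spark-submit \
  --conf spark.executor.memoryOverhead=2g \
  --executor-cores 4 \
  --class com.example.SampleJob \
  sample-job.jar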


1 Answer

When working with big data, it is seldom a good idea to collect intermediate results at the driver node. Instead, it is almost always better to keep the data distributed in your cluster. The same holds true for the sample you want to take.

If you want to sample 1,000,000 elements of your data set and write them to disk afterwards, then why not take the sample and write it to disk without collecting it at the driver? The following code snippet should do exactly that:

// keep the elements whose index is below 1,000,000, then drop the index
val sample = vectors.zipWithIndex().filter(_._2 < 1000000).map(_._1)

sample.saveAsObjectFile("path to file")
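
Note that zipWithIndex keeps the first 1,000,000 elements in partition order rather than drawing a random sample. If randomness matters, the same distributed approach works with RDD.sample instead; a minimal sketch, with the fraction approximated from the ~650 million record count and the output path left as a placeholder:

// Draw an approximately 1,000,000-record random sample without collecting at the driver.
val fraction = 1000000.0 / 650000000.0   // ~0.0015; adjust to the actual record count
val randomSample = vectors.sample(withReplacement = false, fraction = fraction, seed = 0L)
randomSample.saveAsObjectFile("path to file")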
answered Sep 24 '22 by Till Rohrmann