
Spark runs out of memory when grouping by key

I am attempting to perform a simple transformation of Common Crawl data using Spark hosted on EC2, following this guide. My code looks like this:

package ccminer

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object ccminer {
  val english = "english|en|eng"
  val spanish = "es|esp|spa|spanish|espanol"
  val turkish = "turkish|tr|tur|turc"
  val greek = "greek|el|ell"
  val italian = "italian|it|ita|italien"
  val all = (english :: spanish :: turkish :: greek :: italian :: Nil).mkString("|")

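  // Lowercase the string and replace every occurrence of a language token with "*"
  // (intent: make keys language-independent).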
  def langIndep(s: String) = s.toLowerCase().replaceAll(all, "*")

  def main(args: Array[String]): Unit = {
    if (args.length != 3) {
      System.err.println("Bad command line")
      System.exit(-1)
    }

    val cluster = "spark://???"
    val sc = new SparkContext(cluster, "Common Crawl Miner",
      System.getenv("SPARK_HOME"), Seq("/root/spark/ccminer/target/scala-2.10/cc-miner_2.10-1.0.jar"))

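    // Read (key, value) pairs from the sequence file, strip language tokens from the keys,
    // group values by the normalized key into args(2) partitions,
    // keep only keys with more than one value, and write the groups out as text.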
    sc.sequenceFile[String, String](args(0)).map {
      case (k, v) => (langIndep(k), v)
    }
    .groupByKey(args(2).toInt)
    .filter {
      case (_, vs) => vs.size > 1
    }
    .saveAsTextFile(args(1))
  }
}

And I am running it with the following command:

sbt/sbt "run-main ccminer.ccminer s3n://aws-publicdatasets/common-crawl/parse-output/segment/1341690165636/textData-* s3n://parallelcorpus/out/ 2000"

But very quickly it fails with errors like the following:

java.lang.OutOfMemoryError: Java heap space
at com.ning.compress.BufferRecycler.allocEncodingBuffer(BufferRecycler.java:59)
at com.ning.compress.lzf.ChunkEncoder.<init>(ChunkEncoder.java:93)
at com.ning.compress.lzf.impl.UnsafeChunkEncoder.<init>(UnsafeChunkEncoder.java:40)
at com.ning.compress.lzf.impl.UnsafeChunkEncoderLE.<init>(UnsafeChunkEncoderLE.java:13)
at com.ning.compress.lzf.impl.UnsafeChunkEncoders.createEncoder(UnsafeChunkEncoders.java:31)
at com.ning.compress.lzf.util.ChunkEncoderFactory.optimalInstance(ChunkEncoderFactory.java:44)
at com.ning.compress.lzf.LZFOutputStream.<init>(LZFOutputStream.java:61)
at org.apache.spark.io.LZFCompressionCodec.compressedOutputStream(CompressionCodec.scala:60)
at org.apache.spark.storage.BlockManager.wrapForCompression(BlockManager.scala:803)
at org.apache.spark.storage.BlockManager$$anonfun$5.apply(BlockManager.scala:471)
at org.apache.spark.storage.BlockManager$$anonfun$5.apply(BlockManager.scala:471)
at org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:117)
at org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:174)
at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:164)
at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:161)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
at org.apache.spark.scheduler.Task.run(Task.scala:53)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)

So my basic question is: what is necessary to write a Spark task that can group by key with an almost unlimited amount of input without running out of memory?

asked Mar 25 '14 by John McCrae


People also ask

How do I fix out of memory error in Spark?

You can resolve it by increasing the number of shuffle partitions: increase the value of spark.sql.shuffle.partitions.
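For concreteness, here is a minimal sketch of that setting, assuming a Spark 2.x+ SparkSession and a DataFrame-based job (this property affects DataFrame/SQL shuffles, not the RDD API used in the question); the app name, the value 2000, and the sample data are made up:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("shuffle-partitions-example")           // hypothetical app name
  .config("spark.sql.shuffle.partitions", "2000")  // default is 200; 2000 is only illustrative
  .getOrCreate()
import spark.implicits._

// Any wide DataFrame operation (groupBy, join, ...) now shuffles into 2000 partitions.
val df = Seq(("en", "hello"), ("es", "hola"), ("en", "hi")).toDF("lang", "text")
df.groupBy("lang").count().show()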

Can Spark run out of memory?

Out of memory at the executor level is a very common issue with Spark applications and may be due to various causes. Some of the most common are high concurrency, inefficient queries, and incorrect configuration.

How do I use group by key in Spark?

The groupByKey function in Apache Spark is a frequently used transformation that shuffles data. It receives key-value (K, V) pairs as its input, groups the values by key, and generates a dataset of (K, Iterable[V]) pairs as its output.
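A small self-contained sketch of this on an RDD, assuming an existing SparkContext named sc; the sample data is made up:

val pairs = sc.parallelize(Seq(("en", "hello"), ("en", "hi"), ("es", "hola")))

// groupByKey shuffles all values for each key to the same partition
// and yields (key, Iterable[value]) pairs.
val grouped = pairs.groupByKey()
grouped.collect().foreach { case (k, vs) => println(s"$k -> ${vs.mkString(", ")}") }
// prints something like:
//   en -> hello, hi
//   es -> hola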

How do I reduce the GC time on my Spark?

Decrease the InitiatingHeapOccupancyPercent value (the default is 45) so that G1 GC starts its initial concurrent marking earlier, giving a better chance of avoiding a full GC. Increase the ConcGCThreads value to give concurrent marking more threads and thereby speed up the concurrent marking phase.
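As a hedged example, such JVM flags can be handed to the executors via spark.executor.extraJavaOptions; the app name and the concrete numbers below are only placeholders, not recommendations:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("gc-tuning-example")  // hypothetical app name
  // Use G1 and start concurrent marking earlier than the default 45% occupancy.
  .set("spark.executor.extraJavaOptions",
       "-XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35 -XX:ConcGCThreads=8")
// 35 and 8 are placeholder values; tune them for your own workload.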


2 Answers

So this says that you have run out of the JVM's allocated heap space. You can increase the heap size, but it is still limited by your system's capabilities (it cannot exceed the amount of physical RAM).

On the other hand, as explained by homutov, this happens in large collecting operations, for example groupByKey, reduceByKey, or cartesian + mapToPair. These operations gather large amounts of RDD data in one place, making the JVM run out of heap space.

What can you do?

In my experience, when a cluster/system has limited resources, you can use the Spark tuning guide. spark.default.parallelism can be increased until your tasks fit into your cluster/system [I once ran a KNN implementation on a 14,000-instance, 1,024-feature dataset on my laptop's virtual machine by tweaking parallelism].

Command line flag: --conf spark.default.parallelism=4 (here 4 is the parallelism value)
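The same setting can also be applied programmatically when building the context; a minimal sketch, where 2000 simply mirrors the partition count the question passes on the command line:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("Common Crawl Miner")
  .set("spark.default.parallelism", "2000")  // used by groupByKey etc. when no explicit partition count is given
val sc = new SparkContext(conf)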

Remember, you need to TUNE these settings to the most effective, failure-avoiding (not running out of heap) values to get the best results out of Spark.

Additionally

Remember to use primitive datatypes instead of wrappers, and use arrays instead of collections.

ex: List<Integer> vs int[]; int[] is better than List

In Spark, arrays can save a lot of valuable space and improve performance.
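A tiny sketch of the difference in Scala:

// List[Int] stores boxed java.lang.Integer objects, each reached through a pointer,
// plus the overhead of the list cells themselves.
val boxed: List[Int] = List(1, 2, 3)

// Array[Int] compiles down to a primitive int[]: a flat, compact block of 4-byte values.
val unboxed: Array[Int] = Array(1, 2, 3)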

Also use broadcast variables instead of a Cartesian product or any other large combination task.
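A minimal sketch of a broadcast lookup used inside map instead of shuffling or cross-joining a second dataset; the lookup table and data are made up, and sc is an existing SparkContext:

// Ship a small lookup table to every executor once, instead of shuffling it.
val langNames = sc.broadcast(Map("en" -> "english", "es" -> "spanish", "tr" -> "turkish"))

val records = sc.parallelize(Seq(("en", "hello"), ("es", "hola")))
val resolved = records.map { case (code, text) =>
  (langNames.value.getOrElse(code, "unknown"), text)
}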

answered Sep 20 '22 by Kavindu Dodanduwa


The most common cause of java.lang.OutOfMemoryError exceptions in shuffle tasks (such as groupByKey, reduceByKey, etc.) is a low level of parallelism.

You can increase the default value by setting the spark.default.parallelism property in the configuration.

answered Sep 21 '22 by homutov