I am attempting to perform a simple transformation of Common Crawl data using Spark hosted on EC2, set up using this guide. My code looks like this:
package ccminer

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object ccminer {
  val english = "english|en|eng"
  val spanish = "es|esp|spa|spanish|espanol"
  val turkish = "turkish|tr|tur|turc"
  val greek   = "greek|el|ell"
  val italian = "italian|it|ita|italien"
  val all     = (english :: spanish :: turkish :: greek :: italian :: Nil).mkString("|")

  // Rewrite language markers in a key so that records differing only by
  // language end up under the same key.
  def langIndep(s: String) = s.toLowerCase().replaceAll(all, "*")

  def main(args: Array[String]): Unit = {
    if (args.length != 3) {
      System.err.println("Bad command line")
      System.exit(-1)
    }
    val cluster = "spark://???"
    val sc = new SparkContext(cluster, "Common Crawl Miner",
      System.getenv("SPARK_HOME"),
      Seq("/root/spark/ccminer/target/scala-2.10/cc-miner_2.10-1.0.jar"))
    sc.sequenceFile[String, String](args(0))
      .map { case (k, v) => (langIndep(k), v) }  // make keys language-independent
      .groupByKey(args(2).toInt)                 // shuffle into args(2) partitions
      .filter { case (_, vs) => vs.size > 1 }    // keep keys with more than one value
      .saveAsTextFile(args(1))
  }
}
And I am running it with the following command:
sbt/sbt "run-main ccminer.ccminer s3n://aws-publicdatasets/common-crawl/parse-output/segment/1341690165636/textData-* s3n://parallelcorpus/out/ 2000"
But very quickly it fails with the following error:
java.lang.OutOfMemoryError: Java heap space
at com.ning.compress.BufferRecycler.allocEncodingBuffer(BufferRecycler.java:59)
at com.ning.compress.lzf.ChunkEncoder.<init>(ChunkEncoder.java:93)
at com.ning.compress.lzf.impl.UnsafeChunkEncoder.<init>(UnsafeChunkEncoder.java:40)
at com.ning.compress.lzf.impl.UnsafeChunkEncoderLE.<init>(UnsafeChunkEncoderLE.java:13)
at com.ning.compress.lzf.impl.UnsafeChunkEncoders.createEncoder(UnsafeChunkEncoders.java:31)
at com.ning.compress.lzf.util.ChunkEncoderFactory.optimalInstance(ChunkEncoderFactory.java:44)
at com.ning.compress.lzf.LZFOutputStream.<init>(LZFOutputStream.java:61)
at org.apache.spark.io.LZFCompressionCodec.compressedOutputStream(CompressionCodec.scala:60)
at org.apache.spark.storage.BlockManager.wrapForCompression(BlockManager.scala:803)
at org.apache.spark.storage.BlockManager$$anonfun$5.apply(BlockManager.scala:471)
at org.apache.spark.storage.BlockManager$$anonfun$5.apply(BlockManager.scala:471)
at org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:117)
at org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:174)
at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:164)
at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:161)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
at org.apache.spark.scheduler.Task.run(Task.scala:53)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
So my basic question is, what is necessary to write a Spark task that can group by key with an almost unlimited amount of input without running out of memory?
You can resolve it by increasing the number of shuffle partitions: raise the value of spark.sql.shuffle.partitions. Note that this setting applies to DataFrame/Spark SQL shuffles; for RDD operations such as groupByKey, the corresponding setting is spark.default.parallelism.
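For DataFrame/Dataset jobs, a minimal sketch of setting this (Spark 2.x+ API shown; the value 400 is illustrative, not a recommendation):

import org.apache.spark.sql.SparkSession

// Raise the shuffle partition count for Spark SQL jobs.
// This does not affect RDD shuffles like the groupByKey above.
val spark = SparkSession.builder().appName("Common Crawl Miner").getOrCreate()
spark.conf.set("spark.sql.shuffle.partitions", "400")  // default is 200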
This is an out-of-memory error at the executor level, which is a very common issue with Spark applications and may have various causes. The most common are high concurrency, inefficient queries, and incorrect configuration.
The groupByKey function in Apache Spark is a frequently used transformation that shuffles the data. It receives key-value pairs (K, V) as its input, groups the values by key, and generates a dataset of (K, Iterable<V>) pairs as its output.
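As a small illustration of that behaviour (the sample data is hypothetical):

val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))
val grouped = pairs.groupByKey()  // RDD[(String, Iterable[Int])] in recent Spark
// grouped contains ("a", Iterable(1, 2)) and ("b", Iterable(3)):
// every value for a key is buffered together on one executor.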
Decrease the -XX:InitiatingHeapOccupancyPercent value (the default is 45) to let G1 GC start its initial concurrent marking earlier, so that there is a higher chance of avoiding a full GC. Increase the -XX:ConcGCThreads value to use more threads for concurrent marking, which speeds up the concurrent marking phase.
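As a hedged sketch, these JVM flags could be passed to executors via spark.executor.extraJavaOptions (available since Spark 1.0; the values below are illustrative, not recommendations):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.extraJavaOptions",
    "-XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35 -XX:ConcGCThreads=8")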
So this says that the JVM has run out of its allocated heap space. You may increase the heap size, but it is still limited by system capabilities (it cannot exceed the amount of physical RAM).
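For example, the executor heap can be raised through configuration (the value is illustrative and must fit within physical RAM):

import org.apache.spark.SparkConf

// Sketch: give each executor a larger JVM heap.
val conf = new SparkConf().set("spark.executor.memory", "4g")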
On the other hand, as explained by homutov, this happens in large collecting operations, for example groupByKey, reduceByKey, and cartesian + mapToPair. These operations can pull a large amount of RDD data into a single JVM, causing it to run out of heap space.
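As a hedged sketch of the difference: if only per-key counts are needed (rather than the grouped values themselves), reduceByKey combines values map-side and avoids buffering an entire group in one JVM. Reusing langIndep and args(0) from the question's code:

// Count values per key without materializing the full group in memory.
val counts = sc.sequenceFile[String, String](args(0))
  .map { case (k, _) => (langIndep(k), 1) }
  .reduceByKey(_ + _)                               // combined map-side
val multi = counts.filter { case (_, n) => n > 1 }  // keys seen more than once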
What can you do?
In my experience, when a cluster/system has limited resources, you can use the Spark tuning guide. spark.default.parallelism can be increased until the tasks fit into your cluster/system [I once ran a KNN implementation on a 14000-instance, 1024-feature dataset on my laptop's virtual machine just by tweaking parallelism].
Command line flag: --conf spark.default.parallelism=4 (here 4 is the parallelism value)
Remember, you need to TUNE these settings to the most effective, failure-avoiding (not running out of heap) values to get the best results out of Spark.
Additionally
Remember to use primitive datatypes instead of wrappers, and arrays instead of collections.
e.g. List<Integer> vs int[]; int[] is better than List<Integer>
In Spark, arrays can save a lot of valuable space and improve performance.
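A small Scala illustration of the same point:

// On the JVM, List[Int] boxes every element (java.lang.Integer objects),
// while Array[Int] compiles to a primitive int[] with no per-element objects.
val boxed: List[Int] = List(1, 2, 3)       // three boxed Integers plus list cells
val primitive: Array[Int] = Array(1, 2, 3) // one contiguous primitive array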
Also use broadcast variables instead of a Cartesian product or any large combination task.
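A hedged sketch of that idea, assuming the small side fits in memory (smallLookup and bigRdd below are hypothetical, with bigRdd an RDD[(String, String)]):

// Ship the small side to every executor once, instead of shuffling it
// through a cartesian product.
val smallLookup: Map[String, String] = Map("en" -> "english")  // hypothetical
val bSmall = sc.broadcast(smallLookup)
val joined = bigRdd.flatMap { case (k, v) =>
  bSmall.value.get(k).map(label => (k, (v, label)))  // map-side join
}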
The most common cause of java.lang.OutOfMemoryError exceptions in shuffle tasks (such as groupByKey, reduceByKey, etc.) is a low level of parallelism.
You can increase the default value by setting the spark.default.parallelism property in the configuration.
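For example (the value 200 is illustrative; choose one suited to your cluster):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("Common Crawl Miner")
  .set("spark.default.parallelism", "200")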