 

Memory Usage of sc.textfile vs sc.wholeTextFiles + flatMapValues

Tags:

apache-spark

I have a set of log files I would like to read into an RDD. These log files are all compressed gzip files and the filenames are date stamped.

I've been using sc.wholeTextFiles() to read in the files and it seems like I've been running into Java heap memory issues. To isolate the problem, I decided to run it against a single file on a single machine as a test case.

I obtained the file from here:

http://dumps.wikimedia.org/other/pagecounts-raw/

Here are the sizes of the file, both compressed and uncompressed versions:

myuser@fembuntu$ ls -ltr pagecounts-20090505-180000*
-rw-rw-r-- 1 myuser myuser  65170192 Sep 20  2009 pagecounts-20090505-180000.gz
-rw-rw-r-- 1 myuser myuser 233007266 Jan 22 00:23 pagecounts-20090505-180000.txt

and the available memory on the machine is as follows:

myuser@fembuntu:~$ free -tm
             total       used       free     shared    buffers     cached
Mem:          4856       3018       1838        123         27        407
-/+ buffers/cache:       2583       2273
Swap:         5080        849       4231
Total:        9937       3867       6069

So I fired up the spark-shell, giving the executor 2G of memory:

$ spark-shell --executor-memory 2G

scala> val pc_loc = "file:///home/myuser/data/pagecounts"
scala> val filename="/pagecounts-20090505-180000.gz"
filename: String = /pagecounts-20090505-180000.gz

Here I read in the data via sc.textFile() and display the first two lines:

scala>  var rdd=sc.textFile(pc_loc + filename)
rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:31

scala> rdd.take(2)
res0: Array[String] = Array(aa.b Help:Books 1 5263, aa.b Main_Page 1 5416)

That works fine.

Here I use sc.wholeTextFiles() and split the content on newlines via flatMapValues() to obtain a pair RDD whose rows are key-value pairs: the key is the file path, and the values correspond to the lines of the RDD obtained with sc.textFile().

scala> val pair_rdd=sc.wholeTextFiles(pc_loc + filename).flatMapValues(y => y.split("\n"))
pair_rdd: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[4] at flatMapValues at <console>:31

But I get a heap error when I execute an action:

scala> pair_rdd.take(2)
16/01/22 01:13:36 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.lang.OutOfMemoryError: Java heap space
        at java.nio.HeapCharBuffer.<init>(HeapCharBuffer.java:57)
        at java.nio.CharBuffer.allocate(CharBuffer.java:335)
        at java.nio.charset.CharsetDecoder.decode(CharsetDecoder.java:795)
        at org.apache.hadoop.io.Text.decode(Text.java:412)

Can anyone explain what's going on here? Why does the flatMapValues call to split the lines seem to blow Java heap memory usage out of the water, resulting in a heap error?

Asked by femibyte
1 Answer

The problem you experience is not really specific to the textFile vs wholeTextFiles-with-flatMapValues scenario. It looks like your program doesn't even get to the point where the data is flattened, and I am pretty sure you'll get the same exception if you call count instead of flatMapValues.
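A minimal sketch of that point, reusing the pc_loc and filename values from the question (this check is mine, not part of the original answer):

val whole = sc.wholeTextFiles(pc_loc + filename)
whole.count()   // expected to fail with the same java.lang.OutOfMemoryError,
                // before any splitting or flattening ever happens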

In practice it is just a matter of creating large objects. Remember that wholeTextFiles has to read the complete content of each file at once; it cannot be partially spilled to disk or partially garbage collected. While 200 MB or so is not particularly impressive, it is quite a lot to be handled by a single object. Moreover, that object has to reside on a single machine, which makes it much harder to distribute the load.
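A rough back-of-envelope estimate (my own, hedged; it assumes the ~233 MB uncompressed text is mostly ASCII and that pre-Java-9 Strings store UTF-16 chars at 2 bytes each):

val uncompressedBytes = 233007266L        // size of the .txt file on disk
val asChars   = uncompressedBytes * 2     // ~466 MB once decoded to UTF-16 chars
val decodeBuf = asChars                   // CharsetDecoder.decode (see the stack trace)
                                          // allocates a CharBuffer of similar size
println(s"~${(asChars + decodeBuf) / (1024 * 1024)} MB transient for one file")
// roughly 900 MB of contiguous character data before flatMapValues even runs,
// leaving little headroom in a 2 GB heap shared with Spark's own bookkeeping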

Unlike wholeTextFiles, textFile provides much higher granularity in this particular case. Individual objects have to handle significantly less data and can be efficiently garbage collected once they are no longer required.
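If the (file path, line) pairing is what you are after, one hedged alternative (not something the original answer prescribes) is to keep textFile's line-level granularity and attach the path yourself, by keying each file's RDD and unioning them:

// `files` is assumed to hold the date-stamped .gz paths you want to read
val files = Seq(pc_loc + filename)
val keyed = files.map(path => sc.textFile(path).map(line => (path, line)))
val pairRdd = sc.union(keyed)
pairRdd.take(2)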

Ignoring the size of the objects, it looks like you're using Spark in local mode, which means everything is handled by a single JVM. Since the heap is shared by all threads, the amount of memory available for actual processing can be lower than you expect.
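In local mode there is no separate executor process, so the --executor-memory flag used in the question has no effect there; the driver's heap is what actually limits processing. For example:

$ spark-shell --master local[2] --driver-memory 4g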

Finally, you should remember that only a part of the available memory is reserved for the heap. See Garbage Collector Ergonomics and How is the default java heap size determined?. If you have to process large objects, you can always override the default initial and maximum heap size using the -Xms / -Xmx Java options.
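For a standalone JVM program (outside spark-shell, where --driver-memory is the usual knob), that could look like the following; the jar and class names are placeholders:

$ java -Xms512m -Xmx4g -cp my-app.jar com.example.MyApp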

Answered by zero323