I have a set of log files I would like to read into an RDD. These log files are all compressed gzip files and the filenames are date stamped.
I've been using sc.wholeTextFiles()
to read in the files and it seems like I've been running into Java heap memory issues. To isolate the problem, I decided to run it against a single file on a single machine as a test case.
I obtained the file from here:
http://dumps.wikimedia.org/other/pagecounts-raw/
Here are the sizes of the compressed and uncompressed versions of the file:
myuser@fembuntu$ ls -ltr pagecounts-20090505-180000*
-rw-rw-r-- 1 myuser myuser 65170192 Sep 20 2009 pagecounts-20090505-180000.gz
-rw-rw-r-- 1 myuser myuser 233007266 Jan 22 00:23 pagecounts-20090505-180000.txt
and the available memory on the machine is as follows:
myuser@fembuntu:~$ free -tm
             total       used       free     shared    buffers     cached
Mem:          4856       3018       1838        123         27        407
-/+ buffers/cache:       2583       2273
Swap:         5080        849       4231
Total:        9937       3867       6069
So I fired up the spark-shell, giving the executor 2G of memory:
$ spark-shell --executor-memory 2G
scala> val pc_loc = "file:///home/myuser/data/pagecounts"
scala> val filename="/pagecounts-20090505-180000.gz"
filename: String = /pagecounts-20090505-180000.gz
Here I read in the data via sc.textFile() and display the first two lines:
scala> var rdd=sc.textFile(pc_loc + filename)
rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:31
scala> rdd.take(2)
res0: Array[String] = Array(aa.b Help:Books 1 5263, aa.b Main_Page 1 5416)
That works fine.
Here I use sc.wholeTextFiles() and split on newlines via flatMapValues() to obtain a pair RDD whose rows are key-value pairs. The key is the file path, and the values correspond to the lines of the RDD obtained with sc.textFile().
scala> val pair_rdd=sc.wholeTextFiles(pc_loc + filename).flatMapValues(y => y.split("\n"))
pair_rdd: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[4] at flatMapValues at <console>:31
But I get a heap error when I execute an action:
scala> pair_rdd.take(2)
16/01/22 01:13:36 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapCharBuffer.<init>(HeapCharBuffer.java:57)
at java.nio.CharBuffer.allocate(CharBuffer.java:335)
at java.nio.charset.CharsetDecoder.decode(CharsetDecoder.java:795)
at org.apache.hadoop.io.Text.decode(Text.java:412)
Can anyone explain what's going on here? Why does the flatMapValues call that splits the lines seem to blow Java heap memory usage out of the water, resulting in a heap error?
The problem you are experiencing is not really specific to the textFile vs. wholeTextFiles-with-flatMapValues scenario. It looks like your program doesn't even get to the point where the data is flattened, and I am pretty sure you'll get the same exception if you call count instead of flatMapValues.
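For example (a minimal sketch, reusing the pc_loc and filename values defined above, not from the original question), even an action that never touches flatMapValues still has to materialize each file's entire decompressed content as a single String:
scala> // each file becomes one (path, fullContent) record, so the whole
scala> // decompressed file must fit in memory as a single String
scala> sc.wholeTextFiles(pc_loc + filename).count()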
In practice it is just a matter of creating large objects. Remember that wholeTextFiles has to read the complete content of each file at once; it cannot be partially spilled to disk or partially garbage collected. While 200MB or so is not particularly impressive, it is quite a lot to be handled by a single object. Moreover, it has to reside on a single machine, which makes it much harder to distribute the load.
Unlike wholeTextFiles, textFile provides much higher granularity in this particular case. Individual objects have to handle significantly less data and can be efficiently garbage collected once they are no longer required.
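One way to keep that granularity while still tracking which file each line came from (a sketch, not from the original answer) is to read with textFile and attach the path as the key yourself:
scala> val path = pc_loc + filename
scala> // per-line records: no single object ever has to hold the whole file
scala> val pair_rdd = sc.textFile(path).map(line => (path, line))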
Ignoring the size of the objects, it looks like you're using Spark in local mode. That means everything is handled by a single JVM, and since the heap is shared by all threads, the amount of memory available for actual processing can be lower than you expect.
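You can confirm this from the shell itself; sc.master should report something like local[*] when everything runs inside a single JVM:
scala> sc.master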
Finally, you should remember that only part of the available memory is reserved for the heap. See Garbage Collector Ergonomics and How is the default Java heap size determined?. If you have to process large objects, you can always override the default initial and maximum heap size using the -Xms / -Xmx Java options.
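As a quick sanity check (assuming the same spark-shell session as above), you can also inspect how much heap the local JVM actually got. Note that in local mode this is governed by the driver's heap settings (for example --driver-memory or -Xmx), not by --executor-memory:
scala> // approximate maximum heap available to this JVM, in MB
scala> Runtime.getRuntime.maxMemory / (1024 * 1024)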