I wrote a piece of code that reads multiple parquet files and caches them for later use. Simplified, my code looks like this:
val data = SparkStartup.sqlContext.read.parquet(...)
data.setName(...).persist(StorageLevel.MEMORY_AND_DISK_SER).collect()
map += data
The parquet files are about 11 GB in total. I configure my application like this:
val sparkConfig = new SparkConf().setAppName(...).setMaster("local[128]")
sparkConfig.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
sparkConfig.set("spark.kryoserializer.buffer.max", "512m");
sparkConfig.set("spark.kryoserializer.buffer", "256");
sparkConfig.set("spark.driver.maxResultSize", "0");
sparkConfig.set("spark.driver.memory", "9g");
I thought that by using MEMORY_AND_DISK_SER, Spark would spill to disk if too much memory is used. However, I get `java.lang.OutOfMemoryError: Java heap space` errors:
at java.util.Arrays.copyOf(Arrays.java:3230)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
at com.esotericsoftware.kryo.io.Output.flush(Output.java:155)
at com.esotericsoftware.kryo.io.Output.require(Output.java:135)
at com.esotericsoftware.kryo.io.Output.writeAscii_slow(Output.java:446)
at com.esotericsoftware.kryo.io.Output.writeString(Output.java:306)
at com.esotericsoftware.kryo.util.DefaultClassResolver.writeName(DefaultClassResolver.java:105)
at com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:81)
at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:472)
Why is this? I start my application with `-Xmx9g -Dspark.executor.memory=9g -Dspark.executor.cores=3`. For the files that are read before everything crashes, I can see in the Spark UI that a parquet file takes about 9x its size when read into memory.
It is because you are calling `collect()` in your driver application. This returns an `Array` of all your data items, which needs to fit into the driver's memory.
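If the `collect()` in the question is only there to force the cached data to materialize, you can instead trigger the caching with an action that does not ship rows back to the driver. A minimal sketch, assuming the goal is just to warm the cache; the path is a placeholder, not from the question:
import org.apache.spark.storage.StorageLevel

val data = SparkStartup.sqlContext.read.parquet("/path/to/parquet") // placeholder path
// persist() is lazy; count() forces every partition to be read and cached,
// but only a single Long (the row count) is returned to the driver
data.persist(StorageLevel.MEMORY_AND_DISK_SER)
data.count()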
You should instead work with the `data` RDD/DataFrame: map, reduce, group, etc. your large data set down to some smaller desired result, and only then `collect()` that smaller amount of data.
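For example, if only an aggregate is needed on the driver, reduce the data on the executors first and collect the small result. A minimal sketch with made-up column names (`category`, `amount`) and a placeholder path, none of which come from the question:
import org.apache.spark.sql.functions._

val data = SparkStartup.sqlContext.read.parquet("/path/to/parquet") // placeholder path
// the groupBy/agg runs on the executors; only one row per category reaches the driver
val totals = data.groupBy("category").agg(sum("amount").as("total"))
// collect() now returns a small Array[Row] instead of the whole 11 GB data set
val result = totals.collect()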