I want to use Spark to read a large (51 GB) XML file (on an external HDD) into a DataFrame (using the spark-xml plugin), do some simple mapping and filtering, reorder it, and then write it back to disk as a CSV file.
But I always get a java.lang.OutOfMemoryError: Java heap space
no matter how I tweak this.
I want to understand why increasing the number of partitions doesn't stop the OOM error.
Shouldn't it split the task into more parts so that each individual part is smaller and doesn't cause memory problems?
(Spark can't possibly be trying to stuff everything in memory and crashing if it doesn't fit, right??)
Things I've tried:
- spark.memory.fraction to 0.8 (default is 0.6)
- spark.memory.storageFraction to 0.2 (default is 0.5)
- spark.default.parallelism to 30 and 40 (default is 8 for me)
- spark.files.maxPartitionBytes to 64M (default is 128M)
All my code is here (notice I'm not caching anything):
val df: DataFrame = spark.sqlContext.read
  .option("mode", "DROPMALFORMED")
  .format("com.databricks.spark.xml")
  .schema(customSchema) // defined previously
  .option("rowTag", "row")
  .load(s"$pathToInputXML")
println(s"\n\nNUM PARTITIONS: ${df.rdd.getNumPartitions}\n\n")
// prints 1604
// I pass `numPartitions` as a CLI argument
val df2 = df.coalesce(numPartitions)
// filter and select only the cols I'm interested in
val dsout = df2
  .where(df2.col("_PostTypeId") === "1")
  .select(
    df2("_Id").as("id"),
    df2("_Title").as("title"),
    df2("_Body").as("body"),
    df2("_Tags").as("tags") // Post has four fields: id, title, body, tags
  ).as[Post]
// regexes to clean the text
val tagPat = "<[^>]+>".r
val angularBracketsPat = "><|>|<"
val whitespacePat = """\s+""".r
// more mapping
dsout
  .map {
    case Post(id, title, body, tags) =>
      val body1 = tagPat.replaceAllIn(body, "")
      val body2 = whitespacePat.replaceAllIn(body1, " ")
      Post(id, title.toLowerCase, body2.toLowerCase, tags.split(angularBracketsPat).mkString(","))
  }
  .orderBy(rand(SEED)) // random sort
  .write // write it back to disk
  .option("quoteAll", true)
  .mode(SaveMode.Overwrite)
  .csv(output)
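To rule out the text-cleaning step itself, the regex logic from the map above can be tried on a single string without Spark. This is only a minimal sketch: the object name `PostCleaning`, the helper names and the sample inputs are made up for illustration, and the `filter(_.nonEmpty)` is an addition that is not in the code above (without it, the split leaves an empty first element and the result starts with a comma).

```scala
object PostCleaning {
  // Same patterns as in the code above.
  private val tagPat = "<[^>]+>".r
  private val whitespacePat = """\s+""".r
  private val angularBracketsPat = "><|>|<"

  /** Strips HTML tags, collapses whitespace runs to one space, lower-cases. */
  def cleanBody(body: String): String = {
    val noTags = tagPat.replaceAllIn(body, "")
    whitespacePat.replaceAllIn(noTags, " ").toLowerCase
  }

  /** Splits a tag string such as "<a><b>" on the brackets and joins with commas. */
  def cleanTags(tags: String): String =
    tags.split(angularBracketsPat).filter(_.nonEmpty).mkString(",")

  def main(args: Array[String]): Unit = {
    // Hypothetical sample inputs, not taken from the real data set.
    println(cleanBody("<p>Hello   <b>Spark</b>\n world</p>"))
    println(cleanTags("<scala><apache-spark>"))
  }
}
```

Each call only touches one row's strings, so this step on its own should not need much heap per record.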
NOTES
UPDATE: I've written a shorter version of the code that just reads the file and then calls foreachPartition(println).
I get the same OOM error:
val df: DataFrame = spark.sqlContext.read
  .option("mode", "DROPMALFORMED")
  .format("com.databricks.spark.xml")
  .schema(customSchema)
  .option("rowTag", "row")
  .load(s"$pathToInputXML")
  .repartition(numPartitions)
println(s"\n\nNUM PARTITIONS: ${df.rdd.getNumPartitions}\n\n")
df
  .where(df.col("_PostTypeId") === "1")
  .select(
    df("_Id").as("id"),
    df("_Title").as("title"),
    df("_Body").as("body"),
    df("_Tags").as("tags")
  ).as[Post]
  .map {
    case Post(id, title, body, tags) =>
      Post(id, title.toLowerCase, body.toLowerCase, tags.toLowerCase)
  }
  .foreachPartition { rdd =>
    // despite the name, `rdd` is an Iterator[Post] over a single partition
    if (rdd.nonEmpty) {
      println(s"HI! I'm an RDD and I have ${rdd.size} elements!")
    }
  }
P.S.: I'm using Spark v2.1.0. My machine has 8 cores and 16 GB of RAM.
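For a rough sense of scale, the average amount of input per partition can be worked out from the figures above (51 GB of input, 1604 partitions before any coalesce or repartition). This is only back-of-the-envelope arithmetic: the object and method names are made up, "51GB" is assumed to mean binary gigabytes, and the result is the size of the raw XML per split, not the size of the parsed rows in memory.

```scala
object PartitionSize {
  /** Average input per partition in MiB, given the total input in GiB. */
  def avgPartitionMiB(totalGiB: Double, numPartitions: Int): Double =
    totalGiB * 1024.0 / numPartitions

  def main(args: Array[String]): Unit = {
    // 51 GiB and 1604 partitions are the figures reported in the question.
    val perPartition = avgPartitionMiB(51.0, 1604)
    println(f"$perPartition%.1f MiB of input per partition on average")
  }
}
```

That works out to a little under 33 MiB of raw input per split, which is small compared to a multi-gigabyte heap.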
I was getting this error when running spark-shell, so I increased the driver memory to a high value. After that I was able to load the XML.
spark-shell --driver-memory 6G
Source: https://github.com/lintool/warcbase/issues/246#issuecomment-249272263
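To check that the --driver-memory setting above actually took effect, one option is to ask the JVM for its maximum heap size. The sketch below uses only the standard `Runtime` API; the object name `HeapCheck` is made up for illustration. It assumes Spark is running in local mode on a single machine, where the tasks run inside the driver JVM, so the driver heap is the one that matters.

```scala
object HeapCheck {
  /** Maximum heap the current JVM will attempt to use, in MiB. */
  def maxHeapMiB: Long = Runtime.getRuntime.maxMemory / (1024L * 1024L)

  def main(args: Array[String]): Unit =
    println(s"Max JVM heap: $maxHeapMiB MiB")
}
```

The expression `Runtime.getRuntime.maxMemory` can also be pasted directly into spark-shell. The reported value is usually somewhat below the requested size, but if it is far below it, the memory option was probably not picked up.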