Out of Memory Error when Reading large file in Spark 2.1.0

I want to use Spark to read a large (51 GB) XML file (on an external HDD) into a DataFrame (using the spark-xml plugin), do some simple mapping/filtering, reorder it, and then write it back to disk as a CSV file.

But I always get a java.lang.OutOfMemoryError: Java heap space no matter how I tweak this.

I want to understand why increasing the number of partitions doesn't stop the OOM error.

Shouldn't it split the task into more parts so that each individual part is smaller and doesn't cause memory problems?

(Spark can't possibly be trying to stuff everything in memory and crashing if it doesn't fit, right??)

Things I've tried (a sketch of how these settings can be passed at startup follows the list):

  • repartitioning/coalescing the DataFrame to 5,000 and 10,000 partitions when reading and when writing (the initial value is 1,604)
  • using fewer executors (6, 4, even with 2 executors I get the OOM error!)
  • decreasing the size of the split files (the default looks like it's 33MB)
  • giving it tons of RAM (all I have)
  • increasing spark.memory.fraction to 0.8 (the default is 0.6)
  • decreasing spark.memory.storageFraction to 0.2 (the default is 0.5)
  • setting spark.default.parallelism to 30 and 40 (the default is 8 for me)
  • setting spark.files.maxPartitionBytes to 64M (the default is 128M)
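
Here is a minimal sketch of how these settings can be passed when the SparkSession is built (the app name is a placeholder, not from the original post; the values are just the ones listed above, and spark-submit flags would work equally well):

import org.apache.spark.sql.SparkSession

// Sketch only: the settings from the list above, applied at session startup.
// "xml-to-csv" is a placeholder app name.
val spark = SparkSession.builder()
  .appName("xml-to-csv")
  .config("spark.memory.fraction", "0.8")              // default 0.6
  .config("spark.memory.storageFraction", "0.2")       // default 0.5
  .config("spark.default.parallelism", "40")           // default here was 8
  .config("spark.files.maxPartitionBytes", "67108864") // 64M; default 128M
  .getOrCreate()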

All my code is here (notice I'm not caching anything):

val df: DataFrame = spark.sqlContext.read
  .option("mode", "DROPMALFORMED")
  .format("com.databricks.spark.xml")
  .schema(customSchema) // defined previously
  .option("rowTag", "row")
  .load(s"$pathToInputXML")

println(s"\n\nNUM PARTITIONS: ${df.rdd.getNumPartitions}\n\n")
// prints 1604

// I pass `numPartitions` as a CLI argument
val df2 = df.coalesce(numPartitions)

// filter and select only the columns I'm interested in
val dsout = df2
  .where(df2.col("_TypeId") === "1")
  .select(
    df2("_Id").as("id"),
    df2("_Title").as("title"),
    df2("_Body").as("body"),
    df2("_Tags").as("tags") // tags is needed below, where Post(id, title, body, tags) is destructured
  ).as[Post]

// regexes to clean the text
val tagPat = "<[^>]+>".r          // matches XML/HTML tags in the body
val angularBracketsPat = "><|>|<" // plain string, used as a split pattern for "<tag1><tag2>" style tags
val whitespacePat = """\s+""".r   // matches runs of whitespace


// clean up the text fields, shuffle the rows, and write the result back out as CSV
dsout
  .map { case Post(id, title, body, tags) =>
    val body1 = tagPat.replaceAllIn(body, "")
    val body2 = whitespacePat.replaceAllIn(body1, " ")

    Post(id, title.toLowerCase, body2.toLowerCase, tags.split(angularBracketsPat).mkString(","))
  }
  .orderBy(rand(SEED))          // random sort (rand is from org.apache.spark.sql.functions)
  .write                        // write it back to disk
  .option("quoteAll", true)
  .mode(SaveMode.Overwrite)
  .csv(output)
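
For reference, customSchema and the Post case class are "defined previously" and not shown; here is a minimal sketch of what they might look like, assuming every attribute is read as a plain string (the types and nullability are assumptions, not from the original post):

import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Assumed shapes only; the original definitions are not shown in the post.
case class Post(id: String, title: String, body: String, tags: String)

val customSchema = StructType(Seq(
  StructField("_Id", StringType, nullable = true),
  StructField("_PostTypeId", StringType, nullable = true), // the snippet above filters on "_TypeId"; use whichever attribute the file actually has
  StructField("_Title", StringType, nullable = true),
  StructField("_Body", StringType, nullable = true),
  StructField("_Tags", StringType, nullable = true)
))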

NOTES

  • the input splits are really small (33MB only), so why can't I have 8 threads each processing one split? It really shouldn't blow my memory (I've se
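
As a diagnostic (not from the original post), a hedged sketch for counting how many records land in each partition, assuming df is the DataFrame loaded above; note it still has to materialise every partition, so it can run into the same OOM:

// Diagnostic sketch only: count records per partition without collecting the rows themselves.
val perPartitionCounts = df.rdd
  .mapPartitionsWithIndex { (idx, it) => Iterator((idx, it.size)) }
  .collect()

perPartitionCounts.foreach { case (idx, n) => println(s"partition $idx: $n rows") }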

UPDATE: I've written a shorter version of the code that just reads the file and then does a foreachPartition(println).

I get the same OOM error:

val df: DataFrame = spark.sqlContext.read
  .option("mode", "DROPMALFORMED")
  .format("com.databricks.spark.xml")
  .schema(customSchema)
  .option("rowTag", "row")
  .load(s"$pathToInputXML")
  .repartition(numPartitions)

println(s"\n\nNUM PARTITIONS: ${df.rdd.getNumPartitions}\n\n")

df
  .where(df.col("_PostTypeId") === "1")
  .select(
   df("_Id").as("id"),
   df("_Title").as("title"),
   df("_Body").as("body"),
   df("_Tags").as("tags")
  ).as[Post]
  .map {
    case Post(id, title, body, tags) =>
      Post(id, title.toLowerCase, body.toLowerCase, tags.toLowerCase)
  }
  .foreachPartition { it =>   // each call receives an Iterator[Post] for one partition
    if (it.nonEmpty) {
      println(s"HI! I'm a partition and I have ${it.size} elements!")
    }
  }

P.S.: I'm using Spark v2.1.0. My machine has 8 cores and 16 GB of RAM.

asked May 05 '17 by Felipe


1 Answer

I was getting this error when running spark-shell, so I increased the driver memory to a large value. After that I was able to load the XML.

spark-shell --driver-memory 6G

Source: https://github.com/lintool/warcbase/issues/246#issuecomment-249272263
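
The same flag exists for spark-submit; a sketch with placeholder class and jar names:

spark-submit --driver-memory 6G --class my.pkg.XmlToCsv my-job.jar

In client mode the driver JVM has already started by the time application code runs, so spark.driver.memory is normally set via --driver-memory or spark-defaults.conf rather than from inside the program.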

answered Nov 28 '22 by joydeep bhattacharjee