
Memory issue when importing parquet files in Spark

I am trying to query data from Parquet files in Spark 1.5 (Scala), including a query that fetches 2 million rows ("variants" in the code below).

val sqlContext = new org.apache.spark.sql.SQLContext(sc)  
sqlContext.sql("SET spark.sql.parquet.binaryAsString=true")

val parquetFile = sqlContext.read.parquet(<path>)

parquetFile.registerTempTable("tmpTable")
sqlContext.cacheTable("tmpTable")

val patients = sqlContext.sql("SELECT DISTINCT patient FROM tmpTable ...")

val variants = sqlContext.sql("SELECT DISTINCT ... FROM tmpTable ...")

This runs fine when the number of rows fetched is low, but fails with a "Size exceeds Integer.MAX_VALUE" error when lots of data is requested. The error looks as follows:

User class threw exception: org.apache.spark.SparkException:
Job aborted due to stage failure: Task 43 in stage 1.0 failed 4 times,
most recent failure: Lost task 43.3 in stage 1.0 (TID 123, node009):
java.lang.RuntimeException: java.lang.IllegalArgumentException:
Size exceeds Integer.MAX_VALUE at
sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828) at
org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125) at
org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:113) at ...

What can I do to make this work?

This looks like a memory issue, but I have tried using up to 100 executors with no difference (the time it takes to fail stays the same regardless of the number of executors involved). It feels like the data isn't being partitioned across the nodes?

I have attempted to force higher parallelism by naively replacing the variants line above with the following, to no avail:

val variants = sqlContext.sql("SELECT DISTINCT ... FROM tmpTable ...").repartition(sc.defaultParallelism * 10)
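For reference, the partition layout can be inspected directly (a quick diagnostic sketch, reusing the parquetFile and variants values from the code above):

// Diagnostic sketch: check how many partitions the cached table and the
// query result actually have.
println(s"parquetFile partitions: ${parquetFile.rdd.partitions.length}")
println(s"variants partitions:    ${variants.rdd.partitions.length}")

// sc.defaultParallelism is the value the repartition() call above multiplies by 10.
println(s"defaultParallelism:     ${sc.defaultParallelism}")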
asked Mar 22 '16 by user1918737


1 Answer

I don't believe the issue is Parquet-specific. You are hitting a limitation on the maximum size of a partition in Spark.

Size exceeds Integer.MAX_VALUE at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828) at ...

The "Size exceeds Integer.MAX_VALUE" error indicates that (I believe) one of your partitions is larger than 2 GB, which requires more than an int32 to index it.

The comment from Joe Widen is spot on. You need to repartition your data even more. Try 1000 or more.

E.g.,

val data = sqlContext.read.parquet("data.parquet").repartition(1000)
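If repartitioning the input alone doesn't help, a related knob worth trying (a sketch based on the question's setup rather than something from the answer above) is spark.sql.shuffle.partitions, which controls how many partitions the shuffle behind SELECT DISTINCT produces; the Spark 1.x default is 200.

// Sketch assuming the question's Spark 1.5 SQLContext workflow; the path is a
// placeholder. Raising spark.sql.shuffle.partitions makes the DISTINCT shuffle
// write more, smaller partitions, and repartitioning before caching keeps each
// cached block well under the 2 GB mmap limit.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
sqlContext.sql("SET spark.sql.parquet.binaryAsString=true")
sqlContext.sql("SET spark.sql.shuffle.partitions=1000")

val parquetFile = sqlContext.read.parquet("/path/to/data.parquet").repartition(1000)
parquetFile.registerTempTable("tmpTable")
sqlContext.cacheTable("tmpTable")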
answered Nov 20 '22 by marios