
Iterating/looping over Spark parquet files in a script results in memory error/build-up (using Spark SQL queries)

I've been trying to figure out how to keep Spark from crashing with memory errors when I loop over parquet files and run several post-processing functions on each one. Sorry for the deluge of text, but it's not exactly one specific bug (I'm using PySpark). Apologies if this breaks proper Stack Overflow form!

Basic pseudo code is:

# fileNums are the partition values in the parquet file
# I read each one in as a separate file from its "<column>=<value>" sub-directory
for counter in fileNums:
    sparkDataFrame = sqlContext.read.parquet(counter)   # counter is the path to one partition sub-directory
    sparkDataFrame.registerTempTable("data")             # the SQL queries below run against this temp table
    summaryReportOne = sqlContext.sql("SELECT ...")
    summaryReportOne.write.partitionBy("id").parquet("/")
    summaryReportTwo = sqlContext.sql("SELECT ...")
    summaryReportTwo.write.partitionBy("id").parquet("/")
    # several more queries, several involving joins, etc.

This code uses Spark SQL queries, so I've been unsuccessful at wrapping all of the SQL queries/functions in a single function and passing it to foreach (which can't take a sparkContext or a SQL query as input) instead of using a standard for loop.
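
For reference, the wrapper/foreach version I tried looks roughly like this (run_reports is a placeholder name, not my real code):

def run_reports(counter):
    # This would have to execute on the workers, which have no access to the
    # driver-side sqlContext, so Spark can't serialize/ship this closure.
    sparkDataFrame = sqlContext.read.parquet(counter)
    summaryReportOne = sqlContext.sql("SELECT ...")
    summaryReportOne.write.partitionBy("id").parquet("/")

sc.parallelize(fileNums).foreach(run_reports)   # fails: the context can't be referenced from workers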

Technically, this is one big parquet file that has partitions, but it's far too big to read in all at once and query; I need to run the functions on each partition separately. So I just run a regular Python loop in PySpark where, on each iteration, I process one parquet partition (sub-directory) and write out the relevant summary reports.
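
Concretely, each iteration reads just one partition sub-directory rather than the whole file; the base path and partition column name below are placeholders:

one_partition = "/data/bigfile/fileNum=%s" % counter    # placeholder path and column name
sparkDataFrame = sqlContext.read.parquet(one_partition)
sparkDataFrame.registerTempTable("data")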

I'm not sure whether wrapping all of the code in one big mapPartitions() call would work either, given the size of the entire parquet file.
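
For what it's worth, what I mean by that is something like the sketch below, where the per-partition logic would have to be plain Python because the driver-side sqlContext isn't available inside the workers (the path is a placeholder):

whole = sqlContext.read.parquet("/data/bigfile")   # placeholder path for the full file

def summarize(rows):
    # rows is an iterator over the Row objects of one RDD partition
    yield sum(1 for _ in rows)                     # e.g. a trivial per-partition row count

partition_counts = whole.rdd.mapPartitions(summarize).collect()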

But after a few iterations, the script crashes with a memory error - specifically, a Java heap space error. (I've confirmed there's nothing special about the file on which the loop crashes; it happens with whichever file happens to be read in on the second or third iteration.)

Caused by: com.google.protobuf.ServiceException: java.lang.OutOfMemoryError: Java heap space
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:244)
    at com.sun.proxy.$Proxy9.delete(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.delete(ClientNamenodeProtocolTranslatorPB.java:526)
    ... 42 more
Caused by: java.lang.OutOfMemoryError: Java heap space

I realize Spark isn't meant to be run in a loop, but these SQL queries are a bit too complex for the standard packaged Spark SQL functions alone, and we write out multiple summary reports for each file, each on different aggregation statistics.

Is there a way to essentially clear the memory at the end of each loop iteration? Dropping any registered temp tables with sqlContext.dropTempTable() and clearing the cache with sqlContext.clearCache() hasn't helped. If I try to stop the sparkContext and restart it in each iteration, I also get errors, because some processes haven't "wrapped up" yet (it seems like you used to be able to "gracefully" stop a context, but I couldn't find this in the current PySpark documentation).
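
For reference, the end-of-iteration cleanup looks roughly like this (the temp table name is a placeholder), along with the context restart that errors out:

# cleanup that has NOT fixed the problem
sqlContext.dropTempTable("data")
sqlContext.clearCache()

# restarting the context each iteration also fails, because some
# processes haven't wrapped up yet:
# sc.stop()
# sc = SparkContext(conf=conf)
# sqlContext = SQLContext(sc)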

I should also note that I'm not calling unpersist() on the dataframes in the loop after I'm done with them, but I'm also not calling persist() on them; I just reassign the same dataframe variables within each loop (which could be part of the issue).

I am working with our engineering team to tweak the memory settings, but we know we're already allocating more than enough memory to finish one iteration of this script (and one iteration does run without any errors).
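
For context, these are the kinds of knobs being tuned (the values are illustrative, and in practice driver memory usually has to be set via spark-submit flags before the JVM starts):

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

conf = (SparkConf()
        .set("spark.executor.memory", "8g")    # illustrative value
        .set("spark.driver.memory", "8g"))     # illustrative value
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)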

Any suggestions at all would be helpful - including tools that might be better for this use case than Spark. I am using Spark version 1.6.1.

asked Oct 31 '22 by kplaney


1 Answer

Update: if I call unpersist() on each table I create from a SQL query once I'm done with it in each iteration, then the loop can continue successfully onto the next iteration without memory issues. As noted above, .clearCache() and dropping the temp tables alone did not do the trick. My guess is that this worked because, while the tables come from Spark SQL queries, those queries still return an RDD under the hood.

Even though I didn't call persist() on these RDDs, I had to tell Spark to clear them out before the next iteration started so that I could assign new SQL query results to the same variable names.
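
In code, each iteration now ends up looking roughly like this (the temp table name and query bodies are placeholders):

for counter in fileNums:
    sparkDataFrame = sqlContext.read.parquet(counter)   # path to one partition sub-directory
    sparkDataFrame.registerTempTable("data")

    summaryReportOne = sqlContext.sql("SELECT ...")
    summaryReportOne.write.partitionBy("id").parquet("/")
    summaryReportOne.unpersist()    # release it before the variable is reassigned

    summaryReportTwo = sqlContext.sql("SELECT ...")
    summaryReportTwo.write.partitionBy("id").parquet("/")
    summaryReportTwo.unpersist()

    sparkDataFrame.unpersist()
    sqlContext.dropTempTable("data")
    sqlContext.clearCache()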

answered Nov 08 '22 by kplaney