
Out of memory error when collecting data out of Spark cluster

I know there are plenty of questions on SO about out of memory errors on Spark but I haven't found a solution to mine.

I have a simple workflow:

  1. read in ORC files from Amazon S3
  2. filter down to a small subset of rows
  3. select a small subset of columns
  4. collect into the driver node (so I can do additional operations in R)

When I run the above and then cache the table in Spark memory, it takes up <2GB, which is tiny compared to the memory available to my cluster. Yet I get an OOM error when I try to collect the data to my driver node.

I have tried running on the following setups:

  • local mode on a computer with 32 cores and 244GB of ram
  • standalone mode with 10 x 6.2 GB executors and a 61GB driver node

For each of these I have tried numerous combinations of executor.memory, driver.memory, and driver.maxResultSize, covering the full range of values my available memory allows, but I always end up with an out-of-memory error at the collect stage, one of:

  • java.lang.OutOfMemoryError: Java heap space
  • java.lang.OutOfMemoryError: GC overhead limit exceeded
  • Error in invoke_method.spark_shell_connection(spark_connection(jobj), : No status is returned. (a sparklyr error indicative of memory issues)

Based on my [limited] understanding of Spark, caching a table prior to collecting should force all calculations - i.e. if the table is sitting happily in memory after caching at <2GB, then I shouldn't need much more than 2GB of memory to collect it into the driver node.

Note that answers to this question have some suggestions I have yet to try, but these are likely to impact performance (e.g. serialising the RDD), so I would like to avoid them if possible.

My questions:

  1. How can a dataframe that takes up so little space after it has been cached cause memory problems?
  2. Is there something obvious for me to check/change/troubleshoot to help fix the problem, before I move on to additional options that may compromise performance?

Thank you

Edit: note in response to @Shaido's comment below, calling cache via Sparklyr "forces data to be loaded in memory by executing a count(*) over the table" [from Sparklyr documentation] - i.e. the table should be sitting in memory and all the calculations run (I believe) prior to calling collect.

Edit: some additional observations since following the suggestions below:

  • As per the comments below, I have now tried writing the data to csv instead of collecting to get an idea of likely file size. This operation creates a set of csvs amounting to ~3GB, and takes only 2 seconds when run after caching.
  • If I set driver.maxResultSize to <1G I get an error stating that the size of the serialized RDD is 1030 MB, larger than driver.maxResultSize.
  • If I watch memory usage in Task Manager after calling collect I see that usage just keeps going up until it reaches ~ 90GB, at which point the OOM error occurs. So for whatever reason the amount of RAM being used to perform the collect operation is ~100x greater than the size of the RDD I'm trying to collect.
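
To put the "~100x" figure in context, here is a rough sketch in base R that compares the peak RAM seen in Task Manager against each of the three sizes reported in this question. The variable names are made up for illustration, and the cached size is taken at its 2GB upper bound.

```r
# Sizes reported in the question, all expressed in GB
cached_gb     <- 2            # upper bound on the table size after caching
serialized_gb <- 1030 / 1024  # serialized size quoted by the maxResultSize error (1030 MB)
csv_gb        <- 3            # approximate total size of the CSV output
peak_ram_gb   <- 90           # approximate RAM in use when the OOM error occurs

# How many times larger the peak RAM is than each measure of the data
ratios <- c(
  vs_cached     = peak_ram_gb / cached_gb,
  vs_serialized = peak_ram_gb / serialized_gb,
  vs_csv        = peak_ram_gb / csv_gb
)
print(round(ratios, 1))
```

Depending on which size is used as the baseline, the blow-up is somewhere between roughly 30x and 90x, so "~100x" is closest when measured against the serialized result.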

Edit: code added below, as requested in comments.

#__________________________________________________________________________________________________________________________________

# Set parameters used for filtering rows
#__________________________________________________________________________________________________________________________________

firstDate <- '2017-07-01'
maxDate <- '2017-08-31'
advertiserID <- '4529611'
advertiserID2 <- '4601141'
advertiserID3 <- '4601141'

library(dplyr)
library(stringr)
library(sparklyr)

#__________________________________________________________________________________________________________________________________

# Configure & connect to spark
#__________________________________________________________________________________________________________________________________

Sys.setenv("SPARK_MEM"="100g")
Sys.setenv(HADOOP_HOME="C:/Users/Jay.Ruffell/AppData/Local/rstudio/spark/Cache/spark-2.0.1-bin-hadoop2.7/tmp/hadoop") 

config <- spark_config()
config$sparklyr.defaultPackages <- "org.apache.hadoop:hadoop-aws:2.7.3" # used to connect to S3
Sys.setenv(AWS_ACCESS_KEY_ID="")
Sys.setenv(AWS_SECRET_ACCESS_KEY="") # setting these blank ensures that AWS uses the IAM roles associated with the cluster to define S3 permissions

# Specify memory parameters - have tried lots of different values here!
config$`sparklyr.shell.driver-memory` <- '50g' 
config$`sparklyr.shell.executor-memory` <- '50g'
config$spark.driver.maxResultSize <- '50g'
sc <- spark_connect(master='local', config=config, version='2.0.1')

#__________________________________________________________________________________________________________________________________

# load data into spark from S3 ----
#__________________________________________________________________________________________________________________________________

#+++++++++++++++++++
# create spark table (not in memory yet) of all logfiles within logfiles path
#+++++++++++++++++++

spark_session(sc) %>%
  invoke("read") %>% 
  invoke("format", "orc") %>%
  invoke("load", 's3a://nz-omg-ann-aipl-data-lake/aip-connect-256537/orc-files/dcm-log-files/dt2-facts') %>% 
  invoke("createOrReplaceTempView", "alldatadf") 
alldftbl <- tbl(sc, 'alldatadf') # create a reference to the sparkdf without loading into memory

#+++++++++++++++++++
# define variables used to filter table down to daterange
#+++++++++++++++++++

# Calculate firstDate & maxDate as unix timestamps
unixTime_firstDate <- as.numeric(as.POSIXct(firstDate))+1
unixTime_maxDate <- as.numeric(as.POSIXct(maxDate)) + 3600*24-1

# Convert daterange params into date_year, date_month & date_day values to pass to filter statement
dateRange <- as.character(seq(as.Date(firstDate), as.Date(maxDate), by=1))
years <- unique(substring(dateRange, first=1, last=4))
if(length(years)==1) years <- c(years, years)
year_y1 <- years[1]; year_y2 <- years[2]
months_y1 <- substring(dateRange[grepl(years[1], dateRange)], first=6, last=7)
minMonth_y1 <- min(months_y1)
maxMonth_y1 <- max(months_y1)
months_y2 <- substring(dateRange[grepl(years[2], dateRange)], first=6, last=7)
minMonth_y2 <- min(months_y2)
maxMonth_y2 <- max(months_y2) 

# Repeat for 1 day prior to first date & one day after maxdate (because of the way logfile orc partitions are created, sometimes touchpoints can end up in the wrong folder by 1 day. So read in extra days, then filter by event time)
firstDateMinusOne <- as.Date(firstDate)-1
firstDateMinusOne_year <- substring(firstDateMinusOne, first=1, last=4)
firstDateMinusOne_month <- substring(firstDateMinusOne, first=6, last=7) 
firstDateMinusOne_day <- substring(firstDateMinusOne, first=9, last=10)
maxDatePlusOne <- as.Date(maxDate)+1
maxDatePlusOne_year <- substring(maxDatePlusOne, first=1, last=4)
maxDatePlusOne_month <- substring(maxDatePlusOne, first=6, last=7)
maxDatePlusOne_day <- substring(maxDatePlusOne, first=9, last=10)

#+++++++++++++++++++
# Read in data, filter & select
#+++++++++++++++++++

# startTime <- proc.time()[3]
dftbl <- alldftbl %>% # create a reference to the sparkdf without loading into memory

  # filter by month and year, using ORC partitions for extra speed
  filter(((date_year==year_y1  & date_month>=minMonth_y1 & date_month<=maxMonth_y1) |
            (date_year==year_y2 & date_month>=minMonth_y2 & date_month<=maxMonth_y2) |
            (date_year==firstDateMinusOne_year & date_month==firstDateMinusOne_month & date_day==firstDateMinusOne_day) |
            (date_year==maxDatePlusOne_year & date_month==maxDatePlusOne_month & date_day==maxDatePlusOne_day))) %>%

  # filter to be within firstdate & maxdate. Note that event_time_char will be in UTC, so 12hrs behind.
  filter(event_time>=(unixTime_firstDate*1000000) & event_time<(unixTime_maxDate*1000000)) %>%

  # filter by advertiser ID
  filter(((advertiser_id==advertiserID | advertiser_id==advertiserID2 | advertiser_id==advertiserID3) & 
            !is.na(advertiser_id)) |
           ((floodlight_configuration==advertiserID | floodlight_configuration==advertiserID2 | 
               floodlight_configuration==advertiserID3) & !is.na(floodlight_configuration)) & user_id!="0") %>%

  # Define cols to keep
  transmute(time=as.numeric(event_time/1000000),
            user_id=as.character(user_id),
            action_type=as.character(if(fact_type=='click') 'C' else if(fact_type=='impression') 'I' else if(fact_type=='activity') 'A' else NA),
            lookup=concat_ws("_", campaign_id, ad_id, site_id_dcm, placement_id),
            activity_lookup=as.character(activity_id),
            sv1=as.character(segment_value_1),
            other_data=as.character(other_data))  %>%
  mutate(time_char=as.character(from_unixtime(time)))

# cache to memory
dftbl <- sdf_register(dftbl, "filtereddf")
tbl_cache(sc, "filtereddf")

#__________________________________________________________________________________________________________________________________

# Collect out of spark
#__________________________________________________________________________________________________________________________________

myDF <- collect(dftbl)
jay asked Aug 25 '17 01:08

1 Answer

When you call collect on the dataframe, two things happen:

  1. All the data has to be written to the output on the driver.
  2. The driver has to collect the data from all nodes and keep it in its memory.

Answer:

If you just want to load the data into the memory of the executors, count() is also an action that will load the data into the executors' memory, where it can be used by other processes.

If you want to extract the data, then try setting this, along with the other properties, when pulling the data: "--conf spark.driver.maxResultSize=10g".
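
The "--conf" form above is command-line syntax. From sparklyr, the same property is normally set on the config object before connecting, as the question's own code already does with a value of 50g. A minimal sketch, assuming a local master as in the question and using the 10g value suggested here:

```r
library(sparklyr)

# Start from sparklyr's default configuration
config <- spark_config()

# Equivalent of "--conf spark.driver.maxResultSize=10g"
config$spark.driver.maxResultSize <- "10g"

# master = "local" is taken from the question; adjust for a standalone cluster
sc <- spark_connect(master = "local", config = config)
```

Note that this only raises the limit on the serialized result size. The driver still needs enough heap to hold the collected data, which is controlled separately by the driver memory setting.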

BalaramRaju answered Sep 26 '22 06:09