
Slowdown with repeated calls to spark dataframe in memory

Say I have 40 continuous (DoubleType) variables that I've bucketed into quartiles using ft_quantile_discretizer. Identifying the quartiles on all of the variables is super fast, as the function supports execution of multiple variables at once.

Next, I want to one-hot encode those bucketed variables, but there is currently no way to one-hot encode all of them with a single call. So I'm piping ft_string_indexer, ft_one_hot_encoder, and sdf_separate_column for each bucketed variable, one at a time, by looping through the variables. This gets the job done. However, as the loop progresses, it slows down considerably. I suspect it's running out of memory, but I can't figure out how to program this so that it executes at the same speed across all variables.

If q_vars is a character array of variable names (say 40 of them) for continuous variables, how can I code this up in a more spark-efficient way?

for (v in q_vars) {
   data_sprk_q<-data_sprk_q %>% 
       ft_string_indexer(v,paste0(v,"b"),"keep",string_order_type = "alphabetAsc") %>%
       ft_one_hot_encoder(paste0(v,"b"),paste0(v,"bc")) %>%
       sdf_separate_column(paste0(v,"bc"),into=q_vars_cat_list[[v]]) 
}

I also tried executing as a single massive pipeline with all of the variables referenced, but that too didn't solve the issue, so I'm thinking it doesn't have anything to do with the loop itself.

test_text<-paste0("data_sprk_q<-data_sprk_q %>% ", paste0("ft_string_indexer('",q_vars,"',paste0('",q_vars,"','b'),'keep',string_order_type = 'alphabetAsc') %>% ft_one_hot_encoder(paste0('",q_vars,"','b'),paste0('",q_vars,"','bc')) %>% sdf_separate_column(paste0('",q_vars,"','bc'),into=",q_vars_cat_list,")",collapse=" %>% "))
eval(parse(text=test_text))

Any help would be appreciated.

hgb1234 asked Aug 21 '18 23:08




1 Answer

In general, some (sometimes substantial) slowdown with long ML Pipelines is expected, as a result of the worse-than-linear complexity of the Catalyst optimizer. Short of splitting the process into multiple pipelines and breaking the lineage in between (either by using checkpoints or by writing the data to persistent storage and loading it back), there is not much you can do about it at the moment.
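A minimal sketch of breaking the lineage between two stages, assuming a local connection, a temporary checkpoint directory and toy columns x and y (all of these are illustrative and not part of the original question). It shows both options: an eager checkpoint, and a round trip through Parquet.

```r
library(sparklyr)
library(dplyr)

# Assumption: local Spark and a writable checkpoint directory.
sc <- spark_connect(master = "local")
spark_set_checkpoint_dir(sc, file.path(tempdir(), "checkpoints"))

df <- copy_to(sc, tibble(x = rnorm(100), y = runif(100)), "lineage_demo", overwrite = TRUE)

# Stage 1: the first part of the transformations.
stage1 <- df %>%
  ft_quantile_discretizer(input_col = "x", output_col = "x_d", num_buckets = 4)

# Option A: an eager checkpoint materializes the data and truncates the plan.
stage1 <- sdf_checkpoint(stage1, eager = TRUE)

# Option B: write to persistent storage and load the result back.
path <- file.path(tempdir(), "stage1.parquet")
spark_write_parquet(stage1, path, mode = "overwrite")
stage1 <- spark_read_parquet(sc, name = "stage1", path = path)

# Stage 2 starts from a short plan instead of the accumulated one.
stage2 <- stage1 %>%
  ft_quantile_discretizer(input_col = "y", output_col = "y_d", num_buckets = 4)
```

In practice only one of the two options is needed. How many transformations to group into each stage is a tuning decision that depends on the data and the cluster.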

However, your current code adds a number of problems on top of that:

  • Unless you use more than 10 buckets, StringIndexer

    ft_string_indexer(v, paste0(v, "b"), "keep", string_order_type = "alphabetAsc")

    just duplicates the labels assigned by QuantileDiscretizer. With a larger number of levels the behavior becomes even less useful, because lexicographic order no longer matches numeric order (for example, "10.0" sorts before "2.0").

  • Applying one-hot encoding might not be required at all (and in the worst case can be harmful), depending on the downstream process. Even with linear models it might not be strictly necessary: you could argue that the assigned labels are valid ordinals, and that recoding them as nominal values and increasing dimensionality is not a desired outcome.

  • However, the biggest problem is the application of sdf_separate_column. It:

    • Increases the cost of computing the execution plan by increasing the number of expressions.
    • Increases the amount of memory required for processing by converting sparse data into dense.
    • Internally, sparklyr uses a UserDefinedFunction on each index, effectively causing repeated allocation, decoding and garbage collection for the same row, which puts a lot of pressure on the cluster.
    • Last but not least, it discards column metadata, which is used extensively by Spark ML.

    I would strongly advise against using this function here. Based on your comments it looks like you want to subset columns before passing the result to some other algorithm - for that you can use VectorSlicer.
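If the downstream algorithm can treat the bucket labels as ordinals, a sketch along the following lines skips one-hot encoding entirely and assembles the discretized columns into a single feature vector. The connection, the toy data and the column names are assumptions for illustration, and discretizing several columns in one call assumes a Spark version that supports input_cols.

```r
library(sparklyr)
library(dplyr)

# Assumption: a local Spark connection and two toy numeric columns.
sc <- spark_connect(master = "local")
df <- copy_to(sc, tibble(x = rnorm(100), y = runif(100)), "ordinal_demo", overwrite = TRUE)

input_cols <- c("x", "y")
discretized_cols <- paste0(input_cols, "_d")

features_df <- df %>%
  # Bucket labels (0, 1, 2, ...) are kept as ordinal values.
  ft_quantile_discretizer(
    input_cols = input_cols, output_cols = discretized_cols, num_buckets = 4
  ) %>%
  # One vector column instead of one column per bucket.
  ft_vector_assembler(input_cols = discretized_cols, output_col = "features")
```

Whether ordinal labels are appropriate depends on the model, so this is an option to evaluate rather than a drop-in replacement.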

Overall, you can rewrite your pipeline as:

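library(sparklyr)
library(dplyr)

# Assumption: a local Spark connection; replace with your own cluster settings.
sc <- spark_connect(master = "local")

set.seed(1)

# Toy data standing in for the continuous variables.
df <- copy_to(sc, tibble(x = rnorm(100), y = runif(100), z = rpois(100, 1)))

input_cols <- colnames(df)
discretized_cols <- paste0(input_cols, "_d")
# Named vector: discretized column name -> encoded column name.
encoded_cols <- paste0(discretized_cols, "_e") %>% setNames(discretized_cols)

# A single discretizer stage handles all input columns at once.
discretizer <- ft_quantile_discretizer(
  sc, input_cols = input_cols, output_cols = discretized_cols, num_buckets = 10
)
# One encoder stage per discretized column.
encoders <- lapply(
  discretized_cols,
  function(x) ft_one_hot_encoder(sc, input_col = x, output_col = encoded_cols[[x]])
)

# Combine all stages into one pipeline, fit it, and transform the data.
transformed_df <- do.call(ml_pipeline, c(list(discretizer), encoders)) %>%
  ml_fit(df) %>%
  ml_transform(df)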

and apply ft_vector_slicer when needed. For example, to take the values corresponding to the first, third and sixth buckets of x, you can use:

transformed_df %>% 
  ft_vector_slicer(
    input_col="x_d_e", output_col="x_d_e_s", indices=c(0, 2, 5)) 

zero323 answered Sep 22 '22 07:09