Say I have 40 continuous (DoubleType
) variables that I've bucketed into quartiles using ft_quantile_discretizer
. Identifying the quartiles on all of the variables is super fast, as the function supports execution of multiple variables at once.
Next, I want to one hot code those bucketed variables, but there is no functionality currently supported to one hot code all of those variables with a single call. So I'm piping ft_string_indexer
, ft_one_hot_encoder
, and sdf_separate_column
for each of the bucketed variables one at a time, by looping through the variables. This gets the job done. However, as the loop progresses, it slows down considerably. I'm thinking it's running out of memory, but can't figure out how to program this so that it executes with the same speed across the variables.
If q_vars
is a character array of variable names (say 40 of them) for continuous variables, how can I code this up in a more spark-efficient way?
for (v in q_vars) {
data_sprk_q<-data_sprk_q %>%
ft_string_indexer(v,paste0(v,"b"),"keep",string_order_type = "alphabetAsc") %>%
ft_one_hot_encoder(paste0(v,"b"),paste0(v,"bc")) %>%
sdf_separate_column(paste0(v,"bc"),into=q_vars_cat_list[[v]])
}
I also tried executing as a single massive pipeline with all of the variables referenced, but that too didn't solve the issue, so I'm thinking it doesn't have anything to do with the loop itself.
test_text<-paste0("data_sprk_q<-data_sprk_q %>% ", paste0("ft_string_indexer('",q_vars,"',paste0('",q_vars,"','b'),'keep',string_order_type = 'alphabetAsc') %>% ft_one_hot_encoder(paste0('",q_vars,"','b'),paste0('",q_vars,"','bc')) %>% sdf_separate_column(paste0('",q_vars,"','bc'),into=",q_vars_cat_list,")",collapse=" %>% "))
eval(parse(text=test_text))
Any help would be appreciated.
Spark SQL can cache tables using an in-memory columnar format by calling sqlContext. cacheTable("tableName") or dataFrame. cache().
Using DataFrame API The default value of the storageLevel for both functions is MEMORY_AND_DISK which means that the data will be stored in memory if there is space for it, otherwise, it will be stored on disk. Here you can see the (PySpark) documentation for other possible storage levels.
cache() is an Apache Spark transformation that can be used on a DataFrame, Dataset, or RDD when you want to perform more than one action. cache() caches the specified DataFrame, Dataset, or RDD in the memory of your cluster's workers.
In general some (sometimes substantial) slowdown with long ML Pipeline is expected, as a result of worse than linear complexity of the Catalyst optimizer. Short of splitting the process into multiple pipelines, and breaking the lineage in between (either using checkpoints and writing data to persistent storage and loading it back) there is not much you can about it at the moment.
However you current code adds a number of problems on top of that:
Unless you use more than 10 buckets StringIndexer
ft_string_indexer(v ,paste0(v, "b"), "keep", string_order_type = "alphabetAsc")
just duplicates the labels assigned by QuantileDiscretizer
. With larger number of levels behavior becomes even less useful when using lexicographic order.
Applying One-Hot-Encoding might not be required at all (and in the worst case scenario can be harmful), depending on the downstream process, and even with linear models, might not be strictly necessary (you could argue that assigned labels are valid ordinals, and recording as nominal values, and increasing dimensionality is not desired outcome).
However the biggest problem is application of sdf_separate_column
. It
sparklyr
uses UserDefinedFunction
on each index, effectively causing reapeated allocation, decoding and garbage collection for the same row putting a lot of pressure on the cluster.I would strongly advise against using this function here. Based on your comments it looks like you want to subset columns before passing the result to some other algorithm - for that you can use VectorSlicer
.
Overall you can rewrite your pipeline as
set.seed(1)
df <- copy_to(sc, tibble(x=rnorm(100), y=runif(100), z=rpois(100, 1)))
input_cols <- colnames(df)
discretized_cols <- paste0(input_cols, "_d")
encoded_cols <- paste0(discretized_cols, "_e") %>% setNames(discretized_cols)
discretizer <- ft_quantile_discretizer(
sc, input_cols = input_cols, output_cols = discretized_cols, num_buckets = 10
)
encoders <- lapply(
discretized_cols,
function(x) ft_one_hot_encoder(sc, input_col=x, output_col=encoded_cols[x])
)
transformed_df <- do.call(ml_pipeline, c(list(discretizer), encoders)) %>%
ml_fit(df) %>%
ml_transform(df)
and apply ft_vector_slicer
when needed. For example to take values corresponding to the first, third and sixth bucket from x
you can:
transformed_df %>%
ft_vector_slicer(
input_col="x_d_e", output_col="x_d_e_s", indices=c(0, 2, 5))
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With