I am trying to parallelise a pipeline. The pipeline contains a tidyr call (tidyr::complete), which fails when run in parallel because the object class is not recognised.
Is there an alternative to complete in dplyr?
library(dplyr)
library(tidyr)
library(zoo)
test <- tibble(year  = c(1,2,3,4,5,5,1,4,5),
               var_1 = c(1,1,1,1,1,1,2,2,2),
               var_2 = c(1,1,1,1,1,2,3,3,3),
               var_3 = c(0,5,NA,15,20,NA,1,NA,NA))
max_year <- max(test$year, na.rm = TRUE)
min_year <- min(test$year, na.rm = TRUE)
SERIAL
test_serial <- test %>%
  group_by(var_1, var_2) %>%
  complete(var_1, year = seq(min_year, max_year)) %>%
  mutate(
    var_3 = na.approx(var_3, na.rm = FALSE),
    var_3 = if (all(is.na(var_3))) NA else na.spline(var_3, na.rm = FALSE))
PARALLEL (THIS FAILS)
devtools::install_github("hadley/multidplyr")
library(multidplyr)
cl <- new_cluster(2)
cluster_copy(cl, c("test","max_year","min_year"))
cluster_library(cl, c("dplyr","tidyr","zoo"))
test_parallel <- test %>% group_by(var_1, var_2) %>% partition(cl)
test_parallel <- test_parallel %>%
  dplyr::group_by(var_1, var_2) %>%
  tidyr::complete(var_1, year = seq(min_year, max_year)) %>%
  dplyr::mutate(
    var_3 = na.approx(var_3, na.rm = FALSE),
    var_3 = if (all(is.na(var_3))) NA else na.spline(var_3, na.rm = FALSE)) %>%
  collect()
This is the error message
Error in UseMethod("complete_") :
no applicable method for 'complete_' applied to an object of class "multidplyr_party_df"
multidplyr lets you partition() the data across worker nodes, process each part separately, and then collect() the results.
Not all data processing tasks are suited to this workflow.
In particular, complete needs to know all the possible values in the input data in order to create the missing rows. The operation as a whole therefore cannot be split, which is why no method is available for a partitioned data frame.
In the example you provide, each node would receive a single var_1, var_2 pair without knowing what the other nodes got, so the expected result cannot be produced in parallel.
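To make the point concrete, here is a minimal sketch on a made-up table (toy, id and year are hypothetical names, not taken from the question). It compares complete() applied to the whole table with complete() applied to each piece in isolation, which is roughly what independent workers would see:

```r
library(dplyr)
library(tidyr)

# Hypothetical toy data: id "a" has years 1 and 2, id "b" has year 3 only
toy <- tibble(
  id   = c("a", "a", "b"),
  year = c(1, 2, 3)
)

# On the whole table, every id is paired with every observed year (2 ids x 3 years)
whole <- toy %>% complete(id, year)

# On isolated pieces, each piece only knows its own years, so nothing is added
pieces <- split(toy, toy$id) %>%
  lapply(function(piece) complete(piece, id, year)) %>%
  bind_rows()

nrow(whole)   # 6
nrow(pieces)  # 3
```

The two results differ because the set of values to expand over is derived from whatever data each call can see.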
However, as you already know that year = seq(min_year,max_year), you could parallelize the complete task for this variable only, splitting tasks by var_1, for example with the furrr package:
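library(furrr)
# plan(multiprocess) is deprecated in recent versions of future; multisession is the portable choice
plan(multisession)
test_parallel <- test %>%
  group_by(var_1, var_2) %>%
  complete(var_1) %>%
  split(.$var_1) %>%
  # each var_1 subset is completed and interpolated on its own worker
  furrr::future_map(~{
    complete(.x, year = seq(min_year, max_year)) %>%
      dplyr::mutate(
        var_3 = na.approx(var_3, na.rm = FALSE),
        var_3 = if (all(is.na(var_3))) NA else na.spline(var_3, na.rm = FALSE))
  }) %>%
  bind_rows()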
> identical(c(test_serial$var_1,test_serial$var_2,test_serial$var_3,test_serial$year),
+ c(test_parallel$var_1,test_parallel$var_2,test_parallel$var_3,test_parallel$year))
[1] TRUE
To be tested on a larger dataset to measure potential performance improvements.
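As for the dplyr-only alternative asked about in the question, one possible sketch is to build the full grid of (var_1, var_2, year) rows yourself and join the data back onto it. This assumes dplyr >= 1.1.0 for cross_join(); the names years, grid and completed are made up for illustration. Since the grid is built once, up front, on the full data, the remaining interpolation step no longer depends on complete().

```r
library(dplyr)

test <- tibble(year  = c(1,2,3,4,5,5,1,4,5),
               var_1 = c(1,1,1,1,1,1,2,2,2),
               var_2 = c(1,1,1,1,1,2,3,3,3),
               var_3 = c(0,5,NA,15,20,NA,1,NA,NA))

# All years that every group should cover
years <- tibble(year = seq(min(test$year, na.rm = TRUE),
                           max(test$year, na.rm = TRUE)))

# One row per existing (var_1, var_2) group and year
grid <- test %>%
  distinct(var_1, var_2) %>%
  cross_join(years)

# Attach the observed values; combinations with no data get NA in var_3
completed <- grid %>%
  left_join(test, by = c("var_1", "var_2", "year")) %>%
  arrange(var_1, var_2, year)
```

On the example data this should give the same rows as the grouped complete() call, but it is worth checking against test_serial before relying on it.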