Parallel computing, which alternative to tidyr::complete in dplyr?

I am trying to parallelise a pipe that contains a tidyr command (tidyr::complete). The code breaks when run in parallel, because the object class is not recognised.

Is there an alternative in dplyr to complete?

library(dplyr)
library(tidyr)
library(zoo)


test <- tibble(year=c(1,2,3,4,5,5,1,4,5),
               var_1=c(1,1,1,1,1,1,2,2,2), 
               var_2=c(1,1,1,1,1,2,3,3,3), 
               var_3=c(0,5,NA,15,20,NA,1,NA,NA))

max_year <- max(test$year, na.rm = TRUE)
min_year <- min(test$year, na.rm = TRUE)

SERIAL


test_serial <- test %>% 
  group_by(var_1,var_2) %>% 
  complete(var_1, year = seq(min_year,max_year)) %>%
  mutate(
    var_3 = na.approx(var_3,na.rm = FALSE),
    var_3 = if(all(is.na(var_3))) NA else na.spline(var_3,na.rm = FALSE))


PARALLEL (THIS FAILS)

devtools::install_github("hadley/multidplyr")
library(multidplyr)

cl <- new_cluster(2)
cluster_copy(cl, c("test","max_year","min_year"))
cluster_library(cl, c("dplyr","tidyr","zoo"))

test_parallel <- test %>% group_by(var_1,var_2) %>% partition(cl)
test_parallel <- test_parallel %>% 
  dplyr::group_by(var_1,var_2) %>% 
  tidyr::complete(var_1, year = seq(min_year,max_year)) %>%
  dplyr::mutate(
    var_3 = na.approx(var_3,na.rm = FALSE),
    var_3 = if(all(is.na(var_3))) NA else na.spline(var_3,na.rm = FALSE)) %>% 
  collect()

This is the error message:

Error in UseMethod("complete_") : 
  no applicable method for 'complete_' applied to an object of class "multidplyr_party_df"
MCS asked Jun 24 '20 at 11:06



1 Answer

multidplyr lets you:

  1. split the data up using partition()
  2. process each partition on a dedicated node
  3. collect() the results
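The three steps above can be sketched on the question's data as follows. This is an illustrative hybrid, not the answer's solution: it assumes multidplyr (0.1.0 or later) and zoo are installed, runs complete() serially on the full data before partition() (multidplyr has no complete() method, which is the error in the question), and sends only the group-wise na.approx() interpolation to the workers. The object names completed, parts and result are hypothetical.

```r
# Illustrative sketch; assumes multidplyr >= 0.1.0 and zoo are installed
library(dplyr)
library(tidyr)
library(zoo)
library(multidplyr)

test <- tibble(year  = c(1, 2, 3, 4, 5, 5, 1, 4, 5),
               var_1 = c(1, 1, 1, 1, 1, 1, 2, 2, 2),
               var_2 = c(1, 1, 1, 1, 1, 2, 3, 3, 3),
               var_3 = c(0, 5, NA, 15, 20, NA, 1, NA, NA))
min_year <- min(test$year, na.rm = TRUE)
max_year <- max(test$year, na.rm = TRUE)

# Serial step: complete() must see the whole data, so it runs before partition()
completed <- test %>%
  complete(nesting(var_1, var_2), year = seq(min_year, max_year)) %>%
  arrange(var_1, var_2, year)

# 1. split the data: whole var_1/var_2 groups are sent to 2 worker sessions
cl <- new_cluster(2)
cluster_library(cl, "zoo")
parts <- completed %>% group_by(var_1, var_2) %>% partition(cl)

# 2. process each partition: na.approx() only looks inside its own group
# 3. collect() brings the results back into the main session
result <- parts %>%
  mutate(var_3 = na.approx(var_3, na.rm = FALSE)) %>%
  collect() %>%
  arrange(var_1, var_2, year)
```

The split works here because every step after partition() only needs the rows of one group.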

Not all data processing tasks fit this workflow.

In particular, complete() needs to know all the possible values in the input data in order to create the missing rows. This operation as a whole therefore can't be split across nodes, which is why no applicable method is available.

In the example you provide, each node would receive only its own var_1, var_2 groups, without knowing what the other nodes received, so the expected result cannot be obtained in parallel.
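A small check of this point on the question's data, assuming only dplyr and tidyr: when complete() discovers the years from the data itself, running it on each var_1 chunk separately (as independent workers would) gives fewer rows than running it once on the whole table. The names whole and by_chunk are hypothetical.

```r
library(dplyr)
library(tidyr)

test <- tibble(year  = c(1, 2, 3, 4, 5, 5, 1, 4, 5),
               var_1 = c(1, 1, 1, 1, 1, 1, 2, 2, 2),
               var_2 = c(1, 1, 1, 1, 1, 2, 3, 3, 3),
               var_3 = c(0, 5, NA, 15, 20, NA, 1, NA, NA))

# Whole data: every var_1/var_2 pair gets every year seen anywhere (1 to 5)
whole <- complete(test, nesting(var_1, var_2), year)

# Per chunk: each var_1 subset only sees its own years
by_chunk <- bind_rows(lapply(split(test, test$var_1),
                             function(chunk) complete(chunk, nesting(var_1, var_2), year)))

nrow(whole)     # 15
nrow(by_chunk)  # 13: the var_1 == 2 chunk never sees years 2 and 3
```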

However, since you already know that year = seq(min_year, max_year), you could parallelize the complete() step for this variable only, splitting the tasks by var_1, for example with the furrr package:

library(furrr)
# "multiprocess" is deprecated in the future package; "multisession" uses background R sessions
plan(multisession)
test_parallel <- test %>% 
  group_by(var_1,var_2) %>% 
  complete(var_1) %>% split(.$var_1) %>% 
  furrr::future_map(~{
    complete(.x, year = seq(min_year,max_year)) %>%
    dplyr::mutate(
        var_3 = na.approx(var_3,na.rm = FALSE),
        var_3 = if(all(is.na(var_3))) NA else na.spline(var_3,na.rm = FALSE)) 
    }) %>% bind_rows()

> identical(c(test_serial$var_1,test_serial$var_2,test_serial$var_3,test_serial$year),
+           c(test_parallel$var_1,test_parallel$var_2,test_parallel$var_3,test_parallel$year))
[1] TRUE

This still needs to be tested on a larger dataset to measure any performance improvement.
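As for the literal question (a dplyr-only stand-in for complete()), one possible sketch builds the grid of existing var_1/var_2 pairs crossed with the known year range and joins the observed rows back. It assumes dplyr 1.1.0 or later for cross_join(), and it mirrors complete(nesting(var_1, var_2), year = seq(min_year, max_year)) rather than the grouped call in the question. Note that distinct() still needs the whole data to build the grid, so the caveat about splitting applies to it as well. The names pairs, years and test_completed are hypothetical.

```r
library(dplyr)  # cross_join() requires dplyr >= 1.1.0

test <- tibble(year  = c(1, 2, 3, 4, 5, 5, 1, 4, 5),
               var_1 = c(1, 1, 1, 1, 1, 1, 2, 2, 2),
               var_2 = c(1, 1, 1, 1, 1, 2, 3, 3, 3),
               var_3 = c(0, 5, NA, 15, 20, NA, 1, NA, NA))
min_year <- min(test$year, na.rm = TRUE)
max_year <- max(test$year, na.rm = TRUE)

# Every var_1/var_2 pair that occurs in the data...
pairs <- distinct(test, var_1, var_2)
# ...crossed with the full, known year range (by = 1 keeps year as double)
years <- tibble(year = seq(min_year, max_year, by = 1))

test_completed <- pairs %>%
  cross_join(years) %>%
  # bring back observed var_3 values; combinations not in test get NA
  left_join(test, by = c("var_1", "var_2", "year")) %>%
  arrange(var_1, var_2, year)
```

With the example data this gives 3 pairs times 5 years, i.e. 15 rows, of which the 5 originally observed var_3 values are non-missing.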

Waldi answered Sep 21 '22 at 00:09
