
Parallel *ply within functions

I want to use the parallel functionality of the plyr package within functions.

I would have thought that the proper way to export objects that have been created within the body of the function (in this example, the object is df_2) is as follows

# rm(list=ls())
library(plyr)
library(doParallel)

workers=makeCluster(2)
registerDoParallel(workers,core=2)

plyr_test=function() {
  df_1=data.frame(type=c("a","b"),x=1:2)
  df_2=data.frame(type=c("a","b"),x=3:4)

  #export df_2 via .paropts  
  ddply(df_1,"type",.parallel=TRUE,.paropts=list(.export="df_2"),.fun=function(y) {
    merge(y,df_2,all=FALSE,by="type")
  })
}
plyr_test()
stopCluster(workers)

However, this throws an error

Error in e$fun(obj, substitute(ex), parent.frame(), e$data) : 
  unable to find variable "df_2"

So I did some research and found out that it works if I export df_2 manually

workers=makeCluster(2)
registerDoParallel(workers,core=2)

plyr_test_2=function() {
  df_1=data.frame(type=c("a","b"),x=1:2)
  df_2=data.frame(type=c("a","b"),x=3:4)

  #manually export df_2
  clusterExport(cl=workers,varlist=list("df_2"),envir=environment())

  ddply(df_1,"type",.parallel=TRUE,.fun=function(y) {
    merge(y,df_2,all=FALSE,by="type")
  })
}
plyr_test_2()
stopCluster(workers)

It gives the correct result

  type x.x x.y
1    a   1   3
2    b   2   4

But I have also found out that the following code works

workers=makeCluster(2)
registerDoParallel(workers,core=2)

plyr_test_3=function() {
  df_1=data.frame(type=c("a","b"),x=1:2)
  df_2=data.frame(type=c("a","b"),x=3:4)

  #no export at all!
  ddply(df_1,"type",.parallel=TRUE,.fun=function(y) {
    merge(y,df_2,all=FALSE,by="type")
  })
}
plyr_test_3()
stopCluster(workers)

plyr_test_3() also gives the correct result, and I don't understand why. I would have thought that I had to export df_2...

My question is: What is the right way to deal with parallel *ply within functions? Obviously, plyr_test() is incorrect. I somehow have the feeling that the manual export in plyr_test_2() is useless. But I also think that plyr_test_3() is kind of bad coding style. Could someone please elaborate on that? Thanks guys!

asked Nov 01 '22 by cryo111

2 Answers

The problem with plyr_test is that df_2 is defined inside plyr_test, where it isn't visible to the doParallel package, so the attempt to export df_2 via .export fails. That is a scoping issue. plyr_test_2 avoids this problem because it doesn't use the .export option, but, as you guessed, the call to clusterExport is not needed.

The reason that both plyr_test_2 and plyr_test_3 succeed is that df_2 is serialized along with the anonymous function that is passed to ddply via the .fun argument. In fact, both df_1 and df_2 are serialized along with the anonymous function, because that function is defined inside plyr_test_2 and plyr_test_3 and therefore carries their local environment with it. It's helpful that df_2 is included in this case, but the inclusion of df_1 is unnecessary and may hurt your performance.
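This capture behavior can be seen outside of plyr entirely; it is just how R closures work. A minimal sketch (make_fun is a hypothetical stand-in for plyr_test_2): a function defined inside another function keeps a reference to that function's environment, so serializing the inner function drags every local variable along with it.

```r
# A function created inside another function captures the enclosing
# environment -- including variables it never references (like df_1 here).
make_fun <- function() {
  df_1 <- data.frame(type = c("a", "b"), x = 1:2)  # captured, though unused
  df_2 <- data.frame(type = c("a", "b"), x = 3:4)  # captured and used
  function(y) merge(y, df_2, all = FALSE, by = "type")
}

f <- make_fun()
ls(environment(f))                 # both df_1 and df_2 live in f's environment
f(data.frame(type = "a", x = 1))   # works: df_2 comes from the closure
```

When a parallel backend serializes `f` to ship it to a worker, that environment (and everything in it) is serialized too, which is why no explicit export was needed.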

As long as df_2 is captured in the environment of the anonymous function, no other value of df_2 will ever be used, regardless of what you export. Unless you can prevent it from being captured, it is pointless to export it with either .export or clusterExport, because the captured value will always win. You can only get yourself into trouble (as you did with .export) by trying to export it to the workers.

Note that in this case, foreach does not auto-export df_2, because it isn't able to analyze the body of the anonymous function to see what symbols are referenced. If you call foreach directly, without wrapping the work in an anonymous function, it will see the reference and auto-export df_2, making an explicit .export unnecessary.
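As a sketch of that direct-call case: foreach inspects the literal expression after %dopar% for free symbols and exports them automatically. The example below uses the sequential registerDoSEQ() backend so it runs without a cluster; with a real parallel backend, df_2 would be auto-exported to the workers.

```r
library(foreach)
registerDoSEQ()  # sequential backend, so this sketch runs without a cluster

df_2 <- data.frame(type = c("a", "b"), x = 3:4)

# foreach analyzes this literal expression, sees the free symbol df_2,
# and (with a parallel backend) would auto-export it -- no .export needed.
res <- foreach(ty = c("a", "b"), .combine = rbind) %dopar% {
  df_2[df_2$type == ty, ]
}
res
```

Had the body been hidden inside `function(ty) ...`, foreach would only see the function call and could not detect the df_2 reference.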

You could prevent the environment of plyr_test from being serialized along with the anonymous function by modifying its environment before passing it to ddply:

plyr_test=function() {
  df_1=data.frame(type=c("a","b"),x=1:2)
  df_2=data.frame(type=c("a","b"),x=3:4)
  # export df_2 to the workers' global environments, since fun will
  # no longer carry it along in its own environment
  clusterExport(cl=workers,varlist=list("df_2"),envir=environment())
  fun=function(y) merge(y, df_2, all=FALSE, by="type")
  # strip the local environment so df_1/df_2 aren't serialized with fun;
  # on the workers, fun now finds df_2 in the global environment
  environment(fun)=globalenv()
  ddply(df_1,"type",.parallel=TRUE,.fun=fun)
}

One of the advantages of the foreach package is that it doesn't encourage you to create a function inside of another function that might be capturing a bunch of variables accidentally.


This issue suggests to me that foreach should include an option called .exportenv that is similar to the clusterExport envir option. That would be very helpful for plyr, since it would allow df_2 to be correctly exported using .export. However, that exported value still wouldn't be used unless the environment containing df_2 was removed from the .fun function.

answered Nov 09 '22 by Steve Weston


It looks like a scope issue.

Here is my "test suite" that lets me .export different variables and skip creating df_2 inside the function. I also add and remove dummy df_2 and df_3 variables outside of the function and compare.

library(plyr)
library(doParallel)

workers=makeCluster(2)
registerDoParallel(workers,core=2)

plyr_test=function(exportvar,makedf_2) {
  df_1=data.frame(type=c("a","b"),x=1:2)
  if(makedf_2){
    df_2=data.frame(type=c("a","b"),x=3:4)
  }
  print(ls())

  ddply(df_1,"type",.parallel=TRUE,.paropts=list(.export=exportvar,.verbose = TRUE),.fun=function(y) {
    z <- merge(y,df_2,all=FALSE,by="type")
  })
}
ls()
rm(df_2,df_3)
plyr_test("df_2",T)
plyr_test("df_2",F)
plyr_test("df_3",T)
plyr_test("df_3",F)
plyr_test(NULL,T) #ok
plyr_test(NULL,F)
df_2='hi'
ls()
plyr_test("df_2",T) #ok
plyr_test("df_2",F)
plyr_test("df_3",T)
plyr_test("df_3",F)
plyr_test(NULL,T) #ok
plyr_test(NULL,F)
df_3 = 'hi'
ls()
plyr_test("df_2",T) #ok
plyr_test("df_2",F)
plyr_test("df_3",T) #ok
plyr_test("df_3",F)
plyr_test(NULL,T) #ok
plyr_test(NULL,F)
rm(df_2)
ls()
plyr_test("df_2",T)
plyr_test("df_2",F)
plyr_test("df_3",T) #ok
plyr_test("df_3",F)
plyr_test(NULL,T) #ok
plyr_test(NULL,F)

I don't know why, but .export looks for df_2 outside of the function, in the global environment (I saw parent.env() in the code, so "parent environment" might be more accurate than "global environment"), while the calculation uses the df_2 that lives in the same environment as the ddply call, and that one is exported automatically.

Using a dummy df_2 outside of the function lets .export succeed, while the calculation still uses the df_2 defined inside.
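That shadowing can be sketched without plyr at all (the names below are illustrative, not from the original code): even when a df_2 exists in the outer environment where .export would find it, the worker function uses the df_2 captured in its own environment.

```r
df_2 <- "dummy in the outer environment"  # what .export would find and ship

# local() plays the role of the function body: it defines its own df_2
# and returns a closure that captures it.
f <- local({
  df_2 <- data.frame(type = c("a", "b"), x = 3:4)  # what the closure uses
  function(y) merge(y, df_2, all = FALSE, by = "type")
})

f(data.frame(type = "b", x = 2))  # uses the captured data frame, not the dummy
```

So the dummy only exists to keep .export's lookup from failing; it never participates in the merge.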

When .export can't find the variable outside of the function, it outputs:

Error in e$fun(obj, substitute(ex), parent.frame(), e$data) : 
  unable to find variable "df_2" 

With a df_2 dummy variable outside of the function but without one inside, .export is fine but ddply outputs:

Error in do.ply(i) : task 1 failed - "object 'df_2' not found"

It's possible that, since this is a small example (or perhaps one that isn't worth parallelizing), it's actually running on one core and thereby avoiding the need to export anything. A bigger example might fail without the .export, but someone else can try that.

answered Nov 09 '22 by ARobertson