I want to use the parallel functionality of the plyr
package within functions.
I would have thought that the proper way to export objects created within the body of the function (in this example, the object df_2) is as follows:
# rm(list=ls())
library(plyr)
library(doParallel)
workers=makeCluster(2)
registerDoParallel(workers,cores=2)
plyr_test=function() {
df_1=data.frame(type=c("a","b"),x=1:2)
df_2=data.frame(type=c("a","b"),x=3:4)
#export df_2 via .paropts
ddply(df_1,"type",.parallel=TRUE,.paropts=list(.export="df_2"),.fun=function(y) {
merge(y,df_2,all=FALSE,by="type")
})
}
plyr_test()
stopCluster(workers)
However, this throws an error:
Error in e$fun(obj, substitute(ex), parent.frame(), e$data) :
unable to find variable "df_2"
So I did some research and found out that it works if I export df_2 manually:
workers=makeCluster(2)
registerDoParallel(workers,cores=2)
plyr_test_2=function() {
df_1=data.frame(type=c("a","b"),x=1:2)
df_2=data.frame(type=c("a","b"),x=3:4)
#manually export df_2
clusterExport(cl=workers,varlist=c("df_2"),envir=environment())
ddply(df_1,"type",.parallel=TRUE,.fun=function(y) {
merge(y,df_2,all=FALSE,by="type")
})
}
plyr_test_2()
stopCluster(workers)
It gives the correct result:
type x.x x.y
1 a 1 3
2 b 2 4
But I have also found out that the following code works:
workers=makeCluster(2)
registerDoParallel(workers,cores=2)
plyr_test_3=function() {
df_1=data.frame(type=c("a","b"),x=1:2)
df_2=data.frame(type=c("a","b"),x=3:4)
#no export at all!
ddply(df_1,"type",.parallel=TRUE,.fun=function(y) {
merge(y,df_2,all=FALSE,by="type")
})
}
plyr_test_3()
stopCluster(workers)
plyr_test_3() also gives the correct result, and I don't understand why. I would have thought that I had to export df_2...
My question is: what is the right way to deal with parallel *ply within functions? Obviously, plyr_test() is incorrect. I somehow have the feeling that the manual export in plyr_test_2() is useless, but I also think that plyr_test_3() is kind of bad coding style. Could someone please elaborate on that? Thanks!
The problem with plyr_test is that df_2 is defined inside plyr_test, where it isn't accessible to the doParallel package, so it fails when it tries to export df_2. That is a scoping issue. plyr_test_2 avoids this problem because it doesn't use the .export option, but as you guessed, the call to clusterExport is not needed.
The reason that both plyr_test_2 and plyr_test_3 succeed is that df_2 is serialized along with the anonymous function that is passed to ddply via the .fun argument. In fact, both df_1 and df_2 are serialized along with that function, because it is defined inside plyr_test_2 and plyr_test_3. It's helpful that df_2 is included in this case, but the inclusion of df_1 is unnecessary and may hurt your performance.
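This capture can be seen directly in base R, without any parallel backend: a function defined inside another function carries its defining environment with it, so every local variable in that environment travels along when the closure is serialized. A minimal sketch (make_fun is a hypothetical name for illustration):

```r
make_fun <- function() {
  df_1 <- data.frame(type = c("a", "b"), x = 1:2)
  df_2 <- data.frame(type = c("a", "b"), x = 3:4)
  # the anonymous function's enclosing environment is make_fun's frame,
  # so BOTH df_1 and df_2 are carried along with it
  function(y) merge(y, df_2, all = FALSE, by = "type")
}

fun <- make_fun()
ls(environment(fun))  # "df_1" "df_2" - both locals are captured
# the call still works here even though no df_2 is in scope at the call site
fun(data.frame(type = c("a", "b"), x = 1:2))
```

This is exactly what happens to the .fun argument of ddply: the workers receive the closure together with its environment, df_1 included.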
As long as df_2 is captured in the environment of the anonymous function, no other value of df_2 will ever be used, regardless of what you export. Unless you can prevent it from being captured, it is pointless to export it either with .export or clusterExport, because the captured value will be used. You can only get yourself into trouble (as you did with .export) by trying to export it to the workers.
Note that in this case, foreach does not auto-export df_2, because it isn't able to analyze the body of the anonymous function to see which symbols are referenced. If you call foreach directly without using an anonymous function, it will see the reference and auto-export df_2, making it unnecessary to explicitly export it using .export.
You could prevent the environment of plyr_test from being serialized along with the anonymous function by modifying its environment before passing it to ddply:
plyr_test=function() {
df_1=data.frame(type=c("a","b"),x=1:2)
df_2=data.frame(type=c("a","b"),x=3:4)
#the export is needed now, since fun will no longer capture df_2
clusterExport(cl=workers,varlist=c("df_2"),envir=environment())
fun=function(y) merge(y, df_2, all=FALSE, by="type")
#reset the closure's environment so the local data frames aren't serialized
environment(fun)=globalenv()
ddply(df_1,"type",.parallel=TRUE,.fun=fun)
}
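The effect of the environment() reset can be checked without a cluster: once the closure's environment is replaced by globalenv(), the local df_2 is no longer reachable from it, which is exactly why the clusterExport call becomes necessary again. A small sketch of that behavior, assuming no df_2 exists in the global environment (make_stripped is a hypothetical name for illustration):

```r
make_stripped <- function() {
  df_2 <- data.frame(type = c("a", "b"), x = 3:4)
  fun <- function(y) merge(y, df_2, all = FALSE, by = "type")
  # drop the enclosing frame: fun now looks up df_2 in globalenv() only
  environment(fun) <- globalenv()
  fun
}

fun <- make_stripped()
# the local df_2 is gone, so the call fails until a df_2 is defined globally
# (on the workers, that is what clusterExport provides)
res <- try(fun(data.frame(type = c("a", "b"), x = 1:2)), silent = TRUE)
inherits(res, "try-error")  # TRUE
```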
One of the advantages of the foreach package is that it doesn't encourage you to create a function inside another function that might accidentally capture a bunch of variables.
This issue suggests to me that foreach should include an option called .exportenv, similar to the clusterExport envir option. That would be very helpful for plyr, since it would allow df_2 to be exported correctly using .export. However, that exported value still wouldn't be used unless the environment containing df_2 were removed from the .fun function.
It looks like a scoping issue.
Here is my "test suite", which lets me .export different variables or skip creating df_2 inside the function. I add and remove dummy df_2 and df_3 variables outside of the function and compare.
library(plyr)
library(doParallel)
workers=makeCluster(2)
registerDoParallel(workers,cores=2)
plyr_test=function(exportvar,makedf_2) {
df_1=data.frame(type=c("a","b"),x=1:2)
if(makedf_2){
df_2=data.frame(type=c("a","b"),x=3:4)
}
print(ls())
ddply(df_1,"type",.parallel=TRUE,.paropts=list(.export=exportvar,.verbose = TRUE),.fun=function(y) {
z <- merge(y,df_2,all=FALSE,by="type")
})
}
ls()
rm(df_2,df_3)
plyr_test("df_2",T)
plyr_test("df_2",F)
plyr_test("df_3",T)
plyr_test("df_3",F)
plyr_test(NULL,T) #ok
plyr_test(NULL,F)
df_2='hi'
ls()
plyr_test("df_2",T) #ok
plyr_test("df_2",F)
plyr_test("df_3",T)
plyr_test("df_3",F)
plyr_test(NULL,T) #ok
plyr_test(NULL,F)
df_3 = 'hi'
ls()
plyr_test("df_2",T) #ok
plyr_test("df_2",F)
plyr_test("df_3",T) #ok
plyr_test("df_3",F)
plyr_test(NULL,T) #ok
plyr_test(NULL,F)
rm(df_2)
ls()
plyr_test("df_2",T)
plyr_test("df_2",F)
plyr_test("df_3",T) #ok
plyr_test("df_3",F)
plyr_test(NULL,T) #ok
plyr_test(NULL,F)
I don't know why, but .export looks for df_2 in the global environment outside of the function (I saw parent.env() in the code, which might be "more correct" than the global environment), while the calculation requires the variable to be in the same environment as ddply, which exports it automatically.
Using a dummy variable for df_2 outside of the function allows .export to work, while the calculation uses the df_2 inside.
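That split can be reproduced in plain R: lexical scoping means the inner function uses the df_2 from its defining function, even when a different df_2 exists in the global environment (which is the one the exporter finds). A sketch:

```r
df_2 <- "global dummy"  # this is the value .export would find and ship

outer <- function() {
  # this is the value the computation actually uses
  df_2 <- data.frame(type = c("a", "b"), x = 3:4)
  inner <- function() df_2
  inner()
}

is.data.frame(outer())  # TRUE - the inner df_2 wins over the global dummy
```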
When .export can't find the variable outside of the function, it outputs:
Error in e$fun(obj, substitute(ex), parent.frame(), e$data) :
unable to find variable "df_2"
With a df_2 dummy variable outside of the function but without one inside, .export is fine but ddply outputs:
Error in do.ply(i) : task 1 failed - "object 'df_2' not found"
It's possible that, since this is a small example or perhaps one that isn't worth parallelizing, it's actually running on one core and avoiding the need to export anything. A bigger example might fail without the .export, but someone else can try that.