I would like to parallelize a loop like
td <- data.frame(cbind(c(rep(1,4),2,rep(1,5)),rep(1:10,2)))
names(td) <- c("val","id")
res <- rep(NA,NROW(td))
for(i in levels(interaction(td$id))){
res[td$id==i] <- mean(td$val[td$id!=i])
}
with the help of foreach() from library(doParallel) in order to speed up the computation. Unfortunately, foreach doesn't seem to support direct assignments; at least
library(doParallel)
registerDoParallel(4)
res <- rep(NA, NROW(td))
foreach(i = levels(interaction(td$id))) %dopar% {
  res[td$id == i] <- mean(td$val[td$id != i])
}
doesn't do what I want (i.e., give the same result as the normal loop above). Any ideas about what I am doing wrong, or how I could somehow "hack" the .combine option in foreach to do what I want? Please note that the order of the id variable is not always the same in the original data set. Any hint would be very much appreciated!
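For what it's worth, a minimal sketch of the combine-based idea (res_par and the mapping step are illustrative, not a definitive solution): each iteration returns one mean, the results are combined with c, and they are then mapped back onto the rows of td.
# Assignments inside %dopar% happen in worker copies of `res`, so they never
# reach the master; instead return one value per id, combine with c, and map
# the results back onto the rows of td.
ids <- unique(td$id)
loo_means <- foreach(i = ids, .combine = c) %dopar% mean(td$val[td$id != i])
res_par <- loo_means[match(td$id, ids)]   # matches the sequential loop's result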
To perform these computations efficiently in parallel, you need to use chunking, since the individual mean calculations don't take much time. When using foreach, I often use functions from the itertools package for chunking. In this case, I use the isplitVector function to generate one task per worker. The results are vectors, so they are combined by simply adding them together, which is why the r vector must be initialized to a vector of zeros.
library(doParallel)
library(itertools)

workers <- 4                      # number of parallel workers (four were used in the benchmarks below)
registerDoParallel(workers)

# Combine function: element-wise sum of the partial result vectors
vadd <- function(a, ...) {
  for (v in list(...))
    a <- a + v
  a
}

res <- foreach(ids = isplitVector(unique(td$id), chunks = workers),
               .combine = 'vadd',
               .multicombine = TRUE,
               .inorder = FALSE) %dopar% {
  # Each task handles one chunk of ids and fills in only "its" rows
  r <- rep(0, NROW(td))
  for (i in ids)
    r[td$id == i] <- mean(td$val[td$id != i])
  r
}
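As a side note, here is a small illustrative sketch (assuming four workers) of what isplitVector produces; each task receives one chunk of ids rather than a single id:
library(itertools)

# isplitVector() returns an iterator over roughly equal-sized chunks of the
# input vector; with chunks = 4, the unique ids are split into four batches.
as.list(isplitVector(unique(td$id), chunks = 4))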
This is a classic example of putting the original sequential version inside the foreach loop, but operating on only a subset of the data. Since there is only one result to combine per worker, there is very little post-processing, so it runs quite efficiently.
To see how this performed, I benchmarked it against the sequential version and against Roland's data.table version using the following data set:
set.seed(107)
n <- 1000000
m <- 10000
td <- data.frame(val=rnorm(n), id=sample(m, n, replace=TRUE))
I include this because the performance is very data dependent. You could even get different performance results by using a different random seed.
Here are some benchmark results from my Linux box with a Xeon CPU X5650 and 12 GB of RAM:
So for at least one data set, it is worthwhile to execute this computation in parallel. It's not a perfect speedup, but it's not too bad. In order to run any of these benchmarks on your own machine, or with a different data set, you can download them from pastebin via the links above.
Update
After working on these benchmarks, I was interested in using data.table with foreach to get an even faster version. This is what I came up with (with advice from Matthew Dowle):
library(data.table)

td <- as.data.table(td)   # convert td so := modifies it by reference

# Leave-one-out mean, but only for groups assigned to this task chunk
cmean <- function(v, mine) if (mine) mean(v) else 0

nuniq <- length(unique(td$id))
res <- foreach(grps = isplitIndices(nuniq, chunks = workers),
               .combine = 'vadd',
               .multicombine = TRUE,
               .inorder = FALSE,
               .packages = 'data.table') %dopar% {
  # .I holds the row numbers of the current id group, so td$val[-.I] is every
  # value outside the group; .GRP is the group's sequence number
  td[, means := cmean(td$val[-.I], .GRP %in% grps), by = id]
  td$means
}
td is now a data.table object. I used isplitIndices from the itertools package to generate vectors of group numbers associated with each task chunk. The cmean function is a wrapper around mean that returns zero for groups that shouldn't be computed in that task chunk. It uses the same combine function as the non-data.table version, since the task results are the same.
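For a quick illustration (assuming four chunks), isplitIndices simply splits the group numbers 1..nuniq into roughly equal blocks:
library(itertools)

# Each element of the iterator is one block of group numbers; a task computes
# means only for the groups whose numbers fall in its block.
as.list(isplitIndices(10, chunks = 4))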
With four workers and the same data set, this version ran in 56.4 seconds, which is a speedup of 3.7 compared to the sequential data table version, making it the clear winner at 6.4 times faster than the sequential for loop. The benchmark can be downloaded from pastebin here.
Your performance gain will be better by orders of magnitude if you use data.table for this instead of parallelizing a loop:
library(data.table)
DT <- data.table(td)
# .I holds the row numbers of the current id group, so DT[-.I, val] selects
# every val outside that group (the leave-one-out set)
DT[, means := mean(DT[-.I, val]), by = id]
identical(DT$means, res)
#[1] TRUE
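The same leave-one-out means can also be computed from running totals, which avoids subsetting the whole table inside each group. A minimal sketch of that idea (means2 is an illustrative column name, not part of the answer above):
tot_sum <- sum(DT$val)   # grand total of val
tot_n   <- nrow(DT)      # total number of rows
# leave-one-out mean per id: (grand total - group total) / (total rows - group size)
DT[, means2 := (tot_sum - sum(val)) / (tot_n - .N), by = id]
all.equal(DT$means2, DT$means)   # should be TRUE (up to floating-point tolerance)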
If you want to use foreach, you'll need to combine it with a merge:
library(foreach)
res2 <- foreach(i = levels(interaction(td$id)), .combine = rbind) %do% {
  data.frame(level = i, means = mean(td$val[td$id != i]))
}
res2 <- merge(res2, td, by.x = "level", by.y = "id", sort = FALSE)
#    level    means val
# 1      1 1.111111   1
# 2      1 1.111111   1
# 3      2 1.111111   1
# 4      2 1.111111   1
# 5      3 1.111111   1
# 6      3 1.111111   1
# 7      4 1.111111   1
# 8      4 1.111111   1
# 9      5 1.000000   2
# 10     5 1.000000   2
# 11     6 1.111111   1
# 12     6 1.111111   1
# 13     7 1.111111   1
# 14     7 1.111111   1
# 15     8 1.111111   1
# 16     8 1.111111   1
# 17     9 1.111111   1
# 18     9 1.111111   1
# 19    10 1.111111   1
# 20    10 1.111111   1
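If what you ultimately need is a plain vector in the original row order of td (like res from the loop in the question), you can index the 10-row per-level result from the foreach() call directly instead of merging. A brief sketch (res_vec is an illustrative name):
# res2 here is the 10-row per-level result from the foreach() call (before the
# merge); match() maps each row's id onto its level, coercing id to character.
res_vec <- res2$means[match(td$id, res2$level)]   # same values as the loop's res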