The code below produces different results on Windows and Ubuntu. I understand this is because the two platforms handle parallel processing differently.
Summarizing: I cannot insert/`rbind` data in parallel on Linux (`mclapply`, `mcmapply`), while I can do it on Windows.
Thanks to @Hong Ooi for pointing out that `mclapply` does not actually run in parallel on Windows; the question below is still valid.
Of course, there are no multiple inserts into the same `data.frame`: each insert goes into a separate `data.frame`.
```r
library(R6)
library(parallel)
# storage objects generator
cl <- R6Class(
  classname = "cl",
  public = list(
    data = data.frame(NULL),
    initialize = function() invisible(self),
    insert = function(x) self$data <- rbind(self$data, x)
  )
)
N <- 4L # number of entities
i <- setNames(seq_len(N), paste0("n", seq_len(N)))
# random data.frames
set.seed(1)
ldt <- lapply(i, function(i) data.frame(replicate(sample(3:10, 1), sample(letters, 1e5, replace = TRUE))))
# entity storage
lcl1 <- lapply(i, function(i) cl$new())
lcl2 <- lapply(i, function(i) cl$new())
lcl3 <- lapply(i, function(i) cl$new())
# insert data
invisible({
  mclapply(names(i), FUN = function(n) lcl1[[n]]$insert(ldt[[n]]))
  mcmapply(FUN = function(dt, cl) cl$insert(dt), ldt, lcl2, SIMPLIFY = FALSE)
  lapply(names(i), FUN = function(n) lcl3[[n]]$insert(ldt[[n]]))
})
### Windows
sapply(lcl1, function(cl) nrow(cl$data)) # mclapply
#     n1     n2     n3     n4
# 100000 100000 100000 100000
sapply(lcl2, function(cl) nrow(cl$data)) # mcmapply
#     n1     n2     n3     n4
# 100000 100000 100000 100000
sapply(lcl3, function(cl) nrow(cl$data)) # lapply
#     n1     n2     n3     n4
# 100000 100000 100000 100000
### Unix
sapply(lcl1, function(cl) nrow(cl$data)) # mclapply
# n1 n2 n3 n4
#  0  0  0  0
sapply(lcl2, function(cl) nrow(cl$data)) # mcmapply
# n1 n2 n3 n4
#  0  0  0  0
sapply(lcl3, function(cl) nrow(cl$data)) # lapply
#     n1     n2     n3     n4
# 100000 100000 100000 100000
```
And the question: how can I `rbind` in parallel into separate `data.frame`s on a Linux platform?
P.S. Off-memory storage such as SQLite is not an option in my case.
The problem is that `mclapply` and `mcmapply` are not intended to be used with functions that have side effects. Your function modifies objects in lists, but `mclapply` doesn't send the modified objects back to the master process: it only returns the values explicitly returned by the function. Each worker is a forked copy of the master, so the objects it modifies are the worker's own copies, and those changes are lost when the workers exit as `mclapply` returns.
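To make the difference between side effects and return values concrete, here is a minimal sketch that is not part of the original answer; the `store` environment and the `bump` function are hypothetical names used only for illustration. The environment plays the role of an R6 object (both are reference objects), and on a Unix-alike the forked workers only change their own copies of it, while the explicit return values do come back.

```r
library(parallel)

# A reference object, like an R6 instance (hypothetical example data)
store <- new.env()
store$n <- 0

bump <- function(k) {
  store$n <- store$n + k  # side effect: modifies this process's copy only
  k * 10                  # explicit return value: sent back by mclapply
}

# Forking is unavailable on Windows; mc.cores = 1 runs sequentially
# in the calling process there, so the side effect would be kept.
cores <- if (.Platform$OS.type == "windows") 1L else 2L
res <- mclapply(1:4, bump, mc.cores = cores)

unlist(res)  # 10 20 30 40: return values arrive in the master
store$n      # 0 on Linux/macOS: only the forked copies were modified
```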
Normally I would change the code so that it doesn't depend on side effects, and instead return the modified objects. Here's one way to do that using `clusterApply`, so that it also runs in parallel on Windows:
```r
library(R6)
library(parallel)

# storage objects generator (same as in the question)
cl <- R6Class(
  classname = "cl",
  public = list(
    data = data.frame(NULL),
    initialize = function() invisible(self),
    insert = function(x) self$data <- rbind(self$data, x)))

N <- 4L # number of entities
i <- setNames(seq_len(N), paste0("n", seq_len(N)))
set.seed(1)
ldt <- lapply(i, function(i)
  data.frame(replicate(sample(3:10, 1), sample(letters, 1e5, replace = TRUE))))

nw <- 3 # number of workers
clust <- makePSOCKcluster(nw)
# split the entity names into one chunk per worker
idx <- splitIndices(length(i), nw)
nameslist <- lapply(idx, function(iv) names(i)[iv])

# Each worker creates and fills the objects for its chunk and *returns*
# them; the per-worker lists are then concatenated into one named list.
lcl4 <- do.call('c', clusterApply(clust, nameslist,
  function(nms, cl, ldt) {
    library(R6)
    lcl4 <- lapply(nms, function(n) cl$new())
    names(lcl4) <- nms
    lapply(nms, FUN = function(n) lcl4[[n]]$insert(ldt[[n]]))
    lcl4  # the return value is serialized back to the master
  }, cl, ldt))
```
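Since the question is specifically about Linux, note that the same idea (return the modified objects rather than relying on side effects) should also work with `mclapply` itself where forking is available. A minimal sketch under that assumption; the small `ldt` and the name `lcl5` are hypothetical stand-ins for the question's data, not the original answer's code:

```r
library(R6)
library(parallel)

cl <- R6Class("cl", public = list(
  data = data.frame(NULL),
  insert = function(x) self$data <- rbind(self$data, x)))

# hypothetical small input, one data.frame per entity
set.seed(1)
ldt <- list(n1 = data.frame(a = sample(letters, 5, replace = TRUE)),
            n2 = data.frame(a = sample(letters, 7, replace = TRUE)))

# fork-based workers on Unix-alikes; sequential fallback on Windows
cores <- if (.Platform$OS.type == "windows") 1L else 2L
lcl5 <- mclapply(names(ldt), function(n) {
  obj <- cl$new()
  obj$insert(ldt[[n]])
  obj  # return the modified object so mclapply sends it back
}, mc.cores = cores)
names(lcl5) <- names(ldt)

sapply(lcl5, function(o) nrow(o$data))  # n1 = 5, n2 = 7
```

R6 objects are environments, so they are serialized back to the master process together with their methods and data.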
Returning the modified objects doesn't work if you want to create the list of objects once and then modify the objects multiple times in parallel. That is also possible, but you need persistent workers: the objects stay on the workers between tasks, and you fetch the modified objects from the workers after all of the tasks are complete. Unfortunately, `mclapply` doesn't use persistent workers, so in this case you must use cluster-based functions such as `clusterApply`. Here's one way to do it, reusing `clust` and `nameslist` from above:
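```r
# Initialize the cluster workers
clusterEvalQ(clust, library(R6))
clusterExport(clust, c('cl', 'ldt'))
# Create each worker's share of the objects in that worker's global environment
clusterApply(clust, nameslist, function(nms) {
  x <- lapply(nms, function(n) cl$new())
  names(x) <- nms
  assign('lcl4', x, envir = .GlobalEnv)
  NULL
})
# Insert data into lcl4 on each worker.
# nameslist has exactly one element per worker, so clusterApply sends
# nameslist[[k]] to worker k both times: each worker updates its own objects.
clusterApply(clust, nameslist, function(nms) {
  lapply(nms, FUN = function(n) lcl4[[n]]$insert(ldt[[n]]))
  NULL
})
# Concatenate lcl4 from each worker
lcl4 <- do.call('c', clusterEvalQ(clust, lcl4))
stopCluster(clust)
```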
This is very similar to the previous method, except that it splits the process into three phases: worker initialization, task execution, and result retrieval. I also initialized the workers in a more conventional manner, using `clusterExport` and `clusterEvalQ`.
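The persistent-worker approach relies on each PSOCK worker keeping its own global environment alive between calls. A minimal sketch of that behavior, independent of the R6 code; the `counter` variable is hypothetical and only illustrates that state set up in one call is still there in the next:

```r
library(parallel)

clust <- makePSOCKcluster(2)
# clusterEvalQ evaluates in each worker's global environment
invisible(clusterEvalQ(clust, counter <- 0))
for (round in 1:3) {
  invisible(clusterEvalQ(clust, counter <- counter + 1))
}
# fetch the per-worker state after all tasks are done
counts <- unlist(clusterEvalQ(clust, counter))
stopCluster(clust)
counts  # 3 3: each worker kept its counter across the three calls
```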