
R parallel: rbind in parallel into separate data.frames

The code below produces different results on Windows and Ubuntu. I understand this is because the two platforms handle parallel processing differently.

In summary: I cannot insert/rbind data in parallel on Linux (with mclapply or mcmapply), while I can on Windows.

Thanks to @Hong Ooi for pointing out that mclapply does not run in parallel on Windows; the question below is still valid.

Note that no two inserts target the same data.frame: each insert goes into a separate data.frame.

library(R6)
library(parallel)

# storage objects generator
cl <- R6Class(
    classname = "cl",
    public = list(
        data = data.frame(NULL),
        initialize = function() invisible(self),
        insert = function(x) self$data <- rbind(self$data, x)
    )
)

N <- 4L # number of entities
i <- setNames(seq_len(N),paste0("n",seq_len(N)))

# random data.frames
set.seed(1)
ldt <- lapply(i, function(i) data.frame(replicate(sample(3:10,1),sample(letters,1e5,rep=TRUE))))

# entity storage
lcl1 <- lapply(i, function(i) cl$new())
lcl2 <- lapply(i, function(i) cl$new())
lcl3 <- lapply(i, function(i) cl$new())

# insert data
invisible({
    mclapply(names(i), FUN = function(n) lcl1[[n]]$insert(ldt[[n]]))
    mcmapply(FUN = function(dt, cl) cl$insert(dt), ldt, lcl2, SIMPLIFY=FALSE)
    lapply(names(i), FUN = function(n) lcl3[[n]]$insert(ldt[[n]]))
})

### Windows

sapply(lcl1, function(cl) nrow(cl$data)) # mclapply
#     n1     n2     n3     n4
# 100000 100000 100000 100000
sapply(lcl2, function(cl) nrow(cl$data)) # mcmapply
#     n1     n2     n3     n4
# 100000 100000 100000 100000
sapply(lcl3, function(cl) nrow(cl$data)) # lapply
#     n1     n2     n3     n4
# 100000 100000 100000 100000

### Unix

sapply(lcl1, function(cl) nrow(cl$data)) # mclapply
#n1 n2 n3 n4
# 0  0  0  0
sapply(lcl2, function(cl) nrow(cl$data)) # mcmapply
#n1 n2 n3 n4
# 0  0  0  0
sapply(lcl3, function(cl) nrow(cl$data)) # lapply
#     n1     n2     n3     n4
# 100000 100000 100000 100000

And the question:

How can I rbind in parallel into separate data.frames on Linux?

P.S. Off-memory storage such as SQLite is not an option in my case.

asked May 22 '15 at 12:05 by jangorecki


1 Answer

The problem is that mclapply and mcmapply are not intended to be used with functions that have side effects. On Linux they fork worker processes, and each worker modifies its own copy of your lists, not the objects in the master process. mclapply doesn't send those modified objects back to the master: it only returns the values explicitly returned by the function. Your inserts are therefore lost when the workers exit at the end of the mclapply call.
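A minimal sketch of the mechanism, assuming a Unix-alike where mclapply forks. A plain environment stands in for an R6 object (R6 objects are environments, so they have the same reference semantics). The side effect made in each child is lost, while the returned values reach the master. On Windows, mclapply only accepts mc.cores = 1 and then simply runs lapply serially, which is why the question's Windows results look correct.

```r
library(parallel)

# A plain environment stands in for an R6 object; both have reference semantics.
store <- new.env()
store$n <- 0L

# mclapply only forks when mc.cores > 1; on Windows it must be 1 (serial lapply).
cores <- if (.Platform$OS.type == "windows") 1L else 2L

# Side effect only: each forked child increments its own copy of `store`.
invisible(mclapply(1:2, function(k) {
  store$n <- store$n + k
  NULL
}, mc.cores = cores))
side_effect_result <- store$n

# Return value: each child's result is serialized back to the master.
returned <- mclapply(1:2, function(k) k * 10L, mc.cores = cores)

print(side_effect_result)  # 0 on Unix-alikes (forked children), 3 on Windows (serial)
print(unlist(returned))    # 10 20 on every platform
```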

Normally I would change the code so that it does not depend on side effects and instead returns the modified objects. Here's one way to do that using clusterApply, which also works in parallel on Windows:

library(R6)
library(parallel)
cl <- R6Class(
    classname = "cl",
    public = list(
        data = data.frame(NULL),
        initialize = function() invisible(self),
        insert = function(x) self$data <- rbind(self$data, x)))

N <- 4L # number of entities
i <- setNames(seq_len(N),paste0("n",seq_len(N)))
set.seed(1)
ldt <- lapply(i, function(i)
  data.frame(replicate(sample(3:10,1),sample(letters,1e5,rep=TRUE))))
nw <- 3  # number of workers
clust <- makePSOCKcluster(nw)
idx <- splitIndices(length(i), nw)
nameslist <- lapply(idx, function(iv) names(i)[iv])

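# Each worker builds and fills the objects for its chunk of names and returns them;
# do.call('c', ...) then concatenates the per-worker lists into one named list.
lcl4 <- do.call('c', clusterApply(clust, nameslist, 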
  function(nms, cl, ldt) {
    library(R6)
    lcl4 <- lapply(nms, function(n) cl$new())
    names(lcl4) <- nms
    lapply(nms, FUN = function(n) lcl4[[n]]$insert(ldt[[n]]))
    lcl4
  }, cl, ldt))
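On a Unix-alike, the same idea (return the modified objects rather than relying on side effects) also works with mclapply itself, because the objects come back to the master as return values. A minimal sketch, assuming a hypothetical base-R helper `new_store()` in place of the R6 class `cl` so that the block runs without extra packages, and 100-row data frames instead of 1e5 rows. With R6 installed, `cl$new()` from the question can be used the same way.

```r
library(parallel)

# Hypothetical stand-in for the R6 class `cl`: an environment with `data` and `insert()`.
new_store <- function() {
  self <- new.env()
  self$data <- data.frame()
  self$insert <- function(x) self$data <- rbind(self$data, x)
  self
}

N <- 4L
i <- setNames(seq_len(N), paste0("n", seq_len(N)))
set.seed(1)
ldt <- lapply(i, function(i)
  data.frame(replicate(sample(3:10, 1), sample(letters, 100, replace = TRUE))))

cores <- if (.Platform$OS.type == "windows") 1L else 2L

# Create, fill, and *return* each store; mclapply serializes the environments back.
lcl5 <- mclapply(names(i), function(n) {
  obj <- new_store()
  obj$insert(ldt[[n]])
  obj
}, mc.cores = cores)
names(lcl5) <- names(i)  # mclapply over an unnamed character vector drops names

print(sapply(lcl5, function(s) nrow(s$data)))  # 100 rows in each of n1..n4
```

Each store is created inside the worker, so nothing needs to be copied back except the finished objects.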

This method doesn't work if you want to create the list of objects once and then modify the objects multiple times in parallel. That is also possible, but it requires persistent workers: the objects stay on the workers between tasks, and you fetch the modified objects after all of the tasks are complete. Unfortunately, mclapply doesn't use persistent workers, so in this case you must use cluster-based functions such as clusterApply. Here's one way to do it:

# Initialize the cluster workers
clusterEvalQ(clust, library(R6))
clusterExport(clust, c('cl', 'ldt'))
clusterApply(clust, nameslist, function(nms) {
  x <- lapply(nms, function(n) cl$new())
  names(x) <- nms
  assign('lcl4', x, pos=.GlobalEnv)
  NULL
})

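# Insert data into lcl4 on each worker. Because nameslist has one element per
# worker, clusterApply sends nameslist[[j]] to worker j on every call, so each
# worker receives the same names it initialized above.
clusterApply(clust, nameslist, function(nms) {
  lapply(nms, FUN = function(n) lcl4[[n]]$insert(ldt[[n]]))
  NULL
})

# Concatenate lcl4 from each worker
lcl4 <- do.call('c', clusterEvalQ(clust, lcl4))
stopCluster(clust)  # shut down the workers once the results are retrieved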

This is very similar to the previous method, except that it splits the process into three phases: worker initialization, task execution, and result retrieval. I also initialized the workers in a more conventional manner using clusterExport and clusterEvalQ.
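A minimal sketch of the same three phases, assuming base R only: a hypothetical `new_store()` helper replaces the R6 class, and each store receives two rounds of inserts to show that state persists on the workers between calls. Like the code above, it relies on clusterApply's static scheduling: with exactly one list element per worker, element j goes to worker j on every call.

```r
library(parallel)

# Hypothetical stand-in for the R6 class `cl`: an environment with `data` and `insert()`.
new_store <- function() {
  self <- new.env()
  self$data <- data.frame()
  self$insert <- function(x) self$data <- rbind(self$data, x)
  self
}

nw <- 2L
clust <- makePSOCKcluster(nw)
clusterExport(clust, "new_store", envir = environment())

# One chunk of names per worker, so worker j always handles nameslist[[j]].
nameslist <- list(c("n1", "n2"), c("n3", "n4"))

# Phase 1: create the stores once, in each worker's global environment.
invisible(clusterApply(clust, nameslist, function(nms) {
  x <- lapply(nms, function(n) new_store())
  names(x) <- nms
  assign("lcl", x, envir = .GlobalEnv)
  NULL
}))

# Phase 2: insert twice; the stores persist between calls on the same worker.
for (pass in 1:2) {
  invisible(clusterApply(clust, nameslist, function(nms) {
    for (n in nms) lcl[[n]]$insert(data.frame(name = n, value = seq_len(5)))
    NULL
  }))
}

# Phase 3: fetch the stores from every worker and combine them, then shut down.
lcl <- do.call("c", clusterEvalQ(clust, lcl))
stopCluster(clust)

print(sapply(lcl, function(s) nrow(s$data)))  # 10 rows (2 passes x 5) in each of n1..n4
```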

answered Oct 18 '22 at 15:10 by Steve Weston