
Shared memory in parallel foreach in R

Problem Description:

I have a big matrix c loaded into RAM. My goal is to give several parallel processes read-only access to it. However, when I create the worker connections, whether with doSNOW, doMPI, big.matrix, etc., the amount of RAM used increases dramatically.

Is there a way to properly create shared memory that all the processes can read from, without each of them making a local copy of the whole data set?

Example:

libs<-function(libraries){# Install any missing packages, then load them
  for (lib in libraries){
    if( !is.element(lib, .packages(all.available = TRUE)) ) {
      install.packages(lib)
    }
    library(lib,character.only = TRUE)
  }
}

libra<-list("foreach","parallel","doSNOW","bigmemory")
libs(libra)

#create a matrix of approximately 1 GB
c<-matrix(runif(10000^2),10000,10000)
#convert it to a big.matrix
x<-as.big.matrix(c)
# get a descriptor of the matrix
mdesc <- describe(x)
# create the required connections
cl <- makeCluster(detectCores())
registerDoSNOW(cl)
out<-foreach(linID = 1:10, .combine=c) %dopar% {
  #load bigmemory
  require(bigmemory)
  # attach the matrix via shared memory??
  m <- attach.big.matrix(mdesc)
  #dummy expression to test data acquisition
  c<-m[1,1]
}
closeAllConnections()

RAM: the screenshot of RAM usage during foreach shows that memory use increases a lot until foreach ends, at which point it is freed.

asked Jul 22 '15 by Stanislav


2 Answers

I think the solution to the problem can be seen in the post by Steve Weston, the author of the foreach package, here. There he states:

The doParallel package will auto-export variables to the workers that are referenced in the foreach loop.

So I think the problem is that in your code the big matrix c is referenced in the assignment c<-m[1,1]. Try xyz <- m[1,1] instead and see what happens.
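Applied to the loop from the question, a minimal sketch of that change could look like this (same cluster and mdesc as above; the only difference is that the worker result is bound to a fresh name instead of c):

out<-foreach(linID = 1:10, .combine=c) %dopar% {
  #load bigmemory
  require(bigmemory)
  # attach the matrix via its shared-memory descriptor
  m <- attach.big.matrix(mdesc)
  # bind the value to a fresh name so the big matrix c is not auto-exported
  xyz <- m[1,1]
  xyz
}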

Here is an example with a file-backed big.matrix:

#create a matrix of approximately 1 GB
n <- 10000
m <- 10000
c <- matrix(runif(n*m),n,m)
#convert it to bigmatrix
x <- as.big.matrix(x = c, type = "double", 
                 separated = FALSE, 
                 backingfile = "example.bin", 
                 descriptorfile = "example.desc")
# get a description of the matrix
mdesc <- describe(x)
# create the required connections
cl <- makeCluster(detectCores())
registerDoSNOW(cl)
## 1) No referencing
out <- foreach(linID = 1:4, .combine=c) %dopar% {
  t <- attach.big.matrix("example.desc")
  for (i in seq_len(30L)) {
    for (j in seq_len(m)) {
      y <- t[i,j]
    }
  }
  return(0L)
}

(screenshot: RAM usage for case 1, without referencing c)

## 2) Referencing
out <- foreach(linID = 1:4, .combine=c) %dopar% {
  invisible(c) ## c is referenced and thus exported to workers
  t <- attach.big.matrix("example.desc")
  for (i in seq_len(30L)) {
    for (j in seq_len(m)) {
      y <- t[i,j]
    }
  }
  return(0L)
}
closeAllConnections()

(screenshot: RAM usage for case 2, with c referenced and therefore exported to the workers)

answered Oct 19 '22 by NoBackingDown


Alternatively, if you are on Linux or macOS and you want copy-on-write (CoW) shared memory, use forks. First load all your data in the main process, then launch worker processes (forks) with mcparallel from the parallel package.
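For example, a minimal sketch of the plain fork-and-collect route (assuming the big matrix c from the question is already loaded in the parent process; the forked children only read it, so copy-on-write means it is never duplicated):

library(parallel)

# c is the large matrix already sitting in the parent's memory.
# Each forked child computes one row sum; reading does not copy the matrix.
jobs <- lapply(1:4, function(i) mcparallel(sum(c[i, ])))
res <- mccollect(jobs)  # wait for the forks and gather their results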

You can collect their results with mccollect (as in the sketch above), or use truly shared writable memory, here created with bigmemory (originally via the Rdsm library; see the Edit below):

library(parallel)
library(bigmemory) #for shared variables
size<-1 #length of the shared vector (set to 1 for this example)
shared<-bigmemory::big.matrix(nrow = size, ncol = 1, type = 'double')
shared[1]<-1 #Init shared memory with some number

job<-mcparallel({shared[1]<-23}) #...change it in another forked thread
shared[1,1] #...and confirm that it gets changed
# [1] 23

You can confirm that the value really gets updated in the background if you delay the write:

fn<-function()
{
  Sys.sleep(1) #One second delay
  shared[1]<-11
}

job<-mcparallel(fn())
shared[1] #Execute immediately after last command
# [1] 23
shared[1,1] #Execute after one second
# [1] 11
mccollect() #Reap the forked process (and possibly collect its output)

To control concurrency and avoid race conditions, use locks:

library(synchronicity) #for locks
m<-boost.mutex() #Let's create a mutex "m"

bad.incr<-function() #This function doesn't protect the shared resource with locks:
{
  a<-shared[1]
  Sys.sleep(1)
  shared[1]<-a+1
}

good.incr<-function()
{
  lock(m)
  a<-shared[1]
  Sys.sleep(1)
  shared[1]<-a+1
  unlock(m)
}

shared[1]<-1
for (i in 1:5) job<-mcparallel(bad.incr())
shared[1] #You can verify that the value did not get increased 5 times, due to race conditions

mccollect() #Reap all the forked processes (we don't need their return values)
shared[1]<-1
for (i in 1:5) job<-mcparallel(good.incr())
shared[1] #As expected, eventually (after about 5 seconds of waiting) you get 6
#[1] 6 

mccollect()

Edit:

I simplified the dependencies a bit by replacing Rdsm::mgrmakevar with bigmemory::big.matrix. mgrmakevar internally calls big.matrix anyway, and we don't need anything more.

answered Oct 19 '22 by Adam Ryczkowski