Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

writing to global environment when running in parallel

I have a data.frame of cells, values and coordinates. It resides in the global environment.

> head(cont.values)
   cell value   x   y
1 11117    NA -34 322
2 11118    NA -30 322
3 11119    NA -26 322
4 11120    NA -22 322
5 11121    NA -18 322
6 11122    NA -14 322

Because my custom function takes almost a second to calculate individual cell (and I have tens of thousands of cells to calculate) I don't want to duplicate calculations for cells that already have a value. My following solution tries to avoid that. Each cell can be calculated independently, screaming for parallel execution.

What my function actually does is check if there's a value for a specified cell number and if it's NA, it calculates it and inserts it in place of NA.

I can run my magic function (result is value for a corresponding cell) using apply family of functions and from within apply, I can read and write cont.values without a problem (it's in global environment).

Now, I want to run this in parallel (using snowfall) and I'm unable to read or write from/to this variable from individual core.

Question: What solution would be able to read/write from/to a dynamic variable residing in global environment from within worker (core) when executing a function in parallel. Is there a better approach of doing this?

like image 377
Roman Luštrik Avatar asked Jun 06 '11 11:06

Roman Luštrik


3 Answers

The pattern of a central store that workers consult for values is implemented in the rredis package on CRAN. The idea is that the Redis server maintains a store of key-value pairs (your global data frame, re-implemented). Workers query the server to see if the value has been calculated (redisGet) and if not do the calculation and store it (redisSet) so that other workers can re-use it. Workers can be R scripts, so it's easy to expand the work force. It's a very nice alternative parallel paradigm. Here's an example that uses the notion of 'memoizing' each result. We have a function that is slow (sleeps for a second)

fun <- function(x) { Sys.sleep(1); x }

We write a 'memoizer' that returns a variant of fun that first checks to see if the value for x has already been calculated, and if so uses that

memoize <-
    function(FUN)
{
    force(FUN) # circumvent lazy evaluation
    require(rredis)
    redisConnect()
    function(x)
    {
        key <- as.character(x)
        val <- redisGet(key)
        if (is.null(val)) {
            val <- FUN(x)
            redisSet(key, val)
        }
        val
    }
}

We then memoize our function

funmem <- memoize(fun)

and go

> system.time(res <- funmem(10)); res
   user  system elapsed 
  0.003   0.000   1.082 
[1] 10
> system.time(res <- funmem(10)); res
   user  system elapsed 
  0.001   0.001   0.040 
[1] 10

This does require a redis server running outside R but very easy to install; see the documentation that comes with the rredis package.

A within-R parallel version might be

library(snow)
cl <- makeCluster(c("localhost","localhost"), type = "SOCK")
clusterEvalQ(cl, { require(rredis); redisConnect() })
tasks <- sample(1:5, 100, TRUE)
system.time(res <- parSapply(cl, tasks, funmem))
like image 101
Martin Morgan Avatar answered Nov 07 '22 20:11

Martin Morgan


It will depend on what the function in question is, off course, but I'm afraid that snowfall won't be much of a help there. Thing is, you'll have to export the whole dataframe to the different cores (see ?sfExport) and still find a way to combine it. That kind of beats the whole purpose of changing the value in the global environment, as you probably want to keep memory use as low as possible.

You can dive into the low-level functions of snow to -kind of- get this to work. See following example :

#Some data
Data <- data.frame(
  cell = 1:10,
  value = sample(c(100,NA),10,TRUE),
  x = 1:10,
  y = 1:10
)
# A sample function
sample.func <- function(){
    id <- which(is.na(Data$value)) # get the NA values

    # this splits up the values from the dataframe in a list
    # which will be passed to clusterApply later on.
    parts <- lapply(clusterSplit(cl,id),function(i)Data[i,c("x","y")])

    # Here happens the magic
    Data$value[id] <<-
    unlist(clusterApply(cl,parts,function(x){
        x$x+x$y
      }
    ))
}
#now we run it
require(snow)
cl <- makeCluster(c("localhost","localhost"), type = "SOCK")
sample.func()
stopCluster(cl)
> Data
   cell value  x  y
1     1   100  1  1
2     2   100  2  2
3     3     6  3  3
4     4     8  4  4
5     5    10  5  5
6     6    12  6  6
7     7   100  7  7
8     8   100  8  8
9     9    18  9  9
10   10    20 10 10

You will still have to copy (part of) your data though to get it to the cores. But that will happen anyway when you call snowfall high level functions on dataframes, as snowfall uses the low-level function of snow anyway.

Plus, one shouldn't forget that if you change one value in a dataframe, the whole dataframe is copied in the memory as well. So you won't win that much by adding the values one by one when they come back from the cluster. You might want to try some different approaches and do some memory profiling as well.

like image 4
Joris Meys Avatar answered Nov 07 '22 21:11

Joris Meys


I agree with Joris that you will need to copy your data to the other cores. On the positive side, you don't have to worry about NA's being in the data or not, within the cores. If your original data.frame is called cont.values:

nnaidx<-is.na(cont.values$value) #where is missing data originally
dfrnna<-cont.values[nnaidx,] #subset for copying to other cores
calcValForDfrRow<-function(dfrRow){return(dfrRow$x+dfrRow$y)}#or whatever pleases you
sfExport(dfrnna, calcValForDfrRow) #export what is needed to other cores
cont.values$value[nnaidx]<-sfSapply(seq(dim(dfrnna)[1]), function(i){calcValForDfrRow(dfrnna[i,])}) #sfSapply handles 'reordering', so works exactly as if you had called sapply

should work nicely (barring typos)

like image 1
Nick Sabbe Avatar answered Nov 07 '22 22:11

Nick Sabbe