Execute foreach loop in parallel or sequentially given a condition

I often end up with several nested foreach loops and sometimes when writing general functions (e.g. for a package) there is no level which is obvious to parallelize at. Is there any way to accomplish what the mock-up below describes?

foreach(i = 1:I) %if(I < J) `do` else `dopar`% {
    foreach(j = 1:J) %if(I >= J) `do` else `dopar`% {
        # Do stuff
    }
}

Furthermore, is there some way to detect if a parallel backend is registered so I can avoid getting unnecessary warning messages? This would be useful both when checking packages prior to CRAN submission and to not bother users running R on single core computers.

foreach(i=1:I) %if(is.parallel.backend.registered()) `dopar` else `do`% {
    # Do stuff
}

Thanks for your time.

Edit: Thank you very much for all the feedback on cores and workers. You're right that the best way to deal with the above example would be to rethink the whole setup. I'd prefer something like the code below to the triu idea, but it's essentially the same point. And it could of course also be done with a parallel tapply, as Joris suggested.

ij <- expand.grid(i=1:I, j=1:J)
foreach(i=ij$i, j=ij$j) %dopar% {
    myFunction(i, j)
}

However, in my attempt to simplify the situation that gave rise to this thread I left out some crucial details. Imagine that I have two functions analyse and batch.analyse and the best level to parallelize at might be different depending on the values of n.replicates and n.time.points.

analyse <- function(x, y, n.replicates=1000){
    foreach(r = 1:n.replicates) %do% {
        # Do stuff with x and y
    }
}
batch.analyse <- function(x, y, n.replicates=10, n.time.points=1000){
    foreach(tp = 1:n.time.points) %do% {
        my.y <- my.func(y, tp)
        analyse(x, my.y, n.replicates)
    }
}

If n.time.points > n.replicates it makes sense to parallelize in batch.analyse but otherwise it makes more sense to parallelize in analyse. Any ideas on how to tackle it? Would it somehow be possible to detect in analyse if parallelization has already taken place?

Backlin asked Jul 29 '11 10:07


3 Answers

The issue that you raise was the motivation for the foreach nesting operator, '%:%'. If the body of the inner loop takes a substantial amount of compute time, you're pretty safe using:

foreach(i = 1:I) %:%
    foreach(j = 1:J) %dopar% {
        # Do stuff
    }

This "unrolls" the nested loops, resulting in (I * J) tasks that can all be executed in parallel.

If the body of the inner loop doesn't take much time, the solution is more difficult. The standard solution is to parallelize the outer loop, but that could still result in either many small tasks (when I is large and J is small) or a few large tasks (when I is small and J is large).

My favorite solution is to use the nesting operator with task chunking. Here's a complete example using the doMPI backend:

library(doMPI)
cl <- startMPIcluster()
registerDoMPI(cl)
I <- 100; J <- 2
opt <- list(chunkSize=10)
foreach(i = 1:I, .combine='cbind', .options.mpi=opt) %:%
    foreach(j = 1:J, .combine='c') %dopar% {
        (i * j)
    }
closeCluster(cl)

This results in 20 "task chunks", each consisting of 10 computations of the loop body. If you want to have a single task chunk for each worker, you can compute the chunk size as:

cs <- ceiling((I * J) / getDoParWorkers())
opt <- list(chunkSize=cs)
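
As a worked example of that arithmetic, here is a small sketch using only base R. The helper name chunk.size and the worker counts are made up for illustration; in real code the worker count would come from getDoParWorkers().

```r
# Hypothetical helper: one task chunk per worker, rounded up so that
# no task is left over.
chunk.size <- function(n.tasks, n.workers) {
    ceiling(n.tasks / n.workers)
}

I <- 100; J <- 2

# Assumed worker counts, for illustration only.
cs8 <- chunk.size(I * J, 8)   # 200 tasks over 8 workers
cs3 <- chunk.size(I * J, 3)   # 200 tasks over 3 workers

# Number of chunks that actually result from each chunk size.
n.chunks8 <- ceiling((I * J) / cs8)
n.chunks3 <- ceiling((I * J) / cs3)
```

With 8 workers the chunk size is 25, and with 3 workers it is 67, so the last chunk in the second case is slightly smaller than the others.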

Unfortunately, not all parallel backends support task chunking. Also, doMPI doesn't support Windows.

For more information on this topic, see my vignette "Nesting Foreach Loops" in the foreach package:

library(foreach)
vignette('nesting')

Steve Weston answered Oct 06 '22 23:10


If you end up with several nested foreach loops, I'd rethink my approach. Using parallel versions of tapply can solve a lot of that hassle. In general, you shouldn't use nested parallelization, as that doesn't bring you anything. Parallelize the outer loop, and forget about the inner loop.

The reason is simple: if you have 3 connections in your cluster, the outer dopar loop will use all three. The inner dopar loop will not be able to use any extra connections, as there are none available, so you don't gain a thing. Hence, the mock-up you give doesn't make sense from a programming point of view.
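
A minimal sketch of "parallelize the outer loop, and forget about the inner loop" could look like this. It registers the sequential backend only so that it runs anywhere; the sizes I and J and the loop body are placeholders. With a real cluster backend, the workers may also need foreach loaded, e.g. via .packages = "foreach" on the outer loop.

```r
library(foreach)

# Stand-in backend so the sketch runs on any machine; replace with a
# real parallel backend (doSNOW, doMPI, ...) in practice.
registerDoSEQ()

I <- 3; J <- 2

# Only the outer loop is handed to the backend; the inner loop runs
# sequentially inside each outer task.
res <- foreach(i = 1:I, .combine = rbind) %dopar% {
    foreach(j = 1:J, .combine = c) %do% {
        i * j   # placeholder for the real work
    }
}
```

Here res is an I x J matrix with one row per outer iteration.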

Your second question is answered pretty easily by the function getDoParRegistered(), which returns TRUE when a backend is registered and FALSE otherwise. Pay attention, though:

  • It also returns TRUE if a sequential backend is registered (i.e. after calling registerDoSEQ).
  • It will return TRUE as well after a cluster has been stopped, but in that case %dopar% will return an error.

For example:

require(foreach)
require(doSNOW)
cl <- makeCluster(rep("localhost",2),type="SOCK")
getDoParRegistered()
# [1] FALSE
registerDoSNOW(cl)
getDoParRegistered()
# [1] TRUE
stopCluster(cl)
getDoParRegistered()
# [1] TRUE

But now running this code:

a <- matrix(1:16, 4, 4)
b <- t(a)
foreach(b=iter(b, by='col'), .combine=cbind) %dopar%
  (a %*% b)

will result in an error:

Error in summary.connection(connection) : invalid connection

You could build in an extra check. A (hideously ugly) hack to check that the connection registered by doSNOW is still valid would be:

isvalid <- function(){
    if (getDoParRegistered() ){
      X <- foreach:::.foreachGlobals$objs[[1]]$data
      x <- try(capture.output(print(X)),silent=TRUE)
      if(is(x,"try-error")) FALSE else TRUE
    } else {
      FALSE
    }
}

Which you could use as

if(!isvalid()) registerDoSEQ()

This will register the sequential backend if getDoParRegistered() returns TRUE but there is no longer a valid cluster connection. But again, this is a hack, and I have no idea if it works with other backends or even other cluster types (I use sockets mostly).
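
To get close to the conditional operator in the question's mock-up, one option is to pick the operator once and reuse it. This is only a sketch: the name %op% is made up and not part of foreach, and the check inherits the caveats of getDoParRegistered() listed above.

```r
library(foreach)

# Choose the operator based on whether any backend is registered.
# `%op%` is a hypothetical name introduced for this sketch.
`%op%` <- if (getDoParRegistered()) `%dopar%` else `%do%`

# With no backend registered this runs through %do%, so no
# "no parallel backend registered" warning is issued.
squares <- foreach(i = 1:3, .combine = c) %op% {
    i^2
}
```

The same pattern works with any other condition, for example a comparison of I and J.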

Joris Meys answered Oct 06 '22 23:10


In reverse order of the questions you asked:

  1. @Joris is correct regarding checking for a registered parallel backend. However, note that whether a machine is single core and whether a parallel backend is registered are two different things. Counting the number of cores is a very platform-specific (operating-system-specific) task. On Linux, this may work for you:

    CountUnixCPUs <- function(cpuinfo = "/proc/cpuinfo"){
        tmpCmd <- paste("grep processor ", cpuinfo, " | wc -l", sep = "")
        numCPU <- as.numeric(system(tmpCmd, intern = TRUE))
        return(numCPU)
    }
    

    Edit: See @Joris's link to another page, below, which gives advice for Windows and Linux. I will likely rewrite my own code, at least to include more options for counting cores.

  2. Regarding the nested loops, I take a different tack: I prepare a table of parameters and then iterate over rows. A very simple way is, e.g.:

    library(Matrix)
    ptable <- which(triu(matrix(1, ncol = 20, nrow = 20))==1, arr.ind = TRUE)
    foreach(ix_row = 1:nrow(ptable)) %dopar% { myFunction(ptable[ix_row,])}
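
For point 1, a more portable sketch than parsing /proc/cpuinfo is shown below. It assumes an R version that ships the base parallel package (R 2.14.0 or later), and count.cores is a made-up helper name.

```r
library(parallel)

# Hypothetical helper: count cores, falling back to 1 when the
# platform cannot report a number (detectCores() may return NA).
count.cores <- function() {
    n <- detectCores()
    if (is.na(n)) 1L else as.integer(n)
}

n.cores <- count.cores()
```

Note that this counts what the operating system reports, which is still separate from whether a parallel backend has been registered.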
    
Iterator answered Oct 07 '22 00:10