I have a generic chunking function that breaks big calls into smaller pieces and runs them in parallel.
chunk_it <- function(d, n, some_fun) {
  # run n chunks of d in parallel
  dat <- foreach(...) %dopar% {
    some_fun(...)
  }
}
I want this generic chunking function to be able to tell whether it's being called by a process that's already running in parallel ("chunked", in my terminology).
chunked_highlevel <- function(d, n, some_fun) {
  # run n chunks of d in parallel
  ...
  chunk_it(lowerlevel_d, n) # do not chunk!
}
What I would like is that, if I have already chunked the process at a higher level, the chunking function is not activated again at the lower level.
Is there a way to identify when you're already inside a parallel process?
So that we could write code like this:
chunk_it <- function(d, n, some_fun) {
  # run n chunks of d in parallel, unless we are already in parallel
  if (!already_parallel) {
    dat <- foreach(...) %dopar% {
      some_fun(...)
    }
  } else {
    dat <- some_fun(...)
  }
}
I don't think there's an official way of doing this. However, the call stack should generally contain calls that make it obvious whether you're in parallel code. What I've got so far looks like this. It seems to work for doSNOW with either MPI or SOCK, but will probably need adjustment for other packages that implement %dopar%. It also depends on some internal details of snow, which may change in future versions.
library(doSNOW)
library(foreach)
my_fn <- function(bit) {
  is_parallel <- any(unlist(lapply(sys.calls(), function(cal) {
    as.character(cal[[1]]) %in% c("slaveLoop", "%dopar%")
  })))
  is_parallel
}
foreach(x = 1:2) %do% my_fn(x)
# [[1]]
# [1] FALSE
#
# [[2]]
# [1] FALSE
cl <- makeCluster(2)
registerDoSNOW(cl)
foreach(x = 1:2) %dopar% my_fn(x)
# [[1]]
# [1] TRUE
#
# [[2]]
# [1] TRUE
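To connect this back to the question, here is a minimal sketch of how the call-stack check might be folded into chunk_it(). The helper name already_parallel(), the way d is split into chunks, and the list() wrapper in the serial branch are all assumptions made for illustration; the marker names are the same ones used above and still rely on snow internals.

```r
library(foreach)

# Hypothetical helper (not part of any package): TRUE if any call on the
# stack is named like a snow worker loop or a %dopar% call.
already_parallel <- function(markers = c("slaveLoop", "%dopar%")) {
  any(vapply(sys.calls(), function(cal) {
    fn <- cal[[1]]
    # Skip calls whose head is a function object rather than a name
    if (is.function(fn)) return(FALSE)
    any(as.character(fn) %in% markers)
  }, logical(1)))
}

chunk_it <- function(d, n, some_fun) {
  if (already_parallel()) {
    # Already inside parallel code: process everything as one chunk.
    # Wrapped in a list so both branches return the same shape.
    return(list(some_fun(d)))
  }
  # Assumed chunking scheme: n roughly equal, contiguous pieces of d
  chunks <- split(d, ceiling(seq_along(d) * n / length(d)))
  foreach(chunk = chunks) %dopar% some_fun(chunk)
}
```

With a backend registered as above, chunk_it(1:10, 3, sum) would fan out over the workers when called from the top level, and fall back to a single serial call when it is reached from inside another %dopar% loop.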
The future package (I'm the author) has built-in support for nested parallelism, so that you do not have to worry about it as a developer, while still giving the end user full power to control how and where parallelization takes place.
Here's an example from one of the future vignettes:
library("future")
library("listenv")
x <- listenv()
for (ii in 1:3) {
  x[[ii]] %<-% {
    y <- listenv()
    for (jj in 1:3) {
      y[[jj]] %<-% { ii + jj/10 }
    }
    y
  }
}
unlist(x)
## [1] 1.1 1.2 1.3 2.1 2.2 2.3 3.1 3.2 3.3
Note how there are two layers of future assignments (%<-%). The default is to always process them sequentially unless the specification says otherwise. For instance, to process the outer loop of future assignments in parallel on your local machine, use:
plan(multiprocess)
This will cause x[[ii]] %<-% { ... } for ii = 1, 2, 3 to run in parallel, while the contained y[[jj]] %<-% { ... } will run sequentially. The equivalent fully explicit setting for this is:
plan(list(multiprocess, sequential))
Now, if you want to run the outer loop of futures (x[[ii]]) sequentially and the inner loop of futures (y[[jj]]) in parallel, you can specify:
plan(list(sequential, multiprocess))
before running the code.
BTW, the number of parallel processes used with multiprocess is future::availableCores(). Think of it as parallel::detectCores(), except that it also respects mc.cores, HPC cluster environments, etc. Importantly, future::availableCores() will return 1 if it's already running in parallel ("is a parallel child"). This means that if you do:
plan(list(multiprocess, multiprocess))
the inner layer of futures will actually only see a single core. You can think of this as a built-in automatic protection from creating a huge number of parallel processes by mistake through recursive parallelism.
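One way to see this protection in action is to ask a future in the outer layer what it can see. The sketch below is only illustrative: it assumes a recent release of future, where the multisession strategy is used in place of multiprocess, and the variable names f and info are made up for the example.

```r
library(future)

# Assumption: multisession stands in for the multiprocess strategy above
plan(list(multisession, multisession))

# Evaluate one future in the outer layer and report what it sees there
f <- future({
  list(
    pid     = Sys.getpid(),
    cores   = unname(future::availableCores()),
    workers = future::nbrOfWorkers()
  )
})
info <- value(f)
str(info)

plan(sequential)  # reset the plan, which also shuts down background workers
```

Comparing info$cores with future::availableCores() in the main session shows how many cores the inner layer would be allowed to use.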
You can force a different setting, though it's not recommended. For instance, say you want the outer layer to run four parallel tasks at the same time, and each of those tasks to run two parallel tasks at the same time (on your local machine); then you can use:
plan(list(
  tweak(multiprocess, workers = 4L),
  tweak(multiprocess, workers = 2L)
))
That will run at most 4*2 = 8 parallel tasks at the same time (plus the master process).
If you have a set of machines available, you can do:
plan(list(
  tweak(cluster, workers = c("machine1", "machine2", "machine3")),
  multiprocess
))
That will distribute the outer layer of futures (x[[ii]]) to those three machines, and the inner layer of futures (y[[jj]]) will run in parallel using all the available cores on those machines.
Note how the code doesn't change - only the settings (= plan() call). This is in the spirit of "write once, run wherever". There are many different future-strategy setups you can use; see the vignettes of the future package.
Now, what if you wanna use foreach()? You can use the doFuture %dopar% adapter that works on top of the future framework. For example,
library("doFuture")
registerDoFuture()
some_fun <- function(j) {
  list(j = j, pid.j = Sys.getpid())
}
my_fun <- function(i) {
  y <- foreach(j = 1:3) %dopar% { some_fun(j = j) }
  list(i = i, pid.i = Sys.getpid(), y = y)
}
x <- foreach(i = 1:3) %dopar% { my_fun(i = i) }
Run the above and look at str(x) and its different PIDs under each of the plan() settings shown above. That'll illustrate what's going on.
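If str(x) is hard to scan, the nested result can be flattened into a table of PIDs. This is just a convenience sketch in base R; the function name pid_table() is made up, and it assumes x has exactly the structure returned by my_fun() and some_fun() above.

```r
# Hypothetical helper: one row per (i, j) pair, with the PID that handled
# the outer task i and the PID that handled the inner task j.
pid_table <- function(x) {
  rows <- lapply(x, function(xi) {
    data.frame(
      i     = xi$i,
      pid.i = xi$pid.i,
      j     = vapply(xi$y, function(yj) yj$j, numeric(1)),
      pid.j = vapply(xi$y, function(yj) yj$pid.j, numeric(1))
    )
  })
  do.call(rbind, rows)
}
```

Calling pid_table(x) after each plan() setting makes it easy to see which layer ran in separate processes: distinct pid.i values mean the outer loop was parallelized, and pid.j values that differ from pid.i mean the inner loop was.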
Hope this helps