The simplest way I've found so far to use a parallel lapply in R is with the following example code:
library(parallel)
library(pbapply)
cl <- makeCluster(10)
clusterExport(cl = cl, varlist = c(...))  # names of objects to export to the workers
clusterEvalQ(cl = cl, expr = {...})       # code to run once on each worker
results <- pblapply(1:100, FUN = function(x){rnorm(x)}, cl = cl)
This has the very useful feature of providing a progress bar for the results, and it is very easy to reuse the same code when no parallel computation is needed, by setting cl = NULL.
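For example, the exact same call runs sequentially (still with a progress bar) when no cluster is supplied:

results <- pblapply(1:100, FUN = function(x) rnorm(x), cl = NULL)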
However, one issue I've noticed is that pblapply loops through the list in batches: if one worker is stuck on a long task, the remaining workers wait for it to finish before a new batch of jobs is started. For certain tasks this adds a lot of unnecessary time to the workflow.
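A minimal sketch of the effect with hypothetical sleep times (two workers; the fast worker sits idle until the slow task in its batch finishes):

library(parallel)
library(pbapply)

sleep_times <- c(10, 1, 1, 1)  # the first task is much slower than the rest
cl <- makeCluster(2)
system.time(
  pblapply(sleep_times, FUN = Sys.sleep, cl = cl)
)
## ~11 s: the {1, 1} batch cannot start until the 10 s task has finished
stopCluster(cl)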
My question:
Are there any similar parallel frameworks that would allow the workers to run independently? A progress bar and the ability to reuse the code with cl = NULL would be a big plus.
Maybe it is possible to modify the existing code of pbapply to add this option/feature?
(Disclaimer: I'm the author of the future framework and the progressr package)
A close solution that resembles base::lapply(), and your pbapply::pblapply() example, is to use future_lapply() from the future.apply package:
library(future.apply)

## The below is the same as plan(multisession, workers = 4)
cl <- parallel::makeCluster(4)
plan(cluster, workers = cl)

xs <- 1:100
results <- future_lapply(xs, FUN = function(x) {
  Sys.sleep(0.1)
  sqrt(x)
})
Chunking: You can control the amount of chunking with the argument future.chunk.size or, alternatively, future.scheduling. To disable chunking so that each element is processed in its own parallel task, use future.chunk.size = 1. This way, if there is one element that takes much longer than the others, it will not hold up any other elements.
xs <- 1:100
results <- future_lapply(xs, FUN = function(x) {
  Sys.sleep(0.1)
  sqrt(x)
}, future.chunk.size = 1)
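Equivalently, per the future.apply documentation, future.scheduling = Inf means one future per element, so a sketch of the alternative is:

results <- future_lapply(xs, FUN = function(x) {
  Sys.sleep(0.1)
  sqrt(x)
}, future.scheduling = Inf)  ## one future per element, i.e. no chunking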
Progress updates in parallel: If you want to receive progress updates during parallel processing, you can use the progressr package and configure it to use the progress package to report updates as a progress bar (here also with an ETA).
library(future.apply)
plan(multisession, workers = 4)

library(progressr)
handlers(handler_progress(format = "[:bar] :percent :eta :message"))

with_progress({
  p <- progressor(along = xs)
  results <- future_lapply(xs, FUN = function(x) {
    p()  ## signal progress
    Sys.sleep(0.1)
    sqrt(x)
  }, future.chunk.size = 1)
})
You can wrap this into a function, e.g.:
my_fcn <- function(xs) {
  p <- progressor(along = xs)
  future_lapply(xs, FUN = function(x) {
    p()
    Sys.sleep(0.1)
    sqrt(x)
  }, future.chunk.size = 1)
}
This way you can call it as a regular function:
> result <- my_fcn(xs)
and use plan() to control exactly how you want it to parallelize. This will not report on progress. To do that, you'll have to do:
> with_progress(result <- my_fcn(xs))
[====>-----------------------------------------------------] 9% 1m
Run everything in the background: If your question was how to run the whole shebang in the background, see the 'Future Topologies' vignette. That's another level of parallelization, but it's possible.
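A minimal sketch of that idea, assuming my_fcn() from above and hypothetical worker counts (see the vignette for the details of nested plans):

library(future)
## Outer layer: up to 2 background R sessions; inner layer: 4 workers in each
plan(list(tweak(multisession, workers = 2), tweak(multisession, workers = 4)))

f <- future(my_fcn(1:100))  # launch the whole computation in the background
## ... the current R session stays free for other work ...
result <- value(f)          # blocks only if the background run isn't done yet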
You could use the furrr package, which uses future to run purrr functions in parallel:
library(furrr)
library(purrr)  # for %>%

plan(multisession, workers = availableCores() - 1)
nbrOfWorkers()

1:100 %>% future_map(~ { Sys.sleep(1); rnorm(.x) }, .progress = TRUE)
Progress: ────────────────────────────── 100%
You can switch off parallel computation with plan(sequential).
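For example, the same future_map() call then runs in the current R session:

plan(sequential)
results <- 1:100 %>% future_map(~ rnorm(.x))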