Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

R: asynchronous parallel lapply

The simplest way I've found so far to use a parallel lapply in R was through the following example code:

library(parallel)
library(pbapply)

cl <- makeCluster(10)
clusterExport(cl = cl, {...})
clusterEvalQ(cl = cl, {...})

results <- pblapply(1:100, FUN = function(x){rnorm(x)}, cl = cl)

This has a very useful feature of providing a progress bar for the results, and is very easy to reuse the same code when no parallel computations are needed, by setting cl = NULL.

However, one issue that I've noted is that the pblapply is looping through the list in batches. For example, if one worker is stuck for a long time on a certain task, the remaining workers will wait for it to finish before starting a new batch of jobs. For certain tasks this adds a lot of unnecessary time to the workflow.

My question: Are there any similar parallel frameworks that would allow for the workers to run independently? Progress bar and the ability to reuse the code with cl=NULL would be a big plus.

Maybe it is possible to modify the existing code of pbapply to add this option/feature?

like image 270
runr Avatar asked Sep 05 '25 16:09

runr


2 Answers

(Disclaimer: I'm the author of the future framework and the progressr package)

A close solution that resembles base::lapply(), and your pbapply::pblapply() example, is to use the future.apply as:

library(future.apply)

## The below is same as plan(multisession, workers=4)
cl <- parallel::makeCluster(4)
plan(cluster, workers=cl)

xs <- 1:100
results <- future_lapply(xs, FUN=function(x) {
  Sys.sleep(0.1)
  sqrt(x)
})

Chunking: You can control the amount of chunking with argument future.chunk.size or supplementary future.schedule. To disable chunking such that each element is processed in a unique parallel task, use future.chunk.size=1. This way, if there is one element that takes much longer than other elements, it will not hold up any other elements.

xs <- 1:100
results <- future_lapply(xs, FUN=function(x) {
  Sys.sleep(0.1)
  sqrt(x)
}, future.chunk.size=1)

Progress updates in parallel: If you want to receive progress updates when doing parallel processing, you can use progressr package and configure it to use the progress package to report updates as a progress bar (here also with an ETA).

library(future.apply)
plan(multisession, workers=4)

library(progressr)
handlers(handler_progress(format="[:bar] :percent :eta :message"))

with_progress({
  p <- progressor(along=xs)
  results <- future_lapply(xs, FUN=function(x) {
    p()  ## signal progress
    Sys.sleep(0.1)
    sqrt(x)
  }, future.chunk.size=1)
})

You can wrap this into a function, e.g.

my_fcn <- function(xs) {
  p <- progressor(along=xs)
  future_lapply(xs, FUN=function(x) {
    p()
    Sys.sleep(0.1)
    sqrt(x)
  }, future.chunk.size=1)
}

This way you can call it as a regular function:

> result <- my_fcn(xs)

and use plan() to control exactly how you want it to parallelize. This will not report on progress. To do that, you'll have to do:

> with_progress(result <- my_fcn(xs))
[====>-----------------------------------------------------]   9%  1m

Run everything in the background: If your question was how to run the whole shebang in the background, see the 'Future Topologies' vignette. That's another level of parallelization but it's possible.

like image 196
HenrikB Avatar answered Sep 07 '25 06:09

HenrikB


You could use the furrr package which uses future to run purrr in multiprocess mode :

library(furrr)
plan(multisession, workers = nbrOfWorkers()-1)
nbrOfWorkers()
1:100 %>% future_map(~{Sys.sleep(1); rnorm(.x)},.progress = T)
Progress: ──────────────────────────────                                   100%

You can switch off parallel computations with plan(sequential)

like image 41
Waldi Avatar answered Sep 07 '25 06:09

Waldi