
Building a function for .combine in foreach

I have a process that I want to run in parallel, but some tasks fail with a strange error. I am now considering handling this in the combine step by recalculating the failing task on the master CPU. However, I don't know how to write such a function for .combine.

How should it be written?

I know how to write a basic .combine function (for example, this answer provides one), but it doesn't show how to handle failing tasks or how to repeat a task on the master.

I would do something like:

foreach(i=1:100, .combine = function(x, y){tryCatch(?)}) %dopar% {
    long_process_which_fails_randomly(i)
}

However, how do I access the input of that task inside the .combine function (if that is even possible)? Or should the body of %dopar% return a flag or a list so that the task can be recalculated?

llrs asked Oct 29 '22 16:10



1 Answer

To execute tasks in the combine function, you need to include extra information in the result object returned by the body of the foreach loop. In this case, that would be an error flag and the value of i. There are many ways to do this, but here's an example:

comb <- function(results, x) {
  i <- x$i
  result <- x$result
  if (x$error) {
    cat(sprintf('master computing failed task %d\n', i))
    # Could call function repeatedly until it succeeds,
    # but that could hang the master
    result <- try(fails_randomly(i))
  }
  results[i] <- list(result)  # guard against a NULL result
  results
}

library(foreach)  # assumes a parallel backend has already been registered

r <- foreach(i=1:100, .combine='comb',
             .init=vector('list', 100)) %dopar% {
  tryCatch({
    list(error=FALSE, i=i, result=fails_randomly(i))
  },
  error=function(e) {
    list(error=TRUE, i=i, result=e)
  })
}
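The comment in comb about repeated calls hanging the master can be addressed with a bounded retry. The following is only a sketch under assumptions: fails_randomly is a hypothetical stand-in that fails about 30% of the time, retry_on_master and max_tries are names invented for illustration, and registerDoSEQ() is used so the example runs without a cluster (a real parallel backend would be registered instead).

```r
library(foreach)
registerDoSEQ()  # sequential backend so the sketch runs anywhere

# Hypothetical stand-in for the real task: fails about 30% of the time.
fails_randomly <- function(i) {
  if (runif(1) < 0.3) stop("random failure on task ", i)
  i^2
}

# Retry a task on the master at most max_tries times. If every attempt
# fails, the last error object is returned, so the master cannot loop forever.
retry_on_master <- function(i, max_tries = 5) {
  result <- NULL
  for (attempt in seq_len(max_tries)) {
    result <- tryCatch(fails_randomly(i), error = function(e) e)
    if (!inherits(result, "error")) break
  }
  result
}

# Combine step: recompute failed tasks on the master, store by task index.
comb <- function(results, x) {
  result <- if (x$error) retry_on_master(x$i) else x$result
  results[x$i] <- list(result)  # list() guards against a NULL result
  results
}

r <- foreach(i = 1:20, .combine = comb,
             .init = vector("list", 20)) %dopar% {
  tryCatch(list(error = FALSE, i = i, result = fails_randomly(i)),
           error = function(e) list(error = TRUE, i = i, result = e))
}
```

Any element of r that is still an error object after the loop marks a task that failed on the worker and on every master retry, so it can be inspected or reported separately.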

I'd be tempted to deal with this problem by executing the parallel loop repeatedly until all the tasks have been computed:

x <- rnorm(100)
results <- lapply(x, function(i) simpleError(''))

# Might want to put a limit on the number of retries
repeat {
  ix <- which(sapply(results, function(x) inherits(x, 'error')))
  if (length(ix) == 0)
    break

  cat(sprintf('computing tasks %s\n', paste(ix, collapse=',')))
  r <- foreach(i=x[ix], .errorhandling='pass') %dopar% {
    fails_randomly(i)
  }

  results[ix] <- r
}
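As the comment in the loop suggests, a cap on the number of retries keeps a permanently failing task from looping forever. Here is one possible sketch, assuming a hypothetical fails_randomly and an arbitrary max_rounds of 10; registerDoSEQ() is used only so the example runs without a cluster.

```r
library(foreach)
registerDoSEQ()  # sequential backend so the sketch runs anywhere

# Hypothetical stand-in for the real task: fails about 30% of the time.
fails_randomly <- function(v) {
  if (runif(1) < 0.3) stop("random failure")
  v * 2
}

x <- rnorm(100)
# Mark every task as "failed" initially so the first pass computes all of them.
results <- lapply(x, function(v) simpleError("not yet computed"))
max_rounds <- 10  # assumed cap on the number of passes

for (attempt in seq_len(max_rounds)) {
  ix <- which(vapply(results, inherits, logical(1), what = "error"))
  if (length(ix) == 0) break
  # 'pass' returns the error object as the result instead of stopping the loop.
  results[ix] <- foreach(v = x[ix], .errorhandling = "pass") %dopar% {
    fails_randomly(v)
  }
}

# Report tasks that never succeeded within the cap.
failed <- which(vapply(results, inherits, logical(1), what = "error"))
if (length(failed) > 0)
  warning(sprintf("%d task(s) still failing after %d rounds",
                  length(failed), max_rounds))
```

Using a for loop over seq_len(max_rounds) instead of repeat makes the upper bound explicit, and the final check leaves it to the caller to decide what to do with the leftovers.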

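Note that this solution relies on the .errorhandling='pass' option, which returns the error object as the task's result instead of aborting the loop; that is what lets the next pass detect which tasks failed. For more information on this option, see the foreach man page.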
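To make the effect of .errorhandling concrete, here is a small sketch comparing the 'pass' and 'remove' settings. The task function is hypothetical and fails deterministically for i == 2; %do% is used so no backend is needed.

```r
library(foreach)

# Hypothetical task: fails deterministically for i == 2.
task <- function(i) if (i == 2) stop("boom") else i * 10

# 'pass' keeps the error object in the position of the failed task.
passed <- foreach(i = 1:3, .errorhandling = "pass") %do% task(i)

# 'remove' drops the failed task's result from the output.
removed <- foreach(i = 1:3, .errorhandling = "remove") %do% task(i)

is_error <- vapply(passed, inherits, logical(1), what = "error")
which(is_error)   # index of the failed task
length(removed)   # one fewer element than the number of tasks
```

With 'pass' the results keep their positions, which is why the retry loop can index back into results; 'remove' loses that alignment and is better suited to cases where failed tasks can simply be ignored.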

Steve Weston answered Nov 12 '22 19:11
